美文网首页javaRabbitMQ
SpringBoot中RabbitMQ如何实现延迟消息

SpringBoot中RabbitMQ如何实现延迟消息

作者: AbstractCulture | 来源:发表于2020-08-29 15:45 被阅读0次

过期时间TTL

TTL,Time to Live的简称,即过期时间。在RabbitMQ中,你可以对消息或者队列设置过期时间.
你可以通过两种方式来设置消息的TTL:

  1. 通过队列属性设置,即在队列初始化的时候设置一个消息的过期时间,那么进入这个队列的所有消息都会按照这个时间来失效。
    原生Java代码实现为:
        HashMap<String, Object> args = new HashMap<>();
        // 这里为队列设置消息过期时间
        args.put("x-message-ttl", 6000);
        ConnectionFactory connectionFactory = new ConnectionFactory();
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(queueName, durable, exclusive, autoDelete, args);
  1. 单独为每个消息设置过期时间。如果同时存在队列设置的消息过期时间和消息本身的过期时间,以更小的为准。
    原生Java代码实现为:
        AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
        // 持久化消息
        builder.deliveryMode(2);
        // 设置过期时间为6S
        builder.expiration("6000");
        AMQP.BasicProperties properties = builder.build();
        channel.basicPublish(exchangeName,routingKey,mandatory,properties,message);

死信

  • 被拒绝的消息可以看作死信.
  • 过期的消息可以看作死信.
  • 队列爆满无法投递的消息可以看作死信.

死信队列

DLX,全称为Deal-Letter-Exchange,死信交换机。当消息在一个队列中变成了死信后,它可以被重定向到另一个交换机,而这个用来接收死信的交换机就是DLX,与这个死信交换机绑定的队列我们通常叫死信队列。
Java原生代码实现:

        ConnectionFactory connectionFactory = new ConnectionFactory();
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        // 死信交换机
        channel.exchangeDeclare("dlx_exchange", "direct");
        HashMap<String, Object> args = new HashMap<>();
        // 帮当前队列设置参数,如果出现死信,转发到死信交换机
        args.put("x-dead-letter-exchange", "dlx_exchange");
        // 把刚才的参数配置到这个队列
        channel.queueDeclare("myqueue", false, false, false, args);

死信队列的链路过程

image.png

SpringBoot中实现延迟消息

pom.xml 安装依赖

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>

配置文件(这里只是简单的配置,生产上根据自己的需求配置)

server:
  port: 8053
spring:
  application:
    name: rabbitmq
  #配置rabbitMq 服务器
  rabbitmq:
    host: 192.168.11.131
    port: 5672
    username: root
    password: root

RabbitMQDelayConfig

package com.xjm.mid.compent.rabbitmq.config;

import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * @author jaymin
 */
@Configuration
@Slf4j
public class RabbitMQDelayConfig {

    /**
     * 延迟队列,把消息放在这里会自动过期
     */
    public static final String DELAY_QUEUE = "jay.delay.queue";
    
    /**
     * 延迟队列交换机
     */
    public static final String DELAY_EXCHANGE = "jay.delay.exchange";

    /**
     * 延迟队列路由
     */
    public static final String DELAY_ROUTINGKEY = "jay.delay.routingKey";

    /**
     * 死信队列
     */
    public static final String DEAD_LETTER_QUEUE = "jay.dlx.queue";

    /**
     * 死信交换机
     */
    private static final String DEAD_LETTER_EXCHANGE = "jay.dlx.exchange";

    /**
     * 死信队列路由
     */
    private final String DEAD_LETTER_ROUTINGKEY = "jay.dlx.routingKey";



    @Bean
    public RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) {
        connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
        connectionFactory.setPublisherReturns(true);
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        // 当交换机无法正确投递消息的时候,RabbitMQ会调用Basic.Return命令将消息返回给生产者
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> log.info("消息发送成功:correlationData({}),ack({}),cause({})", correlationData, ack, cause));
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> log.info("消息丢失:exchange({}),route({}),replyCode({}),replyText({}),message:{}", exchange, routingKey, replyCode, replyText, message));
        return rabbitTemplate;
    }

    /**
     *
     * @return FanoutExchange
     */
    @Bean
    public FanoutExchange delayExchange() {
        return new FanoutExchange(DELAY_EXCHANGE);
    }


    /**
     * 延迟队列配置
     * <p>
     * 1、params.put("x-message-ttl", 5 * 1000);
     * 第一种方式是直接设置 Queue 延迟时间 但如果直接给队列设置过期时间,这种做法不是很灵活,(当然二者是兼容的,默认是时间小的优先)
     * 2、rabbitTemplate.convertAndSend(book, message -> {
     * message.getMessageProperties().setExpiration(2 * 1000 + "");
     * return message;
     * });
     * 第二种就是每次发送消息动态设置延迟时间,这样我们可以灵活控制
     **/
    @Bean
    public Queue delayQueue() {
        Map<String, Object> args = new HashMap<>();
        // x-dead-letter-exchange 声明了队列里的死信转发到的DLX名称,
        args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
        // x-dead-letter-routing-key 声明了这些死信在转发时携带的 routing-key 名称。
        args.put("x-dead-letter-routing-key", DEAD_LETTER_ROUTINGKEY);
        return new Queue(DELAY_QUEUE, true, false, false, args);
    }

    @Bean
    public Binding delayBinding() {
        return BindingBuilder.bind(delayQueue()).to(delayExchange());
    }


    @Bean
    public Queue dlxQueue() {
        return new Queue(DEAD_LETTER_QUEUE, true);
    }

    /**
     *
     **/
    @Bean
    public DirectExchange dlxExchange() {
        return new DirectExchange(DEAD_LETTER_EXCHANGE);
    }

    @Bean
    public Binding dlxBinding() {
        return BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with(DEAD_LETTER_ROUTINGKEY);
    }


}

生产者

package com.xjm.mid.compent.rabbitmq.web;

import com.xjm.mid.compent.rabbitmq.config.RabbitMQDelayConfig;
import com.xjm.mid.compent.rabbitmq.model.Letter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.time.LocalDateTime;

@RestController
@RequestMapping("/hello")
@Slf4j
public class HelloController {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/delay")
    public void sendMsg(){
        Letter letter = new Letter();
        letter.setRecipient("福尔摩斯");
        letter.setContext("您有新的订单需要处理!");
        String expiredTime = "5000";
        rabbitTemplate.convertAndSend(RabbitMQDelayConfig.DELAY_EXCHANGE,RabbitMQDelayConfig.DELAY_ROUTINGKEY,letter,message->{
            message.getMessageProperties().setExpiration(expiredTime);
            return message;
        });
        log.info("[发送时间] - [{}]", LocalDateTime.now());
    }
}

消息者

package com.xjm.mid.compent.rabbitmq.web;

import com.rabbitmq.client.Channel;
import com.xjm.mid.compent.rabbitmq.config.RabbitMQDelayConfig;
import com.xjm.mid.compent.rabbitmq.model.Letter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.time.LocalDateTime;

@Component
@Slf4j
public class CustomerController {

    @RabbitListener(queues = {RabbitMQDelayConfig.DEAD_LETTER_QUEUE})
    public void listenerDelayQueue(Letter letter, Message message, Channel channel) {
        log.info("[listenerDelayQueue 监听的消息] - [消费时间] - [{}] - [{}]", LocalDateTime.now(), letter.toString());
        try {
            // TODO 通知 MQ 消息已被成功消费,可以ACK了
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (IOException e) {
            log.error("出了点小BUG,问题很大");
        }
    }
}

TTL无法解决的问题

死信队列确实可以用来支持延迟消息的发送,但是由于队列的原因,所有的消息都是FIFO的,因此,放到TTL队列中的消息最好是保持过期时间一致;如果读者想实现不同的过期时间的消息都放一个队列中,那么我建议安装rabbitmq_delayed_message_exchange来解决这个问题。

相关文章

网友评论

    本文标题:SpringBoot中RabbitMQ如何实现延迟消息

    本文链接:https://www.haomeiwen.com/subject/qwcjsktx.html