美文网首页javaRabbitMQ
SpringBoot中RabbitMQ如何实现延迟消息

SpringBoot中RabbitMQ如何实现延迟消息

作者: AbstractCulture | 来源:发表于2020-08-29 15:45 被阅读0次

    过期时间TTL

    TTL,Time to Live的简称,即过期时间。在RabbitMQ中,你可以对消息或者队列设置过期时间.
    你可以通过两种方式来设置消息的TTL:

    1. 通过队列属性设置,即在队列初始化的时候设置一个消息的过期时间,那么进入这个队列的所有消息都会按照这个时间来失效。
      原生Java代码实现为:
            HashMap<String, Object> args = new HashMap<>();
            // 这里为队列设置消息过期时间
            args.put("x-message-ttl", 6000);
            ConnectionFactory connectionFactory = new ConnectionFactory();
            Connection connection = connectionFactory.newConnection();
            Channel channel = connection.createChannel();
            channel.queueDeclare(queueName, durable, exclusive, autoDelete, args);
    
    1. 单独为每个消息设置过期时间。如果同时存在队列设置的消息过期时间和消息本身的过期时间,以更小的为准。
      原生Java代码实现为:
            AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
            // 持久化消息
            builder.deliveryMode(2);
            // 设置过期时间为6S
            builder.expiration("6000");
            AMQP.BasicProperties properties = builder.build();
            channel.basicPublish(exchangeName,routingKey,mandatory,properties,message);
    

    死信

    • 被拒绝的消息可以看作死信.
    • 过期的消息可以看作死信.
    • 队列爆满无法投递的消息可以看作死信.

    死信队列

    DLX,全称为Deal-Letter-Exchange,死信交换机。当消息在一个队列中变成了死信后,它可以被重定向到另一个交换机,而这个用来接收死信的交换机就是DLX,与这个死信交换机绑定的队列我们通常叫死信队列。
    Java原生代码实现:

            ConnectionFactory connectionFactory = new ConnectionFactory();
            Connection connection = connectionFactory.newConnection();
            Channel channel = connection.createChannel();
            // 死信交换机
            channel.exchangeDeclare("dlx_exchange", "direct");
            HashMap<String, Object> args = new HashMap<>();
            // 帮当前队列设置参数,如果出现死信,转发到死信交换机
            args.put("x-dead-letter-exchange", "dlx_exchange");
            // 把刚才的参数配置到这个队列
            channel.queueDeclare("myqueue", false, false, false, args);
    

    死信队列的链路过程

    image.png

    SpringBoot中实现延迟消息

    pom.xml 安装依赖

            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
            </dependency>
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <optional>true</optional>
            </dependency>
    

    配置文件(这里只是简单的配置,生产上根据自己的需求配置)

    server:
      port: 8053
    spring:
      application:
        name: rabbitmq
      #配置rabbitMq 服务器
      rabbitmq:
        host: 192.168.11.131
        port: 5672
        username: root
        password: root
    

    RabbitMQDelayConfig

    package com.xjm.mid.compent.rabbitmq.config;
    
    import lombok.Data;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.core.*;
    import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    import java.util.HashMap;
    import java.util.Map;
    
    /**
     * @author jaymin
     */
    @Configuration
    @Slf4j
    public class RabbitMQDelayConfig {
    
        /**
         * 延迟队列,把消息放在这里会自动过期
         */
        public static final String DELAY_QUEUE = "jay.delay.queue";
        
        /**
         * 延迟队列交换机
         */
        public static final String DELAY_EXCHANGE = "jay.delay.exchange";
    
        /**
         * 延迟队列路由
         */
        public static final String DELAY_ROUTINGKEY = "jay.delay.routingKey";
    
        /**
         * 死信队列
         */
        public static final String DEAD_LETTER_QUEUE = "jay.dlx.queue";
    
        /**
         * 死信交换机
         */
        private static final String DEAD_LETTER_EXCHANGE = "jay.dlx.exchange";
    
        /**
         * 死信队列路由
         */
        private final String DEAD_LETTER_ROUTINGKEY = "jay.dlx.routingKey";
    
    
    
        @Bean
        public RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) {
            connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
            connectionFactory.setPublisherReturns(true);
            RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
            // 当交换机无法正确投递消息的时候,RabbitMQ会调用Basic.Return命令将消息返回给生产者
            rabbitTemplate.setMandatory(true);
            rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> log.info("消息发送成功:correlationData({}),ack({}),cause({})", correlationData, ack, cause));
            rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> log.info("消息丢失:exchange({}),route({}),replyCode({}),replyText({}),message:{}", exchange, routingKey, replyCode, replyText, message));
            return rabbitTemplate;
        }
    
        /**
         *
         * @return FanoutExchange
         */
        @Bean
        public FanoutExchange delayExchange() {
            return new FanoutExchange(DELAY_EXCHANGE);
        }
    
    
        /**
         * 延迟队列配置
         * <p>
         * 1、params.put("x-message-ttl", 5 * 1000);
         * 第一种方式是直接设置 Queue 延迟时间 但如果直接给队列设置过期时间,这种做法不是很灵活,(当然二者是兼容的,默认是时间小的优先)
         * 2、rabbitTemplate.convertAndSend(book, message -> {
         * message.getMessageProperties().setExpiration(2 * 1000 + "");
         * return message;
         * });
         * 第二种就是每次发送消息动态设置延迟时间,这样我们可以灵活控制
         **/
        @Bean
        public Queue delayQueue() {
            Map<String, Object> args = new HashMap<>();
            // x-dead-letter-exchange 声明了队列里的死信转发到的DLX名称,
            args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
            // x-dead-letter-routing-key 声明了这些死信在转发时携带的 routing-key 名称。
            args.put("x-dead-letter-routing-key", DEAD_LETTER_ROUTINGKEY);
            return new Queue(DELAY_QUEUE, true, false, false, args);
        }
    
        @Bean
        public Binding delayBinding() {
            return BindingBuilder.bind(delayQueue()).to(delayExchange());
        }
    
    
        @Bean
        public Queue dlxQueue() {
            return new Queue(DEAD_LETTER_QUEUE, true);
        }
    
        /**
         *
         **/
        @Bean
        public DirectExchange dlxExchange() {
            return new DirectExchange(DEAD_LETTER_EXCHANGE);
        }
    
        @Bean
        public Binding dlxBinding() {
            return BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with(DEAD_LETTER_ROUTINGKEY);
        }
    
    
    }
    

    生产者

    package com.xjm.mid.compent.rabbitmq.web;
    
    import com.xjm.mid.compent.rabbitmq.config.RabbitMQDelayConfig;
    import com.xjm.mid.compent.rabbitmq.model.Letter;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    import java.time.LocalDateTime;
    
    @RestController
    @RequestMapping("/hello")
    @Slf4j
    public class HelloController {
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        @GetMapping("/delay")
        public void sendMsg(){
            Letter letter = new Letter();
            letter.setRecipient("福尔摩斯");
            letter.setContext("您有新的订单需要处理!");
            String expiredTime = "5000";
            rabbitTemplate.convertAndSend(RabbitMQDelayConfig.DELAY_EXCHANGE,RabbitMQDelayConfig.DELAY_ROUTINGKEY,letter,message->{
                message.getMessageProperties().setExpiration(expiredTime);
                return message;
            });
            log.info("[发送时间] - [{}]", LocalDateTime.now());
        }
    }
    
    

    消息者

    package com.xjm.mid.compent.rabbitmq.web;
    
    import com.rabbitmq.client.Channel;
    import com.xjm.mid.compent.rabbitmq.config.RabbitMQDelayConfig;
    import com.xjm.mid.compent.rabbitmq.model.Letter;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    import java.io.IOException;
    import java.time.LocalDateTime;
    
    @Component
    @Slf4j
    public class CustomerController {
    
        @RabbitListener(queues = {RabbitMQDelayConfig.DEAD_LETTER_QUEUE})
        public void listenerDelayQueue(Letter letter, Message message, Channel channel) {
            log.info("[listenerDelayQueue 监听的消息] - [消费时间] - [{}] - [{}]", LocalDateTime.now(), letter.toString());
            try {
                // TODO 通知 MQ 消息已被成功消费,可以ACK了
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            } catch (IOException e) {
                log.error("出了点小BUG,问题很大");
            }
        }
    }
    

    TTL无法解决的问题

    死信队列确实可以用来支持延迟消息的发送,但是由于队列的原因,所有的消息都是FIFO的,因此,放到TTL队列中的消息最好是保持过期时间一致;如果读者想实现不同的过期时间的消息都放一个队列中,那么我建议安装rabbitmq_delayed_message_exchange来解决这个问题。

    相关文章

      网友评论

        本文标题:SpringBoot中RabbitMQ如何实现延迟消息

        本文链接:https://www.haomeiwen.com/subject/qwcjsktx.html