美文网首页
RabbitMq 延迟队列

RabbitMq 延迟队列

作者: 子丿龙 | 来源:发表于2021-03-08 23:34 被阅读0次

    一、延迟队列

    延迟队列就是比普通队列多了一个延迟属性。
    单从字面意思,可以理解为这个队列是延迟的,但我们普遍默认的,是我们要实现的功能是延迟,而和队列本身是延迟队列或不是延迟队列无关。看完本篇文章就会明白,即使队列是普通队列,单给消息设置一个延迟时间,依然可以实现延迟功能。

    注:不给队列加延迟属性,单给消息设置一个延迟时间,需要延迟插件才能实现

    二、RabbitMQ中的 TTL 属性

    TTL(Time To Live):表示消息的存活时间(毫秒)
    如果一个队列或者一条消息设置了TTL,那么如果消息没有被及时处理,则会变为死信。

    有两种设置TTL方式付下:
    • 方式1:给队列设置TTL,那么进入该队列的消息,统一有一个存活时间
     @Bean("delayQueueA")
        public Queue delayQueueA() {
            return QueueBuilder.durable(DELAY_QUEUEA_NAME)
                    .deadLetterExchange(DEAD_LETTER_EXCHANGE)
                    .deadLetterRoutingKey(DEAD_LETTER_QUEUEA_ROUTING_KEY)
                    .ttl(6000)//设置队列TTL 6秒(底层就是设置 " x-message-ttl ")
                    .build();
    
    • 方式2:给单个消息设置TTL
     @GetMapping("/delayMsg")
        public void sendMsg2(String msg, Integer delayTime) {
            log.info(delayTime/1000 + "s");
            rabbitTemplate.convertAndSend(RabbitMQConfig.DELAY_EXCHANGE_NAME, RabbitMQConfig.DELAY_QUEUEC_ROUTING_KEY, 
                    msg+delayTime, message -> {
                //单个消息设置TTL 
                message.getMessageProperties().setExpiration(delayTime.toString());
                return message;
            });
        }
    

    注意:如果两种方式都设置了,那么选TTL最短时间

    有两种方式可实现延迟功能:

    三、第一种延迟功能实现方式代码示例,给队列设置延迟

    先看看消息流向图:


    image.png
    这种方式,就是利用死信机制,先让消息流向设置了延迟时间的 延迟队列,待消息到期后成为死信,便自动流向 死信队列,最后我们监听 死信队列 的消息,然后消费。(延迟队列、死信队列和正常队列是一样的,正常监听消费即可)
    • 创建交换机、队列、绑定关系代码如下:
    package com.zilong.mqpractice.delayqueue;
    
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.core.*;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    @Slf4j
    public class RabbitMQConfig {
        public static final String DELAY_EXCHANGE_NAME = "delay.exchange";
        public static final String DELAY_QUEUEA_NAME = "delay.queuea";
        public static final String DELAY_QUEUEB_NAME = "delay.queueb";
        public static final String DELAY_QUEUEA_ROUTING_KEY = "delay.queuea.routingkey";
        public static final String DELAY_QUEUEB_ROUTING_KEY = "delay.queueb.routingkey";
        public static final String DEAD_LETTER_EXCHANGE = "deadletter.exchange";
        public static final String DEAD_LETTER_QUEUEA_NAME = "deadletter.queuea";
        public static final String DEAD_LETTER_QUEUEB_NAME = "deadletter.queueb";
        public static final String DEAD_LETTER_QUEUEA_ROUTING_KEY = "deadletter.delay_6s.routingkey";
        public static final String DEAD_LETTER_QUEUEB_ROUTING_KEY = "deadletter.delay_30s.routingkey";
    
        @Bean("delayExchange")
        public DirectExchange delayExchange() {
            log.info("create----> delayExchange");
            return new DirectExchange(DELAY_EXCHANGE_NAME);
        }
    
    
        @Bean("delayQueueA")
        public Queue delayQueueA() {
            return QueueBuilder.durable(DELAY_QUEUEA_NAME)
                    .deadLetterExchange(DEAD_LETTER_EXCHANGE)
                    .deadLetterRoutingKey(DEAD_LETTER_QUEUEA_ROUTING_KEY)
                    .ttl(6000)
                    .build();
        }
    
        @Bean("delayQueueB")
        public Queue delayQueueB() {
            return QueueBuilder.durable(DELAY_QUEUEB_NAME)
                    .deadLetterExchange(DEAD_LETTER_EXCHANGE)
                    .deadLetterRoutingKey(DEAD_LETTER_QUEUEB_ROUTING_KEY)
                    .ttl(30000)
                    .build();
        }
    
        @Bean
        public Binding delayBindingA(@Qualifier("delayQueueA") Queue queue,
                                     @Qualifier("delayExchange") DirectExchange exchange) {
            return BindingBuilder.bind(queue).to(exchange).with(DELAY_QUEUEA_ROUTING_KEY);
    
        }
    
        @Bean
        public Binding delayBindingB(@Qualifier("delayQueueB") Queue queue,
                                     @Qualifier("delayExchange") DirectExchange exchange) {
            return BindingBuilder.bind(queue).to(exchange).with(DELAY_QUEUEB_ROUTING_KEY);
    
        }
    
    
        @Bean("deadLetterExchange")
        public DirectExchange deadLetterExchange() {
            return new DirectExchange(DEAD_LETTER_EXCHANGE);
        }
    
        @Bean("deadLetterQueueA")
        public Queue deadLetterQueueA() {
            return QueueBuilder.durable(DEAD_LETTER_QUEUEA_NAME)
                    .build();
        }
    
        @Bean("deadLetterQueueB")
        public Queue deadLetterQueueB() {
            return QueueBuilder.durable(DEAD_LETTER_QUEUEB_NAME)
                    .build();
        }
    
        @Bean
        public Binding deadLetterBindingA(@Qualifier("deadLetterQueueA") Queue queue,
                                          @Qualifier("deadLetterExchange") DirectExchange exchange) {
            return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEA_ROUTING_KEY);
    
        }
    
        @Bean
        public Binding deadLetterBindingB(@Qualifier("deadLetterQueueB") Queue queue,
                                          @Qualifier("deadLetterExchange") DirectExchange exchange) {
            return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEB_ROUTING_KEY);
    
        }
    
    }
    
    
    • 创建消息生产者代码如下:
    package com.zilong.mqpractice.delayqueue;
    
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.amqp.core.MessageDeliveryMode;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    @Slf4j
    @RestController
    @RequestMapping("/delayMsg")
    public class Cotroller {
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        @GetMapping("/6s/send")
        public void sendMsg(String msg) {
            log.info("6s");
            rabbitTemplate.convertAndSend(RabbitMQConfig.DELAY_EXCHANGE_NAME, RabbitMQConfig.DELAY_QUEUEA_ROUTING_KEY, msg);
        }
    
        @GetMapping("/30s/send")
        public void sendMsg2(String msg) {
            log.info("30s");
            rabbitTemplate.convertAndSend(RabbitMQConfig.DELAY_EXCHANGE_NAME, RabbitMQConfig.DELAY_QUEUEB_ROUTING_KEY, msg);
        }
    }
    
    
    • 创建消费者代码如下:
    package com.zilong.mqpractice.delayqueue;
    
    import com.rabbitmq.client.Channel;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    
    @Component
    @Slf4j
    public class Consumer {
    
    
        @RabbitListener(queues = RabbitMQConfig.DEAD_LETTER_QUEUEA_NAME)
        public void receiveA(Message message, Channel channel) throws Exception {
            String msg = new String(message.getBody());
            log.info("收到6s,死信A: " + msg);
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        }
    
    
        @RabbitListener(queues = RabbitMQConfig.DEAD_LETTER_QUEUEB_NAME)
        public void receiveB(Message message, Channel channel) throws Exception {
            String msg = new String(message.getBody());
            log.info("收到30s,死信B: " + msg);
    
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        }
    }
    
    
    2021-03-08 23:02:11.711  INFO 83451 --- [nio-8006-exec-3] c.z.mqpractice.delayqueue.Cotroller      : 6s
    2021-03-08 23:02:13.378  INFO 83451 --- [nio-8006-exec-4] c.z.mqpractice.delayqueue.Cotroller      : 30s
    2021-03-08 23:02:17.845  INFO 83451 --- [ntContainer#3-1] c.zilong.mqpractice.delayqueue.Consumer  : 收到6s,死信A: 短时间
    2021-03-08 23:02:43.548  INFO 83451 --- [ntContainer#2-1] c.zilong.mqpractice.delayqueue.Consumer  : 收到30s,死信B: 长时间
    

    四、第二种延迟功能实现方式代码示例,给消息设置延迟

    这种方式是最简便的方式,不需要第一种那么复杂的消息流向
    先看消息就想图:


    image.png
    利用rabbitmq延迟插件,创建一个延迟交换机,然后消息生产时设置上延迟属性(setDelay()方法,而不是setExpiration()方法),队列也用普通队列即可,不需要设置延迟属性,消费者只需要监听该队列即可。
    • 创建交换机、队列、绑定关系代码如下:
    package com.zilong.mqpractice.delayqueue2;
    
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.core.*;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    @Slf4j
    public class RabbitMQConfig3 {
        public static final String DELAY_EXCHANGE_NAME = "delay.exchange2";
        public static final String DELAY_QUEUEC_NAME = "delay.queuec";
        public static final String DELAY_QUEUEC_ROUTING_KEY = "delay.queuec.routingkey";
    
        @Bean("delayExchange2")
        public DirectExchange delayExchange2() {
            log.info("create----> delayExchange2");
            DirectExchange directExchange = new DirectExchange(DELAY_EXCHANGE_NAME);
            //给交换机设置延迟属性
            directExchange.setDelayed(true);
            return directExchange;
        }
    
        @Bean("delayQueueC")
        public Queue delayQueueC() {
            //普通的队列
            return QueueBuilder.durable(DELAY_QUEUEC_NAME)
                    .build();
        }
    
        @Bean
        public Binding delayBindingC(@Qualifier("delayQueueC") Queue queue,
                                     @Qualifier("delayExchange2") DirectExchange exchange) {
            return BindingBuilder.bind(queue).to(exchange).with(DELAY_QUEUEC_ROUTING_KEY);
    
        }
    
    
    }
    
    
    • 创建消息生产者代码如下:
    package com.zilong.mqpractice.delayqueue2;
    
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    @Slf4j
    @RestController
    @RequestMapping("/delayMsg2")
    public class Cotroller3 {
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        @GetMapping("/send")
        public void sendMsg(String msg, Integer delayTime) {
            log.info(delayTime / 1000 + "s");
            rabbitTemplate.convertAndSend(RabbitMQConfig3.DELAY_EXCHANGE_NAME, RabbitMQConfig3.DELAY_QUEUEC_ROUTING_KEY,
                    msg + delayTime, message -> {
                        //给消息设置延迟属性,不同于setExpiration()
                        message.getMessageProperties().setDelay(delayTime);
                        return message;
                    });
        }
    }
    
    
    
    • 创建消费者代码如下:
    package com.zilong.mqpractice.delayqueue2;
    
    import com.rabbitmq.client.Channel;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    
    @Component
    @Slf4j
    public class Consumer3 {
    
    
        @RabbitListener(queues = RabbitMQConfig3.DELAY_QUEUEC_NAME)
        public void receivec(Message message, Channel channel) throws Exception {
            String msg = new String(message.getBody());
            log.info("收到死信C: " + msg);
    
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        }
    }
    
    
    2021-03-08 23:30:12.225  INFO 96631 --- [nio-8006-exec-1] c.z.mqpractice.delayqueue2.Cotroller3    : 6s
    2021-03-08 23:30:16.317  INFO 96631 --- [nio-8006-exec-2] c.z.mqpractice.delayqueue2.Cotroller3    : 30s
    2021-03-08 23:30:18.368  INFO 96631 --- [ntContainer#4-1] c.z.mqpractice.delayqueue2.Consumer3     : 收到死信C: 插件延迟消息6000
    2021-03-08 23:30:46.417  INFO 96631 --- [ntContainer#4-1] c.z.mqpractice.delayqueue2.Consumer3     : 收到死信C: 插件延迟消息30000
    

    五、小结

    实现延迟功能,就两种方式。
    1.将队列设置为延迟队列。缺点是所有消息都固定的延迟时间,想要改变,就要重建一个延迟队列
    2.利用延迟插件给消息设置延迟时间。缺点是插件会影响效率(没试过,官网说的)

    疑问:用延迟队列,然后用setExpiration()给消息设置延迟时间可以么?

    不可以,队列特点是先进先出,这样做,虽然有延迟时间,但是rabbitmq会按顺序检测信息是否死亡。例如:
    先发送messageA:延迟60s,然后发送messageB:延迟6s,那么rabbitmq会先检测messageA是否变为死信,如果messageA变为死信,rabbitmq会将其丢到死信队列,然后rabbitmq才去检测messageB是否是死信.导致的结果就是,messageB会等待messageA60s被消费后,自己才能被检测到,然后被消费(messageB自己的延迟时间已经倒计时完毕,不会等messageA 60s过后才开始倒计时,也就是messageB不会等到66s,他已经是死信,只是rabbitmq没检测它)。所以,给消息设置延迟时间,还是需要按照官网方式,用延迟插件实现。

    相关文章

      网友评论

          本文标题:RabbitMq 延迟队列

          本文链接:https://www.haomeiwen.com/subject/pymmqltx.html