美文网首页
【RabbitMQ学习-02】延迟消费问题

【RabbitMQ学习-02】延迟消费问题

作者: Spring_java | 来源:发表于2019-10-10 22:55 被阅读0次

    最近在学习RabbitMQ ,想尝试下延迟消息的问题。

    总体思路,可能和网上的有点不一样,我是需要将消息先发送到ttl延迟队列内,
    当消息到达过期时间后,会自动转发到ttl队列内配置的转发Exchange以及RouteKey绑定的正常队列内,完成消息消费。
    

    1:TTL(Time To Live)

    消息的过期时间有两种设置方式:

    • 1: 通过队列属性设置消息过期时间
      x-message-ttl ,该属性是在创建队列的时候 ,在arguments的map中配置;该参数的作用是设置当前队列中所有的消息的存活时间
    @Bean("ttlQueue") 
    public Queue queue() {
        Map<String, Object> map = new HashMap<String, Object>(); 
        map.put("x-message-ttl", 10000); // 队列中的消息未被消费 10 秒后过期 
        return new Queue("TTL_QUEUE", true, false, false, map);
    }
    
    • 2: x-expires 该属性也是在arguments中配置;其作用是设置当前队列在N毫秒中(不能为0,且为正整数),就删除该队列;“未使用”意味着队列没有消费者,队列尚未重新声明,并且至少在有效期内未调用basicGet (basicGet 是手动拉取指定队列中的一条消息)

    • 3: 基于单条消息设置过期时间

    MessageProperties messageProperties = new MessageProperties(); 
    messageProperties.setExpiration("4000"); // 消息的过期属性,单位 ms
    Message message = new Message("这条消息 4 秒后过期".getBytes(), messageProperties); 
    rabbitTemplate.send("TTL_EXCHANGE", "test.ttl", message);
    

    如果同时指定了Message TTL 和Queue TTL,则优先较小的那一个。

    2:什么情况下消息会变成死信?

    • 1.消息被消费者拒绝并且未设置重回队列
    • 2.消息过期
    • 3.队列达到最大长度,超过了 Max length(消息数)或者 Max length bytes(字节数),最先入队的消息会被发送到 DLX。

    3:配置队列

    • 1:配置第一个正常队列。
    // 配置第一个交换器
        @Bean
        DirectExchange msgDirectExchange(){
            Exchange build = ExchangeBuilder.directExchange(QueueEnum.MSG_QUEUE.getExchangeName()).durable(true).build();
            return (DirectExchange) build;
        }
    
        // 配置第一个消息队列
        @Bean
        public Queue  msgQeue(){
            return new Queue(QueueEnum.MSG_QUEUE.getQueueName());
        }
    
        // 配置交换器和 路由绑定 以及消息队列绑定
        @Bean
        public Binding msgBindng(){
            return BindingBuilder
                    .bind(msgQeue())
                    .to(msgDirectExchange())
                    .with(QueueEnum.MSG_QUEUE.getRouteKey());
        }
    
    
    • 2:配置TTL队列和交换器
        // 配置TTL交换器
        @Bean
        DirectExchange ttlDirectExchange(){
            Exchange build = ExchangeBuilder
                    .directExchange(QueueEnum.MSG_TTL_QUEUE.getExchangeName())
                    .durable(true)
                    .build();
            return (DirectExchange) build;
        }
    
        // 配置TTL消息队列
        @Bean
        public Queue  ttlQueue(){
            return QueueBuilder
                    .durable(QueueEnum.MSG_TTL_QUEUE.getQueueName())
                    // 配置到期后转发的交换  x-dead-letter-exchange:出现dead letter之后将dead letter重新发送到指定exchange
                    .withArgument("x-dead-letter-exchange",QueueEnum.MSG_QUEUE.getExchangeName())
                    // 配置到期后转发到那个路由建上  x-dead-letter-routing-key:出现dead letter之后将dead letter重新按照指定的routing-key发送
                    .withArgument("x-dead-letter-routing-key",QueueEnum.MSG_QUEUE.getRouteKey())
                    .build();
        }
    
        // 配置TTL交换器  路由key  以及消息队列 整合
        @Bean
        public  Binding ttlBinding(){
            return BindingBuilder
                    .bind(ttlQueue())
                    .to(ttlDirectExchange())
                    .with(QueueEnum.MSG_TTL_QUEUE.getRouteKey());
    
        }
    

    重点在于:

    x-dead-letter-exchange、x-dead-letter-routing-key两个参数,而这两个参数就是配置队列过期后转发的Exchange、RouteKey。简单点,就是死信属性,就是配置消息到期后,转到哪一个交换器上绑定了那个路由。

    死信交换(Dead Letter Exchanges 简称 DLX)
    当出现"死信"的情况下 rabbitmq 可以对该"死信"进行交换到别的队列上,但是交换的前提是需要为死信配置一个交换机用于死信的交换

    4:配置生产者消费者代码

    • 1:发送到队列的代码
     public void send(Object messageContent, String exchange, String routerKey, final long delayTimes){
            /**
             * Convert a Java object to an Amqp {@link Message} and send it to a specific exchange with a specific routing key.
             *
             * @param exchange the name of the exchange
             * @param routingKey the routing key
             * @param message a message to send
             * @param messagePostProcessor a processor to apply to the message before it is sent
             * @throws AmqpException if there is a problem
             */
            if(!StringUtils.isEmpty(exchange)){
                logger.info("延迟:{}毫秒写入消息队列:{},消息内容:{}", delayTimes, routerKey, JSON.toJSONString(messageContent));
                rabbitMqTemplate.convertAndSend(exchange,routerKey,messageContent,message -> {
                    /**
                     * 这里是单独的设置了某一条消息的过期时间,而不是整个队列都设置了。
                     */
                    message.getMessageProperties().setExpiration(String.valueOf(delayTimes));
                    return message;
                });
            }else{
                logger.error("未找到队列消息:{},所属的交换机", exchange);
            }
        }
    
    • 2: 消费消息的代码

    本次只是为了验证消费,所以没有重试,以及确认等等

    此时,监听的队列是:正常的消息队列 QueueEnum.MSG_QUEUE 而不是 延迟队列

    @Component
    @RabbitListener(queues ="msg02.queue")
    public class MsgConsumer {
        private Logger logger= LoggerFactory.getLogger(MsgConsumer.class);
        @RabbitHandler
        public void handler(String content) {
            logger.info("消费内容:{}", content);
        }
    }
    
    • 3: 测试代码
    @SpringBootTest(classes = RabbitMQApp.class)
    @RunWith(SpringJUnit4ClassRunner.class)
    public class RabbitmqTest {
    
        @Autowired
        MsgSend msgSend;
    
        @Test
        public void sendTest(){
            msgSend.send("测试延迟消费15秒,写入时间:" + new Date(),
                    QueueEnum.MSG_TTL_QUEUE.getExchangeName(),
                    QueueEnum.MSG_TTL_QUEUE.getRouteKey(),
                    15*1000);
    
            msgSend.send("测试延迟消费10秒,写入时间:" + new Date(),
                    QueueEnum.MSG_TTL_QUEUE.getExchangeName(),
                    QueueEnum.MSG_TTL_QUEUE.getRouteKey(),
                    10*1000);
        }
    }
    

    BUG问题出现了

    • 1:当我把10秒放在前面 15秒放在后面。
     @Test
        public void sendTest(){
            msgSend.send("测试延迟消费10秒,写入时间:" + new Date(),
                    QueueEnum.MSG_TTL_QUEUE.getExchangeName(),
                    QueueEnum.MSG_TTL_QUEUE.getRouteKey(),
                    10*1000);
            msgSend.send("测试延迟消费15秒,写入时间:" + new Date(),
                    QueueEnum.MSG_TTL_QUEUE.getExchangeName(),
                    QueueEnum.MSG_TTL_QUEUE.getRouteKey(),
                    15*1000);
        }
    

    如图所示的时候:

    结果是完全符合预期的:

    2019-10-10 21:52:26.606  INFO 2488 --- [cTaskExecutor-1] com.xulei.rabbitmq.consumer.MsgConsumer  : 消费内容:测试延迟消费10秒,写入时间:Thu Oct 10 21:52:16 CST 2019
    2019-10-10 21:52:31.605  INFO 2488 --- [cTaskExecutor-1] com.xulei.rabbitmq.consumer.MsgConsumer  : 消费内容:测试延迟消费15秒,写入时间:Thu Oct 10 21:52:16 CST 2019
    

    可以看到,同一秒写入,但是间隔5秒处理消息。就是说消息队列头部的是10秒。第二个是15秒的时候正常处理

    • 2:当我把15秒放在前面,10秒放在后面的时候。
     @Test
        public void sendTest(){
    
            msgSend.send("测试延迟消费15秒,写入时间:" + new Date(),
                    QueueEnum.MSG_TTL_QUEUE.getExchangeName(),
                    QueueEnum.MSG_TTL_QUEUE.getRouteKey(),
                    15*1000);
            msgSend.send("测试延迟消费10秒,写入时间:" + new Date(),
                    QueueEnum.MSG_TTL_QUEUE.getExchangeName(),
                    QueueEnum.MSG_TTL_QUEUE.getRouteKey(),
                    10*1000);
        }
    

    出现BUG了。两个消息都是15秒的时候消费。

    2019-10-10 21:53:09.176  INFO 2488 --- [cTaskExecutor-1] com.xulei.rabbitmq.consumer.MsgConsumer  : 消费内容:测试延迟消费15秒,写入时间:Thu Oct 10 21:52:54 CST 2019
    2019-10-10 21:53:09.181  INFO 2488 --- [cTaskExecutor-1] com.xulei.rabbitmq.consumer.MsgConsumer  : 消费内容:测试延迟消费10秒,写入时间:Thu Oct 10 21:52:54 CST 2019
    

    可以看到,他们都是 21:52:54 写入到消息队列中,但是 消费的时间却都是21:53:09 相隔了15秒。

    就是说现在放在队尾的10秒的数据,阻塞住了,必须等对头的消息处理完毕后,才能进行消费。

    不知道怎么回事啊?????

    总结:(参考 https://blog.csdn.net/u011212394/article/details/100086728

    RabbitMQ 本身不支持延迟队列,总的来说有三种实现方案:

    1.先存储到数据库,用定时任务扫描。

    2.利用 RabbitMQ的死信队列(Dead Letter Queue)实现。

    主要过程:

    生产者 —> 原交换机 —> 原队列(超过 TTL 之后) —> 死信交换机 —> 死信队列 —> 最终消费者

    使用死信队列实现延时消息的缺点:

    (1)如果统一用队列来设置消息的 TTL,当梯度非常多的情况下,比如 1 分钟,2 分钟,5 分钟,10 分钟,20 分钟,30 分钟......需要创建很多交换机和队列来路由消息。

    (2)如果单独设置消息的 TTL,则可能会造成队列中的消息阻塞,即前一条消息没有出队(没有被消费),后面的消息无法投递。比如第一条消息过期 TTL 是 30min,第二条消息 TTL 是 10min。10 分钟后,即使第二条消息应该投递了,但是由于第一条消息 还未出队,所以无法投递。

    (3)可能存在一定时间误差

    相关文章

      网友评论

          本文标题:【RabbitMQ学习-02】延迟消费问题

          本文链接:https://www.haomeiwen.com/subject/tzkzpctx.html