美文网首页
死信队列实现延迟消息

死信队列实现延迟消息

作者: 倚仗听江 | 来源:发表于2024-07-09 15:24 被阅读0次

    实现延迟消息在RabbitMQ中主要依赖于两个关键机制:TTL(Time-To-Live)和死信交换机(Dead Letter Exchange,简称DLX)。以下是实现延迟消息处理的详细原理:

    1. TTL 队列
      TTL 是 RabbitMQ 中的一个特性,允许为队列中的消息设置一个生存时间。当消息在队列中停留的时间超过了这个设定的时间,消息就会被认为是“过期”,并成为死信消息。在RabbitMQ中,你可以为整个队列设置TTL,这意味着队列中的每条消息都会继承这个TTL值。

    2. 死信交换机(DLX)
      死信交换机是一种特殊的交换机,用于处理死信消息。当消息在队列中过期、被拒绝或达到最大重试次数时,RabbitMQ 会将这些消息视为死信,并根据队列的配置将它们发送到DLX。DLX通常被配置为直连交换机(Direct Exchange),这意味着消息将被直接路由到与之绑定的队列。

    3. 配置延迟队列
      为了实现延迟消息,你需要创建一个具有TTL特性的队列,并配置其x-dead-letter-exchange和x-dead-letter-routing-key参数。这两个参数分别指定了当消息过期时,应该将消息发送到哪个交换机(DLX)以及使用哪个路由键。这些参数是在队列创建时设置的,一旦设置,就不能更改。

    4. 绑定死信队列到DLX
      为了确保过期消息能被正确处理,你需要创建一个死信队列,并将其绑定到DLX上。这个绑定需要使用与延迟队列的x-dead-letter-routing-key相同的路由键。这样,当延迟队列中的消息过期并成为死信时,RabbitMQ会根据配置将消息发送到DLX,再由DLX根据路由键将消息发送到死信队列中。

    5. 处理延迟消息
      一旦延迟消息过期并被发送到死信队列,你就可以像处理普通消息一样处理这些延迟消息。你可以设置消费者监听死信队列,当消息到达时,消费者将执行相应的业务逻辑。

    以下是示例代码:

    配置类

    import org.springframework.amqp.core.*;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import java.util.HashMap;
    import java.util.Map;
    
    @Configuration
    public class RabbitMQDelayConfig {
    
        public static final String DELAY_QUEUE_NAME = "delay_queue";
        public static final String DLX_EXCHANGE_NAME = "dlx_exchange";
        public static final String DLX_QUEUE_NAME = "dlx_queue";
      
        /**
         * 让RabbitTemplate自动处理序列化
         * @return
         */
        @Bean
        public MessageConverter jsonMessageConverter() {
            return new Jackson2JsonMessageConverter();
        }
    
        /**
         * 创建一个具有TTL的延迟队列。
         * 当消息在队列中停留的时间超过TTL(60秒),它将变成死信消息。
         * 注意:这里配置了死信交换机和死信路由键,但队列本身并不直接绑定到死信交换机上。
         */
        @Bean
        Queue delayQueue() {
            Map<String, Object> args = new HashMap<>();
            args.put("x-message-ttl", 60000); // 设置消息的TTL为60秒
            args.put("x-dead-letter-exchange", DLX_EXCHANGE_NAME); // 指定死信交换机
            args.put("x-dead-letter-routing-key", DLX_QUEUE_NAME); // 指定死信队列的路由键
            return QueueBuilder.durable(DELAY_QUEUE_NAME).withArguments(args).build();
        }
    
        /**
         * 创建死信交换机,通常配置为直连交换机(Direct Exchange)。
         * 当延迟队列中的消息过期,它们将被发送到这个交换机。
         */
        @Bean
        DirectExchange dlxExchange() {
            return new DirectExchange(DLX_EXCHANGE_NAME);
        }
    
        /**
         * 创建死信队列,用于接收延迟队列中过期的消息。
         * 这个队列必须绑定到死信交换机上,使用与延迟队列配置相同的路由键。
         */
        @Bean
        Queue dlxQueue() {
            return QueueBuilder.durable(DLX_QUEUE_NAME).build();
        }
    
        /**
         * 绑定死信队列到死信交换机。
         * 使用与延迟队列配置的`x-dead-letter-routing-key`相同的路由键。
         * 这一步是必要的,确保死信交换机能正确地将消息路由到死信队列。
         */
        @Bean
        Binding bindDlxQueueToDlx(DirectExchange dlxExchange, Queue dlxQueue) {
            return BindingBuilder.bind(dlxQueue).to(dlxExchange).with(DLX_QUEUE_NAME);
        }
    }
    

    生产者

    // 不需要设置任何额外的延迟属性,直接发送消息
    rabbitTemplate.convertAndSend("delay_queue", new DelayedMessage("1", "dlx delay message"));
    

    消费者

    @Component
    @Slf4j
    public class DelayedMessageConsumer {
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        @RabbitListener(queues = RabbitMQDelayConfig.DLX_QUEUE_NAME)
        public void consumeDelayedMessage(@Payload DelayedMessage message, Message originalMessage, Channel channel) {
            try {
                log.info("Processing delayed message: {}", message);
    
                // 执行消息处理逻辑
                // ...
    
                // 如果一切顺利,手动确认消息
                channel.basicAck(originalMessage.getMessageProperties().getDeliveryTag(), false);
            } catch (Exception e) {
                log.error("Error processing message: {}, Error: {}", message, e);
    
                try {
                    // 在发生错误时拒绝消息,可以选择是否重新入队
                    channel.basicNack(originalMessage.getMessageProperties().getDeliveryTag(), false, shouldRequeue());
                } catch (IOException ioException) {
                    log.error("Failed to nack message due to IO exception", ioException);
                }
            }
        }
    
        private boolean shouldRequeue() {
            // 根据具体情况决定是否需要重新入队
            // 例如,对于暂时性的错误,可以设置为true
            return false;
        }
    }
    

    相关文章

      网友评论

          本文标题:死信队列实现延迟消息

          本文链接:https://www.haomeiwen.com/subject/akqzcjtx.html