美文网首页
死信队列实现延迟消息

死信队列实现延迟消息

作者: 倚仗听江 | 来源:发表于2024-07-09 15:24 被阅读0次

实现延迟消息在RabbitMQ中主要依赖于两个关键机制:TTL(Time-To-Live)和死信交换机(Dead Letter Exchange,简称DLX)。以下是实现延迟消息处理的详细原理:

  1. TTL 队列
    TTL 是 RabbitMQ 中的一个特性,允许为队列中的消息设置一个生存时间。当消息在队列中停留的时间超过了这个设定的时间,消息就会被认为是“过期”,并成为死信消息。在RabbitMQ中,你可以为整个队列设置TTL,这意味着队列中的每条消息都会继承这个TTL值。

  2. 死信交换机(DLX)
    死信交换机是一种特殊的交换机,用于处理死信消息。当消息在队列中过期、被拒绝或达到最大重试次数时,RabbitMQ 会将这些消息视为死信,并根据队列的配置将它们发送到DLX。DLX通常被配置为直连交换机(Direct Exchange),这意味着消息将被直接路由到与之绑定的队列。

  3. 配置延迟队列
    为了实现延迟消息,你需要创建一个具有TTL特性的队列,并配置其x-dead-letter-exchange和x-dead-letter-routing-key参数。这两个参数分别指定了当消息过期时,应该将消息发送到哪个交换机(DLX)以及使用哪个路由键。这些参数是在队列创建时设置的,一旦设置,就不能更改。

  4. 绑定死信队列到DLX
    为了确保过期消息能被正确处理,你需要创建一个死信队列,并将其绑定到DLX上。这个绑定需要使用与延迟队列的x-dead-letter-routing-key相同的路由键。这样,当延迟队列中的消息过期并成为死信时,RabbitMQ会根据配置将消息发送到DLX,再由DLX根据路由键将消息发送到死信队列中。

  5. 处理延迟消息
    一旦延迟消息过期并被发送到死信队列,你就可以像处理普通消息一样处理这些延迟消息。你可以设置消费者监听死信队列,当消息到达时,消费者将执行相应的业务逻辑。

以下是示例代码:

配置类

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;

@Configuration
public class RabbitMQDelayConfig {

    public static final String DELAY_QUEUE_NAME = "delay_queue";
    public static final String DLX_EXCHANGE_NAME = "dlx_exchange";
    public static final String DLX_QUEUE_NAME = "dlx_queue";
  
    /**
     * 让RabbitTemplate自动处理序列化
     * @return
     */
    @Bean
    public MessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    /**
     * 创建一个具有TTL的延迟队列。
     * 当消息在队列中停留的时间超过TTL(60秒),它将变成死信消息。
     * 注意:这里配置了死信交换机和死信路由键,但队列本身并不直接绑定到死信交换机上。
     */
    @Bean
    Queue delayQueue() {
        Map<String, Object> args = new HashMap<>();
        args.put("x-message-ttl", 60000); // 设置消息的TTL为60秒
        args.put("x-dead-letter-exchange", DLX_EXCHANGE_NAME); // 指定死信交换机
        args.put("x-dead-letter-routing-key", DLX_QUEUE_NAME); // 指定死信队列的路由键
        return QueueBuilder.durable(DELAY_QUEUE_NAME).withArguments(args).build();
    }

    /**
     * 创建死信交换机,通常配置为直连交换机(Direct Exchange)。
     * 当延迟队列中的消息过期,它们将被发送到这个交换机。
     */
    @Bean
    DirectExchange dlxExchange() {
        return new DirectExchange(DLX_EXCHANGE_NAME);
    }

    /**
     * 创建死信队列,用于接收延迟队列中过期的消息。
     * 这个队列必须绑定到死信交换机上,使用与延迟队列配置相同的路由键。
     */
    @Bean
    Queue dlxQueue() {
        return QueueBuilder.durable(DLX_QUEUE_NAME).build();
    }

    /**
     * 绑定死信队列到死信交换机。
     * 使用与延迟队列配置的`x-dead-letter-routing-key`相同的路由键。
     * 这一步是必要的,确保死信交换机能正确地将消息路由到死信队列。
     */
    @Bean
    Binding bindDlxQueueToDlx(DirectExchange dlxExchange, Queue dlxQueue) {
        return BindingBuilder.bind(dlxQueue).to(dlxExchange).with(DLX_QUEUE_NAME);
    }
}

生产者

// 不需要设置任何额外的延迟属性,直接发送消息
rabbitTemplate.convertAndSend("delay_queue", new DelayedMessage("1", "dlx delay message"));

消费者

@Component
@Slf4j
public class DelayedMessageConsumer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @RabbitListener(queues = RabbitMQDelayConfig.DLX_QUEUE_NAME)
    public void consumeDelayedMessage(@Payload DelayedMessage message, Message originalMessage, Channel channel) {
        try {
            log.info("Processing delayed message: {}", message);

            // 执行消息处理逻辑
            // ...

            // 如果一切顺利,手动确认消息
            channel.basicAck(originalMessage.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            log.error("Error processing message: {}, Error: {}", message, e);

            try {
                // 在发生错误时拒绝消息,可以选择是否重新入队
                channel.basicNack(originalMessage.getMessageProperties().getDeliveryTag(), false, shouldRequeue());
            } catch (IOException ioException) {
                log.error("Failed to nack message due to IO exception", ioException);
            }
        }
    }

    private boolean shouldRequeue() {
        // 根据具体情况决定是否需要重新入队
        // 例如,对于暂时性的错误,可以设置为true
        return false;
    }
}

相关文章

网友评论

      本文标题:死信队列实现延迟消息

      本文链接:https://www.haomeiwen.com/subject/akqzcjtx.html