美文网首页springboot-rabbitmq
springboot-rabbitmq之延迟队列实现(六)

springboot-rabbitmq之延迟队列实现(六)

作者: 前进的码农 | 来源:发表于2020-11-25 10:00 被阅读0次

概念介绍

延迟队列

生产者发送消息后,消费者根据设定的时间,延迟收到消息,rabbitmq默认不带延迟队列功能,我们可以根据rabbitmq的其他几个特性来自己实现,延迟队列。

TTL

RabbitMQ可以对消息和队列设置TTL. 目前有两种方法可以设置。第一种方法是通过队列属性设置,队列中所有消息都有相同的过期时间。第二种方法是对消息进行单独设置,每条消息TTL可以不同。如果上述两种方法同时使用,则消息的过期时间以两者之间TTL较小的那个数值为准。消息在队列的生存时间一旦超过设置的TTL值,就称为dead message, 消费者将无法再收到该消息。

死信队列(DLX)

当消息在一个队列中变成死信 (dead message) 之后,它能被重新publish到另一个Exchange,这个Exchange就是DLX

死信队列形成条件

1、队列消息长度达到限制
2、消费者拒绝接收消息
3、原队列存在过期设置,消息超时未被消费(TTL时间到了)

实现步骤

image.png

声明一个正常的路由和一个正常的队列并给这个队列设置消息过期时间TTL,声明一个交换机和一个队列(这个路由即为死信路由),并把刚刚设置TTL的队列绑定为该死信路由,另外注意消费者消费的是死信路由绑定的队列的消息。

代码实现

路由和队列声明

    public static final String EXCHANGE_NAME="ethan_delay_exchange";
    public static final String EXCHANGE_NAME_DEAD="ethan_delay_exchange_01";

//正常交换机
    @Bean("exchange")
    public Exchange exchange(){
        return ExchangeBuilder.topicExchange(EXCHANGE_NAME).build();
    }

 //死信交换机
    @Bean("deadExchange")
    public Exchange deadExchange(){
        return ExchangeBuilder.topicExchange(EXCHANGE_NAME_DEAD).build();
    }

 // 死信交换机绑定的队列
    @Bean("delayQueue")
    public Queue queue(){
        return QueueBuilder.durable("delay_queue_01").build();
    }
 //设置过期时间的队列,注意这个队列要绑定死信路由,注意这里的routingkey不要写错
    @Bean("queue")
    public Queue deadQueue(){
        return QueueBuilder.durable("delay_queue").ttl(10*1000).deadLetterExchange(EXCHANGE_NAME_DEAD).deadLetterRoutingKey("delay_01.msg").build();
    }
 //绑定
    @Bean
    public Binding bing(@Qualifier("exchange") Exchange exchange,
                        @Qualifier("queue") Queue queue){
        return BindingBuilder.bind(queue).to(exchange).with("delay.#").noargs();
    }
  //绑定
    @Bean
    public Binding bing01(@Qualifier("deadExchange") Exchange exchange,
                          @Qualifier("delayQueue") Queue queue){
        return BindingBuilder.bind(queue).to(exchange).with("delay_01.#").noargs();
    }

生产消息

    @Autowired
    RabbitTemplate rabbitTemplate;

    @Test
    void sendMsg(){
        rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_NAME,"delay.welcoml","hello");
    }

消费消息

注意这里消费的队列名称

    @org.springframework.amqp.rabbit.annotation.RabbitListener(queues = "delay_queue_01")
    public void processMessage(String msg) {
        log.info("delaymsg---"+msg);
    }

代码

生产者
https://gitee.com/ethanlab/rabbitmq/tree/master/rabbit-delay-producer
消费者
https://gitee.com/ethanlab/rabbitmq/tree/master/rabbit-delay-consumer

相关文章

网友评论

    本文标题:springboot-rabbitmq之延迟队列实现(六)

    本文链接:https://www.haomeiwen.com/subject/eemgiktx.html