美文网首页
RabbitMQ死信队列

RabbitMQ死信队列

作者: markeNick | 来源:发表于2020-09-15 12:28 被阅读0次

什么是死信队列

当发生以下任何事件,那么消息将成为死信

  • 消费者使用basic.rejectbasic.nackrequeue参数设置为false来否定该消息

  • 消息在队列中存活时间到达设置的TTL

  • 消息队列的消息数量超过了最大限制

如果配置了死信队列消息,那么该消息将会丢入死信队列,如果没有配置,消息将被丢弃。

过程

为每个需要使用死信的业务队列配置死信交换机,当消息成为死信的时候,将由死信交换机转发到死信队列,然后由死信队列的监听者去处理死信。

业务使用场景

延时任务

例如:下订单超过15分钟未支付自动取消订单,生成的未支付订单放到业务队列中并设置过期时间TTL,当业务队 列中的消息成为死信,交由死信交换机转发到死信队列,死信队列监听者再去判断订单是否完成支付,没 有完成支付的就关闭订单。

队列的消息设置了过期时间TTL,但RabbitMQ不会监听所有消息,之会监听队列第一个入队的消息,只有该消息过期了,才会监听下一个,因此有可能后进入但是TTL短的消息一直待在队列中,直到第一个入队的出队。

死信队列demo

准备

  • 两个springboot项目,rabbitmq-provider,rabbitmq-comsumer

  • 版本号:2.1.7.RELEASE

  • 依赖:

    <!--rabbitmq-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    
  • application.yml

    server:
      port: 9001
    spring:
      application:
        name: rabbitmq-provider
      rabbitmq:
        host: 192.168.1.45
        port: 5672
        username: admin
        password: admin
        #virtual-host: xxxx
        listener:
          type: simple
          simple:
            # 信息被拒绝后是否重回队列
            default-requeue-rejected: false
            # 消息确认: 手动
            acknowledge-mode: manual
    

rabbitmq-provider

1、创建RabbitmqConfig.java

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class RabbitmqConfig {

    public static final String BUSINESS_EXCHANGE = "dead.letter.business.exchange";
    public static final String BUSINESS_QUEUE_A = "dead.letter.business.queue.a";
    public static final String BUSINESS_QUEUE_B = "dead.letter.business.queue.b";
    public static final String DEAD_LETTER_EXCHANGE = "dead.letter.exchange";
    public static final String DEAD_LETTER_QUEUE_A = "dead.letter.queue.a";
    public static final String DEAD_LETTER_QUEUE_B = "dead.letter.queue.b";
    public static final String DEAD_LETTER_QUEUE_A_ROUTING_KEY = "dead.letter.queue.a.routing.key";
    public static final String DEAD_LETTER_QUEUE_B_ROUTING_KEY = "dead.letter.queue.b.routing.key";

    /**
     * 业务交换机
     */
    @Bean
    public FanoutExchange businessExchange() {
        return new FanoutExchange(BUSINESS_EXCHANGE);
    }

    /**
     * 死信交换机
     */
    @Bean
    public DirectExchange deadLetterExchange() {
        return new DirectExchange(DEAD_LETTER_EXCHANGE);
    }

    /**
     * 业务队列A
     */
    @Bean
    public Queue businessQueueA() {
        Map<String, Object> args = new HashMap<>(4);
        // 当前队列绑定到死信交换机
        args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
        // 当前队列的死信路由
        args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUE_A_ROUTING_KEY);
        return QueueBuilder.durable(BUSINESS_QUEUE_A).withArguments(args).build();
    }

    /**
     * 业务队列B
     */
    @Bean
    public Queue businessQueueB() {
        Map<String, Object> args = new HashMap<>(4);
        // 当前队列绑定到死信交换机
        args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
        // 当前队列的死信路由
        args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUE_B_ROUTING_KEY);
        return QueueBuilder.durable(BUSINESS_QUEUE_B).withArguments(args).build();
    }

    /**
     * 死信队列A
     */
    @Bean
    public Queue deadLetterQueueA() {
        return new Queue(DEAD_LETTER_QUEUE_A);
    }

    /**
     * 死信队列B
     */
    @Bean
    public Queue deadLetterQueueB() {
        return new Queue(DEAD_LETTER_QUEUE_B);
    }

    /**
     * 绑定业务队列A到业务交换机
     */
    @Bean
    public Binding bindingBusinessQueueA() {
        return BindingBuilder.bind(businessQueueA()).to(businessExchange());
    }

    /**
     * 绑定业务队列B到业务交换机
     */
    @Bean
    public Binding bindingBusinessQueueB() {
        return BindingBuilder.bind(businessQueueB()).to(businessExchange());
    }

    /**
     * 绑定死信队列A到死信交换机
     */
    @Bean Binding bindingDeadLetterQueueA() {
        return BindingBuilder.bind(deadLetterQueueA())
                             .to(deadLetterExchange())
                             .with(DEAD_LETTER_QUEUE_A_ROUTING_KEY);
    }

    /**
     * 绑定死信队列B到死信交换机
     */
    @Bean Binding bindingDeadLetterQueueB() {
        return BindingBuilder.bind(deadLetterQueueB())
                             .to(deadLetterExchange())
                             .with(DEAD_LETTER_QUEUE_B_ROUTING_KEY);
    }
}

2、控制器提供一个发送消息的方法

@Autowired
private RabbitTemplate rabbitTemplate;

@RequestMapping("/sendMsg")
public String sendMsg() {

    String msg = "hello dead letter queue";
    rabbitTemplate.convertAndSend("dead.letter.business.exchange", null, msg);

    return "success";
}

3、启动项目,调用接口发送消息

rabbitmq-consumer

1、创建业务队列消费者BusinessReceiver.java

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class BusinessReceiver {

    @RabbitListener(queues = "dead.letter.business.queue.a")
    public void receiverA(Message message, Channel channel) throws Exception {
        // 这里由业务队列A的消费者来产生死信
        String msg = new String(message.getBody());
        System.out.println("receiverA拒绝消费消息A: " + msg);
        channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
        
    }

    @RabbitListener(queues = "dead.letter.business.queue.b")
    public void receiverB(Message message, Channel channel) throws Exception {
        String msg = new String(message.getBody());
        System.out.println("receiverB接收消息B: " + msg);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}

2、创建死信队列消费者DeadLetterReceiver.java

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class DeadLetterReceiver {

    @RabbitListener(queues = "dead.letter.queue.a")
    public void deadReceiverA(Message message, Channel channel) throws Exception {

        String msg = new String(message.getBody());
        System.out.println("deadReceiverA:死信信息A:" + msg);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }

    @RabbitListener(queues = "dead.letter.queue.b")
    public void deadReceiverB(Message message, Channel channel) throws Exception {
        
        String msg = new String(message.getBody());
        System.out.println("deadReceiverB:死信信息B:" + msg);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}

3、启动项目,观察控制台输出

receiverB接收消息B: hello dead letter queue
receiverA拒绝消费消息A: hello dead letter queue
deadReceiverA:死信信息A:hello dead letter queue

相关文章

  • 【深度知识】RabbitMQ死信队列的原理及GO实现

    1. 摘要 本文按照以下目前讲解RabbitMQ死信队列的内容,包括:(1)死信队列是什么?(2)如何配置死信队列...

  • rabbitmq延迟队列

    一、讲解RabbitMQ的的死信队列+ TTL 二、RabbitMQ的延迟队列和应⽤场景 1、简介 2、业界的⼀些...

  • RabbitMQ之认识死信队列一

    前言 RabbitMQ 有个队列叫死信队列,死信队列可以做蛮多事的,比如可以让消息半个小时后消费,规定每天几点钟消...

  • RabbitMQ消息中间件技术精讲17 高级篇十 死信队列

    死信队列介绍 本文是《RabbitMQ精讲系列》中第十七:RabbitMQ消息中间件技术精讲17 高级篇十 死信队...

  • RabbitMQ死信队列

    死信队列介绍 死信队列:DLX(dead-letter-exchange) 利用DLX,当消息在一个队列中变成死信...

  • RabbitMQ死信队列

    什么是死信队列 当发生以下任何事件,那么消息将成为死信 消费者使用basic.reject或 basic.nack...

  • 死信队列 (rabbitMQ)

    转载:http://www.imooc.com/article/283645 1.什么是死信队列 想必有些小伙伴应...

  • RabbitMQ 死信队列

    死信队列 "死信"模式 指的是,当消费者不能处理接收到的消息时,将这个消息重新发布到另外一个队列中,等待重试或者人...

  • RabbitMQ死信队列

    SpringBoot 是为了简化 Spring 应用的创建、运行、调试、部署等一系列问题而诞生的产物,自动装配的特...

  • rabbitmq 死信队列

    死信队列: DLX,dead-letter-exchange 利用 dlx,当消息在一个队列中变成死信 (dead...

网友评论

      本文标题:RabbitMQ死信队列

      本文链接:https://www.haomeiwen.com/subject/eqxfyktx.html