美文网首页
RabbitMQ 死信队列

RabbitMQ 死信队列

作者: SheHuan | 来源:发表于2021-06-01 21:55 被阅读0次

    一、认识死信队列

    首先了解一下什么是死信,官方将其翻译为单词Dead Letter。死信,其实这是 RabbitMQ 中一种消息类型,和普通的消息在本质上没有什么区别,更多的是一种业务上的划分。如果队列中的消息出现以下情况之一,就会变成死信:

    • 消息接收时被拒绝会变成死信,例如调用channel.basicNackchannel.basicReject ,并设置requeuefalse
    • 如果给消息队列设置了消息的过期时间(x-message-ttl),或者发送消息时设置了当前消息的过期时间,当消息在队列中的存活时间大于过期时间时,就会变成死信。
    • 如果给消息队列设置了最大容量(x-max-length),队列已经满了,后续再进来的消息会溢出,无法被队列接收就会变成死信。

    如果不对死信做任何处理,则消息会被直接丢弃。一般死信都是那些在业务上未被正常处理的消息,我们可以考虑用一个队列来接收这些死信消息,接收死信消息的队列就是死信队列,它就是一个普通的消息队列,没有什么特殊的,只是我们在业务上赋予了它特殊的职责罢了,后期再根据实际情况处理死信队列中的消息即可。

    二、准备工作

    创建一个 SpringBoot 项目,添加 RabbitMQ 依赖,并添加需要的配置:

    # rabbitmq 相关配置
    spring.rabbitmq.host=localhost
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=admin
    spring.rabbitmq.password=123456
    spring.rabbitmq.virtual-host=/
    # 设置消费者需要手动确认消息
    spring.rabbitmq.listener.simple.acknowledge-mode=manual
    spring.rabbitmq.listener.direct.acknowledge-mode=manual
    

    接下来创建一个死信队列、交换机,并完成绑定,这里的交换机也可以称作死信交换机,交换机的类型没有特殊的要求根据实际需求选择即可:

    @Configuration
    public class DeadLetterRabbitMQConfig {
        // 创建交换机
        @Bean
        DirectExchange deadLetterExchange() {
            return new DirectExchange("dead.letter.exchange", true, false);
        }
    
        // 创建死信队列
        @Bean
        Queue deadLetterQueue() {
            return new Queue("dead.letter.queue", true);
        }
    
        // 绑定队列和交换机
        @Bean
        Binding deadLetterBinding() {
            return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange()).with("dead.letter");
        }
    }
    

    三、死信队列用法

    这里我们根据文章开头描述的,正常消息变成死信的几种场景分别来看死信队列的用法。

    1、消息被拒绝

    首先创建处理业务消息的交换机、队列:

    @Configuration
    public class BusinessRabbitMQConfig {
        // 创建交换机
        @Bean
        DirectExchange businessExchange() {
            return new DirectExchange("business.exchange", true, false);
        }
    
        // 创建业务消息队列
        @Bean
        Queue businessQueue1() {
            HashMap<String, Object> args = new HashMap<>();
            // 设置死信交换机
            args.put("x-dead-letter-exchange", "dead.letter.exchange");
            // 设置死信交换机绑定队列的routingKey
            args.put("x-dead-letter-routing-key", "dead.letter");
            return new Queue("business.queue1", true, false, false, args);
        }
    
        @Bean
        Binding businessBinding1() {
            return BindingBuilder.bind(businessQueue1()).to(businessExchange()).with("business1");
        }
    }
    

    创建business.queue1时,我们给它配置了前边创建死信交换机、以及 routingKey,这样就完成了业务消息队列和死信队列的绑定,业务消息被拒绝后,就会进入死信队列。

    注意,如果队列已经创建,之后再修改队列的配置参数,则不会生效,需要删除掉队列重新创建

    接下来,创建消费者来消费business.queue1中的业务消息,为了突出效果,直接让消费者拒绝掉消息,不了解消息确认机制的可以翻阅之前的文章:

    @Service
    public class BusinessReceiveService {
        @RabbitListener(queues = "business.queue1")
        public void receive(String msg, Channel channel, Message message) {
            try {
                // 拒绝消息
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
                System.out.println("拒绝的业务消息:" + msg);
            } catch (IOException ioException) {
                ioException.printStackTrace();
            }
        }
    }
    

    发送消息的服务很简单:

    @Service
    public class BusinessSendService {
        @Autowired
        RabbitTemplate rabbitTemplate;
    
        public void send(String routingKey, String message) {
            rabbitTemplate.convertAndSend("business.exchange", routingKey, message);
            System.out.println("发送的业务消息:" + message);
        }
    }
    

    启动项目后,通过测试类发送一条消息:

    @SpringBootTest
    class DeadLetterApplicationTests {
        @Autowired
        BusinessSendService businessSendService;
    
        @Test
        void contextLoads() {
            String routingKey = "business1";
            String message = routingKey + "-data-" + System.currentTimeMillis();
            businessSendService.send(routingKey, message);
        }
    }
    

    按照预期,消息最终会流入死信队列。可以通过 RabbitMQ 的后台管理界面查看具体的效果:


    2、消息过期

    BusinessRabbitMQConfig中再添加一个business.queue2业务消息队列,设置队列中消息的过期时间为10秒,同样设置好死信队列:

    @Bean
    Queue businessQueue2() {
        HashMap<String, Object> args = new HashMap<>();
        // 设置队列中消息的过期时间,单位毫秒
        args.put("x-message-ttl", 10000);
        args.put("x-dead-letter-exchange", "dead.letter.exchange");
        args.put("x-dead-letter-routing-key", "dead.letter");
        return new Queue("business.queue2", true, false, false, args);
    }
    
    @Bean
    Binding businessBinding2() {
        return BindingBuilder.bind(businessQueue2()).to(businessExchange()).with("business2");
    }
    

    不用给business.queue2配置消费者,重启项目,直接发送一条消息,让它自动过期即可:

    String routingKey = "business2";
    String message = routingKey + "-data-" + System.currentTimeMillis();
    businessSendService.send(routingKey, message);
    

    等待10秒后,消息会自动流入死信队列:

    除了给队列设置消息的超时时间,也可以在发送消息时配置,有兴趣的可以自己尝试:

    public void send2(String routingKey, String message) {
        MessagePostProcessor processor = new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                    message.getMessageProperties().setExpiration("10000");
                    return message;
                }
            };
        rabbitTemplate.convertAndSend("business.exchange", routingKey, message);
        System.out.println("发送的业务消息:" + message);
    }
    

    3、消息溢出

    由于消息队列满了,导致消息溢出而进入死信队列的场景也比较简单。

    BusinessRabbitMQConfig中再添加一个business.queue3业务消息队列,设置队列的大小为10,同样设置好死信队列:

    @Bean
    Queue businessQueue3() {
        HashMap<String, Object> args = new HashMap<>();
        // 设置消息队列的大小
        args.put("x-max-length", 10);
        args.put("x-dead-letter-exchange", "dead.letter.exchange");
        args.put("x-dead-letter-routing-key", "dead.letter");
        return new Queue("business.queue3", true, false, false, args);
    }
    
    @Bean
    Binding businessBinding3() {
        return BindingBuilder.bind(businessQueue3()).to(businessExchange()).with("business3");
    }
    

    business.queue3也不设置消费者,重启项目,发送15条消息:

    for (int i = 0; i < 15; i++) {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        String routingKey = "business3";
        String message = routingKey + "-data-" + System.currentTimeMillis();
        businessSendService.send(routingKey, message);
    }
    

    按照预期business.queue3最终会有10条消息,剩下的5条进入死信队列:

    四、小结

    关于死信队列的用法就介绍到这里了,还是很简单的。在一些重要的业务场景中,为了防止有些消息由于各种原因未被正常消费而丢失掉,可以考虑使用死信队列来保存这些消息,以方便后期排查问题使用,这样总比后期再去复现错误要简单的多。其实,延时队列也可以结合死信队列来实现,本文消息过期例子就是它的雏形,后边的文章我们再详细探讨。

    本文完!

    相关文章

      网友评论

          本文标题:RabbitMQ 死信队列

          本文链接:https://www.haomeiwen.com/subject/jkzzjltx.html