美文网首页
RabbitMQ 延时队列

RabbitMQ 延时队列

作者: SheHuan | 来源:发表于2021-06-07 21:41 被阅读0次

    在上一篇文章中,我们学习了死信队列的相关内容,文章最后我们提到,超时消息结合死信队列也可以实现一个延时队列。大致的流程是这样的,如果正常业务队列中的消息设置了过期时间,并在消息过期后,让消息流入一个死信队列,然后消费者监听这个死信队列,实现消息的延时处理,这样就可以实现一个简单的延时队列。

    上边描述的延时队列,其实是存在一些问题的,应付简单的场景还行,如果需要获得更加完善的功能体验,可以选择使用 RabbitMQ 提供的延时消息插件。下边我们分别了解两种实现方式。

    一、准备工作

    创建 SpringBoot 项目,添加依赖以及连接配置:

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    

    application.properties配置 RabbitMQ 服务的相关连接信息:

    server.port=8083
    # rabbitmq 相关配置
    spring.rabbitmq.host=localhost
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=admin
    spring.rabbitmq.password=123456
    spring.rabbitmq.virtual-host=/
    

    二、延时队列的简单实现

    首先创建过期消息的交换机、队列,以及死信交换机、队列,并完成绑定配置,让过期消息可以流入死信队列:

    @Configuration
    public class RabbitMQConfig {
        // 创建过期消息的交换机
        @Bean
        DirectExchange ttlExchange() {
            return new DirectExchange("ttl.exchange", true, false);
        }
    
        // 创建死信交换机
        @Bean
        DirectExchange deadLetterExchange() {
            return new DirectExchange("dead.letter.exchange", true, false);
        }
    
        // 创建有过期时间的消息队列,并配置死信队列
        @Bean
        Queue ttlQueue() {
            HashMap<String, Object> args = new HashMap<>();
            // 设置队列中消息的过期时间,单位毫秒
            args.put("x-message-ttl", 10 * 60 * 1000);
            // 设置死信交换机
            args.put("x-dead-letter-exchange", "dead.letter.exchange");
            // 设置死信交换机绑定队列的routingKey
            args.put("x-dead-letter-routing-key", "dead.letter");
            return new Queue("ttl.queue", true, false, false, args);
        }
    
        // 创建死信队列
        @Bean
        Queue deadLetterQueue() {
            return new Queue("dead.letter.queue", true);
        }
        
        @Bean
        Binding ttlBinding() {
            return BindingBuilder.bind(ttlQueue()).to(ttlExchange()).with("ttl");
        }
    
        @Bean
        Binding deadLetterBinding() {
            return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange()).with("dead.letter");
        }
    }
    

    核心的内容就是上边的配置类了,接下来就是发送消息到业务消息队列,并只给死信队列指定消费者,这样发送的消息在正常业务队列过期后,最终会流入死信队列,进而被消费掉。生产者和消费者的代码很简单:

    @Service
    public class DelayedMessageSendService {
        Logger logger = LoggerFactory.getLogger(this.getClass().getSimpleName());
    
        @Autowired
        RabbitTemplate rabbitTemplate;
    
        public void send(String message) {
            rabbitTemplate.convertAndSend("ttl.exchange", "ttl", message);
            logger.info("发送的业务消息:" + message);
        }
    }
    
    @Service
    public class DelayedMessageReceiveService {
        Logger logger = LoggerFactory.getLogger(this.getClass().getSimpleName());
    
        @RabbitListener(queues = "dead.letter.queue")
        public void receive(String message) {
            logger.info("收到的延时消息:" + message);
        }
    }
    

    如果我们有其它不同时间的延时业务需求,就需要在配置类添加更多的和ttl.queue类似配置的过期消息队列,如果新的延时业务需求太多,新消息队列的数量将不可控。

    那如果不给队列配置消息的过期时间,而是在发送消息时单独给每条消息配置呢?这样想想好像解决上边的问题,实现的代码如下:

    @Service
    public class DelayedMessageSendService {
        Logger logger = LoggerFactory.getLogger(this.getClass().getSimpleName());
    
        @Autowired
        RabbitTemplate rabbitTemplate;
    
        public void send(String message, Integer delay) {
            MessagePostProcessor processor = new MessagePostProcessor() {
                @Override
                public Message postProcessMessage(Message message) throws AmqpException {
                    // 设置消息的过期时间,单位毫秒
                    message.getMessageProperties().setExpiration(String.valueOf(delay));
                    return message;
                }
            };
            rabbitTemplate.convertAndSend("ttl.exchange", "ttl", message, processor);
            System.out.println("发送的业务消息:" + message);
        }
    }
    

    消费者的代码还是上边的。

    我们发送两条消息测试一下:

    delayedMessageSendService.send("hello world", 30000);
    delayedMessageSendService.send("hello rabbitmq", 10000);
    

    仔细观察上边的运行结果,按照预期hello rabbitmq应该在10秒后先被消费,然而由于我们先发送的hello world消息设置的过期时间为30秒,导致hello rabbitmq被阻塞,直到30秒后陆续被消费掉。问题很明显了,后发送消息的过期时间必须大于大于前边已经发送消息的过期时间,这样才能保证延时队列正常工作,但实际使用中几乎不能保证的。

    可以看到,我们简单实现的延时队列虽然可用,但还是存在问题的。使用 RabbitMQ 延时消息插件,就不存在这些问题了,可用性更高!

    三、RabbitMQ 延时消息插件

    1、插件安装

    RabbitMQ 默认是没有内置延时消息插件的,需要我们单独下载安装。下载地址入口:
    https://www.rabbitmq.com/community-plugins.html

    将下载好的插件放到 RabbitMQ 安装目录下的plugins目录,然后进入sbin目录,我安装的 Windows 版的 RabbitMQ,执行rabbitmq-plugins.bat enable rabbitmq_delayed_message_exchange命令来安装插件:


    安装好后,重启服务就可以使用了。

    如果使用原生 Java client 操作延时消息插件,可以参考:
    https://github.com/rabbitmq/rabbitmq-delayed-message-exchange

    2、用法介绍

    首先创建延时消息交换机以及队列,创建交换机和之前有所不同,需要使用CustomExchange

    @Configuration
    public class DelayedRabbitMQConfig {
        @Bean
        CustomExchange delayedExchange() {
            HashMap<String, Object> args = new HashMap<>();
            args.put("x-delayed-type", "direct");
            return new CustomExchange("delayed.exchange", "x-delayed-message", true, false, args);
        }
    
        @Bean
        Queue delayedQueue() {
            return new Queue("delayed.queue", true);
        }
    
        @Bean
        Binding delayedBinding() {
            return BindingBuilder.bind(delayedQueue()).to(delayedExchange()).with("delayed").noargs();
        }
    }
    

    然后创建消费者,监听delayed.queue

    @Service
    public class DelayedMessageReceiveService {
        Logger logger = LoggerFactory.getLogger(this.getClass().getSimpleName());
    
        @RabbitListener(queues = "delayed.queue")
        public void receive2(String message) {
            logger.info("收到的延时消息:" + message);
        }
    }
    

    发送消息时指定消息要延时处理的时间:

    @Service
    public class DelayedMessageSendService {
        Logger logger = LoggerFactory.getLogger(this.getClass().getSimpleName());
    
        @Autowired
        RabbitTemplate rabbitTemplate;
    
        /**
         * @param message
         * @param delay   延时时间,单位毫秒
         */
        public void send2(String message, Integer delay) {
            MessagePostProcessor processor = new MessagePostProcessor() {
                @Override
                public Message postProcessMessage(Message message) throws AmqpException {
                    // 设置消息延时处理的时间
                    message.getMessageProperties().setDelay(delay);
                    return message;
                }
            };
            rabbitTemplate.convertAndSend("delayed.exchange", "delayed", message, processor);
            logger.info("发送的延时消息:" + message);
        }
    }
    

    然后发送几条消息,测试下效果:

    delayedMessageSendService.send2("hello world", 30000);
    delayedMessageSendService.send2("hello rabbitmq", 10000);
    delayedMessageSendService.send2("hello kitty", 20000);
    

    最终的效果如下,符合我们的预期:


    通过官方插件来实现延时消息队列还是很简单的。

    使用延时队列可以完成很多常见的需求,比如预约商品开售前提醒用户购买、下单后一定时间未支付则取消订单、收到商品后一定时间还没确认收货则系统自动确认收货等。

    关于 RabbitMQ 延时队列的内容就介绍到这里了。

    本文完!

    相关文章

      网友评论

          本文标题:RabbitMQ 延时队列

          本文链接:https://www.haomeiwen.com/subject/uylesltx.html