美文网首页
RabbitMQ 延时队列

RabbitMQ 延时队列

作者: SheHuan | 来源:发表于2021-06-07 21:41 被阅读0次

在上一篇文章中,我们学习了死信队列的相关内容,文章最后我们提到,超时消息结合死信队列也可以实现一个延时队列。大致的流程是这样的,如果正常业务队列中的消息设置了过期时间,并在消息过期后,让消息流入一个死信队列,然后消费者监听这个死信队列,实现消息的延时处理,这样就可以实现一个简单的延时队列。

上边描述的延时队列,其实是存在一些问题的,应付简单的场景还行,如果需要获得更加完善的功能体验,可以选择使用 RabbitMQ 提供的延时消息插件。下边我们分别了解两种实现方式。

一、准备工作

创建 SpringBoot 项目,添加依赖以及连接配置:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

application.properties配置 RabbitMQ 服务的相关连接信息:

server.port=8083
# rabbitmq 相关配置
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123456
spring.rabbitmq.virtual-host=/

二、延时队列的简单实现

首先创建过期消息的交换机、队列,以及死信交换机、队列,并完成绑定配置,让过期消息可以流入死信队列:

@Configuration
public class RabbitMQConfig {
    // 创建过期消息的交换机
    @Bean
    DirectExchange ttlExchange() {
        return new DirectExchange("ttl.exchange", true, false);
    }

    // 创建死信交换机
    @Bean
    DirectExchange deadLetterExchange() {
        return new DirectExchange("dead.letter.exchange", true, false);
    }

    // 创建有过期时间的消息队列,并配置死信队列
    @Bean
    Queue ttlQueue() {
        HashMap<String, Object> args = new HashMap<>();
        // 设置队列中消息的过期时间,单位毫秒
        args.put("x-message-ttl", 10 * 60 * 1000);
        // 设置死信交换机
        args.put("x-dead-letter-exchange", "dead.letter.exchange");
        // 设置死信交换机绑定队列的routingKey
        args.put("x-dead-letter-routing-key", "dead.letter");
        return new Queue("ttl.queue", true, false, false, args);
    }

    // 创建死信队列
    @Bean
    Queue deadLetterQueue() {
        return new Queue("dead.letter.queue", true);
    }
    
    @Bean
    Binding ttlBinding() {
        return BindingBuilder.bind(ttlQueue()).to(ttlExchange()).with("ttl");
    }

    @Bean
    Binding deadLetterBinding() {
        return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange()).with("dead.letter");
    }
}

核心的内容就是上边的配置类了,接下来就是发送消息到业务消息队列,并只给死信队列指定消费者,这样发送的消息在正常业务队列过期后,最终会流入死信队列,进而被消费掉。生产者和消费者的代码很简单:

@Service
public class DelayedMessageSendService {
    Logger logger = LoggerFactory.getLogger(this.getClass().getSimpleName());

    @Autowired
    RabbitTemplate rabbitTemplate;

    public void send(String message) {
        rabbitTemplate.convertAndSend("ttl.exchange", "ttl", message);
        logger.info("发送的业务消息:" + message);
    }
}
@Service
public class DelayedMessageReceiveService {
    Logger logger = LoggerFactory.getLogger(this.getClass().getSimpleName());

    @RabbitListener(queues = "dead.letter.queue")
    public void receive(String message) {
        logger.info("收到的延时消息:" + message);
    }
}

如果我们有其它不同时间的延时业务需求,就需要在配置类添加更多的和ttl.queue类似配置的过期消息队列,如果新的延时业务需求太多,新消息队列的数量将不可控。

那如果不给队列配置消息的过期时间,而是在发送消息时单独给每条消息配置呢?这样想想好像解决上边的问题,实现的代码如下:

@Service
public class DelayedMessageSendService {
    Logger logger = LoggerFactory.getLogger(this.getClass().getSimpleName());

    @Autowired
    RabbitTemplate rabbitTemplate;

    public void send(String message, Integer delay) {
        MessagePostProcessor processor = new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                // 设置消息的过期时间,单位毫秒
                message.getMessageProperties().setExpiration(String.valueOf(delay));
                return message;
            }
        };
        rabbitTemplate.convertAndSend("ttl.exchange", "ttl", message, processor);
        System.out.println("发送的业务消息:" + message);
    }
}

消费者的代码还是上边的。

我们发送两条消息测试一下:

delayedMessageSendService.send("hello world", 30000);
delayedMessageSendService.send("hello rabbitmq", 10000);

仔细观察上边的运行结果,按照预期hello rabbitmq应该在10秒后先被消费,然而由于我们先发送的hello world消息设置的过期时间为30秒,导致hello rabbitmq被阻塞,直到30秒后陆续被消费掉。问题很明显了,后发送消息的过期时间必须大于大于前边已经发送消息的过期时间,这样才能保证延时队列正常工作,但实际使用中几乎不能保证的。

可以看到,我们简单实现的延时队列虽然可用,但还是存在问题的。使用 RabbitMQ 延时消息插件,就不存在这些问题了,可用性更高!

三、RabbitMQ 延时消息插件

1、插件安装

RabbitMQ 默认是没有内置延时消息插件的,需要我们单独下载安装。下载地址入口:
https://www.rabbitmq.com/community-plugins.html

将下载好的插件放到 RabbitMQ 安装目录下的plugins目录,然后进入sbin目录,我安装的 Windows 版的 RabbitMQ,执行rabbitmq-plugins.bat enable rabbitmq_delayed_message_exchange命令来安装插件:


安装好后,重启服务就可以使用了。

如果使用原生 Java client 操作延时消息插件,可以参考:
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange

2、用法介绍

首先创建延时消息交换机以及队列,创建交换机和之前有所不同,需要使用CustomExchange

@Configuration
public class DelayedRabbitMQConfig {
    @Bean
    CustomExchange delayedExchange() {
        HashMap<String, Object> args = new HashMap<>();
        args.put("x-delayed-type", "direct");
        return new CustomExchange("delayed.exchange", "x-delayed-message", true, false, args);
    }

    @Bean
    Queue delayedQueue() {
        return new Queue("delayed.queue", true);
    }

    @Bean
    Binding delayedBinding() {
        return BindingBuilder.bind(delayedQueue()).to(delayedExchange()).with("delayed").noargs();
    }
}

然后创建消费者,监听delayed.queue

@Service
public class DelayedMessageReceiveService {
    Logger logger = LoggerFactory.getLogger(this.getClass().getSimpleName());

    @RabbitListener(queues = "delayed.queue")
    public void receive2(String message) {
        logger.info("收到的延时消息:" + message);
    }
}

发送消息时指定消息要延时处理的时间:

@Service
public class DelayedMessageSendService {
    Logger logger = LoggerFactory.getLogger(this.getClass().getSimpleName());

    @Autowired
    RabbitTemplate rabbitTemplate;

    /**
     * @param message
     * @param delay   延时时间,单位毫秒
     */
    public void send2(String message, Integer delay) {
        MessagePostProcessor processor = new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                // 设置消息延时处理的时间
                message.getMessageProperties().setDelay(delay);
                return message;
            }
        };
        rabbitTemplate.convertAndSend("delayed.exchange", "delayed", message, processor);
        logger.info("发送的延时消息:" + message);
    }
}

然后发送几条消息,测试下效果:

delayedMessageSendService.send2("hello world", 30000);
delayedMessageSendService.send2("hello rabbitmq", 10000);
delayedMessageSendService.send2("hello kitty", 20000);

最终的效果如下,符合我们的预期:


通过官方插件来实现延时消息队列还是很简单的。

使用延时队列可以完成很多常见的需求,比如预约商品开售前提醒用户购买、下单后一定时间未支付则取消订单、收到商品后一定时间还没确认收货则系统自动确认收货等。

关于 RabbitMQ 延时队列的内容就介绍到这里了。

本文完!

相关文章

  • SpringBoot 使用RabbitMQ 做延时队列

    SpringBoot 使用RabbitMQ 做延时队列 1.下载并安装erlang和RabbitMQ erlang...

  • spring boot 集成rabbitmq 实现延迟队列

    rabbitmq 实现延迟队列 什么是延迟队列 延迟队列存储的对象肯定是对应的延时消息,所谓”延时消息”是指当消息...

  • rabbitmq延时队列

    延时队列在实际业务场景中可能会用到延时消息发送,例如支付场景,准时支付、超过未支付将执行不同的方案,其中超时未支付...

  • RabbitMQ 延时队列

    在上一篇文章中,我们学习了死信队列的相关内容,文章最后我们提到,超时消息结合死信队列也可以实现一个延时队列。大致的...

  • RabbitMQ 消息队列 延时队列和死信队列

    如果你不熟悉RabbitMQ,本文从图形界面入手介绍 RabbitMQ 的基本使用方法和 延时队列和死信队列的实现...

  • 使用Spring Cloud Stream玩转RabbitMQ,

    前一章我们讲了《SpringBoot RabbitMQ消息队列的重试、超时、延时、死信队列[https://my....

  • RabbitMQ实现延时队列

    接之前分享过的文章【RabbitMQ的死信队列和延时队列】[https://www.jianshu.com/p/2...

  • 基于rabbitmq实现的延时队列(golang版)

    虽然 rabbitmq 没有延时队列的功能,但是稍微变动一下也是可以实现的 实现延时队列的基本要素 存在一个倒计时...

  • RabbitMQ实现延时队列

    首先,我们需要知道,Rabbit本身是不支持延时队列的。但是,它有一个死信投递机制,可以曲线救国来实现我们想要的延...

  • rabbitMQ-延时队列

    延时队列我们可以简单粗暴的理解它为延时发送消息的队列 那延时队列的应用场景有哪些呢,比如订单在一段时间内未支付则取...

网友评论

      本文标题:RabbitMQ 延时队列

      本文链接:https://www.haomeiwen.com/subject/uylesltx.html