美文网首页
【RabbitMQ的那点事】利用rabbitmq_delayed

【RabbitMQ的那点事】利用rabbitmq_delayed

作者: 伊丽莎白2015 | 来源:发表于2022-05-05 23:42 被阅读0次

官网文章:https://blog.rabbitmq.com/posts/2015/04/scheduling-messages-with-rabbitmq 详细讲了两种队列的方式:

1. 插件的资源相关

1.1 github地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/

The most recent release of this plugin targets RabbitMQ 3.9.x. Series earlier than 3.8.x are out of support.
该插件的主要支持版本是3.8.+以及3.9.x。(开头的官网文章提到delayed插件是3.5.5的新的特性,但目前github上下载连接是3.8.+的,所以尽量还是先升级到高的版本,再使用这个插件)。

2. MacOS安装RabbitMQ插件:

2.1 确认RabbitMQ版本:

可以在brew默认的安装路径下找到rabbitmq的目录,里面有版本号。

% pwd
/usr/local/Cellar/rabbitmq
% ls
3.9.16

2.2 下载插件(github):

https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases

下载插件页面截图
2.3 把下载的插件放到plugin目录下:

plugin在安装的时候会默认读几个目录,怎么查找这些目录:

% rabbitmq-plugins directories -s

将出输出目录:将上述下载的plugin放到Plugin archives directory下:

  • Plugin archives directory:/usr/local/Cellar/rabbitmq/3.9.16/plugins
  • Plugin expansion directory: /usr/local/var/lib/rabbitmq/mnesia/rabbit@localhost-plugins-expand
  • Enabled plugins file: /usr/local/etc/rabbitmq/enabled_plugins

PS. 如果跑rabbitmq-plugins遇到:command not found: rabbitmq-plugins,可以先把rabbitmq加到系统变量中,以下我加到了个人profile中:

%vi ~/.bash_profile

在内容中追加:export PATH=/usr/local/Cellar/rabbitmq/3.9.16/sbin:$PATH
保存退出再source下:

%source ~/.bash_profile

2.4 安装插件

% rabbitmq-plugins enable rabbitmq_delayed_message_exchange

这里遇到一个问题:
由于之前安装过3.6.x版本,导致的一些问题,发现在安装rabbitmq_delayed_message_exchange报错:

rabbitmq_delayed_message_exchange
Enabling plugins on node rabbit@localhost:
rabbitmq_delayed_message_exchange
Error:
{:plugins_not_found, [:rabbitmq_management_visualiser]}

参考了文章:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/issues/119

解决plugins_not_found问题
上面说在v3.7.x之后就不用rabbitmq_management_visualiser这个插件了,需要手动删除
记得在#2.3使用命令rabbitmq-plugins directories -s输出的3个目录中,其中一个就是Enabled plugins file: /usr/local/etc/rabbitmq/enabled_plugins,进入该文件,手动把rabbitmq_management_visualiser删除即可。

在删除这个legacy的插件后,安装成功。安装成功后,在本地的Console UI http://localhost:15672/#/exchanges,新建exchange,type就会多一个x-deplayed-message

image.png

3. 与Spring boot集成

依赖加载戳之前的文章 - 【RabbitMQ的那点事】与Spring boot集成:https://www.jianshu.com/p/4a21a7fce14c,这里只实现delayed-message这个功能。

3.1 首先是queue定义,exchange定义以及binding,使用@RabbitListener监听消息的到达

要注意的是x-delayed-message类型的exchange,在Spring用的依然是TopicExchange类,只不过要额外setDelayed=true;

@Slf4j
@Configuration
public class DelayedExchangeConfig {
    @Bean
    public Queue delayedQueue() {
        return new Queue("delay.queue", true);
    }

    @Bean
    public TopicExchange delayedExchange() {
        TopicExchange delayedExchange = new TopicExchange("delayed.exchange");
        delayedExchange.setDelayed(true);
        return delayedExchange;
    }

    @Bean
    public Binding deplayedBindings(Queue delayedQueue, TopicExchange delayedExchange) {
        return BindingBuilder.bind(delayedQueue).to(delayedExchange).with("delayed.message");
    }

    @RabbitListener(queues = {"delay.queue"})
    public void receiveMessageFromTopic(String message) {
        log.info("Received delayed message: " + message);
    }
}
3.2 定义Producer

需要在Test方法中sleep下,不然上述的@RabbitListener打印不了(主程序先结束了)。
另外,就是delayed时间在message的header中设置:

@Slf4j
@SpringBootTest
public class ProducerServiceTest {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void sendMessageToDelayedExchange() throws InterruptedException {
        log.info("Start to send delayed message...");
        rabbitTemplate.convertAndSend("delayed.exchange", "delayed.message", "hello delayed message!", new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                // 设置消息延迟时间,单位 ms
                message.getMessageProperties().setHeader("x-delay", 20 * 1000);
                return message;
            }
        });
        log.info("Finished sending delayed message...");

        Thread.sleep(40000L);
    }
}

启动测试后打印:能看到过了20s后,Consumer才收到消息:

2022-05-05 23:35:47.874 INFO 782 --- [ main] ProducerServiceTest : Start to send delayed message...
2022-05-05 23:35:47.878 INFO 782 --- [ main] ProducerServiceTest : Finished sending delayed message...
2022-05-05 23:36:07.777 INFO 782 --- [ntContainer#0-1] DelayedExchangeConfig : Received delayed message: hello delayed message!


参考:
RabbitMQ 延迟队列:https://blog.csdn.net/LIFE_PLAN/article/details/122993567

相关文章

网友评论

      本文标题:【RabbitMQ的那点事】利用rabbitmq_delayed

      本文链接:https://www.haomeiwen.com/subject/jeibyrtx.html