美文网首页
rabbitmq发送延迟消息

rabbitmq发送延迟消息

作者: 无尘粉笔 | 来源:发表于2020-02-28 10:36 被阅读0次

    软件准备

    erlang

    本文使用的版本是:Erlang 20.3

    RabbitMQ

    本文使用的是 window 版本的RabbitMQ,版本号是:3.7.4

    rabbitmq_delayed_message_exchange插件

    插件下载地址:http://www.rabbitmq.com/community-plugins.html

    打开网址后,ctrl + f,搜索 rabbitmq_delayed_message_exchange 。

    [图片上传失败...(image-568eaa-1582857295002)]

    千万记住,一定选好版本号,由于我使用的是RabbitMQ 3.7.4,因此对应的 rabbitmq_delayed_message_exchange 插件也必须选择3.7.x的。

    如果没有选对版本,在使用延迟消息的时候,会遇到各种各样的奇葩问题,而且网上还找不到解决方案。我因为这个问题,折腾了整整一个晚上。请牢记,要选对插件版本。

    下载完插件后,将其放置到RabbitMQ安装目录下的 plugins 目录下,并使用如下命令启动这个插件:

    rabbitmq-plugins enable rabbitmq_delayed_message_exchange

    如果启动成功会出现如下信息:

    The following plugins have been enabled: rabbitmq_delayed_message_exchange

    启动插件成功后,记得重启一下RabbitMQ,让其生效。

    集成RabbitMQ

    这个就非常简单了,直接在maven工程的pom.xml文件中加入

    <``dependency``>

    <``groupId``>org.springframework.boot</``groupId``>

    <``artifactId``>spring-boot-starter-amqp</``artifactId``>

    </``dependency``>

    Spring Boot的版本我使用的是 2.0.1.RELEASE .

    接下来在 application.properties 文件中加入redis配置:

    spring.rabbitmq.host=127.0.0.1

    spring.rabbitmq.port=5672

    spring.rabbitmq.username=guest

    spring.rabbitmq.password=guest

    定义ConnectionFactory和RabbitTemplate

    也很简单,代码如下:

    package com.mq.rabbitmq;

    import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;

    import org.springframework.amqp.rabbit.connection.ConnectionFactory;

    import org.springframework.amqp.rabbit.core.RabbitTemplate;

    import org.springframework.boot.context.properties.ConfigurationProperties;

    import org.springframework.context.annotation.Bean;

    import org.springframework.context.annotation.Configuration;

    @Configuration

    @ConfigurationProperties``(prefix = "spring.rabbitmq"``)

    public class RabbitMqConfig {

    private String host;

    private int port;

    private String userName;

    private String password;

    @Bean

    public ConnectionFactory connectionFactory() {

    CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(host,port);

    cachingConnectionFactory.setUsername(userName);

    cachingConnectionFactory.setPassword(password);

    cachingConnectionFactory.setVirtualHost(``"/"``);

    cachingConnectionFactory.setPublisherConfirms(``true``);

    return cachingConnectionFactory;

    }

    @Bean

    public RabbitTemplate rabbitTemplate() {

    RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());

    return rabbitTemplate;

    }

    public String getHost() {

    return host;

    }

    public void setHost(String host) {

    this``.host = host;

    }

    public int getPort() {

    return port;

    }

    public void setPort(``int port) {

    this``.port = port;

    }

    public String getUserName() {

    return userName;

    }

    public void setUserName(String userName) {

    this``.userName = userName;

    }

    public String getPassword() {

    return password;

    }

    public void setPassword(String password) {

    this``.password = password;

    }

    }

    Exchange和Queue配置

    package com.mq.rabbitmq;

    import org.springframework.amqp.core.*;

    import org.springframework.context.annotation.Bean;

    import org.springframework.context.annotation.Configuration;

    import java.util.HashMap;

    import java.util.Map;

    @Configuration

    public class QueueConfig {

    @Bean

    public CustomExchange delayExchange() {

    Map<String, Object> args = new HashMap<>();

    args.put(``"x-delayed-type"``, "direct"``);

    return new CustomExchange(``"test_exchange"``, "x-delayed-message"``,``true``, false``,args);

    }

    @Bean

    public Queue queue() {

    Queue queue = new Queue(``"test_queue_1"``, true``);

    return queue;

    }

    @Bean

    public Binding binding() {

    return BindingBuilder.bind(queue()).to(delayExchange()).with(``"test_queue_1"``).noargs();

    }

    }

    这里要特别注意的是,使用的是 CustomExchange ,不是 DirectExchange ,另外 CustomExchange 的类型必须是 x-delayed-message 。

    实现消息发送

    package com.mq.rabbitmq;

    import org.springframework.amqp.AmqpException;

    import org.springframework.amqp.core.Message;

    import org.springframework.amqp.core.MessagePostProcessor;

    import org.springframework.amqp.rabbit.core.RabbitTemplate;

    import org.springframework.beans.factory.annotation.Autowired;

    import org.springframework.stereotype.Service;

    import java.text.SimpleDateFormat;

    import java.util.Date;

    @Service

    public class MessageServiceImpl {

    @Autowired

    private RabbitTemplate rabbitTemplate;

    public void sendMsg(String queueName,String msg) {

    SimpleDateFormat sdf = new SimpleDateFormat(``"yyyy-MM-dd HH:mm:ss"``);

    System.out.println(``"消息发送时间:"``+sdf.format(``new Date()));

    rabbitTemplate.convertAndSend(``"test_exchange"``, queueName, msg, new MessagePostProcessor() {

    @Override

    public Message postProcessMessage(Message message) throws AmqpException {

    message.getMessageProperties().setHeader(``"x-delay"``,``3000``);

    return message;

    }

    });

    }

    }

    注意在发送的时候,必须加上一个header

    x-delay

    在这里我设置的延迟时间是3秒。

    消息消费者

    package com.mq.rabbitmq;

    import org.springframework.amqp.rabbit.annotation.RabbitHandler;

    import org.springframework.amqp.rabbit.annotation.RabbitListener;

    import org.springframework.stereotype.Component;

    import java.text.SimpleDateFormat;

    import java.util.Date;

    @Component

    public class MessageReceiver {

    @RabbitListener``(queues = "test_queue_1"``)

    public void receive(String msg) {

    SimpleDateFormat sdf = new SimpleDateFormat(``"yyyy-MM-dd HH:mm:ss"``);

    System.out.println(``"消息接收时间:"``+sdf.format(``new Date()));

    System.out.println(``"接收到的消息:"``+msg);

    }

    }

    运行Spring Boot程序和发送消息

    直接在main方法里运行Spring Boot程序,Spring Boot会自动解析 MessageReceiver 类的。

    接下来只需要用Junit运行一下发送消息的接口即可。

    package com.mq.rabbitmq;

    import org.junit.Test;

    import org.junit.runner.RunWith;

    import org.springframework.beans.factory.annotation.Autowired;

    import org.springframework.boot.test.context.SpringBootTest;

    import org.springframework.test.context.junit4.SpringRunner;

    @RunWith``(SpringRunner.``class``)

    @SpringBootTest

    public class RabbitmqApplicationTests {

    @Autowired

    private MessageServiceImpl messageService;

    @Test

    public void send() {

    messageService.sendMsg(``"test_queue_1"``,``"hello i am delay msg"``);

    }

    }

    相关文章

      网友评论

          本文标题:rabbitmq发送延迟消息

          本文链接:https://www.haomeiwen.com/subject/zdykhhtx.html