一.概述
本文介绍Springboot如何集成RabbitMQ
本文内容较多,开始为最简单的自动确认类的消息发送和消费,后续有发送方确认和接收方确认机制示例
关于RabbitMQ安装就不说了,网上有很多
关于RabbitMQ的一些基础概念和知识请查看前几篇文章
二.RabbitMQ管理后台
安装好RabbitMQ并启用管理后台,访问localhost:15672,输入默认的用户密码guest/guest得到如下界面
三.SpringBoot集成
生产者集成
1.创建项目
我们首先创建一个Springboot项目demo-rabbitmq-producer
项目的
pom.xml
文件中引入依赖:
<!--springboot整合rabbitMQ只需引入amqp起步依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
完整的pom.xml
文件如下(注意demo-rabbitmq-common是上面项目截图的自己定义的模块项目,里面只有一个User实体类,为了后续测试发送实体类消息增加的):
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.4.1</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.tp</groupId>
<artifactId>demo-rabbitmq-producer</artifactId>
<version>1.0.0-SNAPSHOT</version>
<name>demo-rabbit-mq-producer</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<!--web-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--springboot整合rabbitMQ只需引入amqp起步依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!--lombok-->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<!--demo-rabbitmq-common-->
<dependency>
<groupId>com.tp</groupId>
<artifactId>demo-rabbitmq-common</artifactId>
<version>1.0.0-SNAPSHOT</version>
</dependency>
<!--test-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
在application.yml
文件中进行配置:
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
virtual-host: TP-HOST #虚拟主机,可以不设置使用server默认host
注意
:要保证guest用户对虚拟主机有读写操作权限,具体去RabbitMQ后台管理系统配置
2.配置序列化策略
RabbitMQ序列化的选择可以是jdk序列化,hessian,jackson,protobuf等
而对于Java应用默认的序列化采用的是jdk序列化
SimpleMessageConverter对于要发送的消息体body为字节数组时,不进行处理。
消息本身假设是String,则将String转成字节数组,假设是Java对象,则使用jdk序列化将消息转成字节数组,转出来的结果较大,含class类名、类对应方法等信息,因此性能较差。
hessian、protobuf等都是基于压缩反复字段的思想,降低数据传输量以提高性能。
jackson是以json表示来数据传输,性能优于jdk序列化
所以使用RabbitMq作为中间件时,数据量比较大,此时就要考虑使用类似Jackson2JsonMessageConverter、hessian等序列化形式,以此提高性能。
在生产者端增加RabbitMQConfig.java
:
package com.tp.demo.rabbitmq.producer.config;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* FileName: RabbitMQConfig
* Author: TP
* Date: 12/13/20 4:31 PM
* Description:开启消息发送消息确认
*/
@Configuration
public class RabbitMQConfig {
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate();
rabbitTemplate.setConnectionFactory(connectionFactory);
// 设置序列化策略
rabbitTemplate.setMessageConverter(jsonMessageConverter());
return rabbitTemplate;
}
@Bean
public MessageConverter jsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}
}
3.声明交换机、队列信息
需要注意的是,虽然官方推荐在生产者和消费者都增加交换机、队列和绑定关系声明,但个人更推荐生产环境通过管理后台创建交换机、队列和声明绑定关系
,一方面可以防止因为开发人员的代码错误引发不必要的问题,另一方面也可以防止每次启动创建上述这些东西
而如果我们无论在生产者端还是在消费者端进行声明交换机、队列、将交换机和队列进行绑定,都会自动向MQ申请上述声明,MQ中如果有相同配置的声明则自动返回成功,如果没有则新建一个
本例为了方便,暂时只在生产者端进行声明,当第一次发送消息的时候会去MQ服务器申请声明信息
首先我们来一个直连交换机的例子:
package com.tp.demo.rabbitmq.producer.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* FileName: DirectRabbitConfig
* Author: TP
* Date: 12/13/20 12:01 PM
* Description: 直连交换机Config
*/
@Configuration
public class DirectRabbitConfig {
/**
* 定义队列,名字为testDirectQueue
*/
@Bean
public Queue testDirectQueue() {
// durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
// exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
// autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
// return new Queue("testDirectQueue", true, true, false);
//一般设置一下队列的持久化就好,其余两个就是默认false
return new Queue("testDirectQueue", true);
}
/**
* 定义Direct类型交换机,名字为testDirectExchange
*/
@Bean
DirectExchange testDirectExchange() {
return new DirectExchange("testDirectExchange", true, false);
}
/**
* 将队列和交换机绑定,并设置路由键:testDirectRouting
*/
@Bean
Binding bindingDirect() {
return BindingBuilder.bind(testDirectQueue()).to(testDirectExchange()).with("testDirectRouting");
}
}
4.发送消息
package com.tp.demo.rabbitmq.producer.controller.simple;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* FileName: SendDirectMessageController
* Author: TP
* Date: 12/13/20 12:11 PM
* Description:直连交换机消息发送Controller
*/
@RestController
public class SendDirectMessageController {
@Autowired
RabbitTemplate rabbitTemplate;
@GetMapping("/sendDirectMessage")
public String sendDirectMessage() {
String message = "你好,我是直连交换机过来的一条消息";
//将消息携带绑定键值:testDirectRouting 发送到交换机testDirectExchange
rabbitTemplate.convertAndSend("testDirectExchange", "testDirectRouting", message);
return "ok";
}
}
好了,发送方就是这么简单,我们可以往MQ发送一条消息了
观察管理后台,发现队列中已经有消息了:
消费者集成
1.创建项目
新建一个Springboot项目:demo-rabbitmq-consumer
项目的pom.xml
文件中引入依赖:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.4.1</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.tp</groupId>
<artifactId>demo-rabbitmq-consumer</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>demo-rabbitmq-consumer</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<!--web-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--springboot整合rabbitMQ只需引入amqp起步依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!--lombok-->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<!--demo-rabbitmq-common-->
<dependency>
<groupId>com.tp</groupId>
<artifactId>demo-rabbitmq-common</artifactId>
<version>1.0.0-SNAPSHOT</version>
</dependency>
<!--test-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
application.yml文件中做如下配置:
server:
port: 7268
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
virtual-host: TP-HOST #虚拟主机,可以不设置使用server默认host
注意:生产者和消费者的虚拟主机要对应
2.配置序列化策略
因为生产者我们用了Jackson2JsonMessageConverter
进行序列化,所以消费者端我们也使用Jackson2JsonMessageConverter
进行反序列化
新建一个配置类RabbitMQConfig
如下:
package com.tp.demo.rabbitmq.consumer.config;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* FileName: RabbitMQConfig
* Author: TP
* Date: 12/14/20 9:03 AM
* Description:消息消费者配置
*/
@Configuration
public class RabbitMQConfig {
/**
* 消息消费者配置JSON反序列化使用Jackson2JsonMessageConverter,与消息生产者保持一致
*/
@Bean
public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(new Jackson2JsonMessageConverter());
return factory;
}
}
3.配置监听器
消费者端是通过监听器监听消息的,我们配置一个监听器用于接收上面的队列消息:
package com.tp.demo.rabbitmq.consumer.listener.simple;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* FileName: DirectListener
* Author: TP
* Date: 12/13/20 12:26 PM
* Description:直连交换机消费者监听器
* 直连交换机默认轮询所有消费者
* 如果我们定义了多个消费者监听了同一个队列,会以轮询的方式消费,且不存在重复消费
*/
@Component
public class DirectListener {
@RabbitListener(queues = "testDirectQueue")
public void onMessage(String message) {
System.out.println("DirectListener收到消息:" + message);
}
}
扩展:
- Spring对amqp的支持很灵活,在消费者端我们可以使用
org.springframework.amqp.core.Message
对象统一接收消息,也可以使用你喜欢的任意类型进行接收,但要保证发送时候和接收时候的对象类型要保持一致。(例如String
、Map
、JavaBean
都可以) - 注解@RabbitListener可以定义在方法上,也可以定义在类上,用以声明一个消息监听器。
-- 如果定义在类上,需要配合@RabbitHandler
标注在对应的方法上,指明具体使用哪个方法做监听
-- 如果定义在方法上,则可以省略@RabbitHandler
- 如果我们定义了多个相同配置的消息监听器,消费者会轮询消费,且不会重复消费
启动消费者项目,在控制台会得到如下输出:
Connected to the target VM, address: '127.0.0.1:52798', transport: 'socket'
. ____ _ __ _ _
/\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
\\/ ___)| |_)| | | | | || (_| | ) ) ) )
' |____| .__|_| |_|_| |_\__, | / / / /
=========|_|==============|___/=/_/_/_/
:: Spring Boot :: (v2.4.1)
2020-12-14 15:29:50.092 INFO 98654 --- [ main] .t.d.r.c.DemoRabbitmqConsumerApplication : Starting DemoRabbitmqConsumerApplication using Java 1.8.0_191 on tianpengdeMacBook-Pro.local with PID 98654 (/Users/tianpeng/workspace/tp/my-boot-rabbitmq/demo-rabbitmq-consumer/target/classes started by tianpeng in /Users/tianpeng/workspace/tp/my-boot-rabbitmq)
2020-12-14 15:29:50.094 INFO 98654 --- [ main] .t.d.r.c.DemoRabbitmqConsumerApplication : No active profile set, falling back to default profiles: default
2020-12-14 15:29:50.805 INFO 98654 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat initialized with port(s): 7268 (http)
2020-12-14 15:29:50.812 INFO 98654 --- [ main] o.apache.catalina.core.StandardService : Starting service [Tomcat]
2020-12-14 15:29:50.812 INFO 98654 --- [ main] org.apache.catalina.core.StandardEngine : Starting Servlet engine: [Apache Tomcat/9.0.41]
2020-12-14 15:29:50.861 INFO 98654 --- [ main] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring embedded WebApplicationContext
2020-12-14 15:29:50.861 INFO 98654 --- [ main] w.s.c.ServletWebServerApplicationContext : Root WebApplicationContext: initialization completed in 735 ms
2020-12-14 15:29:51.184 INFO 98654 --- [ main] o.s.s.concurrent.ThreadPoolTaskExecutor : Initializing ExecutorService 'applicationTaskExecutor'
2020-12-14 15:29:51.396 INFO 98654 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port(s): 7268 (http) with context path ''
2020-12-14 15:29:51.398 INFO 98654 --- [ main] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [127.0.0.1:5672]
2020-12-14 15:29:51.425 INFO 98654 --- [ main] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory#37c36608:0/SimpleConnection@7e62cfa3 [delegate=amqp://guest@127.0.0.1:5672/TP-HOST, localPort= 52811]
DirectListener收到消息:你好,我是直连交换机过来的一条消息
2020-12-14 15:29:51.555 INFO 98654 --- [ main] .t.d.r.c.DemoRabbitmqConsumerApplication : Started DemoRabbitmqConsumerApplication in 1.777 seconds (JVM running for 2.34)
由此看到,消息被正确消费了,又由于我们采用的是RabbitMQ的默认消息确认机制:自动确认,所以此条消息会被RabbitMQ从队列中移除:
上述示例是直连交换机的演示,关于扇形交换机和主题交换机,这里分别给出消息生产者和消费者的代码,就不进行演示和后台截图了,相信大家自己将下面代码copy到自己项目里,操作一下就能实现
生产者端扇形交换机声明:
package com.tp.demo.rabbitmq.producer.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* FileName: FanoutRabbitConfig
* Author: TP
* Date: 12/13/20 3:58 PM
* Description:
*/
@Configuration
public class FanoutRabbitConfig {
/**
* 创建三个队列 :fanout.A fanout.B fanout.C
* 将三个队列都绑定在交换机fanoutExchange上
* 因为是扇型交换机,路由键无需配置,配置也不起作用
*/
@Bean
public Queue queueA() {
return new Queue("fanout.A");
}
@Bean
public Queue queueB() {
return new Queue("fanout.B");
}
@Bean
public Queue queueC() {
return new Queue("fanout.C");
}
@Bean
FanoutExchange fanoutExchange() {
return new FanoutExchange("fanoutExchange");
}
@Bean
Binding bindingExchangeA() {
return BindingBuilder.bind(queueA()).to(fanoutExchange());
}
@Bean
Binding bindingExchangeB() {
return BindingBuilder.bind(queueB()).to(fanoutExchange());
}
@Bean
Binding bindingExchangeC() {
return BindingBuilder.bind(queueC()).to(fanoutExchange());
}
}
生产者端主题交换机声明:
package com.tp.demo.rabbitmq.producer.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* FileName: TopicRabbitConfig
* Author: TP
* Date: 12/13/20 12:52 PM
* Description:主题交换机Config
*/
@Configuration
public class TopicRabbitConfig {
// 绑定键
private final static String man = "topic.man";
private final static String woman = "topic.woman";
@Bean
public Queue firstQueue() {
return new Queue(TopicRabbitConfig.man);
}
@Bean
public Queue secondQueue() {
return new Queue(TopicRabbitConfig.woman);
}
@Bean
TopicExchange exchange() {
return new TopicExchange("topicExchange");
}
// 将firstQueue和topicExchange绑定,而且绑定的键值为topic.man
// 这样只要是消息携带的路由键是topic.man,才会分发到该队列
@Bean
Binding bindingExchangeMessage() {
return BindingBuilder.bind(firstQueue()).to(exchange()).with(man);
}
// 将secondQueue和topicExchange绑定,而且绑定的键值为用上通配路由键规则topic.#
// 这样只要是消息携带的路由键是以topic.开头,都会分发到该队列
@Bean
Binding bindingExchangeMessage2() {
return BindingBuilder.bind(secondQueue()).to(exchange()).with("topic.#");
}
}
生产者端发送消息
package com.tp.demo.rabbitmq.producer.controller.simple;
import com.fasterxml.jackson.core.JsonProcessingException;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* FileName: SendFanoutMessageController
* Author: TP
* Date: 12/13/20 4:02 PM
* Description:扇形交换机消息发送Controller
*/
@RestController
public class SendFanoutMessageController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/sendFanoutMessage")
public String sendFanoutMessage() throws JsonProcessingException {
String message = "message: testFanoutMessage...";
for (int i = 0; i < 200; i++) {
rabbitTemplate.convertAndSend("fanoutExchange", null, message);
}
return "ok";
}
}
package com.tp.demo.rabbitmq.producer.controller.simple;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* FileName: SendTopicMessageController
* Author: TP
* Date: 12/13/20 12:56 PM
* Description:主题交换机消息发送Controller
*/
@RestController
public class SendTopicMessageController {
@Autowired
RabbitTemplate rabbitTemplate;
@GetMapping("/sendTopicMessage1")
public String sendTopicMessage1() {
String message = "message: MAN ";
rabbitTemplate.convertAndSend("topicExchange", "topic.man", message);
return "ok";
}
@GetMapping("/sendTopicMessage2")
public String sendTopicMessage2() {
String message = "message: woman will all in";
rabbitTemplate.convertAndSend("topicExchange", "topic.woman", message);
return "ok";
}
}
消费者端消息监听器:
package com.tp.demo.rabbitmq.consumer.listener.simple;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* FileName: FanoutListenerA
* Author: TP
* Date: 12/13/20 4:07 PM
* Description:
*/
@Component
public class FanoutListenerA {
@RabbitListener(queues = "fanout.A")
public void onMessage(String message) {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("FanoutListenerA收到消息:" + message);
}
}
package com.tp.demo.rabbitmq.consumer.listener.simple;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* FileName: FanoutListenerC
* Author: TP
* Date: 12/13/20 4:08 PM
* Description:
*/
@Component
public class FanoutListenerB {
@RabbitListener(queues = "fanout.B")
public void onMessage(String message) {
System.out.println("FanoutListenerB收到消息:" + message);
}
}
package com.tp.demo.rabbitmq.consumer.listener.simple;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* FileName: FanoutListenerC
* Author: TP
* Date: 12/13/20 4:08 PM
* Description:
*/
@Component
public class FanoutListenerC {
@RabbitListener(queues = "fanout.C")
public void onMessage(String message) {
System.out.println("FanoutListenerC收到消息:" + message);
}
}
package com.tp.demo.rabbitmq.consumer.listener.simple;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* FileName: TopicListener1
* Author: TP
* Date: 12/13/20 2:32 PM
* Description:
*/
@Component
public class TopicListener1 {
@RabbitListener(queues = "topic.man")
public void onMessage(String message) {
System.out.println("TopicListener1收到消息:" + message);
}
}
package com.tp.demo.rabbitmq.consumer.listener.simple;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* FileName: TopicListener
* Author: TP
* Date: 12/13/20 2:32 PM
* Description:
*/
@Component
public class TopicListener2 {
@RabbitListener(queues = "topic.woman")
public void process(String message) {
System.out.println("TopicListener2收到消息:" + message);
}
}
三.消息可靠投递:消息确认机制
RabbitMQ中生产者端和消费者端都有消息确认机制,已尽量避免消息的丢失
消息生产者投递确认
====================================================
生产者发送确认机制共有2种回调:ConfirmCallback、ReturnCallback
两种回调函数都是在什么情况会触发呢?
总体来说,推送消息存在以下四种情况:
①消息推送到server,但是在server里找不到交换机
②消息推送到server,找到交换机了,但是没找到队列
③消息推送到sever,交换机和队列啥都没找到
④消息推送成功
====================================================
①这种情况触发的是 ConfirmCallback回调函数
②这种情况触发的是 ConfirmCallback和ReturnCallback两个回调函数
③这种情况触发的是 ConfirmCallback回调函数
④这种情况触发的是 ConfirmCallback回调函数
====================================================
我们可以在回调函数根据需求做对应的扩展或者业务数据处理
为了支持生产者消息投递确认,我们需要作如下内容:
- 在
application.yml
中增加如下配置:
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
virtual-host: TP-HOST #虚拟主机,可以不设置使用server默认host
publisher-confirm-type: correlated #确认消息已发送到交换机(Exchange)
publisher-returns: true #确认消息已发送到队列(Queue)
- 对之前生产者端的
RabbitMQConfig.java
进行改造如下:
package com.tp.demo.rabbitmq.producer.config;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* FileName: RabbitMQConfig
* Author: TP
* Date: 12/13/20 4:31 PM
* Description:开启消息发送消息确认
*/
@Configuration
public class RabbitMQConfig {
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate();
rabbitTemplate.setConnectionFactory(connectionFactory);
// 设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数
rabbitTemplate.setMandatory(true);
// 设置发送方确认
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
System.out.println("ConfirmCallback >>> " + "相关数据:" + correlationData);
System.out.println("ConfirmCallback >>> " + "确认情况:" + ack);
System.out.println("ConfirmCallback >>> " + "原因:" + cause);
});
rabbitTemplate.setReturnsCallback(e -> {
System.out.println("ReturnCallback >>> " + "消息:" + e.getMessage());
System.out.println("ReturnCallback >>> " + "回应码:" + e.getReplyCode());
System.out.println("ReturnCallback >>> " + "回应信息:" + e.getReplyText());
System.out.println("ReturnCallback >>> " + "交换机:" + e.getExchange());
System.out.println("ReturnCallback >>> " + "路由键:" + e.getRoutingKey());
});
// 设置序列化策略
rabbitTemplate.setMessageConverter(jsonMessageConverter());
return rabbitTemplate;
}
@Bean
public MessageConverter jsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}
}
为了方便,我们封装了一个消息发送工具:
package com.tp.demo.rabbitmq.producer.sender;
import com.tp.demo.rabbitmq.producer.utils.RandomUtils;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* FileName: RabbitSender
* Author: TP
* Date: 12/14/20 9:16 AM
* Description:封装一个RabbitMQ发送消息对象,方便使用
* 当然,你也可以直接使用RabbitTemplate
*/
@Component
public class RabbitSender {
@Autowired
private RabbitTemplate rabbitTemplate;
//发送消息方法调用: 构建Message消息
public void convertAndSend(String exchange, String routingKey, Object message) {
// 时间戳+6位随机字符保证全局唯一
// 用于ack保证唯一一条消息(在做补偿策略的时候,必须保证这是全局唯一的消息)
// 在消费方可以通过message.getMessageProperties().getHeaders().get("spring_returned_message_correlation")获取到该CorrelationData
CorrelationData correlationData = new CorrelationData(RandomUtils.UUID());
// 发送消息
rabbitTemplate.convertAndSend(exchange, routingKey, message, correlationData);
}
}
这个方法中,每次发送消息的时候会生成一个全局唯一标识放入CorrelationData
,CorrelationData
我们也可以封装业务ID信息(把send方法增加个参数,在发送消息的时候指定),这样我们就可以在消息发送失败的时候,根据业务ID进行自己的补偿机制
我们发送一个不存在的交换机进行测试:
package com.tp.demo.rabbitmq.producer.controller.simple;
import com.tp.demo.rabbitmq.producer.sender.RabbitSender;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* FileName: TestSendFailCallbackController
* Author: TP
* Date: 12/14/20 10:22 AM
* Description:
*/
@RestController
public class TestSendFailCallbackController {
@Autowired
RabbitSender rabbitSender;
@GetMapping("/sendMessageFail")
public String sendDirectMessage() {
String message = "Hello,This will fail...";
//将消息携带绑定键值:testDirectRouting 发送到交换机testDirectExchange
rabbitSender.convertAndSend("none_exchange", "testDirectRouting", message);
return "ok";
}
}
查看控制台输出:
Connected to the target VM, address: '127.0.0.1:53731', transport: 'socket'
. ____ _ __ _ _
/\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
\\/ ___)| |_)| | | | | || (_| | ) ) ) )
' |____| .__|_| |_|_| |_\__, | / / / /
=========|_|==============|___/=/_/_/_/
:: Spring Boot :: (v2.4.1)
2020-12-14 16:01:15.954 INFO 5063 --- [ main] .t.d.r.p.DemoRabbitMqProducerApplication : Starting DemoRabbitMqProducerApplication using Java 1.8.0_191 on tianpengdeMacBook-Pro.local with PID 5063 (/Users/tianpeng/workspace/tp/my-boot-rabbitmq/demo-rabbitmq-producer/target/classes started by tianpeng in /Users/tianpeng/workspace/tp/my-boot-rabbitmq)
2020-12-14 16:01:15.957 INFO 5063 --- [ main] .t.d.r.p.DemoRabbitMqProducerApplication : No active profile set, falling back to default profiles: default
2020-12-14 16:01:16.791 INFO 5063 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat initialized with port(s): 7269 (http)
2020-12-14 16:01:16.800 INFO 5063 --- [ main] o.apache.catalina.core.StandardService : Starting service [Tomcat]
2020-12-14 16:01:16.800 INFO 5063 --- [ main] org.apache.catalina.core.StandardEngine : Starting Servlet engine: [Apache Tomcat/9.0.41]
2020-12-14 16:01:16.854 INFO 5063 --- [ main] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring embedded WebApplicationContext
2020-12-14 16:01:16.855 INFO 5063 --- [ main] w.s.c.ServletWebServerApplicationContext : Root WebApplicationContext: initialization completed in 862 ms
2020-12-14 16:01:17.204 INFO 5063 --- [ main] o.s.s.concurrent.ThreadPoolTaskExecutor : Initializing ExecutorService 'applicationTaskExecutor'
2020-12-14 16:01:17.362 INFO 5063 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port(s): 7269 (http) with context path ''
2020-12-14 16:01:17.369 INFO 5063 --- [ main] .t.d.r.p.DemoRabbitMqProducerApplication : Started DemoRabbitMqProducerApplication in 1.72 seconds (JVM running for 2.142)
2020-12-14 16:02:26.993 INFO 5063 --- [nio-7269-exec-2] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring DispatcherServlet 'dispatcherServlet'
2020-12-14 16:02:26.993 INFO 5063 --- [nio-7269-exec-2] o.s.web.servlet.DispatcherServlet : Initializing Servlet 'dispatcherServlet'
2020-12-14 16:02:26.994 INFO 5063 --- [nio-7269-exec-2] o.s.web.servlet.DispatcherServlet : Completed initialization in 1 ms
2020-12-14 16:02:27.074 INFO 5063 --- [nio-7269-exec-2] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [127.0.0.1:5672]
2020-12-14 16:02:27.100 INFO 5063 --- [nio-7269-exec-2] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory#7bab5898:0/SimpleConnection@1759112d [delegate=amqp://guest@127.0.0.1:5672/TP-HOST, localPort= 53771]
2020-12-14 16:02:27.139 ERROR 5063 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory : Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'none_exchange' in vhost 'TP-HOST', class-id=60, method-id=40)
ConfirmCallback >>> 相关数据:CorrelationData [id=20201214160227064961897]
ConfirmCallback >>> 确认情况:false
ConfirmCallback >>> 原因:channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'none_exchange' in vhost 'TP-HOST', class-id=60, method-id=40)
可以看到CorrelationData中取到了我们生成的UUID,我们可以根据这个UUID做自己的要业务补偿
顺便贴一下自定义的RandomUtils
:
package com.tp.demo.rabbitmq.producer.utils;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Random;
/**
* FileName: RandomUtils
* Author: TP
* Date: 12/14/20 9:36 AM
* Description:
*/
public class RandomUtils {
public synchronized static String UUID() {
DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyyMMddHHmmssSSS");
String current = LocalDateTime.now().format(dtf);
return current + getRandomString(6);
}
public synchronized static String UUID(String prefix) {
DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyyMMddHHmmssSSS");
String current = LocalDateTime.now().format(dtf);
return prefix + current + getRandomString(6);
}
private static String getRandomString(int length) {
StringBuffer sb = new StringBuffer();
if (length > 0) {
for (int i = 0; i < length; i++) {
sb.append(new Random().nextInt(10));
}
return sb.toString();
}
return null;
}
}
到此,消息生产者确认机制就算完成了
消息消费者投递确认
重头戏来了,消息消费者端RabbitMQ默认是自动确认的,只要消息发送到了消费者,则认为消息已被消费,消息在RabbitMQ服务器会被移除,这显然在生产环境上是极度危险的,所以我们都会设置消息消费者端的消费确认为手动,具体步骤如下:
-
改造消息消费者端的
RabbitMQConfig.java
:
package com.tp.demo.rabbitmq.consumer.config;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* FileName: RabbitMQConfig
* Author: TP
* Date: 12/14/20 9:03 AM
* Description:消息消费者配置
*/
@Configuration
public class RabbitMQConfig {
/**
* 消息消费者配置JSON反序列化使用Jackson2JsonMessageConverter,与消息生产者保持一致
*/
@Bean
public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
factory.setMessageConverter(new Jackson2JsonMessageConverter());
return factory;
}
}
注意:对于没有自己声明上述SimpleRabbitListenerContainerFactory
的同学,可以在yml中直接配置:
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
virtual-host: TP-HOST #虚拟主机,可以不设置使用server默认host
listener:
simple:
acknowledge-mode: manual
而我们自己声明了SimpleRabbitListenerContainerFactory
,这时如果在yml中增加上述配置是无效的
- 监听器内进行手动消息确认
package com.tp.demo.rabbitmq.consumer.listener.ack;
import com.rabbitmq.client.Channel;
import com.tp.demo.rabbitmq.common.entity.User;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* FileName: DirectAckListener
* Author: TP
* Date: 12/13/20 7:02 PM
* Description:
* 注解@RabbitListener可以定义在方法上也可以定义在类上
* -- 如果定义在类上,需要配合@RabbitHandler标注在方法上,指明具体使用哪个方法做监听
* -- 如果定义在方法上,则可以省略@RabbitHandler
*/
@Component
public class DirectAckListener {
@RabbitListener(queues = "testDirectAckQueue")
public void process(Message message, Channel channel) throws IOException {
System.out.println("接收到消息总体内容:" + message);
System.out.println("实际消息内容:" + new String(message.getBody()));
// TODO 业务逻辑
// 1.获取message中的body,解析消息内容
// 2.其他业务逻辑......
// 回执情形1:消费成功
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
// 回执情形2:消费成功消费处理失败,重新放入队列(一定要慎用,防止造成无限返回队列->消费者->返回队列.....造成消息积压)
// channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
// 回执情形3:消费处理失败,拒绝接收(可以指定是否重新放入队列,如果消息不重新放入队列,RabbitMQ服务端会将消息移除)
// channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
}
/**
* 如果你很懒,不想从message中获取body再自己反序列化为想要的实体bean怎么办?
* Spring对rabbitMQ的集成允许我们直接使用bean接收,如下:直接可以用形参封装
* 扩展:我们可以在生产者端发送任意类型的消息,并且在消费者端直接用形参封装,但你必须保证用的是同一种数据类型
* 注意:如果想测试这种快捷方式,请将注解注释放开,并将上面的process全部注释掉
*/
// @RabbitListener(queues = "testDirectAckQueue")
public void process(User user, Message message, Channel channel) throws IOException {
System.out.println(user);
// TODO 业务逻辑
// ......
// 回执情形1:进行消息回执
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
// 回执情形2:消费成功消费处理失败,重新放入队列(一定要慎用,防止造成无限返回队列->消费者->返回队列.....造成消息积压)
// channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
// 回执情形3:消费处理失败,拒绝接收(可以指定是否重新放入队列,如果消息不重新放入队列,RabbitMQ服务端会将消息移除)
// channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
}
}
当我们设置了消息手动确认,如果消息到了消费者而消费者一直不确认,在RabbitMQ中这条消息将会一直处于unacked
待确认状态,直到消费者与RabbitMQ断开连接,这条消息又会重新变成ready
状态,消费者重启后会重新消费消息,对于消费者手动确认,其回执方式有3种,详见上述代码的注释,这里就不再说明了
测试:
发送一条需要手动确认的消息如下:
package com.tp.demo.rabbitmq.producer.controller.ack;
import com.tp.demo.rabbitmq.common.entity.User;
import com.tp.demo.rabbitmq.producer.sender.RabbitSender;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import java.time.LocalDateTime;
/**
* FileName: SendDirectMessageController
* Author: TP
* Date: 12/13/20 12:11 PM
* Description:直连交换机消息发送Controller
*/
@RestController
public class SendDirectAckMessageController {
@Autowired
RabbitSender rabbitSender;
@GetMapping("/sendDirectMessageAck")
public String sendDirectMessage() {
User user = new User();
user.setId(1);
user.setUserName("TP");
user.setPassWord("pwd123456");
user.setAge(18);
user.setCreateTime(LocalDateTime.now());
rabbitSender.convertAndSend("testDirectAckExchange", "testDirectAckRouting", user);
return "ok";
}
@GetMapping("/sendDirectMessageAck2")
public String sendDirectMessage2() {
for (int i = 20; i <= 30; i++) {
rabbitSender.convertAndSend("testDirectAckExchange", "testDirectAckRouting", "我是一条需要确认的消息");
}
return "ok";
}
}
我们在消费者端加上debug,让消息先不走回执,观察效果:
放开debu后:
控制台输出:
接收到消息总体内容:(Body:'{"id":1,"userName":"TP","passWord":"pwd123456","age":18,"createTime":[2020,12,14,16,35,16,705000000]}' MessageProperties [headers={spring_listener_return_correlation=97fb83de-fd43-4529-9d24-abef559a9fcb, spring_returned_message_correlation=20201214163516705225241, __TypeId__=com.tp.demo.rabbitmq.common.entity.User}, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=testDirectAckExchange, receivedRoutingKey=testDirectAckRouting, deliveryTag=2, consumerTag=amq.ctag-F5bUnBjCf-umNuRMKLLUAg, consumerQueue=testDirectAckQueue])
实际消息内容:{"id":1,"userName":"TP","passWord":"pwd123456","age":18,"createTime":[2020,12,14,16,35,16,705000000]}
其他2种回执,请自行测试
本示例发送实体类消息用的User类如下:
package com.tp.demo.rabbitmq.common.entity;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.fasterxml.jackson.datatype.jsr310.deser.LocalDateTimeDeserializer;
import com.fasterxml.jackson.datatype.jsr310.ser.LocalDateTimeSerializer;
import lombok.Data;
import java.time.LocalDateTime;
/**
* FileName: User
* Author: TP
* Date: 12/14/20 11:33 AM
* Description:
*/
@Data
public class User {
private Integer id;
private String userName;
private String passWord;
private Integer age;
@JsonDeserialize(using = LocalDateTimeDeserializer.class)
@JsonSerialize(using = LocalDateTimeSerializer.class)
private LocalDateTime createTime;
}
至此,Springboot整合RabbitMQ完毕
问题
鄙人在测试中遇到如下问题:消费者端已经开启了消费手动确认,如果我们发送一条消息,消息内容为一个JavaBean
,如果在消费者端监听器进行反序列化消息内容到Message
参数时失败抛出异常了,则RabbitMQ会直接将消息移除,而不会将这条消息标记为unacked
,这会导致消息丢失
为什么会这样我也不知道,如果有高人看到此篇文章并对这种情形有理解,请留言,不胜感激!
当然了,我们可以转JSON发送String消息,然后自己接收后再解析,也可以生产者和消费者引用maven私服内的同一个jar包,同一个实体类不会出现反序列化失败的问题,不理解的是如果Message就是封装失败了,为什么会将这条消息移除呢,而不是标记为未确认呢???
后续
针对上面的问题,通过艰难的Spring-amqp源码debug,定位了问题所在:
消费端监听器如果封装参数失败会抛出:org.springframework.amqp.rabbit.support.ListenerExecutionFailedException
,这个异常被认为为fatal异常,也就是致命异常
图中可以看出,这种情形下,Spring-amqp在进行nack的时候,是否requeue最终为false,所以不会重新放入队列中
那么针对这种情形,我们怎么解决呢?
我们可以使用死信队列,这种情况消息会被认为是死信并发送到死信队列里(如果已经配置)
并且强烈推荐生产环境为自己的所有业务队列配置上死信队列,能保证消息的可靠性,通过死信队列进行业务补偿。
网友评论