本文主要介绍如何在Spring Boot项目中使用RabbitMQ,源代码请参考:
https://github.com/davenkin/spring-amqp-learning
本文不是RabbitMQ的入门文章,需要事先对RabbitMQ的概念有所了解,初学者建议先行阅读RabbitMQ官网文档或者笔者的RabbitMQ学习笔记。
Hello World
Spring AMQP在默认情况下已经为我们配置好了一个基本可用的RabbitMQ基础实施,这里我们先采用简单配置的方式来完成一个Hello World。
- 第1步,本地启动RabbitMQ:创建一个docker-compose.yml文件:
version: "2"
services:
rabbitmq:
restart: always
container_name: spring-amqp-learning-rabbitmq
image: rabbitmq:3-management
networks:
- ecommerce-order-net
environment:
- "RABBITMQ_DEFAULT_USER=rabbitmq-user"
- "RABBITMQ_DEFAULT_PASS=rabbitmq-password"
volumes:
- ecommerce-order-rabbitmq-data:/var/lib/rabbitmq
ports:
- "15673:15672"
- "5673:5672"
networks:
ecommerce-order-net:
driver: bridge
volumes:
ecommerce-order-rabbitmq-data:
driver: local
此时,切换到在与docker-compose.yml
文件相同目录,命令行运行docker-compose up
后,通过http://localhost:15673/可以访问RabbitMQ的管理页面:

- 第2步,加入对spring AMQP的依赖:在
build.gradle
文件中:
implementation("org.springframework.boot:spring-boot-starter-amqp")
- 第3步,Spring连接RabbitMQ,在
application.yml
文件中配置:
spring:
rabbitmq:
host: localhost
port: 5673
virtual-host: /
username: rabbitmq-user
password: rabbitmq-password
- 第4步,配置Queue:创建
RabbitHelloWorldConfiguration.java
:
@Configuration
public class RabbitHelloWorldConfiguration {
@Bean
public Queue helloWorldQueue() {
return new Queue("HelloWorldQueue", false, false, false);
}
}
- 第5步,发送消息:创建
RabbitHelloWorldController.java
:
@RestController
@RequestMapping(value = "/rabbit/helloworld")
public class RabbitHelloWorldController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping
public void helloWorld() {
rabbitTemplate.convertAndSend("HelloWorldQueue", "HelloWorld!" + LocalDateTime.now().toString());
}
}
- 第6步,接收消息:创建
RabbitHelloWorldListener.java
:
@Component
@RabbitListener(queues = "HelloWorldQueue")
public class RabbitHelloWorldListener {
private Logger logger = AutoNamingLoggerFactory.getLogger();
@RabbitHandler
public void receiveHelloWorld(String queueMessage) {
logger.info("Received message:{}", queueMessage);
}
}
- 第7步,见证奇迹:启动程序
./run.sh
,发送消息curl http://localhost:8080/rabbit/helloworld
,此时可以在命令行中看到日志输出:
2019-07-25 09:17:59.871 -- INFO [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] c.e.o.r.h.RabbitHelloWorldListener : Received message:HelloWorld!2019-07-25T09:17:59.869
表明消息已经成功发送并接收。
在对上文的“Hello World”做出解释之前,先介绍一下Spring AMQP中的主要对象类以及Spring Boot对RabbitMQ的默认配置。
Spring AMQP的主要对象
Spring AMQP主要对象类如下:
类 | 作用 |
---|---|
Queue | 对应RabbitMQ中Queue |
AmqpTemplate | 接口,用于向RabbitMQ发送和接收Message |
RabbitTemplate | AmqpTemplate的实现类 |
@RabbitListener | 指定消息接收方,可以配置在类和方法上 |
@RabbitHandler | 指定消息接收方,只能配置在方法上,可以与@RabbitListener一起使用 |
Message | 对RabbitMQ消息的封装 |
Exchange | 对RabbitMQ的Exchange的封装,子类有TopicExchange、FanoutExchange和DirectExchange等 |
Binding | 将一个Queue绑定到某个Exchange,本身只是一个声明,并不做实际绑定操作 |
AmqpAdmin | 接口,用于Exchange和Queue的管理,比如创建/删除/绑定等,自动检查Binding类并完成绑定操作 |
RabbitAdmin | AmqpAdmin的实现类 |
ConnectionFactory | 创建Connection的工厂类,RabbitMQ也有一个名为ConnectionFactory的类但二者没有继承关系,Spring ConnectionFactory可以认为是对RabbitMQ ConnectionFactory的封装 |
CachingConnectionFactory | Spring ConnectionFactory的实现类,可以用于缓存Channel和Connection |
Connection | Spring中用于创建Channel的连接类,RabbitMQ也有一个名为Connection的类,但二者没有继承关系,Spring Connection是对RabbitMQ Connection的封装 |
SimpleConnection | Spring Connection的实现类,将实际工作代理给RabbitMQ的Connection类 |
MessageListenerContainer | 接口,消费端负责与RabbitMQ服务器保持连接并将Message传递给实际的@RabbitListener/@RabbitHandler处理 |
RabbitListenerContainerFactory | 接口,用于创建MessageListenerContainer |
SimpleMessageListenerContainer | MessageListenerContainer的实现类 |
SimpleRabbitListenerContainerFactory | RabbitListenerContainerFactory的实现类 |
RabbitProperties | 用于配置Spring AMQP的Property类 |
对于发送方而言,需要做以下配置:
- 配置CachingConnectionFactory
- 配置Exchange/Queue/Binding
- 配置RabbitAdmin创建上一步的Exchange/Queue/Binding
- 配置RabbitTemplate用于发送消息,RabbitTemplate通过CachingConnectionFactory获取到Connection,然后想指定Exchange发送
对于消费方而言,需要做以下配置:
- 配置CachingConnectionFactory
- 配置Exchange/Queue/Binding
- 配置RabbitAdmin创建上一步的Exchange/Queue/Binding
- 配置RabbitListenerContainerFactory
- 配置@RabbitListener/@RabbitHandler用于接收消息
可以看到,在Spring中使用RabbitMQ的配置还不少,但是Spring Boot已经帮我们做了很多默认配置,才使得我们刚才的“Hello World”可以如此简单的完成。
Spring Boot的默认RabbitMQ设施
在默认情况下,Spring Boot为我们配置了一下RabbitMQ的对象:
- 一个CachingConnectionFactory
- 一个名为“rabbitListenerContainerFactory”的
SimpleRabbitListenerContainerFactory
,默认自动查找IoC容器中的MessageConverter - 一个RabbitTemplate
- 一个AmqpAdmin
因此,通常情况下,我们只需要配置自己的Exchange/Queue/Binding即可。
对Hello World的解释
有了以上知识,再让我们来看看刚才的“Hello World”背后发生了什么:
- 首先,我们配置了一个名为“HelloWorldQueue”的Queue,但是并没有配置Exchange,也没有配置“HelloWorldQueue”的Binding,这其实使用了RabbitMQ的默认行为,即所有Queue都以其自身的名称为routingKey绑定到了一个默认的Exchange上,该默认Exchange的名称为
“”
。 - 由于Spring Boot自动配置了AmqpAdmin,该AmqpAdmin将自动向RabbitMQ创建名为“HelloWorldQueue”的Queue。
- 在发送消息的时候,我们直接使用了RabbitTemplate,这个RabbitTemplate是Spring Boot自动为我们配置好的,RabbitTemplate所依赖的CachingConnectionFactory也由Spring Boot自动配置。
- 发送消息时指定了routingKey为“HelloWorldQueue”,但是没有指定Exchange,此时消息将会发送到RabbitMQ默认的Exchange,又由于名为“HelloWorldQueue”的Queue向默认Exchange绑定的routingKey正是“HelloWorldQueue”,因此消息将由默认Exchange转发到名为“HelloWorldQueue” 的Queue。至此发送方任务结束。
- 在消息接收方,由于Spring Boot默认为我们配置了SimpleRabbitListenerContainerFactory,因此只需要配置@RabbitListener和@RabbitHandler接收消息即可。
另外,Spring Boot采用了很多默认配置,通过统一的RabbitProperties
同时完成消费方和生产方的配置,在默认情况下主要的配置如下:
配置项 | 默认值 | 作用 |
---|---|---|
host | localhost | RabbitMQ服务器地址 |
port | 5672 | RabbitMQ服务器端口 |
username | 账户名 | guest |
password | 密码 | guest |
virtualHost | RabbitMQ虚拟主机名 | / |
publisherConfirms | false | 设置是否启用生产方确认 |
publisherReturns | false | 设置是否启用生产方消息返回 |
ssl | 对象 | 配置SSL,默认停用 |
template | 对象 | 设置RabbitTemplate |
template.retry | 默认停用 | 设置RabbitTemplate发送消息时的重试,主要用于RabbitTemplate与RabbitMQ之间的网络连接 |
template.mandatory | false | 设置发送消息失败时(无接收queue)是否return 消息,与return callback一并使用 |
template.exchange | "" | 默认发送的exchange |
template.routingKey | "" | 默认发送消息时的routing key |
template.defaultReceiveQueue | null | 默认接收消息的queue |
listener.simple | 对象 | 设置SimpleRabbitListenerContainerFactory |
listener.direct | 对象 | 设置DirectRabbitListenerContainerFactory |
listener.simple.concurrency | null | 并发消费方数量 |
listener.simple.acknowledgeMode | AUTO | 设置消费方确认模式,这里的AUTO与RabbitMQ的自动确认不是一回事 |
listener.simple.prefetch | 250 | 设置消费方一次性接收消息的条数 |
listener.simple.defaultRequeueRejected | true | 当Listener发生异常时是否requeue |
listener.simple.retry | 对象 | 设置Listener的重试机制,默认停用,当启用时,Listener对于消息处理过程中的异常将进行requeue重试,超过重试次数再抛弃,此时AmqpRejectAndDontRequeueException异常也会被重试 |
所有默认配置如下:
spring:
rabbitmq:
host: localhost
port: 5673
virtual-host: /
username: rabbitmq-user
password: rabbitmq-password
ssl:
enabled: false
keyStore:
keyStoreType: PKCS12
keyStorePassword:
trustStore:
trustStoreType: JKS
trustStorePassword:
algorithm:
validateServerCertificate: true
verifyHostname: true
addresses:
requestedHeartbeat:
publisherConfirms: false
publisherReturns: false
connectionTimeout:
cache:
channel:
size:
checkoutTimeout:
connection:
mode: CHANNEL
size:
listener:
type: SIMPLE
simple:
autoStartup: true
acknowledgeMode:
prefetch:
defaultRequeueRejected:
idleEventInterval:
retry:
enabled: false
maxAttempts: 3
initialInterval: PT1S
multiplier: 1
maxInterval: PT10S
stateless: true
concurrency:
maxConcurrency:
transactionSize:
missingQueuesFatal: true
direct:
autoStartup: true
acknowledgeMode:
prefetch:
defaultRequeueRejected:
idleEventInterval:
retry:
enabled: false
maxAttempts: 3
initialInterval: PT1S
multiplier: 1
maxInterval: PT10S
stateless: true
consumersPerQueue:
missingQueuesFatal: false
template:
retry:
enabled: false
maxAttempts: 3
initialInterval: PT1S
multiplier: 1
maxInterval: PT10S
mandatory:
receiveTimeout:
replyTimeout:
exchange: ''
routingKey: ''
defaultReceiveQueue:
parsedAddresses:
Exchange和Queue的定义
在使用RabbitMQ时,Exchange和Queue的定义是需要精心设计的,在笔者的RabbitMQ最佳实践文章中,我建议每个微服务对应一个Exchange等方式。另外,除了定义正常情况下的Exchange和Queue之外,我们还需要考虑到异常情况,因此我们还需要配置死信交换(Dead Letter Exchange, DLX)和死信队列(Dead Letter Queue, DLQ)用于对异常情况的处理。
接下来,我们围绕以下业务场景来定义相应的Exchange和Queue:
在电商系统中,当订单(Order)创建之后,需要给用于发送短信通知。
我们将采用RabbitMQ来完成这个过程,即下单之后发送消息,短信服务监听该消息然后完成短信发送。
在消息发送方,首先需要定义一个Exchange作为接收消息的第一站,在OrderRabbitMqConfig.java
中配置:
//"发送方Exchange"
@Bean
public TopicExchange orderPublishExchange() {
return new TopicExchange(ORDER_PUBLISH_EXCHANGE, true, false, ImmutableMap.of("alternate-exchange", ORDER_PUBLISH_DLX));
}
对于发送方而言,我们只需要关系消息能够成功发送到Exchange即可,而不用关心该消息实际是否到达了某个Queue以及哪个Queue。因此,发送方并不具备定义Queue的职责,定义一个Exchange即可。
然而,有时发送到Exchange的消息没有任何Queue可以接收,要么因为根本就没有绑定Queue,要么消息的RoutingKey设置导致无法路由到任何Queue。严格意义上讲,这些问题也不是发送发需要关心的事情,然而在很多时候,消息的发送方和接收方事实上都是关系密切的系统,为了更好的支持接收方,笔者建议在发送方将那些无法路由到任何Queue的消息先保存下来,以便后续处理。在RabbitMQ中,可以通过定义Exchange时的alternate-exchange
参数指定当消息无法路由时应该发到的Exchange。
在上面定义的ORDER_PUBLISH_EXCHANGE
中,我们其实已经指定了该Exchange的alternate-exchange
为ORDER_PUBLISH_DLX
,ORDER_PUBLISH_DLX
的定义如下:
//"发送方DLX",消息发送失败时传到该DLX
@Bean
public TopicExchange orderPublishDlx() {
return new TopicExchange(ORDER_PUBLISH_DLX, true, false, null);
}
另外,定义一个Queue(ORDER_PUBLISH_DLQ
)用于存储ORDER_PUBLISH_DLX
中的消息:
//"发送方DLQ",所有发到"发送DLX"的消息都将路由到该DLQ
@Bean
public Queue orderPublishDlq() {
return new Queue(ORDER_PUBLISH_DLQ, true, false, false, ImmutableMap.of("x-queue-mode", "lazy"));
}
//"发送方DLQ"绑定到"发送方DLX"
@Bean
public Binding orderPublishDlqBinding() {
return BindingBuilder.bind(orderPublishDlq()).to(orderPublishDlx()).with("#");
}
此时,当发送方的消息到达了ORDER_PUBLISH_EXCHANGE
而无法路由到任何Queue时,该消息会被RabbitMQ发送到ORDER_PUBLISH_DLX
,进而路由到ORDER_PUBLISH_DLQ
。
至此,发送发的RabbitMQ配置完成。
在消息接收方,首先定义用于接收消息的Queue(ORDER_RECEIVE_QUEUE
):
//接收方的所有消息都发送到该"接收方Queue",即"接收方queue"可以绑定多个"发送方Exchange"
@Bean
public Queue orderReceiveQueue() {
ImmutableMap<String, Object> args = ImmutableMap.of(
"x-dead-letter-exchange",
ORDER_RECEIVE_DLX,
"x-overflow",
"drop-head",
"x-max-length",
300000,
"x-message-ttl",
24 * 60 * 60 * 1000);
return new Queue(ORDER_RECEIVE_QUEUE, true, false, false, args);
}
//"接收方queue"绑定到"发送方exchange"
@Bean
public Binding orderReceiveBinding() {
return BindingBuilder.bind(orderReceiveQueue()).to(orderPublishExchange()).with("order.#");
}
在消息处理的过程中,可能发生异常,此时需要将异常消息记录下来,同样采用DLX和DLQ机制。在定义ORDER_RECEIVE_QUEUE
时,事实上我们已经指定了DLX为ORDER_RECEIVE_DLX
,定义该DLX和对应的DLQ(ORDER_RECEIVE_DLQ
)为:
//"接收方DLX",消息处理失败时传到该DLX
@Bean
public TopicExchange orderReceiveDlx() {
return new TopicExchange(ORDER_RECEIVE_DLX, true, false, null);
}
//"接收方DLQ",所有发到"接收DLX"的消息都将路由到该DLQ
@Bean
public Queue orderReceiveDlq() {
return new Queue(ORDER_RECEIVE_DLQ, true, false, false, ImmutableMap.of("x-queue-mode", "lazy"));
}
//"接收方DLQ"绑定到"接收方DLX"
@Bean
public Binding orderReceiveDlqBinding() {
return BindingBuilder.bind(orderReceiveDlq()).to(orderReceiveDlx()).with("#");
}
此外,对于发送到ORDER_RECEIVE_DLQ
的死信消息,我们需要对其进行手动处理,一种处理方式便是在后台定期运行一个Job将死信消息重新处理。为此,我们需要定义一个用于手动回复的Exchange(ORDER_RECEIVE_RECOVER_EXCHANGE
),并将接收方的所有的消费Queue绑定到该Exchange:
//"接收方恢复Exchange",用于手动将"接收方DLQ"中的消息发到该DLX进行重试
@Bean
public TopicExchange orderReceiveRecoverExchange() {
return new TopicExchange(ORDER_RECEIVE_RECOVER_EXCHANGE, true, false, null);
}
//"接收方Queue"绑定到"接收方恢复Exchange"
@Bean
public Binding orderReceiveRecoverBinding() {
return BindingBuilder.bind(orderReceiveQueue()).to(orderReceiveRecoverExchange()).with("#");
}
以上关于Exchange/Queue和DLX/DLQ的配置可以成为一种模式,使得我们一方面能够尽量少地配置RabbitMQ的基础设施,另一方面又能对异常情况进行处理。在该模式下存在以下概念:
概念 | 类型 | 解释 | 命名 | 示例 |
---|---|---|---|---|
发送方Exchange | Exchange | 用于接收一个系统(比如微服务)中所有消息的Exchange,一个系统只有一个发送方Exchange
|
xxx-publish-x | order-publish-x |
发送方DLX | Exchange | 用于接收发送方无法路由的消息 | xxx-publish-dlx | order-publish-dlx |
发送方DLQ | Queue | 用于存放发送方DLX 的消息 |
xxx-publish-dlq | order-publish-dlq |
接收方Queue | Queue | 用于接收发送方Exchange 的消息,一个系统只有一个接收方Queue
|
xxx-receive-q | product-receive-q |
接收方DLX | Exchange | 用于接收消费失败的消息 | xxx-receive-dlx | product-receive-dlx |
接收方DLQ | Queue | 用于存放接收方DLX 的消息 |
xxx-receive-dlq | product-receive-dlq |
接收方恢复Exchange | Exchange | 用于接收从接收方DLQ 中手动恢复的消息,接收方Queue 应该绑定到接收方恢复Exchange
|
xxx-receive-recover-x | product-receive-recover-x |
该模式的RabbitMQ配置架构图如下:

消息序列化
RabbitMQ中所有消息都是二进制的,在Spring中我们需要在Java对象与RabbitMQ消息体之间进行相互转换。在默认情况下,Spring Boot使用了SimpleMessageConverter
用于转换String,Java的Serializable对象以及字节数组。通常情况下,我们希望使用Json格式,此时可以采用以下配置:
@Bean
public MessageConverter jsonMessageConverter(ObjectMapper objectMapper) {
Jackson2JsonMessageConverter messageConverter = new Jackson2JsonMessageConverter(objectMapper);
messageConverter.setClassMapper(classMapper());
return messageConverter;
}
@Bean
public DefaultClassMapper classMapper() {
DefaultClassMapper classMapper = new DefaultClassMapper();
classMapper.setTrustedPackages("*");
return classMapper;
}
Spring的RabbitTemplate
和RabbitListenerContainerFactory
会自动找到以上配置的MessageConverter
并使用之。此时,在RabbitTemplate
发送消息时,它会在消息的头部加入Java对象的类型信息,然后在RabbitListener
接收到消息时,通过该头部的类型信息指导应该发序列化为哪个类的对象。
在消费消息时,配置@RabbitListener
:
@Component
@RabbitListener(queues = {ORDER_RECEIVE_QUEUE})
public class OrderEventNotificationListener {
private Logger logger = AutoNamingLoggerFactory.getLogger();
@RabbitHandler
public void onOrderCreated(OrderCreatedEvent event) {
// send notification
//sendSmsNotification(event);
logger.info("Notification sent for OrderCreatedEvent:{}", event.getOrderId());
throw new RuntimeException("");
}
}
其中,@RabbitListener
可以用于类和方法,而@RabbitHandler
只能用于方法,通常的用法是:@RabbitListener
用在类上,接收不同Queue的不同类型消息,然后在方法上使用@RabbitHandler
来处理特定类型的消息。
需要指出的是,不同的处理方法之间必须没有歧义,即我们需要保证对于一种类型的消息,只有一个方法可以接收。比如:对于OrderCreatedEvent,如果它还有个父类叫OrderEvent,那么如果再添加一个方法用于处理OrderEvent,那么将会报错:
@Component
@RabbitListener(queues = {ORDER_RECEIVE_QUEUE})
public class OrderEventNotificationListener {
private Logger logger = AutoNamingLoggerFactory.getLogger();
@RabbitHandler
public void onOrderCreated(OrderCreatedEvent event) {
// send notification
//sendSmsNotification(event);
logger.info("Notification sent for OrderCreatedEvent:{}", event.getOrderId());
}
@RabbitHandler
public void onOrderEvent(OrderEvent event) {
logger.info("Received order event");
}
}
此时报错为:
Caused by: org.springframework.amqp.AmqpException: Ambiguous methods for payload type: class com.ecommerce.order.order.model.OrderCreatedEvent: onOrderCreated and onOrderEvent
at org.springframework.amqp.rabbit.listener.adapter.DelegatingInvocableHandler.findHandlerForPayload(DelegatingInvocableHandler.java:206)
at org.springframework.amqp.rabbit.listener.adapter.DelegatingInvocableHandler.getHandlerForPayload(DelegatingInvocableHandler.java:147)
at org.springframework.amqp.rabbit.listener.adapter.DelegatingInvocableHandler.invoke(DelegatingInvocableHandler.java:129)
at org.springframework.amqp.rabbit.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:60)
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:190)
... 26 common frames omitted
发送方确认
要使用发送方确认需要:
- 设置
publisherConfirms = true
- 在发送消息时,加入
CorrelationData
:
rabbitTemplate.convertAndSend(ORDER_PUBLISH_EXCHANGE,
"order.created",
event,
new CorrelationData(event.getOrderId()));
- 在RabbitTemplate中注册确认Callback
setConfirmCallback()
,并通过CorrelationData
确定是哪条消息被确认了:
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
String eventId = correlationData.getId();
if (ack) {
logger.info("Publish confirmed event[{}].", eventId);
} else {
logger.warn("Domain event[{}] is nacked while publish:{}.", eventId, cause);
}
});
在正常情况下,运行OrderApiTest.should_create_order()
,输出的日志为:
2019-07-27 14:09:13.517 -- INFO [AMQP Connection 127.0.0.1:5673] c.e.o.order.OrderApplicationService : Publish confirmed event[89cacd65a550493c95957729c4f24066].
表明消息被RabbitMQ确认已经收到。在RabbitMQ无法确保消息成功投递时,将作nack确认,比如发送方Exchange
不存在,此时输出的日志为:
2019-07-27 14:14:12.731 -- WARN [rabbitConnectionFactory1] c.e.o.order.OrderApplicationService : Domain event[d27e1afdd4ea449d827af272209e9125] is nacked while publish:channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'dfd' in vhost '/', class-id=60, method-id=40).
要使用发送方返回需要:
- 设置
publisherReturns = true
- 设置
template.mandatory = true
- 在RabbitTemplate中注册返回Callback
setReturnCallback()
通常来讲,我们没有必要对返回消息进行处理,因此这里不展开讲解。
消费方确认
Spring AMQP在默认情况下是启用了消费方确认的,它会根据消费方是否抛出异常来决定是ack还是nack。AcknowledgeMode
用于配置Spring AMQP的消费方确认行为,其值有以下几种:
- NONE:无消费方确认,即表示消费方自动确认,等同于RabbitMQ的
autoAck=true
- MANNUAL:消费方手动确认,用户需要自行调用
ack/nack
操作 - AUTO:默认值,Spring根据Listener中是否抛出异常来决定是ack还是nack,需要注意的是,这里的
AUTO
与RabbitMQ的autoAck
是不同的东西。
除了AcknowledgeMode
外,消费方确认的行为还受defaultRequeueRejected
参数的影响,当defaultRequeueRejected=true
(默认值)时,表示如果消费方拒绝了消息,那么消息将被重新放入Queue中以便下次重新消费,当为false
时,被拒绝的消息将不会被重新放入Queue中,而是要么直接抛弃,要么放入DLX。
另外,通过抛出指定异常可以绕开defaultRequeueRejected
的影响:
-
AmqpRejectAndDontRequeueException
:表示直接将消息抛弃或者DLX,而不是Requeue,无论defaultRequeueRejected
的值是什么。 -
ImmediateRequeueAmqpException
:与AmqpRejectAndDontRequeueException
正好相反,表示立即将消息进行Requeue处理,无论defaultRequeueRejected
的值是什么。
当Listener中抛出异常时,Spring 默认采用ConditionalRejectingErrorHandler
来判断哪些异常(通常是不能恢复的异常)可以对消息进行Requeue操作,哪些不能。
在消费消息时,还可以配置重试机制,Spring直接支持消费方的重试(spring-retry),只是在默认情况下是禁用的,可以通过以下方式启用消费方重试:
spring:
rabbitmq:
listener:
simple:
retry:
enabled: true
需要注意的是,这里的重试并没有RabbitMQ的参与,只是Spring内部的重试。也就是说,重试时并不是讲消息做Requeue操作,而是拿到同一条消息在Spring内部做重复处理。默认情况下,重试最大此时为3,当达到最大重试次数时,Spring默认将使用RejectAndDontRequeueRecoverer
对消息进行恢复处理,该类将不会对消息做Requeue操作,而是直接抛弃/DLX掉。
在没有启用重试机制的情况下,并且defaultRequeueRejected=true
,此时如果业务处理过程中一直抛出异常,那么将会导致消息被不断Requeue的情况,为了避免这种情况,要么启用重试机制;要么将defaultRequeueRejected
设置为false
;或者同时启动二者。笔者推荐同时启用二者。
综上,在使用RabbitMQ时,我们希望采用以下实践:
- 默认开启生产方确认
- 默认设置
defaultRequeueRejected=false
- 默认设置消费方重试
为此,除了直接在application.yml中配置之外,还可以将这些默认配置固化到代码中,创建一个ConnectionFactory
并且默认开启生产方确认(setPublisherConfirms(true)
):
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory factory = new CachingConnectionFactory();
factory.setAddresses(rabbitProperties.getAddresses());
factory.setUsername(rabbitProperties.getUsername());
factory.setPassword(rabbitProperties.getPassword());
factory.setPort(rabbitProperties.getPort());
factory.setVirtualHost(rabbitProperties.getVirtualHost());
factory.setPublisherConfirms(true);
return factory;
}
默认配置消费方的defaultRequeueRejected=false
以及重试:
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory,
MessageConverter messageConverter,
TaskExecutor taskExecutor) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setDefaultRequeueRejected(false);
factory.setPrefetchCount(rabbitProperties.getListener().getSimple().getPrefetch());
factory.setConcurrentConsumers(rabbitProperties.getListener().getSimple().getConcurrency());
factory.setMaxConcurrentConsumers(rabbitProperties.getListener().getSimple().getMaxConcurrency());
factory.setMessageConverter(messageConverter);
FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy();
fixedBackOffPolicy.setBackOffPeriod(1000L);
RetryOperationsInterceptor build = RetryInterceptorBuilder.stateless()
.maxAttempts(5)
.backOffPolicy(fixedBackOffPolicy)
.recoverer(new RejectAndDontRequeueRecoverer())
.build();
factory.setAdviceChain(build);
factory.setTaskExecutor(taskExecutor);
return factory;
}
网友评论