[TOC]
一、RabbitMQ消息发送机制
消息由生产者生产,首先并发送到交换机(Exchange),然后由交换机根据指定的路由规则将消息路由到不同的队列(Queue)中。最后由不同的消费者去消费处理。
RabbitMQ根据RabbitMQ的机制分析,想要确保消息发送的可靠性,需要保证两个方面:
- 确认消息到达交换机,publish
- 确认消息到达队列,routes
这两步如果全成功了,说明消息已经成功发送;任何一步出现问题,消息就没发送成功。
二、方案1: 开启事务机制
在SpringBoot项目中可以通过开启RabbitMQ事务机制的方式解决,具体操作如下。
-
提供一个事物管理器
@Bean RabbitTransactionManager transactionManager(ConnectionFactory connectionFactory) { return new RabbitTransactionManager(connectionFactory); }
-
生产者上添加事务注解并设置通信信道为事务模式
@Service public class MsgService { @Autowired RabbitTemplate rabbitTemplate; // 添加事务注解 @Transactional public void send() { // 开启事务模式 rabbitTemplate.setChannelTransacted(true); rabbitTemplate.convertAndSend("ecchange", "queue", "Hello World!"); throw new RuntimeException(); } }
开启事务模式以后,RabbitMQ生产者发送消息要经过如下4步:
- 客户端请求,将信道设置为事务模式;
- 服务端响应,设置完成;
- 客户端发送消息;
- 客户端提交事务;
- 服务端响应,确认事务提交。
正常情况我们发送消息只需要第三步,可以看出上面的步骤比较复杂,所以事务模式效率不是很高。
三、方案2: 发送方确认
添加配置,启用发送方确认。注意,发送方确认和事务不能同时存在,会报错。
spring:
rabbitmq:
# 消息到达交换器确认回调
publisher-confirm-type: correlated
# 消息到达队列回调
publisher-returns: true
publisher-confirm-type
枚举如下:
public enum ConfirmType {
/**
* Use {@code RabbitTemplate#waitForConfirms()} (or {@code waitForConfirmsOrDie()}
* within scoped operations.
*/
SIMPLE,
/**
* Use with {@code CorrelationData} to correlate confirmations with sent
* messsages.
*/
CORRELATED,
/**
* Publisher confirms are disabled (default).
*/
NONE
}
配置监听,实现回调
@Slf4j
@Configuration
public class RabbitConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
public static final String EXCHANGE_NAME = "exchange_name";
public static final String QUEUE_NAME = "queue.name";
@Autowired
RabbitTemplate rabbitTemplate;
@Bean
Queue queue() {
return new Queue(QUEUE_NAME);
}
@Bean
DirectExchange directExchange() {
return new DirectExchange(EXCHANGE_NAME);
}
@Bean
Binding binding() {
return BindingBuilder.bind(queue()).to(directExchange()).with(QUEUE_NAME);
}
@PostConstruct
public void initRabbitTemplate() {
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnCallback(this);
}
/**
* Confirmation callback.
*
* @param correlationData correlation data for the callback.
* @param ack true for ack, false for nack
* @param cause An optional cause, for nack, when available, otherwise null.
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
log.info("{}:消息成功到达交换器", correlationData.getId());
} else {
log.error("{}:消息发送失败", correlationData.getId());
}
}
/**
* Returned message callback.
*
* @param message the returned message.
* @param replyCode the reply code.
* @param replyText the reply text.
* @param exchange the exchange.
* @param routingKey the routing key.
*/
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.error("{}:消息未成功路由到队列", message.getMessageProperties().getMessageId());
}
}
三、方案3: 失败重试
失败重试分两种情况,没链接上mq的链接重试和链接上了发送失败的发送重试。
1. 链接重试
利用Spring Boot自带的重试机制,直接通过配置开启即可:
spring:
rabbitmq:
template:
retry:
enabled: true #开启重试机制
initial-interval: 1000ms #重试起始时间间隔
max-attempts: 10 #最大重试次数
max-interval: 10000ms #最大重试时间间隔
multiplier: 2 #时间间隔系数
配置完以后,当MQ链接断开后,Spring会进行retry连接。retry时间间隔为起始时间间隔*系数
。
2. 发送重试
发送重试主要针对的是消息没有到达交换器。总体思路就是:利用消息确认机制,当消息没有到达交换器时,就会走失败回调,在回调方法中进行相应的业务补偿处理即可。
利用数据库存储发送的消息记录,创建数据表,大致字段内容如下:
字段名 | 字段类型 | 字段是否为空 | 默认值 | 备注 |
---|---|---|---|---|
id | bigint(20) unsigned | N | 主键,消息id | |
content | text | N | 消息内容 | |
state | tinyint(1) | N | 状态:0-发送中,1-成功,2-失败 | |
route_key | varchar(255) | N | 路由key | |
exchange | varcher(255) | N | 交换机 | |
count | tinyint | N | 重试次数 | |
try_time | datetime | N | 重试时间 | |
create_time | datetime | N | CURRENT_TIMESTAMP | 创建时间 |
update_time | datetime | N | CURRENT_TIMESTAMP | 更新时间 |
del_flag | tinyint(1) | N | 逻辑删除 |
操作步骤如下:
- 消息发送前,向表中插入消息发送记录,状态为0,try_time根据实际情况设置即可。
- 在confirm回调方法中,收到发送成功的回调,则将该消息的状态修改为1。
- 通过定时job扫描发送记录,筛选出状态为0,并且过了重试时间的消息,重新发送。重试次数根据实际情况判断。
注意,在发送的时候会出现重复发送的情况,所以在消费的时候需要做好幂等。
网友评论