抽空扒一下RocketMQ
的事务消息,看看具体的实现,版本4.6.0
。
基本原理
RocketMQ
的事务消息实现是基于二阶段协议实现的。即先发送半消息
,成功之后执行本次事务,之后发送事务结束消息
这里其实之前一直存在一个误解。这个误解其实一直都在怀疑只是没时间确认,因为无法确认RocketMQ怎么可能得到事务对象
在执行完半消息发送之后,会调用TransactionListener
写入数据库,写入数据库成功后等到事务正常提交之后再进行发送事务结束消息,即本地事务需要与调用TransactionListener
写入数据库需要在一个事务内。
如下,期望是本地事务reduct
执行与消息发送
在一个事务之内,以此达到数据库事务的特性,当发送结束事务消息成功后,如果此时事务异常回滚了,消息与事务的一致性就被破坏了。
public class BusinessService {
@Transactional
public void reduct() {
businessService.reduct();///
rocketmqProducer.sendMessageInTransaction(
message, args
);
}
}
相反过来,RocketMQ
的事务消息的本质是保证消息一定会发送成功,与本地事务并非是一致性。或者可以先发送半消息,然后再执行事务,之后再根据事务的结果是否执行事务结束消息。
public class BusinessService {
public BusinessService() {
rocketmqProducer.setTransactionListener(new TransactionListenerImpl());
}
public static class TransactionListenerImpl implements TransactionListener {
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
return businessService.reduct();///;
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
return businessService.check();///;
}
}
// 对外提供该方法
public void reductAndSendMsg() {
rocketmqProducer.sendMessageInTransaction(
message, args
);
}
@Transactional
public void reduct() {
// TODO
}
}
RocketMQ
的这种基于两阶段协议的事务消息,相比普通消息而言是有了事务成功的保证,但是使用时还是要慎重,因为它非常依赖消息队列,一旦消息队列服务波动涉及到的都将无法提供服务。
源码分析
client
客户端的源码基本上都在DefaultMQProducerImpl#sendMessageInTransaction()
方法中,核心片段如下
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
try {
sendResult = this.send(msg); // 半消息,注意是同步的消息
} catch (Exception e) {
throw new MQClientException("send message Exception", e);
}
//....
switch (sendResult.getSendStatus()) {
case SEND_OK: {
try {
//...
localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
// .....
} catch (Throwable e) {
log.info("executeLocalTransactionBranch exception", e);
log.info(msg.toString());
localException = e;
}
}
break;
case FLUSH_DISK_TIMEOUT:
case FLUSH_SLAVE_TIMEOUT:
case SLAVE_NOT_AVAILABLE:
localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
break;
default:
break;
}
try {
this.endTransaction(sendResult, localTransactionState, localException); // 异步消息
} catch (Exception e) {
log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
}
broker
服务端关注几个点
-
SendMessageProcessor
半消息处理 -
EndTransactionProcessor
事务结束处理 -
TransactionalMessageCheckService
消息回查
其次逻辑队列是RocketMQ
存储中的一个非常核心的架构,也是需要重点关注
-
RMQ_SYS_TRANS_HALF_TOPIC
半消息队列 -
RMQ_SYS_TRANS_OP_HALF_TOPIC
事务消息已处理队列 -
CID_RMQ_SYS_TRANS
事务消息由于超时或超过最大回查次数等原因的队列
这里重点看看消息回查的逻辑即可,其他的按照类名称点进去翻一翻即可。
消息回查的基本逻辑,从RMQ_SYS_TRANS_HALF_TOPIC
取出该主题下的一组逻辑队列,对队列进行循环,取出每个队列记录的上次消费的物理位置(此处的消费并非是消息被消费而是消息被处理过),同样,根据该逻辑队列从RMQ_SYS_TRANS_OP_HALF_TOPIC
中取出相对应的逻辑队列,该逻辑队列存储的试衣镜处理过的消息(包括处理成功或者失败)。之后再根据 一些参数进行判断(具体请看代码),该丢弃的丢弃(丢弃进入CID_RMQ_SYS_TRANS
队列),该回查的回查,最后将处理的队列进度更新到消费进度(consumer_table)中。
事务消息的整个逻辑大致分为三个点:
- 客户端写入半消息到
CommiLog
,在服务端有线程异步将写入的消息不断的刷到逻辑队列中,此时半消息进入RMQ_SYS_TRANS_HALF_TOPIC
,同步返回后客户端执行本地事务,执行成功后发送事务结束消息到服务端,此时再根据半消息回复为正常的消息放入到CommiLog
(更正了消息的Topic
),之后再由异步线程刷到逻辑队列(正确的待消费的逻辑队列)。其次再写入RMQ_SYS_TRANS_OP_HALF_TOPIC
消息 - 服务端每个一分钟(默认)会执行一次消息回查任务,根据
RMQ_SYS_TRANS_HALF_TOPIC
与RMQ_SYS_TRANS_OP_HALF_TOPIC
的消息取出需要被回查的消息进行回查。回查时是由服务端主动向客户端发起回查请求,该逻辑处理为异步。 - 客户端收到回查请求后进行回查,并根据回查的结果发送事务结束消息
核心代码片段就不贴出来了,因为TransactionalMessageServiceImpl#check
整个方法都是非常核心的,代码又长,有需要自行查阅。
总结
事实上个人并不是很赞同使用事务消息,至少看起来略显得鸡肋(目前的版本),在分布式事务场景下只能保证消息一定发送,而分布式事务是需要保证各个节点事务严格一致性,如果发送成功了但是没有被正确消费等等,该方法就不适合了。只能说有特定场景,但是还不具备普适性,当然技术方案自然不可能做到面面俱到,能解决部分场景已然很好。
其次服务过于依赖消息队列服务,如果消息队列出现三五分钟的动荡,上游的服务基本上是废弃的。而大多数场景下是上游的服务并非严格依赖消息队列,所以即便是消息队列动荡,上游理应可以继续提供服务,下游可能存在数据延迟更新等问题,需要介入修复。比如下单后的发货,存在三五分钟通知延迟是可以接受的,但是不能下单就不能接受。
也不能否则存在上下游严格依赖的场景。但是如果需要强一致性的解决方案,是不是应该考虑使用其他技术方案,比如Seata
网友评论