1、前言
我们在事务中经常会遇到这种情况,向数据库进行一些数据修改后,再将消息塞进 mq,供其他系统消费。但是这个过程中,数据库修改失败可以进行回滚,但 mq 塞消息却回滚不了;或者数据库修改能保证一定修改成功,mq 却不能保证消息一定到达。
针对数据库回滚了,其实我们可以使用一个补偿定时任务,针对数据状态没有改变的数据(数据必须有状态),再进行捞取进行状态修改再塞到 mq,这就要求另一边消费消息的系统实现消费幂等;或者说我们啥都不用做,因为如果 provider 方是接口提供者,数据库异常对于外部来说是接口调用异常,它会重新进行调用,与定时任务的逻辑相同。
针对向 mq 投递消息丢失的情况,数据需要两个状态,首先数据修改成功会修改第一个状态,但是只有消费方消费消息成功修改数据才会修改第二个状态(针对消费者如何修改生产者的数据库状态,生产者可以提供一个接口给消费者调用,不用怕调用量多大,因为消费者是以恒定的速率消费的)。此时也应该有一个定时任务,会扫描第一个状态改变,而第二个状态没改变的数据,塞进 mq。
其实我本人觉得,这种设计方式可能比所谓的事务消息的效率要高,因为它没有阻塞原来的流程,只是针对原来流程的异常场景做一些补偿措施。但是它需要数据有状态机,针对原来无状态的数据,用这种方式意味着对原来的系统进行改造,成本可能会高。对于 RocketMQ 事务消息来说,它自身实现了事务,即它将自己与用户提供的业务逻辑绑定在一起。
2、流程与示例
![](https://img.haomeiwen.com/i11345146/e4c494ca5b0b3edc.png)
1)事务消息发送及提交
(1) 发送消息(half消息)。
(2) 服务端响应消息写入结果。
(3) 根据发送结果执行本地事务(如果写入失败,此时half消息对业务不可见,本地逻辑不执行)。
(4) 根据本地事务状态执行Commit或者Rollback(Commit操作生成消息索引,消息对消费者可见)
针对 (3)而言,生产者需要提供 TransactionListener 接口 executeLocalTransaction 实现方法,即当发送事务性prepare(half)消息成功时,将调用此方法以执行本地事务。
2)事务补偿
(1) 对没有Commit/Rollback的事务消息(pending状态的消息),从服务端发起一次“回查”
(2) Producer收到回查消息,检查回查消息对应的本地事务的状态
(3) 根据本地事务状态,重新Commit或者Rollback
其中,补偿阶段用于解决消息Commit或者Rollback发生超时或者失败的情况。
针对事务补偿而言,生产者需要 TransactionListener 接口的 checkLocalTransaction 实现方法,即当没有对 prepare(half) 进行响应的时候,broker 会发送 check 消息检查本地的事务状态。
3)事务消息状态
事务消息共有三种状态,提交状态、回滚状态、中间状态:
- TransactionStatus.CommitTransaction: 提交事务,它允许消费者消费此消息。
- TransactionStatus.RollbackTransaction: 回滚事务,它代表该消息将被删除,不允许被消费。
- TransactionStatus.Unknown: 中间状态,它代表需要检查消息队列来确定状态。
示例
Producer.java
public class Producer {
public static void main(String[] args) throws MQClientException, InterruptedException {
//创建事务监听器
TransactionListener transactionListener = new TransactionListenerImpl();
//创建消息生产者
TransactionMQProducer producer = new TransactionMQProducer("group6");
producer.setNamesrvAddr("192.168.25.135:9876;192.168.25.138:9876");
//生产者这是监听器
producer.setTransactionListener(transactionListener);
//启动消息生产者
producer.start();
String[] tags = new String[]{"TagA", "TagB", "TagC"};
for (int i = 0; i < 3; i++) {
try {
Message msg = new Message("TransactionTopic", tags[i % tags.length], "KEY" + i,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.sendMessageInTransaction(msg, null);
System.out.printf("%s%n", sendResult);
TimeUnit.SECONDS.sleep(1);
} catch (MQClientException | UnsupportedEncodingException e) {
e.printStackTrace();
}
}
//producer.shutdown();
}
}
TransactionListenerImpl.java
public class TransactionListenerImpl implements TransactionListener {
private AtomicInteger transactionIndex = new AtomicInteger(0);
private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
int value = transactionIndex.getAndIncrement();
int status = value % 3;
localTrans.put(msg.getTransactionId(), status);
// 可以根据不同表现返回不同状态
return LocalTransactionState.UNKNOW;
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
Integer status = localTrans.get(msg.getTransactionId());
// 查本地事务状态
if (null != status) {
switch (status) {
case 0:
return LocalTransactionState.UNKNOW;
case 1:
return LocalTransactionState.COMMIT_MESSAGE;
case 2:
return LocalTransactionState.ROLLBACK_MESSAGE;
default:
return LocalTransactionState.COMMIT_MESSAGE;
}
}
return LocalTransactionState.COMMIT_MESSAGE;
}
}
3、原理
1)事务消息在一阶段对用户不可见
在RocketMQ事务消息的主要流程中,一阶段的消息如何对用户不可见。其中,事务消息相对普通消息最大的特点就是一阶段发送的消息对用户是不可见的。那么,如何做到写入消息但是对用户不可见呢?RocketMQ事务消息的做法是:如果消息是half消息,将备份原消息的主题与消息消费队列,然后改变主题为RMQ_SYS_TRANS_HALF_TOPIC。由于消费组未订阅该主题,故消费端无法消费half类型的消息,然后RocketMQ会开启一个定时任务,从Topic为RMQ_SYS_TRANS_HALF_TOPIC中拉取消息进行消费,根据生产者组获取一个服务提供者发送回查事务状态请求,根据事务状态来决定是提交或回滚消息。
RocketMQ的具体实现策略是:写入的如果事务消息,对消息的Topic和Queue等属性进行替换,同时将原来的Topic和Queue信息存储到消息的属性中,正因为消息主题被替换,故消息并不会转发到该原主题的消息消费队列,消费者无法感知消息的存在,不会消费。其实改变消息主题是RocketMQ的常用“套路”,延时消息的实现机制也是如此。
这种替换思路很常见,比如 dfs 中文件替换思路。
2)Commit和Rollback操作以及Op消息的引入
在完成一阶段写入一条对用户不可见的消息后,二阶段如果是Commit操作,则需要让消息对用户可见;如果是Rollback则需要撤销一阶段的消息。
先说Rollback的情况。对于Rollback,本身一阶段的消息对用户是不可见的,其实不需要真正撤销消息(实际上RocketMQ也无法去真正的删除一条消息,因为是顺序写文件的)。但是区别于这条消息没有确定状态(Pending状态,事务悬而未决),需要一个操作来标识这条消息的最终状态。
RocketMQ事务消息方案中引入了Op消息的概念,用Op消息标识事务消息已经确定的状态(Commit或者Rollback)。如果一条事务消息没有对应的Op消息,说明这个事务的状态还无法确定(可能是二阶段失败了)。引入Op消息后,事务消息无论是Commit或者Rollback都会记录一个Op操作。Commit相对于Rollback只是在写入Op消息前创建Half消息的索引。
3)Op消息的存储和对应关系
RocketMQ将Op消息写入到全局一个特定的Topic中通过源码中的方法—TransactionalMessageUtil.buildOpTopic();这个Topic是一个内部的Topic(像Half消息的Topic一样),不会被用户消费。Op消息的内容为对应的Half消息的存储的Offset,这样通过Op消息能索引到Half消息进行后续的回查操作。
![](https://img.haomeiwen.com/i11345146/d98a69cd4c1d1991.png)
Op 消息最大的用处是标识 half 消息是否到了终态(commit 或 rollback),因为没有终态的消息需要回查来确定状态(half 为啥不自己标识消息的状态呢?不是很清楚,我也只是从概念上认识,没有深入源码)
4)Half消息的索引构建
在执行二阶段Commit操作时,需要构建出Half消息的索引。一阶段的Half消息由于是写到一个特殊的Topic,所以二阶段构建索引时需要读取出Half消息,并将Topic和Queue替换成真正的目标的Topic和Queue,之后通过一次普通消息的写入操作来生成一条对用户可见的消息。所以RocketMQ事务消息二阶段其实是利用了一阶段存储的消息的内容,在二阶段时恢复出一条完整的普通消息,然后走一遍消息写入流程。
5)如何处理二阶段失败的消息?
如果在RocketMQ事务消息的二阶段过程中失败了,例如在做Commit操作时,出现网络问题导致Commit失败,那么需要通过一定的策略使这条消息最终被Commit。RocketMQ采用了一种补偿机制,称为“回查”。Broker端对未确定状态的消息发起回查,将消息发送到对应的Producer端(同一个Group的Producer),由Producer根据消息来检查本地事务的状态,进而执行Commit或者Rollback。Broker端通过对比Half消息和Op消息进行事务消息的回查并且推进CheckPoint(记录那些事务消息的状态是确定的)。
值得注意的是,rocketmq并不会无休止的的信息事务状态回查,默认回查15次,如果15次回查还是无法得知事务状态,rocketmq默认回滚该消息。
整体流程如下:
![](https://img.haomeiwen.com/i11345146/bbc297efbdc1799e.png)
3、后记
RocketMQ 的事务消息能不能用呢?我觉得见仁见智。它确实提供一个比较简单的方式让我们实现事务,但与之而来的就是性能问题。所以设计架构时,我们可能不能单一依赖某一个中间件的能力,而是应该把思路放开一点,用多种手段保证系统的准确性。比如我们可以根据业务上来设计,用文章开头我们提到的补偿措施来避免失败的问题。
我们的 mq 是怎么用的呢?就是用的上面的补偿措施。具体到消息分发,很简单,每个 message 都有相应 type 属性,根据 message 不同的 type 触发相应的动作。我们有三种 producer,分别是 rmqProducer、ngProducer、fbProducer,分别放到三个不同的 topic 中。consumer 消费相应的 topic 进行消费,消费到的消息根据不同的 type 做动作。
网友评论