官方文档:
http://rocketmq.apache.org/docs/transaction-example/
1:什么是分布式事务:
单体事务:强一致性 干性事务
分布式事务:最终一致性性 柔性事务
什么是分布式事务
来源:单体应用一>拆分为分布式应用
一个接口需要调用多个服务,且操作不同的数据库,数据一致性难保障,
分布式事务常见解决方案:
2PC :两阶段提交,基于XA协议
TCC : Try、Confirms Cancel
2:RocketMQ4.X分布式事务消息架构讲解
RocketMQ事务消息:
RocketMQ提供分布事务功能,通过RocketMQ事务消息能达到分布式事务的最终一致
半消息 Half Message
Producer已经将消息成功发送到了Broker端,但是服务端未收到生产者对该消息的二次确认,
此时该消息被标记成“暂不能投递”(暂时不能被Consumer消费)状态,处于该种状态下的消息即半消息。
消息回查:
由于网络闪断、生产者应用重启等原因,导致某条事务消息的 二次确认步骤(第4步) 丢失,消息队列
RocketMQ服务端通过扫描发现某条消息长期处于“半消息”时,
需要主动向消息生产者询问 该消息的最终状态(Commit或是Rollback),该过程即消息回查。
如果步骤4丢失 则需要执行步骤 5 6 7
RocketMQ事务消息的状态
COMMIT_MESSAGE:提交事务消息,消费者可以消费此消息
ROLLBACK_MESSAGE:回滚事务消息,消息会在broker中删除,消费者不能消费
UNKNOW: Broker需要回查确认消息的状态
关于事务消息能否成功投递到Broker节点
事务消息producer端的生产方式和普通消息是一样的,确保消息能发送到Broker节点(具有重试机制)
关于事务消息的消费
事务消息consumer端的消费方式和普通消息是一样的,RocketMQ能保证消息能被consumer收到
(消息重试等机制,最后也存在consumer消费失败的情况,这种情况出现的概率极低)
3:RocketMQ4.X分布式事务消息的存在的问题
RocketMQ4.X分布式事务的解决了什么
仅是解决 消息的发送方的事务 以及 确保在消息发送方在事务成功之后 把消息投递到订阅端
发送方=生产者 订阅端=消费者
但并不包括 订阅端的事务失败后回滚 发送方的事务
但并不包括 订阅端的事务失败后回滚 发送方的事务
但并不包括 订阅端的事务失败后回滚 发送方的事务
而这个对于消息队列来说 其实并不是的问题:
因为我们的消息队列的目标 就是进行异步削峰
什么是消息队列:
我发送方的消息 只管发 你订阅端的逻辑成不成功和我有什么关系
如果要两个服务之间的数据库需要实现最终一致性,即是保证两个服务的事务的一致成功或者失败,
那就是要等两个服务的逻辑都完全走完,事务完全提交,这压根就不是异步削峰了。
完全可以采用调接口的方法进行,而不是采用发消息的方式。
4:TransactionProducer
生产者:
主要需要把DefaultMQProducer 换成 TransactionMQProducer
并设置使用TransactionListener 进行本地事务的监听
注意:消费者完全不用修改(只需要修改订阅的 topic就行了)
@Component
public class TransactionProducer {
// DefaultMQProducer就是我们最普通的生产者
// DefaultMQProducer defaultMQProducer = new DefaultMQProducer();
// TransactionMQProducer 继承了 DefaultMQProducer
public void sendMessageInTransaction() throws MQClientException, InterruptedException {
TransactionMQProducer transactionMQProducer = new TransactionMQProducer();
//该生产者所在group
transactionMQProducer.setProducerGroup("transaction_producer_group");
///如果是集群模式 以 ; 分开 "IP1:9876;IP2:9876;"
transactionMQProducer.setNamesrvAddr("47.113.101.241:9876");
//是否走Vip通道
transactionMQProducer.setVipChannelEnabled(false);
//消息同步发送失败重试次数
transactionMQProducer.setRetryTimesWhenSendFailed(3);
// //消息异步发送失败重试次数
transactionMQProducer.setRetryTimesWhenSendAsyncFailed(3);
ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("client-transaction-msg-check-thread");
return thread;
}
});
//使用实现类
transactionMQProducer.setExecutorService(executorService);
//创建事务实现类
TransactionListener transactionListener = new TransactionListenerImpl();
transactionMQProducer.setTransactionListener(transactionListener);
transactionMQProducer.start();
//设置Topic
String topic = "transaction_test_topic";
String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD", "TagE"};
for (int i = 0; i < 10; i++) {
try {
Message msg =
new Message(topic, tags[i % tags.length], "KEY" + i,
("Hello RocketMQ OrderNo: " +i).getBytes(RemotingHelper.DEFAULT_CHARSET));
//消息在事务里面发送出去了
SendResult sendResult = transactionMQProducer.sendMessageInTransaction(msg, null);
System.out.printf("%s%n", sendResult);
Thread.sleep(10);
} catch (MQClientException | UnsupportedEncodingException e) {
e.printStackTrace();
}
}
//生产者延时关闭 因为在需要 1分钟内才能进入 UNKNOW状态消息的的回查逻辑
Thread.sleep(1000 * 60);
transactionMQProducer.shutdown();
}
}
5:TransactionListenerImpl
TransactionListenerImpl 实现了 TransactionListener 重写了相关逻辑
executeLocalTransaction 方法进行 本地事务完成后提交 设置Broker里面半成功消息的状态
checkLocalTransaction 方法用于 当Broker服务认为本地事务的并没有进行提交 或者 提交了UNKNOW后 进行回查
@Slf4j
public class TransactionListenerImpl implements TransactionListener {
private AtomicInteger transactionIndex = new AtomicInteger(0);
private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();
/**
* when send transactional prepare(half) message succeed,
* this method will be invoked to execute local transaction.
* 当发送半消息成功后
* 可以设置该消息在broker的状态为
* COMMIT_MESSAGE(Broker端直接消费)
* ROLLBACK_MESSAGE(Broker端直接回滚)
* UNKNOW(1分钟内才能 进入checkLocalTransaction的回查逻辑)
* null (立刻进入反查逻辑)
*
* @param msg
* @param otherParam
* @return
*/
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object otherParam) {
log.info("executeLocalTransaction:{} ", msg.getTransactionId());
//模拟生成订单号 0 到 9
//使用orderNo进行各种业务处理 然后返回一个 status
//这里暂时设置根据orderNo执行各种业务逻辑后 返回的status为如下
Integer orderNo = transactionIndex.getAndIncrement();
Integer status = orderNo % 3;
//在这里模拟本地事务的状态
//有四种状态 UNKNOW COMMIT_MESSAGE ROLLBACK_MESSAGE 以及null
//如果是 UNKNOW 会在00秒后进入checkLocalTransaction 逻辑
//如果是 null 会立即进入checkLocalTransaction 逻辑
if (null != status) { switch (status) {
case 0:
// 订单号 0 3 6 9 成功
localTrans.put(msg.getTransactionId(), orderNo);
return LocalTransactionState.COMMIT_MESSAGE;
case 1:
// 订单号 1 4 7 失败
localTrans.put(msg.getTransactionId(), orderNo);
return LocalTransactionState.ROLLBACK_MESSAGE;
case 2:
//该msg.getTransactionId() 需要在回调时使用
// 订单号 2 5 8 进入回调
localTrans.put(msg.getTransactionId(), orderNo);
return LocalTransactionState.UNKNOW;
default:
//该msg.getTransactionId() 需要在回调时使用
localTrans.put(msg.getTransactionId(), orderNo);
return null;
}
}
return null;
}
/**
* When no response to prepare(half) message. broker will send check message to check the transaction status,
* and this method will be invoked to get local transaction status.
*
* 在进行发送消息后 到到Broker变成半消息状态
* 执行完executeLocalTransaction 其状态是 UNKNOW 或者 null 则需要进入回查处理
*
* @param msg
* @return
*/
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
String transactionId = msg.getTransactionId();
Integer orderNo = localTrans.get(transactionId);
log.info("执行的逻辑 checkLocalTransaction:{} orderNo:{} ",transactionId,orderNo);
//根据orderNo进行逻辑回查处理
//进行回查的结果 只能是 COMMIT_MESSAGE(成功) 或者 ROLLBACK_MESSAGE(失败)
//这里暂时设置根据orderNo执行各种业务逻辑后 返回为如下
if (orderNo % 2 == 0) {
log.info("执行的逻辑 checkLocalTransaction: {} orderNo:{},COMMIT_MESSAGE", msg.getTransactionId(),orderNo);
return LocalTransactionState.COMMIT_MESSAGE;
}else
{
log.info("执行的逻辑 checkLocalTransaction: {} orderNo:{},ROLLBACK_MESSAGE", msg.getTransactionId(),orderNo);
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
}
6:测试结果
模拟生成订单号 0 到 9
在 executeLocalTransaction(本地事务) 订单号 0 3 6 9 成功 修改消息状态为可以被消费
在 executeLocalTransaction(本地事务) 订单号 1 4 7 失败 这些消息直接被丢弃
订单号 2 5 8 进入回调
在 checkLocalTransaction(进行反查) 订单号 2 8 经过反查后确认成功 修改消息状态为可以被消费
在 checkLocalTransaction(进行反查) 订单号 5 经过反查后确认失败 这些消息直接被丢弃
7:transactionMQProducer.sendMessageInTransaction(核心方法)
1:设置为半消息
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
2:和普通发送消息一样发送
sendResult = this.send(msg);
3:判断transactionListener 是否为空 不为空则执行重写的 executeLocalTransaction
if (transactionListener != null) {
log.debug("Used new transaction API");
localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
}
4:endTransaction 方法
事务设置消息相关的状态(4种)
通过Oneway的方式告诉Borker端 修改里面半消息的状态
5:在一个异步的方法里面 checkLocalTransaction(回查)
LocalTransactionState 为UNKNOW的状态的消息进行回查
项目连接
请配合项目代码食用效果更佳:
项目地址:
https://github.com/hesuijin/hesuijin-study-project
Git下载地址:
https://github.com.cnpmjs.org/hesuijin/hesuijin-study-project.git
rocketmq-module项目模块下 transactionRocketMQ包下
网友评论