分布式事务常见方案
1. 两阶段提交方案/XA
XA规范 X/Open组织提出的分布式事务的规范
image.png同一个事务上下文中需要协调多种资源(即数据库以及消息主题或队列时)才有必要使用X/Open XA接口
1. XA规范 定义了全局事务管理器(Transaction Manager)和局部资源管理器(Resource Manager)之间的接口。
2. XA接口是双向的系统接口,在事务管理器(Transaction Manager)以及一个或多个资源管理器(Resource Manager)之间形成通信桥梁。
3. XA引入的事务管理器充当上文所述全局事务中的“协调者”角色。事务管理器控制着全局事务,管理事务生命周期,并协调资源。
4. 资源管理器负责控制和管理实际资源(如数据库或JMS队列)。目前,Oracle、Informix、DB2、Sybase和PostgreSQL等各主流数据库都提供了对XA的支持。
atomikos的实现
package com.zwz.atomikos.service;
import com.atomikos.icatch.jta.UserTransactionImp;
import com.atomikos.jdbc.AtomikosDataSourceBean;
import javax.transaction.*;
import java.sql.*;
import java.util.Properties;
/**
* @description: x https://blog.csdn.net/qq_38322527/article/details/102580683
* 在使用了事务管理器之后,我们通过atomikos提供的UserTransaction接口的实现类com.atomikos.icatch.jta.UserTransactionImp来开启、提交和回滚事务。
* 而不再是使用java.sql.Connection中的setAutoCommit(false)的方式来开启事务。其他JTA规范中定义的接口,开发人员并不需要直接使用。
* @date : 2020/1/7 11:52
* @author: zwz
*/
public class AtomikosExample {
private static AtomikosDataSourceBean createAtomikosDataSourceBean(String dbName) {
// 连接池基本属性
Properties p = new Properties();
p.setProperty("url", "jdbc:mysql://127.0.0.1:3306/" + dbName);
p.setProperty("user", "root");
p.setProperty("password", "root");
// 使用AtomikosDataSourceBean封装com.mysql.jdbc.jdbc2.optional.MysqlXADataSource
AtomikosDataSourceBean ds = new AtomikosDataSourceBean();
//atomikos要求为每个AtomikosDataSourceBean名称,为了方便记忆,这里设置为和dbName相同
ds.setUniqueResourceName(dbName);
ds.setXaDataSourceClassName("com.mysql.jdbc.jdbc2.optional.MysqlXADataSource");
ds.setXaProperties(p);
return ds;
}
public static void main(String[] args) {
AtomikosDataSourceBean ds1 = createAtomikosDataSourceBean("db_user");
AtomikosDataSourceBean ds2 = createAtomikosDataSourceBean("db_account");
Connection conn1 = null;
Connection conn2 = null;
PreparedStatement ps1 = null;
PreparedStatement ps2 = null;
// TransactionEssentials 实现了JTA/XA 规范中的事务管理器 应该实现的相关接口
// UserTransactionManager 实现了 TransactionManager
// TransactionImp 实现了 Transaction
/* TransactionEssentials:
1、实现了JTA/XA规范中的事务管理器(Transaction Manager)应该实现的相关接口,如:
UserTransaction实现是com.atomikos.icatch.jta.UserTransactionImp,用户只需要直接操作这个类
TransactionManager实现是com.atomikos.icatch.jta.UserTransactionManager
Transaction实现是com.atomikos.icatch.jta.TransactionImp*/
UserTransaction userTransaction = new UserTransactionImp();
try {
//开启事务
userTransaction.begin();
// 执行db1上的sql
conn1 = ds1.getConnection();
ps1 = conn1.prepareStatement("INSERT into user(name) VALUES (?)", Statement.RETURN_GENERATED_KEYS);
ps1.setString(1, "zwz");
ps1.executeUpdate();
ResultSet generatedKeys = ps1.getGeneratedKeys();
int userId = -1;
//获取自动生成的userId
while (generatedKeys.next()) {
userId = generatedKeys.getInt(1);
}
// 模拟异常 ,直接进入catch代码块,2个都不会提交
int i = 1 / 0;
// 执行db2上的sql
conn2 = ds2.getConnection();
ps2 = conn2.prepareStatement("INSERT into account(user_id,money) VALUES (?,?)");
ps2.setInt(1, userId);
ps2.setDouble(2, 100);
ps2.executeUpdate();
//两阶段提交
userTransaction.commit();
} catch (NotSupportedException | SystemException | SQLException | HeuristicMixedException | HeuristicRollbackException | RollbackException e) {
e.printStackTrace();
try {
userTransaction.rollback();
} catch (SystemException ex) {
ex.printStackTrace();
}
} finally {
try {
if (ps1 != null) {
ps1.close();
}
if (ps2 != null) {
ps2.close();
}
if (conn1 != null) {
conn1.close();
}
if (conn2 != null) {
conn2.close();
}
ds1.close();
ds2.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
}
}
2pc:协调者询问参与者是否准备好了提交,并根据参与者的反馈情况决定向所有参与者发送commit或者rollback指令。
1. 准备阶段-投票阶段:协调者询问所有参与者是否准备好提交,参与者如果已经准备好提交则回复Prepared,否则回复Non-Prepared。
2. 提交阶段-称执行阶段。协调者如果在上一阶段收到所有参与者回复的Prepared,则在此阶段向所有参与者发送commit指令,所有参与者立即执行commit操作;否则协调者向所有参与者发送rollback指令,参与者立即执行rollback操作。
第一阶段
image事务管理器通知参与该事务的各个资源管理器,通知他们开启事务、执行SQL(暂不提交),并进入prepare状态(该状态下可执行commit / rollback)。
资源管理器接收到消息后开始准备阶段,写好事务日志并执行事务,但不提交,然后将是否就绪的消息返回给事务管理器(此时已经将事务的大部分事情做完,以后的内容耗时极小)。
第二阶段
image事务管理器在接受各个消息后,开始分析,如果有任意其一失败,则发送回滚命令,否则发送提交命令。
各个资源管理器接收到命令后,执行(耗时很少),并将提交消息返回给事务管理器。
事务管理器接受消息后,事务结束,应用程序继续执行。
JTA Java Transaction API 基于XA架构建模的
1. 在JTA中,事务管理器抽象为javax.transaction.TransactionManager接口,并通过底层事务服务(即Java Transaction Service)实现。
2. 像很多其他的Java规范一样,JTA仅仅定义了接口,具体的实现则是由供应商(如J2EE厂商)负责提供,目前JTA的实现主要有以下几种:
-
J2EE容器所提供的JTA实现(如JBoss)。
-
独立的JTA实现:如JOTM(Java Open Transaction Manager),Atomikos。这些实现可以应用在那些不使用J2EE应用服务器的环境里用以提供分布事事务保证。
2. TCC 方案
2.1 TCC方案是采用最终一致性的方式实现的服务层柔性分布式事务方案
TCC1. try阶段:这个阶段说的是对各个服务的资源做检测以及对资源进行锁定或者预留。
2. confirm阶段:这个阶段说的是在各个服务中执行实际的操作。
3. cancel阶段:如果任何一个服务的业务方法执行出错,这里就需要进行补偿,就是执行已经执行成功的业务逻辑的回滚操作。
2.2 TCC方案评估
- 优点:
- JTA方案只能解决同一服务的多数据源的分布式事务问题,TCC方案可以解决微服务架构内同一事务多个服务上连接数据库的提交操作。
- XA协议采用刚性事务方案,性能和吞吐量较低,TCC采用柔性分布式事务方案
- 缺点:
- 对业务的侵入性大
- 需要考虑预留资源
- 编写大量的try、confirm、cancel方法
- 考虑方法幂等性问题
2.3 阿里的seata方案
示例 https://github.com/seata/seata-samples/tree/master/springcloud-eureka-feign-mybatis-seata
@GlobalTransactional(name = "fsp-create-order",rollbackFor = Exception.class)
public void create(Order order) {
LOGGER.info("------->交易开始");
//本地方法
orderDao.create(order);
//远程方法 扣减库存
storageApi.decrease(order.getProductId(),order.getCount());
//远程方法 扣减账户余额
LOGGER.info("------->扣减账户开始order中");
accountApi.decrease(order.getUserId(),order.getMoney());
LOGGER.info("------->扣减账户结束order中");
LOGGER.info("------->交易结束");
}
seata方案的事务提交过程
Seata管理的分布式事务的典型生命周期:
image.png-
TM 要求TC开始一个新的全局事务。TC生成代表全局事务的XID
-
XID 通过微服务的调用链传播
-
RM 将本地事务注册为XID到TC的相应全局事务的分支
-
TM 要求TC提交或回退相应的XID全局事务
-
TC驱动XID的相应全局事务下的所有分支事务以完成分支提交或回滚
seata相对于其它分布式事务的最大区别是在第一提交阶段就将各个事务进行了commit操作,这样节约了两个阶段持有锁的事件,提高了整体的执行效率。通过xid管理全局事务,如果要全局回滚,通过xid找到对应的回滚日志记录,通过回滚记录生成反向更新sql,进行更新回滚操作
2.4 问题:现有的TCC事务方案的性能瓶颈在哪里?能支撑高并发交易场景吗?如何优化?
-
byteTcc (纯正的TCC)基于数据库里面创建的一些表,基于表中的数据进行状态的更新
-
核心链路中的各个服务都需要跟TC这个角色进行频繁的网络通信。频繁的通络通信其实就会带来性能的开销,本来一次请求不引入分布式事务只需要100ms,此时引入了分布式事务之后可能需要200ms
-
网络请求可能还挺耗时的,上报一些分支事务的状态给TC,seata-server,选择基于那种存储来放这些分布式事务日志或者状态的,file/磁盘文件/mysql(数据库开存放对应的一些状态)
-
高并发场景下,会不会有问题,seata-server,你也需要支持扩容,也需要部署多台机器,用一个数据库来存放分布式事务的日志和状态的话,假设并发量每秒上万,分库分表,对TC背后的数据库也会有同样的压力。
-
这个时候对TC背后的db也得进行分库分表,抗更高的并发压力。
3. 本地消息表 ebay
流程示意图1. A系统在自己本地一个事务里操作同时,插入一条数据到消息表
2. 接着A系统将这个消息发送到MQ中去
3. B系统接收到消息之后,在一个事务里,先往自己本地消息表里面插一条数据,同时执行其他的业务操作。
如果这个消息已经被处理过了,那么此时这个事务会回滚,这样保证不会重复处理消息
4. B系统执行成功之后,就会更新自己本地消息表的状态以及A系统消息表的状态
5. 如果B系统处理失败了,那么就不会更新消息表状态,那么此时A系统会定时扫描自己的消息表,如果有没处理的消息,会再次发送到MQ中去,让B再次处理。
6. 这个方案保证了最终一致性,哪怕B事务失败了,但是A会不断重发消息,直到B那边成功为止。
问题
-
最大的问题在于严重依赖数据库的消息表来管理事务
-
不太适合高并发
4. 可靠消息最终一致性方案
4.1 基于RocketMQ的流程:
流程示意图1. A系统先发送一个prepared消息到RocketMQ,如果这个prepared消息发送失败就直接取消操作别执行了。
2. 如果这个消息发送成功了,那么接着执行本地事务,如果成功就告诉MQ发送确认消息,如果失败就告诉MQ回滚消息。
3. 如果发送了确认消息,那么此时B系统会接收到确认消息,然后执行本地事务。
4. MQ会自动定时轮询所有preapred消息回调你的接口。
5. 要是系统B的事务失败了咋办?
-
重试咯。自动不断重试直到成功,如果实在是不行,那么就是针对重要的资金类业务进行回滚。
-
比如B系统本地回滚后,想办法通知系统A也回滚,或者是发送报警由人工来手工回
模拟电商流程
滚和补偿。
代码示例 -
生产者
package com.bfxy.rocketmq.transaction;
import com.bfxy.rocketmq.constants.Const;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import java.io.UnsupportedEncodingException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* @description:
* @date : 2020/3/19 10:06
* @author: zwz
*/
public class TransactionProducer {
public static void main(String[] args) throws UnsupportedEncodingException, MQClientException, InterruptedException {
TransactionMQProducer producer = new TransactionMQProducer("text_tx_producer_group_name");
ThreadPoolExecutor executorService = new ThreadPoolExecutor(2, 5, 100,
TimeUnit.SECONDS, new ArrayBlockingQueue<>(2000), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("text_tx_producer_group_name" + "-thread");
return thread;
}
});
producer.setNamesrvAddr(Const.NAMESRV_ADDR_MASTER_SLAVE);
producer.setExecutorService(executorService);
//这两个对象主要做两件事情。就是异步的执行本地事务。第二件事就是做消息回查
TransactionListener transactionListener = new TransactionListener() {
//先发送消息再执行本地事务
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
System.err.println("-----执行本地事务-----");
String callArg = (String) arg;
System.err.println("callArg:" + callArg);
System.err.println("msg:" + msg);
// tx.begin
// 数据库落库操作
// tx.commit
return LocalTransactionState.UNKNOW; //UNKONOW 消息不可达
// return LocalTransactionState.COMMIT_MESSAGE;
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
System.err.println("----回调消息检查----msg:" + msg);
return LocalTransactionState.COMMIT_MESSAGE;
}
};
producer.setTransactionListener(transactionListener);
producer.start();
Message message = new Message("test_tx_topic0", "tagA", "key",
("hello rocketmq 4 tx!").getBytes(RemotingHelper.DEFAULT_CHARSET));
//同步发送消息,成功返回后才执行本地事务。消息发送状态和本地事务执行结果用同一个状态 LocalTransactionState
//endTransaction 中要么发送回滚消息,要么发送确认消息
producer.sendMessageInTransaction(message, "我是回调的参数");
//后面可以继续其他的任务操作
for (int i = 0; i < 100 * 1000; i++) {
Thread.sleep(1000);
}
producer.shutdown();
}
}
- 消费者
package com.bfxy.rocketmq.transaction;
import com.bfxy.rocketmq.constants.Const;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import java.util.List;
/**
* @description:
* @date : 2020/3/19 10:54
* @author: zwz
*/
public class TransactionConsumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("text_tx_producer_group_name");
consumer.setConsumeThreadMin(10);
consumer.setConsumeThreadMax(20);
consumer.setNamesrvAddr(Const.NAMESRV_ADDR_MASTER_SLAVE);
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("test_tx_topic0", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
MessageExt me = msgs.get(0);
try {
String topic = me.getTopic();
String tags = me.getTags();
String keys = me.getKeys();
String msgBody = new String(me.getBody(), RemotingHelper.DEFAULT_CHARSET);
System.err.println("收到消息=> topic: " + topic + ",tags: " + tags + ", keys: " + keys + ",body: " + msgBody);
//这里执行业务逻辑:数据库的事务操作等
//应该捕获最大异常
} catch (Exception e) {
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
}
}
问题
-
这个消息是不是本地事务处理失败了,所以没有发确认消息?
-
那是继续重试还是回滚?
-
一般来说这里你就可以查下数据库看之前本地事务是否执行,如果回滚了,那么这里也回滚吧。
-
这个就是避免可能本地事务执行成功了,而确认消息发送失败了。
4.2 基于其他的消息队列
-
需要自己实现一个可靠消息服务,接收Producer发送的half message,然后返回响应给Producer。如果Producer没收到响应,则重发。 然后Producer执行本地事务,接着发送commit/rollback给可靠消息服务。
-
可靠消息服务启动一个后台线程定时扫描本地数据库表中所有half message,超过一定时间没commit/rollback就回调Producer接口,确认本地事务是否成功,获取commit/rollback
-
如果消息被rollback就废弃掉,如果消息被commit就发送这个消息给下游服务,或者是发送给RabbitMQ/Kafka/ActiveMQ,都可以,然后下游服务消费了,必须回调可靠消息服务接口进行ack
-
如果一段时间都没收到ack,则重发消息给下游服务
5. 最大努力通知方案
示意图1. 系统A本地事务执行完之后,发送消息到MQ
2. 这里会有专门消费MQ的最大努力通知服务,这个服务会消费MQ然后写入数据库中记录下来,或者是放入内存队列也行,接着调用系统B的通知
3. 要是系统B执行成功就OK了,要是B执行失败了,那么最大努力通知服务就定时重新调用系统B,反复N此,最后还是不行就放弃。
真实场景
99%的分布式接口调用,不做分布式事务,直接就是监控(发邮件、发短信)、记录日志、事后快速定位、排查和出解决方案、修复数据。
网友评论