官网:https://github.com/seata/seata/wiki/%E6%A6%82%E8%A7%88
首先,跑一下官网的spring cloud的例子。
大概步骤:
1、跑spring cloud对应的sql脚本。
2、修改几个服务的数据库连接密码。
3、下载seate服务端程序,运行起来。
4、把spring cloud的几个服务跑起来。
5、调一下服务,发现事务起作用了。
然后,把seate集成到自己的spring cloud项目中:
1、添加两个配置文件。
2、修改application.yml配置文件。
3、添加datasouce的代理。
4、添加数据库undo_log表。
5、添加注解进测试。
6、测试通过。
文末是undo_log的大概逻辑源码分析。
然后,研读官网介绍。
做一个大概的解读。
二、seate的发展:前身是txc,然后发布到阿里云变成了gts,然后今年开源,变成了fescar,然后又改名成seate。
2.1 设计初衷
- 业务无侵入:引入分布式事务,不能影响业务。比如,tcc着这种人工补偿是不行的。(虽然seate也有补偿,但是是不需要人工干预的)
- 高性能:不能拖慢业务。比如xa机制,只有等到所有事务都执行完了,才释放锁,严重影响事务。(曾经用过lcn,这个东西就是二阶段提交,但是spring boot 2支持的很不好,最后放弃)
高性能方面,体现在本地事务执行完,就commit释放锁,这方面,但是,如果都是seate事务,还是持有全局锁。
2.2 已有的分布式事务方案
xa: 单服务多数据库事务,好做,多服务,则tc要抽出来,性能各方面,各种不好做。
tcc:手写补偿业务,不妥。
基于消息中间件的最终一致性:异常回滚方面先捕获,通用性不好,而且特别麻烦。
2.3 理想的分布式事务
能够想本地事务一样,解决分布式事务问题,通过一个注解transaction注解,就能搞定。
lcn也能入本地事务一样搞定分布式事务,但是这个东西,性能不好,而且各种问题,更可恶的是,spring boot2根本无法完。
三、原理与设计
3.三个组件
可以看到,比mysql的xa,多了一个tc部分。
transaction Coordinator (TC): 事务协调器,维护全局事务的运行状态,负责协调并驱动全局事务的提交或回滚。
Transaction Manager (TM): 控制全局事务的边界,负责开启一个全局事务,并最终发起全局提交或全局回滚的决议。
Resource Manager (RM): 控制分支事务,负责分支注册、状态汇报,并接收事务协调器的指令,驱动分支(本地)事务的提交和回滚。
TM 向 TC 申请开启一个全局事务,全局事务创建成功并生成一个全局唯一的 XID。
XID 在微服务调用链路的上下文中传播。
RM 向 TC 注册分支事务,将其纳入 XID 对应全局事务的管辖。
TM 向 TC 发起针对 XID 的全局提交或回滚决议。
TC 调度 XID 下管辖的全部分支事务完成提交或回滚请求。
XA 方案的 RM 实际上是在数据库层
seate的XA是应用层,不依赖数据库。
XA的锁要在所有事务进入commit阶段才释放。
seate,锁本地事务一执行完就释放。
seate的设计想法是:几乎90%的事务都是成功的,所以是可以马上释放锁的。
如果,有一个事务出现异常了,需要全局回滚的时候,再去争取全局锁。
3.3分支事务如何提交和回滚?
seate对jdbc做了一个代理,会分析sql,然后生成, 这条记录修改前后的镜像(也就是一个json),然后保存在事务所在数据库undo_log表中(所以每个服务都需要一张这个表)。如此,当回滚的时候,取出这个镜像,解析成反向sql,执行就完了。
所以这里的关键点在于:他能分析sql,然后解析出反向sql。也就省去了,手写补偿业务的问题。
3.4事务的传播属性
几乎都可以通过api支持,但是spring事务传播属性,这个源码我没搞明白。
3.5隔离性
事务本身有隔离级别,这是一点。
seate有一个全局锁,多个seate事务的update会有排他锁,保证事务的一致性。当然,由于本地事务执行完了就提交了,也就是数据库本身释放锁了,如果另外一个事务不是seate事务,是没有全局锁的。
seate全局上看,隔离级别是读未提交的,因为,每个本地事务,执行完了,就提交了,但是其实全局事务时没有执行完的,但是这份数据,还是会被别的事务看到,所以说,全局上看,是读未提交的。
seate说,他也可以做到,全局,读已提交的隔离级别。
4.适用场景
因为依赖本地acid特性,所以需要关系型数据库。
还不完善的地方:全局事务最高也就到读已提交的级别。而且,sql解析反向sql有些语法不支持。
前面说的是AT模式,seate还有Mt模式,mt模式是另外一种模式。
4.1分支的基本模式
分支注册:分支事务开启,向seate服务发送注册。
状态上报:分支执行完,有问题还是没问题都要发送报告。
分支提交:没问题,则提交。
分支回滚:有问题,则回滚,并且,全局回滚。
总之:seate有两个特别的思想:
- 本地事务执行完之后,就提交。通过自动生成回滚sql的形式回滚。
2.有解析sql的回滚sql的能力。
五、集成到自己项目测试
集成方式,前面说了。
关键。
A服务:
@GetMapping("/test")
@ApiOperation(value = "测试", notes = "输入参数:selectionActivitiesId")
@GlobalTransactional
public Result test() {
selectionActivitiesFeigin.test();
throw new RuntimeException("shibai");
}
远程调用B服务。
@GetMapping("/test")
public Result test() {
selectionActivitiesService.test();
// 走到这里,B服务的本地事务已经提交,数据库,已经删掉了一条记录
return Result.success("ok");
}
@Transactional
public void test() {
selectionActivitiesMapper.deleteById(14);
}
最后,回滚,删掉的记录又回来了。可见,全局事务,隔离级别是读未提交。
然后看undo_log:可以看到,rollback_info是一个blob字段,记录了操作前的记录的镜像。(其实就是一条json)
13 2013937050 192.168.106.1:8091:2013937049 (BLOB) 1.15 KB 0 2019-06-11 07:47:11 2019-06-11 07:47:11
看源码:
关于undolog的解析就是这个类。打断点。
public class JSONBasedUndoLogParser implements UndoLogParser {
@Override
public String encode(BranchUndoLog branchUndoLog) {
return JSON.toJSONString(branchUndoLog, SerializerFeature.WriteDateUseDateFormat);
}
@Override
public BranchUndoLog decode(String text) {
return JSON.parseObject(text, BranchUndoLog.class);
}
}
会发现,最后,把BranchUndoLog类json化成下面这个类。记录了delete前的,这条记录的的所有字段。
image.png
UndoLogManager这个类操作的。
public static void flushUndoLogs(ConnectionProxy cp) throws SQLException {
assertDbSupport(cp.getDbType());
ConnectionContext connectionContext = cp.getContext();
String xid = connectionContext.getXid();
long branchID = connectionContext.getBranchId();
BranchUndoLog branchUndoLog = new BranchUndoLog();
branchUndoLog.setXid(xid);
branchUndoLog.setBranchId(branchID);
branchUndoLog.setSqlUndoLogs(connectionContext.getUndoItems());
// 解析成json
String undoLogContent = UndoLogParserFactory.getInstance().encode(branchUndoLog);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Flushing UNDO LOG: " + undoLogContent);
}
// 插入数据库
insertUndoLogWithNormal(xid, branchID, undoLogContent, cp.getTargetConnection());
}
// 这是jdbc操作,这个类,定义了很多解析sql的模板。
private static void insertUndoLog(String xid, long branchID,
String undoLogContent, State state, Connection conn) throws SQLException {
PreparedStatement pst = null;
try {
pst = conn.prepareStatement(INSERT_UNDO_LOG_SQL);
pst.setLong(1, branchID);
pst.setString(2, xid);
pst.setBlob(3, BlobUtils.string2blob(undoLogContent));
pst.setInt(4, state.getValue());
pst.executeUpdate();
} catch (Exception e) {
if (!(e instanceof SQLException)) {
e = new SQLException(e);
}
throw (SQLException) e;
} finally {
if (pst != null) {
pst.close();
}
}
}
然后看回滚执行逻辑。
public static void undo(DataSourceProxy dataSourceProxy, String xid, long branchId) throws TransactionException {
...
// 这里反系列化,undolog的json
BranchUndoLog branchUndoLog = UndoLogParserFactory.getInstance().decode(rollbackInfo);
for (SQLUndoLog sqlUndoLog : branchUndoLog.getSqlUndoLogs()) {
TableMeta tableMeta = TableMetaCache.getTableMeta(dataSourceProxy, sqlUndoLog.getTableName());
sqlUndoLog.setTableMeta(tableMeta);
AbstractUndoExecutor undoExecutor = UndoExecutorFactory.getUndoExecutor(
dataSourceProxy.getDbType(),
sqlUndoLog);
undoExecutor.executeOn(conn);
}
}
...
}
public void executeOn(Connection conn) throws SQLException {
if (IS_UNDO_DATA_VALIDATION_ENABLE && !dataValidationAndGoOn(conn)) {
return;
}
try {
// 把那个undo_log的json,解析成反向sql
String undoSQL = buildUndoSQL();
// 然后执行
PreparedStatement undoPST = conn.prepareStatement(undoSQL);
TableRecords undoRows = getUndoRows();
for (Row undoRow : undoRows.getRows()) {
ArrayList<Field> undoValues = new ArrayList<>();
Field pkValue = null;
for (Field field : undoRow.getFields()) {
if (field.getKeyType() == KeyType.PrimaryKey) {
pkValue = field;
} else {
undoValues.add(field);
}
}
undoPrepare(undoPST, undoValues, pkValue);
undoPST.executeUpdate();
}
}
}
至此,大概,seate的undo_log部分,大概逻辑就是这样,存储修改前镜像json,回滚的时候,解析这个json成回滚sql,然后执行。
网友评论