美文网首页分布式
seate分布式事务调研

seate分布式事务调研

作者: 无聊之园 | 来源:发表于2019-06-10 21:03 被阅读0次

    官网:https://github.com/seata/seata/wiki/%E6%A6%82%E8%A7%88

    首先,跑一下官网的spring cloud的例子。
    大概步骤:
    1、跑spring cloud对应的sql脚本。
    2、修改几个服务的数据库连接密码。
    3、下载seate服务端程序,运行起来。
    4、把spring cloud的几个服务跑起来。
    5、调一下服务,发现事务起作用了。

    然后,把seate集成到自己的spring cloud项目中:
    1、添加两个配置文件。
    2、修改application.yml配置文件。
    3、添加datasouce的代理。
    4、添加数据库undo_log表。
    5、添加注解进测试。
    6、测试通过。

    文末是undo_log的大概逻辑源码分析。

    然后,研读官网介绍。
    做一个大概的解读。
    二、seate的发展:前身是txc,然后发布到阿里云变成了gts,然后今年开源,变成了fescar,然后又改名成seate。

    2.1 设计初衷

    1. 业务无侵入:引入分布式事务,不能影响业务。比如,tcc着这种人工补偿是不行的。(虽然seate也有补偿,但是是不需要人工干预的)
    2. 高性能:不能拖慢业务。比如xa机制,只有等到所有事务都执行完了,才释放锁,严重影响事务。(曾经用过lcn,这个东西就是二阶段提交,但是spring boot 2支持的很不好,最后放弃)

    高性能方面,体现在本地事务执行完,就commit释放锁,这方面,但是,如果都是seate事务,还是持有全局锁。

    2.2 已有的分布式事务方案
    xa: 单服务多数据库事务,好做,多服务,则tc要抽出来,性能各方面,各种不好做。
    tcc:手写补偿业务,不妥。
    基于消息中间件的最终一致性:异常回滚方面先捕获,通用性不好,而且特别麻烦。

    2.3 理想的分布式事务
    能够想本地事务一样,解决分布式事务问题,通过一个注解transaction注解,就能搞定。
    lcn也能入本地事务一样搞定分布式事务,但是这个东西,性能不好,而且各种问题,更可恶的是,spring boot2根本无法完。

    三、原理与设计

    3.三个组件
    可以看到,比mysql的xa,多了一个tc部分。

    transaction Coordinator (TC): 事务协调器,维护全局事务的运行状态,负责协调并驱动全局事务的提交或回滚。
    Transaction Manager (TM): 控制全局事务的边界,负责开启一个全局事务,并最终发起全局提交或全局回滚的决议。
    Resource Manager (RM): 控制分支事务,负责分支注册、状态汇报,并接收事务协调器的指令,驱动分支(本地)事务的提交和回滚。

    TM 向 TC 申请开启一个全局事务,全局事务创建成功并生成一个全局唯一的 XID。
    XID 在微服务调用链路的上下文中传播。
    RM 向 TC 注册分支事务,将其纳入 XID 对应全局事务的管辖。
    TM 向 TC 发起针对 XID 的全局提交或回滚决议。
    TC 调度 XID 下管辖的全部分支事务完成提交或回滚请求。

    XA 方案的 RM 实际上是在数据库层
    seate的XA是应用层,不依赖数据库。

    XA的锁要在所有事务进入commit阶段才释放。
    seate,锁本地事务一执行完就释放。
    seate的设计想法是:几乎90%的事务都是成功的,所以是可以马上释放锁的。
    如果,有一个事务出现异常了,需要全局回滚的时候,再去争取全局锁。

    3.3分支事务如何提交和回滚?
    seate对jdbc做了一个代理,会分析sql,然后生成, 这条记录修改前后的镜像(也就是一个json),然后保存在事务所在数据库undo_log表中(所以每个服务都需要一张这个表)。如此,当回滚的时候,取出这个镜像,解析成反向sql,执行就完了。
    所以这里的关键点在于:他能分析sql,然后解析出反向sql。也就省去了,手写补偿业务的问题。

    3.4事务的传播属性
    几乎都可以通过api支持,但是spring事务传播属性,这个源码我没搞明白。

    3.5隔离性
    事务本身有隔离级别,这是一点。
    seate有一个全局锁,多个seate事务的update会有排他锁,保证事务的一致性。当然,由于本地事务执行完了就提交了,也就是数据库本身释放锁了,如果另外一个事务不是seate事务,是没有全局锁的。

    seate全局上看,隔离级别是读未提交的,因为,每个本地事务,执行完了,就提交了,但是其实全局事务时没有执行完的,但是这份数据,还是会被别的事务看到,所以说,全局上看,是读未提交的。
    seate说,他也可以做到,全局,读已提交的隔离级别。

    4.适用场景
    因为依赖本地acid特性,所以需要关系型数据库。
    还不完善的地方:全局事务最高也就到读已提交的级别。而且,sql解析反向sql有些语法不支持。
    前面说的是AT模式,seate还有Mt模式,mt模式是另外一种模式。

    4.1分支的基本模式
    分支注册:分支事务开启,向seate服务发送注册。
    状态上报:分支执行完,有问题还是没问题都要发送报告。
    分支提交:没问题,则提交。
    分支回滚:有问题,则回滚,并且,全局回滚。

    总之:seate有两个特别的思想:

    1. 本地事务执行完之后,就提交。通过自动生成回滚sql的形式回滚。
      2.有解析sql的回滚sql的能力。

    五、集成到自己项目测试
    集成方式,前面说了。
    关键。
    A服务:

    @GetMapping("/test")
        @ApiOperation(value = "测试", notes = "输入参数:selectionActivitiesId")
        @GlobalTransactional
        public Result test() {
            selectionActivitiesFeigin.test();
            throw new RuntimeException("shibai");
        }
    

    远程调用B服务。

     @GetMapping("/test")
       public Result test() {
            selectionActivitiesService.test();
            // 走到这里,B服务的本地事务已经提交,数据库,已经删掉了一条记录
        return Result.success("ok");
        }
     @Transactional
        public void test() {
            selectionActivitiesMapper.deleteById(14);
        }
    

    最后,回滚,删掉的记录又回来了。可见,全局事务,隔离级别是读未提交。

    然后看undo_log:可以看到,rollback_info是一个blob字段,记录了操作前的记录的镜像。(其实就是一条json)

    13  2013937050  192.168.106.1:8091:2013937049   (BLOB) 1.15 KB  0   2019-06-11 07:47:11 2019-06-11 07:47:11 
    

    看源码:
    关于undolog的解析就是这个类。打断点。

    public class JSONBasedUndoLogParser implements UndoLogParser {
    
        @Override
        public String encode(BranchUndoLog branchUndoLog) {
            return JSON.toJSONString(branchUndoLog, SerializerFeature.WriteDateUseDateFormat);
        }
    
        @Override
        public BranchUndoLog decode(String text) {
            return JSON.parseObject(text, BranchUndoLog.class);
        }
    }
    

    会发现,最后,把BranchUndoLog类json化成下面这个类。记录了delete前的,这条记录的的所有字段。


    image.png

    UndoLogManager这个类操作的。

     public static void flushUndoLogs(ConnectionProxy cp) throws SQLException {
            assertDbSupport(cp.getDbType());
    
            ConnectionContext connectionContext = cp.getContext();
            String xid = connectionContext.getXid();
            long branchID = connectionContext.getBranchId();
    
            BranchUndoLog branchUndoLog = new BranchUndoLog();
            branchUndoLog.setXid(xid);
            branchUndoLog.setBranchId(branchID);
            branchUndoLog.setSqlUndoLogs(connectionContext.getUndoItems());
            // 解析成json
            String undoLogContent = UndoLogParserFactory.getInstance().encode(branchUndoLog);
        
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Flushing UNDO LOG: " + undoLogContent);
            }
            // 插入数据库
            insertUndoLogWithNormal(xid, branchID, undoLogContent, cp.getTargetConnection());
        }
    // 这是jdbc操作,这个类,定义了很多解析sql的模板。
    private static void insertUndoLog(String xid, long branchID,
                                          String undoLogContent, State state, Connection conn) throws SQLException {
            PreparedStatement pst = null;
            try {
                pst = conn.prepareStatement(INSERT_UNDO_LOG_SQL);
                pst.setLong(1, branchID);
                pst.setString(2, xid);
                pst.setBlob(3, BlobUtils.string2blob(undoLogContent));
                pst.setInt(4, state.getValue());
                pst.executeUpdate();
            } catch (Exception e) {
                if (!(e instanceof SQLException)) {
                    e = new SQLException(e);
                }
                throw (SQLException) e;
            } finally {
                if (pst != null) {
                    pst.close();
                }
            }
        }
    

    然后看回滚执行逻辑。

     public static void undo(DataSourceProxy dataSourceProxy, String xid, long branchId) throws TransactionException {
           ...
    // 这里反系列化,undolog的json
                        BranchUndoLog branchUndoLog = UndoLogParserFactory.getInstance().decode(rollbackInfo);
    
                        for (SQLUndoLog sqlUndoLog : branchUndoLog.getSqlUndoLogs()) {
                            TableMeta tableMeta = TableMetaCache.getTableMeta(dataSourceProxy, sqlUndoLog.getTableName());
                            sqlUndoLog.setTableMeta(tableMeta);
                            AbstractUndoExecutor undoExecutor = UndoExecutorFactory.getUndoExecutor(
                                dataSourceProxy.getDbType(),
                                sqlUndoLog);
                            undoExecutor.executeOn(conn);
                        }
                    }
            ...
            
        }
    public void executeOn(Connection conn) throws SQLException {
    
            if (IS_UNDO_DATA_VALIDATION_ENABLE && !dataValidationAndGoOn(conn)) {
                return;
            }
            
            try {
                // 把那个undo_log的json,解析成反向sql
                String undoSQL = buildUndoSQL();
                 // 然后执行
                PreparedStatement undoPST = conn.prepareStatement(undoSQL);
                TableRecords undoRows = getUndoRows();
                for (Row undoRow : undoRows.getRows()) {
                    ArrayList<Field> undoValues = new ArrayList<>();
                    Field pkValue = null;
                    for (Field field : undoRow.getFields()) {
                        if (field.getKeyType() == KeyType.PrimaryKey) {
                            pkValue = field;
                        } else {
                            undoValues.add(field);
                        }
                    }
                    undoPrepare(undoPST, undoValues, pkValue);
                    undoPST.executeUpdate();
                }
            }
        }
    

    至此,大概,seate的undo_log部分,大概逻辑就是这样,存储修改前镜像json,回滚的时候,解析这个json成回滚sql,然后执行。

    相关文章

      网友评论

        本文标题:seate分布式事务调研

        本文链接:https://www.haomeiwen.com/subject/gtvtfctx.html