美文网首页111
新版seata来了,我们一起来拆箱看看有哪些变化

新版seata来了,我们一起来拆箱看看有哪些变化

作者: leiwingqueen | 来源:发表于2020-05-10 18:34 被阅读0次

    一、seata是什么

    Seata 是一款开源的分布式事务解决方案,致力于提供高性能和简单易用的分布式事务服务。Seata 将为用户提供了 AT、TCC、SAGA 和 XA 事务模式,为用户打造一站式的分布式解决方案。

    seata官方网站

    在19年初的时候就关注过这个中间件(当时叫Fescar),并且对它的源码进行了一下分析--阿里分布式事务解决方案fescar简析。然而当时并不成熟,并不能直接用于商用,主要有以下几个问题。

    • TC的实现不完善。不支持HA,xid的生成,session的存储,锁的实现等都是以DEMO的方式提供,不能直接用于线上环境,需要二次开发
    • 回滚失败的补偿机制不完善
    • 性能问题。seata当时宣称只有在第一阶段提交的时候进行加锁,相对传统的两阶段提交对性能有比较大的提升。但现实情况是为了保证回滚操作的成功,还必须要有一个全局锁,事实上相比XA的方式我个人认为在系统的吞吐量上个人认为不会有太大的变化。而且由于这种方式实现的是类似补偿性事务的方式,又会引入一个可见性的问题(第二阶段提交前就已经能看到第一阶段提交的结果)
    • RPC框架。由于是阿里系的中间件,因此第一版实现的是基于dubbo,非dubbo的rpc框架需要根据自己的情况做二次开发。

    二、模式

    • AT
    • TCC
    • SAGA(新特性)
    • XA(新特性)

    三、核心组件

    • TC

    事务协调者

    • TM

    事务发起者。定义事务边界

    • RM

    事务参与者

    四、demo演示

    五、源码分析

    注:本文使用的版本为v1.2.0

    1. undo_log的产生和删除机制

    • undo_log的产生
    UndoLogManager类图

    查看flushUndoLogs调用栈

    flushUndoLogs:200, AbstractUndoLogManager (io.seata.rm.datasource.undo)
    processGlobalTransactionCommit:221, ConnectionProxy (io.seata.rm.datasource)
    doCommit:196, ConnectionProxy (io.seata.rm.datasource)
    lambda$commit$0:184, ConnectionProxy (io.seata.rm.datasource)
    call:-1, 362578118 (io.seata.rm.datasource.ConnectionProxy$$Lambda$197)
    execute:289, ConnectionProxy$LockRetryPolicy (io.seata.rm.datasource)
    commit:183, ConnectionProxy (io.seata.rm.datasource)
    ...
    execute:108, ExecuteTemplate (io.seata.rm.datasource.exec)
    execute:49, ExecuteTemplate (io.seata.rm.datasource.exec)
    ...
    update:927, JdbcTemplate (org.springframework.jdbc.core)
    deduct:51, StorageServiceImpl (io.seata.samples.dubbo.service.impl)
    

    ConnectionProxy为Connection的代理类。

    private void processGlobalTransactionCommit() throws SQLException {
            try {
                //**新特性,分支事务注册到TC改为在提交前进行,而不是在一开始就获取一个branchId
                register();
            } catch (TransactionException e) {
                recognizeLockKeyConflictException(e, context.buildLockKeys());
            }
            try {
                //根据数据库的类型获取对应的UndoLogManager进行刷写undolog日志
                UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this);
                //原connection的commit操作
                targetConnection.commit();
            } catch (Throwable ex) {
                LOGGER.error("process connectionProxy commit error: {}", ex.getMessage(), ex);
                report(false);
                throw new SQLException(ex);
            }
            if (IS_REPORT_SUCCESS_ENABLE) {
                report(true);
            }
            context.reset();
        }
    

    AbstractUndoLogManager.java

    @Override
        public void flushUndoLogs(ConnectionProxy cp) throws SQLException {
            //通过连接代理获取连接的上下文,这里先不分析xid的传递机制,留给后面的部分进行分析
            ConnectionContext connectionContext = cp.getContext();
            if (!connectionContext.hasUndoLog()) {
                return;
            }
    
            String xid = connectionContext.getXid();
            long branchId = connectionContext.getBranchId();
    
            BranchUndoLog branchUndoLog = new BranchUndoLog();
            branchUndoLog.setXid(xid);
            branchUndoLog.setBranchId(branchId);
            //具体的undolog的内容
            branchUndoLog.setSqlUndoLogs(connectionContext.getUndoItems());
    
            UndoLogParser parser = UndoLogParserFactory.getInstance();
            byte[] undoLogContent = parser.encode(branchUndoLog);
    
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Flushing UNDO LOG: {}", new String(undoLogContent, Constants.DEFAULT_CHARSET));
            }
            //实际的写入操作,不同的关系型数据库有不同的实现
            insertUndoLogWithNormal(xid, branchId, buildContext(parser.getName()), undoLogContent,
                cp.getTargetConnection());
        }
    

    为了直观显示,我这里给出了一条undo_log的数据

               id: 32
        branch_id: 2011290555
              xid: 172.17.0.1:8091:2011290554
          context: serializer=jackson
    rollback_info: {"@class":"io.seata.rm.datasource.undo.BranchUndoLog","xid":"172.17.0.1:8091:2011290554","branchId":2011290555,"sqlUndoLogs":["java.util.ArrayList",[{"@class":"io.seata.rm.datasource.undo.SQLUndoLog","sqlType":"UPDATE","tableName":"storage_tbl","beforeImage":{"@class":"io.seata.rm.datasource.sql.struct.TableRecords","tableName":"storage_tbl","rows":["java.util.ArrayList",[{"@class":"io.seata.rm.datasource.sql.struct.Row","fields":["java.util.ArrayList",[{"@class":"io.seata.rm.datasource.sql.struct.Field","name":"id","keyType":"PRIMARY_KEY","type":4,"value":4},{"@class":"io.seata.rm.datasource.sql.struct.Field","name":"count","keyType":"NULL","type":4,"value":201}]]}]]},"afterImage":{"@class":"io.seata.rm.datasource.sql.struct.TableRecords","tableName":"storage_tbl","rows":["java.util.ArrayList",[{"@class":"io.seata.rm.datasource.sql.struct.Row","fields":["java.util.ArrayList",[{"@class":"io.seata.rm.datasource.sql.struct.Field","name":"id","keyType":"PRIMARY_KEY","type":4,"value":4},{"@class":"io.seata.rm.datasource.sql.struct.Field","name":"count","keyType":"NULL","type":4,"value":199}]]}]]}}]]}
       log_status: 0
      log_created: 2020-05-10 10:02:53
     log_modified: 2020-05-10 10:02:53
              ext: NULL
    
    

    这个操作为把用户的库存记录-2。beforeImage的count为201,afterImage的count为199。

    • RPC框架整合(xid传递)
    • 锁的机制(全局锁和局部锁)
    • session状态存储
    • 全局ID生成

    相关文章

      网友评论

        本文标题:新版seata来了,我们一起来拆箱看看有哪些变化

        本文链接:https://www.haomeiwen.com/subject/zmtsnhtx.html