美文网首页
Seata源码解析

Seata源码解析

作者: 知止9528 | 来源:发表于2021-02-27 20:37 被阅读0次

    一:Seata使用

    第一步,增加全局事务注解

    首先依赖 Seata 的客户端 SDK,然后在整个分布式事务发起方的业务方法上增加 @GlobalTransactional 注解,下面的例子来源于 Seata-Samples dubbo 案例,purchase 是事务发起方的业务方法,通过 RPC 调用了下游库存服务和订单服务提供的接口:

    @Override
    @GlobalTransactional(timeoutMills = 300000, name = "dubbo-demo-tx")
    public void purchase(String userId, String commodityCode, int orderCount) {
        LOGGER.info("purchase begin ... xid: " + RootContext.getXID());
        // RPC 调用库存服务
        storageService.deduct(commodityCode, orderCount);
        // RPC 调用订单服务
        orderService.create(userId, commodityCode, orderCount);
        throw new RuntimeException("xxx");
    }
    

    第二步,配置代理数据源

    以 MySQL 为例:

    // 配置数据源
    <bean name="accountDataSource" class="com.alibaba.druid.pool.DruidDataSource"
            init-method="init" destroy-method="close">
            // …… …… 省略数据源配置
    </bean>
    
    // 关键步骤,配置 Seata 的代理数据源,代理之前配置的 accountDataSource
    <bean id="accountDataSourceProxy" class="io.seata.rm.datasource.DataSourceProxy">
        <constructor-arg ref="accountDataSource" />
    </bean>
    
    // 配置 applicationId 和 txServiceGroup,这主要是来标识应用和服务端集群的
    <bean class="io.seata.spring.annotation.GlobalTransactionScanner">
        <constructor-arg value="dubbo-demo-account-service"/>
        <constructor-arg value="my_test_tx_group"/>
    </bean>
    
    // 省略一些 dubbo 服务注册配置和 jdbcTemplate 配置
    

    第三步,新建 undo_log 表

    在事务链涉及的服务的数据库中新建 undo_log 表用来存储 UndoLog 信息,
    用于二阶段回滚操作,表中包含 xid、branchId、rollback_info 等关键字段信息。
    

    二:Seata AT 的工作流程

    工作流程总览


    2阶段.jpg

    AT 模式的工作流程分为两阶段。一阶段进行业务 SQL 执行,并通过 SQL 拦截、SQL 改写等过程生成修改数据前后的快照(Image),并作为 UndoLog 和业务修改在同一个本地事务中提交。

    如果一阶段成功那么二阶段仅仅异步删除刚刚插入的 UndoLog;如果二阶段失败则通过 UndoLog 生成反向 SQL 语句回滚一阶段的数据修改。其中关键的 SQL 解析和拼接工作借助了 Druid Parser 中的代码

    一阶段中分支事务的具体工作有:

    1. 根据需要执行的 SQL(UPDATE、INSERT、DELETE)类型生成相应的 SqlRecognizer
    2. 进而生成相应的 SqlExecutor
    3. 接着便进入核心逻辑查询数据的前后快照,例如图中标红的部分,拿到修改数据行的前后快照之后,将二者整合生成 UndoLog,并尝试将其和业务修改在同一事务中提交。

    整个流程的流程图如下:


    流程.jpg

    值得注意的是,本地事务提交前必须先向服务端注册分支,分支注册信息中包含由表名和行主键组成的全局锁,如果分支注册过程中发现全局锁正在被其他全局事务锁定则抛出全局锁冲突异常,客户端需要循环等待,直到其他全局事务释放锁之后该本地事务才能提交。Seata 以这样的机制保证全局事务间的写隔离

    Commit 流程

    对服务端来说,等到一阶段完成未抛异常,全局事务的发起方会向服务端申请提交这个全局事务,服务端根据 xid 查询出该全局事务后加锁并关闭这个全局事务,目的是防止该事务后续还有分支继续注册上来,同时将其状态从 Begin 修改为 Committing。

    紧接着,判断该全局事务下的分支类型是否均为 AT 类型,若是则服务端会进行异步提交,因为 AT 模式下一阶段完成数据已经落地。服务端仅仅修改全局事务状态为 AsyncCommitting,然后会有一个定时线程池去存储介质(File 或者 Database)中查询出待提交的全局事务日志进行提交,如果全局事务提交成功则会释放全局锁并删除事务日志。整个流程如下图所示:

    commit.jpg

    对客户端来说,先是接收到服务端发送的 branch commit 请求,然后客户端会根据 resourceId 找到相应的 ResourceManager,接着将分支提交请求封装成 Phase2Context 插入内存队列 ASYNC_COMMIT_BUFFER,客户端会有一个定时线程池去查询该队列进行 UndoLog 的异步删除。

    一旦客户端提交失败或者 RPC 超时,则服务端会将该全局事务状态置位 CommitRetrying,之后会由另一个定时线程池去一直重试这些事务直至成功。整个流程如下图所示:


    commit2.jpg

    Rollback 流程

    回滚相对复杂一些,如果发起方一阶段抛异常会向服务端请求回滚该全局事务,服务端会根据 xid 查询出这个全局事务,加锁关闭事务使得后续不会再有分支注册上来,并同时更改其状态 Begin 为 Rollbacking,接着进行同步回滚以保证数据一致性。除了同步回滚这个点外,其他流程同提交时相似,如果同步回滚成功则释放全局锁并删除事务日志,如果失败则会进行异步重试。整个流程如下图所示:

    rollback.jpg

    客户端接收到服务端的 branch rollback 请求,先根据 resourceId 拿到对应的数据源代理,然后根据 xid 和 branchId 查询出 UndoLog 记录,反序列化其中的 rollback 字段拿到数据的前后快照,我们称该全局事务为 A。

    根据具体 SQL 类型生成对应的 UndoExecutor,校验一下数据 UndoLog 中的前后快照是否一致或者前置快照和当前数据(这里需要 SELECT 一次)是否一致,如果一致说明不需要做回滚操作,如果不一致则生成反向 SQL 进行补偿,在提交本地事务前会检测获取数据库本地锁是否成功,如果失败则说明存在其他全局事务(假设称之为 B)的一阶段正在修改相同的行,但是由于这些行的主键在服务端已经被当前正在执行二阶段回滚的全局事务 A 锁定,因此事务 B 的一阶段在本地提交前尝试获取全局锁一定是失败的,等到获取全局锁超时后全局事务 B 会释放本地锁,这样全局事务 A 就可以继续进行本地事务的提交,成功之后删除本地 UndoLog 记录。整个流程如下图所示:

    rollback2.jpg

    数据源代理

    Seata 中主要针对 java.sql 包下的 DataSource、Connection、Statement、PreparedStatement 四个接口进行了再包装,包装类分别为 DataSourceProxy、ConnectionProxy、StatementProxy、PreparedStatementProxy,很好一一对印,其功能是在 SQL 语句执行前后、事务 commit 或者 rollbakc 前后进行一些与 Seata 分布式事务相关的操作,例如分支注册、状态回报、全局锁查询、快照存储、反向 SQL 生成等。

    ExecuteTemplate.execute

    execute.jpg

    AT 模式下,真正分支事务开始是在 StatementProxy 和 PreparedStatementProxy 的 execute、executeQuery、executeUpdate 等具体执行方法中,这些方法均实现自 Statement 和 PreparedStatement 的标准接口,而方法体内调用了 ExecuteTemplate.execute 做方法拦截.

    public static <T, S extends Statement> T execute(SQLRecognizer sqlRecognizer,
                                                        StatementProxy<S> statementProxy,
                                                        StatementCallback<T, S> statementCallback,
                                                        Object... args) throws SQLException {
    
        // 如果不是处于全局事务中,即上游没有 xid 传递下来
        // 或者没有 GlobalLock 修饰,该数据操作不需要纳入 Seata 框架下进行管理
        // 则直接执行这个 SQL                                                        
        if (!RootContext.inGlobalTransaction() && !RootContext.requireGlobalLock()) {
            // Just work as original statement
            return statementCallback.execute(statementProxy.getTargetStatement(), args);
        }
    
        if (sqlRecognizer == null) {
            sqlRecognizer = SQLVisitorFactory.get(
                    statementProxy.getTargetSQL(),
                    statementProxy.getConnectionProxy().getDbType());
        }
        Executor<T> executor = null;
        if (sqlRecognizer == null) {
            executor = new PlainExecutor<T, S>(statementProxy, statementCallback);
        } else {
    
            // 通过 SQL 的类型,生成不同的执行器
            switch (sqlRecognizer.getSQLType()) {
                case INSERT:
                    executor = new InsertExecutor<T, S>(statementProxy, statementCallback, sqlRecognizer);
                    break;
                case UPDATE:
                    executor = new UpdateExecutor<T, S>(statementProxy, statementCallback, sqlRecognizer);
                    break;
                case DELETE:
                    executor = new DeleteExecutor<T, S>(statementProxy, statementCallback, sqlRecognizer);
                    break;
                case SELECT_FOR_UPDATE:
                    executor = new SelectForUpdateExecutor<T, S>(statementProxy, statementCallback, sqlRecognizer);
                    break;
                default:
                    executor = new PlainExecutor<T, S>(statementProxy, statementCallback);
                    break;
            }
        }
        T rs = null;
        try {
    
            // 调用执行器的 execute 方法,显然这是一个抽象方法,最后会调到三个具体的执行器实现类之一
            rs = executor.execute(args);
        } catch (Throwable ex) {
            if (!(ex instanceof SQLException)) {
                // Turn other exception into SQLException
                ex = new SQLException(ex);
            }
            throw (SQLException)ex;
        }
        return rs;
    }
    

    执行器接口 execute 的实现
    execute 方法的实现位于 BaseTransactionalExecutor 类中:

    @Override
    public Object execute(Object... args) throws Throwable {
    
        // 如果处于全局事务中,绑定 xid
        if (RootContext.inGlobalTransaction()) {
            String xid = RootContext.getXID();
            statementProxy.getConnectionProxy().bind(xid);
        }
    
        // 如果被 GlobalLock 修饰,该操作需要设置全局锁标识
        if (RootContext.requireGlobalLock()) {
            statementProxy.getConnectionProxy().setGlobalLockRequire(true);
        } else {
            statementProxy.getConnectionProxy().setGlobalLockRequire(false);
        }
    
        // 调用抽象方法 doExecute
        return doExecute(args);
    }
    

    BaseTransactionalExecutor 类中 execute 方法主要做了一些与全局事务相关的状态值的设定,继续追踪进入 doExecute 方法的实现。

    抽象方法 doExecute 的实现
    终于进入正题,doExecute 方法位于 AbstractDMLBaseExecutor 类中,该类继承自上文中的 BaseTransactionalExecutor。
    doExecute 方法体内先拿到具体的连接代理对象 connectionProxy,然后根据 Commit 标识进行不同方法的调用,但翻看代码实现时发现,其实 executeCommitTrue 方法就是先把 Commit 标识改成 false 然后再调用 executeCommitFalse 方法。

    @Override
    public T doExecute(Object... args) throws Throwable {
        AbstractConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
        if (connectionProxy.getCommit()) {
            return executeCommitTrue(args);
        } else {
            return executeCommitFalse(args);
        }
    }
    

    executeCommitTrue 方法体中有一个无限循环,这么做的意义是,一旦分支注册时抛出锁冲突异常,则需要一直等待直到别的全局事务释放该全局锁之后才能提交自己的修改,否则一直阻塞等待。

    protected T executeCommitTrue(Object[] args) throws Throwable {
        T result = null;
        AbstractConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
        LockRetryController lockRetryController = new LockRetryController();
        try {
    
            // 先将 Commit 标识改成 false,只允许手动提交
            connectionProxy.setCommit(false);
    
            // 进入一个无限循环
            while (true) {
                try {
    
                    // 调用 executeCommitFalse 方法
                    result = executeCommitFalse(args);
    
                    // 如果分支成功,则 commit,提交本地事务,该方法也是代理方法,下文会叙述
                    connectionProxy.commit();
                    break;
                } catch (LockConflictException lockConflict) {
    
                    // 如果全局锁冲突,可能是已经有别的事务拿到了要修改行的全局锁,则回滚
                    connectionProxy.getTargetConnection().rollback();
    
                    // 然后 sleep 一段时间,不要立即重试
                    lockRetryController.sleep(lockConflict);
                }
            }
    
        } catch (Exception e) {
    
            // when exception occur in finally,this exception will lost, so just print it here
            LOGGER.error("exception occur", e);
            throw e;
        } finally {
            connectionProxy.setCommit(true);
        }
        return result;
    }
    

    下面我们仔细看一下 executeCommitFalse 方法的逻辑,它是实现 AT 模式的关键步骤。其中,beforeImage 是一个抽象方法,针对 INSERT、UPDATE、DELETE 有不同的实现,因为需要将这三种不同的 SQL 解析为相应的 SELECT 语句,查询操作前数据的快照;同样的 afterImage 也是一个抽象方法,来查询操作后数据的快照;statementCallback.execute 语句真正执行 SQL;prepareUndoLog 整合 beforeImage 和 afterImage 生成 UndoLog 对象。

    protected T executeCommitFalse(Object[] args) throws Throwable {
    
        // beforeImage 是一个抽象方法,针对 INSERT、UPDATE、DELETE 有不同的实现
        TableRecords beforeImage = beforeImage();
    
        // 真正执行 SQL
        T result = statementCallback.execute(statementProxy.getTargetStatement(), args);
    
        // 原理同 beforeImage
        TableRecords afterImage = afterImage(beforeImage);
    
        // 整合 beforeImage 和 afterImage 生成 UndoLog
        prepareUndoLog(beforeImage, afterImage);
        return result;
    }
    

    executeCommitFalse 执行过后,会调用 connectionProxy.commit() 做事务提交,我们看看该代理方法的实现。

    ConnectionProxy 复写的 commit 方法
    该 commit 方法实现自 Connection 接口的 commit 方法:

    @Override
    public void commit() throws SQLException {
    
        // 针对分支事务处理
        if (context.inGlobalTransaction()) {
            processGlobalTransactionCommit();
        } 
    
        // 针对 GlobalLock 的处理
        else if (context.isGlobalLockRequire()) {
            processLocalCommitWithGlobalLocks();
        } else {
            targetConnection.commit();
        }
    }
    

    执行一阶段本地事务提交
    如果是分支事务,调用 processGlobalTransactionCommit 方法进行提交

    private void processGlobalTransactionCommit() throws SQLException {
        try {
    
            // 调用 RM 注册分支事务,包括行记录的主键作为全局锁
            register();
        } catch (TransactionException e) {
    
            // 如果报锁冲突异常,则 executeCommitTrue 会循环等待
            recognizeLockKeyConflictException(e);
        }
    
        try {
    
            // 分支注册成功不抛异常,则将 UndoLog 插入数据库
            if (context.hasUndoLog()) {
                UndoLogManager.flushUndoLogs(this);
            }
    
            // 将业务修改和 UndoLog 一并提交
            targetConnection.commit();
        } catch (Throwable ex) {
    
            // 汇报分支状态为一阶段失败,默认失败会重试五次
            report(false);
            if (ex instanceof SQLException) {
                throw new SQLException(ex);
            }
        }
    
        // 汇报分支状态为一阶段成功 
        report(true);
        context.reset();
    }
    

    GlobalLock 的具体作用

    如果是用 GlobalLock 修饰的业务方法,虽然该方法并非某个全局事务下的分支事务,但是它对数据资源的操作也需要先查询全局锁,如果存在其他 Seata 全局事务正在修改,则该方法也需等待。所以,如果想要 Seata 全局事务执行期间,数据库不会被其他事务修改,则该方法需要强制添加 GlobalLock 注解,来将其纳入 Seata 分布式事务的管理范围。

    功能有点类似于 Spring 的 @Transactional 注解,如果你希望开启事务,那么必须添加该注解,如果你没有添加那么事务功能自然不生效,业务可能出 BUG;Seata 也一样,如果你希望某个不在全局事务下的 SQL 操作不影响 AT 分布式事务,那么必须添加 GlobalLock 注解。

    private void processLocalCommitWithGlobalLocks() throws SQLException {
    
        // 查询这些主键是不是被其他全局事务锁住,如果有就抛出锁冲突异常
        checkLock(context.buildLockKeys());
        try {
    
            // 否则提交事务,因为该方法的修改并不影响已存在的 Seata 分布式事务
            targetConnection.commit();
        } catch (Throwable ex) {
            throw new SQLException(ex);
        }
        context.reset();
    }
    

    二阶段异步删除分支 UndoLog
    如果一阶段成功,则 TC 会通知客户端 RM 进行第二阶段的提交工作,这部分代码最终实现位于 AsyncWorker 类中的 branchCommit 方法。

    @Override
    public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId,
                                        String applicationData) throws TransactionException {
    
        // 将分支提交信息包装成 Phase2Context 插入内存中的异步提交队列
        if (!ASYNC_COMMIT_BUFFER.offer(new Phase2Context(branchType, xid, branchId, resourceId, applicationData))) {
            LOGGER.warn("Async commit buffer is FULL. Rejected branch [" + branchId + "/" + xid
                + "] will be handled by housekeeping later.");
        }
        return BranchStatus.PhaseTwo_Committed;
    }
    

    插入 ASYNC_COMMIT_BUFFER 之后,AsyncWorker 类中会有一个定时任务,从队列中取出分支提交信息 Phase2Context,将其中的 xid 和 branchId 提取出来生成 DELETE SQL 语句,删除本地数据库中存储的相应的 UndoLog。下面是该定时任务的关键方法 doBranchCommits 的实现:

    private void doBranchCommits() {
    
        // 如果异步提交队列是空,return
        if (ASYNC_COMMIT_BUFFER.size() == 0) {
            return;
        }
    
        // 该 map 报错 resourceId 和 commitContext 列表的对应关系
        Map<String, List<Phase2Context>> mappedContexts = new HashMap<>(DEFAULT_RESOURCE_SIZE);
        while (!ASYNC_COMMIT_BUFFER.isEmpty()) {
    
            // 取出分支提交信息 commitContext
            Phase2Context commitContext = ASYNC_COMMIT_BUFFER.poll();
            List<Phase2Context> contextsGroupedByResourceId = mappedContexts.get(commitContext.resourceId);
            if (contextsGroupedByResourceId == null) {
                contextsGroupedByResourceId = new ArrayList<>();
    
                // 将其放入对应 RM 的提交队列中
                mappedContexts.put(commitContext.resourceId, contextsGroupedByResourceId);
            }
            contextsGroupedByResourceId.add(commitContext);
        }
    
        for (Map.Entry<String, List<Phase2Context>> entry : mappedContexts.entrySet()) {
            Connection conn = null;
            try {
                try {
    
                    // …… …… 省略,这里拿到原始 connection,因为只需做 delete 操作,没有涉及代理数据源的部分
                    conn = dataSourceProxy.getPlainConnection();
                } catch (SQLException sqle) {
                    LOGGER.warn("Failed to get connection for async committing on " + entry.getKey(), sqle);
                    continue;
                }
    
                // …… …… 省略,批量删除 UndoLog
    
                try {
                    UndoLogManager.batchDeleteUndoLog(xids, branchIds, conn);
                } catch (Exception ex) {
                    LOGGER.warn("Failed to batch delete undo log [" + branchIds + "/" + xids + "]", ex);
                }
            } 
            // …… …… 省略
        }
    }
    

    二阶段生成反向 SQL 回滚
    如果一阶段失败,则二阶段需要回滚一阶段的数据库更新操作,此时涉及到根据 UndoLog 构造逆向 SQL 进行补偿。这部分逻辑的入口位于 DataSourceManager 类中的 branchRollback 方法:

    @Override
    public BranchStatus branchRollback(BranchType branchType, String xid, long branchId, String resourceId, String applicationData) throws TransactionException {
    
        // 根据 resourceId 拿到代理数据源
        DataSourceProxy dataSourceProxy = get(resourceId);
        try {
    
            // 调用 UndoLogManager 的 undo 方法进行补偿,核心逻辑在这里
            UndoLogManager.undo(dataSourceProxy, xid, branchId);
        } 
        // …… …… 省略
    }
    

    UndoLogManager 负责 UndoLog 的插入、删除、补偿等操作,其中核心方法即为 undo,我们可以看到其中有一个无限 for 循环,一旦当前事务进行二阶段回滚时获取本地锁失败,则进入循环等待逻辑,等待本地锁被释放之后自己再提交本地事务:

    public static void undo(DataSourceProxy dataSourceProxy, String xid, long branchId) throws TransactionException {
    
        // …… …… 省略
    
        for (; ; ) {
            try {
                // …… …… 省略
    
                // 构造查询 UndoLog 表的 SELECT 语句,条件为 xid 和 branchId
                selectPST = conn.prepareStatement(SELECT_UNDO_LOG_SQL);
                selectPST.setLong(1, branchId);
                selectPST.setString(2, xid);
                // 进行查询
                rs = selectPST.executeQuery();
    
                boolean exists = false;
                while (rs.next()) {
                    exists = true;
    
                    // 仅允许 normal 状态的 UndoLog 记录进行回顾,主要是防止 
                    int state = rs.getInt("log_status");
                    if (!canUndo(state)) {
                        return;
                    }
    
                    // 取出 rollback_info 
                    Blob b = rs.getBlob("rollback_info");
    
                    // 转为字节数组
                    byte[] rollbackInfo = BlobUtils.blob2Bytes(b);
    
                    // JSON 反序列化为对象
                    BranchUndoLog branchUndoLog = UndoLogParserFactory.getInstance().decode(rollbackInfo);
    
                    // 循环所有的 UndoLog 记录
                    for (SQLUndoLog sqlUndoLog : branchUndoLog.getSqlUndoLogs()) {
                        TableMeta tableMeta = TableMetaCache.getTableMeta(dataSourceProxy, sqlUndoLog.getTableName());
                        sqlUndoLog.setTableMeta(tableMeta);
    
                        // 构造反向 SQL 语句,三类更新操作会生成三种不同的 UndoExecutor
                        AbstractUndoExecutor undoExecutor = UndoExecutorFactory.getUndoExecutor(
                            dataSourceProxy.getDbType(),
                            sqlUndoLog);
    
                        // executeOn 内部会调用一个抽象方法 buildUndoSQL
                        undoExecutor.executeOn(conn);
                    }
                }
    
                // 如果该 Undo Log 存在则执行回滚后删除记录
                if (exists) {
                    deleteUndoLog(xid, branchId, conn);
                    conn.commit();
                } else {
    
                    // 如果不存在说明一阶段有异常,可以插入一条终止状态的 UndoLog 防止一阶段因为超时等问题,悬挂的事务提交又在补偿方法之后达到,与 TCC 中 Cancel 方法处理 Try 方法悬挂异曲同工
                    insertUndoLogWithGlobalFinished(xid, branchId, conn);
                    conn.commit();
                }
                return;
            } 
            // 如果抛出 SQLIntegrityConstraintViolationException 则进入循环,等待本地锁被释放
            // …… …… 省略
        }
    }
    

    UndoExecutorFactory 类的 getUndoExecutor 方法会根据 UndoLog 中记录的 SQLType 生成不同的 UndoExecutor 返回:

    public static AbstractUndoExecutor getUndoExecutor(String dbType, SQLUndoLog sqlUndoLog) {
        if (!dbType.equals(JdbcConstants.MYSQL)) {
            throw new NotSupportYetException(dbType);
        }
        switch (sqlUndoLog.getSqlType()) {
            case INSERT:
                return new MySQLUndoInsertExecutor(sqlUndoLog);
            case UPDATE:
                return new MySQLUndoUpdateExecutor(sqlUndoLog);
            case DELETE:
                return new MySQLUndoDeleteExecutor(sqlUndoLog);
            default:
                throw new ShouldNeverHappenException();
        }
    }
    

    ndoExecutor 中的 executeOn 方法 首先会调用一个抽象方法 buildUndoSQL,根据 INSERT、UPDATE、DELETE 三种不同的 SQL 类型生成相应的反向 SQL 语句。

    下面我们以 DELETE 为例分析一下 MySQLUndoDeleteExecutor 类的 buildUndoSQL 方法的实现,如果一阶段已经删除了某行数据,那么二阶段补偿自然需要构造一个 INSERT 语句将被删除的行重新插入。

    protected String buildUndoSQL() {
    
        // 首先对于 MySQL 中的一些保留字,如果出现在普通 SQL 中需要增加反引号
        KeywordChecker keywordChecker = KeywordCheckerFactory.getKeywordChecker(JdbcConstants.MYSQL);
    
        // DELETE 的 beforeImage 快照不可能是空,因为肯定有数据才会执行删除
        TableRecords beforeImage = sqlUndoLog.getBeforeImage();
        List<Row> beforeImageRows = beforeImage.getRows();
        if (beforeImageRows == null || beforeImageRows.size() == 0) {
            throw new ShouldNeverHappenException("Invalid UNDO LOG");
        }
        Row row = beforeImageRows.get(0);
        List<Field> fields = new ArrayList<>(row.nonPrimaryKeys());
        Field pkField = row.primaryKeys().get(0);
    
        // 主键放在列表最后一个
        fields.add(pkField);
        String insertColumns = fields.stream()
            .map(field -> keywordChecker.checkAndReplace(field.getName()))
            .collect(Collectors.joining(", "));
        String insertValues = fields.stream().map(field -> "?")
            .collect(Collectors.joining(", "));
    
        // "INSERT INTO %s (%s) VALUES (%s)" 依次放入表名、列名、数值
        return String.format(INSERT_SQL_TEMPLATE, keywordChecker.checkAndReplace(sqlUndoLog.getTableName()),
                                insertColumns, insertValues);
    }
    

    Seata AT 模式服务端部分

    AT 模式下,全局事务注册、提交、回滚均和 TCC 模式一模一样,均是根据一阶段调用抛不抛异常决定。
    区别在于两点:

    1. 分支事务的注册,TCC 模式下分支事务是在进入参与方 Try 方法之前的切面中注册的,而且分支实现完毕不需要再次汇报分支状态;但 AT 模式不一样,分支事务是在代理数据源提交本地事务之前注册的,注册成功才能提交一阶段本地事务,如果注册失败报锁冲突则一直阻塞等待直到该全局锁被释放,且本地提交之后不论是否成功还需要再次向 TC 汇报一次分支状态。
    2. AT 模式由于一阶段已经完成数据修改,因此二阶段可以异步提交,但回滚是同步的,回滚失败才会异步重试;但是 Seata 中 TCC 模式二阶段 Confirm 是同步提交的,可以最大程度保证 TCC 模式的数据一致性,但是笔者认为在要求性能的场景下,TCC 的二阶段也可以改为异步提交

    服务端提交全局事务
    核心方法是 DefaultCore 类中的 commit 方法:

    @Override
    public GlobalStatus commit(String xid) throws TransactionException {
    
        // 查询全局事务
        GlobalSession globalSession = SessionHolder.findGlobalSession(xid);
        if (globalSession == null) {
            return GlobalStatus.Finished;
        }
        globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
    
        // 锁住全局事务并关闭它,不让后续可能的分支再注册上来
        boolean shouldCommit = globalSession.lockAndExcute(() -> {
    
            //the lock should release after branch commit
            globalSession
                .closeAndClean(); 
            if (globalSession.getStatus() == GlobalStatus.Begin) {
                globalSession.changeStatus(GlobalStatus.Committing);
                return true;
            }
            return false;
        });
        if (!shouldCommit) {
            return globalSession.getStatus();
        }
        if (globalSession.canBeCommittedAsync()) {
    
            // 如果是 AT 模式,只改变全局事务状态为 AsyncCommitting 进行异步提交
            asyncCommit(globalSession);
            return GlobalStatus.Committed;
        } else {
    
            // 如果分支里包含 TCC 的分支,进行同步提交
            doGlobalCommit(globalSession, false);
        }
        return globalSession.getStatus();
    }
    

    服务端异步提交分支事务
    DefaultCoordinator 类中有一个 asyncCommitting 定时线程池,会定时调用 handleAsyncCommitting 方法从存储介质(文件或者数据库)中分批查询出状态为 AsyncCommitting 的全局事务列表,针对每个全局事务调用 doGlobalCommit 方法提交其下所有未提交的分支事务。

    asyncCommitting.scheduleAtFixedRate(() -> {
            try {
                handleAsyncCommitting();
            } catch (Exception e) {
                LOGGER.info("Exception async committing ... ", e);
            }
        }, 0, asynCommittingRetryDelay, TimeUnit.SECONDS);
    
    protected void handleAsyncCommitting() {
    
        // 查询待提交的全局事务列表
        Collection<GlobalSession> asyncCommittingSessions = SessionHolder.getAsyncCommittingSessionManager()
            .allSessions();
        if (CollectionUtils.isEmpty(asyncCommittingSessions)) {
            return;
        }
        for (GlobalSession asyncCommittingSession : asyncCommittingSessions) {
            try {
                asyncCommittingSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
    
                // 调用 doGlobalCommit 进行提交,通过回调每个分支事务的客户端来触发客户端 RM 完成分支事务提交
                // 如果 doGlobalCommit 成功则释放全局锁并删除事务日志
                core.doGlobalCommit(asyncCommittingSession, true);
            } catch (TransactionException ex) {
                LOGGER.info("Failed to async committing [{}] {} {}",
                    asyncCommittingSession.getXid(), ex.getCode(), ex.getMessage());
            }
        }
    }
    

    服务端同步回滚分支事务
    一旦一阶段失败,全局事务发起方通知 TC 回滚全局事务的话,那么二阶段的回滚调用是同步进行的,一旦同步回滚失败才会进入异步重试阶段。核心方法为 DefaultCore 类中的 doGlobalRollback 方法:

    public void doGlobalRollback(GlobalSession globalSession, boolean retrying) throws TransactionException {
        for (BranchSession branchSession : globalSession.getReverseSortedBranches()) {
            BranchStatus currentBranchStatus = branchSession.getStatus();
            if (currentBranchStatus == BranchStatus.PhaseOne_Failed) {
                globalSession.removeBranch(branchSession);
                continue;
            }
            try {
    
                // 进行回调,通知客户端 RM 回滚分支事务
                BranchStatus branchStatus = resourceManagerInbound.branchRollback(branchSession.getBranchType(),
                    branchSession.getXid(), branchSession.getBranchId(),
                    branchSession.getResourceId(), branchSession.getApplicationData());
    
                // …… …… 省略
    
            } catch (Exception ex) {
                LOGGER.error("Exception rollbacking branch " + branchSession, ex);
    
                // 如果是第一次补偿,retrying 是 false,因此如果失败则进入异步重试
                if (!retrying) {
                    queueToRetryRollback(globalSession);
                }
                throw new TransactionException(ex);
            }
        }
    
        // 回滚成功的话,释放全局锁并删除事务日志
        SessionHelper.endRollbacked(globalSession);
    }
    

    回滚的异步重试与异步提交相同,都是一个定时线程池去扫描存储介质中尚未完成回滚的全局事务,因此这里不再赘述。

    Seata AT 模式的全局锁
    全局锁的组成和作用
    全局锁主要由表名加操作行的主键两个部分组成,Seata AT 模式使用服务端保存全局锁的方法保证: 1. 全局事务之前的写隔离 2. 全局事务与被 GlobalLock 修饰方法间的写隔离性

    全局锁的注册
    当客户端在进行一阶段本地事务提交前,会先向服务端注册分支事务,此时会将修改行的表名、主键信息封装成全局锁一并发送到服务端进行保存,如果服务端保存时发现已经存在其他全局事务锁定了这些行主键,则抛出全局锁冲突异常,客户端循环等待并重试。

    全局锁的查询
    被 GlobalLock 修饰的方法虽然不在某个全局事务下,但是其在提交事务前也会进行全局锁查询,如果发现全局锁正在被其他全局事务持有,则自身也会循环等待。

    全局锁的释放
    由于二阶段提交是异步进行的,当服务端向客户端发送 branch commit 请求后,客户端仅仅是将分支提交信息插入内存队列即返回,服务端只要判断这个流程没有异常就会释放全局锁。因此,可以说如果一阶段成功则在二阶段一开始就会释放全局锁,不会锁定到二阶段提交流程结束。

    相关文章

      网友评论

          本文标题:Seata源码解析

          本文链接:https://www.haomeiwen.com/subject/awgifltx.html