美文网首页
分布式事务中间件 seata - RM 模块源码解读

分布式事务中间件 seata - RM 模块源码解读

作者: habit_learning | 来源:发表于2019-06-15 18:32 被阅读0次

    上一篇文章,我们介绍了新一代分布式事务的 seata 的实现机制,它是基于两阶段提交模式设计的,以高效且对业务零侵入的方式,解决微服务场景下面临的分布式事务问题。我们重新温故下 seata 三大组件 TC、TM、RM的交互流程:

    本文将深入到 seata 的 RM 模块源码去介绍 seata 是如何在完成分支提交和回滚的基础上又做到零侵入,进而极大方便业务方进行业务系统开发。

    从配置开始解读

    /**
     * The type Druid configuration.
     */
    @Configuration
    public class DruidConfiguration {
    
        @Value("${spring.datasource.druid.user}")
        private String druidUser;
    
        @Value("${spring.datasource.druid.password}")
        private String druidPassword;
    
        /**
         * Druid data source druid data source.
         *
         * @return the druid data source
         */
        @Bean(destroyMethod = "close", initMethod = "init")
        @ConfigurationProperties(prefix = "spring.datasource")
        public DruidDataSource druidDataSource() {
            DruidDataSource druidDataSource = new DruidDataSource();
            return druidDataSource;
        }
    
        /**
         * Data source data source.
         *
         * @param druidDataSource the druid data source
         * @return the data source
         */
        @Primary
        @Bean("dataSource")
        public DataSource dataSource(DruidDataSource druidDataSource) {
            DataSourceProxy dataSourceProxy = new DataSourceProxy(druidDataSource);
            return dataSourceProxy;
        }
        ...
    }
    

    上面是 seata 数据源的配置,数据源采用 druid 的DruidDataSource,但实际 jdbcTemplate 执行时并不是用该数据源,而用的是 seata 对DruidDataSource的代理DataSourceProxy,所以,与 RM 相关的代码逻辑基本上都是从DataSourceProxy这个代理数据源开始的。

    seata 采用 2PC 来完成分支事务的提交与回滚,具体怎么做到的呢,下面就分别介绍 Phase1、Phase2 具体做了些什么。

    Phase1 - 分支(本地)事务执行

    seata 将一个本地事务做为一个分布式事务分支,所以若干个分布在不同微服务中的本地事务共同组成了一个全局事务,结构如下:


    那么,一个本地事务中 SQL 是如何执行呢?在 Spring 中,本质上都是从 jdbcTemplate 开始的,一般JdbcTemplate执行流程如下图所示:


    由于在配置中,JdbcTemplate 数据源DruidDataSource被配置成了 seata 实现DataSourceProxy,进而控制了后续的流程。数据库连接使用的是seata 提供的ConnectionProxy,Statement 使用的是 seata 实现的StatementProxy,最终 seata 就顺理成章地实现了在本地事务执行前后增加所需要的逻辑,比如:完成分支事务的快照记录和分支事务执行状态的上报等等。

    DataSourceProxy获取ConnectionProxy:

    public class DataSourceProxy extends AbstractDataSourceProxy implements Resource {
    
        @Override
        public ConnectionProxy getConnection() throws SQLException {
            Connection targetConnection = targetDataSource.getConnection();
            return new ConnectionProxy(this, targetConnection);
        }
    
        @Override
        public ConnectionProxy getConnection(String username, String password) throws SQLException {
            Connection targetConnection = targetDataSource.getConnection(username, password);
            return new ConnectionProxy(this, targetConnection);
        }
    }
    

    ConnectionProxy获取StatmentProxy

    public abstract class AbstractConnectionProxy implements Connection {
      
        @Override
        public PreparedStatement prepareStatement(String sql) throws SQLException {
            PreparedStatement targetPreparedStatement = getTargetConnection().prepareStatement(sql);
            return new PreparedStatementProxy(this, targetPreparedStatement, sql);
        }
    
    }
    

    在获取到StatementProxy后,可以调用excute方法执行SQL了:

    public class PreparedStatementProxy extends AbstractPreparedStatementProxy
        implements PreparedStatement, ParametersHolder {
    
         @Override
        public ArrayList<Object>[] getParameters() {
            return parameters;
        }
    
        private void init() throws SQLException {
            int paramCount = targetStatement.getParameterMetaData().getParameterCount();
            this.parameters = new ArrayList[paramCount];
            for (int i = 0; i < paramCount; i++) {
                parameters[i] = new ArrayList<>();
            }
        }
    
        /**
         * Instantiates a new Prepared statement proxy.
         *
         * @param connectionProxy the connection proxy
         * @param targetStatement the target statement
         * @param targetSQL       the target sql
         * @throws SQLException the sql exception
         */
        public PreparedStatementProxy(AbstractConnectionProxy connectionProxy, PreparedStatement targetStatement,
                                      String targetSQL) throws SQLException {
            super(connectionProxy, targetStatement, targetSQL);
            init();
        }
    
        @Override
        public boolean execute() throws SQLException {
            return ExecuteTemplate.execute(this, new StatementCallback<Boolean, PreparedStatement>() {
                @Override
                public Boolean execute(PreparedStatement statement, Object... args) throws SQLException {
                    return statement.execute();
                }
            });
        }
    
        @Override
        public ResultSet executeQuery() throws SQLException {
            return ExecuteTemplate.execute(this, new StatementCallback<ResultSet, PreparedStatement>() {
                @Override
                public ResultSet execute(PreparedStatement statement, Object... args) throws SQLException {
                    return statement.executeQuery();
                }
            });
        }
    
        @Override
        public int executeUpdate() throws SQLException {
            return ExecuteTemplate.execute(this, new StatementCallback<Integer, PreparedStatement>() {
                @Override
                public Integer execute(PreparedStatement statement, Object... args) throws SQLException {
                    return statement.executeUpdate();
                }
            });
        }
    }
    

    而真正excute实现逻辑如下:

    public class ExecuteTemplate {
    
        ...
        public static <T, S extends Statement> T execute(SQLRecognizer sqlRecognizer,
                                                         StatementProxy<S> statementProxy,
                                                         StatementCallback<T, S> statementCallback,
                                                         Object... args) throws SQLException {
    
            if (!RootContext.inGlobalTransaction() && !RootContext.requireGlobalLock()) {
                // Just work as original statement
                return statementCallback.execute(statementProxy.getTargetStatement(), args);
            }
    
            if (sqlRecognizer == null) {
                sqlRecognizer = SQLVisitorFactory.get(
                        statementProxy.getTargetSQL(),
                        statementProxy.getConnectionProxy().getDbType());
            }
            Executor<T> executor = null;
            if (sqlRecognizer == null) {
                executor = new PlainExecutor<T, S>(statementProxy, statementCallback);
            } else {
                switch (sqlRecognizer.getSQLType()) {
                    case INSERT:
                        executor = new InsertExecutor<T, S>(statementProxy, statementCallback, sqlRecognizer);
                        break;
                    case UPDATE:
                        executor = new UpdateExecutor<T, S>(statementProxy, statementCallback, sqlRecognizer);
                        break;
                    case DELETE:
                        executor = new DeleteExecutor<T, S>(statementProxy, statementCallback, sqlRecognizer);
                        break;
                    case SELECT_FOR_UPDATE:
                        executor = new SelectForUpdateExecutor<T, S>(statementProxy, statementCallback, sqlRecognizer);
                        break;
                    default:
                        executor = new PlainExecutor<T, S>(statementProxy, statementCallback);
                        break;
                }
            }
            T rs = null;
            try {
                rs = executor.execute(args);
            } catch (Throwable ex) {
                if (!(ex instanceof SQLException)) {
                    // Turn other exception into SQLException
                    ex = new SQLException(ex);
                }
                throw (SQLException)ex;
            }
            return rs;
        }
    }
    
    1. 首先会检查当前本地事务是否处于全局事务中,如果不处于,直接使用默认的Statment执行,避免因引入 seata 导致非全局事务中的 SQL 执行性能下降。

    2.根据 SQL 语句和数据库类型,获取 SQL 识别器SQLRecognizer

    1. 对于 INSERT、UPDATE、DELETE、SELECT..FOR UPDATE 这四种类型的 SQL 会使用专门实现的 SQL 执行器进行处理,其它 SQL (如查询)直接是默认的PlainExecutor执行。

    2. 返回执行结果,如有异常则直接抛给上层业务代码进行处理。

    再来看一下关键的 INSERT、UPDATE、DELETE、SELECT..FOR UPDATE 这四种类型的 sql 如何执行的,先看一下具体类图结构:


    为节省篇幅,选择UpdateExecutor实现源码看一下,先看入口BaseTransactionalExecutor.execute,该方法将ConnectionProxy与Xid(事务ID)进行绑定,这样后续判断当前本地事务是否处理全局事务中只需要看ConnectionProxy中 Xid 是否为空就行。

    public abstract class BaseTransactionalExecutor<T, S extends Statement> implements Executor {
    
      /**
         * The Statement proxy.
         */
        protected StatementProxy<S> statementProxy;
    
        ...
        @Override
        public Object execute(Object... args) throws Throwable {
            if (RootContext.inGlobalTransaction()) {
                String xid = RootContext.getXID();
                statementProxy.getConnectionProxy().bind(xid);
            }
    
            if (RootContext.requireGlobalLock()) {
                statementProxy.getConnectionProxy().setGlobalLockRequire(true);
            } else {
                statementProxy.getConnectionProxy().setGlobalLockRequire(false);
            }
            return doExecute(args);
        }
       
    }
    

    然后,执行AbstractDMLBaseExecutor中实现的doExecute方法:

    public abstract class AbstractDMLBaseExecutor<T, S extends Statement> extends BaseTransactionalExecutor<T, S> {
    
        @Override
        public T doExecute(Object... args) throws Throwable {
            AbstractConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
            if (connectionProxy.getAutoCommit()) {
                return executeAutoCommitTrue(args);
            } else {
                return executeAutoCommitFalse(args);
            }
        }
    
        /**
         * Execute auto commit false t.
         *
         * @param args the args
         * @return the t
         * @throws Throwable the throwable
         */
        protected T executeAutoCommitFalse(Object[] args) throws Throwable {
            TableRecords beforeImage = beforeImage();
            T result = statementCallback.execute(statementProxy.getTargetStatement(), args);
            TableRecords afterImage = afterImage(beforeImage);
            prepareUndoLog(beforeImage, afterImage);
            return result;
        }
    
        /**
         * Execute auto commit true t.
         *
         * @param args the args
         * @return the t
         * @throws Throwable the throwable
         */
        protected T executeAutoCommitTrue(Object[] args) throws Throwable {
            T result = null;
            AbstractConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
            LockRetryController lockRetryController = new LockRetryController();
            try {
                connectionProxy.setAutoCommit(false);
                while (true) {
                    try {
                        result = executeAutoCommitFalse(args);
                        connectionProxy.commit();
                        break;
                    } catch (LockConflictException lockConflict) {
                        connectionProxy.getTargetConnection().rollback();
                        lockRetryController.sleep(lockConflict);
                    }
                }
    
            } catch (Exception e) {
                // when exception occur in finally,this exception will lost, so just print it here
                LOGGER.error("exception occur", e);
                throw e;
            } finally {
                connectionProxy.setAutoCommit(true);
            }
            return result;
        }
    
    }
    

    基本逻辑如下:

    1. 先判断是否为 Auto-Commit 模式
    2. 如果非 Auto-Commit 模式,则先查询 Update 前对应行记录的快照beforeImage,再执行 Update 语句,完成后再查询 Update 后对应行记录的快照afterImage,最后将beforeImageafterImage生成 UndoLog 追加到 Connection 上下文ConnectionContext中。(注:获取beforeImageafterImage方法在UpdateExecutor类下,一般是构造一条 Select...For Update 语句获取执行前后的行记录)。
    3. 如果是 Auto-Commit 模式,先将提交模式设置成非自动 Commit,再执行2中的逻辑,再执行connectionProxy.commit()方法,由于执行2过程和commit时都可能会出现全局锁冲突问题,增加了一个循环等待重试逻辑,最后将 connection 的模式设置成 Auto-Commit 模式。

    如果本地事务执行过程中发生异常,业务上层会接收到该异常,至于是给 TM 模块返回成功还是失败,由业务上层实现决定,如果返回失败,则 TM 裁决对全局事务进行回滚;如果本地事务执行过程未发生异常,不管是非 Auto-Commit 还是 Auto-Commit 模式,最后都会调用connectionProxy.commit()对本地事务进行提交,在这里会创建分支事务、上报分支事务的状态以及将 UndoLog 持久化到undo_log表中,具体代码如下图:

        @Override
        public void commit() throws SQLException {
            if (context.inGlobalTransaction()) {
                processGlobalTransactionCommit();
            } else if (context.isGlobalLockRequire()) {
                processLocalCommitWithGlobalLocks();
            } else {
                targetConnection.commit();
            }
        }
    
        private void processGlobalTransactionCommit() throws SQLException {
            try {
                register();
            } catch (TransactionException e) {
                recognizeLockKeyConflictException(e);
            }
    
            try {
                if (context.hasUndoLog()) {
                    UndoLogManager.flushUndoLogs(this);
                }
                targetConnection.commit();
            } catch (Throwable ex) {
                report(false);
                if (ex instanceof SQLException) {
                    throw new SQLException(ex);
                }
            }
            report(true);
            context.reset();
        }
    

    基本逻辑:

    1. 判断当前本地事务是否处于全局事务中(也就判断ConnectionContext中的 xid 是否为空)。
    2. 如果不处于全局事务中,但是有全局事务锁(即方法标注了@GlobalLock注解),则在全局事务锁中提交本地事务。
    3. 以上情况都不是,则调用targetConnection对本地事务进行 commit。
    4. 如果处于全局事务中,首先创建分支事务,再将ConnectionContext中的 UndoLog 写入到undo_log表中,然后调用targetConnection对本地事务进行 commit,将 UndoLog 与业务 SQL 一起提交,最后上报分支事务的状态(成功 or 失败),并将ConnectionContext上下文重置。

    注:@GlobalLock注解的作用,用GlobalLock修饰的本地业务方法,虽然该方法并非某个全局事务下的分支事务,但是它对数据资源的操作也需要先查询全局锁,如果存在其他 Seata 全局事务正在修改,则该方法也需等待。所以,如果想要 Seata 全局事务执行期间,数据库不会被其他事务修改,则该方法需要强制添加GlobalLock注解,来将其纳入 Seata 分布式事务的管理范围。

    综上所述,RM 模块通过对 JDBC 数据源进行代理,干预业务 SQL 执行过程,加入了很多流程,比如业务 SQL 解析、业务 SQL 执行前后的数据快照查询并组织成 UndoLog、全局锁检查、分支事务注册、UndoLog 写入并随本地事务一起 Commit、分支事务状态上报等。通过这种方式,seata 真正做到了对业务代码无侵入,只需要通过简单的配置,业务方就可以轻松享受 seata 所带来的功能。

    Phase1 整体流程引用 seata 官方图总结如下:


    Phase2 - 分支事务提交或回滚

    阶段 2 完成的是全局事务的最终提交或回滚,当全局事务中所有分支事务全部完成并且都执行成功,这时 TM 会发起全局事务提交,TC 收到全全局事务提交消息后,会通知各分支事务进行提交;同理,当全局事务中所有分支事务全部完成并且某个分支事务失败了,TM 会通知 TC 协调全局事务回滚,进而 TC 通知各分支事务进行回滚。

    在业务应用启动过程中,由于引入了seata 客户端,RmRpcClient会随应用一起启动,RmRpcClient采用 Netty 实现,可以接收 TC 消息和向 TC 发送消息,因此RmRpcClient是与 TC 收发消息的关键模块。

    下面分成两部分来讲:分支事务提交、分去事务回滚。

    1、分支事务提交

    在接收到 TC 发起的全局提交消息后,RmRpcClient对通信协议的处理,会根据发送请求 Body 类型(分支提交请求或者分支回滚请求),调用不同的处理逻辑,RmMessageListener#onMessage

        @Override
        public void onMessage(long msgId, String serverAddress, Object msg, ClientMessageSender sender) {
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("onMessage:" + msg);
            }
            if (msg instanceof BranchCommitRequest) {
                handleBranchCommit(msgId, serverAddress, (BranchCommitRequest)msg, sender);
            } else if (msg instanceof BranchRollbackRequest) {
                handleBranchRollback(msgId, serverAddress, (BranchRollbackRequest)msg, sender);
            }
        }
    
        private void handleBranchRollback(long msgId, String serverAddress,
                                          BranchRollbackRequest branchRollbackRequest,
                                          ClientMessageSender sender) {
            BranchRollbackResponse resultMessage = null;
            resultMessage = (BranchRollbackResponse)handler.onRequest(branchRollbackRequest, null);
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("branch rollback result:" + resultMessage);
            }
            try {
                sender.sendResponse(msgId, serverAddress, resultMessage);
            } catch (Throwable throwable) {
                LOGGER.error("", "send response error", throwable);
            }
        }
    
        private void handleBranchCommit(long msgId, String serverAddress,
                                        BranchCommitRequest branchCommitRequest,
                                        ClientMessageSender sender) {
    
            BranchCommitResponse resultMessage = null;
            try {
                resultMessage = (BranchCommitResponse)handler.onRequest(branchCommitRequest, null);
                sender.sendResponse(msgId, serverAddress, resultMessage);
            } catch (Exception e) {
                LOGGER.error(FrameworkErrorCode.NetOnMessage.getErrCode(), e.getMessage(), e);
                if (resultMessage == null) {
                    resultMessage = new BranchCommitResponse();
                }
                resultMessage.setResultCode(ResultCode.Failed);
                resultMessage.setMsg(e.getMessage());
                sender.sendResponse(msgId, serverAddress, resultMessage);
            }
        }
    
    

    由于 TC 发起的是全局提交消息,于是它发送的消息 Body 类型为分支事务提交请求BranchCommitRequest,于是会走分支事务提交的逻辑,后面会交由RMHandlerAT来完成对分支事务的提交,分支事务提交从RMHandlerAT.doBranchCommit()开始,但最后由AsyncWorker异步 Worker 完成,直接看AsyncWorker中的代码实现:

    public class AsyncWorker implements ResourceManagerInbound {
    
        @Override
        public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId,
                                         String applicationData) throws TransactionException {
            if (!ASYNC_COMMIT_BUFFER.offer(new Phase2Context(branchType, xid, branchId, resourceId, applicationData))) {
                LOGGER.warn("Async commit buffer is FULL. Rejected branch [" + branchId + "/" + xid
                    + "] will be handled by housekeeping later.");
            }
            return BranchStatus.PhaseTwo_Committed;
        }
    
        /**
         * Init.
         */
        public synchronized void init() {
            LOGGER.info("Async Commit Buffer Limit: " + ASYNC_COMMIT_BUFFER_LIMIT);
            timerExecutor = new ScheduledThreadPoolExecutor(1,
                new NamedThreadFactory("AsyncWorker", 1, true));
            timerExecutor.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    try {
    
                        doBranchCommits();
    
                    } catch (Throwable e) {
                        LOGGER.info("Failed at async committing ... " + e.getMessage());
    
                    }
                }
            }, 10, 1000 * 1, TimeUnit.MILLISECONDS);
        }
        ...
    }
    

    AsyncWorker#init初始化方法会在程序启动时执行,目的是开启定时任务,执行doBranchCommits()方法。我们先看branchCommit方法,它将分支事务组织成Phase2Context对象,并且放入阻塞队列ASYNC_COMMIT_BUFFER中。而doBranchCommits()方法正是从阻塞队列ASYNC_COMMIT_BUFFER中取出Phase2Context对象,从而删除 UndoLog 日志,我们看代码:

    private void doBranchCommits() {
            if (ASYNC_COMMIT_BUFFER.size() == 0) {
                return;
            }
    
            Map<String, List<Phase2Context>> mappedContexts = new HashMap<>(DEFAULT_RESOURCE_SIZE);
            while (!ASYNC_COMMIT_BUFFER.isEmpty()) {
                Phase2Context commitContext = ASYNC_COMMIT_BUFFER.poll();
                List<Phase2Context> contextsGroupedByResourceId = mappedContexts.get(commitContext.resourceId);
                if (contextsGroupedByResourceId == null) {
                    contextsGroupedByResourceId = new ArrayList<>();
                    mappedContexts.put(commitContext.resourceId, contextsGroupedByResourceId);
                }
                contextsGroupedByResourceId.add(commitContext);
    
            }
    
            for (Map.Entry<String, List<Phase2Context>> entry : mappedContexts.entrySet()) {
                Connection conn = null;
                try {
                    try {
                        DataSourceManager resourceManager = (DataSourceManager)DefaultResourceManager.get()
                            .getResourceManager(BranchType.AT);
                        DataSourceProxy dataSourceProxy = resourceManager.get(entry.getKey());
                        if (dataSourceProxy == null) {
                            throw new ShouldNeverHappenException("Failed to find resource on " + entry.getKey());
                        }
                        conn = dataSourceProxy.getPlainConnection();
                    } catch (SQLException sqle) {
                        LOGGER.warn("Failed to get connection for async committing on " + entry.getKey(), sqle);
                        continue;
                    }
                    List<Phase2Context> contextsGroupedByResourceId = entry.getValue();
                    Set<String> xids = new LinkedHashSet<>(UNDOLOG_DELETE_LIMIT_SIZE);
                    Set<Long> branchIds = new LinkedHashSet<>(UNDOLOG_DELETE_LIMIT_SIZE);
                    for (Phase2Context commitContext : contextsGroupedByResourceId) {
                        xids.add(commitContext.xid);
                        branchIds.add(commitContext.branchId);
                        int maxSize = xids.size() > branchIds.size() ? xids.size() : branchIds.size();
                        if (maxSize == UNDOLOG_DELETE_LIMIT_SIZE) {
                            try {
                                UndoLogManager.batchDeleteUndoLog(xids, branchIds, conn);
                            } catch (Exception ex) {
                                LOGGER.warn("Failed to batch delete undo log [" + branchIds + "/" + xids + "]", ex);
                            }
                            xids.clear();
                            branchIds.clear();
                        }
                    }
    
                    if (CollectionUtils.isEmpty(xids) || CollectionUtils.isEmpty(branchIds)) {
                        return;
                    }
    
                    try {
                        UndoLogManager.batchDeleteUndoLog(xids, branchIds, conn);
                    } catch (Exception ex) {
                        LOGGER.warn("Failed to batch delete undo log [" + branchIds + "/" + xids + "]", ex);
                    }
    
                } finally {
                    if (conn != null) {
                        try {
                            conn.close();
                        } catch (SQLException closeEx) {
                            LOGGER.warn("Failed to close JDBC resource while deleting undo_log ", closeEx);
                        }
                    }
                }
            }
        }
    

    该方法主要根据Phase2Context对象中的 Xid 和 BranchId 找到并删除对应的 Undolog 日志。

    同样,对于分支事务提交也引用 seata 官方一张图作为结尾:


    2. 分支事务回滚

    同样,当 TC 接收到全局事务回滚的指令时,会向每个 RM 发送分支事务回滚的请求,即BranchRollbackRequest。随后调用RMHandlerAT.doBranchRollback方法,然后到了dataSourceManager.branchRollback,最后完成分支事务回滚逻辑的是UndoLogManager.undo方法。

    public abstract class AbstractRMHandler extends AbstractExceptionHandler{
      
        /**
         * Do branch rollback.
         *
         * @param request  the request
         * @param response the response
         * @throws TransactionException the transaction exception
         */
        protected void doBranchRollback(BranchRollbackRequest request, BranchRollbackResponse response)
            throws TransactionException {
            String xid = request.getXid();
            long branchId = request.getBranchId();
            String resourceId = request.getResourceId();
            String applicationData = request.getApplicationData();
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("Branch Rollbacking: " + xid + " " + branchId + " " + resourceId);
            }
            BranchStatus status = getResourceManager().branchRollback(request.getBranchType(), xid, branchId, resourceId,
                applicationData);
            response.setXid(xid);
            response.setBranchId(branchId);
            response.setBranchStatus(status);
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("Branch Rollbacked result: " + status);
            }
        }
    }
    
    public class DataSourceManager extends AbstractResourceManager implements Initialize {
    
        @Override
        public BranchStatus branchRollback(BranchType branchType, String xid, long branchId, String resourceId, String applicationData) throws TransactionException {
            DataSourceProxy dataSourceProxy = get(resourceId);
            if (dataSourceProxy == null) {
                throw new ShouldNeverHappenException();
            }
            try {
                UndoLogManager.undo(dataSourceProxy, xid, branchId);
            } catch (TransactionException te) {
                if (te.getCode() == TransactionExceptionCode.BranchRollbackFailed_Unretriable) {
                    return BranchStatus.PhaseTwo_RollbackFailed_Unretryable;
                } else {
                    return BranchStatus.PhaseTwo_RollbackFailed_Retryable;
                }
            }
            return BranchStatus.PhaseTwo_Rollbacked;
    
        }
    }
    

    UndoLogManager.undo方法源码如下:

    public static void undo(DataSourceProxy dataSourceProxy, String xid, long branchId) throws TransactionException {
            assertDbSupport(dataSourceProxy.getDbType());
    
            Connection conn = null;
            ResultSet rs = null;
            PreparedStatement selectPST = null;
    
            for (; ; ) {
                try {
                    conn = dataSourceProxy.getPlainConnection();
    
                    // The entire undo process should run in a local transaction.
                    conn.setAutoCommit(false);
    
                    // Find UNDO LOG
                    selectPST = conn.prepareStatement(SELECT_UNDO_LOG_SQL);
                    selectPST.setLong(1, branchId);
                    selectPST.setString(2, xid);
                    rs = selectPST.executeQuery();
    
                    boolean exists = false;
                    while (rs.next()) {
                        exists = true;
    
                        // It is possible that the server repeatedly sends a rollback request to roll back
                        // the same branch transaction to multiple processes,
                        // ensuring that only the undo_log in the normal state is processed.
                        int state = rs.getInt("log_status");
                        if (!canUndo(state)) {
                            if (LOGGER.isInfoEnabled()) {
                                LOGGER.info("xid {} branch {}, ignore {} undo_log",
                                        xid, branchId, state);
                            }
                            return;
                        }
    
                        Blob b = rs.getBlob("rollback_info");
                        byte[] rollbackInfo = BlobUtils.blob2Bytes(b);
                        BranchUndoLog branchUndoLog = UndoLogParserFactory.getInstance().decode(rollbackInfo);
    
                        for (SQLUndoLog sqlUndoLog : branchUndoLog.getSqlUndoLogs()) {
                            TableMeta tableMeta = TableMetaCache.getTableMeta(dataSourceProxy, sqlUndoLog.getTableName());
                            sqlUndoLog.setTableMeta(tableMeta);
                            AbstractUndoExecutor undoExecutor = UndoExecutorFactory.getUndoExecutor(
                                dataSourceProxy.getDbType(),
                                sqlUndoLog);
                            undoExecutor.executeOn(conn);
                        }
                    }
    
                    // If undo_log exists, it means that the branch transaction has completed the first phase,
                    // we can directly roll back and clean the undo_log
                    // Otherwise, it indicates that there is an exception in the branch transaction,
                    // causing undo_log not to be written to the database.
                    // For example, the business processing timeout, the global transaction is the initiator rolls back.
                    // To ensure data consistency, we can insert an undo_log with GlobalFinished state
                    // to prevent the local transaction of the first phase of other programs from being correctly submitted.
                    // See https://github.com/seata/seata/issues/489
    
                    if (exists) {
                        deleteUndoLog(xid, branchId, conn);
                        conn.commit();
                        if (LOGGER.isInfoEnabled()) {
                            LOGGER.info("xid {} branch {}, undo_log deleted with {}",
                                    xid, branchId, State.GlobalFinished.name());
                        }
                    } else {
                        insertUndoLogWithGlobalFinished(xid, branchId, conn);
                        conn.commit();
                        if (LOGGER.isInfoEnabled()) {
                            LOGGER.info("xid {} branch {}, undo_log added with {}",
                                    xid, branchId, State.GlobalFinished.name());
                        }
                    }
    
                    return;
                } catch (SQLIntegrityConstraintViolationException e) {
                    // Possible undo_log has been inserted into the database by other processes, retrying rollback undo_log
                    if (LOGGER.isInfoEnabled()) {
                        LOGGER.info("xid {} branch {}, undo_log inserted, retry rollback",
                                xid, branchId);
                    }
                } catch (Throwable e) {
                    if (conn != null) {
                        try {
                            conn.rollback();
                        } catch (SQLException rollbackEx) {
                            LOGGER.warn("Failed to close JDBC resource while undo ... ", rollbackEx);
                        }
                    }
                    throw new TransactionException(BranchRollbackFailed_Retriable, String.format("%s/%s", branchId, xid),
                        e);
    
                } finally {
                    try {
                        if (rs != null) {
                            rs.close();
                        }
                        if (selectPST != null) {
                            selectPST.close();
                        }
                        if (conn != null) {
                            conn.close();
                        }
                    } catch (SQLException closeEx) {
                        LOGGER.warn("Failed to close JDBC resource while undo ... ", closeEx);
                    }
                }
            }
        }
    

    基本逻辑为:

    1. 获取Connection对象,并设置成非 Auto-Commit 模式。
    2. 查询 UndoLog。
    3. 对 UndoLog 进行解码处理。
    4. 根据 UndoLog 反向生成业务 SQL 并执行。
    5. 删除 UndoLog,并提交本地事务。

    从上面源码可以看出,整个回滚到全局事务之前状态的代码逻辑集中在如下代码中:

    AbstractUndoExecutor undoExecutor = UndoExecutorFactory.getUndoExecutor(
                                dataSourceProxy.getDbType(),
                                sqlUndoLog);
    

    首先通过UndoExecutorFactory获取到对应的UndoExecutor,然后再执行UndoExecutorexecuteOn方法完成回滚操作。目前三种类型的UndoExecutor结构如下:


    undoExecutor.executeOn源码如下:
    public void executeOn(Connection conn) throws SQLException {
    
            if (IS_UNDO_DATA_VALIDATION_ENABLE && !dataValidationAndGoOn(conn)) {
                return;
            }
            
            try {
                String undoSQL = buildUndoSQL();
    
                PreparedStatement undoPST = conn.prepareStatement(undoSQL);
    
                TableRecords undoRows = getUndoRows();
    
                for (Row undoRow : undoRows.getRows()) {
                    ArrayList<Field> undoValues = new ArrayList<>();
                    Field pkValue = null;
                    for (Field field : undoRow.getFields()) {
                        if (field.getKeyType() == KeyType.PrimaryKey) {
                            pkValue = field;
                        } else {
                            undoValues.add(field);
                        }
                    }
    
                    undoPrepare(undoPST, undoValues, pkValue);
    
                    undoPST.executeUpdate();
                }
    
            } catch (Exception ex) {
                if (ex instanceof SQLException) {
                    throw (SQLException) ex;
                } else {
                    throw new SQLException(ex);
                }
    
            }
    
        }
    

    该方法会根据 UndoLog,反向生成业务 SQL,设置参数并执行。

    至此,整个分支事务回滚就结束了。

    引入seata 官方对分支事务回滚原理介绍图作为结尾:


    思考:在 Phase2 阶段,全局事务提交或者回滚的过程中,如果出现了超时或者异常情况,导致提交或者回滚失败时,seata 是如何保证分布式事务运行结果的正确性的?

    其实,从上面源码中可以发现,当全局事务提交或者回滚操作处理完成之后(异常会封装异常信息),会把处理结果发送给 TC(服务端),sender.sendResponse(msgId, serverAddress, resultMessage)。服务端那边会有超时检测和重试机制,来保证分布式事务运行结果的正确性。

    综合上述,seata 在 Phase2 通过 UndoLog 自动完成分支事务提交与回滚,在这个过程中不需要业务方做任何处理,业务方无感知,因此在该阶段对业务代码也是无侵入的。

    相关文章

      网友评论

          本文标题:分布式事务中间件 seata - RM 模块源码解读

          本文链接:https://www.haomeiwen.com/subject/komvfctx.html