How Seata AT Mode Handles Hanging and Empty Rollback

Author: DH大黄 | Published: 2021-05-06 23:43

    Empty rollback:

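    From the Seata website: "Receiving a Cancel request without having received the Try request is called an empty rollback. Empty rollbacks happen often in production, so a user implementing a TCC service should allow an empty rollback to execute, that is, return success when one is received."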
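
    The quoted rule can be illustrated with a small TCC participant. This is a minimal sketch under stated assumptions, not Seata code: EmptyRollbackTccService, tryFreeze and cancel are hypothetical names, and a HashMap keyed by "xid:branchId" stands in for a transaction-control table that a real service would keep in its own database. Cancel treats a missing Try record as an empty rollback, records that Cancel ran, and reports success.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical TCC participant for illustration only; not part of Seata's API.
// The HashMap stands in for a control table whose key would be (xid, branchId).
class EmptyRollbackTccService {

    enum Status { TRIED, CANCELLED }

    static final class ControlRecord {
        final Status status;
        final long amount;

        ControlRecord(Status status, long amount) {
            this.status = status;
            this.amount = amount;
        }
    }

    private final Map<String, ControlRecord> controlTable = new HashMap<>();
    private long frozen = 0;

    private static String key(String xid, long branchId) {
        return xid + ":" + branchId;
    }

    // Phase-one Try: freeze the amount and remember that Try ran.
    public boolean tryFreeze(String xid, long branchId, long amount) {
        controlTable.put(key(xid, branchId), new ControlRecord(Status.TRIED, amount));
        frozen += amount;
        return true;
    }

    // Phase-two Cancel: must report success even when Try never arrived.
    public boolean cancel(String xid, long branchId) {
        String k = key(xid, branchId);
        ControlRecord existing = controlTable.get(k);
        if (existing == null) {
            // Empty rollback: nothing to undo; record the Cancel and report success.
            controlTable.put(k, new ControlRecord(Status.CANCELLED, 0));
            return true;
        }
        if (existing.status == Status.CANCELLED) {
            // Repeated Cancel from the coordinator: already handled.
            return true;
        }
        // Normal rollback: release what Try froze.
        frozen -= existing.amount;
        controlTable.put(k, new ControlRecord(Status.CANCELLED, existing.amount));
        return true;
    }

    public long frozen() {
        return frozen;
    }
}
```

    Recording the Cancel in the empty-rollback case is deliberate: it leaves a trace that a later Try could check. This sketch does not perform that check yet, so on its own it is still exposed to hanging.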

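    AT mode handles empty rollback through the undo_log table: when a branch rollback finds no undo_log for the branch, it does not fail but inserts an undo_log record with the GlobalFinished state. The details are in the code below.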

        // io.seata.rm.datasource.DataSourceManager#branchRollback
        @Override
        public BranchStatus branchRollback(BranchType branchType, String xid, long branchId, String resourceId,
                                           String applicationData) throws TransactionException {
            DataSourceProxy dataSourceProxy = get(resourceId);
            if (dataSourceProxy == null) {
                throw new ShouldNeverHappenException();
            }
            try {
                // Load the undo_log for this branch and execute the undo
                UndoLogManagerFactory.getUndoLogManager(dataSourceProxy.getDbType()).undo(dataSourceProxy, xid, branchId);
            } catch (TransactionException te) {
                StackTraceLogger.info(LOGGER, te,
                    "branchRollback failed. branchType:[{}], xid:[{}], branchId:[{}], resourceId:[{}], applicationData:[{}]. reason:[{}]",
                    new Object[]{branchType, xid, branchId, resourceId, applicationData, te.getMessage()});
                if (te.getCode() == TransactionExceptionCode.BranchRollbackFailed_Unretriable) {
                    return BranchStatus.PhaseTwo_RollbackFailed_Unretryable;
                } else {
                    return BranchStatus.PhaseTwo_RollbackFailed_Retryable;
                }
            }
            return BranchStatus.PhaseTwo_Rollbacked;
    
        }
    
        // io.seata.rm.datasource.undo.AbstractUndoLogManager#undo
        @Override
        public void undo(DataSourceProxy dataSourceProxy, String xid, long branchId) throws TransactionException {
            Connection conn = null;
            ResultSet rs = null;
            PreparedStatement selectPST = null;
            boolean originalAutoCommit = true;
    
            for (; ; ) {
                try {
                    conn = dataSourceProxy.getPlainConnection();
    
                    // The entire undo process should run in a local transaction.
                    if (originalAutoCommit = conn.getAutoCommit()) {
                        conn.setAutoCommit(false);
                    }
    
                    // Find UNDO LOG
                    selectPST = conn.prepareStatement(SELECT_UNDO_LOG_SQL);
                    selectPST.setLong(1, branchId);
                    selectPST.setString(2, xid);
                    rs = selectPST.executeQuery();
    
                    boolean exists = false;
                    while (rs.next()) {
                        exists = true;
    
                        // It is possible that the server repeatedly sends a rollback request to roll back
                        // the same branch transaction to multiple processes,
                        // ensuring that only the undo_log in the normal state is processed.
                        // Check whether this undo_log is still in a state that can be undone
                        int state = rs.getInt(ClientTableColumnsName.UNDO_LOG_LOG_STATUS);
                        if (!canUndo(state)) {
                            if (LOGGER.isInfoEnabled()) {
                                LOGGER.info("xid {} branch {}, ignore {} undo_log", xid, branchId, state);
                            }
                            return;
                        }
    
                        String contextString = rs.getString(ClientTableColumnsName.UNDO_LOG_CONTEXT);
                        Map<String, String> context = parseContext(contextString);
                        byte[] rollbackInfo = getRollbackInfo(rs);
    
                        String serializer = context == null ? null : context.get(UndoLogConstants.SERIALIZER_KEY);
                        UndoLogParser parser = serializer == null ? UndoLogParserFactory.getInstance()
                            : UndoLogParserFactory.getInstance(serializer);
                        BranchUndoLog branchUndoLog = parser.decode(rollbackInfo);
    
                        try {
                            // put serializer name to local
                            setCurrentSerializer(parser.getName());
                            List<SQLUndoLog> sqlUndoLogs = branchUndoLog.getSqlUndoLogs();
                            if (sqlUndoLogs.size() > 1) {
                                Collections.reverse(sqlUndoLogs);
                            }
                            for (SQLUndoLog sqlUndoLog : sqlUndoLogs) {
                                TableMeta tableMeta = TableMetaCacheFactory.getTableMetaCache(dataSourceProxy.getDbType()).getTableMeta(
                                    conn, sqlUndoLog.getTableName(), dataSourceProxy.getResourceId());
                                sqlUndoLog.setTableMeta(tableMeta);
                                AbstractUndoExecutor undoExecutor = UndoExecutorFactory.getUndoExecutor(
                                    dataSourceProxy.getDbType(), sqlUndoLog);
                                undoExecutor.executeOn(conn);
                            }
                        } finally {
                            // remove serializer name
                            removeCurrentSerializer();
                        }
                    }
    
                    // If undo_log exists, it means that the branch transaction has completed the first phase,
                    // we can directly roll back and clean the undo_log
                    // Otherwise, it indicates that there is an exception in the branch transaction,
                    // causing undo_log not to be written to the database.
                    // For example, the business processing timeout, the global transaction is the initiator rolls back.
                    // To ensure data consistency, we can insert an undo_log with GlobalFinished state
                    // to prevent the local transaction of the first phase of other programs from being correctly submitted.
                    // See https://github.com/seata/seata/issues/489
    
                    if (exists) {
                        // The undo has been applied: delete the undo_log
                        deleteUndoLog(xid, branchId, conn);
                        conn.commit();
                        if (LOGGER.isInfoEnabled()) {
                            LOGGER.info("xid {} branch {}, undo_log deleted with {}", xid, branchId,
                                State.GlobalFinished.name());
                        }
                    } else {
                        // No undo_log exists (empty rollback): insert an undo_log record with the GlobalFinished state
                        insertUndoLogWithGlobalFinished(xid, branchId, UndoLogParserFactory.getInstance(), conn);
                        conn.commit();
                        if (LOGGER.isInfoEnabled()) {
                            LOGGER.info("xid {} branch {}, undo_log added with {}", xid, branchId,
                                State.GlobalFinished.name());
                        }
                    }
    
                    return;
                } catch (SQLIntegrityConstraintViolationException e) {
                    // Possible undo_log has been inserted into the database by other processes, retrying rollback undo_log
                    if (LOGGER.isInfoEnabled()) {
                        LOGGER.info("xid {} branch {}, undo_log inserted, retry rollback", xid, branchId);
                    }
                } catch (Throwable e) {
                    if (conn != null) {
                        try {
                            conn.rollback();
                        } catch (SQLException rollbackEx) {
                            LOGGER.warn("Failed to close JDBC resource while undo ... ", rollbackEx);
                        }
                    }
                    throw new BranchTransactionException(BranchRollbackFailed_Retriable, String
                        .format("Branch session rollback failed and try again later xid = %s branchId = %s %s", xid,
                            branchId, e.getMessage()), e);
    
                } finally {
                    try {
                        if (rs != null) {
                            rs.close();
                        }
                        if (selectPST != null) {
                            selectPST.close();
                        }
                        if (conn != null) {
                            if (originalAutoCommit) {
                                conn.setAutoCommit(true);
                            }
                            conn.close();
                        }
                    } catch (SQLException closeEx) {
                        LOGGER.warn("Failed to close JDBC resource while undo ... ", closeEx);
                    }
                }
            }
        }
        
    

    The detailed flow is shown in the following figure:

    [Figure: AT-mode empty rollback flow]
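
    To make the two branches of undo() easier to follow, here is a simplified, single-threaded model of them. It is a sketch, not Seata code: a HashMap stands in for the undo_log table and its unique key on (xid, branch_id), and UndoLogSimulation, phaseOne and branchRollback are hypothetical names. It omits the retry on SQLIntegrityConstraintViolationException that the real method performs when a concurrent phase one wins the race.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified in-memory model of the AT-mode undo_log handling; names are hypothetical.
// The Map plays the role of the undo_log table with a unique key on (xid, branch_id).
class UndoLogSimulation {

    enum State { NORMAL, GLOBAL_FINISHED }

    static final class DuplicateKeyException extends RuntimeException {
        DuplicateKeyException(String key) {
            super("duplicate undo_log key " + key);
        }
    }

    private final Map<String, State> undoLog = new HashMap<>();
    private int committedBusinessRows = 0;

    private static String key(String xid, long branchId) {
        return xid + ":" + branchId;
    }

    // Mirrors the unique constraint: inserting an existing key fails.
    private void insertUndoLog(String xid, long branchId, State state) {
        String k = key(xid, branchId);
        if (undoLog.containsKey(k)) {
            throw new DuplicateKeyException(k);
        }
        undoLog.put(k, state);
    }

    // Phase one: the business change and its undo_log commit together or not at all.
    public boolean phaseOne(String xid, long branchId) {
        try {
            insertUndoLog(xid, branchId, State.NORMAL);
        } catch (DuplicateKeyException e) {
            return false; // local transaction rolled back; business change discarded
        }
        committedBusinessRows++;
        return true;
    }

    // Phase-two rollback, following the exists / not-exists branches of undo().
    public void branchRollback(String xid, long branchId) {
        String k = key(xid, branchId);
        State state = undoLog.get(k);
        if (state == null) {
            // Empty rollback: leave a GlobalFinished marker that blocks a late phase one.
            insertUndoLog(xid, branchId, State.GLOBAL_FINISHED);
            return;
        }
        if (state != State.NORMAL) {
            return; // corresponds to the canUndo(state) check
        }
        committedBusinessRows--; // apply the before-image
        undoLog.remove(k);       // deleteUndoLog
    }

    public int committedBusinessRows() {
        return committedBusinessRows;
    }
}
```

    The second half of the model is the interesting one: an empty rollback followed by a late phaseOne for the same branch makes phaseOne return false, so the business change never becomes visible.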

    Hanging:

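    From the Seata website: "When the transaction coordinator calls the phase-one Try of a TCC service, the call may time out because of network congestion. The transaction manager then triggers a phase-two rollback and calls the TCC service's Cancel, and that Cancel call does not time out. Afterwards, the phase-one Try packet that was stuck in the network reaches the TCC service, so the phase-two Cancel has executed before the phase-one Try. After executing this late Try, the TCC service will never receive a phase-two Confirm or Cancel, and the TCC service is left hanging.

    When implementing a TCC service, users must allow empty rollbacks, but must reject any Try request that arrives after an empty rollback, to avoid hanging."

    In AT mode, the GlobalFinished undo_log inserted during an empty rollback plays this rejecting role. The undo_log table has a unique key on (xid, branch_id), and phase one writes its own undo_log in the same local transaction as the business SQL. A phase-one local transaction that commits after the empty rollback therefore hits a unique-constraint violation on that insert, rolls back, and its business change is never committed (this is the scenario referenced by seata issue #489 in the code comment above).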
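
    For a TCC service, the rule "allow the empty rollback but reject the Try that follows it" can be sketched as below. The names are hypothetical and not Seata's API; synchronized methods and a HashMap stand in for a database table with a unique key on (xid, branchId), and Map.putIfAbsent plays the role of an INSERT that fails when Cancel has already written its marker.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical anti-hanging guard for a TCC participant; not part of Seata's API.
class AntiHangingTccService {

    enum Status { TRIED, CANCELLED }

    // Stand-in for a control table keyed by "xid:branchId".
    private final Map<String, Status> controlTable = new HashMap<>();
    // Amount frozen by each branch whose Try actually ran.
    private final Map<String, Long> frozenByBranch = new HashMap<>();

    private static String key(String xid, long branchId) {
        return xid + ":" + branchId;
    }

    // Try: refused when Cancel for this branch has already been recorded.
    public synchronized boolean tryFreeze(String xid, long branchId, long amount) {
        String k = key(xid, branchId);
        Status previous = controlTable.putIfAbsent(k, Status.TRIED);
        if (previous == Status.CANCELLED) {
            return false; // Cancel ran first: executing this late Try would hang
        }
        if (previous == Status.TRIED) {
            return true; // duplicate Try: already applied
        }
        frozenByBranch.put(k, amount);
        return true;
    }

    // Cancel: allows empty rollback but leaves a CANCELLED marker behind.
    public synchronized boolean cancel(String xid, long branchId) {
        String k = key(xid, branchId);
        Status previous = controlTable.put(k, Status.CANCELLED);
        if (previous == Status.TRIED) {
            frozenByBranch.remove(k); // undo the effect of Try
        }
        return true; // empty or repeated Cancel still reports success
    }

    public synchronized long totalFrozen() {
        long sum = 0;
        for (long v : frozenByBranch.values()) {
            sum += v;
        }
        return sum;
    }
}
```

    The marker written by an empty Cancel is what makes the late Try detectable; without it, Try would have no way to tell "Cancel already ran" from "nothing has happened yet".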


          Article link: https://www.haomeiwen.com/subject/loujdltx.html