美文网首页
seata源码阅读-AT模式

seata源码阅读-AT模式

作者: w_j_y | 来源:发表于2021-12-28 22:26 被阅读0次

    demo用例


    image.png

    角色划分:
    TM: 事务管理,开启、提交、回滚分布式事务
    RM: 资源管理,注册、汇报、执资源,负责接收TC发过来的提交、回滚消息,并作出提交,回滚操作
    TC: 事务管理器服务功能,存储事务日志、补偿异常事务等、集中管理事务全局锁(全局行锁)

    整体流程:
    • TM 开启分布式事务,TM 向 TC 注册全局事务记录;
    • 按业务场景,编排数据库、服务等事务内资源(RM 向 TC 汇报资源准备状态 );
    • TM 结束分布式事务,事务一阶段结束(TM 通知 TC 提交/回滚分布式事务);
    • TC 汇总事务信息,决定分布式事务是提交还是回滚;
    • TC 通知所有 RM 提交/回滚 资源,事务二阶段结束;

    1.服务启动
    SeataAutoConfiguration

     @Bean
        @DependsOn({BEAN_NAME_SPRING_APPLICATION_CONTEXT_PROVIDER, BEAN_NAME_FAILURE_HANDLER})
        @ConditionalOnMissingBean(GlobalTransactionScanner.class)
        public GlobalTransactionScanner globalTransactionScanner(SeataProperties seataProperties, FailureHandler failureHandler) {
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("Automatically configure Seata");
            }
            return new GlobalTransactionScanner(seataProperties.getApplicationId(), seataProperties.getTxServiceGroup(), failureHandler);
        }
    

    2.初始化
    GlobalTransactionScanner实现了InitializingBean接口,在afterPropertiesSet对bean进行初始化

        @Override
        public void afterPropertiesSet() {
            if (disableGlobalTransaction) {
                if (LOGGER.isInfoEnabled()) {
                    LOGGER.info("Global transaction is disabled.");
                }
                return;
            }
            initClient();
        }
    
        private void initClient() {
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("Initializing Global Transaction Clients ... ");
            }
            if (StringUtils.isNullOrEmpty(applicationId) || StringUtils.isNullOrEmpty(txServiceGroup)) {
                throw new IllegalArgumentException(String.format("applicationId: %s, txServiceGroup: %s", applicationId, txServiceGroup));
            }
            //init TM
            TMClient.init(applicationId, txServiceGroup);
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("Transaction Manager Client is initialized. applicationId[{}] txServiceGroup[{}]", applicationId, txServiceGroup);
            }
            //init RM
            RMClient.init(applicationId, txServiceGroup);
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("Resource Manager is initialized. applicationId[{}] txServiceGroup[{}]", applicationId, txServiceGroup);
            }
    
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("Global Transaction Clients are initialized. ");
            }
            registerSpringShutdownHook();
    
        }
    

    2.1 初始化TMClient
    1)启动ScheduledExecutorService定时执行器,每10秒尝试进行一次重连TC

    2)重连时,先从file.conf中根据分组名称(service_group)找到集群名称(cluster_name)

    3)再根据集群名称找到fescar-server集群ip端口列表

    4)从ip列表中选择一个用netty进行连接

    public class TMClient {
    
        /**
         * Init.
         *
         * @param applicationId           the application id
         * @param transactionServiceGroup the transaction service group
         */
        public static void init(String applicationId, String transactionServiceGroup) {
            TmNettyRemotingClient tmNettyRemotingClient = TmNettyRemotingClient.getInstance(applicationId, transactionServiceGroup);
            tmNettyRemotingClient.init();
        }
    
    }
    
    public final class TmNettyRemotingClient extends AbstractNettyRemotingClient {
    
    @Override
        public void init() {
            // registry processor
            registerProcessor();
            if (initialized.compareAndSet(false, true)) {
                super.init();
            }
        }
    }
    
    public abstract class AbstractNettyRemotingClient extends AbstractNettyRemoting implements RemotingClient {
    
     @Override
        public void init() {
            timerExecutor.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    clientChannelManager.reconnect(getTransactionServiceGroup());
                }
            }, SCHEDULE_DELAY_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.MILLISECONDS);
            if (NettyClientConfig.isEnableClientBatchSendRequest()) {
                mergeSendExecutorService = new ThreadPoolExecutor(MAX_MERGE_SEND_THREAD,
                    MAX_MERGE_SEND_THREAD,
                    KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS,
                    new LinkedBlockingQueue<>(),
                    new NamedThreadFactory(getThreadPrefix(), MAX_MERGE_SEND_THREAD));
                mergeSendExecutorService.submit(new MergedSendRunnable());
            }
            super.init();
            clientBootstrap.start();
        }
    
    }
    

    2.2 初始化RMClient

    public class RMClient {
    
        /**
         * Init.
         *
         * @param applicationId           the application id
         * @param transactionServiceGroup the transaction service group
         */
        public static void init(String applicationId, String transactionServiceGroup) {
            RmNettyRemotingClient rmNettyRemotingClient = RmNettyRemotingClient.getInstance(applicationId, transactionServiceGroup);
           //资源管理器
            rmNettyRemotingClient.setResourceManager(DefaultResourceManager.get());
            //消息回调监听器,rmHandler用于接收TC在二阶段发出的提交或者回滚请求
            rmNettyRemotingClient.setTransactionMessageHandler(DefaultRMHandler.get());
            rmNettyRemotingClient.init();
        }
    
    }
    
    

    2.3 开启aop切面

    对加了@GlobalTransactional注释的方法,通过GlobalTransactionalInterceptor过滤器加入cglib切面

    public class GlobalTransactionScanner extends AbstractAutoProxyCreator
        implements InitializingBean, ApplicationContextAware,
        DisposableBean {
    
        @Override
        protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
            if (disableGlobalTransaction) {
                return bean;
            }
            try {
                synchronized (PROXYED_SET) {
                    if (PROXYED_SET.contains(beanName)) {
                        return bean;
                    }
                    interceptor = null;
                    //check TCC proxy
                    if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) {
                        //TCC interceptor, proxy bean of sofa:reference/dubbo:reference, and LocalTCC
                        interceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName));
                    } else {
                        Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean);
                        Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);
    
                        if (!existsAnnotation(new Class[]{serviceInterface})
                            && !existsAnnotation(interfacesIfJdk)) {
                            return bean;
                        }
    
                        if (interceptor == null) {
                            if (globalTransactionalInterceptor == null) {
                                globalTransactionalInterceptor = new GlobalTransactionalInterceptor(failureHandlerHook);
                                ConfigurationCache.addConfigListener(
                                    ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
                                    (ConfigurationChangeListener)globalTransactionalInterceptor);
                            }
                            interceptor = globalTransactionalInterceptor;
                        }
                    }
    
                    LOGGER.info("Bean[{}] with name [{}] would use interceptor [{}]", bean.getClass().getName(), beanName, interceptor.getClass().getName());
                    if (!AopUtils.isAopProxy(bean)) {
                        bean = super.wrapIfNecessary(bean, beanName, cacheKey);
                    } else {
                        AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean);
                        Advisor[] advisor = buildAdvisors(beanName, getAdvicesAndAdvisorsForBean(null, null, null));
                        for (Advisor avr : advisor) {
                            advised.addAdvisor(0, avr);
                        }
                    }
                    PROXYED_SET.add(beanName);
                    return bean;
                }
            } catch (Exception exx) {
                throw new RuntimeException(exx);
            }
        }
    }
    

    3.事务 -阶段1

    public class GlobalTransactionalInterceptor implements ConfigurationChangeListener, MethodInterceptor {
    
    @Override
        public Object invoke(final MethodInvocation methodInvocation) throws Throwable {
            Class<?> targetClass =
                methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis()) : null;
            Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);
            if (specificMethod != null && !specificMethod.getDeclaringClass().equals(Object.class)) {
                final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);
                final GlobalTransactional globalTransactionalAnnotation =
                    getAnnotation(method, targetClass, GlobalTransactional.class);
                final GlobalLock globalLockAnnotation = getAnnotation(method, targetClass, GlobalLock.class);
                boolean localDisable = disable || (degradeCheck && degradeNum >= degradeCheckAllowTimes);
                if (!localDisable) {
                    if (globalTransactionalAnnotation != null) {
                      //全局事务开始
                        return handleGlobalTransaction(methodInvocation, globalTransactionalAnnotation);
                    } else if (globalLockAnnotation != null) {
                       //全局锁
                        return handleGlobalLock(methodInvocation);
                    }
                }
            }
            return methodInvocation.proceed();
        }
    }
    

    3.2开始事务

    public class TransactionalTemplate {
    
    
    public Object execute(TransactionalExecutor business) throws Throwable {
            // 1 get transactionInfo
            TransactionInfo txInfo = business.getTransactionInfo();
            if (txInfo == null) {
                throw new ShouldNeverHappenException("transactionInfo does not exist");
            }
            // 1.1 get or create a transaction 
            GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();
    
            // 1.2 Handle the Transaction propatation and the branchType
            Propagation propagation = txInfo.getPropagation();
            SuspendedResourcesHolder suspendedResourcesHolder = null;
            try {
                switch (propagation) {
                    case NOT_SUPPORTED:
                        suspendedResourcesHolder = tx.suspend(true);
                        return business.execute();
                    case REQUIRES_NEW:
                        suspendedResourcesHolder = tx.suspend(true);
                        break;
                    case SUPPORTS:
                        if (!existingTransaction()) {
                            return business.execute();
                        }
                        break;
                    case REQUIRED:
                        break;
                    case NEVER:
                        if (existingTransaction()) {
                            throw new TransactionException(
                                    String.format("Existing transaction found for transaction marked with propagation 'never',xid = %s"
                                            ,RootContext.getXID()));
                        } else {
                            return business.execute();
                        }
                    case MANDATORY:
                        if (!existingTransaction()) {
                            throw new TransactionException("No existing transaction found for transaction marked with propagation 'mandatory'");
                        }
                        break;
                    default:
                        throw new TransactionException("Not Supported Propagation:" + propagation);
                }
    
    
                try {
    
                    // 2. begin transaction
                    beginTransaction(txInfo, tx);
    
                    Object rs = null;
                    try {
    
                        // Do Your Business
                        rs = business.execute();
    
                    } catch (Throwable ex) {
    
                        // 3.the needed business exception to rollback.
                        completeTransactionAfterThrowing(txInfo, tx, ex);
                        throw ex;
                    }
    
                    // 4. everything is fine, commit.
                    commitTransaction(tx);
    
                    return rs;
                } finally {
                    //5. clear
                    triggerAfterCompletion();
                    cleanUp();
                }
            } finally {
                tx.resume(suspendedResourcesHolder);
            }
    
        }
    
    }
    

    拦截sql

    public class StatementProxy<T extends Statement> extends AbstractStatementProxy<T> {
    
        @Override
        public boolean execute(String sql) throws SQLException {
            this.targetSQL = sql;
            return ExecuteTemplate.execute(this, (statement, args) -> statement.execute((String) args[0]), sql);
        }
    }
    

    生成回滚镜像

    public class ExecuteTemplate {
    
         public static <T, S extends Statement> T execute(List<SQLRecognizer> sqlRecognizers,
                                                         StatementProxy<S> statementProxy,
                                                         StatementCallback<T, S> statementCallback,
                                                         Object... args) throws SQLException {
    
            if (!RootContext.requireGlobalLock() && !StringUtils.equals(BranchType.AT.name(), RootContext.getBranchType())) {
                // Just work as original statement
                return statementCallback.execute(statementProxy.getTargetStatement(), args);
            }
    
            String dbType = statementProxy.getConnectionProxy().getDbType();
            if (CollectionUtils.isEmpty(sqlRecognizers)) {
                sqlRecognizers = SQLVisitorFactory.get(
                        statementProxy.getTargetSQL(),
                        dbType);
            }
            Executor<T> executor;
            if (CollectionUtils.isEmpty(sqlRecognizers)) {
                executor = new PlainExecutor<>(statementProxy, statementCallback);
            } else {
                if (sqlRecognizers.size() == 1) {
                    SQLRecognizer sqlRecognizer = sqlRecognizers.get(0);
                    switch (sqlRecognizer.getSQLType()) {
                        case INSERT:
                            executor = EnhancedServiceLoader.load(InsertExecutor.class, dbType,
                                    new Class[]{StatementProxy.class, StatementCallback.class, SQLRecognizer.class},
                                    new Object[]{statementProxy, statementCallback, sqlRecognizer});
                            break;
                        case UPDATE:
                            executor = new UpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                            break;
                        case DELETE:
                            executor = new DeleteExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                            break;
                        case SELECT_FOR_UPDATE:
                            executor = new SelectForUpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                            break;
                        default:
                            executor = new PlainExecutor<>(statementProxy, statementCallback);
                            break;
                    }
                } else {
                    executor = new MultiExecutor<>(statementProxy, statementCallback, sqlRecognizers);
                }
            }
            T rs;
            try {
                rs = executor.execute(args);
            } catch (Throwable ex) {
                if (!(ex instanceof SQLException)) {
                    // Turn other exception into SQLException
                    ex = new SQLException(ex);
                }
                throw (SQLException) ex;
            }
            return rs;
        }
    }
    

    以更新为例子,UpdateExecutor的父亲类,使用了模版方法设计模式

    public abstract class AbstractDMLBaseExecutor<T, S extends Statement> extends BaseTransactionalExecutor<T, S> {
       @Override
       public T doExecute(Object... args) throws Throwable {
           AbstractConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
           if (connectionProxy.getAutoCommit()) {
               return executeAutoCommitTrue(args);
           } else {
               return executeAutoCommitFalse(args);
           }
       }
    
        protected T executeAutoCommitFalse(Object[] args) throws Exception {
           if (!JdbcConstants.MYSQL.equalsIgnoreCase(getDbType()) && getTableMeta().getPrimaryKeyOnlyName().size() > 1)
           {
               throw new NotSupportYetException("multi pk only support mysql!");
           }
           //业务SQL执行前快照
           TableRecords beforeImage = beforeImage();
           //真正执行业务SQL
           T result = statementCallback.execute(statementProxy.getTargetStatement(), args);
           //业务SQL执行后快照
           TableRecords afterImage = afterImage(beforeImage);
           //准备快照
           prepareUndoLog(beforeImage, afterImage);
           return result;
       }
    
    }
    

    生成更新前后的数据镜像

    public class UpdateExecutor<T, S extends Statement> extends AbstractDMLBaseExecutor<T, S> {
    
    
        @Override
        protected TableRecords beforeImage() throws SQLException {
            ArrayList<List<Object>> paramAppenderList = new ArrayList<>();
            TableMeta tmeta = getTableMeta();
            String selectSQL = buildBeforeImageSQL(tmeta, paramAppenderList);
            return buildTableRecords(tmeta, selectSQL, paramAppenderList);
        }
    
        private String buildBeforeImageSQL(TableMeta tableMeta, ArrayList<List<Object>> paramAppenderList) {
            SQLUpdateRecognizer recognizer = (SQLUpdateRecognizer) sqlRecognizer;
            StringBuilder prefix = new StringBuilder("SELECT ");
            StringBuilder suffix = new StringBuilder(" FROM ").append(getFromTableInSQL());
            String whereCondition = buildWhereCondition(recognizer, paramAppenderList);
            if (StringUtils.isNotBlank(whereCondition)) {
                suffix.append(" WHERE ").append(whereCondition);
            }
            suffix.append(" FOR UPDATE");
            StringJoiner selectSQLJoin = new StringJoiner(", ", prefix.toString(), suffix.toString());
            if (ONLY_CARE_UPDATE_COLUMNS) {
                List<String> updateColumns = recognizer.getUpdateColumns();
                if (!containsPK(updateColumns)) {
                    selectSQLJoin.add(getColumnNamesInSQL(tableMeta.getEscapePkNameList(getDbType())));
                }
                for (String columnName : updateColumns) {
                    selectSQLJoin.add(columnName);
                }
            } else {
                for (String columnName : tableMeta.getAllColumns().keySet()) {
                    selectSQLJoin.add(ColumnUtils.addEscape(columnName, getDbType()));
                }
            }
            return selectSQLJoin.toString();
        }
    
        @Override
        protected TableRecords afterImage(TableRecords beforeImage) throws SQLException {
            TableMeta tmeta = getTableMeta();
            if (beforeImage == null || beforeImage.size() == 0) {
                return TableRecords.empty(getTableMeta());
            }
            String selectSQL = buildAfterImageSQL(tmeta, beforeImage);
            ResultSet rs = null;
            try (PreparedStatement pst = statementProxy.getConnection().prepareStatement(selectSQL)) {
                SqlGenerateUtils.setParamForPk(beforeImage.pkRows(), getTableMeta().getPrimaryKeyOnlyName(), pst);
                rs = pst.executeQuery();
                return TableRecords.buildRecords(tmeta, rs);
            } finally {
                IOUtil.close(rs);
            }
        }
    
        private String buildAfterImageSQL(TableMeta tableMeta, TableRecords beforeImage) throws SQLException {
            StringBuilder prefix = new StringBuilder("SELECT ");
            String whereSql = SqlGenerateUtils.buildWhereConditionByPKs(tableMeta.getPrimaryKeyOnlyName(), beforeImage.pkRows().size(), getDbType());
            String suffix = " FROM " + getFromTableInSQL() + " WHERE " + whereSql;
            StringJoiner selectSQLJoiner = new StringJoiner(", ", prefix.toString(), suffix);
            if (ONLY_CARE_UPDATE_COLUMNS) {
                SQLUpdateRecognizer recognizer = (SQLUpdateRecognizer) sqlRecognizer;
                List<String> updateColumns = recognizer.getUpdateColumns();
                if (!containsPK(updateColumns)) {
                    selectSQLJoiner.add(getColumnNamesInSQL(tableMeta.getEscapePkNameList(getDbType())));
                }
                for (String columnName : updateColumns) {
                    selectSQLJoiner.add(columnName);
                }
            } else {
                for (String columnName : tableMeta.getAllColumns().keySet()) {
                    selectSQLJoiner.add(ColumnUtils.addEscape(columnName, getDbType()));
                }
            }
            return selectSQLJoiner.toString();
        }
    
    
    
    }
    

    3.4 分支事务注册与事务提交

    public class ConnectionProxy extends AbstractConnectionProxy {
    
           private void doCommit() throws SQLException {
               //全局事务提交
               if (context.inGlobalTransaction()) {
                   processGlobalTransactionCommit();
               //全局锁提交
               } else if (context.isGlobalLockRequire()) {
                   processLocalCommitWithGlobalLocks();
               //正常提交
               } else {
                   targetConnection.commit();
               }
           }
           
               //全局事务提交
           private void processGlobalTransactionCommit() throws SQLException {
               try {
                   //注册branchId,并保存到上下文
                   register();
               } catch (TransactionException e) {
                   recognizeLockKeyConflictException(e, context.buildLockKeys());
               }
               try {
                   if (context.hasUndoLog()) {
                       //如果包含undolog,则将之前绑定到上下文中的undolog进行入库
                       UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this);
                   }
                   //本地事务提交
                   targetConnection.commit();
               } catch (Throwable ex) {
                   LOGGER.error("process connectionProxy commit error: {}", ex.getMessage(), ex);
                   //通过RMClient汇报TC结果
                   report(false);
                   throw new SQLException(ex);
               }
               if (IS_REPORT_SUCCESS_ENABLE) {
                   //通过RmRpcClient汇报TC结果
                   report(true);
               }
               context.reset();
           }
           //注册branchId,并保存到上下文
           private void register() throws TransactionException {
               Long branchId = DefaultResourceManager.get().branchRegister(BranchType.AT, getDataSourceProxy().getResourceId(),
               null, context.getXid(), null, context.buildLockKeys());
               context.setBranchId(branchId);
           }
    
    }
    

    4、事务阶段2

    在RMClient初始化时,启动了RMHandler接收TC在二阶段发出的提交或者回滚请求

    public abstract class AbstractRMHandler extends AbstractExceptionHandler
        implements RMInboundHandler, TransactionMessageHandler {
    
        @Override
        public BranchCommitResponse handle(BranchCommitRequest request) {
            BranchCommitResponse response = new BranchCommitResponse();
            exceptionHandleTemplate(new AbstractCallback<BranchCommitRequest, BranchCommitResponse>() {
                @Override
                public void execute(BranchCommitRequest request, BranchCommitResponse response)
                    throws TransactionException {
                    doBranchCommit(request, response);
                }
            }, request, response);
            return response;
        }
    
        @Override
        public BranchRollbackResponse handle(BranchRollbackRequest request) {
            BranchRollbackResponse response = new BranchRollbackResponse();
            exceptionHandleTemplate(new AbstractCallback<BranchRollbackRequest, BranchRollbackResponse>() {
                @Override
                public void execute(BranchRollbackRequest request, BranchRollbackResponse response)
                    throws TransactionException {
                    doBranchRollback(request, response);
                }
            }, request, response);
            return response;
        }
    
    }
    

    全局提交

    public class AsyncWorker implements ResourceManagerInbound {
    
        @Override
        public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId,
                                         String applicationData) throws TransactionException {
            if (!ASYNC_COMMIT_BUFFER.offer(new Phase2Context(branchType, xid, branchId, resourceId, applicationData))) {
                LOGGER.warn("Async commit buffer is FULL. Rejected branch [{}/{}] will be handled by housekeeping later.", branchId, xid);
            }
            return BranchStatus.PhaseTwo_Committed;
        }
    
        /**
         * Init.
         */
        public synchronized void init() {
            LOGGER.info("Async Commit Buffer Limit: {}", ASYNC_COMMIT_BUFFER_LIMIT);
            ScheduledExecutorService timerExecutor = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("AsyncWorker", 1, true));
            timerExecutor.scheduleAtFixedRate(() -> {
                try {
    
                    doBranchCommits();
    
                } catch (Throwable e) {
                    LOGGER.info("Failed at async committing ... {}", e.getMessage());
    
                }
            }, 10, 1000 * 1, TimeUnit.MILLISECONDS);
        }
    
        private void doBranchCommits() {
            if (ASYNC_COMMIT_BUFFER.isEmpty()) {
                return;
            }
    
            Map<String, List<Phase2Context>> mappedContexts = new HashMap<>(DEFAULT_RESOURCE_SIZE);
            while (!ASYNC_COMMIT_BUFFER.isEmpty()) {
                Phase2Context commitContext = ASYNC_COMMIT_BUFFER.poll();
                List<Phase2Context> contextsGroupedByResourceId = mappedContexts.computeIfAbsent(commitContext.resourceId, k -> new ArrayList<>());
                contextsGroupedByResourceId.add(commitContext);
            }
    
            for (Map.Entry<String, List<Phase2Context>> entry : mappedContexts.entrySet()) {
                Connection conn = null;
                DataSourceProxy dataSourceProxy;
                try {
                    try {
                        DataSourceManager resourceManager = (DataSourceManager) DefaultResourceManager.get()
                            .getResourceManager(BranchType.AT);
                        dataSourceProxy = resourceManager.get(entry.getKey());
                        if (dataSourceProxy == null) {
                            throw new ShouldNeverHappenException("Failed to find resource on " + entry.getKey());
                        }
                        conn = dataSourceProxy.getPlainConnection();
                    } catch (SQLException sqle) {
                        LOGGER.warn("Failed to get connection for async committing on " + entry.getKey(), sqle);
                        continue;
                    }
                    List<Phase2Context> contextsGroupedByResourceId = entry.getValue();
                    Set<String> xids = new LinkedHashSet<>(UNDOLOG_DELETE_LIMIT_SIZE);
                    Set<Long> branchIds = new LinkedHashSet<>(UNDOLOG_DELETE_LIMIT_SIZE);
                    for (Phase2Context commitContext : contextsGroupedByResourceId) {
                        xids.add(commitContext.xid);
                        branchIds.add(commitContext.branchId);
                        int maxSize = Math.max(xids.size(), branchIds.size());
                        if (maxSize == UNDOLOG_DELETE_LIMIT_SIZE) {
                            try {
                                UndoLogManagerFactory.getUndoLogManager(dataSourceProxy.getDbType()).batchDeleteUndoLog(
                                    xids, branchIds, conn);
                            } catch (Exception ex) {
                                LOGGER.warn("Failed to batch delete undo log [" + branchIds + "/" + xids + "]", ex);
                            }
                            xids.clear();
                            branchIds.clear();
                        }
                    }
    
                    if (CollectionUtils.isEmpty(xids) || CollectionUtils.isEmpty(branchIds)) {
                        return;
                    }
    
                    try {
                        UndoLogManagerFactory.getUndoLogManager(dataSourceProxy.getDbType()).batchDeleteUndoLog(xids,
                            branchIds, conn);
                    } catch (Exception ex) {
                        LOGGER.warn("Failed to batch delete undo log [" + branchIds + "/" + xids + "]", ex);
                    }
    
                    if (!conn.getAutoCommit()) {
                        conn.commit();
                    }
                } catch (Throwable e) {
                    LOGGER.error(e.getMessage(), e);
                    try {
                        conn.rollback();
                    } catch (SQLException rollbackEx) {
                        LOGGER.warn("Failed to rollback JDBC resource while deleting undo_log ", rollbackEx);
                    }
                } finally {
                    if (conn != null) {
                        try {
                            conn.close();
                        } catch (SQLException closeEx) {
                            LOGGER.warn("Failed to close JDBC resource while deleting undo_log ", closeEx);
                        }
                    }
                }
            }
        }
    
    
    }
    

    全局回滚

    public abstract class AbstractUndoLogManager implements UndoLogManager {
    
        @Override
        public void undo(DataSourceProxy dataSourceProxy, String xid, long branchId) throws TransactionException {
            Connection conn = null;
            ResultSet rs = null;
            PreparedStatement selectPST = null;
            boolean originalAutoCommit = true;
    
            for (; ; ) {
                try {
                    conn = dataSourceProxy.getPlainConnection();
    
                    // The entire undo process should run in a local transaction.
                    if (originalAutoCommit = conn.getAutoCommit()) {
                        conn.setAutoCommit(false);
                    }
    
                    // Find UNDO LOG
                    selectPST = conn.prepareStatement(SELECT_UNDO_LOG_SQL);
                    selectPST.setLong(1, branchId);
                    selectPST.setString(2, xid);
                    rs = selectPST.executeQuery();
    
                    boolean exists = false;
                    while (rs.next()) {
                        exists = true;
    
                        // It is possible that the server repeatedly sends a rollback request to roll back
                        // the same branch transaction to multiple processes,
                        // ensuring that only the undo_log in the normal state is processed.
                        int state = rs.getInt(ClientTableColumnsName.UNDO_LOG_LOG_STATUS);
                        if (!canUndo(state)) {
                            if (LOGGER.isInfoEnabled()) {
                                LOGGER.info("xid {} branch {}, ignore {} undo_log", xid, branchId, state);
                            }
                            return;
                        }
    
                        String contextString = rs.getString(ClientTableColumnsName.UNDO_LOG_CONTEXT);
                        Map<String, String> context = parseContext(contextString);
                        byte[] rollbackInfo = getRollbackInfo(rs);
    
                        String serializer = context == null ? null : context.get(UndoLogConstants.SERIALIZER_KEY);
                        UndoLogParser parser = serializer == null ? UndoLogParserFactory.getInstance()
                            : UndoLogParserFactory.getInstance(serializer);
                        BranchUndoLog branchUndoLog = parser.decode(rollbackInfo);
    
                        try {
                            // put serializer name to local
                            setCurrentSerializer(parser.getName());
                            List<SQLUndoLog> sqlUndoLogs = branchUndoLog.getSqlUndoLogs();
                            if (sqlUndoLogs.size() > 1) {
                                Collections.reverse(sqlUndoLogs);
                            }
                            for (SQLUndoLog sqlUndoLog : sqlUndoLogs) {
                                TableMeta tableMeta = TableMetaCacheFactory.getTableMetaCache(dataSourceProxy.getDbType()).getTableMeta(
                                    conn, sqlUndoLog.getTableName(), dataSourceProxy.getResourceId());
                                sqlUndoLog.setTableMeta(tableMeta);
                                AbstractUndoExecutor undoExecutor = UndoExecutorFactory.getUndoExecutor(
                                    dataSourceProxy.getDbType(), sqlUndoLog);
                                undoExecutor.executeOn(conn);
                            }
                        } finally {
                            // remove serializer name
                            removeCurrentSerializer();
                        }
                    }
    
                    // If undo_log exists, it means that the branch transaction has completed the first phase,
                    // we can directly roll back and clean the undo_log
                    // Otherwise, it indicates that there is an exception in the branch transaction,
                    // causing undo_log not to be written to the database.
                    // For example, the business processing timeout, the global transaction is the initiator rolls back.
                    // To ensure data consistency, we can insert an undo_log with GlobalFinished state
                    // to prevent the local transaction of the first phase of other programs from being correctly submitted.
                    // See https://github.com/seata/seata/issues/489
    
                    if (exists) {
                        deleteUndoLog(xid, branchId, conn);
                        conn.commit();
                        if (LOGGER.isInfoEnabled()) {
                            LOGGER.info("xid {} branch {}, undo_log deleted with {}", xid, branchId,
                                State.GlobalFinished.name());
                        }
                    } else {
                        insertUndoLogWithGlobalFinished(xid, branchId, UndoLogParserFactory.getInstance(), conn);
                        conn.commit();
                        if (LOGGER.isInfoEnabled()) {
                            LOGGER.info("xid {} branch {}, undo_log added with {}", xid, branchId,
                                State.GlobalFinished.name());
                        }
                    }
    
                    return;
                } catch (SQLIntegrityConstraintViolationException e) {
                    // Possible undo_log has been inserted into the database by other processes, retrying rollback undo_log
                    if (LOGGER.isInfoEnabled()) {
                        LOGGER.info("xid {} branch {}, undo_log inserted, retry rollback", xid, branchId);
                    }
                } catch (Throwable e) {
                    if (conn != null) {
                        try {
                            conn.rollback();
                        } catch (SQLException rollbackEx) {
                            LOGGER.warn("Failed to close JDBC resource while undo ... ", rollbackEx);
                        }
                    }
                    throw new BranchTransactionException(BranchRollbackFailed_Retriable, String
                        .format("Branch session rollback failed and try again later xid = %s branchId = %s %s", xid,
                            branchId, e.getMessage()), e);
    
                } finally {
                    try {
                        if (rs != null) {
                            rs.close();
                        }
                        if (selectPST != null) {
                            selectPST.close();
                        }
                        if (conn != null) {
                            if (originalAutoCommit) {
                                conn.setAutoCommit(true);
                            }
                            conn.close();
                        }
                    } catch (SQLException closeEx) {
                        LOGGER.warn("Failed to close JDBC resource while undo ... ", closeEx);
                    }
                }
            }
        }
    
    }
    

    相关文章

      网友评论

          本文标题:seata源码阅读-AT模式

          本文链接:https://www.haomeiwen.com/subject/fdfmqrtx.html