美文网首页读书摄影java
分布式事务(Seata)原理 详解篇,建议收藏

分布式事务(Seata)原理 详解篇,建议收藏

作者: 牧小农 | 来源:发表于2022-07-10 15:28 被阅读0次

    前言

    在之前的系列中,我们讲解了关于Seata基本介绍和实际应用,今天带来的这篇,就给大家分析一下Seata的源码是如何一步一步实现的。读源码的时候我们需要俯瞰起全貌,不要去扣一个一个的细节,这样我们学习起来会快捷而且有效率,我们学习源码需要掌握的是整体思路和核心点。

    首先 Seata 客户端启动一般分为以下几个流程:

    1. 自动加载Bean属性和配置信息
    2. 初始化TM
    3. 初始化RM
    4. 初始化分布式事务客户端完成,完成代理数据库配置
    5. 连接TC(Seata服务端),注册RM和TM
    6. 开启全局事务

    在这篇源码的讲解中,我们主要以AT模式为主导,官网也是主推AT模式,我们在上篇的文章中也讲解过,感兴趣的小伙伴可以去看一看分布式事务(Seata) 四大模式详解,在官网中也提供了对应的流程地址:https://seata.io/zh-cn/docs/dev/mode/at-mode.html ,在这里我们只是做一些简单的介绍,AT模式主要分为两个阶段:

    一阶段:

    • 解析SQL,获取SQL类型(CRUD)、表信息、条件(where) 等相关信息
    • 查询前镜像(改变之前的数据),根据解析得到的条件信息,生成查询语句,定位数据
    • 执行业务SQL,更新数据
    • 查询后镜像(改变后的数据),根据前镜像的结果,通过主键都给你为数据
    • 插入回滚日志,将前后镜像数据以及业务SQL等信息,组织成一条回滚日志记录,插入到undo Log表中
    • 提交前,向TC注册分支,申请全局锁
    • 本地事务提交,业务数据的更细腻和生成的undoLog一起提交
    • 将本地事务提交的结果通知给TC

    二阶段:

    如果TC收到的是回滚请求

    • 开启本地事务,通过XID和BranchID查找到对应的undo Log记录
    • 根据undoLog中的前镜像和业务SQL的相关信息生成并执行回滚语句
    • 提交本地事务,将本地事务的执行结果(分支事务回滚的信息)通知给TC

    如果没问题,执行提交操作

    • 收到TC分支提交请求,将请求放入到一个异步任务的队列中,马上返回提交成功的结果给TC
    • 异步任务阶段的分支提交请求删除undoLog中记录

    源码入口

    接下来,我们就需要从官网中去下载源码,下载地址:https://seata.io/zh-cn/blog/download.html,选择 source 即可,下载完成之后,通过IDEA打开项目。

    源码下载下来之后,我们应该如何去找入口呢?首先我们需要找到对应引入的 Seataspring-alibaba-seata,我们在回想一下,我们开启事务的时候,是不是添加过一个@GlobalTransactional的注解,这个注解就是我们入手的一个点,我们在 spring.factories 中看到有一个 GlobalTransactionAutoConfiguration,这个就是我们需要关注的点,也就是我们源码的入口

    GlobalTransactionAutoConfiguration 中我们找到一个用Bean注入的方法 globalTransactionScanner ,这个就是全局事务扫描器,这个类型主要负责加载配置,注入相关的Bean

    这里给大家展示了当前GlobalTransactionScanner的类关系图,其中我们现在继承了Aop的AbstractAutoProxyCreator类型,在这其中有一个重点方法,这个方法就是判断Bean对象是否需要代理,是否需要增强。

    @Configuration
    @EnableConfigurationProperties(SeataProperties.class)
    public class GlobalTransactionAutoConfiguration {
    
        //全局事务扫描器
        @Bean
        public GlobalTransactionScanner globalTransactionScanner() {
    
          String applicationName = applicationContext.getEnvironment()
              .getProperty("spring.application.name");
    
          String txServiceGroup = seataProperties.getTxServiceGroup();
    
          if (StringUtils.isEmpty(txServiceGroup)) {
            txServiceGroup = applicationName + "-fescar-service-group";
            seataProperties.setTxServiceGroup(txServiceGroup);
          }
          // 构建全局扫描器,传入参数:应用名、事务分组名,失败处理器
          return new GlobalTransactionScanner(applicationName, txServiceGroup);
        }
    
    }
    

    在这其中我们要关心的是 GlobalTransactionScanner 这个类型,这个类型扫描 @GlobalTransactional 注解,并对代理方法进行拦截增强事务的功能。我们就从源码中搜索这个GlobalTransactionScanner类,看看里面具体是做了什么

    /**
     * The type Global transaction scanner.
     * 全局事务扫描器
     * @author slievrly
     */
    public class GlobalTransactionScanner
            //AbstractAutoProxyCreator AOP动态代理 增强Bean
            extends AbstractAutoProxyCreator
            /**
             * ConfigurationChangeListener: 监听器基准接口
             * InitializingBean: Bean初始化
             * ApplicationContextAware: Spring容器
             * DisposableBean: Spring 容器销毁
             */
            implements ConfigurationChangeListener, InitializingBean, ApplicationContextAware, DisposableBean {
            
        private final String applicationId;//服务名
        private final String txServiceGroup;//事务分组        
    
      private void initClient() {
            //启动日志
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("Initializing Global Transaction Clients ... ");
            }
            //检查应用名以及事务分组名,为空抛出异常IllegalArgumentException
            if (DEFAULT_TX_GROUP_OLD.equals(txServiceGroup)) {
                LOGGER.warn("the default value of seata.tx-service-group: {} has already changed to {} since Seata 1.5, " +
                        "please change your default configuration as soon as possible " +
                        "and we don't recommend you to use default tx-service-group's value provided by seata",
                        DEFAULT_TX_GROUP_OLD, DEFAULT_TX_GROUP);
            }
            if (StringUtils.isNullOrEmpty(applicationId) || StringUtils.isNullOrEmpty(txServiceGroup)) {
                throw new IllegalArgumentException(String.format("applicationId: %s, txServiceGroup: %s", applicationId, txServiceGroup));
            }
            //init TM
            //初始化TM
            TMClient.init(applicationId, txServiceGroup, accessKey, secretKey);
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("Transaction Manager Client is initialized. applicationId[{}] txServiceGroup[{}]", applicationId, txServiceGroup);
            }
            //init RM
            //初始化RM
            RMClient.init(applicationId, txServiceGroup);
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("Resource Manager is initialized. applicationId[{}] txServiceGroup[{}]", applicationId, txServiceGroup);
            }
    
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("Global Transaction Clients are initialized. ");
            }
            registerSpringShutdownHook();
    
        }
    
        @Override
        public void afterPropertiesSet() {
            if (disableGlobalTransaction) {
                if (LOGGER.isInfoEnabled()) {
                    LOGGER.info("Global transaction is disabled.");
                }
                ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
                        (ConfigurationChangeListener)this);
                return;
            }
            if (initialized.compareAndSet(false, true)) {
                initClient();
            }
        }
        
       private void initClient() {
            //启动日志
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("Initializing Global Transaction Clients ... ");
            }
            //检查应用名以及事务分组名,为空抛出异常IllegalArgumentException
            if (DEFAULT_TX_GROUP_OLD.equals(txServiceGroup)) {
                LOGGER.warn("the default value of seata.tx-service-group: {} has already changed to {} since Seata 1.5, " +
                        "please change your default configuration as soon as possible " +
                        "and we don't recommend you to use default tx-service-group's value provided by seata",
                        DEFAULT_TX_GROUP_OLD, DEFAULT_TX_GROUP);
            }
    
            //检查应用名以及事务分组名,为空抛出异常IllegalArgumentException
            if (StringUtils.isNullOrEmpty(applicationId) || StringUtils.isNullOrEmpty(txServiceGroup)) {
                throw new IllegalArgumentException(String.format("applicationId: %s, txServiceGroup: %s", applicationId, txServiceGroup));
            }
            //init TM
            //初始化TM
            TMClient.init(applicationId, txServiceGroup, accessKey, secretKey);
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("Transaction Manager Client is initialized. applicationId[{}] txServiceGroup[{}]", applicationId, txServiceGroup);
            }
            //init RM
            //初始化RM
            RMClient.init(applicationId, txServiceGroup);
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("Resource Manager is initialized. applicationId[{}] txServiceGroup[{}]", applicationId, txServiceGroup);
            }
    
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("Global Transaction Clients are initialized. ");
            }
            registerSpringShutdownHook();
    
        }
    
        //代理增强,Spring 所有的Bean都会经过这个方法
        @Override
        protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
            // do checkers
            //检查bean和beanName
            if (!doCheckers(bean, beanName)) {
                return bean;
            }
    
            try {
                //加锁防止并发
                synchronized (PROXYED_SET) {
                    if (PROXYED_SET.contains(beanName)) {
                        return bean;
                    }
                    interceptor = null;
                    //check TCC proxy
                    //检查是否为TCC模式
                    if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) {
                        // init tcc fence clean task if enable useTccFence
                        //如果启用useTccFence 失败 ,则初始化TCC清理任务
                        TCCBeanParserUtils.initTccFenceCleanTask(TCCBeanParserUtils.getRemotingDesc(beanName), applicationContext);
                        //TCC interceptor, proxy bean of sofa:reference/dubbo:reference, and LocalTCC
                        //如果是,添加TCC拦截器
                        interceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName));
                        ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
                                (ConfigurationChangeListener)interceptor);
                    } else {
                        //不是TCC
                        Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean);
                        Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);
    
                        //判断是否有相关事务注解,如果没有不进行代理
                        if (!existsAnnotation(new Class[]{serviceInterface})
                            && !existsAnnotation(interfacesIfJdk)) {
                            return bean;
                        }
    
                        //发现存在全局事务注解标注的Bean对象,添加拦截器
                        if (globalTransactionalInterceptor == null) {
                            //添加拦截器
                            globalTransactionalInterceptor = new GlobalTransactionalInterceptor(failureHandlerHook);
                            ConfigurationCache.addConfigListener(
                                    ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
                                    (ConfigurationChangeListener)globalTransactionalInterceptor);
                        }
                        interceptor = globalTransactionalInterceptor;
                    }
    
                    LOGGER.info("Bean[{}] with name [{}] would use interceptor [{}]", bean.getClass().getName(), beanName, interceptor.getClass().getName());
                    //检查是否为代理对象
                    if (!AopUtils.isAopProxy(bean)) {
                        //不是代理对象,调用父级
                        bean = super.wrapIfNecessary(bean, beanName, cacheKey);
                    } else {
                        //是代理对象,反射获取代理类中已经存在的拦截器组合,然后添加到这个集合中
                        AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean);
                        Advisor[] advisor = buildAdvisors(beanName, getAdvicesAndAdvisorsForBean(null, null, null));
                        int pos;
                        for (Advisor avr : advisor) {
                            // Find the position based on the advisor's order, and add to advisors by pos
                            pos = findAddSeataAdvisorPosition(advised, avr);
                            advised.addAdvisor(pos, avr);
                        }
                    }
                    PROXYED_SET.add(beanName);
                    return bean;
                }
            } catch (Exception exx) {
                throw new RuntimeException(exx);
            }
        }
    
    
    }
    

    InitializingBean:中实现了一个 afterPropertiesSet()方法,在这个方法中,调用了initClient()

    AbstractAutoProxyCreator:APO动态代理,在之前的的Nacos和Sentiel中都有这个代理类,AOP在我们越往深入学习,在学习源码的会见到的越来越多,越来越重要,很多相关代理,都是通过AOP进行增强,在这个类中,我们需要关注有一个wrapIfNecessary()方法, 这个方法主要是判断被代理的bean或者类是否需要代理增强,在这个方法中会调用GlobalTransactionalInterceptor.invoke()进行带来增强。

    具体代码如下:

    public class GlobalTransactionalInterceptor implements ConfigurationChangeListener, MethodInterceptor, SeataInterceptor {
    
        public GlobalTransactionalInterceptor(FailureHandler failureHandler) {
            this.failureHandler = failureHandler == null ? DEFAULT_FAIL_HANDLER : failureHandler;
            this.disable = ConfigurationFactory.getInstance().getBoolean(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
                DEFAULT_DISABLE_GLOBAL_TRANSACTION);
            this.order =
                ConfigurationFactory.getInstance().getInt(ConfigurationKeys.TM_INTERCEPTOR_ORDER, TM_INTERCEPTOR_ORDER);
            degradeCheck = ConfigurationFactory.getInstance().getBoolean(ConfigurationKeys.CLIENT_DEGRADE_CHECK,
                DEFAULT_TM_DEGRADE_CHECK);
            if (degradeCheck) {
                ConfigurationCache.addConfigListener(ConfigurationKeys.CLIENT_DEGRADE_CHECK, this);
                degradeCheckPeriod = ConfigurationFactory.getInstance()
                    .getInt(ConfigurationKeys.CLIENT_DEGRADE_CHECK_PERIOD, DEFAULT_TM_DEGRADE_CHECK_PERIOD);
                degradeCheckAllowTimes = ConfigurationFactory.getInstance()
                    .getInt(ConfigurationKeys.CLIENT_DEGRADE_CHECK_ALLOW_TIMES, DEFAULT_TM_DEGRADE_CHECK_ALLOW_TIMES);
                EVENT_BUS.register(this);
                if (degradeCheckPeriod > 0 && degradeCheckAllowTimes > 0) {
                    startDegradeCheck();
                }
            }
            this.initDefaultGlobalTransactionTimeout();
        }
    
        @Override
        public Object invoke(final MethodInvocation methodInvocation) throws Throwable {
            //获取执行的方法
            Class<?> targetClass =
                methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis()) : null;
            Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);
            if (specificMethod != null && !specificMethod.getDeclaringClass().equals(Object.class)) {
                final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);
                //获取GlobalTransactional(全局事务)、GlobalLock(全局锁)元数据
                final GlobalTransactional globalTransactionalAnnotation =
                    getAnnotation(method, targetClass, GlobalTransactional.class);
                //GlobalLock会将本地事务的执行纳入Seata分布式事务的管理,共同竞争全局锁
                //保证全局事务在执行的时候,本地事务不可以操作全局事务的记录
                final GlobalLock globalLockAnnotation = getAnnotation(method, targetClass, GlobalLock.class);//获取全局锁
                boolean localDisable = disable || (degradeCheck && degradeNum >= degradeCheckAllowTimes);
                if (!localDisable) {
                    if (globalTransactionalAnnotation != null || this.aspectTransactional != null) {
                        AspectTransactional transactional;
                        if (globalTransactionalAnnotation != null) {
                            transactional = new AspectTransactional(globalTransactionalAnnotation.timeoutMills(),
                                globalTransactionalAnnotation.name(), globalTransactionalAnnotation.rollbackFor(),
                                globalTransactionalAnnotation.noRollbackForClassName(),
                                globalTransactionalAnnotation.noRollbackFor(),
                                globalTransactionalAnnotation.noRollbackForClassName(),
                                globalTransactionalAnnotation.propagation(),
                                globalTransactionalAnnotation.lockRetryInterval(),
                                globalTransactionalAnnotation.lockRetryTimes());
                        } else {
                            transactional = this.aspectTransactional;
                        }
                        //执行全局事务
                        return handleGlobalTransaction(methodInvocation, transactional);
                    } else if (globalLockAnnotation != null) {
                        //执行全局锁
                        return handleGlobalLock(methodInvocation, globalLockAnnotation);
                    }
                }
            }
            return methodInvocation.proceed();
        }
    
    }
    

    具体流程图如下所示:

    核心源码

    在上面我们讲解到 GlobalTransactionalInterceptor 作为全局事务拦截器,一旦执行拦截,就会进入invoke方法,其中,我们会做 @GlobalTransactional 注解的判断,如果有这个注解的存在,会执行全局事务和全局锁,再执行全局事务的时候会调用 handleGlobalTransaction 全局事务处理器,获取事务信息,那我们接下来就来看一下 GlobalTransactionalInterceptor.handleGlobalTransaction 到底是如何执行全局事务的

    Object handleGlobalTransaction(final MethodInvocation methodInvocation,
            final AspectTransactional aspectTransactional) throws Throwable {
            boolean succeed = true;
            try {
                return transactionalTemplate.execute(new TransactionalExecutor() {
                    @Override
                    public Object execute() throws Throwable {
                        return methodInvocation.proceed();
                    }
    
                    //获取事务名称,默认获取方法名
                    public String name() {
                        String name = aspectTransactional.getName();
                        if (!StringUtils.isNullOrEmpty(name)) {
                            return name;
                        }
                        return formatMethod(methodInvocation.getMethod());
                    }
    
                    /**
                     * 解析GlobalTransation注解属性,封装对对象
                     * @return
                     */
                    @Override
                    public TransactionInfo getTransactionInfo() {
                        // reset the value of timeout
                        //获取超时时间,默认60秒
                        int timeout = aspectTransactional.getTimeoutMills();
                        if (timeout <= 0 || timeout == DEFAULT_GLOBAL_TRANSACTION_TIMEOUT) {
                            timeout = defaultGlobalTransactionTimeout;
                        }
    
                        //构建事务信息对象
                        TransactionInfo transactionInfo = new TransactionInfo();
                        transactionInfo.setTimeOut(timeout);//超时时间
                        transactionInfo.setName(name());//事务名称
                        transactionInfo.setPropagation(aspectTransactional.getPropagation());//事务传播
                        transactionInfo.setLockRetryInterval(aspectTransactional.getLockRetryInterval());//校验或占用全局锁重试间隔
                        transactionInfo.setLockRetryTimes(aspectTransactional.getLockRetryTimes());//校验或占用全局锁重试次数
                        Set<RollbackRule> rollbackRules = new LinkedHashSet<>();
                        //其他构建信息
                        for (Class<?> rbRule : aspectTransactional.getRollbackFor()) {
                            rollbackRules.add(new RollbackRule(rbRule));
                        }
                        for (String rbRule : aspectTransactional.getRollbackForClassName()) {
                            rollbackRules.add(new RollbackRule(rbRule));
                        }
                        for (Class<?> rbRule : aspectTransactional.getNoRollbackFor()) {
                            rollbackRules.add(new NoRollbackRule(rbRule));
                        }
                        for (String rbRule : aspectTransactional.getNoRollbackForClassName()) {
                            rollbackRules.add(new NoRollbackRule(rbRule));
                        }
                        transactionInfo.setRollbackRules(rollbackRules);
                        return transactionInfo;
                    }
                });
            } catch (TransactionalExecutor.ExecutionException e) {
                //执行异常
                TransactionalExecutor.Code code = e.getCode();
                switch (code) {
                    case RollbackDone:
                        throw e.getOriginalException();
                    case BeginFailure:
                        succeed = false;
                        failureHandler.onBeginFailure(e.getTransaction(), e.getCause());
                        throw e.getCause();
                    case CommitFailure:
                        succeed = false;
                        failureHandler.onCommitFailure(e.getTransaction(), e.getCause());
                        throw e.getCause();
                    case RollbackFailure:
                        failureHandler.onRollbackFailure(e.getTransaction(), e.getOriginalException());
                        throw e.getOriginalException();
                    case RollbackRetrying:
                        failureHandler.onRollbackRetrying(e.getTransaction(), e.getOriginalException());
                        throw e.getOriginalException();
                    default:
                        throw new ShouldNeverHappenException(String.format("Unknown TransactionalExecutor.Code: %s", code));
                }
            } finally {
                if (degradeCheck) {
                    EVENT_BUS.post(new DegradeCheckEvent(succeed));
                }
            }
        }
    

    在这里我们,主要关注一个重点方法 execute() ,这个方法主要用来执行事务的具体流程:

    • 获取事务信息
    • 执行全局事务
    • 发生异常全局回滚,各个数据通过UndoLog进行事务补偿
    • 全局事务提交
    • 清除所有资源

    这个位置也是一个非常核心的一个位置,因为我们所有的业务进来以后都会去走这个位置,具体源码如下所示:

        public Object execute(TransactionalExecutor business) throws Throwable {
            // 1. Get transactionInfo
            //获取事务信息
            TransactionInfo txInfo = business.getTransactionInfo();
            if (txInfo == null) {
                throw new ShouldNeverHappenException("transactionInfo does not exist");
            }
            // 1.1 Get current transaction, if not null, the tx role is 'GlobalTransactionRole.Participant'.
            //获取当前事务,主要获取XID
            GlobalTransaction tx = GlobalTransactionContext.getCurrent();
    
            // 1.2 Handle the transaction propagation.
            //根据配置的不同事务传播行为,执行不同的逻辑
            Propagation propagation = txInfo.getPropagation();
            SuspendedResourcesHolder suspendedResourcesHolder = null;
            try {
                switch (propagation) {
                    case NOT_SUPPORTED:
                        // If transaction is existing, suspend it.
                        if (existingTransaction(tx)) {
                            suspendedResourcesHolder = tx.suspend();
                        }
                        // Execute without transaction and return.
                        return business.execute();
                    case REQUIRES_NEW:
                        // If transaction is existing, suspend it, and then begin new transaction.
                        if (existingTransaction(tx)) {
                            suspendedResourcesHolder = tx.suspend();
                            tx = GlobalTransactionContext.createNew();
                        }
                        // Continue and execute with new transaction
                        break;
                    case SUPPORTS:
                        // If transaction is not existing, execute without transaction.
                        if (notExistingTransaction(tx)) {
                            return business.execute();
                        }
                        // Continue and execute with new transaction
                        break;
                    case REQUIRED:
                        // If current transaction is existing, execute with current transaction,
                        // else continue and execute with new transaction.
                        break;
                    case NEVER:
                        // If transaction is existing, throw exception.
                        if (existingTransaction(tx)) {
                            throw new TransactionException(
                                String.format("Existing transaction found for transaction marked with propagation 'never', xid = %s"
                                        , tx.getXid()));
                        } else {
                            // Execute without transaction and return.
                            return business.execute();
                        }
                    case MANDATORY:
                        // If transaction is not existing, throw exception.
                        if (notExistingTransaction(tx)) {
                            throw new TransactionException("No existing transaction found for transaction marked with propagation 'mandatory'");
                        }
                        // Continue and execute with current transaction.
                        break;
                    default:
                        throw new TransactionException("Not Supported Propagation:" + propagation);
                }
    
                // 1.3 If null, create new transaction with role 'GlobalTransactionRole.Launcher'.
                //如果当前事务为空,创建一个新的事务
                if (tx == null) {
                    tx = GlobalTransactionContext.createNew();
                }
    
                // set current tx config to holder
                GlobalLockConfig previousConfig = replaceGlobalLockConfig(txInfo);
    
                try {
                    // 2. If the tx role is 'GlobalTransactionRole.Launcher', send the request of beginTransaction to TC,
                    //    else do nothing. Of course, the hooks will still be triggered.
                    //开始执行全局事务
                    beginTransaction(txInfo, tx);
    
                    Object rs;
                    try {
                        // Do Your Business
                        // 执行当前业务逻辑
                        //1、在TC注册当前分支事务,TC会在branch_table中插入一条分支事务数据
                        //2、执行本地update语句,并在执行前后查询数据状态,并把数据前后镜像存入到undo_log中
                        //3、远程调用其他应用,远程应用接收到XID,也会注册分支事务,写入branch_table以及本地undo_log表
                        //4、会在lock_table表中插入全局锁数据(一个分支一条)
                        rs = business.execute();
                    } catch (Throwable ex) {
                        // 3. The needed business exception to rollback.
                        //发生异常全局回滚,每个事务通过undo_log表进行事务补偿
                        completeTransactionAfterThrowing(txInfo, tx, ex);
                        throw ex;
                    }
    
                    // 4. everything is fine, commit.
                    //全局提交
                    commitTransaction(tx);
    
                    return rs;
                } finally {
                    //5. clear
                    //清理所有资源
                    resumeGlobalLockConfig(previousConfig);
                    triggerAfterCompletion();
                    cleanUp();
                }
            } finally {
                // If the transaction is suspended, resume it.
                if (suspendedResourcesHolder != null) {
                    tx.resume(suspendedResourcesHolder);
                }
            }
        }
    

    这其中的第三步和第四步其实在向 TC(Seata-Server)发起全局事务的提交或者回滚,在这里我们首先关注执行全局事务的 beginTransaction() 方法

        // 向TC发起请求,采用模板模式
        private void beginTransaction(TransactionInfo txInfo, GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {
            try {
                triggerBeforeBegin();
                //对TC发起请求
                tx.begin(txInfo.getTimeOut(), txInfo.getName());
                triggerAfterBegin();
            } catch (TransactionException txe) {
                throw new TransactionalExecutor.ExecutionException(tx, txe,
                    TransactionalExecutor.Code.BeginFailure);
    
            }
        }
    
    

    在来关注其中,向TC发起请求的 tx.begin() 方法,而调用begin()方法的类为:DefaultGlobalTransaction

     @Override
        public void begin(int timeout, String name) throws TransactionException {
            //判断调用者是否为TM
            if (role != GlobalTransactionRole.Launcher) {
                assertXIDNotNull();
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("Ignore Begin(): just involved in global transaction [{}]", xid);
                }
                return;
            }
            assertXIDNull();
            String currentXid = RootContext.getXID();
            if (currentXid != null) {
                throw new IllegalStateException("Global transaction already exists," +
                    " can't begin a new global transaction, currentXid = " + currentXid);
            }
            //获取XID
            xid = transactionManager.begin(null, null, name, timeout);
            status = GlobalStatus.Begin;
            //绑定XID
            RootContext.bind(xid);
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("Begin new global transaction [{}]", xid);
            }
        }
    

    再来看一下 transactionManager.begin() 方法,这个时候使用的是 DefaultTransactionManager.begin 默认的事务管理者,来获取XID,传入事务相关的信息 ,最好TC返回对应的全局事务XID,它调用的是DefaultTransactionManager.begin()方法

        public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
            throws TransactionException {
            GlobalBeginRequest request = new GlobalBeginRequest();
            request.setTransactionName(name);
            request.setTimeout(timeout);
            //发送请求得到响应
            GlobalBeginResponse response = (GlobalBeginResponse) syncCall(request);
            if (response.getResultCode() == ResultCode.Failed) {
                throw new TmTransactionException(TransactionExceptionCode.BeginFailed, response.getMsg());
            }
            //返回XID
            return response.getXid();
        }
    

    在这里我们需要关注一个syncCall,在这里采用的是Netty通讯方式

        private AbstractTransactionResponse syncCall(AbstractTransactionRequest request) throws TransactionException {
            try {
                // 通过Netty发送请求
                return (AbstractTransactionResponse) TmNettyRemotingClient.getInstance().sendSyncRequest(request);
            } catch (TimeoutException toe) {
                throw new TmTransactionException(TransactionExceptionCode.IO, "RPC timeout", toe);
            }
        }
    

    具体图解如下:

    在这里我们需要重点了解 GlobalTransactionScanner 这个类型,在这个类型中继承了一些接口和抽象类,这个类主要作用就是扫描有注解的Bean,并做AOP增强。

    • ApplicationContextAware:继承这个类型以后,需要实现其方法 setApplicationContext(),当Spring启动完成以后,会自动调用这个类型,将 ApplicationContextbean,也就是说, GlobalTransactionScanner 能够很自然的使用Spring环境

    • InitializingBean: 继承这个接口,需要实现 afterPropertiesSet() ,但凡是继承这个接口的类,在初始化的时候,当所有的 properties 设置完成以后,会执行这个方法

    • DisposableBean : 这个类,实现了一个 destroy() 这个方法是在销毁的时候去调用

    • AbstractAutoProxyCreator: 这个类是Spring实现AOP的一种方式,本质上是一个 BeanPostProcessor ,在Bean初始化至去年,调用内部 createProxy() ,创建一个Bean的AOP代理Bean并返回,对Bean进行增强。

    Seata数据源代理

    在上面的环节中,我们讲解了Seata AT模式2PC的执行流程,那么现在我们就来带大家了解一下关于AT数据源代理的信息,这也是AT模式中非常关键的一个重要知识点,大家可以拿起小本子,记下来。

    首先AT模式的核心主要分为一下两个

    • 开启全局事务,获取全局锁。
    • 解析SQL并写入undoLog中。

    关于第一点我们已经分析清楚了,第二点就是关于AT模式如何解析SQL并写入undoLog中,但是在这之前,我们需要知道Seata是如何选择数据源,并进行数据源代理的。虽然全局事务拦截成功后最终还是执行了业务方法进行SQL提交和操作,但是由于Seata对数据源进行了代理,所以SQL的解析和undoLog的操作,是在数据源代理中进行完成的。

    数据源代理是Seata中一个非常重要的知识点,在分布式事务运行过程中,undoLog的记录、资源的锁定,用户都是无感知的,因为这些操作都是数据源的代理中完成了,恰恰是这样,我们才要去了解,这样不仅有利于我们了解Seata的核心操作,还能对以后源码阅读有所帮助,因为其实很多底层代码都会去使用这样用户无感知的方式(代理)去实现。

    同样,我们在之前的寻找源码入口的时候,通过我们项目中引入的jar找到一个 SeataAutoConfiguration 类,我们在里面找到一个SeataDataSourceBeanPostProcessor(),这个就是我们数据源代理的入口方法

    我们进入SeataDataSourceBeanPostProcessor类里面,发现继承了一个 BeanPostProcessor ,这个接口我们应该很熟悉,这个是Sprng的拓展接口,所有的Bean对象,都有进入两个方法 postProcessAfterInitialization()postProcessBeforeInitialization() 这两个方法都是由 BeanPostProcessor提供的,这两个方法,一个是初始化之前执行Before。一个是在初始化之后执行After,主要用来对比我们的的Bean是否为数据源代理对象。

    在这里我们需要关注到一个postProcessAfterInitialization.proxyDataSource() 方法,这个里面

        private Object proxyDataSource(Object originBean) {
            DataSourceProxy dataSourceProxy = DataSourceProxyHolder.get().putDataSource((DataSource) originBean);
            if (this.useJdkProxy) {
                return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), SpringProxyUtils.getAllInterfaces(originBean), (proxy, method, args) -> handleMethodProxy(dataSourceProxy, method, args, originBean));
            } else {
                return Enhancer.create(originBean.getClass(), (MethodInterceptor) (proxy, method, args, methodProxy) -> handleMethodProxy(dataSourceProxy, method, args, originBean));
            }
    
        }
    

    这里有一个DataSourceProxy代理对象,我们需要看的就是这个类,这个就是我们数据库代理的对象,我们从我们下载的源码项目中,搜索这个代理对象,当我们打开这个类的目录时发现,除了这个,还有ConnectionProxy 连接对象、StatementProxyPreparedStatementProxy SQL执行对象,这些都被Seata进行了代理,为什么要对这些都进行代理,代理的目的其实为了执行Seata的业务逻辑,生成undoLog,全局事务的开启,事务的提交回滚等操作

    DataSourceProxy 具体做了什么,主要功能有哪些,我们来看一下。他在源码中是如何体现的,我们需要关注的是init()

    public class DataSourceProxy extends AbstractDataSourceProxy implements Resource {
    
        private String resourceGroupId;
    
        private void init(DataSource dataSource, String resourceGroupId) {
            //资源组ID,默认是“default”这个默认值
            this.resourceGroupId = resourceGroupId;
            try (Connection connection = dataSource.getConnection()) {
                //根据原始数据源得到JDBC连接和数据库类型
                jdbcUrl = connection.getMetaData().getURL();
                dbType = JdbcUtils.getDbType(jdbcUrl);
                if (JdbcConstants.ORACLE.equals(dbType)) {
                    userName = connection.getMetaData().getUserName();
                } else if (JdbcConstants.MARIADB.equals(dbType)) {
                    dbType = JdbcConstants.MYSQL;
                }
            } catch (SQLException e) {
                throw new IllegalStateException("can not init dataSource", e);
            }
            initResourceId();
            DefaultResourceManager.get().registerResource(this);
            if (ENABLE_TABLE_META_CHECKER_ENABLE) {
                //如果配置开关打开,会定时在线程池不断更新表的元数据缓存信息
                tableMetaExecutor.scheduleAtFixedRate(() -> {
                    try (Connection connection = dataSource.getConnection()) {
                        TableMetaCacheFactory.getTableMetaCache(DataSourceProxy.this.getDbType())
                            .refresh(connection, DataSourceProxy.this.getResourceId());
                    } catch (Exception ignore) {
                    }
                }, 0, TABLE_META_CHECKER_INTERVAL, TimeUnit.MILLISECONDS);
            }
    
            //Set the default branch type to 'AT' in the RootContext.
            RootContext.setDefaultBranchType(this.getBranchType());
        }
    }
    

    从上面我们可以看出,他主要做了以下几点的增强:

    1. 给每个数据源标识资源组ID
    2. 如果打开配置,会有一个定时线程池定时更新表的元数据信息并缓存到本地
    3. 生成代理连接 ConnectionProxy 对象

    在这三个增强功能里面,第三个是最重要的,在AT模式里面,会自动记录undoLog,资源锁定,都是通过ConnectionProxy完成的,除此之外 DataSrouceProxy重写了一个方法 getConnection,因为这里返回的是一个 ConnectionProxy,而不是原生的Connection

        @Override
        public ConnectionProxy getConnection() throws SQLException {
            Connection targetConnection = targetDataSource.getConnection();
            return new ConnectionProxy(this, targetConnection);
        }
    
        @Override
        public ConnectionProxy getConnection(String username, String password) throws SQLException {
            Connection targetConnection = targetDataSource.getConnection(username, password);
            return new ConnectionProxy(this, targetConnection);
        }
    

    ConnectionProxy

    ConnectionProxy 继承 AbstractConnectionProxy ,在这个父类中有很多公用的方法,在这个父类有 PreparedStatementProxyStatementProxyDataSourceProxy

    所以我们需要先来看一下AbstractConnectionProxy,因为这里封装了需要我们用到的通用方法和逻辑,在其中我们需要关注的主要在于 PreparedStatementProxyStatementProxy ,在这里的逻辑主要是数据源连接的步骤,连接获取,创建执行对象等等

        @Override
        public Statement createStatement() throws SQLException {
            //调用真实连接对象获取Statement对象
            Statement targetStatement = getTargetConnection().createStatement();
            //创建Statement的代理
            return new StatementProxy(this, targetStatement);
        }
        
        @Override
        public PreparedStatement prepareStatement(String sql) throws SQLException {
            //获取数据库类型 mysql/oracle
            String dbType = getDbType();
            // support oracle 10.2+
            PreparedStatement targetPreparedStatement = null;
            //如果是AT模式且开启全局事务
            if (BranchType.AT == RootContext.getBranchType()) {
                List<SQLRecognizer> sqlRecognizers = SQLVisitorFactory.get(sql, dbType);
                if (sqlRecognizers != null && sqlRecognizers.size() == 1) {
                    SQLRecognizer sqlRecognizer = sqlRecognizers.get(0);
                    if (sqlRecognizer != null && sqlRecognizer.getSQLType() == SQLType.INSERT) {
                        //获取表的元数据
                        TableMeta tableMeta = TableMetaCacheFactory.getTableMetaCache(dbType).getTableMeta(getTargetConnection(),
                                sqlRecognizer.getTableName(), getDataSourceProxy().getResourceId());
                        //得到表的主键列名
                        String[] pkNameArray = new String[tableMeta.getPrimaryKeyOnlyName().size()];
                        tableMeta.getPrimaryKeyOnlyName().toArray(pkNameArray);
                        targetPreparedStatement = getTargetConnection().prepareStatement(sql,pkNameArray);
                    }
                }
            }
            if (targetPreparedStatement == null) {
                targetPreparedStatement = getTargetConnection().prepareStatement(sql);
            }
            //创建PreparedStatementProxy代理
            return new PreparedStatementProxy(this, targetPreparedStatement, sql);
        }
    
    

    在这两个代理对象中,都用到了以下几个方法:

    @Override
    public ResultSet executeQuery(String sql) throws SQLException {
        this.targetSQL = sql;
        return ExecuteTemplate.execute(this, (statement, args) -> statement.executeQuery((String) args[0]), sql);
    }
    
    @Override
    public int executeUpdate(String sql) throws SQLException {
        this.targetSQL = sql;
        return ExecuteTemplate.execute(this, (statement, args) -> statement.executeUpdate((String) args[0]), sql);
    }
    
    @Override
    public boolean execute(String sql) throws SQLException {
        this.targetSQL = sql;
        return ExecuteTemplate.execute(this, (statement, args) -> statement.execute((String) args[0]), sql);
    }
    
    

    在这些方法中都调用了 ExecuteTemplate.execute(),所以我们就看一下在 ExecuteTemplate类中具体是做了什么操作:

    public class ExecuteTemplate {
    
        public static <T, S extends Statement> T execute(List<SQLRecognizer> sqlRecognizers,
                                                         StatementProxy<S> statementProxy,
                                                         StatementCallback<T, S> statementCallback,
                                                         Object... args) throws SQLException {
            //如果没有全局锁,并且不是AT模式,直接执行SQL
            if (!RootContext.requireGlobalLock() && BranchType.AT != RootContext.getBranchType()) {
                // Just work as original statement
                return statementCallback.execute(statementProxy.getTargetStatement(), args);
            }
    
            //得到数据库类型- mysql/oracle
            String dbType = statementProxy.getConnectionProxy().getDbType();
            if (CollectionUtils.isEmpty(sqlRecognizers)) {
                //sqlRecognizers 为SQL语句的解析器,获取执行的SQL,通过它可以获得SQL语句表名、相关的列名、类型等信息,最后解析出对应的SQL表达式
                sqlRecognizers = SQLVisitorFactory.get(
                        statementProxy.getTargetSQL(),
                        dbType);
            }
            Executor<T> executor;
            if (CollectionUtils.isEmpty(sqlRecognizers)) {
                //如果seata没有找到合适的SQL语句解析器,那么便创建简单执行器PlainExecutor
                //PlainExecutor直接使用原生的Statment对象执行SQL
                executor = new PlainExecutor<>(statementProxy, statementCallback);
            } else {
                if (sqlRecognizers.size() == 1) {
                    SQLRecognizer sqlRecognizer = sqlRecognizers.get(0);
                    switch (sqlRecognizer.getSQLType()) {
                        //新增
                        case INSERT:
                            executor = EnhancedServiceLoader.load(InsertExecutor.class, dbType,
                                        new Class[]{StatementProxy.class, StatementCallback.class, SQLRecognizer.class},
                                        new Object[]{statementProxy, statementCallback, sqlRecognizer});
                            break;
                            //修改
                        case UPDATE:
                            executor = new UpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                            break;
                            //删除
                        case DELETE:
                            executor = new DeleteExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                            break;
                            //加锁
                        case SELECT_FOR_UPDATE:
                            executor = new SelectForUpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                            break;
                            //插入加锁
                        case INSERT_ON_DUPLICATE_UPDATE:
                            switch (dbType) {
                                case JdbcConstants.MYSQL:
                                case JdbcConstants.MARIADB:
                                    executor =
                                        new MySQLInsertOrUpdateExecutor(statementProxy, statementCallback, sqlRecognizer);
                                    break;
                                default:
                                    throw new NotSupportYetException(dbType + " not support to INSERT_ON_DUPLICATE_UPDATE");
                            }
                            break;
                            //原生
                        default:
                            executor = new PlainExecutor<>(statementProxy, statementCallback);
                            break;
                    }
                } else {
                    //批量处理SQL语句
                    executor = new MultiExecutor<>(statementProxy, statementCallback, sqlRecognizers);
                }
            }
            T rs;
            try {
                //执行
                rs = executor.execute(args);
            } catch (Throwable ex) {
                if (!(ex instanceof SQLException)) {
                    // Turn other exception into SQLException
                    ex = new SQLException(ex);
                }
                throw (SQLException) ex;
            }
            return rs;
        }
    
    }
    

    ExecuteTemplate就一个 execute(),Seata将SQL执行委托给不同的执行器(模板),Seata提供了6种执行器也就是我们代码 case 中(INSERTUPDATEDELETESELECT_FOR_UPDATE,INSERT_ON_DUPLICATE_UPDATE),这些执行器的父类都是AbstractDMLBaseExecutor

    • UpdateExecutor: 执行update语句
    • InsertExecutor: 执行insert语句
    • DeleteExecutor: 执行delete语句
    • SelectForUpdateExecutor: 执行select for update语句
    • PlainExecutor: 执行普通查询语句
    • MultiExecutor: 复合执行器,在一条SQL语句中执行多条语句

    关系图如下:

    然后我们找到rs = executor.execute(args); 最终执行的方法,找到最顶级的父类BaseTransactionalExecutor.execute()

        @Override
        public T execute(Object... args) throws Throwable {
            String xid = RootContext.getXID();
            if (xid != null) {
                //获取XID
                statementProxy.getConnectionProxy().bind(xid);
            }
            //设置全局锁
            statementProxy.getConnectionProxy().setGlobalLockRequire(RootContext.requireGlobalLock());
            return doExecute(args);
        }
    
    

    在根据doExecute(args);找到其中的重写方法 AbstractDMLBaseExecutor.doExecute()

        @Override
        public T doExecute(Object... args) throws Throwable {
            AbstractConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
            //是否自动提交
            if (connectionProxy.getAutoCommit()) {
                return executeAutoCommitTrue(args);
            } else {
                return executeAutoCommitFalse(args);
            }
        }
    

    对于数据库而言,本身都是自动提交的,所以我们进入executeAutoCommitTrue()

        protected T executeAutoCommitTrue(Object[] args) throws Throwable {
            ConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
            try {
                //设置为手动提交
                connectionProxy.changeAutoCommit();
                return new LockRetryPolicy(connectionProxy).execute(() -> {
                    //调用手动提交方法,得到分支执行的最终结果
                    T result = executeAutoCommitFalse(args);
                    //执行提交
                    connectionProxy.commit();
                    return result;
                });
            } catch (Exception e) {
                // when exception occur in finally,this exception will lost, so just print it here
                LOGGER.error("execute executeAutoCommitTrue error:{}", e.getMessage(), e);
                if (!LockRetryPolicy.isLockRetryPolicyBranchRollbackOnConflict()) {
                    connectionProxy.getTargetConnection().rollback();
                }
                throw e;
            } finally {
                connectionProxy.getContext().reset();
                connectionProxy.setAutoCommit(true);
            }
        }
    

    connectionProxy.changeAutoCommit()方法,修改为手动提交后,我们看来最关键的代码executeAutoCommitFalse()

        protected T executeAutoCommitFalse(Object[] args) throws Exception {
            if (!JdbcConstants.MYSQL.equalsIgnoreCase(getDbType()) && isMultiPk()) {
                throw new NotSupportYetException("multi pk only support mysql!");
            }
            //获取前镜像
            TableRecords beforeImage = beforeImage();
            //执行具体业务
            T result = statementCallback.execute(statementProxy.getTargetStatement(), args);
            //获取执行数量
            int updateCount = statementProxy.getUpdateCount();
            //判断如果执行数量大于0
            if (updateCount > 0) {
                //获取后镜像
                TableRecords afterImage = afterImage(beforeImage);
                //暂存到undolog中,在Commit的时候保存到数据库
                prepareUndoLog(beforeImage, afterImage);
            }
            return result;
        }
    

    我们再回到executeAutoCommitTrue中,去看看提交做了哪些操作connectionProxy.commit();

        @Override
        public void commit() throws SQLException {
            try {
                lockRetryPolicy.execute(() -> {
                    //具体执行
                    doCommit();
                    return null;
                });
            } catch (SQLException e) {
                if (targetConnection != null && !getAutoCommit() && !getContext().isAutoCommitChanged()) {
                    rollback();
                }
                throw e;
            } catch (Exception e) {
                throw new SQLException(e);
            }
        }
    

    进入到doCommit()

        private void doCommit() throws SQLException {
            //判断是否存在全局事务
            if (context.inGlobalTransaction()) {
                processGlobalTransactionCommit();
            } else if (context.isGlobalLockRequire()) {
                processLocalCommitWithGlobalLocks();
            } else {
                targetConnection.commit();
            }
        }
    

    作为分布式事务,一定是存在全局事务的,所以我们进入 processGlobalTransactionCommit()

      private void processGlobalTransactionCommit() throws SQLException {
            try {
                //注册分支事务
                register();
            } catch (TransactionException e) {
                recognizeLockKeyConflictException(e, context.buildLockKeys());
            }
            try {
                //写入数据库undolog
                UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this);
                //执行原生提交 一阶段提交
                targetConnection.commit();
            } catch (Throwable ex) {
                LOGGER.error("process connectionProxy commit error: {}", ex.getMessage(), ex);
                report(false);
                throw new SQLException(ex);
            }
            if (IS_REPORT_SUCCESS_ENABLE) {
                report(true);
            }
            context.reset();
        }
    

    其中register()方法就是注册分支事务的方法,同时还会将undoLog写入数据库和执行提交等操作

    
        //注册分支事务,生成分支事务ID
        private void register() throws TransactionException {
            if (!context.hasUndoLog() || !context.hasLockKey()) {
                return;
            }
            //注册分支事务
            Long branchId = DefaultResourceManager.get().branchRegister(BranchType.AT, getDataSourceProxy().getResourceId(),
                null, context.getXid(), context.getApplicationData(), context.buildLockKeys());
            context.setBranchId(branchId);
        }
    

    然后我们在回到processGlobalTransactionCommit中,看看写入数据库中的flushUndoLogs()

     @Override
        public void flushUndoLogs(ConnectionProxy cp) throws SQLException {
            ConnectionContext connectionContext = cp.getContext();
            if (!connectionContext.hasUndoLog()) {
                return;
            }
            //获取XID
            String xid = connectionContext.getXid();
            //获取分支ID
            long branchId = connectionContext.getBranchId();
    
            BranchUndoLog branchUndoLog = new BranchUndoLog();
            branchUndoLog.setXid(xid);
            branchUndoLog.setBranchId(branchId);
            branchUndoLog.setSqlUndoLogs(connectionContext.getUndoItems());
    
            UndoLogParser parser = UndoLogParserFactory.getInstance();
            byte[] undoLogContent = parser.encode(branchUndoLog);
    
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Flushing UNDO LOG: {}", new String(undoLogContent, Constants.DEFAULT_CHARSET));
            }
    
            CompressorType compressorType = CompressorType.NONE;
            if (needCompress(undoLogContent)) {
                compressorType = ROLLBACK_INFO_COMPRESS_TYPE;
                undoLogContent = CompressorFactory.getCompressor(compressorType.getCode()).compress(undoLogContent);
            }
            //写入数据库具体位置
            insertUndoLogWithNormal(xid, branchId, buildContext(parser.getName(), compressorType), undoLogContent, cp.getTargetConnection());
        }
    

    具体写入方法,此时我们使用的是MySql,所以执行的是MySql实现类MySQLUndoLogManager.insertUndoLogWithNormal()

        @Override
        protected void insertUndoLogWithNormal(String xid, long branchId, String rollbackCtx, byte[] undoLogContent,
                                               Connection conn) throws SQLException {
            insertUndoLog(xid, branchId, rollbackCtx, undoLogContent, State.Normal, conn);
        }
        
            //具体写入操作
        private void insertUndoLog(String xid, long branchId, String rollbackCtx, byte[] undoLogContent,
                                   State state, Connection conn) throws SQLException {
            try (PreparedStatement pst = conn.prepareStatement(INSERT_UNDO_LOG_SQL)) {
                pst.setLong(1, branchId);
                pst.setString(2, xid);
                pst.setString(3, rollbackCtx);
                pst.setBytes(4, undoLogContent);
                pst.setInt(5, state.getValue());
                pst.executeUpdate();
            } catch (Exception e) {
                if (!(e instanceof SQLException)) {
                    e = new SQLException(e);
                }
                throw (SQLException) e;
            }
        }
    

    具体流程如下所示:

    Seata 服务端

    我们找到Server.java 这里就是启动入口,在这个入口中找到协调者,因为TC整体的操作就是协调整体的全局事务

      //默认协调者
            DefaultCoordinator coordinator = DefaultCoordinator.getInstance(nettyRemotingServer);
    

    DefaultCoordinator类中我们找到 一个doGlobalBegin 这个就是处理全局事务开始的方法,以及全局提交 doGlobalCommit 和全局回滚 doGlobalRollback

        //处理全局事务
        @Override
        protected void doGlobalBegin(GlobalBeginRequest request, GlobalBeginResponse response, RpcContext rpcContext)
                throws TransactionException {
            //响应客户端xid
            response.setXid(core.begin(rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(),
                    request.getTransactionName(), request.getTimeout()));
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("Begin new global transaction applicationId: {},transactionServiceGroup: {}, transactionName: {},timeout:{},xid:{}",
                        rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(), request.getTransactionName(), request.getTimeout(), response.getXid());
            }
        }
        
            //处理全局提交
        @Override
        protected void doGlobalCommit(GlobalCommitRequest request, GlobalCommitResponse response, RpcContext rpcContext)
                throws TransactionException {
            MDC.put(RootContext.MDC_KEY_XID, request.getXid());
            response.setGlobalStatus(core.commit(request.getXid()));
        }
    
        //处理全局回滚
        @Override
        protected void doGlobalRollback(GlobalRollbackRequest request, GlobalRollbackResponse response,
                                        RpcContext rpcContext) throws TransactionException {
            MDC.put(RootContext.MDC_KEY_XID, request.getXid());
            response.setGlobalStatus(core.rollback(request.getXid()));
        }
    

    在这里我们首先关注 doGlobalBegincore.begin()

        @Override
        public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
            throws TransactionException {
            //创建全局事务Session
            GlobalSession session = GlobalSession.createGlobalSession(applicationId, transactionServiceGroup, name,
                timeout);
            MDC.put(RootContext.MDC_KEY_XID, session.getXid());
    
            //为Session重添加回调监听,SessionHolder.getRootSessionManager() 获取一个全局Session管理器DataBaseSessionManager
            //观察者设计模式,创建DataBaseSessionManager
            session.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
    
            //全局事务开始
            session.begin();
    
            // transaction start event
            MetricsPublisher.postSessionDoingEvent(session, false);
    
            return session.getXid();
        }
    

    然后我们在来看一下SessionHolder.getRootSessionManager()

        /**
         * Gets root session manager.
         * 获取一个全局Session管理器
         * @return the root session manager
         */
        public static SessionManager getRootSessionManager() {
            if (ROOT_SESSION_MANAGER == null) {
                throw new ShouldNeverHappenException("SessionManager is NOT init!");
            }
            return ROOT_SESSION_MANAGER;
        }
        
            public static void init(String mode) {
            if (StringUtils.isBlank(mode)) {
                mode = CONFIG.getConfig(ConfigurationKeys.STORE_SESSION_MODE,
                        CONFIG.getConfig(ConfigurationKeys.STORE_MODE, SERVER_DEFAULT_STORE_MODE));
            }
            StoreMode storeMode = StoreMode.get(mode);
            //判断Seata模式,当前为DB
            if (StoreMode.DB.equals(storeMode)) {
                //通过SPI机制读取SessionManager接口实现类,读取的META-INF.services目录,在通过反射机制创建对象DataBaseSessionManager
                ROOT_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.DB.getName());
                ........
            }
        }
    

    在这里他其实读取的是DB模式下 io.seata.server.session.SessionManager文件的内容

    我们在回到begin方法中,去查看session.begin()

        @Override
        public void begin() throws TransactionException {
            //声明全局事务开始
            this.status = GlobalStatus.Begin;
            //开始时间
            this.beginTime = System.currentTimeMillis();
            //激活全局事务
            this.active = true;
            //将SessionManager放入到集合中,调用onBegin方法
            for (SessionLifecycleListener lifecycleListener : lifecycleListeners) {
                //调用父级抽象类的方法
                lifecycleListener.onBegin(this);
            }
        }
    
    

    这里我们来看一下 onBegin() 方法,调用的是父级的方法,在这其中我们要关注 addGlobalSession() 方法,但是要注意,这里我们用的是db模式所以调用的是db模式的 DateBaseSessionManager

        @Override
        public void onBegin(GlobalSession globalSession) throws TransactionException {
            //这里调用的是DateBaseSessionManager
            addGlobalSession(globalSession);
        }
        
            @Override
        public void addGlobalSession(GlobalSession session) throws TransactionException {
            if (StringUtils.isBlank(taskName)) {
                //写入session
                boolean ret = transactionStoreManager.writeSession(LogOperation.GLOBAL_ADD, session);
                if (!ret) {
                    throw new StoreException("addGlobalSession failed.");
                }
            } else {
                boolean ret = transactionStoreManager.writeSession(LogOperation.GLOBAL_UPDATE, session);
                if (!ret) {
                    throw new StoreException("addGlobalSession failed.");
                }
            }
        }
    

    然后在看查询其中关键的方法DataBaseTransactionStoreManager.writeSession()

     @Override
        public boolean writeSession(LogOperation logOperation, SessionStorable session) {
            //第一次进入是写入 会进入当前方法
            //全局添加
            if (LogOperation.GLOBAL_ADD.equals(logOperation)) {
                return logStore.insertGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session));
                //全局修改
            } else if (LogOperation.GLOBAL_UPDATE.equals(logOperation)) {
                return logStore.updateGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session));
                //全局删除
            } else if (LogOperation.GLOBAL_REMOVE.equals(logOperation)) {
                return logStore.deleteGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session));
                //分支添加
            } else if (LogOperation.BRANCH_ADD.equals(logOperation)) {
                return logStore.insertBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session));
                //分支更新
            } else if (LogOperation.BRANCH_UPDATE.equals(logOperation)) {
                return logStore.updateBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session));
                //分支移除
            } else if (LogOperation.BRANCH_REMOVE.equals(logOperation)) {
                return logStore.deleteBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session));
            } else {
                throw new StoreException("Unknown LogOperation:" + logOperation.name());
            }
        }
    

    我们就看第一次进去的方法logStore.insertGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session));

       @Override
        public boolean insertGlobalTransactionDO(GlobalTransactionDO globalTransactionDO) {
            String sql = LogStoreSqlsFactory.getLogStoreSqls(dbType).getInsertGlobalTransactionSQL(globalTable);
            Connection conn = null;
            PreparedStatement ps = null;
            try {
                int index = 1;
                conn = logStoreDataSource.getConnection();
                conn.setAutoCommit(true);
                ps = conn.prepareStatement(sql);
                ps.setString(index++, globalTransactionDO.getXid());
                ps.setLong(index++, globalTransactionDO.getTransactionId());
                ps.setInt(index++, globalTransactionDO.getStatus());
                ps.setString(index++, globalTransactionDO.getApplicationId());
                ps.setString(index++, globalTransactionDO.getTransactionServiceGroup());
                String transactionName = globalTransactionDO.getTransactionName();
                transactionName = transactionName.length() > transactionNameColumnSize ?
                    transactionName.substring(0, transactionNameColumnSize) :
                    transactionName;
                ps.setString(index++, transactionName);
                ps.setInt(index++, globalTransactionDO.getTimeout());
                ps.setLong(index++, globalTransactionDO.getBeginTime());
                ps.setString(index++, globalTransactionDO.getApplicationData());
                return ps.executeUpdate() > 0;
            } catch (SQLException e) {
                throw new StoreException(e);
            } finally {
                IOUtil.close(ps, conn);
            }
        }
    

    在这里有一个 GlobalTransactionDO对象,里面有xid、transactionId 等等,到这里是不是就很熟悉了、

    还记得我们第一次使用Seata的时候会创建三张表

    1. branch_table 分支事务表
    2. global_table 全局事务表
    3. lock_table 全局锁表

    而这里就是对应我们的global_table表,其他两个也是差不多,都是一样的操作


    流程图如下:


    总结

    完整流程图:

    对于Seata源码来说主要是了解从哪里入口以及核心点在哪里,遇到有疑问的,可以Debug,对于Seata AT模式,我们主要掌握的核心点是

    • 如何获取全局锁、开启全局事务
    • 解析SQL并写入undolog

    围绕这两点去看的话,会有针对性一点,到这里我们的Seata源码就讲解完了,有疑问的小伙伴记得在下方留言。

    我是牧小农,怕什么真理无穷,进一步有进一步的欢喜,大家加油!

    相关文章

      网友评论

        本文标题:分布式事务(Seata)原理 详解篇,建议收藏

        本文链接:https://www.haomeiwen.com/subject/umbwbrtx.html