美文网首页
2-fescar(seata)源码分析-全局事务开始

2-fescar(seata)源码分析-全局事务开始

作者: 致虑 | 来源:发表于2019-02-19 22:42 被阅读0次

    2-fescar源码分析-全局事务开始

    一、官方介绍

    1.TM 向 TC 申请开启一个全局事务,全局事务创建成功并生成一个全局唯一的 XID。
    2.XID 在微服务调用链路的上下文中传播。

    那这一篇主要分析fescar如何开启一个事务,TM 如何向 TC 申请开启一个全局事务,全局事务如何创建成功并生成一个全局唯一的 XID。并将XID在微服务中进行传递。

    --

    二、(原理)源码分析

    紧接着上一篇的server启动分析,依然借助官网的example例图进行出发。

    2.1 demo
    • 继续看下官网的结构图:
      [图片上传失败...(image-1c1702-1550587371400)]
    项目中存在官方的example模块,里面就模拟了上图的相关流程:先回到本节主题:**全局事务的开端**
    
    2.2.主服务入口
    • 1.启动主服务:BusinessServiceImpl

      public class BusinessServiceImpl implements BusinessService {
          ...
          @Override
          @GlobalTransactional(timeoutMills = 300000, name = "dubbo-demo-tx")
          public void purchase(String userId, String commodityCode, int orderCount) {
              LOGGER.info("purchase begin ... xid: " + RootContext.getXID());
              storageService.deduct(commodityCode, orderCount);
              orderService.create(userId, commodityCode, orderCount);
              throw new RuntimeException("xxx");
          }
          ...
          public static void main(String[] args) {
              ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[] {"dubbo-business.xml"});
              final BusinessService business = (BusinessService)context.getBean("business");
              business.purchase("U100001", "C00321", 2);
          }
      }
      
      public class StorageServiceImpl implements StorageService {
      
          private JdbcTemplate jdbcTemplate;
          
          @Override
          public void deduct(String commodityCode, int count) {
              LOGGER.info("Storage Service Begin ... xid: " + RootContext.getXID());
              LOGGER.info("Deducting inventory SQL: update storage_tbl set count = count - {} where commodity_code = {}",count,commodityCode);
      
              jdbcTemplate.update("update storage_tbl set count = count - ? where commodity_code = ?", new Object[] {count, commodityCode});
              LOGGER.info("Storage Service End ... ");
          }
      
          public static void main(String[] args) throws Throwable {
      
              String applicationId = "dubbo-demo-storage-service";
              String txServiceGroup = "my_test_tx_group";
      
              RMClientAT.init(applicationId, txServiceGroup);
      
              ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[]{"dubbo-storage-service.xml"});
              context.getBean("service");
              JdbcTemplate jdbcTemplate = (JdbcTemplate) context.getBean("jdbcTemplate");
              jdbcTemplate.update("delete from storage_tbl where commodity_code = 'C00321'");
              jdbcTemplate.update("insert into storage_tbl(commodity_code, count) values ('C00321', 100)");
              new ApplicationKeeper(context).keep();
          }
      }
      
      • 1.BusinessServiceImpl通过main启动,接着purchase方法调用了其他rpc的相关逻辑,此时是多个写服务,必然会涉及分布式事务。
      • 2.StorageServiceImpl通过main方法启动服务,deduct是直接被BusinessServiceImpl调用的写服务方法。
      • 3.其他OrderServiceImpl逻辑类似。

    --

    2.2 相关RPC服务初始化。
    • 以StorageServiceImpl服务启动为例,首先看看dubbo-storage-service.xml文件配置

      <bean id="storageDataSourceProxy" class="com.alibaba.fescar.rm.datasource.DataSourceProxy">
          <constructor-arg ref="storageDataSource" />
      </bean>
      
      <bean id="jdbcTemplate" class="org.springframework.jdbc.core.JdbcTemplate">
          <property name="dataSource" ref="storageDataSourceProxy" />
      </bean>
      
      <dubbo:application name="dubbo-demo-storage-service"  />
      <dubbo:registry address="multicast://224.5.6.7:1234?unicast=false" />
      <dubbo:protocol name="dubbo" port="20882" />
      <dubbo:service interface="com.alibaba.fescar.tm.dubbo.StorageService" ref="service" timeout="10000"/>
      
      <bean id="service" class="com.alibaba.fescar.tm.dubbo.impl.StorageServiceImpl">
          <property name="jdbcTemplate" ref="jdbcTemplate"/>
      </bean>
      
      <bean class="com.alibaba.fescar.spring.annotation.GlobalTransactionScanner">
          <constructor-arg value="dubbo-demo-storage-service"/>
          <constructor-arg value="my_test_tx_group"/>
      </bean>
      
      

      划重点:初始化服务时,内部会加载GlobalTransactionScanner类,那么这个类具体作用是什么呢?其实就是建立rpc服务下的socketChannel,将rpc与TC server建立连接,保持通信。
      继续往下跟踪。

    • 2.2.1 初始化服务内的SocketChannel

      继续跟踪GlobalTransactionScanner逻辑:

      @Override
      public void afterPropertiesSet() {
          if (disableGlobalTransaction) {
              if (LOGGER.isInfoEnabled()) {
                  LOGGER.info("Global transaction is disabled.");
              }
              return;
          }
          initClient();
      }
      
      private void initClient() {
          ...
          TMClient.init(applicationId, txServiceGroup);
          ...
      }
      

      根据配置中配置的applicationId, txServiceGroup调用RMClientAT去执行init操作:

      public class RMClientAT {
          public static void init(String applicationId, String transactionServiceGroup) {
              RmRpcClient rmRpcClient = RmRpcClient.getInstance(applicationId, transactionServiceGroup);
              AsyncWorker asyncWorker = new AsyncWorker();
              asyncWorker.init();
              DataSourceManager.init(asyncWorker);
              rmRpcClient.setResourceManager(DataSourceManager.get());
              rmRpcClient.setClientMessageListener(new RmMessageListener(new RMHandlerAT()));
              rmRpcClient.init();
          }
      }
      
      • 1.根据applicationId, transactionServiceGroup获取一个RmRpcClient实例
      • 2.获取AsyncWorker一个异步工作执行器
      • 3.异步工作执行器进行初始化
      • 4.将AsyncWorker加入DataSourceManager
      • 5.将DataSourceManager、消息监听器设置到RmRpcClient
      • 6.初始化rmRpcClient

      那么一步步分析下:

    • RmRpcClient:肯定是netty的一个客户端
      看到其父类AbstractRpcRemotingClient,里面属性就是netty相关的工作线程组、启动组件等。
      与server端相关的通信就靠这个RmRpcClient了

    • 构造一个异步工作组:AsyncWorker,并进行初始化

      public synchronized void init() {
          LOGGER.info("Async Commit Buffer Limit: " + ASYNC_COMMIT_BUFFER_LIMIT);
          timerExecutor = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("AsyncWorker", 1, true));
          timerExecutor.scheduleAtFixedRate(new Runnable() {
              @Override
              public void run() {
                  try {
      
                      doBranchCommits();
      
      
                  } catch (Throwable e) {
                      LOGGER.info("Failed at async committing ... " + e.getMessage());
      
                  }
              }
          }, 10, 1000 * 1, TimeUnit.MILLISECONDS);
      }
      

      这里就是启动一个线程,轮训分支事务 提交任务,如果有则进行commit操作。其实分支事务的提交很简单,无非主要删除回滚的日志即可。先看下下面的逻辑:

      private void doBranchCommits() {
          if (ASYNC_COMMIT_BUFFER.size() == 0) {
              return;
          }
          ...
      
          for (String resourceId : mappedContexts.keySet()) {
              Connection conn = null;
              try {
                 ...                
                 for (Phase2Context commitContext : contextsGroupedByResourceId) {
                      try {
                          UndoLogManager.deleteUndoLog(commitContext.xid, commitContext.branchId, conn);
                      } catch (Exception ex) {
                          LOGGER.warn("Failed to delete undo log [" + commitContext.branchId + "/" + commitContext.xid + "]", ex);
                      }
                  }
              } finally {
                  ...
          }
      }
      

      此处就是遍历ASYNC_COMMIT_BUFFER集合,删除回滚的sql,先猜想ASYNC_COMMIT_BUFFER数据的来源就是TM在执行事务业务逻辑execute时,备份而来的。后面继续分析。

    • 进而将asyncWorker加入数据层管理器DataSourceManager,DataSourceManager就是具体执行回滚及提交等逻辑的

    • 最后将DataSourceManager赋值给rmRpcClient,进而初始化rmRpcClient。

      @Override
      public void init() {
          if (initialized.compareAndSet(false, true)) {
              super.init();
              timerExecutor.scheduleAtFixedRate(new Runnable() {
                  @Override
                  public void run() {
                      reconnect();
                  }
              }, SCHEDULE_INTERVAL_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.SECONDS);
              ...
          }
      }
      
      private void reconnect() {
          for (String serverAddress : serviceManager.lookup(transactionServiceGroup))         {
              if (serverAddress != null) {
                  try {
                      connect(serverAddress);
                  } catch (Exception e) {
                      ...
                  }
              }
          }
      }
      
      @Override
      protected Channel connect(String serverAddress) {
          Channel channelToServer = channels.get(serverAddress);
          if (channelToServer != null) {
              channelToServer = getExistAliveChannel(channelToServer, serverAddress);
              if (null != channelToServer) {
                  return channelToServer;
              }
          }
          ...
          channelLocks.putIfAbsent(serverAddress, new Object());
          Object connectLock = channelLocks.get(serverAddress);
          synchronized (connectLock) {
              Channel channel = doConnect(serverAddress);
              return channel;
          }
      }
      
      private Channel doConnect(String serverAddress) {
          Channel channelToServer = channels.get(serverAddress);
          ...
          channelFromPool = nettyClientKeyPool.borrowObject(poolKeyMap.get(serverAddress));
          } catch (Exception exx) {
              ...
          return channelFromPool;
      }
      

      初始化rmRpcClient过程就是从nettyClientKeyPool获取一个与server建立连接的channel,返回即可。
      至此,GlobalTransactionScanner逻辑基本完结。其核心功能就是给各个rpc服务内置一个与server建立连接的channel。

    2.2 主服务执行事务方法体。
    • server、OrderService、AccountService、BusinessServiceImpl服务启动OK之后,继续回到下面执行方法的入口:

      @Override
      @GlobalTransactional(timeoutMills = 300000, name = "dubbo-demo-tx")
      public void purchase(String userId, String commodityCode, int orderCount) {
          LOGGER.info("purchase begin ... xid: " + RootContext.getXID());
          storageService.deduct(commodityCode, orderCount);
          orderService.create(userId, commodityCode, orderCount);
          throw new RuntimeException("xxx");
      }
      

      那么这里是如何被TM处理,进而提交至server,最终触发RM的回滚或者提交逻辑呢?
      划重点@GlobalTransactional

    • GlobalTransactional
      spring提供的注解方式,降低对业务的侵入。那么直接找到拦截器解析类:GlobalTransactionalInterceptor的拦截逻辑:

    • GlobalTransactionalInterceptor

      @Override
      public Object invoke(final MethodInvocation methodInvocation) throws Throwable {
          final GlobalTransactional anno = getAnnotation(methodInvocation.getMethod());
          if (anno != null) {
              try {
                  /**
                   * 通过覆盖TransactionalTemplate对象的execute()来对被注解的方法进行代理调
                   */
                  return transactionalTemplate.execute(new TransactionalExecutor() {
                      @Override
                      public Object execute() throws Throwable {
                          return methodInvocation.proceed();
                      }
      
                      @Override
                      public int timeout() {
                          return anno.timeoutMills();
                      }
      
                      @Override
                      public String name() {
                          String name = anno.name();
                          if (!StringUtils.isEmpty(name)) {
                              return name;
                          }
                          return formatMethod(methodInvocation.getMethod());
                      }
                  });
              } catch (TransactionalExecutor.ExecutionException e) {
                  TransactionalExecutor.Code code = e.getCode();
                  switch (code) {
                      case RollbackDone:
                          throw e.getOriginalException();
                      case BeginFailure:
                          failureHandler.onBeginFailure(e.getTransaction(), e.getCause());
                          throw e.getCause();
                      case CommitFailure:
                          failureHandler.onCommitFailure(e.getTransaction(), e.getCause());
                          throw e.getCause();
                      case RollbackFailure:
                          failureHandler.onRollbackFailure(e.getTransaction(), e.getCause());
                          throw e.getCause();
                      default:
                          throw new ShouldNeverHappenException("Unknown TransactionalExecutor.Code: " + code);
      
                  }
              }
      
          }
          return methodInvocation.proceed();
      }
      

      以上逻辑对注解方法进行了拦截,通过TransactionalTemplate方法的执行execute后,最终return methodInvocation.proceed();返回业务执行结果。那么TransactionalTemplate的execute就是具体的事务切入点了,继续跟踪:

    • TransactionalTemplate

      public Object execute(TransactionalExecutor business) throws TransactionalExecutor.ExecutionException {
      
          // 1. get or create a transaction
          GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();
      
          // 2. begin transaction
          try {
              tx.begin(business.timeout(), business.name());
      
          } catch (TransactionException txe) {
              throw new TransactionalExecutor.ExecutionException(tx, txe,TransactionalExecutor.Code.BeginFailure);
          }
      
          Object rs = null;
          try {
      
              // Do Your Business
              rs = business.execute();
      
          } catch (Throwable ex) {
      
              // 3. any business exception, rollback.
              try {
                  tx.rollback();
      
                  // 3.1 Successfully rolled back
                  throw new TransactionalExecutor.ExecutionException(tx, TransactionalExecutor.Code.RollbackDone, ex);
      
              } catch (TransactionException txe) {
                  // 3.2 Failed to rollback
                  throw new TransactionalExecutor.ExecutionException(tx, txe,TransactionalExecutor.Code.RollbackFailure, ex);
              }
          }
      
          // 4. everything is fine, commit.
          try {
              tx.commit();
          } catch (TransactionException txe) {
              // 4.1 Failed to commit
              throw new TransactionalExecutor.ExecutionException(tx, txe,
                  TransactionalExecutor.Code.CommitFailure);
      
          }
          return rs;
      }
      

      上面代码的逻辑按流程已经很清楚了

      • 1.获取全局事务
      • 2.执行业务逻辑
      • 3.异常回滚
      • 4.事务提交

      本节分析的核心:获取一个全局事务,然后开始 已经出现:

      // 1. get or create a transaction
      GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();
      tx.begin(business.timeout(), business.name());
      

      紧接着重点分析下这两点。

      --

    2.3.获取全局事务
    • GlobalTransactionContext:全局事务容器

      /**
       * Get GlobalTransaction instance bind on current thread.
       * Create a new on if no existing there.
       *
       * @return new context if no existing there.
       */
      public static GlobalTransaction getCurrentOrCreate() {
          GlobalTransaction tx = getCurrent();
          if (tx == null) {
              return createNew();
          }
          return tx;
      }
      
      public static GlobalTransaction getCurrent() {
          GlobalTransaction tx = THREAD_TRANSACTION_CONTEXT.get();
          if (tx != null) {
              return tx;
          }
          String xid = RootContext.getXID();
          if (xid == null) {
              return null;
          }
          tx = new DefaultGlobalTransaction(xid);
          THREAD_TRANSACTION_CONTEXT.set(tx);
          return THREAD_TRANSACTION_CONTEXT.get();
      }
      
      private static GlobalTransaction createNew() {
          GlobalTransaction tx = new DefaultGlobalTransaction();
          THREAD_TRANSACTION_CONTEXT.set(tx);
          return THREAD_TRANSACTION_CONTEXT.get();
      }
      
      DefaultGlobalTransaction() {
          this(null);
      }
      
      DefaultGlobalTransaction(String xid) {
          this.transactionManager = DefaultTransactionManager.get();
          this.xid = xid;
          if (xid != null) {
              status = GlobalStatus.Begin;
              role = GlobalTransactionRole.Participant;
          }
      }
      
      • 1.从当前线程获取全局事务
      • 2.若没有就直接创建默认全局事务DefaultGlobalTransaction
      • 3.将全局事务进行缓存,key是当前线程
      • 4.因为此时还没有XID,因此这是全局事务的状态还是Unknow

      --

    2.4.TM 开始全局事务
    • tx.begin(business.timeout(), business.name());

      继续跟踪:

      #TransactionalTemplate
      @Override
      public void begin(int timeout, String name) throws TransactionException {
          if (xid == null && role == GlobalTransactionRole.Launcher) {
              xid = transactionManager.begin(null, null, name, timeout);
              status = GlobalStatus.Begin;
              RootContext.bind(xid);
          } else {
              if (xid == null) {
                  throw new ShouldNeverHappenException(role + " is NOT in a global transaction context.");
              }
              LOGGER.info(role + " is already in global transaction " + xid);
          }
      }
      

      这里角色是事务发起者Launcher,那么就继续begin

      #TransactionalTemplate
      @Override
      public String begin(String applicationId, String transactionServiceGroup, String name, int timeout) throws TransactionException {
          GlobalBeginRequest request = new GlobalBeginRequest();
          request.setTransactionName(name);
          request.setTimeout(timeout);
          GlobalBeginResponse response = (GlobalBeginResponse) syncCall(request);
          return response.getXid();
      }
      private AbstractTransactionResponse syncCall(AbstractTransactionRequest request) throws TransactionException {
          try {
              return (AbstractTransactionResponse) TmRpcClient.getInstance().sendMsgWithResponse(request);
          } catch (TimeoutException toe) {
              throw new TransactionException(TransactionExceptionCode.IO, toe);
          }
      }
      
      #TmRpcClient
      @Override
      public Object sendMsgWithResponse(Object msg, long timeout) throws TimeoutException {
          String svrAddr = XID.getServerAddress(RootContext.getXID());
          String validAddress = svrAddr != null ? svrAddr : loadBalance();
          Channel acquireChannel = connect(validAddress);
          Object result = super.sendAsyncRequestWithResponse(validAddress, acquireChannel, msg, timeout);
          if (result instanceof GlobalBeginResponse
              && ((GlobalBeginResponse)result).getResultCode() == ResultCode.Failed) {
              LOGGER.error("begin response error,release channel:" + acquireChannel);
              releaseChannel(acquireChannel, validAddress);
          }
          return result;
      }
      
      #AbstractRpcRemoting
      private Object sendAsyncRequest(String address, Channel channel, Object msg, long timeout)
              throws TimeoutException {
              ...
              ChannelFuture future;
              channelWriteableCheck(channel, msg);
              future = channel.writeAndFlush(rpcMessage);
              future.addListener(new ChannelFutureListener() {
                  @Override
                  public void operationComplete(ChannelFuture future) {
                      if (!future.isSuccess()) {
                          MessageFuture messageFuture = futures.remove(rpcMessage.getId());
                          if (messageFuture != null) {
                              messageFuture.setResultMessage(future.cause());
                          }
                          destroyChannel(future.channel());
                      }
                  }
              });
          }
      }
      

      分析下上叙开始事务的流程

      • 1.TransactionalTemplate begin事务
      • 2.DefaultTransactionManager构造GlobalBeginRequest参数并调用TmRpcClient发起事务开始消息
      • 3.TmRpcClient获取已经建立连接的channel,将消息进行发送,以触发事务的开始。
    2.5.TC 收到消息,开启全局事务
    • 2.5.1.接收TM begin消息
      前面server一节说过,消息的接收就在AbstractRpcRemoting channelRead方法,debug一下:

      image.png
    AbstractRpcRemoting读取到消息后,进行消息的分发,继续跟踪: 
    ![image.png](https://img.haomeiwen.com/i12071549/1205389e2183fb77.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
    
    
    监听器已经捕获到事务开启的消息,进而处理器进一步进行处理:
    
    ```
    #DefaultCoordinator 根据request类型分发处理器处理消息
    @Override
    public AbstractResultMessage onRequest(AbstractMessage request, RpcContext context) {
        if (!(request instanceof AbstractTransactionRequestToTC)) {
            throw new IllegalArgumentException();
        }
        AbstractTransactionRequestToTC transactionRequest = (AbstractTransactionRequestToTC)request;
        transactionRequest.setTCInboundHandler(this);
    
        return transactionRequest.handle(context);
    }
    
    #GlobalBeginRequest
    @Override
    public AbstractTransactionResponse handle(RpcContext rpcContext) {
        return handler.handle(this, rpcContext);
    }
    
    #AbstractTCInboundHandler 异常处理模板执行后,回到DefaultCoordinator#doGlobalBegin处理逻辑
    @Override
    public GlobalBeginResponse handle(GlobalBeginRequest request, final RpcContext rpcContext) {
        GlobalBeginResponse response = new GlobalBeginResponse();
        exceptionHandleTemplate(new Callback<GlobalBeginRequest, GlobalBeginResponse>() {
            @Override
            public void execute(GlobalBeginRequest request, GlobalBeginResponse response) throws TransactionException {
                doGlobalBegin(request, response, rpcContext);
            }
        }, request, response);
        return response;
    }
    
    #AbstractExceptionHandler
    public void exceptionHandleTemplate(Callback callback, AbstractTransactionRequest request, AbstractTransactionResponse response) {
        try {
            callback.execute(request, response);
            response.setResultCode(ResultCode.Success);
    
        } catch (TransactionException tex) {
            response.setTransactionExceptionCode(tex.getCode());
            response.setResultCode(ResultCode.Failed);
            response.setMsg("TransactionException[" + tex.getMessage() + "]");
    
        } catch (RuntimeException rex) {
            response.setResultCode(ResultCode.Failed);
            response.setMsg("RuntimeException[" + rex.getMessage() + "]");
        }
    }
    
    #DefaultCoordinator
    @Override
    protected void doGlobalBegin(GlobalBeginRequest request, GlobalBeginResponse response, RpcContext rpcContext)
        throws TransactionException {
        response.setXid(core.begin(rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(),
            request.getTransactionName(), request.getTimeout()));
    }
    
    #DefaultCore 开始逻辑
    @Override
    public String begin(String applicationId, String transactionServiceGroup, String name, int timeout) throws TransactionException {
        GlobalSession session = GlobalSession.createGlobalSession(
                applicationId, transactionServiceGroup, name, timeout);
        session.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
    
        session.begin();
    
        return XID.generateXID(session.getTransactionId());
    }
    
    #GlobalSession 获取GlobalSession
    public GlobalSession(String applicationId, String transactionServiceGroup, String transactionName, int timeout) {
        this.transactionId = UUIDGenerator.generateUUID();
        this.status = GlobalStatus.Begin;
    
        this.applicationId = applicationId;
        this.transactionServiceGroup = transactionServiceGroup;
        this.transactionName = transactionName;
        this.timeout = timeout;
    }
    
    #GlobalSession GlobalSession开始逻辑
    @Override
    public void begin() throws TransactionException {
        this.status = GlobalStatus.Begin;
        this.beginTime = System.currentTimeMillis();
        this.active = true;
        for (SessionLifecycleListener lifecycleListener : lifecycleListeners) {
            lifecycleListener.onBegin(this);
        }
    }
    
    #AbstractSessionManager 将GlobalSession进行缓存
    @Override
    public void addGlobalSession(GlobalSession session) throws TransactionException {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("MANAGER[" + name + "] SESSION[" + session + "] " + LogOperation.GLOBAL_ADD);
        }
        transactionStoreManager.writeSession(LogOperation.GLOBAL_ADD, session);
        sessionMap.put(session.getTransactionId(), session);
    
    }
    
    #XID 生成全局的XID并且返回
    public static String generateXID(long tranId) {
        return ipAddress + ":" + port + ":" + tranId;
    }
    ```
    
    那么对于上面的执行流程分析一下:
    
    - 1.SocketChannel read到消息,并且进行消息的分发
    - 2.根据消息类型寻找对应的消息处理器,此处是GlobalBeginRequest请求,那么自然寻找到GlobalBeginResponse handle逻辑
    - 3.统一的异常模板处理后,进入核心处理逻辑DefaultCoordinator#doGlobalBegin,开始进行处理
    - 4.根据applicationId, transactionServiceGroup创造全局事务session:GlobalSession,同时设置session中全局事务状态为GlobalStatus.Begin,生成uuid的transactionId。
    - 5.将对应的sessionManager加入lifecycleListeners集合,以管控整个session生命周期
    - 6.开始session的生命周期,且设置相关开始时间,并将session以<session.getTransactionId(), session>map进行缓存进SessionManager
    - 7.根据TransactionId生成XID,以Response形式返回给TM。
    - 8.完成事务的开启逻辑。
    

    --

    2.6.TM 收到处理结果(XID),继续回到TM跟踪
    • 结果同步

      #DefaultGlobalTransaction
      @Override
      public void begin(int timeout, String name) throws TransactionException {
          if (xid == null && role == GlobalTransactionRole.Launcher) {
              xid = transactionManager.begin(null, null, name, timeout);
              status = GlobalStatus.Begin;
              RootContext.bind(xid);
          } else {
              if (xid == null) {
                  throw new ShouldNeverHappenException(role + " is NOT in a global transaction context.");
              }
              LOGGER.info(role + " is already in global transaction " + xid);
          }
      }
      
      #RootContext
      public static void bind(String xid) {
          if (LOGGER.isDebugEnabled()) {
              LOGGER.debug("bind " + xid);
          }
          CONTEXT_HOLDER.put(KEY_XID, xid);
      }
      
      • 1.获取到返回的XID后,将TM维护的全局事务状态设置为GlobalStatus.Begin。
      • 2.将返回的XID保存进context容器(其实就是当前线程的threadLocal),以保证跟server端的session状态一致。

      至此:整个事务开始流程分析完毕。最终状态就是:

      • 1.server 中的GlobalSession保存了全局事务状态等相关的信息,包含XID
      • 2.TM中的RootContext保存了全局事务状态等相关的信息,包含XID

      --

    三.未完待续。。。

    后续分析主要还是根据example官方实例分为:分支事务注册、事务回滚、事务提交进行。
    同时后续每一流程都紧密关联Server,因此还会频繁回到上叙server启动后,收到消息被触发的后续逻辑。

    相关文章

      网友评论

          本文标题:2-fescar(seata)源码分析-全局事务开始

          本文链接:https://www.haomeiwen.com/subject/vryqyqtx.html