美文网首页
2-fescar(seata)源码分析-全局事务开始

2-fescar(seata)源码分析-全局事务开始

作者: 致虑 | 来源:发表于2019-02-19 22:42 被阅读0次

2-fescar源码分析-全局事务开始

一、官方介绍

1.TM 向 TC 申请开启一个全局事务,全局事务创建成功并生成一个全局唯一的 XID。
2.XID 在微服务调用链路的上下文中传播。

那这一篇主要分析fescar如何开启一个事务,TM 如何向 TC 申请开启一个全局事务,全局事务如何创建成功并生成一个全局唯一的 XID。并将XID在微服务中进行传递。

--

二、(原理)源码分析

紧接着上一篇的server启动分析,依然借助官网的example例图进行出发。

2.1 demo
  • 继续看下官网的结构图:
    [图片上传失败...(image-1c1702-1550587371400)]
项目中存在官方的example模块,里面就模拟了上图的相关流程:先回到本节主题:**全局事务的开端**
2.2.主服务入口
  • 1.启动主服务:BusinessServiceImpl

    public class BusinessServiceImpl implements BusinessService {
        ...
        @Override
        @GlobalTransactional(timeoutMills = 300000, name = "dubbo-demo-tx")
        public void purchase(String userId, String commodityCode, int orderCount) {
            LOGGER.info("purchase begin ... xid: " + RootContext.getXID());
            storageService.deduct(commodityCode, orderCount);
            orderService.create(userId, commodityCode, orderCount);
            throw new RuntimeException("xxx");
        }
        ...
        public static void main(String[] args) {
            ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[] {"dubbo-business.xml"});
            final BusinessService business = (BusinessService)context.getBean("business");
            business.purchase("U100001", "C00321", 2);
        }
    }
    
    public class StorageServiceImpl implements StorageService {
    
        private JdbcTemplate jdbcTemplate;
        
        @Override
        public void deduct(String commodityCode, int count) {
            LOGGER.info("Storage Service Begin ... xid: " + RootContext.getXID());
            LOGGER.info("Deducting inventory SQL: update storage_tbl set count = count - {} where commodity_code = {}",count,commodityCode);
    
            jdbcTemplate.update("update storage_tbl set count = count - ? where commodity_code = ?", new Object[] {count, commodityCode});
            LOGGER.info("Storage Service End ... ");
        }
    
        public static void main(String[] args) throws Throwable {
    
            String applicationId = "dubbo-demo-storage-service";
            String txServiceGroup = "my_test_tx_group";
    
            RMClientAT.init(applicationId, txServiceGroup);
    
            ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[]{"dubbo-storage-service.xml"});
            context.getBean("service");
            JdbcTemplate jdbcTemplate = (JdbcTemplate) context.getBean("jdbcTemplate");
            jdbcTemplate.update("delete from storage_tbl where commodity_code = 'C00321'");
            jdbcTemplate.update("insert into storage_tbl(commodity_code, count) values ('C00321', 100)");
            new ApplicationKeeper(context).keep();
        }
    }
    
    • 1.BusinessServiceImpl通过main启动,接着purchase方法调用了其他rpc的相关逻辑,此时是多个写服务,必然会涉及分布式事务。
    • 2.StorageServiceImpl通过main方法启动服务,deduct是直接被BusinessServiceImpl调用的写服务方法。
    • 3.其他OrderServiceImpl逻辑类似。

--

2.2 相关RPC服务初始化。
  • 以StorageServiceImpl服务启动为例,首先看看dubbo-storage-service.xml文件配置

    <bean id="storageDataSourceProxy" class="com.alibaba.fescar.rm.datasource.DataSourceProxy">
        <constructor-arg ref="storageDataSource" />
    </bean>
    
    <bean id="jdbcTemplate" class="org.springframework.jdbc.core.JdbcTemplate">
        <property name="dataSource" ref="storageDataSourceProxy" />
    </bean>
    
    <dubbo:application name="dubbo-demo-storage-service"  />
    <dubbo:registry address="multicast://224.5.6.7:1234?unicast=false" />
    <dubbo:protocol name="dubbo" port="20882" />
    <dubbo:service interface="com.alibaba.fescar.tm.dubbo.StorageService" ref="service" timeout="10000"/>
    
    <bean id="service" class="com.alibaba.fescar.tm.dubbo.impl.StorageServiceImpl">
        <property name="jdbcTemplate" ref="jdbcTemplate"/>
    </bean>
    
    <bean class="com.alibaba.fescar.spring.annotation.GlobalTransactionScanner">
        <constructor-arg value="dubbo-demo-storage-service"/>
        <constructor-arg value="my_test_tx_group"/>
    </bean>
    
    

    划重点:初始化服务时,内部会加载GlobalTransactionScanner类,那么这个类具体作用是什么呢?其实就是建立rpc服务下的socketChannel,将rpc与TC server建立连接,保持通信。
    继续往下跟踪。

  • 2.2.1 初始化服务内的SocketChannel

    继续跟踪GlobalTransactionScanner逻辑:

    @Override
    public void afterPropertiesSet() {
        if (disableGlobalTransaction) {
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("Global transaction is disabled.");
            }
            return;
        }
        initClient();
    }
    
    private void initClient() {
        ...
        TMClient.init(applicationId, txServiceGroup);
        ...
    }
    

    根据配置中配置的applicationId, txServiceGroup调用RMClientAT去执行init操作:

    public class RMClientAT {
        public static void init(String applicationId, String transactionServiceGroup) {
            RmRpcClient rmRpcClient = RmRpcClient.getInstance(applicationId, transactionServiceGroup);
            AsyncWorker asyncWorker = new AsyncWorker();
            asyncWorker.init();
            DataSourceManager.init(asyncWorker);
            rmRpcClient.setResourceManager(DataSourceManager.get());
            rmRpcClient.setClientMessageListener(new RmMessageListener(new RMHandlerAT()));
            rmRpcClient.init();
        }
    }
    
    • 1.根据applicationId, transactionServiceGroup获取一个RmRpcClient实例
    • 2.获取AsyncWorker一个异步工作执行器
    • 3.异步工作执行器进行初始化
    • 4.将AsyncWorker加入DataSourceManager
    • 5.将DataSourceManager、消息监听器设置到RmRpcClient
    • 6.初始化rmRpcClient

    那么一步步分析下:

  • RmRpcClient:肯定是netty的一个客户端
    看到其父类AbstractRpcRemotingClient,里面属性就是netty相关的工作线程组、启动组件等。
    与server端相关的通信就靠这个RmRpcClient了

  • 构造一个异步工作组:AsyncWorker,并进行初始化

    public synchronized void init() {
        LOGGER.info("Async Commit Buffer Limit: " + ASYNC_COMMIT_BUFFER_LIMIT);
        timerExecutor = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("AsyncWorker", 1, true));
        timerExecutor.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
    
                    doBranchCommits();
    
    
                } catch (Throwable e) {
                    LOGGER.info("Failed at async committing ... " + e.getMessage());
    
                }
            }
        }, 10, 1000 * 1, TimeUnit.MILLISECONDS);
    }
    

    这里就是启动一个线程,轮训分支事务 提交任务,如果有则进行commit操作。其实分支事务的提交很简单,无非主要删除回滚的日志即可。先看下下面的逻辑:

    private void doBranchCommits() {
        if (ASYNC_COMMIT_BUFFER.size() == 0) {
            return;
        }
        ...
    
        for (String resourceId : mappedContexts.keySet()) {
            Connection conn = null;
            try {
               ...                
               for (Phase2Context commitContext : contextsGroupedByResourceId) {
                    try {
                        UndoLogManager.deleteUndoLog(commitContext.xid, commitContext.branchId, conn);
                    } catch (Exception ex) {
                        LOGGER.warn("Failed to delete undo log [" + commitContext.branchId + "/" + commitContext.xid + "]", ex);
                    }
                }
            } finally {
                ...
        }
    }
    

    此处就是遍历ASYNC_COMMIT_BUFFER集合,删除回滚的sql,先猜想ASYNC_COMMIT_BUFFER数据的来源就是TM在执行事务业务逻辑execute时,备份而来的。后面继续分析。

  • 进而将asyncWorker加入数据层管理器DataSourceManager,DataSourceManager就是具体执行回滚及提交等逻辑的

  • 最后将DataSourceManager赋值给rmRpcClient,进而初始化rmRpcClient。

    @Override
    public void init() {
        if (initialized.compareAndSet(false, true)) {
            super.init();
            timerExecutor.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    reconnect();
                }
            }, SCHEDULE_INTERVAL_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.SECONDS);
            ...
        }
    }
    
    private void reconnect() {
        for (String serverAddress : serviceManager.lookup(transactionServiceGroup))         {
            if (serverAddress != null) {
                try {
                    connect(serverAddress);
                } catch (Exception e) {
                    ...
                }
            }
        }
    }
    
    @Override
    protected Channel connect(String serverAddress) {
        Channel channelToServer = channels.get(serverAddress);
        if (channelToServer != null) {
            channelToServer = getExistAliveChannel(channelToServer, serverAddress);
            if (null != channelToServer) {
                return channelToServer;
            }
        }
        ...
        channelLocks.putIfAbsent(serverAddress, new Object());
        Object connectLock = channelLocks.get(serverAddress);
        synchronized (connectLock) {
            Channel channel = doConnect(serverAddress);
            return channel;
        }
    }
    
    private Channel doConnect(String serverAddress) {
        Channel channelToServer = channels.get(serverAddress);
        ...
        channelFromPool = nettyClientKeyPool.borrowObject(poolKeyMap.get(serverAddress));
        } catch (Exception exx) {
            ...
        return channelFromPool;
    }
    

    初始化rmRpcClient过程就是从nettyClientKeyPool获取一个与server建立连接的channel,返回即可。
    至此,GlobalTransactionScanner逻辑基本完结。其核心功能就是给各个rpc服务内置一个与server建立连接的channel。

2.2 主服务执行事务方法体。
  • server、OrderService、AccountService、BusinessServiceImpl服务启动OK之后,继续回到下面执行方法的入口:

    @Override
    @GlobalTransactional(timeoutMills = 300000, name = "dubbo-demo-tx")
    public void purchase(String userId, String commodityCode, int orderCount) {
        LOGGER.info("purchase begin ... xid: " + RootContext.getXID());
        storageService.deduct(commodityCode, orderCount);
        orderService.create(userId, commodityCode, orderCount);
        throw new RuntimeException("xxx");
    }
    

    那么这里是如何被TM处理,进而提交至server,最终触发RM的回滚或者提交逻辑呢?
    划重点@GlobalTransactional

  • GlobalTransactional
    spring提供的注解方式,降低对业务的侵入。那么直接找到拦截器解析类:GlobalTransactionalInterceptor的拦截逻辑:

  • GlobalTransactionalInterceptor

    @Override
    public Object invoke(final MethodInvocation methodInvocation) throws Throwable {
        final GlobalTransactional anno = getAnnotation(methodInvocation.getMethod());
        if (anno != null) {
            try {
                /**
                 * 通过覆盖TransactionalTemplate对象的execute()来对被注解的方法进行代理调
                 */
                return transactionalTemplate.execute(new TransactionalExecutor() {
                    @Override
                    public Object execute() throws Throwable {
                        return methodInvocation.proceed();
                    }
    
                    @Override
                    public int timeout() {
                        return anno.timeoutMills();
                    }
    
                    @Override
                    public String name() {
                        String name = anno.name();
                        if (!StringUtils.isEmpty(name)) {
                            return name;
                        }
                        return formatMethod(methodInvocation.getMethod());
                    }
                });
            } catch (TransactionalExecutor.ExecutionException e) {
                TransactionalExecutor.Code code = e.getCode();
                switch (code) {
                    case RollbackDone:
                        throw e.getOriginalException();
                    case BeginFailure:
                        failureHandler.onBeginFailure(e.getTransaction(), e.getCause());
                        throw e.getCause();
                    case CommitFailure:
                        failureHandler.onCommitFailure(e.getTransaction(), e.getCause());
                        throw e.getCause();
                    case RollbackFailure:
                        failureHandler.onRollbackFailure(e.getTransaction(), e.getCause());
                        throw e.getCause();
                    default:
                        throw new ShouldNeverHappenException("Unknown TransactionalExecutor.Code: " + code);
    
                }
            }
    
        }
        return methodInvocation.proceed();
    }
    

    以上逻辑对注解方法进行了拦截,通过TransactionalTemplate方法的执行execute后,最终return methodInvocation.proceed();返回业务执行结果。那么TransactionalTemplate的execute就是具体的事务切入点了,继续跟踪:

  • TransactionalTemplate

    public Object execute(TransactionalExecutor business) throws TransactionalExecutor.ExecutionException {
    
        // 1. get or create a transaction
        GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();
    
        // 2. begin transaction
        try {
            tx.begin(business.timeout(), business.name());
    
        } catch (TransactionException txe) {
            throw new TransactionalExecutor.ExecutionException(tx, txe,TransactionalExecutor.Code.BeginFailure);
        }
    
        Object rs = null;
        try {
    
            // Do Your Business
            rs = business.execute();
    
        } catch (Throwable ex) {
    
            // 3. any business exception, rollback.
            try {
                tx.rollback();
    
                // 3.1 Successfully rolled back
                throw new TransactionalExecutor.ExecutionException(tx, TransactionalExecutor.Code.RollbackDone, ex);
    
            } catch (TransactionException txe) {
                // 3.2 Failed to rollback
                throw new TransactionalExecutor.ExecutionException(tx, txe,TransactionalExecutor.Code.RollbackFailure, ex);
            }
        }
    
        // 4. everything is fine, commit.
        try {
            tx.commit();
        } catch (TransactionException txe) {
            // 4.1 Failed to commit
            throw new TransactionalExecutor.ExecutionException(tx, txe,
                TransactionalExecutor.Code.CommitFailure);
    
        }
        return rs;
    }
    

    上面代码的逻辑按流程已经很清楚了

    • 1.获取全局事务
    • 2.执行业务逻辑
    • 3.异常回滚
    • 4.事务提交

    本节分析的核心:获取一个全局事务,然后开始 已经出现:

    // 1. get or create a transaction
    GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();
    tx.begin(business.timeout(), business.name());
    

    紧接着重点分析下这两点。

    --

2.3.获取全局事务
  • GlobalTransactionContext:全局事务容器

    /**
     * Get GlobalTransaction instance bind on current thread.
     * Create a new on if no existing there.
     *
     * @return new context if no existing there.
     */
    public static GlobalTransaction getCurrentOrCreate() {
        GlobalTransaction tx = getCurrent();
        if (tx == null) {
            return createNew();
        }
        return tx;
    }
    
    public static GlobalTransaction getCurrent() {
        GlobalTransaction tx = THREAD_TRANSACTION_CONTEXT.get();
        if (tx != null) {
            return tx;
        }
        String xid = RootContext.getXID();
        if (xid == null) {
            return null;
        }
        tx = new DefaultGlobalTransaction(xid);
        THREAD_TRANSACTION_CONTEXT.set(tx);
        return THREAD_TRANSACTION_CONTEXT.get();
    }
    
    private static GlobalTransaction createNew() {
        GlobalTransaction tx = new DefaultGlobalTransaction();
        THREAD_TRANSACTION_CONTEXT.set(tx);
        return THREAD_TRANSACTION_CONTEXT.get();
    }
    
    DefaultGlobalTransaction() {
        this(null);
    }
    
    DefaultGlobalTransaction(String xid) {
        this.transactionManager = DefaultTransactionManager.get();
        this.xid = xid;
        if (xid != null) {
            status = GlobalStatus.Begin;
            role = GlobalTransactionRole.Participant;
        }
    }
    
    • 1.从当前线程获取全局事务
    • 2.若没有就直接创建默认全局事务DefaultGlobalTransaction
    • 3.将全局事务进行缓存,key是当前线程
    • 4.因为此时还没有XID,因此这是全局事务的状态还是Unknow

    --

2.4.TM 开始全局事务
  • tx.begin(business.timeout(), business.name());

    继续跟踪:

    #TransactionalTemplate
    @Override
    public void begin(int timeout, String name) throws TransactionException {
        if (xid == null && role == GlobalTransactionRole.Launcher) {
            xid = transactionManager.begin(null, null, name, timeout);
            status = GlobalStatus.Begin;
            RootContext.bind(xid);
        } else {
            if (xid == null) {
                throw new ShouldNeverHappenException(role + " is NOT in a global transaction context.");
            }
            LOGGER.info(role + " is already in global transaction " + xid);
        }
    }
    

    这里角色是事务发起者Launcher,那么就继续begin

    #TransactionalTemplate
    @Override
    public String begin(String applicationId, String transactionServiceGroup, String name, int timeout) throws TransactionException {
        GlobalBeginRequest request = new GlobalBeginRequest();
        request.setTransactionName(name);
        request.setTimeout(timeout);
        GlobalBeginResponse response = (GlobalBeginResponse) syncCall(request);
        return response.getXid();
    }
    private AbstractTransactionResponse syncCall(AbstractTransactionRequest request) throws TransactionException {
        try {
            return (AbstractTransactionResponse) TmRpcClient.getInstance().sendMsgWithResponse(request);
        } catch (TimeoutException toe) {
            throw new TransactionException(TransactionExceptionCode.IO, toe);
        }
    }
    
    #TmRpcClient
    @Override
    public Object sendMsgWithResponse(Object msg, long timeout) throws TimeoutException {
        String svrAddr = XID.getServerAddress(RootContext.getXID());
        String validAddress = svrAddr != null ? svrAddr : loadBalance();
        Channel acquireChannel = connect(validAddress);
        Object result = super.sendAsyncRequestWithResponse(validAddress, acquireChannel, msg, timeout);
        if (result instanceof GlobalBeginResponse
            && ((GlobalBeginResponse)result).getResultCode() == ResultCode.Failed) {
            LOGGER.error("begin response error,release channel:" + acquireChannel);
            releaseChannel(acquireChannel, validAddress);
        }
        return result;
    }
    
    #AbstractRpcRemoting
    private Object sendAsyncRequest(String address, Channel channel, Object msg, long timeout)
            throws TimeoutException {
            ...
            ChannelFuture future;
            channelWriteableCheck(channel, msg);
            future = channel.writeAndFlush(rpcMessage);
            future.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) {
                    if (!future.isSuccess()) {
                        MessageFuture messageFuture = futures.remove(rpcMessage.getId());
                        if (messageFuture != null) {
                            messageFuture.setResultMessage(future.cause());
                        }
                        destroyChannel(future.channel());
                    }
                }
            });
        }
    }
    

    分析下上叙开始事务的流程

    • 1.TransactionalTemplate begin事务
    • 2.DefaultTransactionManager构造GlobalBeginRequest参数并调用TmRpcClient发起事务开始消息
    • 3.TmRpcClient获取已经建立连接的channel,将消息进行发送,以触发事务的开始。
2.5.TC 收到消息,开启全局事务
  • 2.5.1.接收TM begin消息
    前面server一节说过,消息的接收就在AbstractRpcRemoting channelRead方法,debug一下:

    image.png
AbstractRpcRemoting读取到消息后,进行消息的分发,继续跟踪: 
![image.png](https://img.haomeiwen.com/i12071549/1205389e2183fb77.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)


监听器已经捕获到事务开启的消息,进而处理器进一步进行处理:

```
#DefaultCoordinator 根据request类型分发处理器处理消息
@Override
public AbstractResultMessage onRequest(AbstractMessage request, RpcContext context) {
    if (!(request instanceof AbstractTransactionRequestToTC)) {
        throw new IllegalArgumentException();
    }
    AbstractTransactionRequestToTC transactionRequest = (AbstractTransactionRequestToTC)request;
    transactionRequest.setTCInboundHandler(this);

    return transactionRequest.handle(context);
}

#GlobalBeginRequest
@Override
public AbstractTransactionResponse handle(RpcContext rpcContext) {
    return handler.handle(this, rpcContext);
}

#AbstractTCInboundHandler 异常处理模板执行后,回到DefaultCoordinator#doGlobalBegin处理逻辑
@Override
public GlobalBeginResponse handle(GlobalBeginRequest request, final RpcContext rpcContext) {
    GlobalBeginResponse response = new GlobalBeginResponse();
    exceptionHandleTemplate(new Callback<GlobalBeginRequest, GlobalBeginResponse>() {
        @Override
        public void execute(GlobalBeginRequest request, GlobalBeginResponse response) throws TransactionException {
            doGlobalBegin(request, response, rpcContext);
        }
    }, request, response);
    return response;
}

#AbstractExceptionHandler
public void exceptionHandleTemplate(Callback callback, AbstractTransactionRequest request, AbstractTransactionResponse response) {
    try {
        callback.execute(request, response);
        response.setResultCode(ResultCode.Success);

    } catch (TransactionException tex) {
        response.setTransactionExceptionCode(tex.getCode());
        response.setResultCode(ResultCode.Failed);
        response.setMsg("TransactionException[" + tex.getMessage() + "]");

    } catch (RuntimeException rex) {
        response.setResultCode(ResultCode.Failed);
        response.setMsg("RuntimeException[" + rex.getMessage() + "]");
    }
}

#DefaultCoordinator
@Override
protected void doGlobalBegin(GlobalBeginRequest request, GlobalBeginResponse response, RpcContext rpcContext)
    throws TransactionException {
    response.setXid(core.begin(rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(),
        request.getTransactionName(), request.getTimeout()));
}

#DefaultCore 开始逻辑
@Override
public String begin(String applicationId, String transactionServiceGroup, String name, int timeout) throws TransactionException {
    GlobalSession session = GlobalSession.createGlobalSession(
            applicationId, transactionServiceGroup, name, timeout);
    session.addSessionLifecycleListener(SessionHolder.getRootSessionManager());

    session.begin();

    return XID.generateXID(session.getTransactionId());
}

#GlobalSession 获取GlobalSession
public GlobalSession(String applicationId, String transactionServiceGroup, String transactionName, int timeout) {
    this.transactionId = UUIDGenerator.generateUUID();
    this.status = GlobalStatus.Begin;

    this.applicationId = applicationId;
    this.transactionServiceGroup = transactionServiceGroup;
    this.transactionName = transactionName;
    this.timeout = timeout;
}

#GlobalSession GlobalSession开始逻辑
@Override
public void begin() throws TransactionException {
    this.status = GlobalStatus.Begin;
    this.beginTime = System.currentTimeMillis();
    this.active = true;
    for (SessionLifecycleListener lifecycleListener : lifecycleListeners) {
        lifecycleListener.onBegin(this);
    }
}

#AbstractSessionManager 将GlobalSession进行缓存
@Override
public void addGlobalSession(GlobalSession session) throws TransactionException {
    if (LOGGER.isDebugEnabled()) {
        LOGGER.debug("MANAGER[" + name + "] SESSION[" + session + "] " + LogOperation.GLOBAL_ADD);
    }
    transactionStoreManager.writeSession(LogOperation.GLOBAL_ADD, session);
    sessionMap.put(session.getTransactionId(), session);

}

#XID 生成全局的XID并且返回
public static String generateXID(long tranId) {
    return ipAddress + ":" + port + ":" + tranId;
}
```

那么对于上面的执行流程分析一下:

- 1.SocketChannel read到消息,并且进行消息的分发
- 2.根据消息类型寻找对应的消息处理器,此处是GlobalBeginRequest请求,那么自然寻找到GlobalBeginResponse handle逻辑
- 3.统一的异常模板处理后,进入核心处理逻辑DefaultCoordinator#doGlobalBegin,开始进行处理
- 4.根据applicationId, transactionServiceGroup创造全局事务session:GlobalSession,同时设置session中全局事务状态为GlobalStatus.Begin,生成uuid的transactionId。
- 5.将对应的sessionManager加入lifecycleListeners集合,以管控整个session生命周期
- 6.开始session的生命周期,且设置相关开始时间,并将session以<session.getTransactionId(), session>map进行缓存进SessionManager
- 7.根据TransactionId生成XID,以Response形式返回给TM。
- 8.完成事务的开启逻辑。

--

2.6.TM 收到处理结果(XID),继续回到TM跟踪
  • 结果同步

    #DefaultGlobalTransaction
    @Override
    public void begin(int timeout, String name) throws TransactionException {
        if (xid == null && role == GlobalTransactionRole.Launcher) {
            xid = transactionManager.begin(null, null, name, timeout);
            status = GlobalStatus.Begin;
            RootContext.bind(xid);
        } else {
            if (xid == null) {
                throw new ShouldNeverHappenException(role + " is NOT in a global transaction context.");
            }
            LOGGER.info(role + " is already in global transaction " + xid);
        }
    }
    
    #RootContext
    public static void bind(String xid) {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("bind " + xid);
        }
        CONTEXT_HOLDER.put(KEY_XID, xid);
    }
    
    • 1.获取到返回的XID后,将TM维护的全局事务状态设置为GlobalStatus.Begin。
    • 2.将返回的XID保存进context容器(其实就是当前线程的threadLocal),以保证跟server端的session状态一致。

    至此:整个事务开始流程分析完毕。最终状态就是:

    • 1.server 中的GlobalSession保存了全局事务状态等相关的信息,包含XID
    • 2.TM中的RootContext保存了全局事务状态等相关的信息,包含XID

    --

三.未完待续。。。

后续分析主要还是根据example官方实例分为:分支事务注册、事务回滚、事务提交进行。
同时后续每一流程都紧密关联Server,因此还会频繁回到上叙server启动后,收到消息被触发的后续逻辑。

相关文章

网友评论

      本文标题:2-fescar(seata)源码分析-全局事务开始

      本文链接:https://www.haomeiwen.com/subject/vryqyqtx.html