Fescar TC-commit流程

作者: 晴天哥_王志 | 来源:发表于2019-01-31 14:46 被阅读40次

    开篇

     这篇文章的目的主要是讲解Fescar TC执行commit的流程,目的是讲解清楚commit流程中的一些步骤。

     遗憾的是因为commit本身Fescar的分支事务注册上报,如果事先不了解Fescar的分支事务,有些逻辑理解起来会有一些奇怪,对于branchSession本身还未了解,所以只能单独讲解commit流程。

    背景

    Fescar事务管理
    说明:
    • 分支事务中数据的 本地锁 由本地事务管理,在分支事务 Phase1 结束时释放。
      同时,随着本地事务结束,连接 也得以释放。

    • 分支事务中数据的 全局锁 在事务协调器侧管理,在决议 Phase2 全局提交时,全局锁马上可以释放。只有在决议全局回滚的情况下,全局锁 才被持有至分支的 Phase2 结束。

    这个设计,极大地减少了分支事务对资源(数据和连接)的锁定时间,给整体并发和吞吐的提升提供了基础。

    这里需要重点指出的是:Phase1阶段的commit()操作是各个分支事务本地的事务操作。Phase2阶段的操作是全局的commit()和rollback()。TC-commit流程指的就是Phase2阶段。

    TC commit流程介绍

    • 1.根据transactionId查找begin阶段生成的GlobalSession对象。

    • 2.对GlobalSession对象进行清理操作,删除分支事务的锁并清理GlobalSession对象。

    • 3.TC通知所有RM(各分支事务的资源管理器)进行全局提交操作(doGlobalCommit)。

    TC commit源码分析

    public class DefaultCoordinator extends AbstractTCInboundHandler
        implements TransactionMessageHandler, ResourceManagerInbound {
    
        @Override
        protected void doGlobalCommit(GlobalCommitRequest request, GlobalCommitResponse response, RpcContext rpcContext)
            throws TransactionException {
            response.setGlobalStatus(core.commit(XID.generateXID(request.getTransactionId())));
        }
    }
    

    说明:

    • DefaultCoordinator的doGlobalCommit()作为全局回滚入口
    • core.commit()根据XID去执行全局commit()操作。

    Commit 主流程

    public class DefaultCore implements Core {
        public GlobalStatus commit(String xid) throws TransactionException {
             // 1.查找GlobalSession
            GlobalSession globalSession = SessionHolder.findGlobalSession(XID.getTransactionId(xid));
            
            if (globalSession == null) {
                return GlobalStatus.Finished;
            }
            
            GlobalStatus status = globalSession.getStatus();
            // 2.关闭全局session并执行清理工作
            globalSession.closeAndClean(); // Highlight: Firstly, close the session, then no more branch can be registered.
    
            // 3.执行GlobalCommit通知动作
            if (status == GlobalStatus.Begin) {
                if (globalSession.canBeCommittedAsync()) {
                    asyncCommit(globalSession);
                } else {
                    doGlobalCommit(globalSession, false);
                }
    
            }
    
            // 返回GlobalCommit后的状态
            return globalSession.getStatus();
        }
    }
    

    说明:

    • DefaultCore是全局回滚的核心逻辑。
    • SessionHolder.findGlobalSession查找全局的GlobalSession对象。
    • GlobalSession执行closeAndClean操作。
    • DefaultCore执行doGlobalCommit通知TC执行全局回滚操作。

    查找GlobalSession

    public class SessionHolder {
        public static GlobalSession findGlobalSession(Long transactionId) throws TransactionException {
            return getRootSessionManager().findGlobalSession(transactionId);
        }
    }
    
    public class DefaultSessionManager extends AbstractSessionManager {}
    
    public abstract class AbstractSessionManager implements SessionManager, SessionLifecycleListener {
    
        protected Map<Long, GlobalSession> sessionMap = new ConcurrentHashMap<>();
    
        public GlobalSession findGlobalSession(Long transactionId) throws TransactionException {
            return sessionMap.get(transactionId);
        }
    }
    

    说明:

    • findGlobalSession()方法从DefaultSessionManager当中获取GlobalSession。
    • DefaultSessionManager的父类AbstractSessionManager的findGlobalSession从sessionMap获取GlobalSession对象。

    GlobalSession的closeAndClean

    public class GlobalSession implements SessionLifecycle, SessionStorable {
    
        public void closeAndClean() throws TransactionException {
            close();
            clean();
        }
    
        public void close() throws TransactionException {
            if (active) {
                for (SessionLifecycleListener lifecycleListener : lifecycleListeners) {
                    lifecycleListener.onClose(this);
                }
            }
        }
    
        private void clean() throws TransactionException {
            for (BranchSession branchSession : branchSessions) {
                branchSession.unlock();
            }
        }
    }
    
    
    
    public class DefaultSessionManager extends AbstractSessionManager {}
    public abstract class AbstractSessionManager implements SessionManager, SessionLifecycleListener {
    
        public void onClose(GlobalSession globalSession) throws TransactionException {
            globalSession.setActive(false);
        }
    }
    

    说明:

    • GlobalSession的执行closeAndClean操作,先执行close再执行clean。
    • lifecycleListener.onClose()执行DefaultSessionManager的onClose()。
    • DefaultSessionManager的onClose()把设置active标识为false。
    • clean()操作对所有的分支事务branchSession释放锁。这部分逻辑比较复杂单独列出。

    BranchSession的unlock

    public class BranchSession implements Lockable, Comparable<BranchSession>, SessionStorable {
    
        public boolean unlock() throws TransactionException {
            if (lockHolder.size() == 0) {
                return true;
            }
            Iterator<Map.Entry<Map<String, Long>, Set<String>>> it = lockHolder.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<Map<String, Long>, Set<String>> entry = it.next();
                Map<String, Long> bucket = entry.getKey();
                Set<String> keys = entry.getValue();
                synchronized (bucket) {
                    for (String key : keys) {
                        Long v = bucket.get(key);
                        if (v == null) {
                            continue;
                        }
                        if (v.longValue() == getTransactionId()) {
                            bucket.remove(key);
                        }
                    }
                }
            }
            lockHolder.clear();
            return true;
        }
    }
    

    说明:

    • BranchSession的unlock()操作对BranchSession 进行清理。
    • BranchSession内部的数据由于暂未阅读该部分代码所以暂时不能解释清楚。
    • 全局清除lockHolder。

    TC执行GlobalCommit

    public class DefaultCore implements Core {
    
        public void doGlobalCommit(GlobalSession globalSession, boolean retrying) throws TransactionException {
            // 遍历所有的BranchSession执行回滚操作
            for (BranchSession branchSession : globalSession.getSortedBranches()) {
                BranchStatus currentStatus = branchSession.getStatus();
                if (currentStatus == BranchStatus.PhaseOne_Failed) {
                    continue;
                }
                try {
                    BranchStatus branchStatus = resourceManagerInbound.branchCommit(XID.generateXID(
                                                branchSession.getTransactionId()), branchSession.getBranchId(),
                        branchSession.getResourceId(), branchSession.getApplicationData());
    
                    switch (branchStatus) {
                        case PhaseTwo_Committed:
                            globalSession.removeBranch(branchSession);
                            continue;
                        case PhaseTwo_CommitFailed_Unretryable:
                            if (globalSession.canBeCommittedAsync()) {
                                LOGGER.error("By [{}], failed to commit branch {}", branchStatus, branchSession);
                                continue;
                            } else {
                                globalSession.changeStatus(GlobalStatus.CommitFailed);
                                globalSession.end();
                                LOGGER.error("Finally, failed to commit global[{}] since branch[{}] commit failed",
                                    globalSession.getTransactionId(), branchSession.getBranchId());
                                return;
                            }
                        default:
                            if (!retrying) {
                                queueToRetryCommit(globalSession);
                                return;
                            }
                            if (globalSession.canBeCommittedAsync()) {
                                LOGGER.error("By [{}], failed to commit branch {}", branchStatus, branchSession);
                                continue;
                            } else {
                                LOGGER.error(
                                    "Failed to commit global[{}] since branch[{}] commit failed, will retry later.",
                                    globalSession.getTransactionId(), branchSession.getBranchId());
                                return;
                            }
    
                    }
    
                } catch (Exception ex) {
                    LOGGER.info("Exception committing branch {}", branchSession, ex);
                    if (!retrying) {
                        queueToRetryCommit(globalSession);
                        if (ex instanceof TransactionException) {
                            throw (TransactionException) ex;
                        } else {
                            throw new TransactionException(ex);
                        }
                    }
    
                }
    
            }
            if (globalSession.hasBranch()) {
                return;
            }
            globalSession.changeStatus(GlobalStatus.Committed);
            globalSession.end();
        }
    }
    

    说明:

    • 对所有的BranchSession执行branchCommit通知
    • 针对branchCommit返回状态进行判断,有一些逻辑在里面,后续阅读了Branch相关资料后再补充状态转移图。

    相关文章

      网友评论

        本文标题:Fescar TC-commit流程

        本文链接:https://www.haomeiwen.com/subject/eolhsqtx.html