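Introduction
This article walks through how the TC processes messages sent by the TM. Because there are many message types, I plan to analyze them in a separate article; this one covers only the basic flow once a message has passed the network layer, to make the source code easier to read.
It does not analyze how messages from the TM are received; it focuses on the processing flow after a message has arrived.
Transaction Coordinator (TC): maintains the running state of global transactions and is responsible for coordinating and driving their commit or rollback.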
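TC processing sequence diagram
(Figure: TC.jpg)
Notes:
- The core modules of the TC processing flow are DefaultCoordinator and DefaultCore.
- DefaultCoordinator invokes DefaultCore through direct method calls.
TC processing flow: source code analysis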
public class DefaultCoordinator extends AbstractTCInboundHandler
    implements TransactionMessageHandler, ResourceManagerInbound {

    private ServerMessageSender messageSender;

    // Core instance obtained from the factory (DefaultCore by default).
    private Core core = CoreFactory.get();

    public DefaultCoordinator(ServerMessageSender messageSender) {
        this.messageSender = messageSender;
        core.setResourceManagerInbound(this);
    }

    // Begin a global transaction and return its XID in the response.
    @Override
    protected void doGlobalBegin(GlobalBeginRequest request, GlobalBeginResponse response, RpcContext rpcContext)
        throws TransactionException {
        response.setXid(core.begin(rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(),
            request.getTransactionName(), request.getTimeout()));
    }

    // Commit the global transaction identified by the request's transaction id.
    @Override
    protected void doGlobalCommit(GlobalCommitRequest request, GlobalCommitResponse response, RpcContext rpcContext)
        throws TransactionException {
        response.setGlobalStatus(core.commit(XID.generateXID(request.getTransactionId())));
    }

    // Roll back the global transaction identified by the request's transaction id.
    @Override
    protected void doGlobalRollback(GlobalRollbackRequest request, GlobalRollbackResponse response, RpcContext rpcContext)
        throws TransactionException {
        response.setGlobalStatus(core.rollback(XID.generateXID(request.getTransactionId())));
    }

    // Query the current status of the global transaction.
    @Override
    protected void doGlobalStatus(GlobalStatusRequest request, GlobalStatusResponse response, RpcContext rpcContext)
        throws TransactionException {
        response.setGlobalStatus(core.getStatus(XID.generateXID(request.getTransactionId())));
    }
}
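Notes:
- DefaultCoordinator implements doGlobalBegin to begin a global transaction.
- DefaultCoordinator implements doGlobalCommit to commit a global transaction.
- DefaultCoordinator implements doGlobalRollback to roll back a global transaction.
- DefaultCoordinator obtains its Core instance from CoreFactory.get().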
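The delegation described in these notes can be illustrated with a minimal, self-contained sketch: a handler receives a request, calls the core with a plain method call, and copies the result into the response. Every type here (BeginRequest, BeginResponse, SketchCore, CountingCore, SketchCoordinator) is a hypothetical stand-in rather than the real class, and the XID layout is an assumption made only for illustration.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-ins for the real request/response types.
class BeginRequest {
    final String transactionName;
    final int timeout;

    BeginRequest(String transactionName, int timeout) {
        this.transactionName = transactionName;
        this.timeout = timeout;
    }
}

class BeginResponse {
    String xid;
}

// Hypothetical core interface exposing only the operation needed here.
interface SketchCore {
    String begin(String applicationId, String group, String name, int timeout);
}

// Toy core that hands out sequential ids; the "host:port:id" layout is assumed.
class CountingCore implements SketchCore {
    private final AtomicLong nextId = new AtomicLong(1);

    @Override
    public String begin(String applicationId, String group, String name, int timeout) {
        return "127.0.0.1:8091:" + nextId.getAndIncrement();
    }
}

// The coordinator holds a core reference and invokes it directly.
class SketchCoordinator {
    private final SketchCore core;

    SketchCoordinator(SketchCore core) {
        this.core = core;
    }

    void doGlobalBegin(BeginRequest request, BeginResponse response,
                       String applicationId, String group) {
        response.xid = core.begin(applicationId, group,
            request.transactionName, request.timeout);
    }
}

class CoordinatorSketchDemo {
    public static void main(String[] args) {
        SketchCoordinator coordinator = new SketchCoordinator(new CountingCore());
        BeginResponse response = new BeginResponse();
        coordinator.doGlobalBegin(new BeginRequest("demo-tx", 60000), response,
            "demo-app", "demo-group");
        System.out.println(response.xid); // prints 127.0.0.1:8091:1
    }
}
```

The sketch passes the core through the constructor only to stay self-contained; the listing above obtains it from a factory instead.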
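public class CoreFactory {

    // Holder class: INSTANCE is created when SingletonHolder is first initialized.
    private static class SingletonHolder {
        private static Core INSTANCE = new DefaultCore();
    }

    // Returns the shared Core instance.
    public static final Core get() {
        return CoreFactory.SingletonHolder.INSTANCE;
    }

    // Replaces the shared Core instance.
    public static void set(Core core) {
        CoreFactory.SingletonHolder.INSTANCE = core;
    }
}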
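Notes:
- CoreFactory provides the method for obtaining the Core instance, which is a DefaultCore unless it has been replaced through set.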
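The factory follows what is commonly called the initialization-on-demand holder idiom: the nested holder class is not initialized until it is first referenced, and the JVM performs that initialization safely across threads. The sketch below shows the same shape with hypothetical names (Greeter, DefaultGreeter, GreeterFactory) standing in for Core, DefaultCore and CoreFactory. Note that a later call to set writes a plain, non-volatile field, so this sketch makes no visibility guarantee for the replacement across threads.

```java
// Hypothetical service type standing in for Core.
interface Greeter {
    String greet(String name);
}

class DefaultGreeter implements Greeter {
    @Override
    public String greet(String name) {
        return "hello " + name;
    }
}

class GreeterFactory {
    // Holder is not initialized until get() or set() first touches it.
    private static class Holder {
        private static Greeter INSTANCE = new DefaultGreeter();
    }

    static Greeter get() {
        return Holder.INSTANCE;
    }

    // Swaps the shared instance, for example with a test double.
    static void set(Greeter greeter) {
        Holder.INSTANCE = greeter;
    }
}

class HolderIdiomDemo {
    public static void main(String[] args) {
        Greeter first = GreeterFactory.get();
        // Repeated calls return the same object.
        System.out.println(first == GreeterFactory.get());
        // Replace the instance and observe the new behavior.
        GreeterFactory.set(name -> "hi " + name);
        System.out.println(GreeterFactory.get().greet("tc"));
    }
}
```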
// Outline only: method bodies are omitted.
public class DefaultCore implements Core {

    private LockManager lockManager = LockManagerFactory.get();

    private ResourceManagerInbound resourceManagerInbound;

    public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
        throws TransactionException {}

    @Override
    public GlobalStatus commit(String xid) throws TransactionException {}

    @Override
    public void doGlobalCommit(GlobalSession globalSession, boolean retrying) throws TransactionException {}

    private void asyncCommit(GlobalSession globalSession) throws TransactionException {}

    private void queueToRetryCommit(GlobalSession globalSession) throws TransactionException {}

    private void queueToRetryRollback(GlobalSession globalSession) throws TransactionException {}

    @Override
    public GlobalStatus rollback(String xid) throws TransactionException {}

    @Override
    public void doGlobalRollback(GlobalSession globalSession, boolean retrying) throws TransactionException {}
}
Notes:
- DefaultCore is the core component through which the TC executes transaction operations.
- DefaultCore provides operations such as begin, commit and rollback.
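Since the method bodies are omitted in the outline, a rough sketch may help show the kind of lifecycle a core of this shape manages: begin registers a session and returns its XID, while commit and rollback look the session up and move it to a final state. Everything below is hypothetical (MiniCore, MiniStatus, the in-memory map, the XID format, and the choice to report unknown XIDs as FINISHED); branch sessions, locks and retry queues are not modeled.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical, reduced set of states; not the real GlobalStatus enum.
enum MiniStatus { BEGIN, COMMITTED, ROLLED_BACK, FINISHED }

class MiniCore {
    // In-memory session table keyed by XID (an assumption of this sketch).
    private final Map<String, MiniStatus> sessions = new ConcurrentHashMap<>();
    private final AtomicLong nextId = new AtomicLong(1);

    String begin(String applicationId, String group, String name, int timeout) {
        String xid = "tc-sketch:" + nextId.getAndIncrement();
        sessions.put(xid, MiniStatus.BEGIN);
        return xid;
    }

    MiniStatus commit(String xid) {
        return finish(xid, MiniStatus.COMMITTED);
    }

    MiniStatus rollback(String xid) {
        return finish(xid, MiniStatus.ROLLED_BACK);
    }

    MiniStatus getStatus(String xid) {
        return sessions.getOrDefault(xid, MiniStatus.FINISHED);
    }

    // Moves an open session to the target state. A session that is already
    // closed keeps its state; an unknown XID is reported as FINISHED.
    private MiniStatus finish(String xid, MiniStatus target) {
        MiniStatus result = sessions.computeIfPresent(xid,
            (key, current) -> current == MiniStatus.BEGIN ? target : current);
        return result == null ? MiniStatus.FINISHED : result;
    }
}

class MiniCoreDemo {
    public static void main(String[] args) {
        MiniCore core = new MiniCore();
        String xid = core.begin("demo-app", "demo-group", "demo-tx", 60000);
        System.out.println(core.getStatus(xid)); // BEGIN
        System.out.println(core.commit(xid));    // COMMITTED
        System.out.println(core.rollback(xid));  // COMMITTED, already closed
    }
}
```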