关于AT模式的流程,我们在上一篇文章 一文简要概述Seata AT与TCC的区别
已经介绍的差不多了。但是日常学习中,我们仅仅只知道一个流程是远远不够的。我们往往会遇到许多问题,现在根据几个我在学习Seata时产生的几个问题,进入源码中了解下具体是怎么实现的。
开始之前,介绍几个AT模式下比较重要的类
TM 操作核心代码 io.seata.tm.api.TransactionalTemplate
RM 操作核心代码 io.seata.rm.datasource.exec.ExecuteTemplate
2.1 TM是如何开启全局事务的
// io.seata.tm.api.TransactionalTemplate#beginTransaction
private void beginTransaction(TransactionInfo txInfo, GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {
try {
// 钩子方法
triggerBeforeBegin();
// 开去全局事务
tx.begin(txInfo.getTimeOut(), txInfo.getName());
// 钩子方法
triggerAfterBegin();
} catch (TransactionException txe) {
throw new TransactionalExecutor.ExecutionException(tx, txe,
TransactionalExecutor.Code.BeginFailure);
}
}
// io.seata.tm.api.DefaultGlobalTransaction#begin(int, java.lang.String)
@Override
public void begin(int timeout, String name) throws TransactionException {
if (role != GlobalTransactionRole.Launcher) {
assertXIDNotNull();
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Ignore Begin(): just involved in global transaction [{}]", xid);
}
return;
}
assertXIDNull();
String currentXid = RootContext.getXID();
if (currentXid != null) {
throw new IllegalStateException("Global transaction already exists," +
" can't begin a new global transaction, currentXid = " + currentXid);
}
// 通过RPC请求调用到TC服务
xid = transactionManager.begin(null, null, name, timeout);
status = GlobalStatus.Begin;
RootContext.bind(xid);
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Begin new global transaction [{}]", xid);
}
}
// io.seata.server.coordinator.DefaultCore#begin
@Override
public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
throws TransactionException {
// 创建全局事务
GlobalSession session = GlobalSession.createGlobalSession(applicationId, transactionServiceGroup, name,
timeout);
session.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
session.begin();
// transaction start event
eventBus.post(new GlobalTransactionEvent(session.getTransactionId(), GlobalTransactionEvent.ROLE_TC,
session.getTransactionName(), session.getBeginTime(), null, session.getStatus()));
// 返回xid给TM
return session.getXid();
}
2.2 TM是如何提交/回滚全局事务的
// io.seata.tm.api.TransactionalTemplate#execute
try {
// 2. If the tx role is 'GlobalTransactionRole.Launcher', send the request of beginTransaction to TC,
// else do nothing. Of course, the hooks will still be triggered.
// 开启全局事务
beginTransaction(txInfo, tx);
Object rs;
try {
// Do Your Business
// 执行业务代码,调用对应的TM服务
rs = business.execute();
} catch (Throwable ex) {
// 3. The needed business exception to rollback.
// 回滚全局事务 具体如何提交/回滚,留到2.4中描述,此处仅看TM是如何触发提交/回滚操作的
completeTransactionAfterThrowing(txInfo, tx, ex);
throw ex;
}
// 4. everything is fine, commit.
// 提交全局事务
commitTransaction(tx);
return rs;
} finally {
//5. clear
resumeGlobalLockConfig(previousConfig);
triggerAfterCompletion();
cleanUp();
}
2.3 RM的回滚日志是怎么生成的
// io.seata.rm.datasource.exec.AbstractDMLBaseExecutor#executeAutoCommitFalse
protected T executeAutoCommitFalse(Object[] args) throws Exception {
if (!JdbcConstants.MYSQL.equalsIgnoreCase(getDbType()) && getTableMeta().getPrimaryKeyOnlyName().size() > 1)
{
throw new NotSupportYetException("multi pk only support mysql!");
}
// 生成执行前的镜象
TableRecords beforeImage = beforeImage();
// 获取执行的结果
T result = statementCallback.execute(statementProxy.getTargetStatement(), args);
// 生成执行后的镜象
TableRecords afterImage = afterImage(beforeImage);
// 生成UndoLog
prepareUndoLog(beforeImage, afterImage);
return result;
}
2.4 RM提交之前需要获取全局锁,全局锁是如何获取的(用于防止多个分布式事务修改到相同的数据导致问题)
// io.seata.rm.datasource.ConnectionProxy#doCommit
private void doCommit() throws SQLException {
if (context.inGlobalTransaction()) {
// 判断是否含有xid
processGlobalTransactionCommit();
} else if (context.isGlobalLockRequire()) {
// 判断是否需要GlobalLock
processLocalCommitWithGlobalLocks();
} else {
// 直接提交
targetConnection.commit();
}
}
// io.seata.rm.datasource.ConnectionProxy#processGlobalTransactionCommit
private void processGlobalTransactionCommit() throws SQLException {
try {
// 注册分支事务 此处调用TC进行获取锁
register();
} catch (TransactionException e) {
recognizeLockKeyConflictException(e, context.buildLockKeys());
}
try {
UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this);
targetConnection.commit();
} catch (Throwable ex) {
LOGGER.error("process connectionProxy commit error: {}", ex.getMessage(), ex);
report(false);
throw new SQLException(ex);
}
if (IS_REPORT_SUCCESS_ENABLE) {
// 上报一阶段结果
report(true);
}
context.reset();
}
// io.seata.server.coordinator.AbstractCore#branchRegister
@Override
public Long branchRegister(BranchType branchType, String resourceId, String clientId, String xid,
String applicationData, String lockKeys) throws TransactionException {
GlobalSession globalSession = assertGlobalSessionNotNull(xid, false);
return SessionHolder.lockAndExecute(globalSession, () -> {
globalSessionStatusCheck(globalSession);
globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
BranchSession branchSession = SessionHelper.newBranchByGlobal(globalSession, branchType, resourceId,
applicationData, lockKeys, clientId);
// 上锁
branchSessionLock(globalSession, branchSession);
try {
// 注册分支
globalSession.addBranch(branchSession);
} catch (RuntimeException ex) {
branchSessionUnlock(branchSession);
throw new BranchTransactionException(FailedToAddBranch, String
.format("Failed to store branch xid = %s branchId = %s", globalSession.getXid(),
branchSession.getBranchId()), ex);
}
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Register branch successfully, xid = {}, branchId = {}, resourceId = {} ,lockKeys = {}",
globalSession.getXid(), branchSession.getBranchId(), resourceId, lockKeys);
}
return branchSession.getBranchId();
});
}
// io.seata.server.lock.AbstractLockManager#acquireLock(具体上锁的代码)
@Override
public boolean acquireLock(BranchSession branchSession) throws TransactionException {
if (branchSession == null) {
throw new IllegalArgumentException("branchSession can't be null for memory/file locker.");
}
String lockKey = branchSession.getLockKey();
if (StringUtils.isNullOrEmpty(lockKey)) {
// no lock
return true;
}
// get locks of branch
List<RowLock> locks = collectRowLocks(branchSession);
if (CollectionUtils.isEmpty(locks)) {
// no lock
return true;
}
return getLocker(branchSession).acquireLock(locks);
}
2.5 TC是如何协调各个分支事务的
// 以commit为例,TM调用COMMIT时最终会调用到TC的commit方法
// io.seata.tm.api.DefaultGlobalTransaction#commit
@Override
public void commit() throws TransactionException {
if (role == GlobalTransactionRole.Participant) {
// Participant has no responsibility of committing
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Ignore Commit(): just involved in global transaction [{}]", xid);
}
return;
}
assertXIDNotNull();
int retry = COMMIT_RETRY_COUNT <= 0 ? DEFAULT_TM_COMMIT_RETRY_COUNT : COMMIT_RETRY_COUNT;
try {
while (retry > 0) {
// 包含了重试
try {
status = transactionManager.commit(xid);
break;
} catch (Throwable ex) {
LOGGER.error("Failed to report global commit [{}],Retry Countdown: {}, reason: {}", this.getXid(), retry, ex.getMessage());
retry--;
if (retry == 0) {
// 异常最终会向上抛出至TM处,触发全局回滚
throw new TransactionException("Failed to report global commit", ex);
}
}
}
} finally {
if (xid.equals(RootContext.getXID())) {
suspend(true);
}
}
if (LOGGER.isInfoEnabled()) {
LOGGER.info("[{}] commit status: {}", xid, status);
}
}
// io.seata.server.coordinator.DefaultCore#commit
@Override
public GlobalStatus commit(String xid) throws TransactionException {
GlobalSession globalSession = SessionHolder.findGlobalSession(xid);
if (globalSession == null) {
return GlobalStatus.Finished;
}
globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
// just lock changeStatus
// 是否可以提交全局事务
boolean shouldCommit = SessionHolder.lockAndExecute(globalSession, () -> {
// Highlight: Firstly, close the session, then no more branch can be registered.
globalSession.closeAndClean();
if (globalSession.getStatus() == GlobalStatus.Begin) {
if (globalSession.canBeCommittedAsync()) {
globalSession.asyncCommit();
return false;
} else {
globalSession.changeStatus(GlobalStatus.Committing);
return true;
}
}
return false;
});
if (shouldCommit) {
boolean success = doGlobalCommit(globalSession, false);
if (success && !globalSession.getBranchSessions().isEmpty()) {
globalSession.asyncCommit();
return GlobalStatus.Committed;
} else {
return globalSession.getStatus();
}
} else {
return globalSession.getStatus() == GlobalStatus.AsyncCommitting ? GlobalStatus.Committed : globalSession.getStatus();
}
}
网友评论