概要
过度
我们上文介绍了Spring-tx中对类扫描的相关操作,在返回true后AOP会接受增强操作,我们需要关注的是增强操作后调用链上事务的相关处理逻辑,这也是本文要着重记录的。
内容简介
本文记录事务调用链的相关处理逻辑。
所属环节
事物具体工作逻辑
上下环节
上文:事物注册相关逻辑
下文:无
源码解析
入口
我们上文介绍Spring-tx在解析标签时向上下文注册了BeanFactoryTransactionAttributeSourceAdvisor
方法,它集成了用于保存事物注解属性的TransactionAttributeSource
和用来处理事物的TransactionInterceptor
。
我们先看BeanFactoryTransactionAttributeSourceAdvisor
暴露Advice
的逻辑:
public Advice getAdvice() {
Advice advice = this.advice;
if (advice != null) {
return advice;
}
Assert.state(this.adviceBeanName != null, "'adviceBeanName' must be specified");
Assert.state(this.beanFactory != null, "BeanFactory must be set to resolve 'adviceBeanName'");
if (this.beanFactory.isSingleton(this.adviceBeanName)) {
// Rely on singleton semantics provided by the factory.
advice = this.beanFactory.getBean(this.adviceBeanName, Advice.class);
this.advice = advice;
return advice;
} else {
// No singleton guarantees from the factory -> let's lock locally but
// reuse the factory's singleton lock, just in case a lazy dependency
// of our advice bean happens to trigger the singleton lock implicitly...
synchronized (this.adviceMonitor) {
advice = this.advice;
if (advice == null) {
advice = this.beanFactory.getBean(this.adviceBeanName, Advice.class);
this.advice = advice;
}
return advice;
}
}
}
Emmm,基本没有什么特殊逻辑,我们直接看TransactionInterceptor
吧。
TransactionInterceptor
调用逻辑
我们先看它的继承树:
1.png我们为了思路清晰,还是从调用的入口开始看。
invoke()
public Object invoke(MethodInvocation invocation) throws Throwable {
// Work out the target class: may be {@code null}.
// The TransactionAttributeSource should be passed the target class
// as well as the method, which may be from an interface.
// 因为后面我们拿到事务相关配置属性时需要调用目标的信息,所以我们需要先获得被增强的类、方法信息
// 根据当前情况得到道理的目标对象。
// 后面要根据当前情况获得该目标(方法级别)对应的事务配置(就是你的@Transactional中的属性)
// (事务配置的获得规则前面进行了获得并缓存)
Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
// Adapt to TransactionAspectSupport's invokeWithinTransaction...
return invokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed);
}
我们继续看事物相关的处理:
// 一个基于 around 增强器的通用事务委托,可以处理 CallbackPreferringPlatformTransactionManager、PlatformTransactionManager
// 两种事务管理器【有时间可以看看上那两个各自的特点和适用场景】
@Nullable
protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
final InvocationCallback invocation) throws Throwable {
// If the transaction attribute is null, the method is non-transactional.
// 拿到前面从注解中找出来的事务属性,如果没有配置,说明没有打事务注解
TransactionAttributeSource tas = getTransactionAttributeSource();
// 拿到本次代理适用的事务属性配置
final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
// 拿到对应的事务管理器
// TODO 这里后面二刷,争取连起来
final PlatformTransactionManager tm = determineTransactionManager(txAttr);
// 根据当前方法情况,生成唯一标示
final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);
// TODO 声明式事务处理
if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {
// Standard transaction demarcation with getTransaction and commit/rollback calls.
// 创建 TransactionInfo
TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
Object retVal = null;
try {
// This is an around advice: Invoke the next interceptor in the chain.
// This will normally result in a target object being invoked.
// 继续调用拦截器链
retVal = invocation.proceedWithInvocation();
} catch (Throwable ex) {
// target invocation exception
// 异常回滚 TransactionInfo
completeTransactionAfterThrowing(txInfo, ex);
throw ex;
} finally {
// 清除 TransactionInfo 信息
cleanupTransactionInfo(txInfo);
}
// 提交 TransactionInfo 【事务】
commitTransactionAfterReturning(txInfo);
return retVal;
} else {
// TODO 编程式事务处理
final ThrowableHolder throwableHolder = new ThrowableHolder();
// It's a CallbackPreferringPlatformTransactionManager: pass a TransactionCallback in.
try {
Object result = ((CallbackPreferringPlatformTransactionManager) tm).execute(txAttr, status -> {
TransactionInfo txInfo = prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
try {
return invocation.proceedWithInvocation();
} catch (Throwable ex) {
if (txAttr.rollbackOn(ex)) {
// A RuntimeException: will lead to a rollback.
if (ex instanceof RuntimeException) {
throw (RuntimeException) ex;
} else {
throw new ThrowableHolderException(ex);
}
} else {
// A normal return value: will lead to a commit.
throwableHolder.throwable = ex;
return null;
}
} finally {
cleanupTransactionInfo(txInfo);
}
});
// Check result state: It might indicate a Throwable to rethrow.
if (throwableHolder.throwable != null) {
throw throwableHolder.throwable;
}
return result;
} catch (ThrowableHolderException ex) {
throw ex.getCause();
} catch (TransactionSystemException ex2) {
if (throwableHolder.throwable != null) {
logger.error("Application exception overridden by commit exception", throwableHolder.throwable);
ex2.initApplicationException(throwableHolder.throwable);
}
throw ex2;
} catch (Throwable ex2) {
if (throwableHolder.throwable != null) {
logger.error("Application exception overridden by commit exception", throwableHolder.throwable);
}
throw ex2;
}
}
}
思路如流程图所示:
2.png我们的关注点主要是在创建事务、根据异常处理回滚信息、清理事务信息、提交事务几个阶段。
创建事务
此处是对createTransactionIfNecessary()
方法的讲述
protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm,
@Nullable TransactionAttribute txAttr, final String joinpointIdentification) {
// If no name specified, apply method identification as transaction name.
// 如果事务属性没有指定名称,就用前面生成的唯一标识
if (txAttr != null && txAttr.getName() == null) {
txAttr = new DelegatingTransactionAttribute(txAttr) {
@Override
public String getName() {
return joinpointIdentification;
}
};
}
TransactionStatus status = null;
if (txAttr != null) {
if (tm != null) {
// 直接获得事务状态
status = tm.getTransaction(txAttr);
} else {
// 没有 manager ,就不进行事务功能处理
if (logger.isDebugEnabled()) {
logger.debug("Skipping transactional joinpoint [" + joinpointIdentification +
"] because no transaction manager has been configured");
}
}
}
// 根据:
// 1. 事务信息
// 2. 事务管理器
// 3. 唯一标识【这个感觉有点多余了】
// 4. 事务状态
// 创建事务属性【事务属性用于实际实现事务功能,其中有以上东西的引用】
return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
}
和Spring函数的一贯特点相似,将功能进行了分割和委托:
- 先借助
TransactionManager
根据TransactionAttribute
得到TransactionStatus
对象,此对象我们叫它事务信息,里面不止有我们的事务配置,还有此次调用的事务及时属性,比如事务的配置、当前占用的Connection
等等 - 然后借助
prepareTransactionInfo()
方法,得到TransactionInfo
,用于在事务的Interceptor
中进行事务状态的记录传递
我们先看tm.getTransaction()
,我们关注主要逻辑,所以直接进入DataSourceTransactionManager
:
public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition) throws TransactionException {
// 拿到当前线程的锁对象
Object transaction = doGetTransaction();
// Cache debug flag to avoid repeated checks.
boolean debugEnabled = logger.isDebugEnabled();
if (definition == null) {
// Use defaults if no transaction definition given.
definition = new DefaultTransactionDefinition();
}
if (isExistingTransaction(transaction)) {
// 如果当前线程已经占有锁
// Existing transaction found -> check propagation behavior to find out how to behave.
return handleExistingTransaction(definition, transaction, debugEnabled);
}
// Check definition settings for new transaction.
// 判断配置合理性
if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
throw new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout());
}
// No existing transaction found -> check propagation behavior to find out how to proceed.
// 在当前没有事务的情况下,根据事务的配置决定采取的策略
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
// 如果是 PROPAGATION_MANDATORY ,强制要求事前存在事务,报错
throw new IllegalTransactionStateException(
"No existing transaction found for transaction marked with propagation 'mandatory'");
} else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
// 此类三种情况要求如果没有事务就新建事务
// 阻塞空事务(拿到一个阻塞的holder,后面用来唤醒)
SuspendedResourcesHolder suspendedResources = suspend(null);
if (debugEnabled) {
logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition);
}
try {
// 如果没有强行配置不允许事务,就可以开始创建事务了
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
// 设置事务,如从数据源获得connection、开启同步、将配置好的信息保存至线程变量
doBegin(transaction, definition);
// 将事务的相关属性设置到线程变量中
prepareSynchronization(status, definition);
return status;
} catch (RuntimeException | Error ex) {
resume(null, suspendedResources);
throw ex;
}
} else {
// 适用默认的隔离状态????
// TODO 后面根据实际适用情况看吧
// Create "empty" transaction: no actual transaction, but potentially synchronization.
if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
logger.warn("Custom isolation level specified but no actual transaction initiated; " +
"isolation level will effectively be ignored: " + definition);
}
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null);
}
}
protected Object doGetTransaction() {
DataSourceTransactionObject txObject = new DataSourceTransactionObject();
txObject.setSavepointAllowed(isNestedTransactionAllowed());
ConnectionHolder conHolder =
(ConnectionHolder) TransactionSynchronizationManager.getResource(obtainDataSource());
txObject.setConnectionHolder(conHolder, false);
return txObject;
}
主要思路如下:
3.png我们依次看这两种情况:
处理新事务
4.png我们看一下阻塞事务的逻辑,这里可能涉及对底层API的调用
5.png感觉就是把当前的所有配置保存到一个节点中,然后清空当前所有配置,然后返回保存的节点,由调用方进行保存、传递。
我们看一下唤醒旧事务的操作:
protected final void resume(@Nullable Object transaction, @Nullable SuspendedResourcesHolder resourcesHolder)
throws TransactionException {
if (resourcesHolder != null) { // 保存事务和同步信息的holder不为空
Object suspendedResources = resourcesHolder.suspendedResources;
if (suspendedResources != null) { // 将事务的资源绑定到 ThreadLocal 中【这里就是数据源和 Connection 的映射关系】
doResume(transaction, suspendedResources);
}
// 把节点中存储的配置进行还原
List<TransactionSynchronization> suspendedSynchronizations = resourcesHolder.suspendedSynchronizations;
if (suspendedSynchronizations != null) {
TransactionSynchronizationManager.setActualTransactionActive(resourcesHolder.wasActive);
TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(resourcesHolder.isolationLevel);
TransactionSynchronizationManager.setCurrentTransactionReadOnly(resourcesHolder.readOnly);
TransactionSynchronizationManager.setCurrentTransactionName(resourcesHolder.name);
doResumeSynchronization(suspendedSynchronizations);
}
}
}
protected void doResume(@Nullable Object transaction, Object suspendedResources) {
TransactionSynchronizationManager.bindResource(obtainDataSource(), suspendedResources);
}
private void doResumeSynchronization(List<TransactionSynchronization> suspendedSynchronizations) {
TransactionSynchronizationManager.initSynchronization();
for (TransactionSynchronization synchronization : suspendedSynchronizations) {
synchronization.resume();
TransactionSynchronizationManager.registerSynchronization(synchronization);
}
}
流程如下:
6.png总体来说就是把事务的占用的配置重新压进存储数据源和占用 Connection
的 ThradLocal
中,然后把所有的同步配置也都恢复至各自的 ThreadLocal
。
我们看一下开启新事务和设置新同步的操作:
7.png 8.png处理已经存在的事务
本节的内容是handleExistingTransaction()
方法的记录:
private TransactionStatus handleExistingTransaction(
TransactionDefinition definition, Object transaction, boolean debugEnabled)
throws TransactionException {
// 不支持事务,有事务就抛出异常
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
throw new IllegalTransactionStateException(
"Existing transaction found for transaction marked with propagation 'never'");
}
// 不支持事务,有就阻塞
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
if (debugEnabled) {
logger.debug("Suspending current transaction");
}
Object suspendedResources = suspend(transaction);
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
return prepareTransactionStatus(
definition, null, false, newSynchronization, debugEnabled, suspendedResources);
}
// 必须新的,阻塞现有的
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
if (debugEnabled) {
logger.debug("Suspending current transaction, creating new transaction with name [" +
definition.getName() + "]");
}
SuspendedResourcesHolder suspendedResources = suspend(transaction);
try {
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
doBegin(transaction, definition);
prepareSynchronization(status, definition);
return status;
} catch (RuntimeException | Error beginEx) {
resumeAfterBeginException(transaction, suspendedResources, beginEx);
throw beginEx;
}
}
// 内嵌
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
if (!isNestedTransactionAllowed()) {
throw new NestedTransactionNotSupportedException(
"Transaction manager does not allow nested transactions by default - " +
"specify 'nestedTransactionAllowed' property with value 'true'");
}
if (debugEnabled) {
logger.debug("Creating nested transaction with name [" + definition.getName() + "]");
}
// 默认使用保存点
if (useSavepointForNestedTransaction()) {
// Create savepoint within existing Spring-managed transaction,
// through the SavepointManager API implemented by TransactionStatus.
// Usually uses JDBC 3.0 savepoints. Never activates Spring synchronization.
DefaultTransactionStatus status =
prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
status.createAndHoldSavepoint();
return status;
} else {
// 不支持保存点,就在不阻塞现有事务的基础上直接占用新的 Connection 进行事务,
// 这里存在配置覆盖,原有事务、同步设置到 ThreadLocal 中的东西会被覆盖。
// 但是会将覆盖前的配置传出去,需要调用者自行在结束此事务之后恢复
// Nested transaction through nested begin and commit/rollback calls.
// Usually only for JTA: Spring synchronization might get activated here
// in case of a pre-existing JTA transaction.
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled, null);
doBegin(transaction, definition);
prepareSynchronization(status, definition);
return status;
}
}
// Assumably PROPAGATION_SUPPORTS or PROPAGATION_REQUIRED.
// 如果当前事务存在就内嵌,不存在就创建新的
if (debugEnabled) {
logger.debug("Participating in existing transaction");
}
if (isValidateExistingTransaction()) { // 看TransactionManager是否配置了在加入当前事务前要先校验属性,默认为false
// 事务配置的传播属性和当前事务的传播属性如果不兼容,就报错
if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {
Constants isoConstants = DefaultTransactionDefinition.constants;
throw new IllegalTransactionStateException("Participating transaction with definition [" +
definition + "] specifies isolation level which is incompatible with existing transaction: " +
(currentIsolationLevel != null ?
isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) :
"(unknown)"));
}
}
// 事务配置的只读属性和抢断事务的只读属性不兼容,就报错【当前只读,配置的要求可以写就挂了】
if (!definition.isReadOnly()) {
if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {
throw new IllegalTransactionStateException("Participating transaction with definition [" +
definition + "] is not marked as read-only but existing transaction is");
}
}
}
// 校验完成,直接加入当前事务即可。即在当前事务中加入新的同步
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
}
流程图如下:
9.png将新事务押入调用堆栈
prepareTransactionInfo()
的方法,会将上面的到的事务所有信息,包括属性、状态、使用的TM等等构建成一个TransactionInfo
,用来在ThreadLocal
中拉链,并返回。
总结
总的来说创建事务的过程主要是处理当前事务和新事务的兼容问题,如果单纯创建新事务其实很简单,就是
从数据源占据一个Connection
,然后把事务、同步属性进行设置即可。当事务出现冲突时在判断状态合理的情况下可以选择
- 创建新的:阻塞当前事务(把当前事务占据的
Connection
进行保存,然后解绑ThreadLocal
属性),保存当前同步属性。然后按照从配置中读出来的东西开启新事务、设置新的同步 - 内嵌事务:
- 或者使用保存点,继续占用当前事务(使用当前
Connection
),使用当前的同步属性,然后创建保存点,结束。 - 不允许使用保存点,只能将就着模拟,放弃当前事务,新创建一个事务(占用一个
Connection
),照常设置同步、返回
- 或者使用保存点,继续占用当前事务(使用当前
- 不支持事务:当前事务阻塞,不占用新的
Connection
,整一个空事务
处理异常回滚信息
本节主要处理调用抛出异常后的回滚相关操作。
protected void completeTransactionAfterThrowing(@Nullable TransactionInfo txInfo, Throwable ex) {
if (txInfo != null && txInfo.getTransactionStatus() != null) {
if (logger.isTraceEnabled()) {
logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() +
"] after exception: " + ex);
}
// 如果事务存在【事务属性配置了】且表明事务要针对此异常回滚,就回滚
if (txInfo.transactionAttribute != null && txInfo.transactionAttribute.rollbackOn(ex)) {
try {
txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus());
} catch (TransactionSystemException ex2) {
logger.error("Application exception overridden by rollback exception", ex);
ex2.initApplicationException(ex);
throw ex2;
} catch (RuntimeException | Error ex2) {
logger.error("Application exception overridden by rollback exception", ex);
throw ex2;
}
} else {
// We don't roll back on this exception.
// Will still roll back if TransactionStatus.isRollbackOnly() is true.
try {
txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
} catch (TransactionSystemException ex2) {
logger.error("Application exception overridden by commit exception", ex);
ex2.initApplicationException(ex);
throw ex2;
} catch (RuntimeException | Error ex2) {
logger.error("Application exception overridden by commit exception", ex);
throw ex2;
}
}
}
}
这里的思路比较明白,如果事务存在且根据配置项要对此异常回滚,就回滚;否则提交。
我们继续看回滚的逻辑,后面会专门看提交 :
public final void rollback(TransactionStatus status) throws TransactionException {
if (status.isCompleted()) {
throw new IllegalTransactionStateException(
"Transaction is already completed - do not call commit or rollback more than once per transaction");
}
DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
processRollback(defStatus, false);
}
这里主要做了一个入参合理性判断:已经完成的事务不能再次进行回滚、提交。将实际的回滚逻辑继续做了委托:
private void processRollback(DefaultTransactionStatus status, boolean unexpected) {
try {
boolean unexpectedRollback = unexpected;
try {
// 如果是新的同步快结束的话,调用同步的钩子
triggerBeforeCompletion(status);
if (status.hasSavepoint()) {
// 如果有保存点,就回滚至保存点
if (status.isDebug()) {
logger.debug("Rolling back transaction to savepoint");
}
status.rollbackToHeldSavepoint();
} else if (status.isNewTransaction()) {
// 如果是新事务,直接回滚
if (status.isDebug()) {
logger.debug("Initiating transaction rollback");
}
doRollback(status);
} else {
// Participating in larger transaction
// 是更大事务的一部分【从逻辑上来说存在嵌套,但是不支持保存点和创建嵌套那种】
if (status.hasTransaction()) {
// status 设置了"设置回滚标志位" 或者全局设置了"设置回滚标志位"
if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) {
if (status.isDebug()) {
logger.debug("Participating transaction failed - marking existing transaction as rollback-only");
}
doSetRollbackOnly(status);
} else {
if (status.isDebug()) {
logger.debug("Participating transaction failed - letting transaction originator decide on rollback");
}
}
} else {
logger.debug("Should roll back transaction but cannot - no transaction available");
}
// Unexpected rollback only matters here if we're asked to fail early
if (!isFailEarlyOnGlobalRollbackOnly()) {
unexpectedRollback = false;
}
}
} catch (RuntimeException | Error ex) {
triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
throw ex;
}
// 调用同步的结束后的钩子
triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
// Raise UnexpectedRollbackException if we had a global rollback-only marker
if (unexpectedRollback) {
throw new UnexpectedRollbackException(
"Transaction rolled back because it has been marked as rollback-only");
}
} finally {
cleanupAfterCompletion(status);
}
}
我们不在乎什么钩子的调用和钩子预留位,这里其实就是一个if-else
:
- 如果有保存点,说明是嵌套事务,直接控制回滚至保存点即可
- 如果是新事务,直接回滚即可
- 如果是更大事务的一部分,且没有使用保存点,设置回滚标志位,等待外层函数完成后进行回滚
当然,最后调用了cleanupAfterCompletion()
,对现场进行了清理:
private void cleanupAfterCompletion(DefaultTransactionStatus status) {
// 设置事务状态为结束
status.setCompleted();
// 如果是新的同步,就清理同步属性设置
if (status.isNewSynchronization()) {
TransactionSynchronizationManager.clear();
}
// 如果是新的事务 ,就清理事务设置【归还 Connection】
if (status.isNewTransaction()) {
doCleanupAfterCompletion(status.getTransaction());
}
// 如果当前事务的创建涉及另一个事务的阻塞,就唤醒阻塞的事务
if (status.getSuspendedResources() != null) {
if (status.isDebug()) {
logger.debug("Resuming suspended transaction after completion of inner transaction");
}
Object transaction = (status.hasTransaction() ? status.getTransaction() : null);
resume(transaction, (SuspendedResourcesHolder) status.getSuspendedResources());
}
}
清理事务信息
我们在“创建事务”中将新事务的TransactionInfo
压入了调用堆栈,这里进行出栈,其他的没有啥东西
提交事务
我们先看commitTransactionAfterReturning()
:
protected void commitTransactionAfterReturning(@Nullable TransactionInfo txInfo) {
if (txInfo != null && txInfo.getTransactionStatus() != null) {
if (logger.isTraceEnabled()) {
logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() + "]");
}
txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
}
}
这里照例只进行了参数的判断,干掉了一些可以快速结束的情况,将具体的提交逻辑进行了委托:
public final void commit(TransactionStatus status) throws TransactionException {
if (status.isCompleted()) {
throw new IllegalTransactionStateException(
"Transaction is already completed - do not call commit or rollback more than once per transaction");
}
DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
// 之前基于此事务的逻辑失败了,并标记了回滚
if (defStatus.isLocalRollbackOnly()) {
if (defStatus.isDebug()) {
logger.debug("Transactional code has requested rollback");
}
processRollback(defStatus, false);
return;
}
if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) {
if (defStatus.isDebug()) {
logger.debug("Global transaction is marked as rollback-only but transactional code requested commit");
}
processRollback(defStatus, true);
return;
}
processCommit(defStatus);
}
这里进行了一个判断:如果前面有嵌套事务给自己设置了回滚标志位,这里就回滚,否则照常提交,对提交事务API的调用过程,这里进行了委托:
private void processCommit(DefaultTransactionStatus status) throws TransactionException {
try {
boolean beforeCompletionInvoked = false;
try {
boolean unexpectedRollback = false;
// 预留
prepareForCommit(status);
// 调用钩子
triggerBeforeCommit(status);
triggerBeforeCompletion(status);
beforeCompletionInvoked = true;
if (status.hasSavepoint()) {
if (status.isDebug()) {
logger.debug("Releasing transaction savepoint");
}
unexpectedRollback = status.isGlobalRollbackOnly();
// 有保存点,就清除保存点【此事务为内嵌事务,交给外部事务进行提交即可,此处不再提交】
status.releaseHeldSavepoint();
} else if (status.isNewTransaction()) {
if (status.isDebug()) {
logger.debug("Initiating transaction commit");
}
unexpectedRollback = status.isGlobalRollbackOnly();
doCommit(status);
} else if (isFailEarlyOnGlobalRollbackOnly()) {
unexpectedRollback = status.isGlobalRollbackOnly();
}
// Throw UnexpectedRollbackException if we have a global rollback-only
// marker but still didn't get a corresponding exception from commit.
if (unexpectedRollback) {
throw new UnexpectedRollbackException(
"Transaction silently rolled back because it has been marked as rollback-only");
}
} catch (UnexpectedRollbackException ex) {
// can only be caused by doCommit
triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
throw ex;
} catch (TransactionException ex) {
// can only be caused by doCommit
if (isRollbackOnCommitFailure()) {
doRollbackOnCommitException(status, ex);
} else {
triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
}
throw ex;
} catch (RuntimeException | Error ex) {
if (!beforeCompletionInvoked) {
triggerBeforeCompletion(status);
}
doRollbackOnCommitException(status, ex);
throw ex;
}
// Trigger afterCommit callbacks, with an exception thrown there
// propagated to callers but the transaction still considered as committed.
try {
triggerAfterCommit(status);
} finally {
triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
}
} finally {
cleanupAfterCompletion(status);
}
}
这里主要有以下几步:
- 调用钩子
- 如果有
SavePoint
,说明此事务不是最外层事务,释放保存点退出即可,外部事务会帮助提交 - 如果是新事务,就提交
- 调用钩子、清理现场
具体的提交事务API的调用在doCommit()
中:
protected void doCommit(DefaultTransactionStatus status) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
Connection con = txObject.getConnectionHolder().getConnection();
if (status.isDebug()) {
logger.debug("Committing JDBC transaction on Connection [" + con + "]");
}
try {
con.commit();
} catch (SQLException ex) {
throw new TransactionSystemException("Could not commit JDBC transaction", ex);
}
}
总结
上面算是把事务的相关东西过了一遍,忽略了很多东西,注意几点即可:
- 主要看看针对事务配置的传播方式的实现逻辑即可,毕竟学主要是为了用,别死钻
- 关注一下嵌套事务的回滚点、回滚标志位的相关逻辑,了解即可
- 当前环节的事务栈以
ThreadLocal<TransactionInfo> transactionInfoHolder
为准,那些杂乱的ThreadLocal
在某些特殊情况下会丢东西。
在实用方面的一些经验:
- 很多时候我们出错都抛出
RuntimeException
,然后一直上抛的,所以事务的传递机制很多时候是用的默认的REQUIRE_NEW
,需要继续就catch
,不需要停止就不管,分别对应了两种嵌套的策略。很多时候也没有针对事务配置做那么多东西 - 事务这里只是针对jdbc,对一些dubbo、redis之类的设置不生效
网友评论