前言
上一篇我们介绍了spring-tx
中的底层抽象,本篇我们一起来看看围绕这些抽象概念spring-tx
是如何打造出声明式事务的吧。笼统的说,spring-tx-5.2.6.RELEASE
的实现主要分为两个部分:
-
PlatformTransactionManager
抽象下的事务管理细节 - 基于
spring-aop
的拦截器如何将普通方法增强为事务方法的
这两部分彼此独立又相互成就,并且每个部分都有着大量的源码支撑,本篇我们先来分析spring-tx
中的AOP部分吧。
一切从EnableTransactionManagement说起
EnableTransactionManagement
注解想必大家都很熟悉了,它是启用 Spring 中注释驱动的事务管理功能的关键。
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(TransactionManagementConfigurationSelector.class)
public @interface EnableTransactionManagement {
// 使用cglib subclassing策略还是jdk dynamic proxy
boolean proxyTargetClass() default false;
// 使用spring aop运行时代理机制还是aspect j的静态编译策略
AdviceMode mode() default AdviceMode.PROXY;
// 事务Advisor的优先级
int order() default Ordered.LOWEST_PRECEDENCE;
}
EnableTransactionManagement
注解的主要作用是向容器中导入TransactionManagementConfigurationSelector
,至于注解中定义的几个属性在Spring AOP源码解析中有过详细分析,这里就不再赘述了。
public class TransactionManagementConfigurationSelector extends AdviceModeImportSelector<EnableTransactionManagement> {
@Override
protected String[] selectImports(AdviceMode adviceMode) {
switch (adviceMode) {
case PROXY:
// AutoProxyRegistrar的作用是开启AOP相关功能
return new String[] {AutoProxyRegistrar.class.getName(),
ProxyTransactionManagementConfiguration.class.getName()};
case ASPECTJ:
return new String[] {determineTransactionAspectClass()};
default:
return null;
}
}
private String determineTransactionAspectClass() {
return (ClassUtils.isPresent("javax.transaction.Transactional", getClass().getClassLoader()) ?
TransactionManagementConfigUtils.JTA_TRANSACTION_ASPECT_CONFIGURATION_CLASS_NAME :
TransactionManagementConfigUtils.TRANSACTION_ASPECT_CONFIGURATION_CLASS_NAME);
}
}
由于我们并没有使用AspectJ
,因此导入容器的自然是ProxyTransactionManagementConfiguration
这个配置类。
@Configuration
public abstract class AbstractTransactionManagementConfiguration implements ImportAware {
@Nullable
protected AnnotationAttributes enableTx;
@Nullable
protected TransactionManager txManager;
@Override
public void setImportMetadata(AnnotationMetadata importMetadata) {
// 获取EnableTransactionManagement注解中定义的属性
this.enableTx = AnnotationAttributes.fromMap(
importMetadata.getAnnotationAttributes(EnableTransactionManagement.class.getName(), false));
if (this.enableTx == null) {
throw new IllegalArgumentException("@EnableTransactionManagement is not present on importing class " + importMetadata.getClassName());
}
}
// 通过TransactionManagementConfigurer获取事务管理器
@Autowired(required = false)
void setConfigurers(Collection<TransactionManagementConfigurer> configurers) {
if (CollectionUtils.isEmpty(configurers)) {
return;
}
if (configurers.size() > 1) {
throw new IllegalStateException("Only one TransactionManagementConfigurer may exist");
}
TransactionManagementConfigurer configurer = configurers.iterator().next();
this.txManager = configurer.annotationDrivenTransactionManager();
}
// 支持事务相关事件的pub-sub功能
@Bean(name = TransactionManagementConfigUtils.TRANSACTIONAL_EVENT_LISTENER_FACTORY_BEAN_NAME)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public static TransactionalEventListenerFactory transactionalEventListenerFactory() {
return new TransactionalEventListenerFactory();
}
}
@Configuration(proxyBeanMethods = false)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class ProxyTransactionManagementConfiguration extends AbstractTransactionManagementConfiguration {
// 向容器中注入一个关于事务的Advisor
@Bean(name = TransactionManagementConfigUtils.TRANSACTION_ADVISOR_BEAN_NAME)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public BeanFactoryTransactionAttributeSourceAdvisor transactionAdvisor(
TransactionAttributeSource transactionAttributeSource, TransactionInterceptor transactionInterceptor) {
BeanFactoryTransactionAttributeSourceAdvisor advisor = new BeanFactoryTransactionAttributeSourceAdvisor();
advisor.setTransactionAttributeSource(transactionAttributeSource);
advisor.setAdvice(transactionInterceptor);
if (this.enableTx != null) {
advisor.setOrder(this.enableTx.<Integer>getNumber("order"));
}
return advisor;
}
// 事务属性的提取器,用来解析@Transactional注解
@Bean
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public TransactionAttributeSource transactionAttributeSource() {
return new AnnotationTransactionAttributeSource();
}
// 事务拦截器
@Bean
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public TransactionInterceptor transactionInterceptor(TransactionAttributeSource transactionAttributeSource) {
TransactionInterceptor interceptor = new TransactionInterceptor();
interceptor.setTransactionAttributeSource(transactionAttributeSource);
if (this.txManager != null) {
interceptor.setTransactionManager(this.txManager);
}
return interceptor;
}
}
这个配置类的核心是向容器中导入一个类型为BeanFactoryTransactionAttributeSourceAdvisor
的Bean。这是一个PointcutAdvisor
,它的Pointcut
是TransactionAttributeSourcePointcut
,Advice
是TransactionInterceptor
。
TransactionAttributeSourcePointcut
利用TransactionAttributeSource
解析@Transactional
注解的能力来选取标注了@Transactional
注解的方法,而TransactionInterceptor
则根据应用提出的需求(来自对@Transactional
注解的解析)将方法增强为事务方法,因此BeanFactoryTransactionAttributeSourceAdvisor
可以识别出那些标注了@Transactional
注解的方法,为它们应用上事务相关功能。
事务拦截器的工作原理
TransactionAttributeSource
TransactionInterceptor
能对方法进行增强,但是它却不知道该如何增强,比如是为方法新开一个独立事务还是沿用已有的事务?什么情况下需要回滚,什么情况下不需要?必须有一个『人』告诉它该如何增强,这个『人』便是TransactionAttributeSource
。
@Transactional
注解定义了事务的基础信息,它表达了应用程序期望的事务形态。TransactionAttributeSource
的主要作用就是解析@Transactional
注解,提取其属性,包装成TransactionAttribute
,这样TransactionInterceptor
的增强便有了依据。
public interface TransactionAttributeSource {
/**
* 测试某个类是否有可能解析出TransactionAttribute,对于不可能解析出事务属性的类,没有必要
* 调用 #getTransactionAttribute(...),从而提升性能
*/
default boolean isCandidateClass(Class<?> targetClass) {
return true;
}
/**
* 返回方法的事务属性,由@Transactional注解指定
*/
@Nullable
TransactionAttribute getTransactionAttribute(Method method, @Nullable Class<?> targetClass);
}
tx-attr-source.png
前面我们已经见过,spring-tx
使用AnnotationTransactionAttributeSource
来做具体的解析工作,其父类AbstractFallbackTransactionAttributeSource
定义了解析TransactionAttribute
的优先级,核心方法是computeTransactionAttribute(...)
。
@Nullable
protected TransactionAttribute computeTransactionAttribute(Method method, @Nullable Class<?> targetClass) {
// 不解析非public修饰的方法,这也是@Transactional注解失效的一个原因
if (allowPublicMethodsOnly() && !Modifier.isPublic(method.getModifiers())) {
return null;
}
// 传入的方法可能来自于接口,相比较而言目标类中定义的方法具有更高的优先级
Method specificMethod = AopUtils.getMostSpecificMethod(method, targetClass);
// 代理给模板方法
TransactionAttribute txAttr = findTransactionAttribute(specificMethod);
if (txAttr != null) {
return txAttr;
}
// 尝试在目标类上寻找
txAttr = findTransactionAttribute(specificMethod.getDeclaringClass());
if (txAttr != null && ClassUtils.isUserLevelMethod(method)) {
return txAttr;
}
// 传入的方法确实不是目标类中定义的那一个
if (specificMethod != method) {
// 尝试在传入的方法上获取,这也是@Transactional注解支持定义在接口中的原因
txAttr = findTransactionAttribute(method);
if (txAttr != null) {
return txAttr;
}
// 最后尝试在定义方法的类上获取,通常来说是接口本身
txAttr = findTransactionAttribute(method.getDeclaringClass());
if (txAttr != null && ClassUtils.isUserLevelMethod(method)) {
return txAttr;
}
}
return null;
}
AnnotationTransactionAttributeSource
默认只解析public
修饰的方法,这也是导致@Transactional
注解失效的一个原因,除此之外它还实现了父类中定义的两个模板方法:
-
findTransactionAttribute(Method method)
,获取定义在方法上的事务属性 -
findTransactionAttribute(Class<?> clazz)
,获取定义在类上的事务属性
同时为了支持 EJB 中定义的javax.ejb.TransactionAttribute
和 JTA 中定义的javax.transaction.Transactional
注解,AnnotationTransactionAttributeSource
选择将实际的提取工作代理给TransactionAnnotationParser
。Spring 提供的@Transactional
注解由SpringTransactionAnnotationParser
进行解析。
public class SpringTransactionAnnotationParser implements TransactionAnnotationParser, Serializable {
@Override
public boolean isCandidateClass(Class<?> targetClass) {
return AnnotationUtils.isCandidateClass(targetClass, Transactional.class);
}
@Override
@Nullable
public TransactionAttribute parseTransactionAnnotation(AnnotatedElement element) {
// 使用 find 语义来提取 Transactional 注解
AnnotationAttributes attributes = AnnotatedElementUtils.findMergedAnnotationAttributes(
element, Transactional.class, false, false);
if (attributes != null) {
return parseTransactionAnnotation(attributes);
}
else {
return null;
}
}
// 将 Transactional 注解包装成 TransactionAttribute
protected TransactionAttribute parseTransactionAnnotation(AnnotationAttributes attributes) {
RuleBasedTransactionAttribute rbta = new RuleBasedTransactionAttribute();
Propagation propagation = attributes.getEnum("propagation");
rbta.setPropagationBehavior(propagation.value());
Isolation isolation = attributes.getEnum("isolation");
rbta.setIsolationLevel(isolation.value());
rbta.setTimeout(attributes.getNumber("timeout").intValue());
rbta.setReadOnly(attributes.getBoolean("readOnly"));
rbta.setQualifier(attributes.getString("value"));
List<RollbackRuleAttribute> rollbackRules = new ArrayList<>();
for (Class<?> rbRule : attributes.getClassArray("rollbackFor")) {
rollbackRules.add(new RollbackRuleAttribute(rbRule));
}
for (String rbRule : attributes.getStringArray("rollbackForClassName")) {
rollbackRules.add(new RollbackRuleAttribute(rbRule));
}
for (Class<?> rbRule : attributes.getClassArray("noRollbackFor")) {
rollbackRules.add(new NoRollbackRuleAttribute(rbRule));
}
for (String rbRule : attributes.getStringArray("noRollbackForClassName")) {
rollbackRules.add(new NoRollbackRuleAttribute(rbRule));
}
rbta.setRollbackRules(rollbackRules);
return rbta;
}
@Override
public boolean equals(@Nullable Object other) {
return (this == other || other instanceof SpringTransactionAnnotationParser);
}
@Override
public int hashCode() {
return SpringTransactionAnnotationParser.class.hashCode();
}
}
SpringTransactionAnnotationParser
的源码还是很简单的,它使用AnnotatedElementUtils
工具类定义的find
语义来获取@Transactional
注解信息。RuleBasedTransactionAttribute
中rollbackOn(...)
的实现还是挺有意思的,其它的都平平无奇。
@Override
public boolean rollbackOn(Throwable ex) {
if (logger.isTraceEnabled()) {
logger.trace("Applying rules to determine whether transaction should rollback on " + ex);
}
RollbackRuleAttribute winner = null;
int deepest = Integer.MAX_VALUE;
// 选取最接近的 RollbackRuleAttribute
if (this.rollbackRules != null) {
for (RollbackRuleAttribute rule : this.rollbackRules) {
int depth = rule.getDepth(ex);
if (depth >= 0 && depth < deepest) {
deepest = depth;
winner = rule;
}
}
}
if (logger.isTraceEnabled()) {
logger.trace("Winning rollback rule is: " + winner);
}
// 没有RollbackRuleAttribute支持此异常
if (winner == null) {
// 回退到父类的处理方式 —> (ex instanceof RuntimeException || ex instanceof Error)
// 只要是运行时异常或Error都进行回滚
logger.trace("No relevant rollback rule found: applying default rules");
return super.rollbackOn(ex);
}
// 找到了支持此异常的 RollbackRuleAttribute
// 确保它不是 NoRollbackRuleAttribute,NoRollbackRuleAttribute表示的是遇到类型的异常时不回滚
return !(winner instanceof NoRollbackRuleAttribute);
}
RollbackRuleAttribute
是用来确定在发生特定类型的异常(或其子类)时是否应该回滚,而NoRollbackRuleAttribute
继承自RollbackRuleAttribute
,但表达的是相反的含义。RollbackRuleAttribute
持有某个异常的名称,通过getDepth(Throwable ex)
算法来计算指定的Throwable
和持有的异常在继承链上的距离。
public int getDepth(Throwable ex) {
return getDepth(ex.getClass(), 0);
}
private int getDepth(Class<?> exceptionClass, int depth) {
// 名称匹配
if (exceptionClass.getName().contains(this.exceptionName)) {
return depth;
}
// 不在一个继承体系
if (exceptionClass == Throwable.class) {
return -1;
}
// 沿着继承链向上比对
return getDepth(exceptionClass.getSuperclass(), depth + 1);
}
TransactionInterceptor
tx-advice.png 程序猿只有在拿到需求以后才能开工,TransactionInterceptor
也一样,有了TransactionAttributeSource
之后就可以有依据的增强了。观察类图,TransactionInterceptor
实现了MethodInterceptor
接口,那么自然要实现接口中的方法:
@Override
@Nullable
public Object invoke(MethodInvocation invocation) throws Throwable {
// 获取目标Class
Class<?> targetClass = (invocation.getThis() != null ?
AopUtils.getTargetClass(invocation.getThis())
: null);
// 代理给基类的 #invokeWithinTransaction(...)
return invokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed);
}
可以看到,TransactionInterceptor
本身是没有实现任何逻辑的,它更像一个适配器。这样分层以后,TransactionAspectSupport
理论上就可以支持任意类型的Advice
而不只是MethodInterceptor
。实现上TransactionAspectSupport
确实也考虑了这一点,我们马上就会看到。
@Nullable
protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
final InvocationCallback invocation) throws Throwable {
// 解析 @Transactional 注解,获取事务属性
// 如果不存在事务属性,说明这个方法不是一个事务方法
TransactionAttributeSource tas = getTransactionAttributeSource();
final TransactionAttribute txAttr = (tas != null ?
tas.getTransactionAttribute(method, targetClass) : null);
// 从容器中选取一个 TransactionManager 来管理事务
final TransactionManager tm = determineTransactionManager(txAttr);
// 1. 略去了 ReactiveTransactionManager 相关内容,不使用webflux的话用不上
// 2. 略去了 CallbackPreferringPlatformTransactionManager 相关内容,除了基于回调,几乎和
// PlatformTransactionManager是一模一样的,限于篇幅,删减掉了
PlatformTransactionManager ptm = asPlatformTransactionManager(tm);
// 方法标识,后续用作事务名称,多用于debug
final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);
// 开启事务(尽可能)
TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);
Object retVal;
try {
// 转向拦截器链中的下一个拦截器
// 由于TransactionInterceptor的优先级默认情况下是最低的,基本上这就是调用目标方法了
retVal = invocation.proceedWithInvocation();
}
catch (Throwable ex) {
// 出现异常时对事务的处理 ——> 根据rollbackFor、noRollbackFor来决定是提交还是回滚事务
completeTransactionAfterThrowing(txInfo, ex);
throw ex;
}
finally {
// 执行清理工作
cleanupTransactionInfo(txInfo);
}
// 方法正常执行完时提交事务
commitTransactionAfterReturning(txInfo);
return retVal;
}
invokeWithinTransaction(...)
的流程还是非常清晰的:
- 获取
TransactionManager
和TransactionAttribute
,这两兄弟一个决定了能否开启事务,另一个决定了如何开启事务 - 尝试开启事务
-
try
块包裹住目标方法的执行,根据执行结果进行相应的处理——提交或回滚
第一步前文已经分析过了,我们来看第二步。
protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm,
@Nullable TransactionAttribute txAttr, final String joinpointIdentification) {
// 如果未指定事务名称,在这里指定一下
if (txAttr != null && txAttr.getName() == null) {
txAttr = new DelegatingTransactionAttribute(txAttr) { // 代理模式,重新保证一下
@Override
public String getName() {
return joinpointIdentification;
}
};
}
TransactionStatus status = null;
// txAttr和tm两兄弟都存在的情况下才能开启事务
if (txAttr != null) {
if (tm != null) {
// 根据txAttr的相关属性开启事务
// 这里就是PlatformTransactionManager事务管理的核心逻辑了,我们下篇再说
status = tm.getTransaction(txAttr);
}
else {
if (logger.isDebugEnabled()) {
logger.debug("Skipping transactional joinpoint [" + joinpointIdentification +
"] because no transaction manager has been configured");
}
}
}
// 准备TransactionInfo
// 如果不考虑支持不同类型的Advice的话,TransactionInfo其实是不必要的
return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
}
protected TransactionInfo prepareTransactionInfo(@Nullable PlatformTransactionManager tm,
@Nullable TransactionAttribute txAttr, String joinpointIdentification,
@Nullable TransactionStatus status) {
// 创建一个新的txInfo
TransactionInfo txInfo = new TransactionInfo(tm, txAttr, joinpointIdentification);
// txAttr存在时才可能获取到TransactionStatus,这也表示是一个事务方法
if (txAttr != null) {
if (logger.isTraceEnabled()) {
logger.trace("Getting transaction for [" + txInfo.getJoinpointIdentification() + "]");
}
// 记录下事务的状态
txInfo.newTransactionStatus(status);
}
else {
// 非事务方法
if (logger.isTraceEnabled()) {
logger.trace("No need to create transaction for [" + joinpointIdentification +
"]: This method is not transactional.");
}
}
// 绑定到当前线程,以此来支持各种不同类型的Advice
// 比如我们使用BeforeAdvice和AfterAdvice来模拟MethodInterceptor
// 这两Advice都继承自TransactionAspectSupport,TransactionStatus的传递就得依赖于线程私有存储了
// TransactionAspectSupport因此也提供了 #currentTransactionStatus() 方法来获取当前的事务状态
txInfo.bindToThread();
return txInfo;
}
TransactionInfo
是一个非常简单的类,我们就不费什么笔墨去分析它了。接着看第三步,这一步涉及到两个不同的操作——提交或回滚。
protected void commitTransactionAfterReturning(@Nullable TransactionInfo txInfo) {
// 如果存在事务
if (txInfo != null && txInfo.getTransactionStatus() != null) {
if (logger.isTraceEnabled()) {
logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() + "]");
}
// 使用 TransactionManager 进行提交
txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
}
}
protected void completeTransactionAfterThrowing(@Nullable TransactionInfo txInfo, Throwable ex) {
// 如果存在事务
if (txInfo != null && txInfo.getTransactionStatus() != null) {
if (logger.isTraceEnabled()) {
logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() +
"] after exception: " + ex);
}
// 查询 rollbackOn 相关配置,如果确实需要回滚的话
if (txInfo.transactionAttribute != null && txInfo.transactionAttribute.rollbackOn(ex)) {
try {
// 使用 TransactionManager 进行回滚
txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus());
}
catch (TransactionSystemException ex2) {
logger.error("Application exception overridden by rollback exception", ex);
ex2.initApplicationException(ex);
throw ex2;
}
catch (RuntimeException | Error ex2) {
logger.error("Application exception overridden by rollback exception", ex);
throw ex2;
}
}
else {
// 不需要回滚,那么转而提交事务
// 当然了,调用 commit(...) 并不能保证事务一定被提交,还有一些因素会导致事务回滚,比如
// 标记为 rollback-only 的事务,我们下篇再说
try {
// 使用 TransactionManager 进行提交
txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
}
catch (TransactionSystemException ex2) {
logger.error("Application exception overridden by commit exception", ex);
ex2.initApplicationException(ex);
throw ex2;
}
catch (RuntimeException | Error ex2) {
logger.error("Application exception overridden by commit exception", ex);
throw ex2;
}
}
}
}
至此,TransactionInterceptor
于我们而言已经没有任何秘密了。
后记
本篇我们一起分析了spring-tx
是如何通过spring-aop
的拦截器将普通方法增强为事务方法的,下篇就该说道说道PlatformTransactionManager
抽象下的事务管理细节啦,我们下篇再见~~
网友评论