美文网首页
Spring事务——事务原理源码分析

Spring事务——事务原理源码分析

作者: 小波同学 | 来源:发表于2023-02-21 16:15 被阅读0次

    前言

    先贴一张从网上找到的Spring事务图,因为源码比较长,结合图看的话,不容易看到后边忘记


    以SpringBoot为例,看下SpringBoot是怎么做的。

    第一部分:Spring生成事务代理增强类源码分析


    一、/META-INF/spring.factories

    要想实现某个功能,首先要做的肯定是把bean加载到容器中,Spring通过[spring.factories的方式可以加载一些特定的bean

    关于事务的bean请看下图:

    TransactionAutoConfiguration、JtaAutoConfiguration

    二、TransactionAutoConfiguration

    @Configuration
    // 类路径下包含PlatformTransactionManager本类生效
    @ConditionalOnClass(PlatformTransactionManager.class)
    // 在以下四个类本加载后生效
    @AutoConfigureAfter({ JtaAutoConfiguration.class, HibernateJpaAutoConfiguration.class,
            DataSourceTransactionManagerAutoConfiguration.class,
            Neo4jDataAutoConfiguration.class })
    // 确保前缀为 spring.transaction 的事务有关属性配置项被加载到 bean TransactionProperties
    @EnableConfigurationProperties(TransactionProperties.class)
    public class TransactionAutoConfiguration {
    
        
        @Bean
        // 仅在该 bean 不存在时才定义
        @ConditionalOnMissingBean
        public TransactionManagerCustomizers platformTransactionManagerCustomizers(
                ObjectProvider<PlatformTransactionManagerCustomizer<?>> customizers) {
            return new TransactionManagerCustomizers(
                    customizers.orderedStream().collect(Collectors.toList()));
        }
    
        @Configuration
        // 当能够唯一确定一个PlatformTransactionManager bean时才生效
        @ConditionalOnSingleCandidate(PlatformTransactionManager.class)
        public static class TransactionTemplateConfiguration {
    
            private final PlatformTransactionManager transactionManager;
    
            public TransactionTemplateConfiguration(
                    PlatformTransactionManager transactionManager) {
                this.transactionManager = transactionManager;
            }
    
            @Bean
            // 仅在该 bean 不存在时才生效
            @ConditionalOnMissingBean
            public TransactionTemplate transactionTemplate() {
                return new TransactionTemplate(this.transactionManager);
            }
    
        }
    
        // 内部类
        @Configuration
        @ConditionalOnBean(PlatformTransactionManager.class)
        @ConditionalOnMissingBean(AbstractTransactionManagementConfiguration.class)
        public static class EnableTransactionManagementConfiguration {
    
            @Configuration
            // 非常重要的事务注解,下一步要进入这个注解看
            // 在属性 spring.aop.proxy-target-class 被明确设置为 false 时启用注解 
            @EnableTransactionManagement(proxyTargetClass = false)
            @ConditionalOnProperty(prefix = "spring.aop", name = "proxy-target-class", havingValue = "false", matchIfMissing = false)
            public static class JdkDynamicAutoProxyConfiguration {
    
            }
    
            @Configuration
            // 非常重要的事务注解,下一步要进入这个注解看
            // 在属性 spring.aop.proxy-target-class 缺失或者被明确设置为 true 时启用注解 
            @EnableTransactionManagement(proxyTargetClass = true)
            // matchIfMissing 为true,因为默认是没有配置的,所以spring默认走的是cglib代理
            @ConditionalOnProperty(prefix = "spring.aop", name = "proxy-target-class", havingValue = "true", matchIfMissing = true)
            public static class CglibAutoProxyConfiguration {
    
            }
    
        }
    
    }
    

    三、@EnableTransactionManagement

    到这里我们第一个问题已经有答案了,AdviceMode 默认使用的是PROXY,同一个类中方法调用,拦截器不会生效,所以我们在同一类中即使调用一个事务方法,方法拦截器不会生效,事务就不会生效。

    @Target({ElementType.TYPE})
    @Retention(RetentionPolicy.RUNTIME)
    @Documented
    @Import({TransactionManagementConfigurationSelector.class})
    public @interface EnableTransactionManagement {
    
        // proxyTargetClass = false表示是JDK动态代理支持接口代理。true表示是Cglib代理支持子类继承代理。
        boolean proxyTargetClass() default false;
    
        // 事务通知模式(切面织入方式),默认代理模式(同一个类中方法互相调用拦截器不会生效),可以选择增强型AspectJ
        AdviceMode mode() default AdviceMode.PROXY;
    
        // 连接点上有多个通知时,排序,默认最低。值越大优先级越低。
        int order() default 2147483647;
    }
    

    这里通过@Import导入了TransactionManagementConfigurationSelector类

    四、TransactionManagementConfigurationSelector

    如上图所示,TransactionManagementConfigurationSelector继承自AdviceModeImportSelector实现了ImportSelector接口

    • 给容器中注册一个 InfrastructureAdvisorAutoProxyCreator 组件;利用后置处理器机制在对象创建以后,包装对象,返回一个代理对象(增强器),代理对象执行方法利用拦截器链进行调用;
    • ProxyTransactionManagementConfiguration:就是一个配置类,定义了事务增强器。
    public class TransactionManagementConfigurationSelector extends AdviceModeImportSelector<EnableTransactionManagement> {
        public TransactionManagementConfigurationSelector() {
        }
    
        protected String[] selectImports(AdviceMode adviceMode) {
            switch(adviceMode) {
            // 默认会走这里,载入了AutoProxyRegistrar、ProxyTransactionManagementConfiguration2个类
            case PROXY:
                return new String[]{AutoProxyRegistrar.class.getName(), ProxyTransactionManagementConfiguration.class.getName()};
            case ASPECTJ:
                return new String[]{this.determineTransactionAspectClass()};
            default:
                return null;
            }
        }
    
        //。。。。。。
    }
    

    4.1、AutoProxyRegistrar

    实现了ImportBeanDefinitionRegistrar接口,重写registerBeanDefinitions方法,源码如下:

    最终调用的是:registerOrEscalateApcAsRequired(InfrastructureAdvisorAutoProxyCreator.class, registry, source);源码自己看下,因为关联关系不大,就不展开了,这个方法的目的是注册InfrastructureAdvisorAutoProxyCreator的BeanDefinitio

    public class AutoProxyRegistrar implements ImportBeanDefinitionRegistrar {
    
        private final Log logger = LogFactory.getLog(getClass());
    
        @Override
        public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
            boolean candidateFound = false;
            Set<String> annTypes = importingClassMetadata.getAnnotationTypes();
            for (String annType : annTypes) {
                AnnotationAttributes candidate = AnnotationConfigUtils.attributesFor(importingClassMetadata, annType);
                if (candidate == null) {
                    continue;
                }
                Object mode = candidate.get("mode");
                Object proxyTargetClass = candidate.get("proxyTargetClass");
                if (mode != null && proxyTargetClass != null && AdviceMode.class == mode.getClass() &&
                        Boolean.class == proxyTargetClass.getClass()) {
                    candidateFound = true;
                    // 默认代理模式,进入到这个方法
                    if (mode == AdviceMode.PROXY) {
                        // 最终调用的是:registerOrEscalateApcAsRequired(InfrastructureAdvisorAutoProxyCreator.class, registry, source);基础构建增强自动代理构造器
                        AopConfigUtils.registerAutoProxyCreatorIfNecessary(registry);
                        if ((Boolean) proxyTargetClass) {
                            AopConfigUtils.forceAutoProxyCreatorToUseClassProxying(registry);
                            return;
                        }
                    }
                }
            }
            if (!candidateFound && logger.isInfoEnabled()) {
                String name = getClass().getSimpleName();
            }
        }
    
    }
    

    4.2、InfrastructureAdvisorAutoProxyCreator

    Infrastructure:基础设施

    看下类继承结构,这里需要首先了解下Spring容器的加载过程,了解BeanPostProcessor接口的作用

    因为间接实现了InstantiationAwareBeanPostProcessor,AbstractAutoProxyCreator(InfrastructureAdvisorAutoProxyCreator继承了它)重写了接口两个方法,对bean进行了一些操作:

    • postProcessBeforeInstantiation:实例化前
    • postProcessAfterInitialization:初始化后

    4.3、AbstractAutoProxyCreator

    public abstract class AbstractAutoProxyCreator extends ProxyProcessorSupport
            implements SmartInstantiationAwareBeanPostProcessor, BeanFactoryAware {
            
        /**
         * 在创建Bean的流程中还没调用构造器来实例化Bean的时候进行调用(实例化前后)
         * AOP解析切面以及事务解析事务注解都是在这里完成的
         */
        @Override
        public Object postProcessBeforeInstantiation(Class<?> beanClass, String beanName) {
            //获取BeanClass的缓存key
            Object cacheKey = getCacheKey(beanClass, beanName);
    
            if (!StringUtils.hasLength(beanName) || !this.targetSourcedBeans.contains(beanName)) {
                //advisedBeans保存了所有已经做过动态代理的Bean
                // 如果被解析过则直接返回
                if (this.advisedBeans.containsKey(cacheKey)) {
                    return null;
                }
                // 1. 判断当前bean是否是基础类型:是否实现了Advice,Pointcut,Advisor,AopInfrastructureBean这些接口或是否是切面(@Aspect注解)
                // 2. 判断是不是应该跳过 (AOP解析直接解析出我们的切面信息,
                // 而事务在这里是不会解析的)
                if (isInfrastructureClass(beanClass) || shouldSkip(beanClass, beanName)) {
                    //添加进advisedBeans ConcurrentHashMap<k=Object,v=Boolean>标记是否需要增强实现,
                    // 这里基础构建bean不需要代理,都置为false,供后面postProcessAfterInitialization实例化后使用。
                    this.advisedBeans.put(cacheKey, Boolean.FALSE);
                    return null;
                }
            }
    
    
            //获取用户自定义的targetSource, 如果存在则直接在对象实例化之前进行代理创建,
            // 避免了目标对象不必要的实例化
            TargetSource targetSource = getCustomTargetSource(beanClass, beanName);
            //如果有自定义targetSource就要这里创建代理对象
            //这样做的好处是被代理的对象可以动态改变,而不是值针对一个target对象(可以对对象池中对象进行代理,可以每次创建代理都创建新对象
            if (targetSource != null) {
                if (StringUtils.hasLength(beanName)) {
                    this.targetSourcedBeans.add(beanName);
                }
                //获取Advisors, 这个是交给子类实现的
                Object[] specificInterceptors = getAdvicesAndAdvisorsForBean(beanClass, beanName, targetSource);
                Object proxy = createProxy(beanClass, beanName, specificInterceptors, targetSource);
                this.proxyTypes.put(cacheKey, proxy.getClass());
                //返回代理的对象
                return proxy;
            }
    
            return null;
        }
    
        @Override
        public Object postProcessAfterInitialization(@Nullable Object bean, String beanName) {
            if (bean != null) {
                Object cacheKey = getCacheKey(bean.getClass(), beanName);
                //当 Bean 被循环引用, 并且被暴露了,
                // 则会通过 getEarlyBeanReference 来创建代理类;
                // 通过判断 earlyProxyReferences 中
                // 是否存在 beanName 来决定是否需要对 target 进行动态代理
                if (this.earlyProxyReferences.remove(cacheKey) != bean) {
                    //该方法将会返回代理类
                    return wrapIfNecessary(bean, beanName, cacheKey);
                }
            }
            return bean;
        }
    
        /**
         * Spring实现Bean代理的核心方法。wrapIfNecessary在两处会被调用,一处是getEarlyBeanReference,
         * 另一处是postProcessAfterInitialization
         */
        protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
            //已经被处理过
            // 1.判断当前bean是否在targetSourcedBeans缓存中存在(已经处理过),如果存在,则直接返回当前bean
            if (StringUtils.hasLength(beanName) && this.targetSourcedBeans.contains(beanName)) {
                return bean;
            }
            //不需要被织入逻辑的
            // 2.在advisedBeans缓存中存在,并且value为false,则代表无需处理
            if (Boolean.FALSE.equals(this.advisedBeans.get(cacheKey))) {
                return bean;
            }
            //是不是基础的bean 是不是需要跳过的
            // 3.bean的类是aop基础设施类 || bean应该跳过,则标记为无需处理,并返回
            if (isInfrastructureClass(bean.getClass()) || shouldSkip(bean.getClass(), beanName)) {
                this.advisedBeans.put(cacheKey, Boolean.FALSE);
                return bean;
            }
    
            // Create proxy if we have advice.
            // 返回匹配当前Bean的所有Advice\Advisor\Interceptor
            Object[] specificInterceptors = getAdvicesAndAdvisorsForBean(bean.getClass(), beanName, null);
            // 5.如果存在增强器则创建代理
            if (specificInterceptors != DO_NOT_PROXY) {
                this.advisedBeans.put(cacheKey, Boolean.TRUE);
                //创建Bean对应的代理,SingletonTargetSource用于封装实现类的信息
                // 5.1 创建代理对象:这边SingletonTargetSource的target属性存放的就是我们原来的bean实例(也就是被代理对象),
                // 用于最后增加逻辑执行完毕后,通过反射执行我们真正的方法时使用(method.invoke(bean, args))
                Object proxy = createProxy(
                        bean.getClass(), beanName, specificInterceptors, new SingletonTargetSource(bean));
                // 5.2 创建完代理后,将cacheKey -> 代理类的class放到缓存
                this.proxyTypes.put(cacheKey, proxy.getClass());
                // 返回代理对象
                return proxy;
            }
            //该Bean是不需要进行代理的,下次就不需要重复生成了
            this.advisedBeans.put(cacheKey, Boolean.FALSE);
            return bean;
        }
    }   
    

    4.4 createProxy

    public abstract class AbstractAutoProxyCreator extends ProxyProcessorSupport
            implements SmartInstantiationAwareBeanPostProcessor, BeanFactoryAware {
    
        protected Object createProxy(Class<?> beanClass, @Nullable String beanName,
                @Nullable Object[] specificInterceptors, TargetSource targetSource) {
            //如果beanFactory是ConfigurableListableBeanFactory的类型,暴露目标类
            if (this.beanFactory instanceof ConfigurableListableBeanFactory) {
                AutoProxyUtils.exposeTargetClass((ConfigurableListableBeanFactory) this.beanFactory, beanName, beanClass);
            }
            //创建一个ProxyFactory,当前ProxyCreator在创建代理时将需要用到的字段赋值到ProxyFactory中去
            ProxyFactory proxyFactory = new ProxyFactory();
            //将 当前的AnnotationAwareAspectJAutoProxyCreator 对象的属性赋值给ProxyFactory对象
            proxyFactory.copyFrom(this);
            // 处理 proxyTargetClass 属性
            // 如果希望使用 CGLIB 来代理接口,可以配置
            // proxy-target-class="true",这样不管有没有接口,都使用 CGLIB 来生成代理:
            // <aop:config proxy-target-class="true"></aop:config>
            if (!proxyFactory.isProxyTargetClass()) {
                // 检查preserveTargetClass属性,判断beanClass是应该基于类代理还是基于接口代理
                if (shouldProxyTargetClass(beanClass, beanName)) {
                    // 如果是基于类代理,则将proxyTargetClass赋值为true
                    proxyFactory.setProxyTargetClass(true);
                }
                else {
                    // 评估bean的代理接口
                    // 1. 有接口的,调用一次或多次:proxyFactory.addInterface(ifc);
                    // 2. 没有接口的,调用:proxyFactory.setProxyTargetClass(true);
                    evaluateProxyInterfaces(beanClass, proxyFactory);
                }
            }
    
            // 这个方法主要来对前面传递进来的横切逻辑实例进行包装
            // 注意:如果 specificInterceptors 中有 Advice 和 Interceptor,它们也会被包装成 Advisor
            // 方法会整理合并得到最终的advisors (毕竟interceptorNames还指定了一些拦截器的)
            // 至于调用的先后顺序,通过方法里的applyCommonInterceptorsFirst参数可以进行设置,
            // 若applyCommonInterceptorsFirst为true,interceptorNames属性指定的Advisor优先调用。默认为true
            Advisor[] advisors = buildAdvisors(beanName, specificInterceptors);
            // 将advisors添加到proxyFactory
            proxyFactory.addAdvisors(advisors);
            // 设置要代理的类,将targetSource赋值给proxyFactory的targetSource属性,之后可以通过该属性拿到被代理的bean的实例
            proxyFactory.setTargetSource(targetSource);
            // 这个方法是交给子类的,子类可以继续去定制此proxyFactory
            customizeProxyFactory(proxyFactory);
    
            // 用来控制proxyFactory被配置之后,是否还允许修改通知。默认值为false(即在代理被配置之后,不允许修改代理类的配置)
            proxyFactory.setFrozen(this.freezeProxy);
            // 设置preFiltered的属性值,默认是false。子类:AbstractAdvisorAutoProxyCreator修改为true
            // preFiltered字段意思为:是否已为特定目标类筛选Advisor
            // 这个字段和DefaultAdvisorChainFactory.getInterceptorsAndDynamicInterceptionAdvice获取所有的Advisor有关
            //CglibAopProxy和JdkDynamicAopProxy都会调用此方法,然后递归执行所有的Advisor
            if (advisorsPreFiltered()) {
                proxyFactory.setPreFiltered(true);
            }
    
            // 使用proxyFactory获取代理
            return proxyFactory.getProxy(getProxyClassLoader());
        }
    }
    

    五、看到这里,大家可能对createProxy 有些疑问,我们还没有定义Advisor呀,Advisor是哪来的?

    我们再回到第四步看一下,之前4.1、4.2、4.3、4.4看的全都是AutoProxyRegistrar源码,下边分析下ProxyTransactionManagementConfiguration是干嘛的。

    第一个方法就是定义事务增强器的,返回的BeanFactoryTransactionAttributeSourceAdvisor对象其实是间接实现了Advisor接口的,大家可以看下类图。

    最终,加载了一个事务的Advisor,这个里边配置了Advice:TransactionInterceptor,真正方法执行时会拦截到。

    注意:BeanFactoryTransactionAttributeSourceAdvisor实现的是PointcutAdvisor接口,既可以设置Advice,又可以设置Pointcut。
    但是下面代码中只配置了Adice,而pointcut是由BeanFactoryTransactionAttributeSourceAdvisor类完成的配置。

    @Configuration
    public class ProxyTransactionManagementConfiguration extends AbstractTransactionManagementConfiguration {
        public ProxyTransactionManagementConfiguration() {
        }
    
    
        /**
         * 定义事务增强器 注册了Advisor
         */
        @Bean(
            name = {"org.springframework.transaction.config.internalTransactionAdvisor"}
        )
        //声明的role为基础类(Spring内部使用的类)
        //int ROLE_INFRASTRUCTURE = 2;
        @Role(2)
        public BeanFactoryTransactionAttributeSourceAdvisor transactionAdvisor() {
            BeanFactoryTransactionAttributeSourceAdvisor advisor = new BeanFactoryTransactionAttributeSourceAdvisor();
            advisor.setTransactionAttributeSource(this.transactionAttributeSource());
            //配置Advice 
            // 这里的定义了事务拦截器
            advisor.setAdvice(this.transactionInterceptor());
            if (this.enableTx != null) {
                advisor.setOrder((Integer)this.enableTx.getNumber("order"));
            }
    
            return advisor;
        }
    
    
        /**
         * 定义基于注解的事务属性资源
         */
        @Bean
        @Role(2)
        public TransactionAttributeSource transactionAttributeSource() {
            return new AnnotationTransactionAttributeSource();
        }
    
    
    
        /**
         * 定义事务拦截器 注册了Advice
         */
        @Bean
        @Role(2)
        public TransactionInterceptor transactionInterceptor() {
            TransactionInterceptor interceptor = new TransactionInterceptor();
            interceptor.setTransactionAttributeSource(this.transactionAttributeSource());
            if (this.txManager != null) {
                interceptor.setTransactionManager(this.txManager);
            }
    
            return interceptor;
        }
    }
    
    public class BeanFactoryTransactionAttributeSourceAdvisor extends AbstractBeanFactoryPointcutAdvisor {
    
        @Nullable
        private TransactionAttributeSource transactionAttributeSource;
    
        //pointcut 是TransactionAttributeSourcePointcut 的子类,实现了TransactionAttributeSource 方法。
        //该方法的作用,是用户可以配置`解析器`。
        private final TransactionAttributeSourcePointcut pointcut = new TransactionAttributeSourcePointcut() {
            @Override
            @Nullable
            protected TransactionAttributeSource getTransactionAttributeSource() {
                return transactionAttributeSource;
            }
        };
    
        public void setTransactionAttributeSource(TransactionAttributeSource transactionAttributeSource) {
            this.transactionAttributeSource = transactionAttributeSource;
        }
    
        @Override
        public Pointcut getPointcut() {
            return this.pointcut;
        }
    
    }
    

    六、Spring解析事务注解

    6.1 方法匹配

    如何判断bean方法符合规则?判断是否能获取事务属性对象。

    自动代理器会遍历bean的所有方法,判断是否与MethodMatcher匹配。实际上会调用TransactionAttributeSourcePointcut的matches方法。

    abstract class TransactionAttributeSourcePointcut extends StaticMethodMatcherPointcut implements Serializable {
    
        @Override
        public boolean matches(Method method, Class<?> targetClass) {
            if (TransactionalProxy.class.isAssignableFrom(targetClass) ||
                    PlatformTransactionManager.class.isAssignableFrom(targetClass) ||
                    PersistenceExceptionTranslator.class.isAssignableFrom(targetClass)) {
                return false;
            }
            //若TransactionAttributeSource 对象不为空,那么获取方法上的事务属性对象。
            TransactionAttributeSource tas = getTransactionAttributeSource();
            return (tas == null || tas.getTransactionAttribute(method, targetClass) != null);
        }
    
    
        /**
         * 抽象方法,但是注册的Advisor中的pointcut是TransactionAttributeSourcePointcut 子类。
         * 实际上获取的是TransactionAttributeSource 对象
         */
        @Nullable
        protected abstract TransactionAttributeSource getTransactionAttributeSource();
    
    }
    

    6.2 事务属性获取

    解析事务注解,获取事务属性对象。

    实际上TransactionAttributeSource为3.1配置的AnnotationTransactionAttributeSource对象。
    当然这个类实现的只是特有的方法,而算法骨干由其父类AbstractFallbackTransactionAttributeSource实现。

    注意getTransactionAttribute方法返回null,则证明pointcut不匹配bean的某方法。

    public abstract class AbstractFallbackTransactionAttributeSource implements TransactionAttributeSource {
    
        @Override
        @Nullable
        public TransactionAttribute getTransactionAttribute(Method method, @Nullable Class<?> targetClass) {
            //若是Object的方法,直接返回null
            if (method.getDeclaringClass() == Object.class) {
                return null;
            }
    
            // 首先看缓存是否存在
            Object cacheKey = getCacheKey(method, targetClass);
            TransactionAttribute cached = this.attributeCache.get(cacheKey);
            if (cached != null) {
                // Value will either be canonical value indicating there is no transaction attribute,
                // or an actual transaction attribute.
                if (cached == NULL_TRANSACTION_ATTRIBUTE) {
                    return null;
                }
                else {
                    return cached;
                }
            }
            else {
                //获取TransactionAttribute 对象的核心方法(如下所示)
                TransactionAttribute txAttr = computeTransactionAttribute(method, targetClass);
                //填充缓存
                if (txAttr == null) {
                    this.attributeCache.put(cacheKey, NULL_TRANSACTION_ATTRIBUTE);
                }
                else {
                    String methodIdentification = ClassUtils.getQualifiedMethodName(method, targetClass);
                    if (txAttr instanceof DefaultTransactionAttribute) {
                        ((DefaultTransactionAttribute) txAttr).setDescriptor(methodIdentification);
                    }
                    if (logger.isTraceEnabled()) {
                        logger.trace("Adding transactional method '" + methodIdentification + "' with attribute: " + txAttr);
                    }
                    this.attributeCache.put(cacheKey, txAttr);
                }
                return txAttr;
            }
        }
        
        @Nullable
        protected TransactionAttribute computeTransactionAttribute(Method method, @Nullable Class<?> targetClass) {
            // 该方法是否要求必须是public级别(因为自动代理会对所有方法进行代匹配)
            if (allowPublicMethodsOnly() && !Modifier.isPublic(method.getModifiers())) {
                return null;
            }
    
            //获取明确类的方法(处理桥接方法)
            Method specificMethod = AopUtils.getMostSpecificMethod(method, targetClass);
    
            //由子类实现(获取方法上的事务配置)
            TransactionAttribute txAttr = findTransactionAttribute(specificMethod);
            if (txAttr != null) {
                return txAttr;
            }
    
            //由子类实现(获取类上的事务配置)
            txAttr = findTransactionAttribute(specificMethod.getDeclaringClass());
            if (txAttr != null && ClassUtils.isUserLevelMethod(method)) {
                return txAttr;
            }
    
            //获取原始方法上的事务配置
            if (specificMethod != method) {
                // Fallback is to look at the original method.
                txAttr = findTransactionAttribute(method);
                if (txAttr != null) {
                    return txAttr;
                }
                // Last fallback is the class of the original method.
                txAttr = findTransactionAttribute(method.getDeclaringClass());
                if (txAttr != null && ClassUtils.isUserLevelMethod(method)) {
                    return txAttr;
                }
            }
    
            return null;
        }
    }   
    

    下图的两个抽象方法,由子类实现。


    还需要注意的一个细节,AnnotationTransactionAttributeSource是其唯一子类。


    public class AnnotationTransactionAttributeSource extends AbstractFallbackTransactionAttributeSource
            implements Serializable {
    
    
        private final Set<TransactionAnnotationParser> annotationParsers;
    
    
        public AnnotationTransactionAttributeSource(boolean publicMethodsOnly) {
            this.publicMethodsOnly = publicMethodsOnly;
            if (jta12Present || ejb3Present) {
                this.annotationParsers = new LinkedHashSet<>(4);
                this.annotationParsers.add(new SpringTransactionAnnotationParser());
                if (jta12Present) {
                    this.annotationParsers.add(new JtaTransactionAnnotationParser());
                }
                if (ejb3Present) {
                    this.annotationParsers.add(new Ejb3TransactionAnnotationParser());
                }
            }
            else {
                //通用模式下,为SpringTransactionAnnotationParser解析器
                this.annotationParsers = Collections.singleton(new SpringTransactionAnnotationParser());
            }
        }
    
    
        @Override
        @Nullable
        protected TransactionAttribute findTransactionAttribute(Class<?> clazz) {
            return determineTransactionAttribute(clazz);
        }
    
        @Override
        @Nullable
        protected TransactionAttribute findTransactionAttribute(Method method) {
            return determineTransactionAttribute(method);
        }
    
        //遍历解析器,获取事务注解
        @Nullable
        protected TransactionAttribute determineTransactionAttribute(AnnotatedElement element) {
            for (TransactionAnnotationParser annotationParser : this.annotationParsers) {
                TransactionAttribute attr = annotationParser.parseTransactionAnnotation(element);
                if (attr != null) {
                    return attr;
                }
            }
            return null;
        }
    }
    

    parseTransactionAnnotation()方法,遍历方法/类上的事务注解。获取到TransactionAttribute对象。

    public class SpringTransactionAnnotationParser implements TransactionAnnotationParser, Serializable {
    
        @Override
        @Nullable
        public TransactionAttribute parseTransactionAnnotation(AnnotatedElement element) {
            AnnotationAttributes attributes = AnnotatedElementUtils.findMergedAnnotationAttributes(
                    element, Transactional.class, false, false);
            if (attributes != null) {
                return parseTransactionAnnotation(attributes);
            }
            else {
                return null;
            }
        }
    }
    

    若MethodMatcher返回true,则证明该方法可以被事务增强器去增强。将Advisor加入到List中,并开始处理下一个Advisor。最终获取到所有可切入的Advisor。去创建代理对象。

    总结

    • 1、SpringBoot通过spring.factories文件引入TransactionAutoConfiguration类
    • 2、TransactionAutoConfiguration默认设置CglibAutoProxyConfiguration 做代理,开启了@EnableTransactionManagement注解
    • 3、@EnableTransactionManagement注解引入了TransactionManagementConfigurationSelector类
    • 4、TransactionManagementConfigurationSelector加载了:
      • AutoProxyRegistrar:用于注册自动代理生成器(InfrastructureAdvisorAutoProxyCreator)
      • ProxyTransactionManagementConfiguration:配置引入了事务的增强器
    • 5、在Spring初始化的时候,InfrastructureAdvisorAutoProxyCreator在bean的实例化前和初始化后,执行生成事务增强代理的逻辑,生成事务增强器
    • 6、配置了Advice:TransactionInterceptor,真正方法执行时会拦截到

    第二部分:Spring 事务执行


    上面讲到 TransactionManagementConfigurationSelector加载了ProxyTransactionManagementConfiguration,里边注册了一个bean:TransactionInterceptor,实现了MethodInterceptor(Spring方法拦截器),事务就是在这个类中进行处理的,首先看一下类图


    因为实现了MethodInterceptor,看下invoke()方法,方法拦截的核心代码。

    一、invoke()

    调用了父类TransactionAspectSupport的invokeWithinTransaction()方法

    invokeWithinTransaction方法的大致流程是这样的:

    • 获取要被代理的方法绑定的事务属性TransactionAttribute,属性不为空或者是非回调偏向的事务管理器,开始进行声明式事务;
    • determineTransactionManager方法获取PlatformTransactionManager;
    • createTransactionIfNecessary方法创建TransactionInfo(开启事务);
    • 执行 retVal = invocation.proceedWithInvocation();
    • catch抛出异常是判断事务是否需要回滚
    • finally 清除当前事务信息
    • 最终提交事务
    public class TransactionInterceptor extends TransactionAspectSupport implements MethodInterceptor, Serializable {
    
        @Override
        @Nullable
        public Object invoke(MethodInvocation invocation) throws Throwable {
            // Work out the target class: may be {@code null}.
            // The TransactionAttributeSource should be passed the target class
            // as well as the method, which may be from an interface.
            Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
    
            // Adapt to TransactionAspectSupport's invokeWithinTransaction...
            // 调用TransactionAspectSupport的 invokeWithinTransaction方法
            return invokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed);
        }
    }
    
    public abstract class TransactionAspectSupport implements BeanFactoryAware, InitializingBean {
    
        @Nullable
        protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
                final InvocationCallback invocation) throws Throwable {
    
            // If the transaction attribute is null, the method is non-transactional.
            TransactionAttributeSource tas = getTransactionAttributeSource();
            // 如果transaction attribute为空,该方法就是非事务(非编程式事务)
            final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
            final PlatformTransactionManager tm = determineTransactionManager(txAttr);
            final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);
    
            // 标准声明式事务:如果事务属性为空 或者 非回调偏向的事务管理器
            if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {
                // Standard transaction demarcation with getTransaction and commit/rollback calls.
                TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
    
                Object retVal;
                try {
                    // 这里就是一个环绕增强,在这个proceed前后可以自己定义增强实现
                    // 实现类 ReflectiveMethodInvocation
                    retVal = invocation.proceedWithInvocation();
                }
                catch (Throwable ex) {
                    // 根据事务定义的,该异常需要回滚就回滚,否则提交事务
                    completeTransactionAfterThrowing(txInfo, ex);
                    throw ex;
                }
                finally {
                    //清空当前事务信息,重置为老的
                    cleanupTransactionInfo(txInfo);
                }
                //返回结果之前提交事务
                commitTransactionAfterReturning(txInfo);
                return retVal;
            }
    
            // 从此往下是编程式事务,暂时不做了解
            else {
                final ThrowableHolder throwableHolder = new ThrowableHolder();
    
                // It's a CallbackPreferringPlatformTransactionManager: pass a TransactionCallback in.
                try {
                    Object result = ((CallbackPreferringPlatformTransactionManager) tm).execute(txAttr, status -> {
                        TransactionInfo txInfo = prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
                        try {
                            return invocation.proceedWithInvocation();
                        }
                        catch (Throwable ex) {
                            if (txAttr.rollbackOn(ex)) {
                                // A RuntimeException: will lead to a rollback.
                                if (ex instanceof RuntimeException) {
                                    throw (RuntimeException) ex;
                                }
                                else {
                                    throw new ThrowableHolderException(ex);
                                }
                            }
                            else {
                                // A normal return value: will lead to a commit.
                                throwableHolder.throwable = ex;
                                return null;
                            }
                        }
                        finally {
                            cleanupTransactionInfo(txInfo);
                        }
                    });
    
                    // Check result state: It might indicate a Throwable to rethrow.
                    if (throwableHolder.throwable != null) {
                        throw throwableHolder.throwable;
                    }
                    return result;
                }
                catch (ThrowableHolderException ex) {
                    throw ex.getCause();
                }
                catch (TransactionSystemException ex2) {
                    if (throwableHolder.throwable != null) {
                        logger.error("Application exception overridden by commit exception", throwableHolder.throwable);
                        ex2.initApplicationException(throwableHolder.throwable);
                    }
                    throw ex2;
                }
                catch (Throwable ex2) {
                    if (throwableHolder.throwable != null) {
                        logger.error("Application exception overridden by commit exception", throwableHolder.throwable);
                    }
                    throw ex2;
                }
            }
        }
    }
    

    总结

    到此,Spring事务的流程就完了,上边提到的几个方法感兴趣的可以去翻一下源码

    第三部分:PlatformTransactionManager


    四、PlatformTransactionManager

    jdbc主要用到的管理器是DataSourceTransactionManager,实现了PlatformTransactionManager,分析一下三个方法。


    public interface PlatformTransactionManager {
    
        // 获取事务状态
        TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
                throws TransactionException;
    
        // 提交
        void commit(TransactionStatus status) throws TransactionException;
    
        // 回滚
        void rollback(TransactionStatus status) throws TransactionException;
    
    }
    

    五、getTransaction获取事务

    Spring事务的传播行为实现就在这里,上图。


    public abstract class AbstractPlatformTransactionManager implements PlatformTransactionManager, Serializable {
    
        @Override
        public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition) throws TransactionException {
            Object transaction = doGetTransaction();
    
            boolean debugEnabled = logger.isDebugEnabled();
    
            if (definition == null) {
                // 事务属性为空,使用默认的属性
                definition = new DefaultTransactionDefinition();
            }
    
            // 如果当前已经存在事务
            if (isExistingTransaction(transaction)) {
                // Existing transaction found -> check propagation behavior to find out how to behave.
                // 根据不同传播机制不同处理
                return handleExistingTransaction(definition, transaction, debugEnabled);
            }
    
            // 超时不能小于默认值
            // Check definition settings for new transaction.
            if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
                throw new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout());
            }
    
            // No existing transaction found -> check propagation behavior to find out how to proceed.
            // 当前不存在事务,传播机制=MANDATORY(支持当前事务,没事务报错),报错
            if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
                throw new IllegalTransactionStateException(
                        "No existing transaction found for transaction marked with propagation 'mandatory'");
            }
            // 当前不存在事务,传播机制=REQUIRED/REQUIRED_NEW/NESTED,这三种情况,需要新开启事务,且加上事务同步
            else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
                    definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
                    definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
                SuspendedResourcesHolder suspendedResources = suspend(null);
                if (debugEnabled) {
                    logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition);
                }
                try {
                    // 是否需要开启同步
                    boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
                    DefaultTransactionStatus status = newTransactionStatus(
                            definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
                    // 开启新事务
                    doBegin(transaction, definition);
                    //准备同步
                    prepareSynchronization(status, definition);
                    return status;
                }
                catch (RuntimeException | Error ex) {
                    resume(null, suspendedResources);
                    throw ex;
                }
            }
            else {
                // Create "empty" transaction: no actual transaction, but potentially synchronization.
                // 当前不存在事务当前不存在事务,且传播机制=PROPAGATION_SUPPORTS/PROPAGATION_NOT_SUPPORTED/PROPAGATION_NEVER,这三种情况,
                // 创建“空”事务:没有实际事务,但可能是同步。
                // 警告:定义了隔离级别,但并没有真实的事务初始化,
                // 隔离级别被忽略有隔离级别但是并没有定义实际的事务初始化,有隔离级别但是并没有定义实际的事务初始化,
                if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
                    logger.warn("Custom isolation level specified but no actual transaction initiated; " +
                            "isolation level will effectively be ignored: " + definition);
                }
                boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
                return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null);
            }
        }
    }
    

    这里分成了两部分:

    • 当前已存在事务:isExistingTransaction()判断是否存在事务,存在事务handleExistingTransaction()根据不同传播机制不同处理
    • 当前不存在事务:不同传播机制不同处理

    handleExistingTransaction()源码如下:

    public abstract class AbstractPlatformTransactionManager implements PlatformTransactionManager, Serializable {
    
        /**
         * Create a TransactionStatus for an existing transaction.
         */
        private TransactionStatus handleExistingTransaction(
                TransactionDefinition definition, Object transaction, boolean debugEnabled)
                throws TransactionException {
    
            // 1.NERVER(不支持当前事务;如果当前事务存在,抛出异常)报错
            if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
                throw new IllegalTransactionStateException(
                        "Existing transaction found for transaction marked with propagation 'never'");
            }
    
            // 2.NOT_SUPPORTED(不支持当前事务,现有同步将被挂起)挂起当前事务
            if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
                if (debugEnabled) {
                    logger.debug("Suspending current transaction");
                }
                Object suspendedResources = suspend(transaction);
                boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
                return prepareTransactionStatus(
                        definition, null, false, newSynchronization, debugEnabled, suspendedResources);
            }
    
            // 3.REQUIRES_NEW挂起当前事务,创建新事务
            if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
                if (debugEnabled) {
                    logger.debug("Suspending current transaction, creating new transaction with name [" +
                            definition.getName() + "]");
                }
                // 挂起当前事务
                SuspendedResourcesHolder suspendedResources = suspend(transaction);
                try {
                    boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
                    DefaultTransactionStatus status = newTransactionStatus(
                            definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
                    doBegin(transaction, definition);
                    prepareSynchronization(status, definition);
                    return status;
                }
                catch (RuntimeException | Error beginEx) {
                    resumeAfterBeginException(transaction, suspendedResources, beginEx);
                    throw beginEx;
                }
            }
    
            // 4.NESTED嵌套事务
            if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
                if (!isNestedTransactionAllowed()) {
                    throw new NestedTransactionNotSupportedException(
                            "Transaction manager does not allow nested transactions by default - " +
                            "specify 'nestedTransactionAllowed' property with value 'true'");
                }
                if (debugEnabled) {
                    logger.debug("Creating nested transaction with name [" + definition.getName() + "]");
                }
                
                // 是否支持保存点:非JTA事务走这个分支。
                // AbstractPlatformTransactionManager默认是true,
                // JtaTransactionManager复写了该方法false,
                // DataSourceTransactionmanager没有复写,
                if (useSavepointForNestedTransaction()) {
                    // Create savepoint within existing Spring-managed transaction,
                    // through the SavepointManager API implemented by TransactionStatus.
                    // Usually uses JDBC 3.0 savepoints. Never activates Spring synchronization.
                    DefaultTransactionStatus status =
                            prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
                    // 创建保存点
                    status.createAndHoldSavepoint();
                    return status;
                }
                else {
                    // Nested transaction through nested begin and commit/rollback calls.
                    // Usually only for JTA: Spring synchronization might get activated here
                    // in case of a pre-existing JTA transaction.
                    boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
                    DefaultTransactionStatus status = newTransactionStatus(
                            definition, transaction, true, newSynchronization, debugEnabled, null);
                    doBegin(transaction, definition);
                    prepareSynchronization(status, definition);
                    return status;
                }
            }
    
            // Assumably PROPAGATION_SUPPORTS or PROPAGATION_REQUIRED.
            if (debugEnabled) {
                logger.debug("Participating in existing transaction");
            }
            if (isValidateExistingTransaction()) {
                if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
                    Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
                    if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {
                        Constants isoConstants = DefaultTransactionDefinition.constants;
                        throw new IllegalTransactionStateException("Participating transaction with definition [" +
                                definition + "] specifies isolation level which is incompatible with existing transaction: " +
                                (currentIsolationLevel != null ?
                                        isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) :
                                        "(unknown)"));
                    }
                }
                if (!definition.isReadOnly()) {
                    if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {
                        throw new IllegalTransactionStateException("Participating transaction with definition [" +
                                definition + "] is not marked as read-only but existing transaction is");
                    }
                }
            }
            // 到这里PROPAGATION_SUPPORTS 或 PROPAGATION_REQUIRED或PROPAGATION_MANDATORY,
            // 存在事务加入事务即可,prepareTransactionStatus第三个参数就是是否需要新事务。false代表不需要新事务
            boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
            return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
        }
    }
    

    这里有两个核心方法

    • suspend():

      • 把当前事务的connectionHolder数据库连接持有者清空;
      • 当前线程解绑datasource.其实就是ThreadLocal移除对应变量(TransactionSynchronizationManager类中定义的private static final ThreadLocal<Map<Object, Object>> resources = new NamedThreadLocal<Map<Object, Object>>("Transactional resources");)
    • doBegin()

    参考:
    https://blog.csdn.net/StringBuff/article/details/111812282

    https://blog.csdn.net/StringBuff/article/details/112094055

    https://www.cnblogs.com/chenxingyang/p/15559352.html

    相关文章

      网友评论

          本文标题:Spring事务——事务原理源码分析

          本文链接:https://www.haomeiwen.com/subject/daonkdtx.html