美文网首页
Spring的Aop与事务源码分析

Spring的Aop与事务源码分析

作者: 知止9528 | 来源:发表于2019-01-20 13:08 被阅读11次

    Aop原理

    1.分析入口@EnableAspectJAutoProxy

    1.@EnableAspectJAutoProxy
           @Import(AspectJAutoProxyRegistrar.class):给容器中导入AspectJAutoProxyRegistrar
                利用AspectJAutoProxyRegistrar自定义给容器中注册bean;
                internalAutoProxyCreator=AnnotationAwareAspectJAutoProxyCreator
          给容器中注册一个AnnotationAwareAspectJAutoProxyCreator
    

    2.AnnotationAwareAspectJAutoProxyCreator的作用

    2.AnnotationAwareAspectJAutoProxyCreator的作用
        ->AspectJAwareAdvisorAutoProxyCreator
         ->AbstractAdvisorAutoProxyCreator
          ->AbstractAutoProxyCreator
           ->implements SmartInstantiationAwareBeanPostProcessor, BeanFactoryAware
           关注后置处理器(可以在bean初始化前后进行操作),自动装配BeanFactory做了什么事情
    

    3.AbstractAutoProxyCreator.setBeanFactory()

    3.AbstractAutoProxyCreator.setBeanFactory()
    AbstractAutoProxyCreator.后置处理器逻辑
    
    AbstractAdvisorAutoProxyCreator.setBeanFactory()  -》initBeanFactory
    AbstractAdvisorAutoProxyCreator.initBeanFactory()
    
    AnnotationAwareAspectJAutoProxyCreator.initBeanFactory()
    
    在这几个地方打上断点
    

    4.启动容器

    4.启动容器
       基本流程
       (1)传入配置类(配置类中传入了internalAutoProxyCreator)
       (2)注册配置类,调用refresh(),刷新容器;
       (3)registerBeanPostProcessors(beanFactory);注册bean的后置处理器来方便拦截bean的创建;
            1)先获取ioc容器已经定义了的需要创建的BeanPostProcessor组件
            2)给容器中加入别的BeanPostProcessor
            3)优先注册实现了PriorityOrdered接口的BeanPostProcessor;
            4)再给容器中注册实现了Ordered接口的BeanPostProcessor;
            5)注册没实现优先级接口的BeanPostProcessor;
            6)注册BeanPostProcessor,实际上就是创建BeanPostProcessor对象,保存在容器中;
                创建internalAutoProxyCreator的BeanPostProcessor【AnnotationAwareAspectJAutoProxyCreator】
                   1)创建Bean的实例;
                   2)populateBean(beanName, mbd, instanceWrapper);
                   3)initializeBean:初始化bean
                          1)invokeAwareMethods():处理Aware接口,让bean对象能感知到容器的存在,从容器中获取一些东西
                          2)applyBeanPostProcessorsBeforeInitialization():应用后置处理器的beanPostProcessorsBeforeInitialization()
                          3)invokeInitMethods():执行自定义的初始化方法
                          4)applyBeanPostProcessorsAfterInitialization():执行后置处理器的beanPostProcessorsAfterInitialization()
                   4)BeanPostProcessor(AnnotationAwareAspectJAutoProxyCreator)创建成功:aspectJAutoProxyBuilder
                         
    

    以上为创建和注册AnnotationAwareAspectJAutoProxyCreator的过程

    5.执行时机

    5.执行时机
    AnnotationAwareAspectJAutoProxyCreator -》InstantiationAwareBeanPostProcessor
    finishBeanFactoryInitialization(beanFactory);完成单实例bean的创建
       1)遍历获取容器中所有的Bean,依次创建对象getBean(beanName);
           getBean ->doGetBean() ->getSigleton()->
       2)创建bean
           1)先从缓存中获取当前Bean,如果获取到,直接使用;
           只要创建好的bean会被缓存起来
           2)createBean();创建bean
              1)resolveBeforeInstantiation(beanName, mbdToUse);解析BeforeInstantiation,希望后置处理器在此能够返回一个代理对象;如果能返回代理对象就使用,如果不能就继续
                      后置处理器先尝试返回对象;
                      bean = applyBeanPostProcessorsBeforeInstantiation();
                      拿到后置处理器,如果bean是InstantiationAwareBeanPostProcessor则会调用postProcessBeforeInstantiation()方法;
    
                      如果返回bean不为空,则继续调用applyBeanPostProcessorsAfterInitialization方法
                      if (bean != null) {
                            bean = applyBeanPostProcessorsAfterInitialization(bean, beanName);
                      }
              2)doCreateBean(beanName, mbdToUse, args);
                   1)创建Bean的实例;
                   2)populateBean(beanName, mbd, instanceWrapper);
                   3)initializeBean:初始化bean
                          1)invokeAwareMethods():处理Aware接口,让bean对象能感知到容器的存在,从容器中获取一些东西
                          2)applyBeanPostProcessorsBeforeInitialization():应用后置处理器的beanPostProcessorsBeforeInitialization()
                          3)invokeInitMethods():执行自定义的初始化方法
                          4)applyBeanPostProcessorsAfterInitialization():执行后置处理器的beanPostProcessorsAfterInitialization()
             
    

    6.创建代理

    6.创建代理
        根据上面的分析,会调用到
    1)AbstractAutoProxyCreator.postProcessBeforeInstantiation()
        1)判断当前bean是否在advisedBeans中(保存了所有需要增强的bean)
        2)判断当前bean是否是基础类型的Advice,Pointcut,Advisor,AopInfrastructureBean或者是否是切面(@Aspect)
        3)是否需要跳过
            1)获取候选增强器(切面里面的通知方法)【List<Advisor> candidateAdvisors】
                 每一个封装的通知方法的增强器是InstantianModelAwarePointAdvisor;
                 判断每一个增强器是否是AspectJPointcutAdvisor类型的;返回true
            2)永远返回false
    
    2)创建对象
    AbstractAutoProxyCreator.postProcessorsAfterInitialization();
         return wrapIfNecessary(bean, beanName, cacheKey);包装如果有需要的情况下
         1)获取当前bean的所有增强方法(即我们的通知方法,before,after等等) Obejct[] specifcInterceptors
             1)找到候选的所有增强器(找哪些通知方法是需要切入当前bean方法的)
             2)获取到能在bean使用的增强器.
             3)给增强器排序
         2)保存当前bean在advisedBeans中;
         3)如果当前bean需要增强,则创建当前bean代理对象
             1)获取所有增强器(通知方法)
             2)保存到proxyFactory
             3)创建代理对象
                JdkDynamicAopProxy(AdvisedSupport config);jdk动态代理
                CglibAopProxy(AdvisedSupport config);cglib动态代理
         4)返回代理对象
         5)以后容器中获取到的就是这个组件的代理对象,执行目标方法的时候,代理对象就会执行通知方法的流程.
    
    

    AbstractAutoProxyCreator.postProcessorsAfterInitialization();代码如下

    
    public Object postProcessBeforeInstantiation(Class<?> beanClass, String beanName) throws BeansException {
            Object cacheKey = getCacheKey(beanClass, beanName);
    
            if (!StringUtils.hasLength(beanName) || !this.targetSourcedBeans.contains(beanName)) {
                if (this.advisedBeans.containsKey(cacheKey)) {
                    return null;
                }
                if (isInfrastructureClass(beanClass) || shouldSkip(beanClass, beanName)) {
                    this.advisedBeans.put(cacheKey, Boolean.FALSE);
                    return null;
                }
            }
    
            // Create proxy here if we have a custom TargetSource.
            // Suppresses unnecessary default instantiation of the target bean:
            // The TargetSource will handle target instances in a custom fashion.
            TargetSource targetSource = getCustomTargetSource(beanClass, beanName);
            if (targetSource != null) {
                if (StringUtils.hasLength(beanName)) {
                    this.targetSourcedBeans.add(beanName);
                }
                Object[] specificInterceptors = getAdvicesAndAdvisorsForBean(beanClass, beanName, targetSource);
                Object proxy = createProxy(beanClass, beanName, specificInterceptors, targetSource);
                this.proxyTypes.put(cacheKey, proxy.getClass());
                return proxy;
            }
    
            return null;
        }
    

    postProcessorsAfterInitialization()代码如下

    public Object postProcessAfterInitialization(@Nullable Object bean, String beanName) throws BeansException {
            if (bean != null) {
                Object cacheKey = getCacheKey(bean.getClass(), beanName);
                if (!this.earlyProxyReferences.contains(cacheKey)) {
                    return wrapIfNecessary(bean, beanName, cacheKey);
                }
            }
            return bean;
        }
    

    AbstractAutoProxyCreator#wrapIfNecessary源码如下

    protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
            if (StringUtils.hasLength(beanName) && this.targetSourcedBeans.contains(beanName)) {
                return bean;
            }
            if (Boolean.FALSE.equals(this.advisedBeans.get(cacheKey))) {
                return bean;
            }
            if (isInfrastructureClass(bean.getClass()) || shouldSkip(bean.getClass(), beanName)) {
                this.advisedBeans.put(cacheKey, Boolean.FALSE);
                return bean;
            }
    
            // Create proxy if we have advice.
            Object[] specificInterceptors = getAdvicesAndAdvisorsForBean(bean.getClass(), beanName, null);
            if (specificInterceptors != DO_NOT_PROXY) {
                this.advisedBeans.put(cacheKey, Boolean.TRUE);
                Object proxy = createProxy(
                        bean.getClass(), beanName, specificInterceptors, new SingletonTargetSource(bean));
                this.proxyTypes.put(cacheKey, proxy.getClass());
                return proxy;
            }
    
            this.advisedBeans.put(cacheKey, Boolean.FALSE);
            return bean;
        }
    

    调用了createProxy()
    最终会调用到
    DefaultAopProxyFactory.createAopProxy(AdvisedSupport config)

    public AopProxy createAopProxy(AdvisedSupport config) throws AopConfigException {
            if (config.isOptimize() || config.isProxyTargetClass() || hasNoUserSuppliedProxyInterfaces(config)) {
                Class<?> targetClass = config.getTargetClass();
                if (targetClass == null) {
                    throw new AopConfigException("TargetSource cannot determine target class: " +
                            "Either an interface or a target is required for proxy creation.");
                }
                if (targetClass.isInterface() || Proxy.isProxyClass(targetClass)) {
                    return new JdkDynamicAopProxy(config);
                }
                return new ObjenesisCglibAopProxy(config);
            }
            else {
                return new JdkDynamicAopProxy(config);
            }
        }
    

    即如果实现了接口则用jdk动态代理,当然也可以通过配置参数Optimize=true 或ProxyTargetClass="cglib"来使用cglib动态代理


    7.获取拦截器链MethodInterceptor

    目标方法执行;
        容器中保存了组件的代理对象(cglib增强后的对象),这个对象里保存了详细信息(比如增强器,目标对象,xxx);
          1)CglibAopProxy.intercept();拦截目标方法的执行;
          2)根据ProxyFactory对象获取将要执行的目标方法拦截器链;
              List<Object> chain = this.advised.getInterceptorsAndDynamicInterceptionAdvice(methods[x], rootClass);
              1)List<Object> interceptorList 保存所有拦截器5
                    一个默认的ExposeInvocationInterceptor和4个增强器
              2)遍历所有的增强器,将其转为Interceptor;
                  registry.getInterceptors(advisor);
              3)将增强器转为List<MethodInterceptor>;
                 如果是MethodInterceptor,直接加入到集合中
                 如果不是,使用AdvisorAdaptor将增强器转为MethodInterceptor;   
                 转换完成返回MethodInterceptor数组;
    
          3)如果没有拦截器链,直接执行目标方法;
               拦截器链(每一个方法又被包装为方法拦截器,利用MethodInterceptor机制)
          4)如果有拦截器链,把需要执行的目标对象,目标方法,拦截器等信息传入
            创建一个CglibMethodInvocation对象,并调用Object retVal =mi.proceed();
          5)拦截器链的触发过程
             1)
    

    8.拦截器链的触发过程

    mi.proceed()的执行
    
    Aop执行过程.png

    org.springframework.aop.framework.ReflectiveMethodInvocation#proceed

    public Object proceed() throws Throwable {
            //  We start with an index of -1 and increment early.
                   满足长度相等的条件,就真正去执行目标方法了
            if (this.currentInterceptorIndex == this.interceptorsAndDynamicMethodMatchers.size() - 1) {
                return invokeJoinpoint();
            }
    

    事务原理

    1.@EnableTransactionManagement
           利用TransactionManagementConfigurationSelector给容器中导入了两个组件
           AutoProxyRegistrar
           ProxyTransactionManagementConfiguration
    

    2.AutoProxyRegistrar
        给容器中注册一个InfrastructureAdvisorAutoProxyCreator组件;
        InfrastructureAdvisorAutoProxyCreator的作用
        就是利用后置处理器机制在对象创建以后,包装对象,返回一个代理对象(增强器),代理对象执行方法利用拦截器链进行处理
    

    3.ProxyTransactionManagementConfiguration
         1.给容器中注册事务增强器
           (1)事务增强器需要的事务注解信息,AnnotationTransactionAttributeSource解析事务
           (2)事务拦截器
               TransactionInterceptor;保存了事务属性信息,事务管理器;
               同时它是一个MethodInterceptor;
                   在目标方法执行的时候;
                       执行拦截器链;
                       事务拦截器:
                           ①先获取事务相关属性
                           ②再获取PlatformTranactionManager,如果没有先添加指定任何PlatformTranactionManager,最终会从容器中按照类型获取一个PlatformTranactionManager
                           ③执行目标方法
                               如果异常,获取到事务管理器,利用事务管理器回滚操作;
                               如果正常,利用事务提交,进行提交
    

    整个调用链条如下

    org.springframework.transaction.interceptor.TransactionInterceptor.invoke
      ->org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction  
       ->org.springframework.transaction.interceptor.TransactionAspectSupport.getTransactionAttributeSource  获取事务相关属性
       ->org.springframework.transaction.interceptor.TransactionAspectSupport.determineTransactionManager  再获取PlatformTranactionManager
       ->org.springframework.transaction.interceptor.TransactionAspectSupport.createTransactionIfNecessary   
        ->org.springframework.transaction.support.AbstractPlatformTransactionManager.getTransaction  事务的传播属性就是在这个方法里面判断的 
         ->org.springframework.jca.cci.connection.CciLocalTransactionManager.doBegin   获取底层数据库连接
       ->org.springframework.transaction.interceptor.TransactionAspectSupport.InvocationCallback.proceedWithInvocation  执行目标方法
       ->org.springframework.transaction.interceptor.TransactionAspectSupport#completeTransactionAfterThrowing 异常则回滚事务
       ->org.springframework.transaction.interceptor.TransactionAspectSupport.commitTransactionAfterReturning  成功则提交事务 
    

    事务的主要组件

    PlatformTransactionManager
    PlatformTransactionManager 这个接口中定义了 Spring 执行事务的主方法:

    public interface PlatformTransactionManager {
        // 开始事务
        TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException;
        // 提交事务
        void commit(TransactionStatus status) throws TransactionException;
        // 回滚事务
        void rollback(TransactionStatus status) throws TransactionException;
    }
    

    具体的抽象类为
    AbstractPlatformTransactionManager 中进行了相应的实现(PS: 其实这里主要运用了 策略模式 + 模版模式),
    其主要属性有

    // 永远激活 事务同步器
    public static final int SYNCHRONIZATION_ALWAYS = 0;
    
    // 只有在有事务时才激活事务同步器
    public static final int SYNCHRONIZATION_ON_ACTUAL_TRANSACTION = 1;
    
    // Never active transaction synchronization, not even for actual transactions.
    // 从不激活事务同步器
    public static final int SYNCHRONIZATION_NEVER = 2;
    
    /** Constants instance for AbstractPlatformTransactionManager */
    // 可通过 Constants 设置AbstractPlatformTransactionManager中的属性 --> 类似于 BeanWrapper
    private static final Constants constants = new Constants(AbstractPlatformTransactionManager.class);
    
    // 是否 开启事务同步器支持
    private int transactionSynchronization = SYNCHRONIZATION_ALWAYS;
    
    // 事务默认的超时时间
    private int defaultTimeout = TransactionDefinition.TIMEOUT_DEFAULT;
    
    // 嵌套式事务是否允许
    private boolean nestedTransactionAllowed = false;
    
    // 是否校验 是否存在事务
    private boolean validateExistingTransaction = false;
    
    // 分布式事务中的 rollback 属性
    private boolean globalRollbackOnParticipationFailure = true;
    
    // 分布式事务中的 rollback 属性
    private boolean failEarlyOnGlobalRollbackOnly = false;
    
    // 在 commit 过程中若出现 异常是否会 rollback
    private boolean rollbackOnCommitFailure = false;
    

    AbstractPlatformTransactionManager 开启事务方法
    其主逻辑如下:

    1. 获取事务连接器, 比如 DataSourceTransactionManager 中就是 DataSourceTransactionObject 对象(存放的是 connect, savePoint, 是否是新事务)
    2. 若不存在 TransactionDefinition, 则创建一个默认的 TransactionDefinition
    3. 判断当前是否存在事务中
        3.1 如果当前已经存在事务, 且当前事务的传播属性设置为 PROPAGATION_NEVER, 那么抛出异常
        3.2 如果当前事务的配置属性是 PROPAGATION_NOT_SUPPORTED, 同时当前线程已经存在事务了, 那么将事务挂起, 并且封装 TransactionStatus
        3.3 如果当前事务的配置属性是 PROPAGATION_REQUIRES_NEW, 创建新事务, 同时将当前线程存在的事务挂起, 与创建全新事务的过程类是, 区别在于在创建全新事务时不用考虑已有事务的挂起, 但在这里, 需要考虑已有事务的挂起
        3.4  嵌套事务的处理, 创建 TransactionStatus, 创建保存点
        3.5 对于那些没有匹配的传播级别, 默认的封装以下 TransactionStatus
    4. 检查事务属性中 timeout 的设置是否合理  <-- 这里的 timeout 只在 DataSourceUtils.applyTransactionTimeout 中看到有对应的检查工作
    5. 如果当前线程不存在事务, 但是 propagationBehavior 被设置为 PROPAGATION_MANDATORY 抛弃异常
    6. PROPAGATION_REQUIRED, PROPAGATION_REQUIRES_NEW, PROPAGATION_NEXTED 都需要新建事务
    7.  创建 TransactionStatus, 开始事务, 准备 TransactionSynchronous
    

    TransactionDefinition
    TransactionDefinition 是一个接口, 其主要包含了 Spring 中对事务传播类别的定义, 隔离级别的定义, 代码如下:

    public interface TransactionDefinition {
    
        // 若当前线程不存在事务中, 则开启一个事务, 若当前存在事务, 则加入其中
        int PROPAGATION_REQUIRED = 0;
    
        // 若当前存在事务, 则加入到事务中, 若不存在, 则以非事务的方式运行
        int PROPAGATION_SUPPORTS = 1;
    
        // 若有事务, 则用当前的事务, 若没有, 则直接抛出异常
        int PROPAGATION_MANDATORY = 2;
    
        // 若当前存在事务, 则挂起事务, 若当前不存在事务, 则开启一个新事务运行
        int PROPAGATION_REQUIRES_NEW = 3;
    
        // 不支持以事务的方式运行, 若当前存在事务, 则将当前的事务挂起
        int PROPAGATION_NOT_SUPPORTED = 4;
    
        // 不支持事务, 若当前线程含有事务, 则直接抛出异常
        int PROPAGATION_NEVER = 5;
    
        // 这个时在原来的事务中通过 savepoint 的方式 开启一个局部事务
        int PROPAGATION_NESTED = 6;
    
        // 默认隔离级别
        int ISOLATION_DEFAULT = -1;
    
        // read_uncommitted 级别
        int ISOLATION_READ_UNCOMMITTED = Connection.TRANSACTION_READ_UNCOMMITTED;
    
        // READ_COMMITTED 级别
        int ISOLATION_READ_COMMITTED = Connection.TRANSACTION_READ_COMMITTED;
    
        // REPEATABLE_READ 级别
        int ISOLATION_REPEATABLE_READ = Connection.TRANSACTION_REPEATABLE_READ;
    
        // SERIALIZABLE 级别
        int ISOLATION_SERIALIZABLE = Connection.TRANSACTION_SERIALIZABLE;
    
        // 默认超时时间
        int TIMEOUT_DEFAULT = -1;
    
        // 获取传播行为
        int getPropagationBehavior();
    
        //  获取隔离级别
        int getIsolationLevel();
    
        // 获取超时时间
        int getTimeout();
    
        // 事务是否是只读模式
        boolean isReadOnly();
    
        // 返回事务的名字
        String getName();
    }
    

    其默认的实现是 DefaultTransactionDefinition, 在这个类中主要对一些值增加了默认值的赋值, 并增加一些对属性赋值的设定, 其中有个赋值工具类 Constants, 可以通过这个类将 字符串转为数字.


    TransactionStatus
    TransactionStatus 这个接口中定义了事务执行过程中的一些属性, 是否有savePoint, 是否 rollBackOnly, 事务是否已经完成; 而其抽象类中主要是完成 savePoint 以及 rollBackOnly 的具体实现; 其默认的实现类是 DefaultTransactionStatus, 在这个类中有:

    // 事务连接器, 比如 DataSourceTransactionManager 中的 DataSourceTransactionObject
    private final Object transaction;
    // 是否是新事务
    private final boolean newTransaction;
    // 是否开启 事务同步器 <- 其实就是在 TransactionSynchronousManager 中注册属性信息
    private final boolean newSynchronization;
    // 这个事务是否是 readOnly
    private final boolean readOnly;
    // debug模式
    private final boolean debug;
    //  suspend 的上个事务的信息, suspendedResources 可能是 null
    private final Object suspendedResources;
    

    TransactionAttributeSource
    TransactionAttributeSource 它是事务属性获取器(PS: 这里出现了 TransactionAttribute这个对象, 其实就是 TransactionDefinition 加上 一些其他属性), 主要的 TransactionAttributeSource 有如下:

    1. NameMatchTransactionAttributeSource:   通过将 Properties 里面的属性转化成 methodName <--> TransactionAttribute 的TransactionAttributeSource
    2. MethodMapTransactionAttributeSource:   通过配置文件配置 className.methodName <--> TransactionAttribute 形式注入的 MethodMapTransactionAttributeSource
    3. MatchAlwaysTransactionAttributeSource: 只要是用户定义的方法就返回 true 的 TransactionAttributeSource
    4. CompositeTransactionAttributeSource:   组合多个 TransactionAttributeSource, 只要其中有一个获取 TransactionAttribute, 就 OK
    5. AnnotationTransactionAttributeSource:  通过获取方法上的注解信息来获知 事务的属性, 解析主要由 SpringTransactionAnnotationParser 来进行
    
    在上面几个类中, AnnotationTransactionAttributeSource 是我们最常使用的 TransactionDefinition 的解析器, 它内部其实蛮简单的, 主要还是通过 SpringTransactionAnnotationParser 解析方法上注解 @Transactional 中的信息来获得事务属性
    
    

    TransactionSynchronizationManager
    在事务执行的过程中, 需要保存很多变量值, 包括一些回调函数
    每次在事务 suspend 或 resume 时, 其实操作的就是通过TransactionSynchronizationManager 将属性放在 ThreadLocal 中
    TransactionSynchronization

    private static final ThreadLocal<Map<Object, Object>> resources =                    // key 是 dataSource, value 是 ConnectionHolder, 这里的 Map 是为了解决, 同一个线程操作多个 DataSource 而准备de
            new NamedThreadLocal<Map<Object, Object>>("Transactional resources");
    
    private static final ThreadLocal<Set<TransactionSynchronization>> synchronizations = // 存储 TransactionSynchronization <- 这里面存储的都是回调函数
            new NamedThreadLocal<Set<TransactionSynchronization>>("Transaction synchronizations");
    
    private static final ThreadLocal<String> currentTransactionName =                    // 当前事务的名称
            new NamedThreadLocal<String>("Current transaction name");
    
    private static final ThreadLocal<Boolean> currentTransactionReadOnly =               // 当前事务是否是 readOnly
            new NamedThreadLocal<Boolean>("Current transaction read-only status");
    
    private static final ThreadLocal<Integer> currentTransactionIsolationLevel =         // 当前事务的隔离级别
            new NamedThreadLocal<Integer>("Current transaction isolation level");
    
    private static final ThreadLocal<Boolean> actualTransactionActive =                  // 当前线程是否已经在事务中
            new NamedThreadLocal<Boolean>("Actual transaction active");
    

    TransactionAspectSupport
    这是事务支持的一个工具类, 其也是 TransactionInterceptor 的父类, 在这个类中定义了执行事务的主逻辑 -> 方法 invokeWithinTransaction (PS: 其实就是aop 中的 aroundAdvice)


    TransactionInterceptor 类如下代码如下

    public class TransactionInterceptor extends TransactionAspectSupport implements MethodInterceptor, Serializable {
    .....
    }
    

    invoke()方法

    public Object invoke(final MethodInvocation invocation) throws Throwable {
            // Work out the target class: may be {@code null}.
            // The TransactionAttributeSource should be passed the target class
            // as well as the method, which may be from an interface.
            Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
    
            // Adapt to TransactionAspectSupport's invokeWithinTransaction...
            return invokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed);
        }
    

    invokeWithinTransaction()方法

    protected Object invokeWithinTransaction(Method method, Class<?> targetClass, final InvocationCallback invocation)
            throws Throwable {
        // If the transaction attribute is null, the method is non-transactional.
        // 这里读取事务的属性和设置, 通过 TransactionAttributeSource 对象取得
        final TransactionAttribute txAttr = getTransactionAttributeSource().getTransactionAttribute(method, targetClass);
        // 获取 beanFactory 中的 transactionManager
        final PlatformTransactionManager tm = determineTransactionManager(txAttr);
        // 构造方法唯一标识(类, 方法, 如 service.UserServiceImpl.save)
        final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);
    
        /**
         * 这里区分不同类型的 PlatformTransactionManager 因为它们的调用方式不同
         * 对 CallbackPreferringPlatformTransactionManager 来说, 需要回调函数来
         * 实现事务的创建和提交
         * 对于非 CallbackPreferringPlatformTransactionManager 来说, 不需要通过
         * 回调函数来实现事务的创建和提交
         * 像 DataSourceTransactionManager 就不是 CallbackPreferringPlatformTransactionManager
         * 不需要通过回调的方式来使用
         */
        if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {
            // Standard transaction demarcation with getTransaction and commit/rollback calls.
            // 这里创建事务, 同时把创建事务过程中得到的信息放到 TransactionInfo 中去 (创建事务的起点)
            TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
            Object retVal = null;
            try {
                // This is an around advice: Invoke the next interceptor in the chain.
                // This will normally result in a target object being invoked.
                // 这里的调用使用处理沿着拦截器链进行, 使最后目标对象的方法得到调用
                retVal = invocation.proceedWithInvocation();
            }
            catch (Throwable ex) {
                // target invocation exception
                // 如果在事务处理方法调用中出现异常, 事务处理如何进行需要根据具体的情况考虑回滚或者提交
                completeTransactionAfterThrowing(txInfo, ex);
                throw ex;
            }
            finally {
                // 这里把与线程绑定的 TransactionInfo 设置为 oldTransactionInfo
                cleanupTransactionInfo(txInfo);
            }
            // 这里通过事务处理器来对事务进行提交
            commitTransactionAfterReturning(txInfo);
            return retVal;
        }
    }
    

    getTransaction()

    public final TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException {
        // 这个 doGetTransaction() 抽象函数, transaction对象的获取 由具体的事务处理器实现, 比如 DataSourceTransactionManager
        // 这里是 DataSourceTransactionObject 对象(存放的是 connect, savePoint, 是否是新事务)
        Object transaction = doGetTransaction();
    
        // Cache debug flag to avoid repeated checks.
        boolean debugEnabled = logger.isDebugEnabled();
    
        // 关于这个 DefaultTransactionDefinition, 在前面编程式使用事务处理的时候遇到过, 这个 DefaultTransactionDefinition 的默认事务处理属性是
        // propagationBehavior = PROPAGATION_REQUIRED; isolationLevel = ISOLATION_DEFAULT; timeout=TIMEOUT_DEFAULT; readlyOnly = false
        if (definition == null) {                       // 如果参数 definition == null -> 则创建一个默认的 TransactionDefinition <- 这其实就是一个事务属性配置的对象
            // Use defaults if no transaction definition given.
            definition = new DefaultTransactionDefinition();
        }
    
        // 判断当前被线程是否存在事务, 判断依据为当前线程记录的连接不为空且连接中(connectionHolder) 中的 tranactionActive 属性 = true
        // 如果已经存在事务, 那么需要要根据在事务属性中定义的事务传播属性配置来处理事务
        if (isExistingTransaction(transaction)) {
            // 这里对当前线程中已经由事务存在的情况进行处理, 所有的处理结果都封装在 TransactionStatus 中
            // Existing transaction found -> check propagation behavior to find out how to behave.
            return handleExistingTransaction(definition, transaction, debugEnabled);
        }
    
        // 检查事务属性中 timeout 的设置是否合理  <-- 这里的 timeout 只在 DataSourceUtils.applyTransactionTimeout 中看到有对应的检查工作
        // Check definition settings for new transaction.
        if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
            throw new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout());
        }
    
        // 如果当前线程不存在事务, 但是 propagationBehavior 被设置为 PROPAGATION_MANDATORY 抛弃异常
        // No existing transaction found -> check propagation behavior to find out how to proceed.
        if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
            throw new IllegalTransactionStateException(
                    "No existing transaction found for transaction marked with propagation 'mandatory'");
        }
        // PROPAGATION_REQUIRED, PROPAGATION_REQUIRES_NEW, PROPAGATION_NEXTED 都需要新建事务
        else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
                definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
                definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
    
            // 空挂起 <- 这里挂起 (PS: 有同学可能疑问, 上面明明判断了是否在事务中, 这里没有必要挂起, 但有种情况 txObject.getConnectionHolder() != null && txObject.getConnectionHolder().isTransactionActive() == false)
            SuspendedResourcesHolder suspendedResources = suspend(null); // <- 这里挂起其实主要还是 TransactionSynchronizationManager 中配置的资源
            if (debugEnabled) {
                logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition);
            }
            try {
                // 是否开启一个新的 事务同步器
                boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
                // 创建事务状态 DefaultTransactionStatus
                DefaultTransactionStatus status = newTransactionStatus(
                        definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
                // 构造 transaction, 包括设置 ConnectionHolder, 隔离级别, timeout, 如果是新连接, 绑定到当前线程
                doBegin(transaction, definition);
                // 新同步事务的设置, 针对当前线程的设置
                prepareSynchronization(status, definition);
                return status;
            }
            catch (RuntimeException ex) {
                resume(null, suspendedResources);  // resume 挂起来的 资源
                throw ex;
            }
            catch (Error err) {
                resume(null, suspendedResources);  // resume 挂起来的 资源
                throw err;
            }
        }
        else {
            // 这里其实就是没有开启事务, 相比上面的 if 中, 就少了 doBegin 函数的调用
            // Create "empty" transaction: no actual transaction, but potentially synchronization.
            if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
                logger.warn("Custom isolation level specified but no actual transaction initiated; " +
                        "isolation level will effectively be ignored: " + definition);
            }
            boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
            return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null);    // 关键是这里第二个参数是 nul
        }
    }
    

    handleExistingTransaction()处理已存在的事务

    private TransactionStatus handleExistingTransaction(
            TransactionDefinition definition, Object transaction, boolean debugEnabled)
            throws TransactionException {
    
        // 如果当前已经存在事务, 且当前事务的传播属性设置为 PROPAGATION_NEVER, 那么抛出异常
        if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
            throw new IllegalTransactionStateException(
                    "Existing transaction found for transaction marked with propagation 'never'");
        }
    
        // 如果当前事务的配置属性是 PROPAGATION_NOT_SUPPORTED, 同时当前线程已经存在事务了, 那么将事务挂起
        if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
            if (debugEnabled) {
                logger.debug("Suspending current transaction");
            }
            // 将事务的挂起 <- 其实就是 TransactionSynchronizationManager 中的属性
            Object suspendedResources = suspend(transaction);
            // 是否开启一个新的事务同步器
            boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
    
            // 意味着事务方法不需要放在事务环境中执行, 同时挂起事务的信息保存在 TransactionStatus 中, 这里包括了, 进程 ThreadLocal 对事务信息的记录
            return prepareTransactionStatus( // 注意这里第二个参数是 null
                    definition, null, false, newSynchronization, debugEnabled, suspendedResources);
        }
    
        // 如果当前事务的配置属性是 PROPAGATION_REQUIRES_NEW, 创建新事务, 同时将当前线程存在的事务挂起, 与创建全新事务的过程类是, 区别在于在创建全新事务时不用考虑已有事务的挂起, 但在这里, 需要考虑已有事务的挂起
        if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
            if (debugEnabled) {
                logger.debug("Suspending current transaction, creating new transaction with name [" +
                        definition.getName() + "]");
            }
            // 将事务的挂起 <- 其实就是 TransactionSynchronizationManager 中的属性
            SuspendedResourcesHolder suspendedResources = suspend(transaction);
            try {
                // 是否开启一个新的事务同步器
                boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
                // 挂起事务的信息记录保存在 TransactionStatus 中,  这里包括ThreadLocal 对事务信息的记录
                DefaultTransactionStatus status = newTransactionStatus(
                        definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
                // 构造 transaction, 包括设置 ConnectionHolder, 隔离级别, timeout, 如果是新连接, 绑定到当前线程
                doBegin(transaction, definition);
                // 新同步事务的设置, 针对当前线程的设置
                prepareSynchronization(status, definition);
                return status;
            }
            catch (RuntimeException beginEx) { // 抛出异常的话, 直接恢复 刚才挂起的事务
                resumeAfterBeginException(transaction, suspendedResources, beginEx);
                throw beginEx;
            }
            catch (Error beginErr) {
                resumeAfterBeginException(transaction, suspendedResources, beginErr);
                throw beginErr;
            }
        }
    
        // 嵌套事务的处理, 创建 TransactionStatus, 创建保存点
        if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
            if (!isNestedTransactionAllowed()) {                        // 检查是否允许嵌套事务
                throw new NestedTransactionNotSupportedException(
                        "Transaction manager does not allow nested transactions by default - " +
                        "specify 'nestedTransactionAllowed' property with value 'true'");
            }
            if (debugEnabled) {
                logger.debug("Creating nested transaction with name [" + definition.getName() + "]");
            }
            if (useSavepointForNestedTransaction()) {
                // 如果没有可以使用保存点的方式控制事务回滚, 那么在嵌套式事务的建立初始建立保存点
                // Create savepoint within existing Spring-managed transaction,
                // through the SavepointManager API implemented by TransactionStatus.
                // Usually uses JDBC 3.0 savepoints. Never activates Spring synchronization.
                // 在 Spring 管理的事务中, 创建事务保存点
                DefaultTransactionStatus status =
                        prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);   //  <-- 这里的两个 false 分别表示 是否是新事务, 新的事务同步器
                status.createAndHoldSavepoint();
                return status;
            }
            else {
                // Nested transaction through nested begin and commit/rollback calls.
                // Usually only for JTA: Spring synchronization might get activated here
                // in case of a pre-existing JTA transaction.
                // 有些情况是不能使用保存点操作, 比如 JTA, 那么就建立新事务
                boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
                // 挂起事务的信息记录保存在 TransactionStatus 中,  这里包括ThreadLocal 对事务信息的记录
                DefaultTransactionStatus status = newTransactionStatus(
                        definition, transaction, true, newSynchronization, debugEnabled, null);
                // 构造 transaction, 包括设置 ConnectionHolder, 隔离级别, timeout, 如果是新连接, 绑定到当前线程
                doBegin(transaction, definition);
                // 新同步事务的设置, 针对当前线程的设置
                prepareSynchronization(status, definition);
                return status;
            }
        }
    
        // Assumably PROPAGATION_SUPPORTS or PROPAGATION_REQUIRED.
        if (debugEnabled) {
            logger.debug("Participating in existing transaction");
        }
        // 对已经存在的事务的属性进行校验
        if (isValidateExistingTransaction()) {
            // 隔离级别的校验 <- 是否一致性 TransactionDefinition 与 TransactionSynchronizationManager 中的值
            if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
                Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
                if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {
                    Constants isoConstants = DefaultTransactionDefinition.constants;
                    throw new IllegalTransactionStateException("Participating transaction with definition [" +
                            definition + "] specifies isolation level which is incompatible with existing transaction: " +
                            (currentIsolationLevel != null ?
                                    isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) :
                                    "(unknown)"));
                }
            }
            // readOnly 的校验 <- 是否一致性 TransactionDefinition 与 TransactionSynchronizationManager 中的值
            if (!definition.isReadOnly()) {
                if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {
                    throw new IllegalTransactionStateException("Participating transaction with definition [" +
                            definition + "] is not marked as read-only but existing transaction is");
                }
            }
        }
        boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
        //  返回 TransactionStatus 注意第三个参数 false 表示 当前事务没有使用新事务
        return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
    }
    

    提交事务
    在提交事务时, 主要是放在 TransactionSynchronousManager 中回调函数的调用, 事务的提交, savePoint 的处理

    private void processCommit(DefaultTransactionStatus status) throws TransactionException {
        try {
            boolean beforeCompletionInvoked = false;
            try {
                // 事务提交的准备工作由具体的事务处理器完成
                prepareForCommit(status);                   // 预留方法, 留给子类区扩充
                triggerBeforeCommit(status);                // 添加的 TransactionSynchronization 中的对应方法 beforeCommit 的调用
                triggerBeforeCompletion(status);            // 添加的 TransactionSynchronization 中的对应方法 beforeCompletion 的调用
                beforeCompletionInvoked = true;
                boolean globalRollbackOnly = false;         // 手动设置回回滚 <- 这里是 针对分布式事务
                if (status.isNewTransaction() || isFailEarlyOnGlobalRollbackOnly()) {
                    globalRollbackOnly = status.isGlobalRollbackOnly();
                }
                if (status.hasSavepoint()) {                // 若是嵌套事务, 则直接释放保存点
                    if (status.isDebug()) {
                        logger.debug("Releasing transaction savepoint");
                    }
                    status.releaseHeldSavepoint();          // 如果存在保存点则清除保存点信息
                }
                /**
                 * 下面对当前线程中保存的事务状态进行处理, 如果当前的事务是一个新事务, 调用具体的事务处理器完成提交
                 * 如果当前所持有的事务不是新事务, 则不提交, 由已有的事务来完成提交
                 */
                else if (status.isNewTransaction()) {       // 若是新事务, 则直接提交
                    if (status.isDebug()) {
                        logger.debug("Initiating transaction commit");
                    }
                    doCommit(status);                       // 进行事务的提交 <- 这是个抽象方法, 交由子类实现
                }
                // Throw UnexpectedRollbackException if we have a global rollback-only
                // marker but still didn't get a corresponding exception from commit.
                if (globalRollbackOnly) {                   // 分布式事务里面手动设置了 rollbackOnly 则直接抛出异常 <-- 这个现在很少使用了
                    throw new UnexpectedRollbackException(
                            "Transaction silently rolled back because it has been marked as rollback-only");
                }
            }
            catch (UnexpectedRollbackException ex) {
                // can only be caused by doCommit
                triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
                throw ex;
            }
            catch (TransactionException ex) {
                // can only be caused by doCommit
                if (isRollbackOnCommitFailure()) {         // 在 commit 提交异常时, 是否需要进行回滚操作
                    doRollbackOnCommitException(status, ex);
                }
                else {                                     // 添加的 TransactionSynchronization 中的对应方法 afterCompletion 的调用
                    triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
                }
                throw ex;
            }
            catch (RuntimeException ex) {                  // 提交的过程中, 若出现 RuntimeException 则直接回滚
                if (!beforeCompletionInvoked) {
                    triggerBeforeCompletion(status);
                }
                doRollbackOnCommitException(status, ex);
                throw ex;
            }
            catch (Error err) {
                if (!beforeCompletionInvoked) {
                    triggerBeforeCompletion(status);      // 添加的 TransactionSynchronization 中的对应方法 beforeCompletion 的调用
                }
                doRollbackOnCommitException(status, err); // 提交过程中出现异常则回滚
                throw err;
            }
            // Trigger afterCommit callbacks, with an exception thrown there
            // propagated to callers but the transaction still considered as committed.
            // 触发 AfterCommit 回滚
            try {
                triggerAfterCommit(status);               // 添加的 TransactionSynchronization 中的对应方法 afterCommit 的调用
            }
            finally {                                     // 添加的 TransactionSynchronization 中的对应方法 afterCompletion 的调用
                triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
            }
        }
        finally {                                          // commit 完成, 这里做一些清理数据的工作
            cleanupAfterCompletion(status);
        }
    }
    

    相关文章

      网友评论

          本文标题:Spring的Aop与事务源码分析

          本文链接:https://www.haomeiwen.com/subject/gtqxjqtx.html