美文网首页
Spring Tx源码解析(一)

Spring Tx源码解析(一)

作者: anyoptional | 来源:发表于2021-04-19 07:00 被阅读0次

    引言

      Spring提供的声明式事务想必大家都很熟悉了,简简单单一个@Transactional注解便能提供如此强大的功能,那么它是如何实现的呢?带着这点好奇心,我们一起扒拉扒拉spring-tx-5.2.6.RELEASE的源码吧。

      所谓工欲善其事,必先利其器,深入源码之前了解清楚spring-tx的相关概念还是很有必要的。本篇算是Spring AOP源码解析的姊妹篇,毕竟声明式事务就是AOP思想的一个实际应用嘛。

    Spring Tx Concepts

    PlatformTransactionManager

      PlatformTransactionManager是Spring事务管理的核心接口,它规范了应用程序操作事务的方式。

    public interface PlatformTransactionManager extends TransactionManager {
        /**
         * 获取事务的状态信息(依据传播行为的不同,可能返回一个已激活的事务或创建一个新的独立事务)
         */
        TransactionStatus getTransaction(@Nullable TransactionDefinition definition) throws TransactionException;
    
        /**
         * 提交当前事务,依据事务的当前状态,也可能会进行回滚,比如标记为rollback-only的事务
         */
        void commit(TransactionStatus status) throws TransactionException;
    
        /**
         * 回滚当前事务,依据传播行为的不同,非独立的内部事务仅会打上rollback-only标记
         */
        void rollback(TransactionStatus status) throws TransactionException;
    }
    
    
    TransactionDefinition

      TransactionDefinition描述了事务的相关属性,比如事务的隔离级别、传播行为,亦或是超时时长是多少等等。

    public interface TransactionDefinition {
    
        /**
         * 返回事务的传播行为
         */
        default int getPropagationBehavior() {
            return PROPAGATION_REQUIRED;
        }
    
        /**
         * 返回事务的隔离级别
         */
        default int getIsolationLevel() {
            return ISOLATION_DEFAULT;
        }
    
        /**
         * 获取事务的超时时长
         */
        default int getTimeout() {
            return TIMEOUT_DEFAULT;
        }
    
        /**
         * 是否是只读事务
         */
        default boolean isReadOnly() {
            return false;
        }
    
        /**
         * 获取事务名称,多用于debug
         */
        @Nullable
        default String getName() {
            return null;
        }
    }
    

      事务的隔离级别想必不用多做解释,传播行为(Propagation Behavior)是什么概念呢?简单来说,当一个事务方法被另一个事务方法调用时,传播行为可以控制是否需要创建事务以及如何创建事务,spring-tx中定义了7种事务传播行为:

    1. PROPAGATION_REQUIRED: 表示方法必须运行在事务中,如果当前事务存在,方法将会在该事务中运行,否则将开启一个新的独立事务
    2. PROPAGATION_SUPPORTS:表示方法支持在事务中运行,如果当前事务存在,方法将会在该事务中运行,否则以非事务方式运行
    3. PROPAGATION_MANDATORY:表示方法必须运行在事务中,如果当前事务不存在,抛出异常
    4. PROPAGATION_REQUIRES_NEW:表示方法必须运行在独立事务中,无论当前是否存在事务,该级别总会开启一个新的独立事务
    5. PROPAGATION_NOT_SUPPORTED:表示方法不支持在事务中运行,如果当前事务存在,挂起当前事务从而以非事务方式运行
    6. PROPAGATION_NEVER:表示方法不支持在事务中运行,如果当前事务存在,抛出异常
    7. PROPAGATION_NESTED:表示方法必须运行在事务中,如果当前事务存在,开启一个嵌套事务(Savepoint),否则等同于PROPAGATION_REQUIRED

      TransactionDefition更多的是表达通用的概念,它的子接口TransactionAttribute添加了基于AOP的rollbackOn(...)操作。

    public interface TransactionAttribute extends TransactionDefinition {
       /**
        * 用于在Spring IoC Container中获取PlatformTransactionManager
        */
       @Nullable
       String getQualifier();
    
       /**
        * 判断是否在遇到指定类型的异常时进行回滚
        */
       boolean rollbackOn(Throwable ex);
    }
    

    再看一眼@Transactional注解的定义。

    public @interface Transactional {
    
        @AliasFor("transactionManager")
        String value() default "";
    
        @AliasFor("value")
        String transactionManager() default "";
    
        Propagation propagation() default Propagation.REQUIRED;
    
        Isolation isolation() default Isolation.DEFAULT;
    
        int timeout() default TransactionDefinition.TIMEOUT_DEFAULT;
    
        boolean readOnly() default false;
    
        Class<? extends Throwable>[] rollbackFor() default {};
    
        String[] rollbackForClassName() default {};
    
        Class<? extends Throwable>[] noRollbackFor() default {};
    
        String[] noRollbackForClassName() default {};
    
    }
    

    想必你也看出来了,TransactionAttribute正是对运行时获取到的@Transactional注解的封装。

    TransctionStatus

      TransctionStatus描述了某一时间点上事务的状态信息,比如是否是新开启的独立事务、是否已完成以及是否打上了rollback-only标记等等,并且为了支持嵌套事务,TransctionStatus还额外提供了对保存点(Savepoint)的支持。

    // 管理Savepoint
    public interface SavepointManager {
        /**
         * 创建一个新的Savepoint,后续可以回滚到指定的SavePoint
         */
        Object createSavepoint() throws TransactionException;
    
        /**
         * 回滚到指定的Savepoint
         */
        void rollbackToSavepoint(Object savepoint) throws TransactionException;
    
        /**
         * 释放指定Savepoint
         */
        void releaseSavepoint(Object savepoint) throws TransactionException;
    }
    
    // 代表事务的当前状态
    public interface TransactionExecution {
        /**
         * 检查当前事务是否是一个独立事务,返回false表示是加入的一个已存在的事务或以非事务方式运行
         */
        boolean isNewTransaction();
    
        /**
         * 给当前事务打上rollback-only标记,被标记为rollback-only的事务只能被回滚而不会被提交
         */
        void setRollbackOnly();
    
        /**
         * 检查当前事务是否打上了rollback-only标记
         */
        boolean isRollbackOnly();
    
        /**
       * 检查当前事务是否已完成,已提交或已回滚都认为是已完成
         */
        boolean isCompleted();
    }
    
    public interface TransactionStatus extends TransactionExecution, SavepointManager, Flushable {
        /**
         * 检查当前事务是否携带有Savepoint,也就是说是否创建了嵌套事务
         */
        boolean hasSavepoint();
    
        /**
         * 如果底层TxManager支持的话就有用,比如Hibernate Session,
         * 而对于JDBC DataSource/Connection来讲是没有flush这个概念的,基本上是no-op
         */
        @Override
        void flush();
    }
    

    Related Concepts

    ResourceHolder

      当一个事务方法 A 调用另一个事务方法 B 的时候,如何保证这两个方法运行在同一个事务中呢?如果方法 A 和 B 使用不同的java.sql.Connection来操作数据库,能保证它们运行在同一个事务中吗?很明显,是不能的。多个方法运行在同一个事务中的前提是它们必须使用同一个java.sql.Connection,以伪代码的形式就是:

    try {
      connection.setAutoCommit(false)
        // 方法 A 执行 sql
      methodA(connection);
        // 方法 B 执行 sql
      methodB(connection);
        connection.commit() 
    } catch (Exception ex) {
      connection.rollback() 
    }
    

    java.sql.Connection实例必须传递给方法 A 和方法 B,这样才能保证它们使用同一个连接对象。实际开发中,我们并没有像这样传递过连接对象,spring-tx将我们从这些细节中解放了出来。

      ResouceHolder就是设计来包裹底层连接资源的,spring-tx内部会使用线程私有存储ThreadLocal在同一个线程中进行传递,对方法 A 和 方法 B 来说,只要它们运行在同一个线程中,就能使用上同一个连接对象。当然了,ResourceHolder并非只能携带java.sql.Connection,对于使用MyBatis的用户来说,它携带的就是SqlSession了。

    public interface ResourceHolder {
        /**
         * 重置
         */
        void reset();
    
        /**
         * 解绑连接资源
         */
        void unbound();
    
        /**
       * 检查此Holder是否携带有连接资源
         */
        boolean isVoid();
    }
    
    TransactionSynchronization

      由于spring-tx全盘接管了事务管理,那么它自然可以管理事务的生命周期。 TransactionSynchronization就是这样一个回调接口,它为我们揭示了事务运行时的各个阶段,如果我们需要在事务执行前后做一些额外的操作,使用它就再好不过了。

    public interface TransactionSynchronization extends Flushable {
        /** 事务已提交 */
        int STATUS_COMMITTED = 0;
        /** 事务已回滚 */
        int STATUS_ROLLED_BACK = 1;
        /** 状态未知 */
        int STATUS_UNKNOWN = 2;
    
        /**
         * 事务被挂起时回调
         */
        default void suspend() {
        }
    
        /**
       * 事务恢复时回调
         */
        default void resume() {
        }
    
        @Override
        default void flush() {
        }
    
        /**
         * 事务提交前回调
         */
        default void beforeCommit(boolean readOnly) {
        }
    
        /**
       * 事务完成前回调,也就是在事务管理器 commit/rollback 之前
         */
        default void beforeCompletion() {
        }
    
        /**
         * 事务成功提交后回调
         */
        default void afterCommit() {
        }
    
        /**
         * 事务完成后回调,status揭示了事务当前状态
         */
        default void afterCompletion(int status) {
        }
    }
    
    TransactionSynchronizationManager

      spring-txResourceHolder的绑定和传递、TransactionSynchronization的注册和获取,均是代理给TransactionSynchronizationManager来完成的。

    public abstract class TransactionSynchronizationManager {
    
       private static final Log logger = LogFactory.getLog(TransactionSynchronizationManager.class);
    
       // ResourceHolder的绑定关系
       // 比如在DataSourceTransactionManager中key是java.sql.DataSource,value是ConnectionHolder
       private static final ThreadLocal<Map<Object, Object>> resources =
             new NamedThreadLocal<>("Transactional resources");
         // 已注册的TransactionSynchronization
       private static final ThreadLocal<Set<TransactionSynchronization>> synchronizations =
             new NamedThreadLocal<>("Transaction synchronizations");
         // 当前的事务名称
       private static final ThreadLocal<String> currentTransactionName =
             new NamedThreadLocal<>("Current transaction name");
         // 当前事务是否是只读事务
       private static final ThreadLocal<Boolean> currentTransactionReadOnly =
             new NamedThreadLocal<>("Current transaction read-only status");
       // 当前事务的隔离级别
       private static final ThreadLocal<Integer> currentTransactionIsolationLevel =
             new NamedThreadLocal<>("Current transaction isolation level");
         // 事务是否真的被激活
       private static final ThreadLocal<Boolean> actualTransactionActive =
             new NamedThreadLocal<>("Actual transaction active");
    
       //-------------------------------------------------------------------------
       // Management of transaction-associated resource handles
       //-------------------------------------------------------------------------
    
       /**
        * 根据提供的key找寻底层资源
        */
       @Nullable
       public static Object getResource(Object key) {
          Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
          Object value = doGetResource(actualKey);
          if (value != null && logger.isTraceEnabled()) {
             logger.trace("Retrieved value [" + value + "] for key [" + actualKey + "] bound to thread [" +
                   Thread.currentThread().getName() + "]");
          }
          return value;
       }
    
       @Nullable
       private static Object doGetResource(Object actualKey) {
          Map<Object, Object> map = resources.get();
          if (map == null) {
             return null;
          }
          Object value = map.get(actualKey);
          // 对于ResourceHolder#isVid()的情况,认为其没有实际绑定上任何资源
          if (value instanceof ResourceHolder && ((ResourceHolder) value).isVoid()) {
             map.remove(actualKey);
             if (map.isEmpty()) {
                resources.remove();
             }
             value = null;
          }
          return value;
       }
    
       /**
        * 绑定一个资源
        */
       public static void bindResource(Object key, Object value) throws IllegalStateException {
          Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
          Assert.notNull(value, "Value must not be null");
          Map<Object, Object> map = resources.get();
            if (map == null) {
             map = new HashMap<>();
             resources.set(map);
          }
          Object oldValue = map.put(actualKey, value);
          // ResourceHolder#isVid()等同于不存在任何资源
          if (oldValue instanceof ResourceHolder && ((ResourceHolder) oldValue).isVoid()) {
             oldValue = null;
          }
          // TransactionSynchronizationManager中的任何操作,都只能先解除再操作,而不能进行覆盖
          if (oldValue != null) {
             throw new IllegalStateException("Already value [" + oldValue + "] for key [" +
                   actualKey + "] bound to thread [" + Thread.currentThread().getName() + "]");
          }
          if (logger.isTraceEnabled()) {
             logger.trace("Bound value [" + value + "] for key [" + actualKey + "] to thread [" +
                   Thread.currentThread().getName() + "]");
          }
       }
    
       /**
            * 解绑资源,如果资源不存在抛出异常
        */
       public static Object unbindResource(Object key) throws IllegalStateException {
          Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
          Object value = doUnbindResource(actualKey);
          if (value == null) {
             throw new IllegalStateException(
                   "No value for key [" + actualKey + "] bound to thread [" + Thread.currentThread().getName() + "]");
          }
          return value;
       }
    
       /**
            * 解绑资源,资源不存在等于no-op
        */
       @Nullable
       public static Object unbindResourceIfPossible(Object key) {
          Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
          return doUnbindResource(actualKey);
       }
    
       @Nullable
       private static Object doUnbindResource(Object actualKey) {
          Map<Object, Object> map = resources.get();
          if (map == null) {
             return null;
          }
          Object value = map.remove(actualKey);
          if (map.isEmpty()) {
             resources.remove();
          }
          // ResourceHolder#isVid()等同于不存在任何资源
          if (value instanceof ResourceHolder && ((ResourceHolder) value).isVoid()) {
             value = null;
          }
          if (value != null && logger.isTraceEnabled()) {
             logger.trace("Removed value [" + value + "] for key [" + actualKey + "] from thread [" +
                   Thread.currentThread().getName() + "]");
          }
          return value;
       }
    
       //-------------------------------------------------------------------------
       // Management of transaction synchronizations
       //-------------------------------------------------------------------------
    
       /**
        * 检查transaction synchronization是否已激活
        */
       public static boolean isSynchronizationActive() {
          return (synchronizations.get() != null);
       }
    
       /**
            * 激活transaction synchronization,如果已激活,抛出异常
        */
       public static void initSynchronization() throws IllegalStateException {
          if (isSynchronizationActive()) {
             throw new IllegalStateException("Cannot activate transaction synchronization - already active");
          }
          logger.trace("Initializing transaction synchronization");
          synchronizations.set(new LinkedHashSet<>());
       }
    
       /**
            * 注册一个TransactionSynchronization,如果transaction synchronization未激活,抛出异常
        */
       public static void registerSynchronization(TransactionSynchronization synchronization)
             throws IllegalStateException {
          Assert.notNull(synchronization, "TransactionSynchronization must not be null");
          Set<TransactionSynchronization> synchs = synchronizations.get();
          if (synchs == null) {
             throw new IllegalStateException("Transaction synchronization is not active");
          }
          synchs.add(synchronization);
       }
    
       /**
        * 返回已注册的所有TransactionSynchronization,如果transaction synchronization未激活,抛出异常
        */
       public static List<TransactionSynchronization> getSynchronizations() throws IllegalStateException {
          Set<TransactionSynchronization> synchs = synchronizations.get();
          if (synchs == null) {
             throw new IllegalStateException("Transaction synchronization is not active");
          }
          if (synchs.isEmpty()) {
             return Collections.emptyList();
          }
          else {
             List<TransactionSynchronization> sortedSynchs = new ArrayList<>(synchs);
             // 排序,TransactionSynchronization可实现Ordered接口或使用@Order注解指定优先级
             AnnotationAwareOrderComparator.sort(sortedSynchs);
             return Collections.unmodifiableList(sortedSynchs);
          }
       }
    
       /**
            * 关闭transaction synchronization
        */
       public static void clearSynchronization() throws IllegalStateException {
          if (!isSynchronizationActive()) {
             throw new IllegalStateException("Cannot deactivate transaction synchronization - not active");
          }
          logger.trace("Clearing transaction synchronization");
          synchronizations.remove();
       }
    
       //-------------------------------------------------------------------------
       // Exposure of transaction characteristics
       //-------------------------------------------------------------------------
    
       /**
            * 记录当前事务名称
        */
       public static void setCurrentTransactionName(@Nullable String name) {
          currentTransactionName.set(name);
       }
    
       /**
            * 返回当前事务名称
        */
       @Nullable
       public static String getCurrentTransactionName() {
          return currentTransactionName.get();
       }
    
       /**
            * 设置当前事务为只读事务
        */
       public static void setCurrentTransactionReadOnly(boolean readOnly) {
          currentTransactionReadOnly.set(readOnly ? Boolean.TRUE : null);
       }
    
       /**
            * 检查当前事务是否为只读事务
        */
       public static boolean isCurrentTransactionReadOnly() {
          return (currentTransactionReadOnly.get() != null);
       }
    
       /**
            * 设置当前事务的隔离级别
        */
       public static void setCurrentTransactionIsolationLevel(@Nullable Integer isolationLevel) {
          currentTransactionIsolationLevel.set(isolationLevel);
       }
    
       /**
            * 返回当前事务的隔离级别
        */
       @Nullable
       public static Integer getCurrentTransactionIsolationLevel() {
          return currentTransactionIsolationLevel.get();
       }
    
       /**
        * 设置当前事务已开启
        */
       public static void setActualTransactionActive(boolean active) {
          actualTransactionActive.set(active ? Boolean.TRUE : null);
       }
    
       /**
        * 检查当前事务已开启,也就是说是否真的存在事务而不仅仅只有transaction synchronization
        */
       public static boolean isActualTransactionActive() {
          return (actualTransactionActive.get() != null);
       }
    
       /**
            * 完全清除transaction synchronization各种状态信息
        */
       public static void clear() {
          synchronizations.remove();
          currentTransactionName.remove();
          currentTransactionReadOnly.remove();
          currentTransactionIsolationLevel.remove();
          actualTransactionActive.remove();
       }
    }
    

    注意,TransactionSynchronizationManager在操作ResourceHolder时是不允许直接覆盖的,旧的资源必须先解绑才能绑定新的资源。同时TransactionSynchronization只能在transaction synchronization激活时才能绑定,为此TransactionSynchronizationManager提供了initSynchronization()clearSynchronization()来分别开启开启和关闭transaction synchronization

    结语

      抽象是编程的先决条件,编码不过是对抽象的具体实现。好的抽象才能带出好的代码,spring-tx也是如此,下一篇让我们一起钻到具体的实现细节里去吧~~

    相关文章

      网友评论

          本文标题:Spring Tx源码解析(一)

          本文链接:https://www.haomeiwen.com/subject/bkrmlltx.html