美文网首页
浅析@Transactional

浅析@Transactional

作者: b335eb9201c3 | 来源:发表于2021-01-19 14:20 被阅读0次

0.@Transactional的工作原理***

1.Spring事务管理是基于spring动态代理技术,通过AOP实施事务增强的。

2.spring动态代理分为jdk动态代理和CGLib,这两种动态代理的实现,都是基于接口或者类来实现,因此事物生效也是基于类的,也就是事物必须过切面!

 3.对于基于接口动态代理的AOP事务增强来说,由于接口的方法是public的,这就要求实现类的实现方法必须是public的(不能是protected,private等),同时不能使用static的修饰符。所以,可以实施接口动态代理的方法只能是使用“public”或“public final”修饰符的方法,其它方法不可能被动态代理,相应的也就不能实施AOP增强,也即不能进行Spring事务增强。

4.基于CGLib字节码动态代理的方案是通过扩展被增强类,动态创建子类的方式进行AOP增强植入的。由于使用final,static,private修饰符的方法都不能被子类覆盖,相应的,这些方法将不能被实施的AOP增强。

小结:基于@Transactional注解,必须过切面也就是垮类,必须是public修饰,不能有static修饰,最好也不要有final修饰。

0-1.TransactionInterceptor初识***

image

继承自TransactionAspectSupport类(该类包含与Spring的底层事务API的集成),实现了MethodInterceptor接事务拦截器的拦截功能就是依靠实现了MethodInterceptor接口,熟悉spring的同学肯定很熟悉MethodInterceptor了,这个是spring的方法拦截器,主要看invoke方法。

演示

image.png image.png image.png image.png

1. TransactionInterceptor 整体流程***

当程序执行事务方法的时候,就会先走增强器TransactionInterceptor#invoke方法:

/**
*invocation目标方法
*/
public Object invoke(final MethodInvocation invocation) throws Throwable 
        // 获取目标类
        Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
        // 调用抽象类TransactionAspectSupport.java类中invokeWithinTransaction方法
        return invokeWithinTransaction(invocation.getMethod(), targetClass, new InvocationCallback() {
           //回调方法,就是目标类的方法调用
            @Override
            public Object proceedWithInvocation() throws Throwable {
                return invocation.proceed();
            }
        });
    }
protected Object invokeWithinTransaction(Method method, Class<?> targetClass, final InvocationCallback invocation)
            throws Throwable {
        // 获取对应事务属性
        final TransactionAttribute txAttr = getTransactionAttributeSource().getTransactionAttribute(method, targetClass);
        // 获取 beanFactory 中的 transactionManager,需要配置
        final PlatformTransactionManager tm = determineTransactionManager(txAttr);
        // 获取方法唯一标识(类.方法 如 xxx.UserServiceImpl.save)
        final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);
        // 声明式事务处理 @Transactional
        if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {
            // 创建事物
            TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
            Object retVal = null;
            try {
                // 执行被增强方法
                retVal = invocation.proceedWithInvocation();
            }
            catch (Throwable ex) {
                // 异常回滚
                completeTransactionAfterThrowing(txInfo, ex);
                throw ex;
            }
            finally {//Reset the TransactionInfo ThreadLocal.
                cleanupTransactionInfo(txInfo);
            }
            // 提交事务
            commitTransactionAfterReturning(txInfo);
            return retVal;
        }
        // 编程式事务
         else {
             final ThrowableHolder throwableHolder = new ThrowableHolder();

             // It's a CallbackPreferringPlatformTransactionManager: pass a TransactionCallback in.
             try {
                 Object result = ((CallbackPreferringPlatformTransactionManager) tm).execute(txAttr,
                        new TransactionCallback<Object>() {
                             @Override
                             public Object doInTransaction(TransactionStatus status) {
                                 TransactionInfo txInfo = prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
                                 try {
                                     return invocation.proceedWithInvocation();
                                 }
                                 catch (Throwable ex) {// 如果该异常需要回滚
                                    if (txAttr.rollbackOn(ex)) {
                                         // 如果是运行时异常返回
                                         if (ex instanceof RuntimeException) {
                                             throw (RuntimeException) ex;
                                         }// 如果是其它异常都抛ThrowableHolderException
                                         else {
                                             throw new ThrowableHolderException(ex);
                                         }
                                     }// 如果不需要回滚
                                     else {
                                         // 定义异常,最终就直接提交事务了
                                         throwableHolder.throwable = ex;
                                         return null;
                                     }
                                 }
                                 finally {//清空当前事务信息,重置为老的
                                     cleanupTransactionInfo(txInfo);
                                 }
                             }
                         });
 
                 // 上抛异常
                 if (throwableHolder.throwable != null) {
                     throw throwableHolder.throwable;
                 }
                 return result;
             }
             catch (ThrowableHolderException ex) {
                 throw ex.getCause();
             }
             catch (TransactionSystemException ex2) {
                 if (throwableHolder.throwable != null) {
                     logger.error("Application exception overridden by commit exception", throwableHolder.throwable);
                     ex2.initApplicationException(throwableHolder.throwable);
                 }
                 throw ex2;
             }
             catch (Throwable ex2) {
                 if (throwableHolder.throwable != null) {
                     logger.error("Application exception overridden by commit exception", throwableHolder.throwable);
                 }
                 throw ex2;
             }
         }
     }
image.png

从上面的函数中,我们尝试整理下事务处理的脉络,在 Spring 中支持两种事务处理方式,分别是声明式事务与编程式事务处理,两者对于开发人员来说差别很大,但是对于 Spring 中的实现来讲,大同小异。在invoke中,我们也看到这两种方式的实现。考虑到声明式事务相对流行些,我们就以此种方式进行分析。

对于声明式事务处理主要有以下几个步骤:

  1. 获取事务属性。
    该部分我们在分析事务准备阶段时已经分析了,这里就不累述。
  2. 加载配置中配置的TransactionManager
  3. 不同的事务处理方式使用不同的逻辑。
    对于声明式事务的处理与编程式事务的处理,最主要的区别在于TransactionManager上,CallbackPreferringPlatformTransactionManager实现PlatformTransactionManager接口,暴露出一个方法用于执行事务处理中的回调。
  4. 在目标方法执行前获取事务并且收集事务信息
    事务信息与事务属性并不相同,也就是TransactionInfoTransactionaAttribute并不相同,TransactionInfo包含TransactionaAttribute,但是除了TransactionaAttribute外还有其他事务信息,例如PlatformTransactionManager以及TransactionStatus相关信息。
  5. 执行目标方法。
  6. 一旦出现异常,尝试异常处理。
    并不是所有的异常,Spring 都会将其回滚,默认只对RuntimeException回滚,当然这个可以指定。
  7. 提交事务前的事务信息清除。
  8. 提交事务。

上面的步骤分析旨在让大家对事务功能与步骤大致的了解,具体功能还需详细的分析。

2. 创建事务

我们首先分析事务创建的过程。

protected TransactionInfo createTransactionIfNecessary(
            PlatformTransactionManager tm, TransactionAttribute txAttr, final String joinpointIdentification) {
        // 如果没有名称指定,则使用方法唯一标识
        if (txAttr != null && txAttr.getName() == null) {
            txAttr = new DelegatingTransactionAttribute(txAttr) {
                @Override
                public String getName() {
                    return joinpointIdentification;
                }
            };
        }
        TransactionStatus status = null;
        if (txAttr != null) {
            if (tm != null) {
                // 获取 TransactionStatus
                status = tm.getTransaction(txAttr);
            }
            else {
                if (logger.isDebugEnabled()) {
                    logger.debug("Skipping transactional joinpoint [" + joinpointIdentification +
                            "] because no transaction manager has been configured");
                }
            }
        }
        // 根据指定的属性和 status 构建一个 TransactionInfo
        return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);

对于createTransactionIfNecessary函数主要做了这样几件事情。

  1. 获取事务
  2. 构建事务信息
2.1 获取事务

getTransaction函数,提供了获取事务的功能:

public final TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException {
        // 获取 transaction
        Object transaction = doGetTransaction();
        // Cache debug flag to avoid repeated checks.
        boolean debugEnabled = logger.isDebugEnabled();
        if (definition == null) {
            // Use defaults if no transaction definition given.
            definition = new DefaultTransactionDefinition();
        }
        // 判断当前线程是否存在事务,判断的依据是当前线程记录的连接不为空且连接(connectionHolder)中的 transactionActive 属性不为空
        if (isExistingTransaction(transaction)) {
            // 当线程已经存在事务
            return handleExistingTransaction(definition, transaction, debugEnabled);
        }
        // 事务超时设置验证
        if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
            throw new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout());
        }
        //如果当前线程不存在事务,但是 propagation 却被声明为 PROPAGATION_MANDATORY(强制必须要有事务),则抛出异常
        if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
            throw new IllegalTransactionStateException(
                    "No existing transaction found for transaction marked with propagation 'mandatory'");
        }
        else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
                definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
                definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
            // 新建事务,空挂起
            SuspendedResourcesHolder suspendedResources = suspend(null);
            if (debugEnabled) {
                logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition);
            }
            try {
                boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
                DefaultTransactionStatus status = newTransactionStatus(
                        definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
                // 构造 transaction,包括设置 ConnectionHolder、隔离级别、timeout,如果是新连接,绑定到当前线程
                doBegin(transaction, definition);
                // 新同步事务的设置,针对当前线程的设置
                prepareSynchronization(status, definition);
                return status;
            }
            catch (RuntimeException ex) {
                resume(null, suspendedResources);
                throw ex;
            }
            catch (Error err) {
                resume(null, suspendedResources);
                throw err;
            }
        }
        else {
            // Create "empty" transaction: no actual transaction, but potentially synchronization.
            if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
                logger.warn("Custom isolation level specified but no actual transaction initiated; " +
                        "isolation level will effectively be ignored: " + definition);
            }
            boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
            return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null);
        }
    }
image.png

该方法主要有以下几个步骤:

  1. 获取事务。
    创建对应的事务实例。将ConnectionHolder存入事务实例中,ConnectionHolder底层是ThreadLocal保存了当前线程的数据库连接信息,如果当前线程没有数据库连接信息,则存入 null。
  2. 如果当前线程存在事务,则转向嵌套事务的处理。
  3. 事务超时设置验证。
  4. 事务propagationBehavior属性的设置验证。
  5. 构建DefaultTransactionStatus
  6. 完善事务信息transaction,包括设置ConnectionHolder、隔离级别、timeout,如果是新连接,则绑定到当前线程。

对于一些隔离级别、timeout 等功能的设置并不是由 Spring 来完成的,而是委托底层的数据库连接去做的,而对于数据库连接的设置就是doBegin函数中处理的:

protected void doBegin(Object transaction, TransactionDefinition definition) {
        DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
        Connection con = null;
        try {
            if (!txObject.hasConnectionHolder() ||
                    txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
                Connection newCon = this.dataSource.getConnection();
                if (logger.isDebugEnabled()) {
                    logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
                }
                txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
            }
            txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
            con = txObject.getConnectionHolder().getConnection();
            // 设置隔离级别
            Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
            txObject.setPreviousIsolationLevel(previousIsolationLevel);
            // 更改自动提交设置,由 Spring 控制提交
            if (con.getAutoCommit()) {
                txObject.setMustRestoreAutoCommit(true);
                if (logger.isDebugEnabled()) {
                    logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
                }
                con.setAutoCommit(false);
            }
            prepareTransactionalConnection(con, definition);
            // 设置判断当前线程是否存在事务的依据
            txObject.getConnectionHolder().setTransactionActive(true);
            int timeout = determineTimeout(definition);
            if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
                txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
            }
            // Bind the connection holder to the thread.
            if (txObject.isNewConnectionHolder()) {
                // 将当前获取到的连接绑定到当前线程
                TransactionSynchronizationManager.bindResource(getDataSource(), txObject.getConnectionHolder());
            }
        }
        catch (Throwable ex) {
            if (txObject.isNewConnectionHolder()) {
                DataSourceUtils.releaseConnection(con, this.dataSource);
                txObject.setConnectionHolder(null, false);
            }
            throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
        }
    }

可以说事务是从这个函数开始的,因为这个函数中已经开始尝试对数据库连接的获取了。当然,在获取数据库连接的同时,一些必要的设置也是需要同步设置的。

  1. 尝试获取连接。
    如果当前线程中的connectionHolder不存在,或者对于事务同步表示设置为 true 的,需要重新连接。反之,复用当前线程连接。
  2. 设置隔离级别以及只读标识。
    你是否有过这样的错觉?事务中的只读配置是 Spring 中做了一些处理? Spring 中确实是针对只读做了一些处理,但是核心的实现是设置connection上的readOnly属性。同样,对于隔离级别的控制也是交由connection去控制的。
  3. 更改默认的提交设置。
    如果事务属性是自动提交,那么需要改变这种设置,而将提交操作交由 Spring 来处理。
  4. 设置标识位,标识当前连接已经被事务激活。
  5. 设置过期时间。
  6. connectionHolder绑定到当前线程。
2.2 处理已经存在的事务

上面讲述的是普通事务创建的过程,但是 Spring 还支持多种事务的传播规则,比如 PROPAGATION_NESTED、PROPAGATION_REQUIRES_NEW 等,这些都是在已经存在事务的基础上进一步的处理。那么对于已经存在的事务,Spring 又是如何处理的呢?

private TransactionStatus handleExistingTransaction(
            TransactionDefinition definition, Object transaction, boolean debugEnabled)
            throws TransactionException {
        if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
            throw new IllegalTransactionStateException(
                    "Existing transaction found for transaction marked with propagation 'never'");
        }
        // 如果传播方式为 PROPAGATION_NOT_SUPPORTED,则挂起当前事务,并且事务信息设置为 null
        if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
            if (debugEnabled) {
                logger.debug("Suspending current transaction");
            }
            Object suspendedResources = suspend(transaction);
            boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
            return prepareTransactionStatus(
                    definition, null, false, newSynchronization, debugEnabled, suspendedResources);
        }
        // 如果传播方式为 PROPAGATION_REQUIRES_NEW,则挂起当前事务,并且新建一个独立的事务
        if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
            if (debugEnabled) {
                logger.debug("Suspending current transaction, creating new transaction with name [" +
                        definition.getName() + "]");
            }
            SuspendedResourcesHolder suspendedResources = suspend(transaction);
            try {
                boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
                DefaultTransactionStatus status = newTransactionStatus(
                        definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
                // 这里会创建一个新的事务,并且是新的数据库连接
                doBegin(transaction, definition);
                prepareSynchronization(status, definition);
                return status;
            }
            catch (RuntimeException beginEx) {
                resumeAfterBeginException(transaction, suspendedResources, beginEx);
                throw beginEx;
            }
            catch (Error beginErr) {
                resumeAfterBeginException(transaction, suspendedResources, beginErr);
                throw beginErr;
            }
        }
        // 嵌入式事务的处理
        if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
            if (!isNestedTransactionAllowed()) {
                throw new NestedTransactionNotSupportedException(
                        "Transaction manager does not allow nested transactions by default - " +
                        "specify 'nestedTransactionAllowed' property with value 'true'");
            }
            if (debugEnabled) {
                logger.debug("Creating nested transaction with name [" + definition.getName() + "]");
            }
            if (useSavepointForNestedTransaction()) {
                // 创建保存点
                DefaultTransactionStatus status =
                        prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
                status.createAndHoldSavepoint();
                return status;
            }
            else {
                // 有些情况是不能使用保存点操作的,比如 JTA,那么建立新事物
                boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
                DefaultTransactionStatus status = newTransactionStatus(
                        definition, transaction, true, newSynchronization, debugEnabled, null);
                doBegin(transaction, definition);
                prepareSynchronization(status, definition);
                return status;
            }
        }
        // Assumably PROPAGATION_SUPPORTS or PROPAGATION_REQUIRED.
        if (debugEnabled) {
            logger.debug("Participating in existing transaction");
        }
        if (isValidateExistingTransaction()) {
            if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
                Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
                if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {
                    Constants isoConstants = DefaultTransactionDefinition.constants;
                    throw new IllegalTransactionStateException("Participating transaction with definition [" +
                            definition + "] specifies isolation level which is incompatible with existing transaction: " +
                            (currentIsolationLevel != null ?
                                    isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) :
                                    "(unknown)"));
                }
            }
            if (!definition.isReadOnly()) {
                if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {
                    throw new IllegalTransactionStateException("Participating transaction with definition [" +
                            definition + "] is not marked as read-only but existing transaction is");
                }
            }
        }
        boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
        return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
    }

对于已经存在事务的处理过程中,函数对已经存在事务处理考虑两种情况:

  • PROPAGATION_REQUIRES_NEW:表示当前方法必须在它自己的事务里运行,一个新的事务将被启动(新的数据库连接),而如果有一个事务正在运行的话,则在这个方法运行期间被挂起。将事务挂起的目的当然是为了在当前事务执行完毕后再将原事务恢复。
  • PROPAGATION_NESTED:表示如果当前正有一个事务正常运行中,则方法应该运行在一个嵌套的事务中,被嵌套的事务可以独立于外部事物进行提交或者回滚。如果外部事物不存在,行为就像 PROPAGATION_REQUIRES_NEW。对于嵌入式事务,Spring 主要考虑了两种方式的处理:
  1. Spring 中允许嵌入事务的时候(默认允许),则首选设置保存点的方式作为异常回滚的处理。但是内部事务的数据库连接依旧是外部事务的连接。
  2. 对于其他方式,比如 JTA(跨数据源的分布式事务)无法使用保存点的方式,那么处理方式与 PROPAGATION_REQUIRES_NEW 相同。

3. 回滚事务处理

之前已经完成了目标方法执行之前的事务准备工作,而这些准备工作最大的目的无非就是对于程序没有按照我们期待的那样进行,也就是出现特定的错误。那么出现错误的时候,Spring 是如何对数据进行恢复的呢?

protected void completeTransactionAfterThrowing(TransactionInfo txInfo, Throwable ex) {
        if (txInfo != null && txInfo.hasTransaction()) {
            if (logger.isTraceEnabled()) {
                logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() +
                        "] after exception: " + ex);
            }
            // 判断异常是否是 RuntimeException 类型或者是 Error 类型
            if (txInfo.transactionAttribute.rollbackOn(ex)) {
                try {
                    // 根据 TransactionStatus 信息进行回滚处理
                    txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus());
                }
                catch (TransactionSystemException ex2) {
                    logger.error("Application exception overridden by rollback exception", ex);
                    ex2.initApplicationException(ex);
                    throw ex2;
                }
                catch (RuntimeException ex2) {
                    logger.error("Application exception overridden by rollback exception", ex);
                    throw ex2;
                }
                catch (Error err) {
                    logger.error("Application exception overridden by rollback error", ex);
                    throw err;
                }
            }
            else {
                // 如果不满足回滚条件,即使抛出异常也同样会提交
                try {
                    txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
                }
                catch (TransactionSystemException ex2) {
                    logger.error("Application exception overridden by commit exception", ex);
                    ex2.initApplicationException(ex);
                    throw ex2;
                }
                catch (RuntimeException ex2) {
                    logger.error("Application exception overridden by commit exception", ex);
                    throw ex2;
                }
                catch (Error err) {
                    logger.error("Application exception overridden by commit error", ex);
                    throw err;
                }
            }
        }
    }
3.1 回滚条件
public boolean rollbackOn(Throwable ex) {
        return (ex instanceof RuntimeException || ex instanceof Error);
    }

默认情况下,Spring 中的事务异常处理机制只对RuntimeExceptionError两种情况有效。当然你可以指定异常处理类型,例如:

@Transactional(rollbackFor=Exception.class)

3.2 回滚处理

当然,一旦符合回滚条件,那么 Spring 就会将程序引导至回滚的处理函数。

public final void rollback(TransactionStatus status) throws TransactionException {
        if (status.isCompleted()) {
            throw new IllegalTransactionStateException(
                    "Transaction is already completed - do not call commit or rollback more than once per transaction");
        }
        DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
        processRollback(defStatus);
    }
private void processRollback(DefaultTransactionStatus status) {
        try {
            try {
                // 激活所有 TransactionSynchronization 中对应的 beforeCompletion 方法
                triggerBeforeCompletion(status);
                // 如果有保存点,则回退到保存点
                if (status.hasSavepoint()) {
                    if (status.isDebug()) {
                        logger.debug("Rolling back transaction to savepoint");
                    }
                   
                    status.rollbackToHeldSavepoint();
                }
                // 如果当前事务为独立的新事物,则直接回滚
                else if (status.isNewTransaction()) {
                    if (status.isDebug()) {
                        logger.debug("Initiating transaction rollback");
                    }
                    
                    doRollback(status);
                }
                // 如果当前事务不是独立的事务,那么只能标记状态,等到事务链执行完毕后统一回滚
                else if (status.hasTransaction()) {
                    if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) {
                        if (status.isDebug()) {
                            logger.debug("Participating transaction failed - marking existing transaction as rollback-only");
                        }
                        doSetRollbackOnly(status);
                    }
                    else {
                        if (status.isDebug()) {
                            logger.debug("Participating transaction failed - letting transaction originator decide on rollback");
                        }
                    }
                }
                else {
                    logger.debug("Should roll back transaction but cannot - no transaction available");
                }
            }
            catch (RuntimeException ex) {
                triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
                throw ex;
            }
            catch (Error err) {
                triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
                throw err;
            }
            // 激活所有 TransactionSynchronization 中对应的 afterCompletion 方法
            triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
        }
        finally {
            // 清空记录的资源并将挂起的资源恢复
            cleanupAfterCompletion(status);
        }
    }

我们总结下 Spring 中对于回滚处理的大致脉络:

  1. 首先是自定义触发器的调用,包括在回滚前、回滚后的调用。对于触发器的注册,常见的是在回调过程中通过TransactionSynchronizationManager类中的静态方法直接注册:
public static void registerSynchronization(TransactionSynchronization synchronization)
  1. 除了触发监听函数外,就是真正的回滚逻辑处理了。
  • 当之前已经存在保存点信息的时候,使用保存点信息进行回滚。常用于嵌入式事务,对于嵌入式事务的处理,内部的事务异常并不会引起外部事务的回滚,只有外部事务的回滚或者提交才是事务完成的标志
    保存点回滚的实现方式其实是通过底层的数据库连接进行的。

  • 当事务信息为新事务时,那么直接回滚。常用于单独事务的处理,如 PROPAGATION_REQUIRED 和 PROPAGATION_REQUIRES_NEW。对于没有保存点的回滚,Spring 同样是使用底层数据库连接提供的 API 来操作的。

  • 当事务信息中表明已经存在事务,又不属于以上两种情况,多数用于 JTA,只做回滚标识,等到提交的时候统一进行回滚。

3.3 回滚后的信息清除

对于回滚逻辑执行结束后,无论回滚成功与否,都必须要做的事情就是事务结束后的收尾工作。

private void cleanupAfterCompletion(DefaultTransactionStatus status) {
        // 设置完成状态
        status.setCompleted();
        if (status.isNewSynchronization()) {
            TransactionSynchronizationManager.clear();
        }
        if (status.isNewTransaction()) {
            doCleanupAfterCompletion(status.getTransaction());
        }
        if (status.getSuspendedResources() != null) {
            if (status.isDebug()) {
                logger.debug("Resuming suspended transaction after completion of inner transaction");
            }
            // 将挂起的事务恢复
            resume(status.getTransaction(), (SuspendedResourcesHolder) status.getSuspendedResources());
        }
    }

从函数中得知,事务处理的收尾工作包括如下内容:

  1. 如果当前事务是新的同步状态,需要将绑定到当前线程的事务信息清除。
  2. 如果当前事务是新事务,则做些清除资源的操作。如:
  • 将数据库连接从当前线程中接触绑定
  • 恢复数据库连接的自动提交属性,并且重置数据库连接
  • 如果当前事务是独立的新事务,则释放数据库连接
  • 清空ConnectionHolder
  1. 如果在事务执行前有事务挂起,那么当前事务执行结束后需要将挂起的事务恢复。

4. 提交事务处理

之前我们分析了 Spring 的事务异常处理机制,那么事务的执行并没有出现任何异常,也就意味着可以走正常的事务提交流程了。

protected void commitTransactionAfterReturning(TransactionInfo txInfo) {
        if (txInfo != null && txInfo.hasTransaction()) {
            if (logger.isTraceEnabled()) {
                logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() + "]");
            }
            txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
        }
    }
public final void commit(TransactionStatus status) throws TransactionException {
        if (status.isCompleted()) {
            throw new IllegalTransactionStateException(
                    "Transaction is already completed - do not call commit or rollback more than once per transaction");
        }
        DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
        // 如果在事务链中已经被标记回滚,那么不会尝试提交事务,直接回滚
        if (defStatus.isLocalRollbackOnly()) {
            if (defStatus.isDebug()) {
                logger.debug("Transactional code has requested rollback");
            }
            processRollback(defStatus);
            return;
        }
        // 如果当前事务被标记为回滚状态,则进行回滚操作
        if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) {
            if (defStatus.isDebug()) {
                logger.debug("Global transaction is marked as rollback-only but transactional code requested commit");
            }
            processRollback(defStatus);
            // Throw UnexpectedRollbackException only at outermost transaction boundary
            // or if explicitly asked to.
            if (status.isNewTransaction() || isFailEarlyOnGlobalRollbackOnly()) {
                throw new UnexpectedRollbackException(
                        "Transaction rolled back because it has been marked as rollback-only");
            }
            return;
        }
        // 执行提交操作
        processCommit(defStatus);
    }

而事务执行一切正常的时候,便可以真正进入提交流程了。

private void processCommit(DefaultTransactionStatus status) throws TransactionException {
        try {
            boolean beforeCompletionInvoked = false;
            try {
                prepareForCommit(status);
                // 激活 TransactionSynchronization 中的 beforeCommit 方法
                triggerBeforeCommit(status);
               // 激活 TransactionSynchronization 中的 beforeCompletion 方法
                triggerBeforeCompletion(status);
                beforeCompletionInvoked = true;
                boolean globalRollbackOnly = false;
                if (status.isNewTransaction() || isFailEarlyOnGlobalRollbackOnly()) {
                    globalRollbackOnly = status.isGlobalRollbackOnly();
                }
                // 如果存在保存点则清除保存点信息
                if (status.hasSavepoint()) {
                    if (status.isDebug()) {
                        logger.debug("Releasing transaction savepoint");
                    }
                    status.releaseHeldSavepoint();
                }
                // 如果是独立的新事务则直接提交
                else if (status.isNewTransaction()) {
                    if (status.isDebug()) {
                        logger.debug("Initiating transaction commit");
                    }
                    doCommit(status);
                }
                // Throw UnexpectedRollbackException if we have a global rollback-only
                // marker but still didn't get a corresponding exception from commit.
                if (globalRollbackOnly) {
                    throw new UnexpectedRollbackException(
                            "Transaction silently rolled back because it has been marked as rollback-only");
                }
            }
            catch (UnexpectedRollbackException ex) {
                // can only be caused by doCommit
                triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
                throw ex;
            }
            catch (TransactionException ex) {
                // can only be caused by doCommit
                if (isRollbackOnCommitFailure()) {
                    doRollbackOnCommitException(status, ex);
                }
                else {
                    triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
                }
                throw ex;
            }
            catch (RuntimeException ex) {
                if (!beforeCompletionInvoked) {
                    triggerBeforeCompletion(status);
                }
                doRollbackOnCommitException(status, ex);
                throw ex;
            }
            catch (Error err) {
                if (!beforeCompletionInvoked) {
                    triggerBeforeCompletion(status);
                }
                doRollbackOnCommitException(status, err);
                throw err;
            }
            // Trigger afterCommit callbacks, with an exception thrown there
            // propagated to callers but the transaction still considered as committed.
            try {
                triggerAfterCommit(status);
            }
            finally {
                triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
            }
        }
        finally {
            cleanupAfterCompletion(status);
        }
    }

在提交过程中并不是直接提交的,而是考虑了诸多方面,符合提交的条件如下:

  • 当事务状态中有保存点信息的便不会去提交事务
  • 当事务非新事务的时候也不会去执行提交事务操作

此条件主要考虑内嵌事务的情况,对于内嵌事务,在 Spring 中正常的处理方式是将内嵌事务开始之前设置保存点,一旦内嵌事务出现异常便根据保存点信息进行回滚。但是如果没有出现异常,内嵌事务并不会单独提交,而是根据事务流由最外层事务负责提交,所以如果当前存在保存点信息便不是最外层事务,不做提交操作。即只有是独立的新事务才会提交。

5. @transational属性介绍***

下面看一下@Transactional注解的各项参数。

public @interface  Transactional { 
@AliasFor("transactionManager")
String value() default "";
@AliasFor("value")
String transactionManager() default "";

Propagation propagation() default Propagation.REQUIRED;

Isolation isolation() default Isolation.DEFAULT;

int timeout() default -1;

boolean readOnly() default false;

Class<? extends Throwable>[] rollbackFor() default {};

String[] rollbackForClassName() default {};

Class<? extends Throwable>[] noRollbackFor() default {};

String[] noRollbackForClassName() default {};
}

5.1 timtout(超时)不常用

timtout是用来设置事务的超时时间,可以看到默认为-1,不会超时。

5.2 isolation(隔离级别) 不常用

isolation属性是用来设置事务的隔离级别,数据库有四种隔离级别:读未提交、读已提交、可重复读、可串行化。MySQL的默认隔离级别是可重复读。

public enum Isolation {

DEFAULT(-1),默认使用使用数据库的隔离级别

READ_UNCOMMITTED(1), // 读未提交

READ_COMMITTED(2), // 读已提交

REPEATABLE_READ(4), // 可重复读

SERIALIZABLE(8); // 可串行化

}

5.3 readOnly(只读) 不常用

readOnly属性用来设置该属性是否是只读事务,只读事务要从两方面来理解:它的功能是设置了只读事务后在整个事务的过程中,其他事务提交的内容对当前事务是不可见的。

那为什么要设置只读事务呢?它的好处是什么?可以看出它的作用是保持整个事务的数据一致性,如果事务中有多次查询,不会出现数据不一致的情况。所以在一个事务中如果有多次查询,可以启用只读事务,如果只有一次查询就无需只读事务了。

另外,使用了只读事务,数据库会提供一些优化。

但要注意的是,只读事务中只能有读操作,不能含有写操作,否则会报错。

5.4 propagation最常用

propagation属性用来设置事务的传播行为,对传播行为的理解,可以参考如下场景,一个开启了事务的方法A,调用了另一个开启了事务的方法B,此时会出现什么情况?这就要看传播行为的设置了。

propagation属性用来设置事务的传播行为,对传播行为的理解,可以参考如下场景,一个开启了事务的方法A,调用了另一个开启了事务的方法B,此时会出现什么情况?这就要看传播行为的设置了。

public enum Propagation {

REQUIRED(0), // 如果有事务则加入,没有则新建,默认

SUPPORTS(1), // 如果已有事务就用,如果没有就不开启(继承关系)

MANDATORY(2), // 必须在已有事务中

REQUIRES_NEW(3), // 不管是否已有事务,都要开启新事务,老事务挂起

NOT_SUPPORTED(4), // 不开启事务

NEVER(5), // 必须在没有事务的方法中调用,否则抛出异常

NEVER(5), // 如果已有事务,则嵌套执行,如果没有,就新建(和REQUIRED类似,和REQUIRES_NEW容易混淆)

}

REQUIRES_NEW 和 NESTED非常容易混淆,因为它们都是开启了一个新的事务。我去查询了一下它们之间的区别,大概是这样:

REQUIRES_NEW是开启一个完全的全新事务,和当前事务没有任何关系,可以单独地失败、回滚、提交。并不依赖外部事务。在新事务执行过程中,老事务是挂起的。

NESTED也是开启新事务,但它开启的是基于当前事务的子事务,如果失败的话单独回滚,但如果成功的话,并不会立即commit,而是等待外部事务的执行结果,外部事务commit时,子事务才会commit。

5.5 rollbackFor 常用

当方法内抛出指定的异常时,进行事务回滚。rollbackForClassName也是类似的。

rollbackFor有个问题是默认情况会做什么,以前认为默认会对所有异常进行回滚,但其实默认情况下只对RuntimeException回滚。

所以,使用需要显示指定回滚异常,示例

image.png

5.6 noRollbackFor 不常用

这个和上面正好相反,用来设置出现指定的异常时,不进行回滚。

6. @transational不生效解决方案***

6.1 常见不生效原因

       1.项目未配置支持事物!概率较小。

2.方法内使用try-catch捕获了异常。概率较大

3.方法是非public,概率较小。

4.事物未显示指定回滚异常,概率较小,因为一般抛出的异常都是运行时异常

5.同一类中方法互相调用,概率极大。

6.2 对同一类中方法互相调用的解决方案:

情形:

1.同一个类无@Transational的A方法调用有@Transational的B方法,B方法的事物无效。

2.同一类中有@Transational的A方法调用有@Transational的B方法,B方法的事物传播级别是新建事物REQUEST_NEW,B方法的事物无效。

解决:

1.对B方法新建一个B类。

缺点:虽能解决问题,但是不能通用,也不符合迪米特法则。

2.采用代理的设计模式构建通用的事物接口;

示例:

第一步:声明事物接口

image.png

第二步:声明事物客户接口

image.png

第三步:提供事物的代理

image.png

第四步:应用

image.png

7. @transational事物提交后执行

一言以蔽之:将操作绑定到数据库事务上,事物提交则执行,否则不执行。

TransactionSynchronizationAdapter是一个适配器:它实现了TransactionSynchronization接口,并为每一个接口方法提供了一个空的实现。这类适配器的基本思想是:接口中定义了很多方法,然而业务代码往往只需要实现其中一小部分。利用这种“空实现”适配器,我们可以专注于业务上需要处理的回调方法,而不用在业务类中放大量而且重复的空方法。

    结合TransactionSynchronizationManager和TransactionSynchronizationAdapter利用ThreadPoolExecutor实现一个事务后多线程处理功能。
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import org.springframework.transaction.support.TransactionSynchronizationAdapter;
import org.springframework.transaction.support.TransactionSynchronizationManager;
@Service 
public class AfterCommitExecutorImpl extends TransactionSynchronizationAdapter implements AfterCommitExecutor {
private static final Logger LOGGER = LoggerFactory.getLogger(AfterCommitExecutorImpl.class);
private static final ThreadLocal<List> RUNNABLES = new ThreadLocal<List>();
private ExecutorService threadPool = Executors.newFixedThreadPool(3);
    
@Override
public void execute(Runnable runnable) {
    LOGGER.info("Submitting new runnable {} to run after commit", runnable);
    //如果事务同步未启用则认为事务已经提交,马上进行异步处理
    if (!TransactionSynchronizationManager.isSynchronizationActive()) {
        LOGGER.info("Transaction synchronization is NOT ACTIVE. Executing right now runnable {}", runnable);
        runnable.run();
        return;
    }
    //如果存在事务则在事务结束后异步处理
    List<Runnable> threadRunnables = RUNNABLES.get();
    if (threadRunnables == null) {
        threadRunnables = new ArrayList<Runnable>();
        RUNNABLES.set(threadRunnables);
        TransactionSynchronizationManager.registerSynchronization(this);
    }
    threadRunnables.add(runnable);
}

@Override
public void afterCommit() {
    List<Runnable> threadRunnables = RUNNABLES.get();
    LOGGER.info("Transaction successfully committed, executing {} runnables", threadRunnables.size());
    for (int i = 0; i < threadRunnables.size(); i++) {
        Runnable runnable = threadRunnables.get(i);
        LOGGER.info("Executing runnable {}", runnable);
        try {
            threadPool.execute(runnable);
        } catch (RuntimeException e) {
            LOGGER.error("Failed to execute runnable " + runnable, e);
        }
    }
}

@Override
public void afterCompletion(int status) {
    LOGGER.info("Transaction completed with status {}", status == STATUS_COMMITTED ? "COMMITTED" : "ROLLED_BACK");
    RUNNABLES.remove();
}

相关文章

网友评论

      本文标题:浅析@Transactional

      本文链接:https://www.haomeiwen.com/subject/ebiqzktx.html