美文网首页Spring相关时序图系列
Spring AOP(9)事务传播属性再解析

Spring AOP(9)事务传播属性再解析

作者: 涣涣虚心0215 | 来源:发表于2021-01-25 01:06 被阅读0次

    Spring Transaction时序图这里面看代码及方法调用会更加清晰。
    总的看下来涉及到的类就是JDKDynamicAopProxy,TransactionInterceptor (继承自TransactionAspectSupport),DataSourceTransactionManager(继承AbstractPlatformTransactionManager)。
    大体流程就是TransactionInterceptor提供了为目标对象提供创建事务代理对象,使得事务生效,它通过调用TransactionManager来创建Transaction对象,根据TransactionAttribute来判断是不是挂起当前事务,会在doBegin中将autoCommit设置为false,并且根据DataSource创建connectionHolder,在此创建的connectionHolder会放入TransactionSynchronizationManager的ThreadLocal的变量中,方便jdbcTemplate等获取connection对象执行SQL,最后再通过TransactionManager做commit或者rollback。

    上一篇中在getTransaction()的过程中没有分析一个复杂的场景,即存在Transaction的时候如何根据TransactionAttribute的传播属性处理当前Transaction。
    核心代码即handleExistingTransaction()

    //TransactionDefinition是TransactionAttribute的父类,这里传入的transaction和TransactionAttribute
    private TransactionStatus handleExistingTransaction(
            TransactionDefinition definition, Object transaction, boolean debugEnabled)
            throws TransactionException {
        //如果传播属性是PROPAGATION_NEVER,表示不需要Transaction
        if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
            throw new IllegalTransactionStateException(
                    "Existing transaction found for transaction marked with propagation 'never'");
        }
        //如果是PROPAGATION_NOT_SUPPORTED,则需要挂起当前线程Transaction,并且将connection从ThreadLocal对象移除。
        //  返回prepareTransactionStatus里面的transaction是null
        if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
            if (debugEnabled) {
                logger.debug("Suspending current transaction");
            }
            Object suspendedResources = suspend(transaction);
            boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
            //null是代表新的Transaction为null,没有Transaction
            return prepareTransactionStatus(
                    definition, null, false, newSynchronization, debugEnabled, suspendedResources);
        }
        //如果是PROPAGATION_REQUIRES_NEW,挂起当前事务,创建新的事务
        if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
            if (debugEnabled) {
                logger.debug("Suspending current transaction, creating new transaction with name [" +
                        definition.getName() + "]");
            }
            //suspend的意思,其实是将Transaction对象类的connectionHolder踢出来,保存到SuspendedResourcesHolder中,在新事务commit之后还会通过SuspendedResourcesHolder,resume回来。
            SuspendedResourcesHolder suspendedResources = suspend(transaction);
            try {
                boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
                DefaultTransactionStatus status = newTransactionStatus(
                        definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
                //这里的Transaction其实是空的,所以doBegin会创建新的connection放到Transaction中
                doBegin(transaction, definition);
                prepareSynchronization(status, definition);
                return status;
            }
            catch (RuntimeException | Error beginEx) {
                resumeAfterBeginException(transaction, suspendedResources, beginEx);
                throw beginEx;
            }
        }
        //PROPAGATION_NESTED,创建savepoint
        if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
            if (!isNestedTransactionAllowed()) {
                throw new NestedTransactionNotSupportedException(
                        "Transaction manager does not allow nested transactions by default - " +
                        "specify 'nestedTransactionAllowed' property with value 'true'");
            }
            if (debugEnabled) {
                logger.debug("Creating nested transaction with name [" + definition.getName() + "]");
            }
            if (useSavepointForNestedTransaction()) {
                // Create savepoint within existing Spring-managed transaction,
                // through the SavepointManager API implemented by TransactionStatus.
                // Usually uses JDBC 3.0 savepoints. Never activates Spring synchronization.
                DefaultTransactionStatus status =
                        prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
                status.createAndHoldSavepoint();
                return status;
            }
            else {
                // Nested transaction through nested begin and commit/rollback calls.
                // Usually only for JTA: Spring synchronization might get activated here
                // in case of a pre-existing JTA transaction.
                boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
                DefaultTransactionStatus status = newTransactionStatus(
                        definition, transaction, true, newSynchronization, debugEnabled, null);
                doBegin(transaction, definition);
                prepareSynchronization(status, definition);
                return status;
            }
        }
    
        // Assumably PROPAGATION_SUPPORTS or PROPAGATION_REQUIRED.
        if (debugEnabled) {
            logger.debug("Participating in existing transaction");
        }
        if (isValidateExistingTransaction()) {
            if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
                Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
                if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {
                    Constants isoConstants = DefaultTransactionDefinition.constants;
                    throw new IllegalTransactionStateException("Participating transaction with definition [" +
                            definition + "] specifies isolation level which is incompatible with existing transaction: " +
                            (currentIsolationLevel != null ?
                                    isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) :
                                    "(unknown)"));
                }
            }
            if (!definition.isReadOnly()) {
                if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {
                    throw new IllegalTransactionStateException("Participating transaction with definition [" +
                            definition + "] is not marked as read-only but existing transaction is");
                }
            }
        }
        boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
        return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
    }
    

    其实主要看一下PROPAGATION_REQUIRES_NEW,意思就是如果当前线程不存在事务,就创建新的事务;如果当前存在事务,就挂起当前事务,创建新的事务。
    事务Transaction,就是DataSourceTransactionObject,它里面存在一个connectionHolder,而且每次通过代理调用的时候都会在getTransaction() --> doGetTransaction()里new创建新的DataSourceTransactionObject,但是里面的connectionHolder是从TransactionSynchronizationManager里面获得的,即ThreadLocal变量中获取。
    所以第二个事务发现当前DataSourceTransactionObject里面存在connectionHolder,即存在事务,那么就会挂起当前事务,那么suspend(transaction)应该就是将当前connectionHolder设置为null,并调用unbindResource将connectionHolder从ThreadLocal中移除并返回存放至suspendedResources中,等待被resume,后面接着第二个事务会进行doBegin(),这里就会为第二事务回去新的connection,从而开启新的事务。

    commit方法里就能看到,事务最终的本质还是通过connection对象去提交SQL,而基于DataSource的事务里面涉及到的对象主要有:

    • TransactionInfo包含了TransactionStatus。
    • TransactionStatus里面包含了Transaction对象DataSourceTransactionObject。
    • DataSourceTransactionObject包含了ConnectionHolder。
    • ConnectionHolder里面即包含了connection对象。
    protected void doCommit(DefaultTransactionStatus status) {
        DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
        Connection con = txObject.getConnectionHolder().getConnection();
        if (status.isDebug()) {
            logger.debug("Committing JDBC transaction on Connection [" + con + "]");
        }
        try {
            con.commit();
        }
        catch (SQLException ex) {
            throw new TransactionSystemException("Could not commit JDBC transaction", ex);
        }
    }
    

    相关文章

      网友评论

        本文标题:Spring AOP(9)事务传播属性再解析

        本文链接:https://www.haomeiwen.com/subject/tmdyzktx.html