美文网首页
Spring 源码分析之事务

Spring 源码分析之事务

作者: 突突兔007 | 来源:发表于2020-09-02 18:46 被阅读0次

    Spring 源码分析之事务1
    Spring 源码分析之事务2 TransactionStatus与TransactionInfo
    Spring 源码分析之事务3 事务的提交与回滚

    事务的传播特性:

    1. PROPAGATION_REQUIRED: 如果存在一个事务,则支持当前事务。如果没有事务则开启
    2. PROPAGATION_SUPPORTS: 如果存在一个事务,支持当前事务。如果没有事务,则非事务的执行
    3. PROPAGATION_MANDATORY: 如果已经存在一个事务,支持当前事务。如果没有一个活动的事务,则抛出异常。
    4. PROPAGATION_REQUIRES_NEW: 总是开启一个新的事务。如果一个事务已经存在,则将这个存在的事务挂起。
    5. PROPAGATION_NOT_SUPPORTED: 总是非事务地执行,并挂起任何存在的事务。
    6. PROPAGATION_NEVER: 总是非事务地执行,如果存在一个活动事务,则抛出异常 。
    7.PROPAGATION_NESTED 如果当前存在事务,则在嵌套事务内执行。如果当前没有事务,则执行与 PROPAGATION_REQUIRED 类似的操作

    目前只需要关注PROPAGATION_REQUIRED、PROPAGATION_REQUIRES_NEW、PROPAGATION_NESTED

    贴一段很常见代码

    @Service
    public class UserServiceImpl implements UserService {
    
        @Autowired
        private SysBalanceService sysBalanceService;
    
        @Autowired
        private SysLogService sysLogService;
    
        @Transactional
        @Override
        public void transfer(BigDecimal amount) throws SQLException{
            sysBalanceService.updateBalance(amount); //①
            sysLogService.addLog("zhangsan",amount);//②
        }
    }
    
    @Service
    public class SysBalanceServiceImpl implements SysBalanceService {
        @Autowired
        private SysBalanceDao sysBalanceDao;
    
        @Transactional(propagation = Propagation.REQUIRED)
        @Override
        public void updateBalance(BigDecimal amount) {
            SysBalance balance = sysBalanceDao.selectByPrimaryKey(1);
            balance.setBalance(balance.getBalance().subtract(amount));
            sysBalanceDao.updateByPrimaryKeySelective(balance);
            balance = sysBalanceDao.selectByPrimaryKey(2);
            balance.setBalance(balance.getBalance().add(amount));
            sysBalanceDao.updateByPrimaryKeySelective(balance);
        }
    }
    
    @Service
    public class SysLogServiceImpl implements SysLogService {
    
        @Autowired
        private SysLogDao sysLogDao;
    
        @Transactional(propagation = Propagation.REQUIRED)
        @Override
        public void addLog(String userName, BigDecimal amount) {
            SysLog log = new SysLog();
            log.setCreateTime(new Date());
            log.setOperate("转账");
            log.setUserName(userName);
            sysLogDao.insertSelective(log);
        }
    }
    

    在transfer()方法中假如①或者②出抛异常,事务是否会回滚?我们分析一下

    假如在transfer方法里抛出Sql异常:

     @Transactional
        @Override
        public void transfer(BigDecimal amount) throws SQLException{
            sysBalanceService.updateBalance(amount);
            sysLogService.addLog("zhangsan",amount);
            if(true) {
                throw new SQLException("sql异常");
            }
        }
    

    事务还会不会回滚?答案是否,是不会回滚的。如果我们把抛出异常类型换成如下:

        @Transactional
        @Override
        public void transfer(BigDecimal amount) throws SQLException{
            sysBalanceService.updateBalance(amount);
            sysLogService.addLog("zhangsan",amount);
            //if(true) {throw new SQLException("sql异常");}
            if(true) {throw new RuntimeException("RunTime异常");}
        }
    

    事务还会不会回滚?答案是可以的。为什么抛出SQLException异常事务不会滚,而抛出RuntimeException事务回滚呢?我们跟进源码看下。

    1.@Transaction

    调试断点我们可以发现一个很关键的地方:


    image.png

    我们跟进invoke方法


    image.png

    继续跟进查看invokeWithinTransaction()

    image.png

    继续查看completeTransactionAfterThrowing

    image.png
    查看txInfo.transactionAttribute.rollbackOn(ex)
    @Override
        public boolean rollbackOn(Throwable ex) {
            return (ex instanceof RuntimeException || ex instanceof Error);
        }
    

    我们发现只有当抛出的异常是RuntimeException 和Error类型或是其子类型的时候条件为true,也就会回滚。
    所以当我们抛出SqlException异常,事务是不会回滚的。
    我们回过头来继续看核心方法invokeWithinTransaction()

    @Nullable
        protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
                final InvocationCallback invocation) throws Throwable {
    
            // If the transaction attribute is null, the method is non-transactional.
            TransactionAttributeSource tas = getTransactionAttributeSource();
            final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
            final PlatformTransactionManager tm = determineTransactionManager(txAttr);
            final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);
    
            if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {
                // Standard transaction demarcation with getTransaction and commit/rollback calls.
                //核心开启事务
                TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
    
                Object retVal;
                try {
                    // This is an around advice: Invoke the next interceptor in the chain.
                    // This will normally result in a target object being invoked.
                    //执行我们自己的业务逻辑
                    retVal = invocation.proceedWithInvocation();
                }
                catch (Throwable ex) {
                    // target invocation exception
                    //如果执行的业务逻辑抛出异常,那么执行
                    completeTransactionAfterThrowing(txInfo, ex);
                    throw ex;
                }
                finally {
                    cleanupTransactionInfo(txInfo);
                }
                commitTransactionAfterReturning(txInfo);
                return retVal;
            }
    
            else {
                //自定义事务管理器
                Object result;
                final ThrowableHolder throwableHolder = new ThrowableHolder();
    
                // It's a CallbackPreferringPlatformTransactionManager: pass a Tran
                      ......
                }
    
    

    if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager))
    如果这个为true,则进入spring的事务管理,否则进入else(自定义事务管理器)
    自定义事务管理器后面再说。我们先看spring的事务管理器。

    spring的事务的核心方法之一,开启事务
    //核心:开启事务 TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
    这里有一个重要的方法

    protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm,
                @Nullable TransactionAttribute txAttr, final String joinpointIdentification) {
    
            // If no name specified, apply method identification as transaction name.
                  ......
            TransactionStatus status = null;
            if (txAttr != null) {
                if (tm != null) {
                    // 此处获取一个事务.
                    status = tm.getTransaction(txAttr);
                }
                ......
            }
            return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
    
    @Override
        public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
                throws TransactionException {
    
            // Use defaults if no transaction definition given.
            TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults());
            // 获取事务,第一次获取不到事务,所有为null
            Object transaction = doGetTransaction();
            boolean debugEnabled = logger.isDebugEnabled();
            // 第一次事务为null,不会进入,但是第二次就会进入这里,这里就设计到事务的传播特性的处理了 
            if (isExistingTransaction(transaction)) {
                // Existing transaction found -> check propagation behavior to find out how to behave.
                return handleExistingTransaction(def, transaction, debugEnabled);
            }
    
            // Check definition settings for new transaction.
            if (def.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
                throw new InvalidTimeoutException("Invalid transaction timeout", def.getTimeout());
            }
    
            // No existing transaction found -> check propagation behavior to find out how to proceed.
            if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
                throw new IllegalTransactionStateException(
                        "No existing transaction found for transaction marked with propagation 'mandatory'");
            }
            // 添加@Transcational,不做任何配置,传播属性默认为required,所以进入这里
            else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
                    def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
                    def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
            // 挂起一个空事务
                SuspendedResourcesHolder suspendedResources = suspend(null);
                try {
            // 开启一个事务
                    return startTransaction(def, transaction, debugEnabled, suspendedResources);
                }
                catch (RuntimeException | Error ex) {
                    resume(null, suspendedResources);
                    throw ex;
                }
            }
            else {
                // Create "empty" transaction: no actual transaction, but potentially synchronization.
                if (def.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
                    logger.warn("Custom isolation level specified but no actual transaction initiated; " +
                            "isolation level will effectively be ignored: " + def);
                }
                boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
                return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null);
            }
        }
    

    startTransaction()

    /**
         * Start a new transaction.
         */
        private TransactionStatus startTransaction(TransactionDefinition definition, Object transaction,
                boolean debugEnabled, @Nullable SuspendedResourcesHolder suspendedResources) {
    
            boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
            DefaultTransactionStatus status = newTransactionStatus(
                    definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
            //重点这里
            doBegin(transaction, definition);
            prepareSynchronization(status, definition);
            return status;
        }
    

    dobegin

    @Override
        protected void doBegin(Object transaction, TransactionDefinition definition) {
            DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
            Connection con = null;
    
            try {
                if (!txObject.hasConnectionHolder() ||
                        txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
                      //从datasource中拿到当前连接
                    Connection newCon = obtainDataSource().getConnection();
            // 并持有当前数据库连接
                    txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
                }
      
                ......
    
                // Switch to manual commit if necessary. This is very expensive in some JDBC drivers,
                // so we don't want to do it unnecessarily (for example if we've explicitly
                // configured the connection pool to set it already)
    
    
                // 这里连接的自动提交设置为手动提交,也就是将con.setAutoCommit(false); 大家可以想想为什么?
                if (con.getAutoCommit()) {
                    txObject.setMustRestoreAutoCommit(true);
                    if (logger.isDebugEnabled()) {
                        logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
                    }
                    con.setAutoCommit(false);
                }
    
                ......
    
                // 这里判断当前的连接是不是新连接,如果是新连接,就把当前数据库连接绑定到当前线程
                if (txObject.isNewConnectionHolder()) {
                    TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
                }
            }
    
            ......
        }
    

    我们也可以看看bindResource()方法

    public static void bindResource(Object key, Object value) throws IllegalStateException {
            Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
            Assert.notNull(value, "Value must not be null");
            Map<Object, Object> map = resources.get();
            // set ThreadLocal Map if none found
            if (map == null) {
                map = new HashMap<>();
                resources.set(map);
            }
              Object oldValue = map.put(actualKey, value);
            // Transparently suppress a ResourceHolder that was marked as void...
            if (oldValue instanceof ResourceHolder && ((ResourceHolder) oldValue).isVoid()) {
                oldValue = null;
            }
            if (oldValue != null) {
                throw new IllegalStateException("Already value [" + oldValue + "] for key [" +
                    actualKey + "] bound to thread [" + Thread.currentThread().getName() + "]");
            }
            
        }
    

    可以看看resources的类型是什么,就是一个ThreadLocal。

    执行完doBegin()完成了一下功能:

    • DataSourceTransactionObject txObject,持有当前数据库的连接
    • 设置Connection的自动提交为手动提交
    • 设置当前持有的当前连接为active
    • 绑定当前数据库连接到本地线程变量
    private static final ThreadLocal<Map<Object, Object>> resources = new NamedThreadLocal<>("Transactional resources");
    

    这个时候startTransaction返回的事务对象DefaultTransactionStatus status,就是持有doBegin执行完之后设置的事务属性,startTransaction返回的对象也就是DefaultTransactionStatus status

    protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm,
                @Nullable TransactionAttribute txAttr, final String joinpointIdentification) {
    
            ......
    
            TransactionStatus status = null;
            if (txAttr != null) {
                if (tm != null) {
                    //此时返回的status是DefaultTransactionStatus status,status里持有当前数据库连接,并且设置当前数据库连接为active
                    status = tm.getTransaction(txAttr);
                }
                else {
                    if (logger.isDebugEnabled()) {
                        logger.debug("Skipping transactional joinpoint [" + joinpointIdentification +
                                "] because no transaction manager has been configured");
                    }
                }
            }
            //这里继续对返回的DefaultTransactionStatus status继续封装,返回TransactionInfo.
            return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
        }
    

    继续分析prepareTransactionInfo

    这里先说一下TransactionInfo对象结构:重点关注圈中的对象。


    image.png

    源码:这里主要做了两件事

        protected TransactionInfo prepareTransactionInfo(@Nullable PlatformTransactionManager tm,
                @Nullable TransactionAttribute txAttr, String joinpointIdentification,
                @Nullable TransactionStatus status) {
    
            TransactionInfo txInfo = new TransactionInfo(tm, txAttr, joinpointIdentification);
            if (txAttr != null) {
                
                // 第一步:将持有当前数据库连接的DefaultTransactionStatus对象设置到TransactionInfo中
                txInfo.newTransactionStatus(status);
            }
            ......
    
            // We always bind the TransactionInfo to the thread, even if we didn't create
            // a new transaction here. This guarantees that the TransactionInfo stack
            // will be managed correctly even if no transaction was created by this aspect.
            
            //第二部:将新建的txInfo绑定到当前本地线程
            txInfo.bindToThread();
            return txInfo;
        }
    

    这个时候createTransactionIfNecessary方法中执行完毕,返回刚才的TransactionInfo txInfo。事务的提交与回滚都依赖此对象。

    一直往上返回,我们就可以确定createTransactionIfNecessary() 就是拿到了事务对象,拿到的事务是:
    ①将事务的自动提交设置为手动提交 ; ②持有当前数据库连接ThreadLocal,接着往下执行。

    image.png

    当执行transfer()方法中的第一个业务sysBalanceService.updateBalance(amount)的时候会第二次进入invokeWithinTransaction方法。
    第二次进入invokeWithinTransaction()方法,是因为在此方法会在service方法里调用其他带有@Transactional注解的方法时候,都会被调用。
    也会第二次进入createTransactionIfNecessary()-->getTransaction()->doGetTransaction()
    这个时候doGetTransaction()的返回值就有值了,为什么有我们可以看到:

    @Override
        protected Object doGetTransaction() {
            DataSourceTransactionObject txObject = new DataSourceTransactionObject();
            txObject.setSavepointAllowed(isNestedTransactionAllowed());
            ConnectionHolder conHolder =
                    (ConnectionHolder) TransactionSynchronizationManager.getResource(obtainDataSource());
    ...
            return txObject;
        }
    
    public static Object getResource(Object key) {
            ......
            Object value = doGetResource(actualKey);
            ...
            return value;
        }
    
    private static Object doGetResource(Object actualKey) {
            //第二次从ThreadLocal获取的map就有值了,有值直接返回,即还是当前数据库连接。
            Map<Object, Object> map = resources.get();
            if (map == null) {
                return null;
            }
            Object value = map.get(actualKey);
            ......
            return value;
        }
    

    doGetTransaction()返回DataSourceTransactionObject txObject,这一次返回的txObject还是持有当前数据库连接,但是当前持有的数据库连接设置成了不是新连接即newConnectionHolder=false。
    继续跟踪代码执行如下:

    public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
                throws TransactionException {
    
            // Use defaults if no transaction definition given.
            TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults());
    
            Object transaction = doGetTransaction();
             //第二次就会进入if,执行handleExistingTransaction(),
            if (isExistingTransaction(transaction)) {
                // Existing transaction found -> check propagation behavior to find out how to behave.
                return handleExistingTransaction(def, transaction, debugEnabled);
            }
            ......
    

    所以第二次会进入if,执行handleExistingTransaction(),而不会执行到下面的startTransaction去开启事务了。

    /**
         * Create a TransactionStatus for an existing transaction.
         */
        private TransactionStatus handleExistingTransaction(
                TransactionDefinition definition, Object transaction, boolean debugEnabled)
                throws TransactionException {
    
            if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
                throw new IllegalTransactionStateException(
                        "Existing transaction found for transaction marked with propagation 'never'");
            }
    
            if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
                if (debugEnabled) {
                    logger.debug("Suspending current transaction");
                }
                Object suspendedResources = suspend(transaction);
                boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
                return prepareTransactionStatus(
                        definition, null, false, newSynchronization, debugEnabled, suspendedResources);
            }
    
            if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
                if (debugEnabled) {
                    logger.debug("Suspending current transaction, creating new transaction with name [" +
                            definition.getName() + "]");
                }
                SuspendedResourcesHolder suspendedResources = suspend(transaction);
                try {
                    return startTransaction(definition, transaction, debugEnabled, suspendedResources);
                }
                catch (RuntimeException | Error beginEx) {
                    resumeAfterBeginException(transaction, suspendedResources, beginEx);
                    throw beginEx;
                }
            }
    
            if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
                if (!isNestedTransactionAllowed()) {
                    throw new NestedTransactionNotSupportedException(
                            "Transaction manager does not allow nested transactions by default - " +
                            "specify 'nestedTransactionAllowed' property with value 'true'");
                }
                if (debugEnabled) {
                    logger.debug("Creating nested transaction with name [" + definition.getName() + "]");
                }
                if (useSavepointForNestedTransaction()) {
                    // Create savepoint within existing Spring-managed transaction,
                    // through the SavepointManager API implemented by TransactionStatus.
                    // Usually uses JDBC 3.0 savepoints. Never activates Spring synchronization.
                    DefaultTransactionStatus status =
                            prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
                    status.createAndHoldSavepoint();
                    return status;
                }
                else {
                    // Nested transaction through nested begin and commit/rollback calls.
                    // Usually only for JTA: Spring synchronization might get activated here
                    // in case of a pre-existing JTA transaction.
                    return startTransaction(definition, transaction, debugEnabled, null);
                }
            }
    
            // Assumably PROPAGATION_SUPPORTS or PROPAGATION_REQUIRED.
            if (debugEnabled) {
                logger.debug("Participating in existing transaction");
            }
            if (isValidateExistingTransaction()) {
                if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
                    Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
                    if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {
                        Constants isoConstants = DefaultTransactionDefinition.constants;
                        throw new IllegalTransactionStateException("Participating transaction with definition [" +
                                definition + "] specifies isolation level which is incompatible with existing transaction: " +
                                (currentIsolationLevel != null ?
                                        isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) :
                                        "(unknown)"));
                    }
                }
                if (!definition.isReadOnly()) {
                    if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {
                        throw new IllegalTransactionStateException("Participating transaction with definition [" +
                                definition + "] is not marked as read-only but existing transaction is");
                    }
                }
            }
            boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
            return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
        }
    

    可以看到这里是对已存在的事务的处理,主要是看@Transaction注解中propagation属性的配置的啥
    propagation属性值如下:而我们最常用的就是REQUIRED,也就默认值

    image.png

    这里中间的逻辑不会执行,知会执行最后两行代码

    //newSynchronization = true; 
     boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
     return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
    

    这里我们需要记住newTransaction的值,也就是这里的第三个参数,目前值是false。这个参数很重要,关系到后面的事务是否提交。
    这次获取到的事务里面的newTransaction=false,继续往下执行

    加入我们执行我们自己的业务逻辑之后,没有问题,则会进入finally块
    cleanupTransactionInfo(txInfo);

    protected void cleanupTransactionInfo(@Nullable TransactionInfo txInfo) {
            if (txInfo != null) {
                txInfo.restoreThreadLocalStatus();
            }
        }
    

    恢复ThreadLocal状态,每次调用新业务的时候,都会把上一次的事务信息保存在当前线程里。直至拿到最初的事务状态也就是newTransaction=true。最后会执行事务提交操作commitTransactionAfterReturning(txInfo);

    /**
         * Execute after successful completion of call, but not after an exception was handled.
         * Do nothing if we didn't create a transaction.
         * @param txInfo information about the current transaction
         */
        protected void commitTransactionAfterReturning(@Nullable TransactionInfo txInfo) {
            if (txInfo != null && txInfo.getTransactionStatus() != null) {
                txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
            }
        }
    
    @Override
        public final void commit(TransactionStatus status) throws TransactionException {
            ......
            processCommit(defStatus);
        }
    
    private void processCommit(DefaultTransactionStatus status) throws TransactionException {
            try {
                
    
                try {
                    ......
    
                    if (status.hasSavepoint()) {
                        unexpectedRollback = status.isGlobalRollbackOnly();
                        status.releaseHeldSavepoint();
                    }
    //这里通过判断isNewTransaction是否为true,决定是否提交事务
                    else if (status.isNewTransaction()) {
                        if (status.isDebug()) {
                            logger.debug("Initiating transaction commit");
                        }
                        unexpectedRollback = status.isGlobalRollbackOnly();
                        doCommit(status);
                    }
                    ......
                catch (UnexpectedRollbackException ex) {
            }
            ......
        }
    

    至此事务大体执行流程分析完毕。

    2.Spring 事务传播属性

    对于事务传播特性也是主要分析获取事务的相关逻辑。这里就不在分析了。

    3.Spring 事务流程处理

    Spring 事务流程处理

    Spring事务流程图.jpg

    相关文章

      网友评论

          本文标题:Spring 源码分析之事务

          本文链接:https://www.haomeiwen.com/subject/yoobsktx.html