美文网首页
聊聊spring事务的REQUIRES_NEW

聊聊spring事务的REQUIRES_NEW

作者: go4it | 来源:发表于2023-09-13 20:55 被阅读0次

    本文主要研究一下spring事务的REQUIRES_NEW

    TransactionDefinition

    org/springframework/transaction/TransactionDefinition.java

        /**
         * Create a new transaction, suspending the current transaction if one exists.
         * Analogous to the EJB transaction attribute of the same name.
         * <p><b>NOTE:</b> Actual transaction suspension will not work out-of-the-box
         * on all transaction managers. This in particular applies to
         * {@link org.springframework.transaction.jta.JtaTransactionManager},
         * which requires the {@code javax.transaction.TransactionManager} to be
         * made available it to it (which is server-specific in standard Java EE).
         * <p>A {@code PROPAGATION_REQUIRES_NEW} scope always defines its own
         * transaction synchronizations. Existing synchronizations will be suspended
         * and resumed appropriately.
         * @see org.springframework.transaction.jta.JtaTransactionManager#setTransactionManager
         */
        int PROPAGATION_REQUIRES_NEW = 3;
    

    PROPAGATION_REQUIRES_NEW在有事务的场景下会suspend当前事务,然后创建新事务

    AbstractPlatformTransactionManager

    org/springframework/transaction/support/AbstractPlatformTransactionManager.java

            if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
                if (debugEnabled) {
                    logger.debug("Suspending current transaction, creating new transaction with name [" +
                            definition.getName() + "]");
                }
                SuspendedResourcesHolder suspendedResources = suspend(transaction);
                try {
                    boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
                    DefaultTransactionStatus status = newTransactionStatus(
                            definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
                    doBegin(transaction, definition);
                    prepareSynchronization(status, definition);
                    return status;
                }
                catch (RuntimeException | Error beginEx) {
                    resumeAfterBeginException(transaction, suspendedResources, beginEx);
                    throw beginEx;
                }
            }
    

    handleExistingTransaction方法在判断是PROPAGATION_REQUIRES_NEW,会执行suspend方法,然后newTransactionStatus,执行doBegin及prepareSynchronization

    suspend

    org/springframework/transaction/support/AbstractPlatformTransactionManager.java

        /**
         * Suspend the given transaction. Suspends transaction synchronization first,
         * then delegates to the {@code doSuspend} template method.
         * @param transaction the current transaction object
         * (or {@code null} to just suspend active synchronizations, if any)
         * @return an object that holds suspended resources
         * (or {@code null} if neither transaction nor synchronization active)
         * @see #doSuspend
         * @see #resume
         */
        @Nullable
        protected final SuspendedResourcesHolder suspend(@Nullable Object transaction) throws TransactionException {
            if (TransactionSynchronizationManager.isSynchronizationActive()) {
                List<TransactionSynchronization> suspendedSynchronizations = doSuspendSynchronization();
                try {
                    Object suspendedResources = null;
                    if (transaction != null) {
                        suspendedResources = doSuspend(transaction);
                    }
                    String name = TransactionSynchronizationManager.getCurrentTransactionName();
                    TransactionSynchronizationManager.setCurrentTransactionName(null);
                    boolean readOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly();
                    TransactionSynchronizationManager.setCurrentTransactionReadOnly(false);
                    Integer isolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
                    TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(null);
                    boolean wasActive = TransactionSynchronizationManager.isActualTransactionActive();
                    TransactionSynchronizationManager.setActualTransactionActive(false);
                    return new SuspendedResourcesHolder(
                            suspendedResources, suspendedSynchronizations, name, readOnly, isolationLevel, wasActive);
                }
                catch (RuntimeException | Error ex) {
                    // doSuspend failed - original transaction is still active...
                    doResumeSynchronization(suspendedSynchronizations);
                    throw ex;
                }
            }
            else if (transaction != null) {
                // Transaction active but no synchronization active.
                Object suspendedResources = doSuspend(transaction);
                return new SuspendedResourcesHolder(suspendedResources);
            }
            else {
                // Neither transaction nor synchronization active.
                return null;
            }
        }
    

    suspend方法主要是执行doSuspendSynchronization方法返回suspendedSynchronizations,执行doSuspend返回suspendedResources,最后根据这两个创建SuspendedResourcesHolder

    doSuspendSynchronization

        /**
         * Suspend all current synchronizations and deactivate transaction
         * synchronization for the current thread.
         * @return the List of suspended TransactionSynchronization objects
         */
        private List<TransactionSynchronization> doSuspendSynchronization() {
            List<TransactionSynchronization> suspendedSynchronizations =
                    TransactionSynchronizationManager.getSynchronizations();
            for (TransactionSynchronization synchronization : suspendedSynchronizations) {
                synchronization.suspend();
            }
            TransactionSynchronizationManager.clearSynchronization();
            return suspendedSynchronizations;
        }
    

    doSuspendSynchronization这个遍历suspendedSynchronizations,挨个执行suspend,然后clearSynchronization

    ResourceHolderSynchronization

    org/springframework/transaction/support/ResourceHolderSynchronization.java

        public void suspend() {
            if (this.holderActive) {
                TransactionSynchronizationManager.unbindResource(this.resourceKey);
            }
        }
    

    ResourceHolderSynchronization的suspend执行的是TransactionSynchronizationManager.unbindResource

    unbindResource

    org/springframework/transaction/support/TransactionSynchronizationManager.java

        /**
         * Unbind a resource for the given key from the current thread.
         * @param key the key to unbind (usually the resource factory)
         * @return the previously bound value (usually the active resource object)
         * @throws IllegalStateException if there is no value bound to the thread
         * @see ResourceTransactionManager#getResourceFactory()
         */
        public static Object unbindResource(Object key) throws IllegalStateException {
            Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
            Object value = doUnbindResource(actualKey);
            if (value == null) {
                throw new IllegalStateException(
                        "No value for key [" + actualKey + "] bound to thread [" + Thread.currentThread().getName() + "]");
            }
            return value;
        }
    
        /**
         * Actually remove the value of the resource that is bound for the given key.
         */
        @Nullable
        private static Object doUnbindResource(Object actualKey) {
            Map<Object, Object> map = resources.get();
            if (map == null) {
                return null;
            }
            Object value = map.remove(actualKey);
            // Remove entire ThreadLocal if empty...
            if (map.isEmpty()) {
                resources.remove();
            }
            // Transparently suppress a ResourceHolder that was marked as void...
            if (value instanceof ResourceHolder && ((ResourceHolder) value).isVoid()) {
                value = null;
            }
            if (value != null && logger.isTraceEnabled()) {
                logger.trace("Removed value [" + value + "] for key [" + actualKey + "] from thread [" +
                        Thread.currentThread().getName() + "]");
            }
            return value;
        }
    

    unbindResource主要是执行doUnbindResource,从resources中移除

    cleanupAfterCompletion

    org/springframework/transaction/support/AbstractPlatformTransactionManager.java

        /**
         * Clean up after completion, clearing synchronization if necessary,
         * and invoking doCleanupAfterCompletion.
         * @param status object representing the transaction
         * @see #doCleanupAfterCompletion
         */
        private void cleanupAfterCompletion(DefaultTransactionStatus status) {
            status.setCompleted();
            if (status.isNewSynchronization()) {
                TransactionSynchronizationManager.clear();
            }
            if (status.isNewTransaction()) {
                doCleanupAfterCompletion(status.getTransaction());
            }
            if (status.getSuspendedResources() != null) {
                if (status.isDebug()) {
                    logger.debug("Resuming suspended transaction after completion of inner transaction");
                }
                Object transaction = (status.hasTransaction() ? status.getTransaction() : null);
                resume(transaction, (SuspendedResourcesHolder) status.getSuspendedResources());
            }
        }
    

    在内嵌事务执行完之后,会判断是否有suspendedResources,如果有则执行resume,恢复之前suspend的事务

    resume

    org/springframework/transaction/support/AbstractPlatformTransactionManager.java

        /**
         * Resume the given transaction. Delegates to the {@code doResume}
         * template method first, then resuming transaction synchronization.
         * @param transaction the current transaction object
         * @param resourcesHolder the object that holds suspended resources,
         * as returned by {@code suspend} (or {@code null} to just
         * resume synchronizations, if any)
         * @see #doResume
         * @see #suspend
         */
        protected final void resume(@Nullable Object transaction, @Nullable SuspendedResourcesHolder resourcesHolder)
                throws TransactionException {
    
            if (resourcesHolder != null) {
                Object suspendedResources = resourcesHolder.suspendedResources;
                if (suspendedResources != null) {
                    doResume(transaction, suspendedResources);
                }
                List<TransactionSynchronization> suspendedSynchronizations = resourcesHolder.suspendedSynchronizations;
                if (suspendedSynchronizations != null) {
                    TransactionSynchronizationManager.setActualTransactionActive(resourcesHolder.wasActive);
                    TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(resourcesHolder.isolationLevel);
                    TransactionSynchronizationManager.setCurrentTransactionReadOnly(resourcesHolder.readOnly);
                    TransactionSynchronizationManager.setCurrentTransactionName(resourcesHolder.name);
                    doResumeSynchronization(suspendedSynchronizations);
                }
            }
        }
    

    resume方法执行doResume,然后恢复之前的TransactionSynchronizationManager的一些设置

    doResume

    org/springframework/orm/jpa/JpaTransactionManager.java

        @Override
        protected void doResume(@Nullable Object transaction, Object suspendedResources) {
            SuspendedResourcesHolder resourcesHolder = (SuspendedResourcesHolder) suspendedResources;
            TransactionSynchronizationManager.bindResource(
                    obtainEntityManagerFactory(), resourcesHolder.getEntityManagerHolder());
            if (getDataSource() != null && resourcesHolder.getConnectionHolder() != null) {
                TransactionSynchronizationManager.bindResource(getDataSource(), resourcesHolder.getConnectionHolder());
            }
        }
    

    doResume这里就是给bind回来

    小结

    spring事务的REQUIRES_NEW传播级别的实现就是对当前事务进行suspend,底层是unbind,然后创建新事务,执行完毕判断是否有suspend的事务,有则执行resume,底层是bind。具体对于mysql来讲,它不感知这些嵌套事务,它先接收到的是内嵌的新事务的sql,然后提交,最后接收到了外层resume回来的事务。

    相关文章

      网友评论

          本文标题:聊聊spring事务的REQUIRES_NEW

          本文链接:https://www.haomeiwen.com/subject/adipvdtx.html