Using "event publishing", "async", and other operations with asynchronous semantics inside a Spring main transaction

Author: 西安法律咨询服务平台与程序员 | Published: 2021-07-13 18:11

Problem description

The demo code is as follows:

@Service
public class PersonService {
    private final PersonRepository personRepository;
    private final ApplicationEventPublisher applicationEventPublisher;

    public PersonService(PersonRepository personRepository, ApplicationEventPublisher applicationEventPublisher) {
        this.personRepository = personRepository;
        this.applicationEventPublisher = applicationEventPublisher;
    }

    @Transactional
    public void save(Person person){
        personRepository.save(person);  // ①
        applicationEventPublisher.publishEvent(new SendEmailEvent(person.getId()));  // ②
    }
}
@Component
public class SendEmailEventListener {
    private final PersonRepository personRepository;

    public SendEmailEventListener(PersonRepository personRepository) {
        this.personRepository = personRepository;
    }

    @EventListener
    @Transactional
    public void handle(SendEmailEvent emailEvent){
        Person person = personRepository.findById(emailEvent.getPersonId());  // ③
        //...send email
    }
}

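The problem named in the title is this: when ③ reads the person from the database by id, the transaction that performed the save at ① may not have been committed yet.
The demo code above uses Spring's event publishing and listening mechanism to illustrate the problem that asynchronous operations can run into. Strictly speaking, any operation with asynchronous semantics whose transaction depends on data written by another transaction is likely to hit the same problem. Note that a plain @EventListener is invoked synchronously on the publishing thread by default; the problem shows up when the dependent work reads through another thread or connection. In Java web applications, operations with asynchronous semantics include Spring's event publishing and listening mechanism (ApplicationEventPublisher, ApplicationEvent, @EventListener), asynchronous processing (@Async), message queues (MQ), and so on.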
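To make the timing issue concrete, here is a minimal plain-Java sketch with no Spring involved. It assumes a database where other sessions see only committed rows; ToyDatabase, VisibilityDemo, and all method names are hypothetical and exist only for this illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Toy model of a database where other sessions see only committed rows. */
final class ToyDatabase {
    private final Map<Long, String> committed = new HashMap<>();
    private final Map<Long, String> pending = new HashMap<>();

    /** Write inside the open transaction; invisible to other sessions until commit(). */
    void saveInTransaction(long id, String name) {
        pending.put(id, name);
    }

    /** What a different thread or connection (an @Async method, an MQ consumer) would read. */
    Optional<String> readFromOtherSession(long id) {
        return Optional.ofNullable(committed.get(id));
    }

    void commit() {
        committed.putAll(pending);
        pending.clear();
    }
}

final class VisibilityDemo {
    /** Reports whether the row was visible to the other session before and after commit. */
    static String run() {
        ToyDatabase db = new ToyDatabase();
        db.saveInTransaction(1L, "alice");                                   // like ①: saved, not committed
        boolean seenBeforeCommit = db.readFromOtherSession(1L).isPresent(); // like ③ running too early
        db.commit();
        boolean seenAfterCommit = db.readFromOtherSession(1L).isPresent();  // like ③ running after commit
        return "beforeCommit=" + seenBeforeCommit + ", afterCommit=" + seenAfterCommit;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints "beforeCommit=false, afterCommit=true"
    }
}
```

The early read finds nothing, which is the situation a consumer on another thread or connection can end up in when it starts before the publishing transaction commits.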

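Solution

Semantically, it is enough to make these asynchronous operations run only after the transaction has committed successfully, and not run at all if it rolls back.

  • Using TransactionSynchronizationManager.registerSynchronization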
@Component
public class SendEmailEventListener {
    private final PersonRepository personRepository;

    public SendEmailEventListener(PersonRepository personRepository) {
        this.personRepository = personRepository;
    }

    @EventListener
    @Transactional
    public void handle(SendEmailEvent emailEvent){
        TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
            @Override
            public void afterCommit() {
                personRepository.findById(emailEvent.getPersonId());  // ④
                //...send email
            }
        });
    }
}

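With this code, the logic at ④ runs only after the transaction has committed. (TransactionSynchronizationAdapter is deprecated as of Spring 5.3; on newer versions, implement TransactionSynchronization directly, since its methods are default methods.)
  • Using @TransactionalEventListener
Since Spring 4.2, the @TransactionalEventListener annotation lets you control in which phase of the transaction an event is handled. Its default phase is AFTER_COMMIT, so the listener below runs only after the publishing transaction has committed.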

@Component
public class SendEmailEventListener {
    private final PersonRepository personRepository;

    public SendEmailEventListener(PersonRepository personRepository) {
        this.personRepository = personRepository;
    }

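    @TransactionalEventListener  // default phase: AFTER_COMMIT
    // The publishing transaction has already committed here, so a new one is requested explicitly.
    @Transactional(propagation = Propagation.REQUIRES_NEW)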
    public void handle(SendEmailEvent emailEvent){
        personRepository.findById(emailEvent.getPersonId());
        //...send email
    }
}

For other operations with asynchronous semantics, the same TransactionSynchronizationManager.registerSynchronization approach can be used.

Analysis

Let us look at what TransactionSynchronizationManager.registerSynchronization does:

    public static void registerSynchronization(TransactionSynchronization synchronization)
            throws IllegalStateException {

        Assert.notNull(synchronization, "TransactionSynchronization must not be null");
        Set<TransactionSynchronization> synchs = synchronizations.get();
        if (synchs == null) {
            throw new IllegalStateException("Transaction synchronization is not active");
        }
        synchs.add(synchronization);
    }

It mainly adds the TransactionSynchronization we created to a Set. So when is this set used?
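One practical consequence of the null check above is that registering outside a transaction fails. The following plain-Java sketch models that behavior with a thread-bound set. ToySynchronizationManager and RegistryDemo are hypothetical stand-ins written for this article, not Spring classes, and callbacks are reduced to Runnable for brevity.

```java
import java.util.LinkedHashSet;
import java.util.Set;

/** Simplified stand-in for the thread-bound registry; not Spring's actual class. */
final class ToySynchronizationManager {
    // One set per thread; null means no transaction synchronization is active on this thread.
    private static final ThreadLocal<Set<Runnable>> SYNCHRONIZATIONS = new ThreadLocal<>();

    /** Called by the (toy) transaction manager when a transaction begins. */
    static void initSynchronization() {
        SYNCHRONIZATIONS.set(new LinkedHashSet<>());
    }

    /** Mirrors the null check shown above: fails when no transaction is active. */
    static void registerSynchronization(Runnable afterCommit) {
        Set<Runnable> synchs = SYNCHRONIZATIONS.get();
        if (synchs == null) {
            throw new IllegalStateException("Transaction synchronization is not active");
        }
        synchs.add(afterCommit);
    }

    static boolean isSynchronizationActive() {
        return SYNCHRONIZATIONS.get() != null;
    }

    /** Called by the (toy) transaction manager once the transaction has completed. */
    static void clearSynchronization() {
        SYNCHRONIZATIONS.remove();
    }
}

final class RegistryDemo {
    /** Returns true if registering without an active transaction is rejected. */
    static boolean rejectsOutsideTransaction() {
        try {
            ToySynchronizationManager.registerSynchronization(() -> { });
            return false;
        } catch (IllegalStateException expected) {
            return true;
        }
    }

    /** Returns true if registering inside a (toy) transaction is accepted. */
    static boolean acceptsInsideTransaction() {
        ToySynchronizationManager.initSynchronization();
        try {
            ToySynchronizationManager.registerSynchronization(() -> { });
            return true;
        } finally {
            ToySynchronizationManager.clearSynchronization();
        }
    }

    public static void main(String[] args) {
        System.out.println("rejected outside transaction: " + rejectsOutsideTransaction());
        System.out.println("accepted inside transaction: " + acceptsInsideTransaction());
    }
}
```

In real Spring code, a method that may also be called outside a transaction can check TransactionSynchronizationManager.isSynchronizationActive() before registering, and run its logic directly when no synchronization is active.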

  • We start the analysis from invoke in TransactionInterceptor
public Object invoke(MethodInvocation invocation) throws Throwable {
        // Work out the target class: may be {@code null}.
        // The TransactionAttributeSource should be passed the target class
        // as well as the method, which may be from an interface.
        Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);

        // Adapt to TransactionAspectSupport's invokeWithinTransaction...
        return invokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed);
    }

  • invokeWithinTransaction in TransactionAspectSupport (the superclass of TransactionInterceptor)

protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
            final InvocationCallback invocation) throws Throwable {

        // If the transaction attribute is null, the method is non-transactional.
        TransactionAttributeSource tas = getTransactionAttributeSource();
        final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
        final PlatformTransactionManager tm = determineTransactionManager(txAttr);
        final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);

        if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {
            // Standard transaction demarcation with getTransaction and commit/rollback calls.
            TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);

            Object retVal;
            try {
                // This is an around advice: Invoke the next interceptor in the chain.
                // This will normally result in a target object being invoked.
                retVal = invocation.proceedWithInvocation();
            }
            catch (Throwable ex) {
                // target invocation exception
                completeTransactionAfterThrowing(txInfo, ex);
                throw ex;
            }
            finally {
                cleanupTransactionInfo(txInfo);
            }
            commitTransactionAfterReturning(txInfo);
            return retVal;
        }

        else {
            final ThrowableHolder throwableHolder = new ThrowableHolder();

            // It's a CallbackPreferringPlatformTransactionManager: pass a TransactionCallback in.
            try {
                Object result = ((CallbackPreferringPlatformTransactionManager) tm).execute(txAttr, status -> {
                    TransactionInfo txInfo = prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
                    try {
                        return invocation.proceedWithInvocation();
                    }
                    catch (Throwable ex) {
                        if (txAttr.rollbackOn(ex)) {
                            // A RuntimeException: will lead to a rollback.
                            if (ex instanceof RuntimeException) {
                                throw (RuntimeException) ex;
                            }
                            else {
                                throw new ThrowableHolderException(ex);
                            }
                        }
                        else {
                            // A normal return value: will lead to a commit.
                            throwableHolder.throwable = ex;
                            return null;
                        }
                    }
                    finally {
                        cleanupTransactionInfo(txInfo);
                    }
                });

                // Check result state: It might indicate a Throwable to rethrow.
                if (throwableHolder.throwable != null) {
                    throw throwableHolder.throwable;
                }
                return result;
            }
            catch (ThrowableHolderException ex) {
                throw ex.getCause();
            }
            catch (TransactionSystemException ex2) {
                if (throwableHolder.throwable != null) {
                    logger.error("Application exception overridden by commit exception", throwableHolder.throwable);
                    ex2.initApplicationException(throwableHolder.throwable);
                }
                throw ex2;
            }
            catch (Throwable ex2) {
                if (throwableHolder.throwable != null) {
                    logger.error("Application exception overridden by commit exception", throwableHolder.throwable);
                }
                throw ex2;
            }
        }
    }

  • commitTransactionAfterReturning in TransactionAspectSupport

    protected void commitTransactionAfterReturning(@Nullable TransactionInfo txInfo) {
        if (txInfo != null && txInfo.getTransactionStatus() != null) {
            if (logger.isTraceEnabled()) {
                logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() + "]");
            }
            txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
        }
    }

  • commit in AbstractPlatformTransactionManager

    public final void commit(TransactionStatus status) throws TransactionException {
        if (status.isCompleted()) {
            throw new IllegalTransactionStateException(
                    "Transaction is already completed - do not call commit or rollback more than once per transaction");
        }

        DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
        if (defStatus.isLocalRollbackOnly()) {
            if (defStatus.isDebug()) {
                logger.debug("Transactional code has requested rollback");
            }
            processRollback(defStatus, false);
            return;
        }

        if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) {
            if (defStatus.isDebug()) {
                logger.debug("Global transaction is marked as rollback-only but transactional code requested commit");
            }
            processRollback(defStatus, true);
            return;
        }

        processCommit(defStatus);
    }
  • processCommit in AbstractPlatformTransactionManager
private void processCommit(DefaultTransactionStatus status) throws TransactionException {
        try {
            boolean beforeCompletionInvoked = false;

            try {
                boolean unexpectedRollback = false;
                prepareForCommit(status);
                triggerBeforeCommit(status);
                triggerBeforeCompletion(status);
                beforeCompletionInvoked = true;

                if (status.hasSavepoint()) {
                    if (status.isDebug()) {
                        logger.debug("Releasing transaction savepoint");
                    }
                    unexpectedRollback = status.isGlobalRollbackOnly();
                    status.releaseHeldSavepoint();
                }
                else if (status.isNewTransaction()) {
                    if (status.isDebug()) {
                        logger.debug("Initiating transaction commit");
                    }
                    unexpectedRollback = status.isGlobalRollbackOnly();
                    doCommit(status);
                }
                else if (isFailEarlyOnGlobalRollbackOnly()) {
                    unexpectedRollback = status.isGlobalRollbackOnly();
                }

                // Throw UnexpectedRollbackException if we have a global rollback-only
                // marker but still didn't get a corresponding exception from commit.
                if (unexpectedRollback) {
                    throw new UnexpectedRollbackException(
                            "Transaction silently rolled back because it has been marked as rollback-only");
                }
            }
            catch (UnexpectedRollbackException ex) {
                // can only be caused by doCommit
                triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
                throw ex;
            }
            catch (TransactionException ex) {
                // can only be caused by doCommit
                if (isRollbackOnCommitFailure()) {
                    doRollbackOnCommitException(status, ex);
                }
                else {
                    triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
                }
                throw ex;
            }
            catch (RuntimeException | Error ex) {
                if (!beforeCompletionInvoked) {
                    triggerBeforeCompletion(status);
                }
                doRollbackOnCommitException(status, ex);
                throw ex;
            }

            // Trigger afterCommit callbacks, with an exception thrown there
            // propagated to callers but the transaction still considered as committed.
            try {
                triggerAfterCommit(status);
            }
            finally {
                triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
            }

        }
        finally {
            cleanupAfterCompletion(status);
        }
    }
  • triggerAfterCommit in AbstractPlatformTransactionManager
    private void triggerAfterCommit(DefaultTransactionStatus status) {
        if (status.isNewSynchronization()) {
            if (status.isDebug()) {
                logger.trace("Triggering afterCommit synchronization");
            }
            TransactionSynchronizationUtils.triggerAfterCommit();
        }
    }

  • triggerAfterCommit in TransactionSynchronizationUtils

    public static void triggerAfterCommit() {
        invokeAfterCommit(TransactionSynchronizationManager.getSynchronizations());
    }
  • invokeAfterCommit in TransactionSynchronizationUtils
    public static void invokeAfterCommit(@Nullable List<TransactionSynchronization> synchronizations) {
        if (synchronizations != null) {
            for (TransactionSynchronization synchronization : synchronizations) {
                synchronization.afterCommit();
            }
        }
    }

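From invokeAfterCommit in TransactionSynchronizationUtils we can see that the TransactionSynchronization registered through TransactionSynchronizationManager.registerSynchronization has its afterCommit method invoked here, that is, only after the transaction has committed.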
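The call chain above can be condensed into a small plain-Java model of the callback order in processCommit. This is only a sketch under simplifying assumptions: ToySynchronization, ToyTransaction, and CommitOrderDemo are hypothetical names, beforeCompletion is left out, and afterCompletion takes a boolean here where Spring passes an int status.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy callback interface with a subset of the hooks; not Spring's interface. */
interface ToySynchronization {
    default void beforeCommit() { }
    default void afterCommit() { }
    default void afterCompletion(boolean committed) { }
}

/** Toy transaction that fires callbacks in the same relative order as processCommit. */
final class ToyTransaction {
    private final List<ToySynchronization> synchronizations = new ArrayList<>();
    private final List<String> log;

    ToyTransaction(List<String> log) {
        this.log = log;
    }

    void register(ToySynchronization synchronization) {
        synchronizations.add(synchronization);
    }

    void commit() {
        synchronizations.forEach(ToySynchronization::beforeCommit);
        log.add("doCommit");
        try {
            // afterCommit is reached only when the commit itself did not throw.
            synchronizations.forEach(ToySynchronization::afterCommit);
        } finally {
            synchronizations.forEach(s -> s.afterCompletion(true));
        }
    }

    void rollback() {
        log.add("doRollback");
        // afterCommit is never invoked on this path.
        synchronizations.forEach(s -> s.afterCompletion(false));
    }
}

final class CommitOrderDemo {
    /** Runs one toy transaction and returns the recorded sequence of steps. */
    static List<String> run(boolean shouldCommit) {
        List<String> log = new ArrayList<>();
        ToyTransaction tx = new ToyTransaction(log);
        tx.register(new ToySynchronization() {
            @Override public void beforeCommit() { log.add("beforeCommit"); }
            @Override public void afterCommit() { log.add("afterCommit: send email"); }
            @Override public void afterCompletion(boolean committed) {
                log.add("afterCompletion committed=" + committed);
            }
        });
        if (shouldCommit) {
            tx.commit();
        } else {
            tx.rollback();
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run(true));
        System.out.println(run(false));
    }
}
```

On the commit path the email step is recorded after doCommit; on the rollback path it is never recorded, which is the behavior the solution relies on.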


    Link to this article: https://www.haomeiwen.com/subject/ynrspltx.html