Problem description
The demo code is as follows:
@Service
public class PersonService {
    private final PersonRepository personRepository;
    private final ApplicationEventPublisher applicationEventPublisher;

    public PersonService(PersonRepository personRepository, ApplicationEventPublisher applicationEventPublisher) {
        this.personRepository = personRepository;
        this.applicationEventPublisher = applicationEventPublisher;
    }

    @Transactional
    public void save(Person person){
        personRepository.save(person); // ①
        applicationEventPublisher.publishEvent(new SendEmailEvent(person.getId())); // ②
    }
}

@Component
public class SendEmailEventListener {
    private final PersonRepository personRepository;

    public SendEmailEventListener(PersonRepository personRepository) {
        this.personRepository = personRepository;
    }

    @EventListener
    @Transactional
    public void handle(SendEmailEvent emailEvent){
        Person person = personRepository.findById(emailEvent.getPersonId()); // ③
        //...send email
    }
}
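The problem described in the title is that when ③ reads the person from the database by ID, the transaction that performed the save at ① may not have been committed yet.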
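The demo code above uses Spring's event publishing and listening mechanism to illustrate a problem that asynchronous operations can run into. Strictly speaking, any operation with asynchronous semantics is likely to hit this problem when it depends on data written by a transaction. In Java Web applications, operations with asynchronous semantics include Spring's event publishing and listening mechanism (ApplicationEventPublisher, ApplicationEvent, @EventListener), asynchronous execution (@Async), message queues (MQ), and so on.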
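To make the timing concrete, here is a minimal, Spring-free sketch. ToyTransactionalStore and readOnOtherThread are hypothetical names invented for this illustration, and keeping writes in a private buffer until commit() is only a rough stand-in for database isolation, not how any real database or Spring itself works.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

public class ReadBeforeCommitDemo {

    /** Toy store (hypothetical): writes stay in a private buffer until commit() publishes them. */
    public static final class ToyTransactionalStore {
        private final Map<Long, String> committed = new ConcurrentHashMap<>();
        private final Map<Long, String> pending = new HashMap<>(); // touched by the writer thread only

        public void save(long id, String name) {
            pending.put(id, name);
        }

        public void commit() {
            committed.putAll(pending);
            pending.clear();
        }

        /** What another thread or connection sees: committed rows only. */
        public Optional<String> findById(long id) {
            return Optional.ofNullable(committed.get(id));
        }
    }

    /** Runs the read on another thread and waits for it, so the outcome is deterministic. */
    public static Optional<String> readOnOtherThread(ToyTransactionalStore store, long id) {
        return CompletableFuture.supplyAsync(() -> store.findById(id)).join();
    }

    public static void main(String[] args) {
        ToyTransactionalStore store = new ToyTransactionalStore();
        store.save(1L, "alice");                                 // like ①: write inside the transaction
        Optional<String> before = readOnOtherThread(store, 1L);  // like ③: the reader runs before the commit
        store.commit();
        Optional<String> after = readOnOtherThread(store, 1L);   // the same read after the commit
        System.out.println("before commit: " + before);          // before commit: Optional.empty
        System.out.println("after commit: " + after);            // after commit: Optional[alice]
    }
}
```

The read that runs before commit() finds nothing, which is the situation the listener at ③ can end up in when it runs outside the writing transaction.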
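Solutions
Semantically, it is enough to ensure that these asynchronous operations run only after the transaction has committed successfully, and do not run at all if it rolls back.
- Using TransactionSynchronizationManager.registerSynchronization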
@Component
public class SendEmailEventListener {
    private final PersonRepository personRepository;

    public SendEmailEventListener(PersonRepository personRepository) {
        this.personRepository = personRepository;
    }

    @EventListener
    @Transactional
    public void handle(SendEmailEvent emailEvent){
        // Requires an active transaction synchronization on the current thread.
        // On Spring 5.3+, TransactionSynchronizationAdapter is deprecated;
        // implement TransactionSynchronization directly instead.
        TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
            @Override
            public void afterCommit() {
                personRepository.findById(emailEvent.getPersonId()); // ④
                //...send email
            }
        });
    }
}
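With this code, the logic at ④ runs only after the transaction has committed.
- Using @TransactionalEventListener
Since Spring 4.2 there is an annotation, @TransactionalEventListener, that controls in which phase of the transaction an event is handled. By default the listener is bound to the AFTER_COMMIT phase.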
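@Component
public class SendEmailEventListener {
    private final PersonRepository personRepository;

    public SendEmailEventListener(PersonRepository personRepository) {
        this.personRepository = personRepository;
    }

    // Default phase is AFTER_COMMIT. Data access here still participates in the
    // original, already committed transaction; the Spring Javadoc recommends
    // PROPAGATION_REQUIRES_NEW for transactional work done at this point.
    @TransactionalEventListener
    @Transactional
    public void handle(SendEmailEvent emailEvent){
        personRepository.findById(emailEvent.getPersonId());
        //...send email
    }
}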
For other operations with asynchronous semantics, TransactionSynchronizationManager.registerSynchronization can be used in the same way.
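The rule both approaches rely on (run the side effect only after a successful commit, skip it on rollback) can be modeled without Spring. The sketch below is an assumption-laden toy: ToyTransaction and runAfterCommit are hypothetical names, not Spring classes, and the "commit" is just a flag.

```java
import java.util.ArrayList;
import java.util.List;

public class AfterCommitDemo {

    /** Hypothetical stand-in for a transaction; not a Spring class. */
    public static final class ToyTransaction {
        private final List<Runnable> afterCommitActions = new ArrayList<>();
        private boolean completed;

        /** Defers an action (send mail, publish to MQ, submit an async task) until commit. */
        public void runAfterCommit(Runnable action) {
            if (completed) {
                throw new IllegalStateException("Transaction is already completed");
            }
            afterCommitActions.add(action);
        }

        public void commit() {
            completed = true;
            // The data would become durable at this point; only then run the deferred actions.
            afterCommitActions.forEach(Runnable::run);
        }

        public void rollback() {
            completed = true;
            // Nothing was committed, so the deferred actions are dropped.
            afterCommitActions.clear();
        }
    }

    public static void main(String[] args) {
        List<String> sent = new ArrayList<>();

        ToyTransaction ok = new ToyTransaction();
        ok.runAfterCommit(() -> sent.add("email-for-person-1"));
        ok.commit();

        ToyTransaction failed = new ToyTransaction();
        failed.runAfterCommit(() -> sent.add("email-for-person-2"));
        failed.rollback();

        System.out.println(sent); // [email-for-person-1]
    }
}
```

In a real application the deferred action would be the @Async call or the MQ publish, registered through a TransactionSynchronization whose afterCommit() performs it.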
Analysis
Let us look at what TransactionSynchronizationManager.registerSynchronization does:
public static void registerSynchronization(TransactionSynchronization synchronization)
        throws IllegalStateException {
    Assert.notNull(synchronization, "TransactionSynchronization must not be null");
    Set<TransactionSynchronization> synchs = synchronizations.get();
    if (synchs == null) {
        throw new IllegalStateException("Transaction synchronization is not active");
    }
    synchs.add(synchronization);
}
It mainly adds the TransactionSynchronization we created to a Set bound to the current thread. So when is this Set used?
- We start the analysis from invoke in TransactionInterceptor
public Object invoke(MethodInvocation invocation) throws Throwable {
    // Work out the target class: may be {@code null}.
    // The TransactionAttributeSource should be passed the target class
    // as well as the method, which may be from an interface.
    Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
    // Adapt to TransactionAspectSupport's invokeWithinTransaction...
    return invokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed);
}
- invokeWithinTransaction, which TransactionInterceptor inherits from TransactionAspectSupport
protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
        final InvocationCallback invocation) throws Throwable {
    // If the transaction attribute is null, the method is non-transactional.
    TransactionAttributeSource tas = getTransactionAttributeSource();
    final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
    final PlatformTransactionManager tm = determineTransactionManager(txAttr);
    final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);
    if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {
        // Standard transaction demarcation with getTransaction and commit/rollback calls.
        TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
        Object retVal;
        try {
            // This is an around advice: Invoke the next interceptor in the chain.
            // This will normally result in a target object being invoked.
            retVal = invocation.proceedWithInvocation();
        }
        catch (Throwable ex) {
            // target invocation exception
            completeTransactionAfterThrowing(txInfo, ex);
            throw ex;
        }
        finally {
            cleanupTransactionInfo(txInfo);
        }
        commitTransactionAfterReturning(txInfo);
        return retVal;
    }
    else {
        final ThrowableHolder throwableHolder = new ThrowableHolder();
        // It's a CallbackPreferringPlatformTransactionManager: pass a TransactionCallback in.
        try {
            Object result = ((CallbackPreferringPlatformTransactionManager) tm).execute(txAttr, status -> {
                TransactionInfo txInfo = prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
                try {
                    return invocation.proceedWithInvocation();
                }
                catch (Throwable ex) {
                    if (txAttr.rollbackOn(ex)) {
                        // A RuntimeException: will lead to a rollback.
                        if (ex instanceof RuntimeException) {
                            throw (RuntimeException) ex;
                        }
                        else {
                            throw new ThrowableHolderException(ex);
                        }
                    }
                    else {
                        // A normal return value: will lead to a commit.
                        throwableHolder.throwable = ex;
                        return null;
                    }
                }
                finally {
                    cleanupTransactionInfo(txInfo);
                }
            });
            // Check result state: It might indicate a Throwable to rethrow.
            if (throwableHolder.throwable != null) {
                throw throwableHolder.throwable;
            }
            return result;
        }
        catch (ThrowableHolderException ex) {
            throw ex.getCause();
        }
        catch (TransactionSystemException ex2) {
            if (throwableHolder.throwable != null) {
                logger.error("Application exception overridden by commit exception", throwableHolder.throwable);
                ex2.initApplicationException(throwableHolder.throwable);
            }
            throw ex2;
        }
        catch (Throwable ex2) {
            if (throwableHolder.throwable != null) {
                logger.error("Application exception overridden by commit exception", throwableHolder.throwable);
            }
            throw ex2;
        }
    }
}
- commitTransactionAfterReturning, also inherited from TransactionAspectSupport
protected void commitTransactionAfterReturning(@Nullable TransactionInfo txInfo) {
    if (txInfo != null && txInfo.getTransactionStatus() != null) {
        if (logger.isTraceEnabled()) {
            logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() + "]");
        }
        txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
    }
}
- commit in AbstractPlatformTransactionManager
public final void commit(TransactionStatus status) throws TransactionException {
    if (status.isCompleted()) {
        throw new IllegalTransactionStateException(
                "Transaction is already completed - do not call commit or rollback more than once per transaction");
    }
    DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
    if (defStatus.isLocalRollbackOnly()) {
        if (defStatus.isDebug()) {
            logger.debug("Transactional code has requested rollback");
        }
        processRollback(defStatus, false);
        return;
    }
    if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) {
        if (defStatus.isDebug()) {
            logger.debug("Global transaction is marked as rollback-only but transactional code requested commit");
        }
        processRollback(defStatus, true);
        return;
    }
    processCommit(defStatus);
}
- processCommit in AbstractPlatformTransactionManager
private void processCommit(DefaultTransactionStatus status) throws TransactionException {
    try {
        boolean beforeCompletionInvoked = false;
        try {
            boolean unexpectedRollback = false;
            prepareForCommit(status);
            triggerBeforeCommit(status);
            triggerBeforeCompletion(status);
            beforeCompletionInvoked = true;
            if (status.hasSavepoint()) {
                if (status.isDebug()) {
                    logger.debug("Releasing transaction savepoint");
                }
                unexpectedRollback = status.isGlobalRollbackOnly();
                status.releaseHeldSavepoint();
            }
            else if (status.isNewTransaction()) {
                if (status.isDebug()) {
                    logger.debug("Initiating transaction commit");
                }
                unexpectedRollback = status.isGlobalRollbackOnly();
                doCommit(status);
            }
            else if (isFailEarlyOnGlobalRollbackOnly()) {
                unexpectedRollback = status.isGlobalRollbackOnly();
            }
            // Throw UnexpectedRollbackException if we have a global rollback-only
            // marker but still didn't get a corresponding exception from commit.
            if (unexpectedRollback) {
                throw new UnexpectedRollbackException(
                        "Transaction silently rolled back because it has been marked as rollback-only");
            }
        }
        catch (UnexpectedRollbackException ex) {
            // can only be caused by doCommit
            triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
            throw ex;
        }
        catch (TransactionException ex) {
            // can only be caused by doCommit
            if (isRollbackOnCommitFailure()) {
                doRollbackOnCommitException(status, ex);
            }
            else {
                triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
            }
            throw ex;
        }
        catch (RuntimeException | Error ex) {
            if (!beforeCompletionInvoked) {
                triggerBeforeCompletion(status);
            }
            doRollbackOnCommitException(status, ex);
            throw ex;
        }
        // Trigger afterCommit callbacks, with an exception thrown there
        // propagated to callers but the transaction still considered as committed.
        try {
            triggerAfterCommit(status);
        }
        finally {
            triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
        }
    }
    finally {
        cleanupAfterCompletion(status);
    }
}
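One consequence of the try/finally around triggerAfterCommit is worth spelling out: if an afterCommit callback throws (say, the mail server is unavailable), the caller sees the exception, but the data is already committed and afterCompletion still runs. The following is a reduced model of just that tail, assuming hypothetical Runnable hooks in place of Spring's internal methods.

```java
import java.util.ArrayList;
import java.util.List;

public class AfterCommitFailureDemo {

    /**
     * Reduced model of the tail of the commit sequence: the commit happens first,
     * then afterCommit runs inside try/finally so that afterCompletion always follows.
     */
    public static void commit(Runnable doCommit, Runnable afterCommit, Runnable afterCompletion) {
        doCommit.run();            // once this returns, the data is committed
        try {
            afterCommit.run();     // an exception here propagates to the caller
        } finally {
            afterCompletion.run(); // runs whether or not afterCommit failed
        }
    }

    public static List<String> runDemo() {
        List<String> log = new ArrayList<>();
        try {
            commit(
                    () -> log.add("committed"),
                    () -> { throw new IllegalStateException("mail server unavailable"); },
                    () -> log.add("afterCompletion"));
        } catch (IllegalStateException ex) {
            log.add("caller saw: " + ex.getMessage());
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(runDemo());
        // prints [committed, afterCompletion, caller saw: mail server unavailable]
    }
}
```

In other words, a failed email in afterCommit does not undo the save, so any retry or compensation has to be handled separately.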
- triggerAfterCommit in AbstractPlatformTransactionManager
private void triggerAfterCommit(DefaultTransactionStatus status) {
    if (status.isNewSynchronization()) {
        if (status.isDebug()) {
            logger.trace("Triggering afterCommit synchronization");
        }
        TransactionSynchronizationUtils.triggerAfterCommit();
    }
}
- triggerAfterCommit in TransactionSynchronizationUtils
public static void triggerAfterCommit() {
    invokeAfterCommit(TransactionSynchronizationManager.getSynchronizations());
}
- invokeAfterCommit in TransactionSynchronizationUtils
public static void invokeAfterCommit(@Nullable List<TransactionSynchronization> synchronizations) {
    if (synchronizations != null) {
        for (TransactionSynchronization synchronization : synchronizations) {
            synchronization.afterCommit();
        }
    }
}
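As invokeAfterCommit in TransactionSynchronizationUtils shows, the TransactionSynchronization instances registered through TransactionSynchronizationManager.registerSynchronization have their afterCommit() method called here, that is, only after the transaction has committed.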
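The whole chain traced above can be condensed into a small, Spring-free model: a thread-bound set of callbacks, a register method that fails when no synchronization is active, and a commit routine that fires the callbacks in order. ToySynchronization, ToySyncManager and commit below are hypothetical, heavily reduced stand-ins; the real classes have more hooks, more states, and rollback handling that is omitted here.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class SynchronizationFlowDemo {

    /** Hypothetical, reduced callback interface (the Spring interface has more hooks). */
    public interface ToySynchronization {
        default void beforeCommit() {}
        default void afterCommit() {}
        default void afterCompletion(boolean committed) {}
    }

    /** Thread-bound registry, playing the role of TransactionSynchronizationManager. */
    public static final class ToySyncManager {
        private static final ThreadLocal<Set<ToySynchronization>> SYNCS = new ThreadLocal<>();

        /** Called when a transaction begins on the current thread. */
        public static void initSynchronization() {
            SYNCS.set(new LinkedHashSet<>());
        }

        public static void registerSynchronization(ToySynchronization synchronization) {
            Set<ToySynchronization> synchs = SYNCS.get();
            if (synchs == null) {
                throw new IllegalStateException("Transaction synchronization is not active");
            }
            synchs.add(synchronization);
        }

        public static List<ToySynchronization> getSynchronizations() {
            Set<ToySynchronization> synchs = SYNCS.get();
            if (synchs == null) {
                throw new IllegalStateException("Transaction synchronization is not active");
            }
            return new ArrayList<>(synchs);
        }

        public static void clear() {
            SYNCS.remove();
        }
    }

    /** Reduced commit sequence: beforeCommit, doCommit, afterCommit, afterCompletion, cleanup. */
    public static void commit(Runnable doCommit) {
        List<ToySynchronization> synchs = ToySyncManager.getSynchronizations();
        try {
            synchs.forEach(ToySynchronization::beforeCommit);
            doCommit.run();
            try {
                synchs.forEach(ToySynchronization::afterCommit);
            } finally {
                synchs.forEach(s -> s.afterCompletion(true));
            }
        } finally {
            ToySyncManager.clear();
        }
    }

    public static List<String> runDemo() {
        List<String> log = new ArrayList<>();
        ToySyncManager.initSynchronization(); // the transaction begins
        ToySyncManager.registerSynchronization(new ToySynchronization() {
            @Override public void beforeCommit() { log.add("beforeCommit"); }
            @Override public void afterCommit() { log.add("afterCommit"); }
            @Override public void afterCompletion(boolean committed) { log.add("afterCompletion"); }
        });
        commit(() -> log.add("doCommit"));
        return log;
    }

    public static void main(String[] args) {
        System.out.println(runDemo()); // [beforeCommit, doCommit, afterCommit, afterCompletion]
    }
}
```

Because the registry is a ThreadLocal, a callback registered on one thread is only ever seen by a commit running on that same thread, which is why the registration has to happen inside the transactional call itself.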