业务场景:
在一个事务操作中,当数据入库之后,继续做其他异步或同步操作,如消息通知、远程接口调用等。
存在的问题:
- 事务原子性不能保证:如果出现事务回滚,则数据入库失败,然而异步操作却不能回滚,继续执行,这就会出现与业务预期不一致的结果(如数据入库失败,但是消息通知则照常触发);
- 数据正确性无法保证:如果异步操作需要反查数据库上一步入库的结果,而上一步的事务由于数据库压力或IO等原因导致事务提交延迟,这时异步操作去数据库里查询数据就会失败;
解决方案:
这就要求我们保证事务的原子性,数据库入库失败,那异步操作也不能执行;另外还要保证在异步操作执行前事务一定要是已提交状态。
- 使用TransactionSynchronizationManager保证异步操作只在事务正确commit之后才执行。这也是个人较为推荐的一种做法。
定义一个组件,该组件首先检查当前上下文是否开启事务,如果存在事务,当事务正确commit后,执行由调用者传过来的异步执行逻辑。
/**
* 事务提交后的处理器,action执行严格依赖调用方的事务提交
* 如果调用方没有事务,不执行;
* 如果调用方事务回滚,不执行;
* action异常不影响调用方事务提交;
*
* @Author tz
* @Date 2020/12/18 14:12
* @Version 1.0
*/
@Component
public class TransactionCommitHandler {
public void handle(Runnable action){
if (TransactionSynchronizationManager.isActualTransactionActive()){
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
@Override
public void afterCommit() {
//具体的异步操作
action.run();
}
});
}
}
}
调用方:
/**
* 保存工单及发送消息
*/
@Transactional(isolation = Isolation.REPEATABLE_READ, propagation = Propagation.REQUIRED, rollbackFor = Exception.class)
public void submitOrder(Integer ottStatus, Integer mppStatus, AssetPlaylistDto newPl) throws Exception {
//... 其它操作
AssetPlaylistOrders order = this.constructOrder(ottStatus, mppStatus, newPl);
//保存工单
this.save(order);
transactionCommitHandler.handle(() -> {
//发送复审mq
rabbitMqService.sendOrderMsg(order.getOrderId(), order.getPlId(), AssetTypeEnum.AUTO_PLAY);
});
}
- Spring4.2之后,可以使用TransactionalEventListener监听事务提交,并在调用方发送event。这种方式需要维护过多的事件及事件处理器,可维护性较差,相对而言上面第一种方案的函数式输入会简单一点。
业务实现:
@Service
@Slf4j
public class UserService extends ServiceImpl<UserDao, User> {
@Autowired
ApplicationEventPublisher eventPublisher;
@Transactional
public void add(User user){
super.save(user);
eventPublisher.publishEvent(new UserAddEvent(user.getId()));
}
}
自定义事件:
@Data
public class UserAddEvent extends ApplicationEvent {
private Integer userId;
public UserAddEvent(Integer userId) {
this.userId = userId;
}
}
监听器实现:
@Slf4j
@Component
public class UserListener {
@Autowired
EmailService emailService;
@Async
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT, classes = UserAddEvent.class)
public void onUserAddEvent(UserAddEvent event) {
emailService.sendEmail(event.getUserId());
}
}
还有一些其它个人觉得不太靠谱的方式,如去掉整个事务,甚至手动延迟异步操作,这些不论是从事务的ACID上,还是软件鲁棒性上来讲,都不是很好的解决方案。
网友评论