背景
在业务中,经常会有这样的需求,在数据库事务提交之后,发送异步消息或者进行其他的事务操作。
例如当用户注册成功之后,发送激活码,如果用户注册后就执行发送激活码,但是在用户保存时出现提交事务异常,数据库进行回滚,用户实际没有注册成功,但是用户却收到了激活码,此时,正确的是应该在用户注册保存事务提交完成之后,然后发送激活码。
解决方案
1、使用注解@TransactionalEventListener
@Service
@Transactional
public class FooService {
@Autowired
private ApplicationEventPublisher applicationEventPublisher;
public User saveUser(User user) {
userDao.save(user);
// 注册事件
applicationEventPublisher.publishEvent(new SavedUserEvent(user.getId()));
}
}
// -------------------------------------
/**
* 保存用户事件
*/
public class SavedUserEvent {
private int userId;
public SavedUserEvent(int userId) {
this.userId = userId;
}
// getter and setter
}
/**
* 事件侦听,处理对应事件
*/
@Component
public class FooEventListener() {
@Autowired
private UserDao userDao;
@Autowired
private MailService mailService;
@TransactionalEventListener
public sendEmail(SavedUserEvent savedUserEvent) {
User user = userDao.get(userId);
String email = user.getEmail();
mailService.send(email);
}
}
publishEvent 底层调用了一个SimpleApplicationEventMulticaster 来发布事件,属性有一个Executor 可以用来设置异步的方式。
1.1 设置线程池
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.event.SimpleApplicationEventMulticaster;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
@Configuration
public class Config {
@Bean
public ThreadPoolTaskExecutor executor(){
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(3);//核心线程数
executor.setMaxPoolSize(10);//最大线程数
executor.setQueueCapacity(100);//队列大小
return executor;
}
@Bean
public SimpleApplicationEventMulticaster applicationEventMulticaster(ThreadPoolTaskExecutor executor){
SimpleApplicationEventMulticaster multicaste = new SimpleApplicationEventMulticaster();
multicaste.setTaskExecutor(executor);
return multicaste;
}
}
1.2 发布事件后测试
-
有2个线程来发布事件
2、使用TransactionSynchronizationManager 和 TransactionSynchronizationAdapter
@Autowired
private UserDao userDao;
@Autowired
private JmsProducer jmsProducer;
public User saveUser(User user) {
// 保存用户
userDao.save(user);
final int userId = user.getId();
// 兼容无论是否有事务
if(TransactionSynchronizationManager.isActualTransactionActive()) {
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
@Override
public void afterCommit() {
jmsProducer.sendEmail(userId);
}
});
} else {
jmsProducer.sendEmail(userId);
}
}
如果saveUser()和sendEmail()这两个方法使用了相同的事务,但是需要注意的是sendEmail()方法是在afterCommit事务提交之后执行的,此时会导致sendEmail()中的JPA数据保存最终无法提交。所以我们需要使sendEmail()进入一个新的事务中。
如果afterCommit()方法中执行的方法也包含事务,在该方法的@Transactional注解中使用propagation参数用来控制事务的传播,其默认被设置为Propagation.REQUIRED
2.1 解决办法
在@Transactional注解中propagation参数用来控制事务的传播。其默认被设置为Propagation.REQUIRED
Propagation.REQUIRED其逻辑是,如果当前没有事务,就新建一个事务,如果已经存在一个事务中,加入到这个事务中。而上面的业务中我们并不希望其加入已有的事务中,所以单介绍上面的逻辑,假如希望JPA的数据保存到数据库中,需要在事务注解修改为@Transactional(propagation = Propagation.REQUIRES_NEW)参数
然而在很多时候我们希望新加入的方法能够被同一个事务所管理,而使用Propagation.REQUIRES_NEW会导致当前操作脱离上一级事务的控制。所以在使用@Transactional(propagation = Propagation.REQUIRES_NEW)的时候一定要慎重,并且严格控制其被滥用。
2.2 存在的问题
TransactionSynchronizationAdapter的事务提交afterCommit方法后执行,虽然这是可行的,但它需要在任何使用它的地方添加大量样板代码,这根本不是解决问题的一种非常干净的方法。
让我们看看如何创建一个注解 ( @PostCommit) 并使用 Spring AOP 围绕通知从后台驱动这一切。
- 1、构建注解
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface PostCommit {
}
- 2、构建 PostCommitAdapter
PostCommitAdapter 将提供两个功能
- 它将注册runnables到一个ThreadLocal
- 覆盖TransactionSynchronizationAdapter的AfterCommit,在事务提交时运行所有注册runnables
@Slf4j
@Component
public class PostCommitAdapter extends TransactionSynchronizationAdapter {
private static final ThreadLocal<List<Runnable>> RUNNABLE = new ThreadLocal<>();
// 为提交后执行注册一个新的 runnable
public void execute(Runnable runnable) {
if (TransactionSynchronizationManager.isSynchronizationActive()) {
List<Runnable> runnables = RUNNABLE.get();
if (runnables == null) {
runnables = new ArrayList<>();
RUNNABLE.set(runnables);
TransactionSynchronizationManager.registerSynchronization(this);
}
return;
}
// 如果事务同步未激活
runnable.run();
}
@Override
public void afterCommit() {
List<Runnable> runnables = RUNNABLE.get();
runnables.forEach(Runnable::run);
}
@Override
public void afterCompletion(int status) {
RUNNABLE.remove();
}
}
如果事务处于活动状态,execute方法是注册在ThreadLoca的runnables的方法,它只是继续并执行runnables。ThreadLocal里面的afterCommit运行所有的runnables。
- 3、使用 around 建议连接适配器和注释
为了让PostCommitAdapter的execute方法与@PostCommit注释挂钩,围绕@PostCommit创建的一个advice,每一个连接点封装runnables的执行方法,并调用PostCommitAdapter内部行execute方法:
@Aspect
@Slf4j
@AllArgsConstructor
@Configuration
public class PostCommitAnnotationAspect {
private final PostCommitAdapter postCommitAdapter;
@Pointcut("@annotation(com...<package>..PostCommit)")
private void postCommitPointCut(){}
@Around("postCommitPointCut()")
public Object aroundAdvice(final ProceedingJoinPoint pjp) {
postCommitAdapter.execute(new PjpAfterCommitRunnable(pjp));
return null;
}
private static final class PjpAfterCommitRunnable implements Runnable {
private final ProceedingJoinPoint pjp;
public PjpAfterCommitRunnable(ProceedingJoinPoint pjp) {
this.pjp = pjp;
}
@Override
public void run() {
try {
pjp.proceed();
} catch (Throwable e) {
throw new RuntimeException(e);
}
}
}
}
- 4、用法
一旦编写了样板,用法就很简单了,无论应该在事务提交后执行哪种方法,都必须简单地用注释对其进行PostCommit注释。
示例:考虑具有 PostCommit 注释方法的两个类 A 和 B
Class A {
@PostCommit
void log(){
log.info("log from class A")
}
}
Class B {
@PostCommit
void log(){
log.info("log from class B")
}
}
一个驱动类调用这些方法:
Class mainClass {
@Transactional
void transactionalMethod(Entity entity){
someOperation(entity)
log.info("inside transaction");
a.log();
b.log();
save(entity);
log.info("end of method");
}
}
执行后输出:
> inside transaction
> ** saving entity
> log from class A
> log from Class B
参考:
https://blog.csdn.net/weixin_35973945/article/details/115067904
https://blog.csdn.net/qq330983778/article/details/112255441
网友评论