美文网首页
Spring事务事件控制,解决业务异步操作

Spring事务事件控制,解决业务异步操作

作者: 小波同学 | 来源:发表于2022-11-14 23:28 被阅读0次

背景

在业务中,经常会有这样的需求,在数据库事务提交之后,发送异步消息或者进行其他的事务操作。

例如当用户注册成功之后,发送激活码,如果用户注册后就执行发送激活码,但是在用户保存时出现提交事务异常,数据库进行回滚,用户实际没有注册成功,但是用户却收到了激活码,此时,正确的是应该在用户注册保存事务提交完成之后,然后发送激活码。

解决方案

1、使用注解@TransactionalEventListener

@Service
@Transactional
public class FooService {
    @Autowired
    private  ApplicationEventPublisher applicationEventPublisher;
 
    public User saveUser(User user) {
        userDao.save(user);
        // 注册事件
        applicationEventPublisher.publishEvent(new SavedUserEvent(user.getId()));
    }
}
 
// -------------------------------------
/**
 * 保存用户事件
 */
public class SavedUserEvent {
    private int userId;
    
    public SavedUserEvent(int userId) {
        this.userId = userId;
    }
    
    // getter and setter
}
 
/**
 * 事件侦听,处理对应事件
 */
@Component
public class FooEventListener() {
    @Autowired
    private UserDao userDao;
    @Autowired
    private MailService mailService;
 
    @TransactionalEventListener
    public sendEmail(SavedUserEvent savedUserEvent) {
        User user = userDao.get(userId);
        String email = user.getEmail();
        mailService.send(email);
    }
}

publishEvent 底层调用了一个SimpleApplicationEventMulticaster 来发布事件,属性有一个Executor 可以用来设置异步的方式。


1.1 设置线程池

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.event.SimpleApplicationEventMulticaster;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@Configuration
public class Config {
    @Bean
    public ThreadPoolTaskExecutor executor(){
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(3);//核心线程数
        executor.setMaxPoolSize(10);//最大线程数
        executor.setQueueCapacity(100);//队列大小
        return  executor;
    }

    @Bean
    public SimpleApplicationEventMulticaster applicationEventMulticaster(ThreadPoolTaskExecutor executor){
        SimpleApplicationEventMulticaster multicaste = new SimpleApplicationEventMulticaster();
         multicaste.setTaskExecutor(executor);
         return multicaste;
    }
}

1.2 发布事件后测试

  • 有2个线程来发布事件


2、使用TransactionSynchronizationManager 和 TransactionSynchronizationAdapter

@Autowired
private UserDao userDao;
@Autowired
private JmsProducer jmsProducer;
 
public User saveUser(User user) {
    // 保存用户
    userDao.save(user);
    final int userId = user.getId();
 
    // 兼容无论是否有事务
    if(TransactionSynchronizationManager.isActualTransactionActive()) {
        TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
            @Override
            public void afterCommit() {
                jmsProducer.sendEmail(userId);
            }
        });
    } else {
        jmsProducer.sendEmail(userId);
    }
}

如果saveUser()和sendEmail()这两个方法使用了相同的事务,但是需要注意的是sendEmail()方法是在afterCommit事务提交之后执行的,此时会导致sendEmail()中的JPA数据保存最终无法提交。所以我们需要使sendEmail()进入一个新的事务中。

如果afterCommit()方法中执行的方法也包含事务,在该方法的@Transactional注解中使用propagation参数用来控制事务的传播,其默认被设置为Propagation.REQUIRED

2.1 解决办法

在@Transactional注解中propagation参数用来控制事务的传播。其默认被设置为Propagation.REQUIRED

Propagation.REQUIRED其逻辑是,如果当前没有事务,就新建一个事务,如果已经存在一个事务中,加入到这个事务中。而上面的业务中我们并不希望其加入已有的事务中,所以单介绍上面的逻辑,假如希望JPA的数据保存到数据库中,需要在事务注解修改为@Transactional(propagation = Propagation.REQUIRES_NEW)参数

然而在很多时候我们希望新加入的方法能够被同一个事务所管理,而使用Propagation.REQUIRES_NEW会导致当前操作脱离上一级事务的控制。所以在使用@Transactional(propagation = Propagation.REQUIRES_NEW)的时候一定要慎重,并且严格控制其被滥用。

2.2 存在的问题

TransactionSynchronizationAdapter的事务提交afterCommit方法后执行,虽然这是可行的,但它需要在任何使用它的地方添加大量样板代码,这根本不是解决问题的一种非常干净的方法。

让我们看看如何创建一个注解 ( @PostCommit) 并使用 Spring AOP 围绕通知从后台驱动这一切。

  • 1、构建注解
@Target(ElementType.METHOD) 
@Retention(RetentionPolicy.RUNTIME) 
public @interface PostCommit { 
}
  • 2、构建 PostCommitAdapter

PostCommitAdapter 将提供两个功能

  • 它将注册runnables到一个ThreadLocal
  • 覆盖TransactionSynchronizationAdapter的AfterCommit,在事务提交时运行所有注册runnables
@Slf4j 
@Component 
public class PostCommitAdapter extends TransactionSynchronizationAdapter { 
    private static final ThreadLocal<List<Runnable>> RUNNABLE = new ThreadLocal<>(); 

    // 为提交后执行注册一个新的 runnable
     public void execute(Runnable runnable) { 
        if (TransactionSynchronizationManager.isSynchronizationActive()) { 
            List<Runnable> runnables = RUNNABLE.get(); 
            if (runnables == null) { 
                runnables = new ArrayList<>(); 
                RUNNABLE.set(runnables); 
                TransactionSynchronizationManager.registerSynchronization(this); 
            }
            return; 
        }
        // 如果事务同步未激活
        runnable.run(); 
    } 

    @Override 
    public void afterCommit() { 
        List<Runnable> runnables = RUNNABLE.get(); 
        runnables.forEach(Runnable::run); 
    } 

    @Override 
    public void afterCompletion(int status) { 
        RUNNABLE.remove(); 
    } 
}

如果事务处于活动状态,execute方法是注册在ThreadLoca的runnables的方法,它只是继续并执行runnables。ThreadLocal里面的afterCommit运行所有的runnables。

  • 3、使用 around 建议连接适配器和注释
    为了让PostCommitAdapter的execute方法与@PostCommit注释挂钩,围绕@PostCommit创建的一个advice,每一个连接点封装runnables的执行方法,并调用PostCommitAdapter内部行execute方法:
@Aspect
@Slf4j
@AllArgsConstructor
@Configuration
public class PostCommitAnnotationAspect {
    private final PostCommitAdapter postCommitAdapter;

    @Pointcut("@annotation(com...<package>..PostCommit)")
    private void postCommitPointCut(){}


    @Around("postCommitPointCut()")
    public Object aroundAdvice(final ProceedingJoinPoint pjp) {
        postCommitAdapter.execute(new PjpAfterCommitRunnable(pjp));
        return null;
    }

    private static final class PjpAfterCommitRunnable implements Runnable {
        private final ProceedingJoinPoint pjp;

        public PjpAfterCommitRunnable(ProceedingJoinPoint pjp) {
            this.pjp = pjp;
        }

        @Override
        public void run() {
            try {
                pjp.proceed();
            } catch (Throwable e) {
                throw new RuntimeException(e);
            }
        }
    }
}
  • 4、用法
    一旦编写了样板,用法就很简单了,无论应该在事务提交后执行哪种方法,都必须简单地用注释对其进行PostCommit注释。
    示例:考虑具有 PostCommit 注释方法的两个类 A 和 B
Class A {
   @PostCommit
   void log(){
      log.info("log from class A")
   }
}
Class B {
   @PostCommit
   void log(){
      log.info("log from class B")
   }
}

一个驱动类调用这些方法:

Class mainClass {
      @Transactional
      void transactionalMethod(Entity entity){
          someOperation(entity)
          log.info("inside transaction");
          a.log();
          b.log();
          save(entity);
          log.info("end of method");
      }
}

执行后输出:

> inside transaction
> ** saving entity
> log from class A
> log from Class B

参考:
https://blog.csdn.net/weixin_35973945/article/details/115067904

https://blog.csdn.net/qq330983778/article/details/112255441

https://www.cnblogs.com/mangoubiubiu/p/16366241.html

https://www.jdon.com/57371

相关文章

网友评论

      本文标题:Spring事务事件控制,解决业务异步操作

      本文链接:https://www.haomeiwen.com/subject/olvrxdtx.html