美文网首页java面试
事务service调用异步线程bug

事务service调用异步线程bug

作者: Raral | 来源:发表于2022-07-20 18:51 被阅读0次

    事务service调用异步线程bug

    当一个service更新一条数据,但是在异步方法里,查询数据时候,不是最新的数据的???

    示例(普通开启线程-当前线程有睡眠):

        
    
        @Transactional
        @Override
        public void test() {
            log.info("【==当前线程事务开始==】");
    
            //更新操作
            boolean update = this.update(Wrappers.<GoodsPO>lambdaUpdate().set(GoodsPO::getIsDelete, 1).eq(GoodsPO::getId, "111"));
            if(update) {
                new Thread(() -> {
                    GoodsPO byId = this.getById("111");
                    //这个时候查询是没有提交事务的数据(也就是更新前的数据)
                    log.info("【测试goodspo】{}", JacksonUtils.obj2json(byId));
                }).start();
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                log.info("【==当前线程事务提交==】");
            }
    
        }
    
    
     //打印
       /**
       【==当前线程事务开始==】
       【测试goodspo】 未提交事务的数据
        【==当前线程事务提交==】
       */
    

    示例(普通开启线程-当前线程没有睡眠):

        
    
        @Transactional
        @Override
        public void test() {
            log.info("【==当前线程事务开始==】");
    
            //更新操作
            boolean update = this.update(Wrappers.<GoodsPO>lambdaUpdate().set(GoodsPO::getIsDelete, 1).eq(GoodsPO::getId, "111"));
            if(update) {
                new Thread(() -> {
                    GoodsPO byId = this.getById("111");
                    //这个时候查询是提交事务的数据(也就是更新后的数据)
                    log.info("【测试goodspo】{}", JacksonUtils.obj2json(byId));
                }).start();
                log.info("【==当前线程事务提交==】");
            }
    
        }
    
    
     //打印
       /**
       【==当前线程事务开始==】
        【==当前线程事务提交==】
        【测试goodspo】 已经提交事务的数据(更新后的数据)
       */
    

    示例(springboot线程池@EnableAsync-@Async("executor")-):

    
        @Transactional
        @Override
        public void test() {
            log.info("【==当前线程事务开始==】");
    
            //更新操作
            boolean update = this.update(Wrappers.<GoodsPO>lambdaUpdate().set(GoodsPO::getIsDelete, 1).eq(GoodsPO::getId, "111"));
            if(update) {
                //异步线程池方法
                taskExecutor.asyncTest("111");
                log.info("【==当前线程事务提交==】");
            }
    
        }
    
    
     @Async("executor")
        public void asyncTest(String s) {
            GoodsPO byId = goodsMapper.selectById(s);
            log.info("【测试goodspo】{}", JacksonUtils.obj2json(byId));
        }
    
    
     //打印
       /**
        【==当前线程事务开始==】
        【==当前线程事务提交==】
        【测试goodspo】 (本地环境是获取更新后的数据,其实线上环境是获取更新前的数据)
       */
    
    

    (本地环境是获取更新后的数据,其实线上环境是获取更新前的数据)

    解决上面问题

    1. 在当前异步方法,添加休眠时间
    2. 在service调取异步方法时,直接查询好数据,传给异步方法
    @Transactional
        @Override
        public void test() {
            log.info("【==当前线程事务开始==】");
    
            //更新操作
            boolean update = this.update(Wrappers.<GoodsPO>lambdaUpdate().set(GoodsPO::getIsDelete, 1).eq(GoodsPO::getId, "111"));
            if(update) {
                //异步线程池方法
                taskExecutor.asyncTest("111");
                log.info("【==当前线程事务提交==】");
            }
    
        }
    
    @Async("executor")
        public void asyncTest(String s) {
    
            try {
                long start = System.currentTimeMillis();
                log.info("线程开始休眠start{}",start);
                Thread.sleep(1000);
                log.info("线程结束休眠end{}",System.currentTimeMillis() - start);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            GoodsPO byId = goodsMapper.selectById(s);
            log.info("【测试goodspo】{}", JacksonUtils.obj2json(byId));
        }
    
    @Transactional
        @Override
        public void test() {
            log.info("【==当前线程事务开始==】");
    
            //更新操作
            boolean update = this.update(Wrappers.<GoodsPO>lambdaUpdate().set(GoodsPO::getIsDelete, 1).eq(GoodsPO::getId, "111"));
            if(update) {
                //异步线程池方法
                taskExecutor.asyncTest("111");
                log.info("【==当前线程事务提交==】");
            }
    
        }
    
    @Async("executor")
        public void asyncTest(GoodsPO goodsPO) {
            log.info("【测试goodspo】{}", JacksonUtils.obj2json(goodsPO));
        }
    

    注意:异步方法里的异常,不会影响外面事务的

    进阶解决@Transactional 事务提交之后执行 @Async 修饰的异步方法
    最近项目中遇到的问题:
    俩个service方法, 方法A中调用方法B。
    方法A核心业务涉及多张表的数据操作,事务采用注解:@Transactional(rollbackFor = Exception.class)。
    方法B 比较耗时,为了不影响核心业务,方法B 用@Async注解,单独开启一个线程去异步执行。(方法B在另外一个类里边,不能和A在同一个类)。
    出现的问题:
    方法A的事务还没提交,方法B就执行了,导致方法B中查到的数据还是老数据。
    当时想到的解决方案,方法A事务提交后再执行方法B

    • 问题代码:
    
    class A {
     
        @Autowired
        private B b;
        
        @Transactional
        public void updateA(..) {
            insert(..);
            update(..);
            b.updateB(..);
        }
     
    }
        
     
    class B {
     
        @Async
        public void updateB(..) {
            update(..)
        }
     
    
    
    • 注意
      方法A和方法B假如在同一个类中,则方法B的@Async注解会失效:
      首先先解释下@Transactional注解失效原因:Spring在扫描bean的时候会扫描方法是否包含@Transactional注解,如果包含,spring会为这个bean动态生成一个子类(代理类proxy),代理类是继承原来的bean的。
      此时,当前这个注解的方法被调用时候,实际上是由代理类调用的,代理类在调用之前就会启动Transaction事务。然而,如果这个方法被同一个类中的其他方法调用的,那么该方法的调用并没有通过代理类,而是直接通过原来bean直接调用,所以就不会启动Transaction,我们看到的现象就是 @Transactional 注解无效。
      @Transactional和@Async注解实现都是基于spirng的Aop,而AOP实现是基于动态代理实现的,故Async 失效的原理和原理是一样的。(都是因为同一个类其他方法调用时,没有通过代理类调用,所以注解失效)

    • 代码

     @Resource
        private TaskExecutor taskExecutor;
    
    
        @Transactional
        @Override
        public void test() {
            log.info("【==当前线程事务开始==】");
    
            //更新操作
            boolean update = this.update(Wrappers.<GoodsPO>lambdaUpdate().set(GoodsPO::getIsDelete, 1).eq(GoodsPO::getId, "111"));
            if(update) {
    
                TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
                    @Override
                    public void beforeCommit(boolean readOnly) {
                        log.info("【==当前事务提交前==】");
                    }
    
                    @Override
                    public void afterCommit() {
                        log.info("【==当前事务提交后==】");
                        //异步线程池方法
                        taskExecutor.asyncTest("111");
                    }
                });
    
                log.info("【==当前线程事务提交完成==】");
            }
    
        }
    
        @Async("executor")
        public void asyncTest(String s) {
    
            GoodsPO byId = goodsMapper.selectById(s);
            log.info("【测试goodspo】{}", JacksonUtils.obj2json(byId));
        }
    
    
    
        //打印
        /**
            【==当前线程事务开始==】
             【==当前线程事务提交完成==】
             【==当前事务提交前==】
             【==当前事务提交后==】
             【测试goodspo】 (事务提交后的数据)
             
        */
    

    进阶解决@Transactional 事务提交之后执行 异步方法时,获取不到事务提交后的数据(@TransactionalEventListener注解)
    Spring事务监听机制—使用@TransactionalEventListener处理数据库事务提交成功后再执行操作

    • 为什么使用  
        在项目中,往往需要执行数据库操作后,发送消息或事件来异步调** * 用其他组件执行相应的操作,例如:
        用户注册后发送激活码;
        配置修改后发送更新事件等。
        但是,数据库的操作如果还未完成,此时异步调用的方法查询数据库发现没有数据,这就会出现问题。
    • 为了解决上述问题,Spring为我们提供了两种方式:
        (1) @TransactionalEventListener注解(订阅发布设计模式)
        (2) 事务同步管理器TransactionSynchronizationManager(上述已介绍)
        以便我们可以在事务提交后再触发某一事件。
    • 代码
      定义事件
    /**
    事件
     * */
    public class AcctypeEvent extends ApplicationEvent {
    
        private String id;
    
        public AcctypeEvent(Object source, String id) {
            super(source);
            this.id = id;
        }
    
        public String getId() {
            return id;
        }
    }
    
    

    定义事件监听器

    /**
    事件监听器
     * */
    @Component
    public class AcctytpeEventListener {
        
        @Resource
        private TaskExecutor taskExecutor;
    
        //监听劵批次事件
        @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
        void updataAcctypeEvent(AcctypeEvent event) {
    //        String id = event.getId();//商品id
            //查询最新的数据
            taskExecutor.asyncTest();
    
        }
        
    }
    

    发布事件

     @Autowired
        private ApplicationContext applicationContext;
    
        @Transactional(rollbackFor = Exception.class)
        @Override
        public void test2() throws Exception {
            log.info("【主线程名称】:{}",Thread.currentThread().getName());
            //更新操作
            boolean update = this.update(Wrappers.<GoodsPO>lambdaUpdate().set(GoodsPO::getApp, 66).eq(GoodsPO::getId, "0001059971"));
    
            //发布事件,处理异步任务(查询最新数据,发送短信成功后更新状态)
            applicationContext.publishEvent(new AcctypeEvent("我是和事务相关的事件,请事务提交后执行我","0001059971"));
    
        }
    

    相关文章

      网友评论

        本文标题:事务service调用异步线程bug

        本文链接:https://www.haomeiwen.com/subject/vakyirtx.html