美文网首页spring framework一些收藏
事务注解@Transactional在多线程下的体现和尽量保证多

事务注解@Transactional在多线程下的体现和尽量保证多

作者: 东南枝下 | 来源:发表于2022-01-17 18:39 被阅读0次

    事务注解@Transactional在多线程下的体现

    使用默认的事务传播规则,测试代码如下

    ChickServiceImpl.java 将列表插入分成n个线程并发操作

    ......
    
        @Override
        @Transactional(rollbackFor = Exception.class)
        public Boolean batchConcurrentSave(List<Chick> chickList) {
            // 分成n个循环调用异步函数
            CompletableFuture[] completableFutureArr = new CompletableFuture[chickList.size()];
    
            for (int i = 0; i < chickList.size(); i++) {
                // 获取返回结果
                CompletableFuture<Long> chickId = chickAsyncRepository.insertAsyncChick(chickList.get(i));
                completableFutureArr[i] = chickId;
            }
    
            //join() 的作用:让“主线程”等待“子线程”结束之后才能继续运行
            System.out.println("异步调用完,join前.......");
            CompletableFuture.allOf(completableFutureArr).join();
            System.out.println("异步调用完,join后.......");
    
            return Boolean.TRUE;
        }
    
    ......
    

    ChickAsyncRepositoryImpl.insertAsyncChick 插入数据,在插入函数中故意写个报错的代码,测试数据库回滚的情况

    @Component
    public class ChickAsyncRepositoryImpl implements ChickAsyncRepository {
    
        @Autowired
        private ChickMapper chickMapper;
    
        @Override
        @Async("executorBeanName")
        // @Transactional(rollbackFor = Exception.class)
        public CompletableFuture<Long> insertAsyncChick(Chick chick) {
            Long chickId = chickMapper.addChick(chick);
            System.out.println("当前线程号 : "+Thread.currentThread().getName());
            // 故意写个报错代码,测试回滚情况
            if("c".equals(chick.getName())){
                int i =1/0;
            }
            return CompletableFuture.completedFuture(chickId);
        }
    }
    

    用以下数据进行测试,当插入第三条name=c的数据时会报错

    [
    {
        "name":"a",
        "weight":"10"
    },
    {
        "name":"b",
        "weight":"11"
    },
    {
        "name":"c",
        "weight":"12"
    },
    {
        "name":"d",
        "weight":"13"
    }
    ]
    
    1. ChickAsyncRepositoryImpl.insertAsyncChick 函数上不加@Transactional注解
      代码报错,但四条数据均插入成功,name为c的线程没有发生回滚,说明并发时没有继承到调用者的事务
    2. ChickAsyncRepositoryImpl.insertAsyncChick 函数上@Transactional注解
      代码报错,只有三条数据插入成功,name为c的数据回滚了,没有插入成功,说明并发时该函数没有继承到调用者的事务,自己各自创建了新事物

    尽量保证多线程下的事务一致性

    先尽量保证并发线程间的事务一致,流程图如下:


    并发事务一致性流程图.png

    代码实现:

    ChickServiceImpl.java 将列表插入分成n个线程并发操作
    使用 AtomicBoolean 作为线程间的共享变量
    使用 CountDownLatch 完成线程间的互相等待

    ......
    
        @Override
        public Boolean batchAtomicConcurrentSave(List<Chick> chickList) {
            AtomicBoolean successFlag = new AtomicBoolean(Boolean.TRUE);
            // 分成n个循环调用异步函数
            CompletableFuture[] completableFutureArr = new CompletableFuture[chickList.size()];
            CountDownLatch countDownLatch = new CountDownLatch(chickList.size());
            for (int i = 0; i < chickList.size(); i++) {
                // 获取返回结果
                CompletableFuture<Long> chickId =
                        chickAsyncRepository.newInsertAsyncChick(chickList.get(i),
                                countDownLatch,
                                successFlag);
                completableFutureArr[i] = chickId;
            }
    
            //join() 的作用:让“主线程”等待“子线程”结束之后才能继续运行
            System.out.println("异步调用完,join前.......");
            CompletableFuture.allOf(completableFutureArr).join();
            System.out.println("异步调用完,join后.......");
    
            return successFlag.get();
        }
    
    ......
    

    ChickAsyncRepositoryImpl.newInsertAsyncChick 插入数据,在插入函数中故意写个报错的代码,测试数据库回滚的情况
    使用 PlatformTransactionManager 来手动控制事务、提交和回滚

    /**
     * @author Jenson
     */
    @Component
    @Slf4j
    public class ChickAsyncRepositoryImpl implements ChickAsyncRepository {
    
        @Autowired
        private ChickMapper chickMapper;
        @Autowired
        private PlatformTransactionManager platformTransactionManager;
    
        @Override
        @Async("executorBeanName")
        public CompletableFuture<Long> newInsertAsyncChick(Chick chick,
                                                           CountDownLatch countDownLatch,
                                                           AtomicBoolean successFlag) {
            System.out.println("当前线程号 : " + Thread.currentThread().getName());
            // 创建事务
            TransactionDefinition transactionDefinition = new DefaultTransactionDefinition();
            TransactionStatus transactionStatus = platformTransactionManager.getTransaction(transactionDefinition);
            Long chickId = -1L;
            try {
                chickId = chickMapper.addChick(chick);
                // 故意写个报错代码,测试回滚情况
                if ("c".equals(chick.getName())) {
                    int i = 1 / 0;
                }
            } catch (Exception e) {
                System.out.println("报错了,报错的线程 : " + Thread.currentThread().getName());
                successFlag.set(Boolean.FALSE);
    //            log.error("error {}", e);
            }
    //        // 测试CountDownLatch是否生效
    //        if ("c".equals(chick.getName())) {
    //            try {
    //                Thread.sleep(10000);
    //            } catch (InterruptedException e) {
    //                e.printStackTrace();
    //            }
    //        }
            countDownLatch.countDown();
            try {
                System.out.println(Thread.currentThread().getName() + "开始await : " + new Date());
                countDownLatch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + "结束await : " + new Date());
            // 最后提交/回滚
            if(Boolean.TRUE.equals(successFlag.get())){
                platformTransactionManager.commit(transactionStatus);
            }
            else {
                platformTransactionManager.rollback(transactionStatus);
            }
            return CompletableFuture.completedFuture(chickId);
        }
    }
    

    相关文章

      网友评论

        本文标题:事务注解@Transactional在多线程下的体现和尽量保证多

        本文链接:https://www.haomeiwen.com/subject/giuchrtx.html