美文网首页
记一次多线程实践过程

记一次多线程实践过程

作者: 某某程序员_ | 来源:发表于2019-08-28 14:20 被阅读0次

    背景:

    一个代扣的任务,每天会定时批量跑任务。

    第一版:单线程串行执行

    开始数据量小的时候,没什么感觉。
    过了一段时间,当数据量超过5000条的时候,每次任务需要大概4小时,观察之后发现,调用支付系统接口的响应大多在2s以上。
    这个时候有人会说,支付服务有问题改支付服务啊,与我自己有什么关系。但是,在实际开发过程中,我们会调用其他服务,并且这些其他服务不受我们的控制,因此首先要从自己负责的项目入手,进行优化。
    为什么一定要优化呢,4个小时能执行完也没什么问题啊,一天也只跑一次。这里有个实际的问题,就是执行时间越长,出错的概率越高,所以能优化的时候还是需要优化的。

    第二版:使用线程池

    代扣任务有三个场景,比如:今天的,昨天的,前天的。
    这里可以使用三个线程的线程池来执行,这时:执行时间=最长时间的场景的执行时间
    如果数据平均的话,总执行时间大概会变为 4/3=1.33小时。
    但是现实情况往往数据是不平均的,有可能到这一步整个任务执行时长并没有明显缩短。
    代码如下:

        ExecutorService AUTO_RENEW_TASK_POOL = Executors.newFixedThreadPool(3);
    
        public void execute(ShardingContext shardingContext) {
            LOGGER.info("开始.");
            CountDownLatch countDownLatch = new CountDownLatch(3);
            AUTO_RENEW_TASK_POOL.submit(() -> {
                dealToday();
                countDownLatch.countDown();
            });
            AUTO_RENEW_TASK_POOL.submit(() -> {
                dealBefore1Day();
                countDownLatch.countDown();
            });
            AUTO_RENEW_TASK_POOL.submit(() -> {
                dealBefore2Day();
                countDownLatch.countDown();
            });
            try {
                countDownLatch.await();
            } catch (Exception e){
                LOGGER.error("异常", e);
            }
            LOGGER.info("结束.");
        }
    

    第三版:进一步细化业务里使用多线程

    在第二版的基础上再进行一次多线程处理。
    开通16个线程并发处理,整个任务执行时间变为最长 4/16 = 0.25小时 = 15分钟,最短仅需 4/3/16=1/12≈0.08小时 = 5分钟 。具体开多少个线程并发处理,根据实际业务场景及整个服务的情况进行判断。
    代码如下:

        public void dealAutoPaymentDetail(...){
            ...
            ExecutorService pool = Executors.newFixedThreadPool(16);
            while (true){
                ...
                if (!CollectionUtils.isEmpty(listBiz)){
                    List<Callable<Boolean>> list = new ArrayList<>();
                    listBiz.forEach(item->{
                        
                            list.add(() -> {
                                ...//业务处理
                            });
                        }          
                    });
    
                    try {
                        pool.invokeAll(list);
                    } catch (InterruptedException e){
                        LOGGER.error("error. " , e);
                    }finally {
                        list.clear();
                    }
                }else {
                    break;
                }
            }
        }
    

    pool.invokeAll(list),有同学看到这里可能会有疑问,直接pool.submit(...)就可以解决了,为什么要使用pool.invokeAll来实现,这个与实际业务有关系。
    pool.invokeAll(...):调用此接口,阻塞主线程直至线程池执行完毕再执行后续流程。
    pool.submit(...):调用此接口,仅把任务交给其他线程执行,主线程不会阻塞,继续执行后续流程。

    还有同学会有疑问,这里直接开多线程,不使用线程池也可以达到目的。原因是在实际业务中,数据量可能会非常庞大,无限制的开启线程会把整个服务拖垮。系统变慢或者直接OOM异常。

    相关文章

      网友评论

          本文标题:记一次多线程实践过程

          本文链接:https://www.haomeiwen.com/subject/abjpectx.html