美文网首页Java
记一次每日跑批任务耗时性能从六分钟优化到半分钟历程及总结

记一次每日跑批任务耗时性能从六分钟优化到半分钟历程及总结

作者: 该用户已秃头 | 来源:发表于2021-01-08 22:19 被阅读0次

    话说做了一个产品需求上线后,涉及到有一个每日凌晨七点跑批任务,跑批查询出来的待处理订单也就1100单作用,但是耗时却花费了5~6分钟,虽说跑批中,又调用外部服务,但是仅仅在空闲时间,鉴于此情况

    我觉得这个跑批任务有优化的空间,伴随着一次次性能优化,最终耗时优化到只需平均20多秒就跑批完毕,鉴于此,写一篇自己的优化过程分享给大家。

    一、业务概述

    话说我们产品需求有个每日凌晨七点,之所以为什么是定时,还要凌晨七点,那是由于在你跑批之前,中后台也有一步步跑批任务,只有等他们跑完任务,我们才能发起跑批调用,否则早的话,人家

    的跑批任务没执行,你调用的时候只能是徒劳,这就是背景。话说,在最初技术方案调研时,通过微信电话本来想沟通一下,通过一些性能压测工具压测一下对方的接口性能,这样对于我们发起方也有 一个指标可以提供在技术实现中有所参考。但是呢,对方口述说,我们只是对决策平台,真正处理的是背后的数据引擎团队,他们的性能才是整个调用链路上的关键。想法很好,但最终没有压测。之所以 这个情况,是由于对方的数据只在生产环境提供,测试环境没有。所以不太好压测,毕竟压测的时间点不一样,结论也不一样。毕竟他们提供很多业务方调用,既然如此,我想着先按照他提供的一些指标 比如,秒级给出结果,不支持批量调用,只支持同步调用,不支持消息通知。这些结论使得我开始设计第一版技术方案,看看上线后的跑批耗时。

    上面说了一大堆话,简言之概况如上图。中台的事情不做完,前台即便调度任务想要提前无济于事。所以在每天用户8、9点上班时,跑批任务就得生成工单。

    二、上线版本

    如上图,这是上线的第一个初步版本,第一次跑批1100多单,耗时了5~6分钟。当我告知给业务方,人家说,"不错了,还以为得跑半个小时"。然而,作为技术研发的我心中默默表示不甘心, 觉得对这个跑批性能,认为有改善的空间,否则随处时间流逝,待处理订单会越来越多,前期不改善,后期肯定耗时越来越长。

    第一版方案处理的过程如下

    1、调度平台配置一个job,每天凌晨七点调度调用我这边的一个服务(线上部署两个节点,跑批固定IP一台服务器)(假设A),A服务提供一个rest接口,接口实现异步去执行后续处理订单。

    rest接口示例如下:

    @Autowired
    ScanningOrderTask scanningOrderTask;
    
    @PostMapping("/disposeOrders")
    @ApiOperation(value = "待处理订单跑批", notes = "待处理订单跑批")
    @NoAuthRequired
    public SiaResponse disposeOrders() {
        scanningOrderTask.scanningOrder();
        return SiaResponse.newInstance("待处理订单跑批成功!");
    }
    

    ScanningOrderTask代码示例如下:

    @Component
    @Slf4j
    public class ScanningOrderTask {
    
        @Autowired
        ExecutorService threadPoolExecutor;
    
        @Autowired
        DisposeOrderFacade disposeOrderFacade;
    
        /**
         * 订单跑谛听规则
         */
        public void scanningOrder(){
            log.info("##### [scanningOrder]待处理订单开始执行跑批处理 ####");
            CompletableFuture.runAsync(() -> disposeOrderFacade.scanningOrders(), threadPoolExecutor);
        }
    }
    

    2、disposeOrderFacade#scanningOrders提供的业务伪代码实现如下:

    public void scanningOrders(){
        int start = 0;
        int limit = 100;
        StopWatch stopWatch = new StopWatch("scanningOrders");
        stopWatch.start();
        List<Object> disposeOrderList = 先查询第一页数据;
        do{
            List<DisposeOrder> repeatOrder = Lists.newArrayList();//二次重试需要再次重试订单
            for(DisposeOrder order : list){
                try{
                    operateStrategyManager.execute(operateContext);
                }catch(Exception e){
                    //异常重试
                    operateStrategyManager.execute(operateContext);
                    //再次重试失败,则加入失败订单
                    if(再次重试失败){
                        repeatOrder(order);
                    }
                }
            }
            //查询下一批 待扫描订单
            start += limit;
            param.setStart(start);
            disposeOrderList = disposeOrderService.queryByPage(param);
            if(CollectionUtils.isNotEmpty(repeatOrder)){
                disposeOrderList.addAll(repeatOrder);
            }
        }while(list非空)
        stopWatch.stop();
        int batchSize = longAdder.intValue() - errorOrderSet.size();
        long durationSeconds = (stopWatch.getLastTaskTimeMillis() / 1000);
        log.info("##### [scanningOrder]跑批处理完毕,共计={}单,耗时={}s ####",batchSize,durationSeconds);
    }
    

    3、决策引擎异步处理相关代码

    public void operation(OperateContext context) {
        //1、调用决策引擎
        Result<AlarmDecisionRe> alarmDecisionResult = alarmDecisionManager.call(AlarmDecisionDTO.builder()
                .appCode(context.getDisposeOrder().getAppCode())
                .scenePhase(ScenePhaseEnum.LOAN_PROCESS.getIndex())
                .build());
        //2、决策输出的结果,再交由另外一个线程池 threadPoolExecutor(通用线程池)异步去处理。
        CompletableFuture.runAsync(() -> handleAlarmResult(alarmDecisionResult.getData()), threadPoolExecutor);
    }
    

    4、决策输出结果的异步处理相关代码

    /**
     * 处理规则校验结果`
     * @param decisionResult
     */
    void handleAlarmResult(AlarmDecisionRe decisionResult){
        String appCode = decisionResult.getAppCode();
        StopWatch stopWatch = new StopWatch("handleAlarmResult");
        stopWatch.start();
        if(Objects.isNull(decisionResult) || CollectionUtils.isEmpty(decisionResult.getDecisionRuleList())){
            log.info("[每日跑批]决策输出为空,appCode={}",appCode);
            return;
        }
        //存detail 优先级、编码
        AlarmDetail detail = AlarmDetail.builder()
                .appCode(appCode)
                .alarmTime(new Date())
                .alarmRules(JSON.toJSONString(decisionResult.getDecisionRuleList()))
                .scenePhase(ScenePhaseEnum.LOAN_PROCESS.getIndex())
                .build();
        alarmDetailService.insertRecord(detail);
        List<WorkOrder> oldWorkOrder = workOrderService.queryList(WorkOrderForm.builder()
                .appCode(decisionResult.getAppCode())
                .isFinished(ConstEnum.YesOrNoEnum.NO.getIndex()).build());
        if(CollectionUtils.isEmpty(oldWorkOrder)){
            saveWorkOrder(detail,decisionResult,0);
        }else{
            log.info("[每日跑批]该订单未处理完结无需创建,appCode={}",decisionResult.getAppCode());
        }
        stopWatch.stop();
        long duration = stopWatch.getLastTaskTimeMillis();
        log.info("[每日跑批]handleAlarmResult,appCode={},duration={}ms",appCode,duration);
    }
    

    综上所述,调度任务发起对A服务调用时,然后开启了一个异步任务去处理,异步任务中分页查询处理,循环处理每一个待处理订单,然后同步调用决策引擎,

    决策引擎返回的结果然后异步交由一个线程池去处理。跑批的待处理订单大概只有300多单会命中业务规则,对于命中规则的订单会生成工单外,还需要做相应 一系列业务逻辑处理,包括通知调用其他系统业务处理。而未命中规则,仅需插入一个预警记录外。无需其他操作,这说明,整体耗时在这300多单阻塞同步循环 调用决策引擎耗时占比非常大。

    从上述图,可以看到命中决策规则的订单执行时间(跑批从凌晨七点执行,上图已经07:05分了),包括未命中决策规则的条数(763条)。

    从上述图,可以看到命中决策规则(这部分订单需要业务处理,包括以及Redis入队等操作)的订单执行时间,包括未命中决策规则的条数(362条)。

    三、优化方案

    基于上线第一版的结论分析,并尽可能不修改业务处理逻辑和程序逻辑处理下,通过引入下属方案处理。

    从上图可以看到最明显与初版差异的是,这次优化方案是,把循环处理待处理订单的阻塞操作,改为异步处理,交由一个线程池去处理,串行等待变为并行处理。

    disposeOrderFacade#scanningOrders的方法前后变化

    //异步处理,把每个待处理订单放在disposeOrderHandleThreadPool(待处理线程池)去处理
    CompletableFuture.runAsync(() -> operateStrategyManager.execute(operateContext), disposeOrderHandleThreadPool);
    
    /**
     * @description: 线程池配置
     * @Date : 2019/4/25 下午2:36
     * @Author : 石冬冬-Seig Heil
     */
    @Configuration
    @EnableAsync
    @Slf4j
    public class ThreadPoolConfig {
    
        static final int blockingQueueCapacity = 5000;
    
        static final int keepAliveTime = 60;
    
        static final int availableProcessors = Runtime.getRuntime().availableProcessors();
    
        static final int corePoolSize  = availableProcessors * 8;
    
        static final int maximumPoolSize = 50;
    
        static {
            log.info("[availableProcessors]={},corePoolSize={},maximumPoolSize={}",availableProcessors,corePoolSize,maximumPoolSize);
        }
    
        /**
         * 通用线程池配置
         * @return
         */
        @Bean
        public ExecutorService threadPoolExecutor() {
            ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("common-pool-%d").build();
            return new ThreadPoolExecutor(corePoolSize,maximumPoolSize,keepAliveTime, TimeUnit.SECONDS,
                    new LinkedBlockingQueue<>(blockingQueueCapacity), threadFactory);
        }
    
        /**
         * 跑批待处理订单线程池配置
         * blockingQueueCapacity 需要设置合理大小,目前上线一周,每日跑批 1175单左右;即便线程池已满,使用拒绝策略;有补偿机制。
         * @return
         */
        @Bean
        public ExecutorService disposeOrderHandleThreadPool() {
            ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("disposeOrder-pool-%d").build();
            final int corePoolSize = availableProcessors * 6,maxPoolSize = corePoolSize * 2,keepAliveTime = 60,blockingQueueCapacity = 5000;
            return new ThreadPoolExecutor(corePoolSize,maxPoolSize,keepAliveTime, TimeUnit.SECONDS,
                    new LinkedBlockingQueue<>(blockingQueueCapacity), threadFactory);
        }
    
    }
    

    这是上述线程池配置,也是目前跑到线上的配置。其中 disposeOrderHandleThreadPool (待处理订单线程池)也是经过几次优化后最终设置的参数配置。

    • 第一次优化,出现队列拒绝现象。
    /**
     * 跑批待处理订单线程池配置
     * @return
     */
    @Bean
    public ExecutorService disposeOrderHandleThreadPool() {
        ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("carthage-disposeOrder-pool-%d").build();
        final int corePoolSize = 5,maxPoolSize = 12;
        return new ThreadPoolExecutor(corePoolSize,maxPoolSize,60, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(500), threadFactory);
    }
    

    线程池配置如上,核心线程数5个,最大线程数10个,队列长度500个。

    异常信息如下所图(好在待处理有重试机制,跑批任务数据没有问题):

    • 第二次优化,无队列拒绝现象。
    /**
     * 跑批待处理订单线程池配置
     * blockingQueueCapacity 需要设置合理大小,目前上线一周,每日跑批 1175单左右;即便线程池已满,使用拒绝策略;有补偿机制。
     * @return
     */
    @Bean
    public ExecutorService disposeOrderHandleThreadPool() {
        ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("carthage-disposeOrder-pool-%d").build();
        final int corePoolSize = availableProcessors * 4,maxPoolSize = corePoolSize,keepAliveTime = 60,blockingQueueCapacity = 5000;
        return new ThreadPoolExecutor(corePoolSize,maxPoolSize,keepAliveTime, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(blockingQueueCapacity), threadFactory);
    }
    

    线程池配置如上,核心线程数14个(线上容器处理器核数4核),最大线程数跟核心一致16个,队列长度5000个。

    跑批情况(共计=1210单,主线程:耗时=1s;子线程最后一单最后完成时间 2020-08-16 07:00:27.794;),跑批任务正常,无线程任务队列拒绝现象。

    记一次每日跑批任务耗时性能从六分钟优化到半分钟历程及总结

    如上图,跑批任务最后一单执行日志输出,整个跑批耗时花费26秒。这是在一次周末抽空优化的结果。

    • 第三次优化,进一步调整待处理和通用线程池核心数大小。
    static final int blockingQueueCapacity = 5000;
    
    static final int keepAliveTime = 60;
    
    static final int availableProcessors = Runtime.getRuntime().availableProcessors();
    
    static final int corePoolSize  = availableProcessors * 8;
    
    static final int maximumPoolSize = 50;
    
    /**
     * 通用线程池配置
     * @return
     */
    @Bean
    public ExecutorService threadPoolExecutor() {
        ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("common-pool-%d").build();
        return new ThreadPoolExecutor(corePoolSize,maximumPoolSize,keepAliveTime, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(blockingQueueCapacity), threadFactory);
    }
    
    /**
     * 跑批待处理订单线程池配置
     * blockingQueueCapacity 需要设置合理大小,目前上线一周,每日跑批 1175单左右;即便线程池已满,使用拒绝策略;有补偿机制。
     * @return
     */
    @Bean
    public ExecutorService disposeOrderHandleThreadPool() {
        ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("disposeOrder-pool-%d").build();
        final int corePoolSize = availableProcessors * 6,maxPoolSize = corePoolSize * 2,keepAliveTime = 60,blockingQueueCapacity = 5000;
        return new ThreadPoolExecutor(corePoolSize,maxPoolSize,keepAliveTime, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(blockingQueueCapacity), threadFactory);
    }
    

    通用线程池核心数32,最大50,队列长度500;待处理订单线程池配置核心数24,最大48,队列长度5000。

    目前今日跑批(8月21日)1323单,主线程耗时=1s;子线程最后一单完成时间(2020-08-21 07:00:22.040),仅需要22秒,三次优化从最初5~6分钟,到现在的单机跑批22秒,质的飞跃。

    上图是今日(8月21日)跑批容器实例(单机)的CPU以及堆内存使用情况。

    上图是今日(8月21日)跑批容器实例(单机)的CPU以及线程数。

    上图是今日(8月21日)跑批容器实例(单机)的堆内存和GC情况(JDK8,使用G1垃圾收集器,跑批期间发生一次MGC)。

    四、总结

    1、性能优化尽量如果上线后,尽量在少修改业务代码或程序逻辑前提下,逐步改善方案。 2、串行变并行,使用线程池去处理,合理逐步调整线程池核心数以及队列长度。 3、线程池根据不同业务场景,应用不要采用一个线程池处理所有异步任务场景。 4、代码中通过合理打印日志,便于后期排查问题以及优化提供帮助。

    相关文章

      网友评论

        本文标题:记一次每日跑批任务耗时性能从六分钟优化到半分钟历程及总结

        本文链接:https://www.haomeiwen.com/subject/ytrqaktx.html