Performance Optimization in Practice: Making Serial Processing Asynchronous

Author: Running的程序员 | Published 2022-10-12 12:54

    Performance is a key metric of any system, and there are many methodologies for optimizing it. Today, drawing on my day-to-day development work, I want to discuss one important technique for improving performance: turning serial processing into asynchronous (parallel) processing.

    A recent project had a batch-processing requirement: a scheduling center periodically triggers a job; the system fetches the table data in batches (pages) and calls an external system; based on the result the external system returns, the pending records are flagged and persisted. As shown below:


    Figure: business flow overview (业务简图.png)

    The call to the risk system is a Feign call. After development finished, testing showed that plain serial processing took about 2 hours for 200,000 records, which is alarmingly slow, so optimization was unavoidable. A little analysis shows that the part of this scenario with real performance upside is converting the calls to the risk system to concurrent processing.
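    A quick back-of-the-envelope check of those serial numbers clarifies where the time goes: 200,000 records in roughly 2 hours works out to about 36 ms per record, which is on the order of a single synchronous remote round-trip. The class and method names below are illustrative only, not from the project:

```java
// Average wall time per record for a strictly serial run, using the
// figures quoted above (names here are illustrative, not from the project).
class SerialCostEstimate {

    static long perCallMillis(long totalRecords, long totalMillis) {
        return totalMillis / totalRecords;
    }

    public static void main(String[] args) {
        long twoHoursMillis = 2L * 60 * 60 * 1000; // ~2 hours observed
        System.out.println(perCallMillis(200_000L, twoHoursMillis)); // prints 36
    }
}
```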

    The conventional serial approach

    The serial version, in simplified pseudocode:

    @Component
    @Slf4j
    class FooOriginDataProcessor {
    
        @Autowired
        private FooOriginDataService fooOriginDataService;
        @Autowired
        private FooHitDataService hitDataService;
        @Autowired
        private DecisionRemoteService decisionRemoteService;
    
    
        @Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRES_NEW)
        public void doProcess(final List<FooOriginDataPO> originDataList) {
            if (CollectionUtils.isEmpty(originDataList)) {
                return;
            }
    
            List<FooHitDataPO> hitDataList = new ArrayList<>();
    
            originDataList.forEach(e -> {
                AfterSaleAbnormalReturnDto returnDto = FooOriginDataConverter.instance.convertToReturnDto(e);
                DecisionVo decisionVo = decisionRemoteService.decideReturnRisk(returnDto);
                if (EnumMerchantReturnRiskType.isAbnormal(decisionVo.getCode())) {
                    //risk hit
                    hitDataList.add(FooOriginDataConverter.instance.convertToHitDataPo(e));
                }
            });
    
            hitDataService.batchInsert(hitDataList);
            List<Integer> originDataIds = originDataList.stream().map(m -> m.getId()).collect(Collectors.toList());
            fooOriginDataService.updateToProcessSucceed(originDataIds);
        }
    
    }
    

    Making the serial flow asynchronous

    After converting it to concurrent processing, the code looks like this:

    @Component
    @RefreshScope
    @Slf4j
    class FooOriginDataProcessor implements ApplicationListener<ContextClosedEvent> {
    
        private static final String NAME_PREFIX = "MR_OriginDataProcessor_Thread_";
        private static final int CAPACITY = 200;
        private static final int N_CPU = Runtime.getRuntime().availableProcessors();
        /** I/O-bound task; the pool is capped at the number of CPU cores */
        private static final ExecutorService EXECUTOR = new ThreadPoolExecutor(N_CPU, N_CPU, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>(CAPACITY),
                new ThreadFactoryImpl(NAME_PREFIX), new ThreadPoolExecutor.CallerRunsPolicy());
    
        /**
         * Size of each concurrent batch, to avoid triggering the risk system's rate limiting
         */
        @Value("${concurrent_size_of_invocation_risk_api:120}")
        private Integer batchSize;
        @Autowired
        private FooOriginDataService fooOriginDataService;
        @Autowired
        private FooHitDataService hitDataService;
        @Autowired
        private DecisionRemoteService decisionRemoteService;
    
    
        @Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRES_NEW)
        public void doProcess(final List<FooOriginDataPO> originDataList) {
            
            log.info("doProcess with cpu numbers: {}", N_CPU);
            
            if (CollectionUtils.isEmpty(originDataList)) {
                return;
            }
    
            List<FooHitDataPO> hitDataList = new ArrayList<>();
            List<CompletableFuture<FooHitDataPO>> hitDataFutureList = new ArrayList<>(originDataList.size());
    
            originDataList.forEach(e -> {
                CompletableFuture<FooHitDataPO> itemFuture = CompletableFuture.supplyAsync(() -> {
                    AfterSaleAbnormalReturnDto returnDto = FooOriginDataConverter.instance.convertToReturnDto(e);
                    DecisionVo decisionVo = decisionRemoteService.decideReturnRisk(returnDto);
                    if (EnumMerchantReturnRiskType.isAbnormal(decisionVo.getCode())) {
                        //risk hit
                        return FooOriginDataConverter.instance.convertToHitDataPo(e);
                    }
                    return null;
                }, EXECUTOR);
    
                hitDataFutureList.add(itemFuture);
            });
    
            // fix: allOf takes an array, not a List; swallow the aggregate
            // failure here, since each future is inspected individually below
            CompletableFuture.allOf(hitDataFutureList.toArray(new CompletableFuture[0]))
                    .exceptionally(ex -> null)
                    .join();
    
            AtomicBoolean hasFail = new AtomicBoolean(false);
            hitDataFutureList.forEach(fu -> {
                try {
                    FooHitDataPO hitDataPO = fu.get();
                    if (Objects.nonNull(hitDataPO)) {
                        hitDataList.add(hitDataPO);
                    }
                } catch (InterruptedException e) {
                    log.error("fail to get result from future occurs InterruptedException", e);
                    Thread.currentThread().interrupt();
                    hasFail.set(true);
                } catch (ExecutionException e) {
                    log.error("fail to get result from future occurs ExecutionException", e);
                    hasFail.set(true);
                }
            });
    
            if (hasFail.get()) {
                throw new ServiceResponseException("failed to fetch xxx information");
            }
            
            hitDataService.batchInsert(hitDataList);
            List<Integer> originDataIds = originDataList.stream().map(m -> m.getId()).collect(Collectors.toList());
            fooOriginDataService.updateToProcessSucceed(originDataIds);
        }
    
        @Override
        public void onApplicationEvent(ContextClosedEvent event) {
            EXECUTOR.shutdown();
        }
    
    }
    

    The concurrent code above still has problems; at large data volumes it causes the following issues:

    • 1. If the external system enforces rate limiting (any well-run service does, as a self-protection measure), you will trigger it and requests will fail
    • 2. If the external system has no rate limiting, a large enough volume will crush it, and will take your own system down with it: the I/O eats a large share of system resources, leaving other business traffic unserved

    So the concurrent part needs further optimization, and the core idea of that optimization is: control the rate of the concurrent requests. The code follows:

    @Component
    @RefreshScope
    @Slf4j
    class FooOriginDataProcessor implements ApplicationListener<ContextClosedEvent> {
    
        private static final long ONE_THOUSAND = 1000L;
        private static final String NAME_PREFIX = "MR_OriginDataProcessor_Thread_";
        private static final int CAPACITY = 200;
        private static final int N_CPU = Runtime.getRuntime().availableProcessors();
        /** I/O-bound task; the pool is capped at the number of CPU cores */
        private static final ExecutorService EXECUTOR = new ThreadPoolExecutor(N_CPU, N_CPU, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>(CAPACITY),
                new ThreadFactoryImpl(NAME_PREFIX), new ThreadPoolExecutor.CallerRunsPolicy());
    
        /**
         * Size of each concurrent batch, to avoid triggering the risk system's rate limiting
         */
        @Value("${concurrent_size_of_invocation_risk_api:120}")
        private Integer batchSize;
        @Autowired
        private FooOriginDataService fooOriginDataService;
        @Autowired
        private FooHitDataService hitDataService;
        @Autowired
        private DecisionRemoteService decisionRemoteService;
    
    
        @Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRES_NEW)
        public void doProcess(final List<FooOriginDataPO> originDataList) throws ExecutionException, InterruptedException {
    
            log.info("doProcess with cpu numbers: {}, batchSize: {}", N_CPU, batchSize);
    
            if (CollectionUtils.isEmpty(originDataList)) {
                return;
            }
    
            List<FooHitDataPO> hitDataList = new ArrayList<>();
            List<CompletableFuture<FooHitDataPO>> hitDataFutureList = new ArrayList<>(originDataList.size());
    
            List<List<FooOriginDataPO>> allSubListOfList = Lists.partition(originDataList, batchSize);
            for (List<FooOriginDataPO> originDataSubList : allSubListOfList) {
                CompletableFuture<FooHitDataPO>[] hitDataFutureArray = new CompletableFuture[originDataSubList.size()];
                long start = System.currentTimeMillis();
    
                for (int i = 0; i < originDataSubList.size(); i++) {
                    FooOriginDataPO originDataPO = originDataSubList.get(i);
                    CompletableFuture<FooHitDataPO> itemFuture = CompletableFuture.supplyAsync(() -> {
                        AfterSaleAbnormalReturnDto returnDto = FooOriginDataConverter.instance.convertToReturnDto(originDataPO);
                        DecisionVo decisionVo = decisionRemoteService.decideReturnRisk(returnDto);
                        if (EnumMerchantReturnRiskType.isAbnormal(decisionVo.getCode())) {
                            //risk hit
                            return FooOriginDataConverter.instance.convertToHitDataPo(originDataPO);
                        }
                        return null;
                    }, EXECUTOR);
    
                    hitDataFutureArray[i] = itemFuture;
                }
    
                //wait for the current batch to finish before submitting the next;
                //swallow the aggregate failure here, each future is inspected below
                CompletableFuture.allOf(hitDataFutureArray).exceptionally(ex -> null).get();
    
                long concurrentReqCosts = System.currentTimeMillis() - start;
                log.info("concurrent request end, costs: {}", concurrentReqCosts);
    
                //if the batch took less than one second, the main thread waits out the
                //remainder, so we never exceed one batch per second (the rate limit);
                //guard against a negative value, which would make sleep() throw
                long needSleepTimeMillis = ONE_THOUSAND - concurrentReqCosts;
                if (needSleepTimeMillis > 0) {
                    TimeUnit.MILLISECONDS.sleep(needSleepTimeMillis);
                }
    
                hitDataFutureList.addAll(Arrays.asList(hitDataFutureArray));
            }
    
            AtomicBoolean hasFail = new AtomicBoolean(false);
            hitDataFutureList.forEach(fu -> {
                try {
                    FooHitDataPO hitDataPO = fu.get();
                    if (Objects.nonNull(hitDataPO)) {
                        hitDataList.add(hitDataPO);
                    }
                } catch (InterruptedException e) {
                    log.error("fail to get result from future occurs InterruptedException", e);
                    Thread.currentThread().interrupt();
                    hasFail.set(true);
                } catch (ExecutionException e) {
                    log.error("fail to get result from future occurs ExecutionException", e);
                    hasFail.set(true);
                }
            });
    
            if (hasFail.get()) {
                throw new ServiceResponseException("failed to fetch information");
            }
    
            hitDataService.batchInsert(hitDataList);
            List<Integer> originDataIds = originDataList.stream().map(m -> m.getId()).collect(Collectors.toList());
            fooOriginDataService.updateToProcessSucceed(originDataIds);
        }
    
        @Override
        public void onApplicationEvent(ContextClosedEvent event) {
            EXECUTOR.shutdown();
        }
    
    }
    

    After the final optimization, processing and persisting the 200,000 records took 25 minutes. Keep in mind that this is bounded by the external system's rate limit (here, the risk system limits the interface it exposes to me to 150 requests/s), meaning the most data I can process per second is 150 records. So you can see that optimization comes with many external constraints; you always have to analyze the specific problem at hand.
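    That rate limit also puts a hard floor under the total time: pushing 200,000 records through 150 requests/s cannot finish in under roughly 22 minutes no matter how the client is tuned, so the observed 25 minutes is already close to optimal. A quick sanity check (names below are illustrative only):

```java
// Minimum time to push `records` requests through a hard per-second limit,
// using the figures quoted above (names are illustrative, not from the project).
class RateLimitFloor {

    static long minSeconds(long records, long ratePerSecond) {
        return (records + ratePerSecond - 1) / ratePerSecond; // ceiling division
    }

    public static void main(String[] args) {
        long seconds = minSeconds(200_000L, 150L);
        System.out.println(seconds + " s = about " + (seconds / 60) + " min"); // 1334 s = about 22 min
    }
}
```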

    A few takeaways:

    1. Serial processing can be sped up by making it asynchronous (parallel)
    2. When processing I/O-bound tasks (network or file I/O) asynchronously, keep the worker pool bounded; this code caps it at the number of CPU cores, since with the downstream rate limit as the real bottleneck, an oversized pool only adds contention
    3. When processing asynchronously, mind the level of concurrency, so that an excessive burst does not drag down your own system or the systems it depends on
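    The batched pattern in the final version boils down to a small framework-free sketch: submit one bounded batch, wait for it with allOf, pace the loop to at most one batch per second, then collect the results. fakeRiskCall and BATCH_SIZE below are illustrative stand-ins, not code from the project:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Framework-free sketch of the batched, rate-limited async pattern.
// fakeRiskCall and BATCH_SIZE are illustrative stand-ins.
class BatchedAsyncSketch {

    static final int BATCH_SIZE = 150; // sized to the downstream rate limit
    static final ExecutorService EXECUTOR =
            Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

    // stand-in for the remote risk decision; "hits" on even ids
    static String fakeRiskCall(int id) {
        return id % 2 == 0 ? "HIT_" + id : null;
    }

    static List<String> process(List<Integer> ids) {
        List<String> hits = new ArrayList<>();
        for (int from = 0; from < ids.size(); from += BATCH_SIZE) {
            List<Integer> batch = ids.subList(from, Math.min(from + BATCH_SIZE, ids.size()));
            List<CompletableFuture<String>> futures = new ArrayList<>(batch.size());
            long start = System.currentTimeMillis();
            for (Integer id : batch) {
                futures.add(CompletableFuture.supplyAsync(() -> fakeRiskCall(id), EXECUTOR));
            }
            // wait for the whole batch before starting the next one
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
            // pace: never start more than one batch per second
            long remaining = 1000L - (System.currentTimeMillis() - start);
            if (remaining > 0) {
                try {
                    TimeUnit.MILLISECONDS.sleep(remaining);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
            for (CompletableFuture<String> f : futures) {
                String hit = f.join();
                if (hit != null) {
                    hits.add(hit);
                }
            }
        }
        return hits;
    }

    public static void main(String[] args) {
        List<Integer> ids = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            ids.add(i);
        }
        System.out.println(process(ids)); // the five even ids hit
        EXECUTOR.shutdown();
    }
}
```

    Sizing BATCH_SIZE to the downstream quota plus the one-second pacing guarantees the client never exceeds the limit, even when a batch completes quickly.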

    Original link: https://www.haomeiwen.com/subject/oorfzrtx.html