Performance is an important metric for any system, and there are many methodologies for improving it. Drawing on my day-to-day development work, this post covers one important technique for boosting performance: turning serial processing into asynchronous (parallel) processing.
A recent project had a batch-processing requirement: a scheduling center periodically triggers a job, the system fetches rows from a table in batches (by page), calls an external system for each row, and then, based on the result returned by that system, flags the pending rows and persists them, as shown in the diagram below:

The call to the risk system is a Feign call. Once development was finished and testing began, the plain serial flow took about 2 hours to process 200,000 rows (roughly 36 ms per call), which is far too slow, so optimization was unavoidable. Analyzing the scenario, the part with the biggest performance payoff is changing the calls to the risk system from serial to concurrent.
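The post does not show the driver that pages through the table and feeds each page to the processor, so here is a minimal sketch of what such a scheduler-triggered loop might look like. The names FooBatchJob, execute, queryPendingPage and PAGE_SIZE are illustrative assumptions, not code from the project:

@Component
@Slf4j
class FooBatchJob {
    // assumed page size for each fetch; the real value is project-specific
    private static final int PAGE_SIZE = 500;

    @Autowired
    private FooOriginDataService fooOriginDataService;
    @Autowired
    private FooOriginDataProcessor fooOriginDataProcessor;

    /** Entry point invoked by the scheduling center (hypothetical method name). */
    public void execute() {
        List<FooOriginDataPO> page;
        do {
            // queryPendingPage is an assumed query for unprocessed rows, capped at PAGE_SIZE
            page = fooOriginDataService.queryPendingPage(PAGE_SIZE);
            if (CollectionUtils.isEmpty(page)) {
                break;
            }
            try {
                fooOriginDataProcessor.doProcess(page);
            } catch (Exception ex) {
                // a failed page is only logged here; real retry/alerting policy is project-specific
                log.error("failed to process page", ex);
            }
        } while (page.size() == PAGE_SIZE);
    }
}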
The conventional serial approach
The serial pseudocode looks like this:
@Component
@Slf4j
class FooOriginDataProcessor {

    @Autowired
    private FooOriginDataService fooOriginDataService;
    @Autowired
    private FooHitDataService hitDataService;
    @Autowired
    private DecisionRemoteService decisionRemoteService;

    @Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRES_NEW)
    public void doProcess(final List<FooOriginDataPO> originDataList) {
        if (CollectionUtils.isEmpty(originDataList)) {
            return;
        }
        List<FooHitDataPO> hitDataList = new ArrayList<>();
        originDataList.forEach(e -> {
            AfterSaleAbnormalReturnDto returnDto = FooOriginDataConverter.instance.convertToReturnDto(e);
            // serial remote call to the risk system for every row
            DecisionVo decisionVo = decisionRemoteService.decideReturnRisk(returnDto);
            if (EnumMerchantReturnRiskType.isAbnormal(decisionVo.getCode())) {
                // hit a risk rule
                hitDataList.add(FooOriginDataConverter.instance.convertToHitDataPo(e));
            }
        });
        hitDataService.batchInsert(hitDataList);
        List<Integer> originDataIds = originDataList.stream().map(FooOriginDataPO::getId).collect(Collectors.toList());
        fooOriginDataService.updateToProcessSucceed(originDataIds);
    }
}
Making the serial flow asynchronous
After switching to concurrent processing, the code looks like this:
@Component
@RefreshScope
@Slf4j
class FooOriginDataProcessor implements ApplicationListener<ContextClosedEvent> {

    private static final String NAME_PREFIX = "MR_OriginDataProcessor_Thread_";
    private static final int CAPACITY = 200;
    private static final int N_CPU = Runtime.getRuntime().availableProcessors();

    /** I/O-intensive task: keep the thread count no larger than the number of CPU cores */
    private static final ExecutorService EXECUTOR = new ThreadPoolExecutor(N_CPU, N_CPU, 0, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(CAPACITY), new ThreadFactoryImpl(NAME_PREFIX), new ThreadPoolExecutor.CallerRunsPolicy());

    /**
     * Size of each concurrent batch, to avoid triggering the risk system's rate limiting
     */
    @Value("${concurrent_size_of_invocation_risk_api:120}")
    private Integer batchSize;

    @Autowired
    private FooOriginDataService fooOriginDataService;
    @Autowired
    private FooHitDataService hitDataService;
    @Autowired
    private DecisionRemoteService decisionRemoteService;

    @Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRES_NEW)
    public void doProcess(final List<FooOriginDataPO> originDataList) {
        log.info("doProcess with cpu numbers: {}", N_CPU);
        if (CollectionUtils.isEmpty(originDataList)) {
            return;
        }
        List<FooHitDataPO> hitDataList = new ArrayList<>();
        List<CompletableFuture<FooHitDataPO>> hitDataFutureList = new ArrayList<>(originDataList.size());
        originDataList.forEach(e -> {
            CompletableFuture<FooHitDataPO> itemFuture = CompletableFuture.supplyAsync(() -> {
                AfterSaleAbnormalReturnDto returnDto = FooOriginDataConverter.instance.convertToReturnDto(e);
                DecisionVo decisionVo = decisionRemoteService.decideReturnRisk(returnDto);
                if (EnumMerchantReturnRiskType.isAbnormal(decisionVo.getCode())) {
                    // hit a risk rule
                    return FooOriginDataConverter.instance.convertToHitDataPo(e);
                }
                return null;
            }, EXECUTOR);
            hitDataFutureList.add(itemFuture);
        });
        // wait for all futures to complete; individual failures are handled in the loop below
        CompletableFuture.allOf(hitDataFutureList.toArray(new CompletableFuture[0])).exceptionally(ex -> null).join();
        AtomicBoolean hasFail = new AtomicBoolean(false);
        hitDataFutureList.forEach(fu -> {
            try {
                FooHitDataPO hitDataPO = fu.get();
                if (Objects.nonNull(hitDataPO)) {
                    hitDataList.add(hitDataPO);
                }
            } catch (InterruptedException e) {
                log.error("fail to get result from future occurs InterruptedException", e);
                Thread.currentThread().interrupt();
                hasFail.set(true);
            } catch (ExecutionException e) {
                log.error("fail to get result from future occurs ExecutionException", e);
                hasFail.set(true);
            }
        });
        if (hasFail.get()) {
            throw new ServiceResponseException("exception occurred while fetching xxx information");
        }
        hitDataService.batchInsert(hitDataList);
        List<Integer> originDataIds = originDataList.stream().map(FooOriginDataPO::getId).collect(Collectors.toList());
        fooOriginDataService.updateToProcessSucceed(originDataIds);
    }

    @Override
    public void onApplicationEvent(ContextClosedEvent event) {
        EXECUTOR.shutdown();
    }
}
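The ThreadFactoryImpl passed to the pool above is not shown in the post (the name matches a helper that ships with some libraries, e.g. RocketMQ's common module). A minimal equivalent, written here as an assumption rather than the project's actual class, just gives the worker threads recognizable names:

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

class ThreadFactoryImpl implements ThreadFactory {

    private final String namePrefix;
    private final AtomicInteger threadIndex = new AtomicInteger(0);

    ThreadFactoryImpl(String namePrefix) {
        this.namePrefix = namePrefix;
    }

    @Override
    public Thread newThread(Runnable r) {
        // names threads MR_OriginDataProcessor_Thread_1, _2, ... which makes logs easier to read
        return new Thread(r, namePrefix + threadIndex.incrementAndGet());
    }
}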
This concurrent version of doProcess is still problematic; with large data volumes it causes the following issues:
- 1. If the external system has rate limiting (any serious service does, as a self-protection measure), you will trip its rate limiter and requests will fail.
- 2. If the external system has no rate limiting, a large volume of requests can knock it over, and can take down your own system as well: the I/O eats up system resources and other business requests cannot be served.
In short, the concurrent part needs further optimization, and the core idea of that optimization is: control the rate of concurrent requests. The code looks like this:
@Component
@RefreshScope
@Slf4j
class FooOriginDataProcessor implements ApplicationListener<ContextClosedEvent> {

    private static final long ONE_THOUSAND = 1000L;
    private static final String NAME_PREFIX = "MR_OriginDataProcessor_Thread_";
    private static final int CAPACITY = 200;
    private static final int N_CPU = Runtime.getRuntime().availableProcessors();

    /** I/O-intensive task: keep the thread count no larger than the number of CPU cores */
    private static final ExecutorService EXECUTOR = new ThreadPoolExecutor(N_CPU, N_CPU, 0, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(CAPACITY), new ThreadFactoryImpl(NAME_PREFIX), new ThreadPoolExecutor.CallerRunsPolicy());

    /**
     * Size of each concurrent batch, to avoid triggering the risk system's rate limiting
     */
    @Value("${concurrent_size_of_invocation_risk_api:120}")
    private Integer batchSize;

    @Autowired
    private FooOriginDataService fooOriginDataService;
    @Autowired
    private FooHitDataService hitDataService;
    @Autowired
    private DecisionRemoteService decisionRemoteService;

    @Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRES_NEW)
    public void doProcess(final List<FooOriginDataPO> originDataList) throws ExecutionException, InterruptedException {
        log.info("doProcess with cpu numbers: {}, batchSize: {}", N_CPU, batchSize);
        if (CollectionUtils.isEmpty(originDataList)) {
            return;
        }
        List<FooHitDataPO> hitDataList = new ArrayList<>();
        List<CompletableFuture<FooHitDataPO>> hitDataFutureList = new ArrayList<>(originDataList.size());
        List<List<FooOriginDataPO>> allSubListOfList = Lists.partition(originDataList, batchSize);
        for (List<FooOriginDataPO> originDataSubList : allSubListOfList) {
            CompletableFuture<FooHitDataPO>[] hitDataFutureArray = new CompletableFuture[originDataSubList.size()];
            long start = System.currentTimeMillis();
            for (int i = 0; i < originDataSubList.size(); i++) {
                FooOriginDataPO originDataPO = originDataSubList.get(i);
                CompletableFuture<FooHitDataPO> itemFuture = CompletableFuture.supplyAsync(() -> {
                    AfterSaleAbnormalReturnDto returnDto = FooOriginDataConverter.instance.convertToReturnDto(originDataPO);
                    DecisionVo decisionVo = decisionRemoteService.decideReturnRisk(returnDto);
                    if (EnumMerchantReturnRiskType.isAbnormal(decisionVo.getCode())) {
                        // hit a risk rule
                        return FooOriginDataConverter.instance.convertToHitDataPo(originDataPO);
                    }
                    return null;
                }, EXECUTOR);
                hitDataFutureArray[i] = itemFuture;
            }
            // wait for the current batch to finish before submitting the next one
            CompletableFuture.allOf(hitDataFutureArray).get();
            long concurrentReqCosts = System.currentTimeMillis() - start;
            log.info("concurrent request end, costs: {}", concurrentReqCosts);
            // if the batch took less than one second, the main thread sleeps for the remainder
            // to avoid tripping the API rate limit (TimeUnit.sleep ignores non-positive values)
            long needSleepTimeMillis = ONE_THOUSAND - concurrentReqCosts;
            TimeUnit.MILLISECONDS.sleep(needSleepTimeMillis);
            hitDataFutureList.addAll(Arrays.asList(hitDataFutureArray));
        }
        AtomicBoolean hasFail = new AtomicBoolean(false);
        hitDataFutureList.forEach(fu -> {
            try {
                FooHitDataPO hitDataPO = fu.get();
                if (Objects.nonNull(hitDataPO)) {
                    hitDataList.add(hitDataPO);
                }
            } catch (InterruptedException e) {
                log.error("fail to get result from future occurs InterruptedException", e);
                Thread.currentThread().interrupt();
                hasFail.set(true);
            } catch (ExecutionException e) {
                log.error("fail to get result from future occurs ExecutionException", e);
                hasFail.set(true);
            }
        });
        if (hasFail.get()) {
            throw new ServiceResponseException("exception occurred while fetching the information");
        }
        hitDataService.batchInsert(hitDataList);
        List<Integer> originDataIds = originDataList.stream().map(FooOriginDataPO::getId).collect(Collectors.toList());
        fooOriginDataService.updateToProcessSucceed(originDataIds);
    }

    @Override
    public void onApplicationEvent(ContextClosedEvent event) {
        EXECUTOR.shutdown();
    }
}
After this final round of optimization, processing and persisting the 200,000 rows takes about 25 minutes. Keep in mind that this is bounded by the external system's rate limit (here, the risk system caps the API it exposes to me at 150 requests/s), so I can process at most about 150 rows per second; at that rate 200,000 rows need at least 200,000 / 150 ≈ 1,333 seconds, roughly 22 minutes, so 25 minutes is close to the theoretical floor. In other words, optimization always runs into constraints, and every case has to be analyzed on its own terms.
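As an alternative to the fixed batch-plus-sleep scheme above, the same 150 requests/s ceiling could be enforced with a token-bucket limiter such as Guava's RateLimiter (the code already uses Guava's Lists.partition, so the dependency is present). This is only a sketch of the idea, not the project's code; the class and constant names here are assumptions, and 150.0 simply mirrors the quota quoted above:

import com.google.common.util.concurrent.RateLimiter;

class RiskCallThrottle {

    // matches the 150 requests/s quota quoted by the risk system (assumed constant name)
    private static final double RISK_API_PERMITS_PER_SECOND = 150.0;
    private static final RateLimiter RATE_LIMITER = RateLimiter.create(RISK_API_PERMITS_PER_SECOND);

    /** Blocks just long enough to stay under the quota, then performs the remote call. */
    DecisionVo decideWithThrottle(DecisionRemoteService remoteService, AfterSaleAbnormalReturnDto returnDto) {
        RATE_LIMITER.acquire(); // waits for a permit, smoothing calls to ~150/s without manual sleep bookkeeping
        return remoteService.decideReturnRisk(returnDto);
    }
}

Each worker thread would call decideWithThrottle instead of calling decideReturnRisk directly; the per-batch sleep logic then disappears, at the cost of an extra layer around the remote call.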
A few takeaways:
- Serial processing can be sped up by making it asynchronous (parallel).
- When handling I/O-intensive tasks (network or file I/O) asynchronously, keep the number of worker threads no larger than the number of CPU cores; too many threads will actually hurt throughput.
- When processing asynchronously, watch the concurrency level, so that excessive concurrency does not drag down your own system or the systems you depend on.