业务背景
A平台调用B平台提供的soa接口查询司机详情信息,由于B平台提供的是批量查询接口,对批量查询接口,做了单次查询的数量限制,那就是限制每次查询的数量不能超过指定的值(100个)。
优化前:
而A平台为了提高获取信息的速度,考虑到使用多线程并发进行获取,于是第一版的代码是下面这样的:
private List<Long> selectDriverDetailsBySoa(List<Long> driverIdList, List<Long> vehicleTypeIds) {
List<Long> availableDriverIdList = new ArrayList<>();
//获取批量调用分页数量
int pageCount = PageUtil.getPageCount(driverIdList.size(), PAGE_SIZE);
try {
//根据分页数量设置线程池大小
ExecutorService executor = new ThreadPoolExecutor(pageCount, pageCount,
ONE_THOUSAND, TimeUnit.MILLISECONDS,
new SynchronousQueue<>(),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());
List<Future<List<DriverBO>>> resultFutureList = new ArrayList<>();
for (int i = 1; i <= pageCount; i++) {
//根据分页数量,循环提交查询任务
List<Long> pageDriverIdList = PageUtil.startPage(driverIdList, i, PAGE_SIZE);
DriverListResquest driverListResquest = driverService.getDriverListRequest(pageDriverIdList, vehicleTypeIds);
resultFutureList.add(executor.submit(() -> driverService.getDriverList(driverListResquest)));
}
//阻塞获取请求结果
for (Future<List<DriverBO>> futureResult : resultFutureList) {
List<DriverBO> driverBOList = futureResult.get();
if (!CollectionUtil.isEmpty(driverBOList)) {
driverBOList.forEach(e -> availableDriverIdList.add(e.getDriverId()));
}
}
//关闭线程池
executor.shutdown();
} catch (Exception e) {
log.error("selectDriverDetailsBySoa,并发批量查询司机详情,处理出错", e);
}
log.info("selectDriverDetailsBySoa,并发批量查询司机详情,处理结果:{}", JsonUtils.writeValueAsString(availableDriverIdList));
return availableDriverIdList;
}
缺点:虽然这里线程池的大小是根据分页数设置的,但每次请求都需要反复创建和停止线程池,在请求量大时这将是非常消耗资源的,也会是很致命的。
优化后:
经过优化调整之后,代码是这样的:
private List<Long> selectDriverDetailsBySoa(List<Long> driverIdList, List<Long> vehicleTypeIds) {
List<Long> availableDriverIdList = new ArrayList<>();
try {
//使用google第三方工具类直接进行分组(分页)
List<List<Long>> pagesDriverIdList = Lists.partition(driverIdList, PAGE_SIZE);
//分组提交任务
List<CompletableFuture<List<DriverBO>>> completableFutureList = pagesDriverIdList.stream()
.map(pageDriverIdList -> asyncRequestSoaDriverSoaService(pageDriverIdList, vehicleTypeIds)).collect(Collectors.toList());
//汇总处理结果
List<List<DriverBO>> boLists = completableFutureList.stream().map(CompletableFuture::join).collect(Collectors.toList());
boLists.forEach(listItem -> listItem.forEach(item -> availableDriverIdList.add(item.getDriverId())));
log.info("selectDriverDetailsBySoa,并发批量查询司机详情,处理结果:{}", JsonUtils.writeValueAsString(availableDriverIdList));
} catch (Exception e) {
log.info("selectDriverDetailsBySoa,并发批量查询司机详情,处理失败", e);
}
return availableDriverIdList;
}
private CompletableFuture<List<DriverBO>> asyncRequestSoaDriverSoaService(List<Long> driverIdList, List<Long> vehicleTypeIds){
//封装请求参数
DriverListResquest driverListResquest = driverService.getDriverListRequest(driverIdList, vehicleTypeIds);
//使用针对业务统一管理的线程池,提交任务
return AsyncExecutor.runAsync(() -> sdpDriverService.getDriverList(driverListResquest));
}
是不是简洁了很多。另外,针对特定业务场景,专门建了异步线程池类进行管理线程池:
@Slf4j
@NoArgsConstructor
@Component(ASYNC_EXECUTOR)
public class AsyncExecutor implements Executor {
public static final String ASYNC_EXECUTOR = "asyncExecutor";
public static ExecutorService executors;
@Value("${async.executor.thread.corePoolSize:#{null}}")
private Integer corePoolSize;
@Value("${async.thread.maxPoolSize:#{null}}")
private Integer maxPoolSize;
@Value("${executor.thread.nameFormat:async-pool-%d}")
private String threadNameFormat;
@PostConstruct
public void init() {
log.info("async executor,corePoolSize:{},maxPoolSize{},threadNameFormat:{}", this.corePoolSize, this.maxPoolSize, this.threadNameFormat);
ThreadFactory threadFactory = (new ThreadFactoryBuilder()).setNameFormat(this.threadNameFormat).build();
if (Objects.isNull(corePoolSize)) {
this.corePoolSize = Runtime.getRuntime().availableProcessors() * 4;
}
if (Objects.isNull(maxPoolSize)) {
this.maxPoolSize = Runtime.getRuntime().availableProcessors() * 8;
}
AsyncExecutor.executors = new ThreadPoolExecutor(this.corePoolSize, this.maxPoolSize, 3000L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(1024), threadFactory, new ThreadPoolExecutor.AbortPolicy());
}
@Override
public void execute(@NotNull Runnable command) {
if (command == null) {
throw new NullPointerException("AsyncExecutor async executor:command is marked non-null but is null");
} else {
AsyncExecutor.executors.execute(new RunnableWrapper(command));
}
}
public static <T> CompletableFuture<T> runAsync(@NonNull Supplier<T> supplier) {
if (supplier == null) {
throw new NullPointerException("AsyncExecutor async executor:command is marked non-null but is null");
} else {
return CompletableFuture.supplyAsync(new SupplierWrapper<>(supplier), AsyncExecutor.executors);
}
}
}
其实可以看得出来,这里单纯使用CompletableFuture,并没有真正发挥出来其优势,因为上面的核心问题不在Future,而在线程池的使用不当上。而使用Java Stream的流式开发进行优化,同样可以达到上面代码的间接的效果。
public static List<Long> selectDriverDetailsBySoa(List<Long> driverIdList) {
List<Long> availableDriverIdList = new ArrayList<>();
List<List<Long>> pagesDriverIdList = Lists.partition(driverIdList, 1);
List<Future<List<DriverBO>>> resultFutureList = pagesDriverIdList.stream()
.map(pageDriverIdList -> GrabHallAsyncExecutor.executors.submit(() -> getDriverList(pageDriverIdList)))
.collect(Collectors.toList());
resultFutureList.stream().map(future -> getFutureResult(future))
.collect(Collectors.toList())
.forEach((e) -> e.forEach(o -> {
availableDriverIdList.add(o.getDriverId());
}));
return availableDriverIdList;
}
那为什么还要用CompletableFuture的呢,一是为了和现有代码中的使用保持一致,其次是在学习之后,发现CompletableFuture比Future更好用,能力更强。
所以如果可以,推荐你也把CompletableFuture使用起来,真的很好用。
后面我会和大家分享更多详细的对于CompletableFuture的学习总结。
网友评论