美文网首页
从Future到CompleteableFuture的转化实践之

从Future到CompleteableFuture的转化实践之

作者: GuangHui | 来源:发表于2022-01-24 10:17 被阅读0次
    业务背景

    A平台调用B平台提供的soa接口查询司机详情信息,由于B平台提供的是批量查询接口,对批量查询接口,做了单次查询的数量限制,那就是限制每次查询的数量不能超过指定的值(100个)。

    优化前:

    而A平台为了提高获取信息的速度,考虑到使用多线程并发进行获取,于是第一版的代码是下面这样的:

    private List<Long> selectDriverDetailsBySoa(List<Long> driverIdList, List<Long> vehicleTypeIds) {
            List<Long> availableDriverIdList = new ArrayList<>();
            //获取批量调用分页数量
            int pageCount = PageUtil.getPageCount(driverIdList.size(), PAGE_SIZE);
    
            try {
                //根据分页数量设置线程池大小
                ExecutorService executor = new ThreadPoolExecutor(pageCount, pageCount,
                        ONE_THOUSAND, TimeUnit.MILLISECONDS,
                        new SynchronousQueue<>(),
                        Executors.defaultThreadFactory(),
                        new ThreadPoolExecutor.AbortPolicy());
                List<Future<List<DriverBO>>> resultFutureList = new ArrayList<>();
                for (int i = 1; i <= pageCount; i++) {
                        //根据分页数量,循环提交查询任务
                    List<Long> pageDriverIdList = PageUtil.startPage(driverIdList, i, PAGE_SIZE);
                    DriverListResquest driverListResquest = driverService.getDriverListRequest(pageDriverIdList, vehicleTypeIds);
                    resultFutureList.add(executor.submit(() -> driverService.getDriverList(driverListResquest)));
                }
                //阻塞获取请求结果
                for (Future<List<DriverBO>> futureResult : resultFutureList) {
                    List<DriverBO> driverBOList = futureResult.get();
                    if (!CollectionUtil.isEmpty(driverBOList)) {
                        driverBOList.forEach(e -> availableDriverIdList.add(e.getDriverId()));
                    }
                }
                //关闭线程池
                executor.shutdown();
            } catch (Exception e) {
                log.error("selectDriverDetailsBySoa,并发批量查询司机详情,处理出错", e);
            }
            log.info("selectDriverDetailsBySoa,并发批量查询司机详情,处理结果:{}", JsonUtils.writeValueAsString(availableDriverIdList));
            return availableDriverIdList;
        }
    

    缺点:虽然这里线程池的大小是根据分页数设置的,但每次请求都需要反复创建和停止线程池,在请求量大时这将是非常消耗资源的,也会是很致命的。

    优化后:

    经过优化调整之后,代码是这样的:

     private List<Long> selectDriverDetailsBySoa(List<Long> driverIdList, List<Long> vehicleTypeIds) {
            List<Long> availableDriverIdList = new ArrayList<>();
            try {
                    //使用google第三方工具类直接进行分组(分页)
                List<List<Long>> pagesDriverIdList = Lists.partition(driverIdList, PAGE_SIZE);
                //分组提交任务
                List<CompletableFuture<List<DriverBO>>> completableFutureList = pagesDriverIdList.stream()
                        .map(pageDriverIdList -> asyncRequestSoaDriverSoaService(pageDriverIdList, vehicleTypeIds)).collect(Collectors.toList());
                //汇总处理结果
                List<List<DriverBO>> boLists = completableFutureList.stream().map(CompletableFuture::join).collect(Collectors.toList());
                boLists.forEach(listItem -> listItem.forEach(item -> availableDriverIdList.add(item.getDriverId())));
                log.info("selectDriverDetailsBySoa,并发批量查询司机详情,处理结果:{}", JsonUtils.writeValueAsString(availableDriverIdList));
            } catch (Exception e) {
                log.info("selectDriverDetailsBySoa,并发批量查询司机详情,处理失败", e);
            }
            return availableDriverIdList;
        }
    
        private CompletableFuture<List<DriverBO>> asyncRequestSoaDriverSoaService(List<Long> driverIdList, List<Long> vehicleTypeIds){
                //封装请求参数
            DriverListResquest driverListResquest = driverService.getDriverListRequest(driverIdList, vehicleTypeIds);
            //使用针对业务统一管理的线程池,提交任务
            return AsyncExecutor.runAsync(() ->  sdpDriverService.getDriverList(driverListResquest));
        }
    

    是不是简洁了很多。另外,针对特定业务场景,专门建了异步线程池类进行管理线程池:

    @Slf4j
    @NoArgsConstructor
    @Component(ASYNC_EXECUTOR)
    public class AsyncExecutor implements Executor {
        public static final String ASYNC_EXECUTOR = "asyncExecutor";
        public static ExecutorService executors;
        @Value("${async.executor.thread.corePoolSize:#{null}}")
        private Integer corePoolSize;
        @Value("${async.thread.maxPoolSize:#{null}}")
        private Integer maxPoolSize;
        @Value("${executor.thread.nameFormat:async-pool-%d}")
        private String threadNameFormat;
    
        @PostConstruct
        public void init() {
            log.info("async executor,corePoolSize:{},maxPoolSize{},threadNameFormat:{}", this.corePoolSize, this.maxPoolSize, this.threadNameFormat);
            ThreadFactory threadFactory = (new ThreadFactoryBuilder()).setNameFormat(this.threadNameFormat).build();
            if (Objects.isNull(corePoolSize)) {
                this.corePoolSize = Runtime.getRuntime().availableProcessors() * 4;
            }
            if (Objects.isNull(maxPoolSize)) {
                this.maxPoolSize = Runtime.getRuntime().availableProcessors() * 8;
            }
            AsyncExecutor.executors = new ThreadPoolExecutor(this.corePoolSize, this.maxPoolSize, 3000L, TimeUnit.MILLISECONDS,
                    new LinkedBlockingQueue<>(1024), threadFactory, new ThreadPoolExecutor.AbortPolicy());
        }
    
        @Override
        public void execute(@NotNull Runnable command) {
            if (command == null) {
                throw new NullPointerException("AsyncExecutor async executor:command is marked non-null but is null");
            } else {
                AsyncExecutor.executors.execute(new RunnableWrapper(command));
            }
        }
    
        public static <T> CompletableFuture<T> runAsync(@NonNull Supplier<T> supplier) {
            if (supplier == null) {
                throw new NullPointerException("AsyncExecutor async executor:command is marked non-null but is null");
            } else {
                return CompletableFuture.supplyAsync(new SupplierWrapper<>(supplier), AsyncExecutor.executors);
            }
        }
    
    }
    

    其实可以看得出来,这里单纯使用CompletableFuture,并没有真正发挥出来其优势,因为上面的核心问题不在Future,而在线程池的使用不当上。而使用Java Stream的流式开发进行优化,同样可以达到上面代码的间接的效果。

    public static List<Long> selectDriverDetailsBySoa(List<Long> driverIdList) {
        List<Long> availableDriverIdList = new ArrayList<>();
        List<List<Long>> pagesDriverIdList = Lists.partition(driverIdList, 1);
        List<Future<List<DriverBO>>> resultFutureList = pagesDriverIdList.stream()
                .map(pageDriverIdList -> GrabHallAsyncExecutor.executors.submit(() -> getDriverList(pageDriverIdList)))
                .collect(Collectors.toList());
        resultFutureList.stream().map(future -> getFutureResult(future))
                .collect(Collectors.toList())
                .forEach((e) -> e.forEach(o -> {
                    availableDriverIdList.add(o.getDriverId());
                }));
        return availableDriverIdList;
    }
    

    那为什么还要用CompletableFuture的呢,一是为了和现有代码中的使用保持一致,其次是在学习之后,发现CompletableFuture比Future更好用,能力更强。
    所以如果可以,推荐你也把CompletableFuture使用起来,真的很好用。

    后面我会和大家分享更多详细的对于CompletableFuture的学习总结。

    相关文章

      网友评论

          本文标题:从Future到CompleteableFuture的转化实践之

          本文链接:https://www.haomeiwen.com/subject/zusthrtx.html