美文网首页
学习使用CompleteFuture

学习使用CompleteFuture

作者: Spring_java | 来源:发表于2021-07-22 23:27 被阅读0次

    最近遇到一个需求,一批数据,要去请求A B C 多个接口。不同接口返回不同字段的值。然后设置到原来的对象中。

    其中 A BC 接口每次请求都对数量有限制。

    好了,使用CompleteFuture来解决

    代码:

        @GetMapping("/user2")
        public List<User> getData() throws Exception {
            List<User> userList = new ArrayList<>(2000);
            for (int j = 0; j < 1000; j++) {
                User u1 = new User();
                u1.setId(j + "");
                u1.setAddress("地址:" + j);
                u1.setAge(j + "");
                userList.add(u1);
            }
            long l = System.currentTimeMillis();
    //        getOtherInfo(userList);
    //        getUserName(userList);
    //        getMobile(userList);
    
            CompletableFuture<Void> otherTask = CompletableFuture.runAsync(() -> {
                getOtherInfo(userList);
            }, poolExecutor);
            CompletableFuture<Void> nameTask = CompletableFuture.runAsync(() -> {
                getUserName(userList);
            }, poolExecutor);
            CompletableFuture<Void> mobileTask = CompletableFuture.runAsync(() -> {
                getMobile(userList);
            }, poolExecutor);
            try {
                CompletableFuture.allOf(nameTask, mobileTask, otherTask).join();
            } catch (Exception e) {
                e.printStackTrace();
            }
            System.out.println("================调用第三方耗时:" + (System.currentTimeMillis() - l) + " 毫秒");
            return userList;
        }
    

    原始请求接口 A B C

        /**
         * 模拟调用第三方,获取其他信息
         *
         * @param users
         */
        private void getOtherInfo(List<User> users) {
            long beginTime = System.currentTimeMillis();
            List<List<User>> subUsers = new ArrayList<>();
            List<List<User>> partitionList = getPartitionList(users, subUsers, 50);
    //        partitionList.forEach(list -> {
    //                sendRequestToService(100);
    //                list.forEach(user -> {
    //                    user.setBirth("生日: " + user.getId());
    //                    user.setSex("性别:" + user.getId());
    //            });
    //        });
            List<CompletableFuture> futures=new ArrayList<>();
            for (int i = 0; i < partitionList.size(); i++) {
                List<User> list = partitionList.get(i);
                CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
                    System.out.println(Thread.currentThread().getName() + " 正在执行任务");
                    sendRequestToService(100);
                    list.forEach(user -> {
                        user.setBirth("生日: " + user.getId());
                        user.setSex("性别:" + user.getId());
                    });
                });
                futures.add(future);
            }
            CompletableFuture allFuture = CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));
            allFuture.join();
    //        System.out.println(Thread.currentThread().getName() + " 正在执行任务");
            System.out.println("获取其他信息耗时:" + (System.currentTimeMillis() - beginTime) + " 毫秒");
        }
    
        /**
         * 获取用户姓名
         *
         * @param users
         */
        private void getUserName(List<User> users) {
            long beginTime = System.currentTimeMillis();
            List<List<User>> subUsers = new ArrayList<>();
            List<List<User>> partitionList = getPartitionList(users, subUsers, 200);
            partitionList.forEach(list -> {
                sendRequestToService(130);
                list.forEach(user -> {
                    user.setUsername("姓名:" + user.getId());
                });
            });
    
            System.out.println(Thread.currentThread().getName() + " 正在执行任务");
            System.out.println("获取用户姓名 耗时:" + (System.currentTimeMillis() - beginTime) + " 毫秒");
        }
    
        /**
         * 获取手机
         * @param users
         */
        private void getMobile(List<User> users) {
            long beginTime = System.currentTimeMillis();
            List<List<User>> subUsers = new ArrayList<>();
            List<List<User>> partitionList = getPartitionList(users, subUsers, 400);
            partitionList.forEach(list -> {
                sendRequestToService(150);
                list.forEach(user -> {
                    user.setMobile("手机:" + user.getId());
                });
            });
            System.out.println(Thread.currentThread().getName() + " 正在执行任务");
            System.out.println("获取用户姓名 耗时:" + (System.currentTimeMillis() - beginTime) + " 毫秒");
        }
    

    数据进行分片 和模拟时间

        /**
         * 根据分片来获取数据
         *
         * @param users
         * @param subUsers
         * @param count
         * @return
         */
        private List<List<User>> getPartitionList(List<User> users, List<List<User>> subUsers, int count) {
            if (users.size() >= count) {
                subUsers = Lists.partition(users, count);
            } else {
                subUsers.add(users);
            }
            return subUsers;
        }
    
        /**
         * 根据传入时间来判断暂停接口多少毫秒
         *
         * @param i
         */
        private void sendRequestToService(int count) {
            try {
                // 模拟请求对面接口count毫秒
                TimeUnit.MILLISECONDS.sleep(count);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    
        @Bean
        @Qualifier(value = "MyThread")
        private ThreadPoolTaskExecutor poolExecutor(){
            ThreadPoolTaskExecutor executor =new ThreadPoolTaskExecutor();
            executor.setCorePoolSize(10);
            executor.setMaxPoolSize(20);
            executor.setQueueCapacity(100);
            executor.setKeepAliveSeconds(10);
    
            executor.setThreadNamePrefix("Pool-Nexus");
            executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
            executor.initialize();
            return executor;
        }
    

    最终结果:

    不使用 future 
    http-nio-8009-exec-1 正在执行任务
    获取其他信息耗时:2012 毫秒
    http-nio-8009-exec-1 正在执行任务
    获取用户姓名 耗时:652 毫秒
    http-nio-8009-exec-1 正在执行任务
    获取用户姓名 耗时:454 毫秒
    ================调用第三方耗时:3118 毫秒
    使用3个
    Pool-Nexus3 正在执行任务
    获取用户姓名 耗时:453 毫秒
    Pool-Nexus2 正在执行任务
    获取用户姓名 耗时:654 毫秒
    Pool-Nexus1 正在执行任务
    获取其他信息耗时:2011 毫秒
    ================调用第三方耗时:2014 毫秒
    
    使用3个再为了获取其他信息接口再套3个
    ForkJoinPool.commonPool-worker-1 正在执行任务
    ForkJoinPool.commonPool-worker-2 正在执行任务
    ForkJoinPool.commonPool-worker-4 正在执行任务
    ForkJoinPool.commonPool-worker-3 正在执行任务
    ForkJoinPool.commonPool-worker-5 正在执行任务
    ForkJoinPool.commonPool-worker-3 正在执行任务
    ForkJoinPool.commonPool-worker-4 正在执行任务
    ForkJoinPool.commonPool-worker-2 正在执行任务
    ForkJoinPool.commonPool-worker-5 正在执行任务
    ForkJoinPool.commonPool-worker-1 正在执行任务
    ForkJoinPool.commonPool-worker-3 正在执行任务
    ForkJoinPool.commonPool-worker-4 正在执行任务
    ForkJoinPool.commonPool-worker-2 正在执行任务
    ForkJoinPool.commonPool-worker-1 正在执行任务
    ForkJoinPool.commonPool-worker-5 正在执行任务
    ForkJoinPool.commonPool-worker-4 正在执行任务
    ForkJoinPool.commonPool-worker-2 正在执行任务
    ForkJoinPool.commonPool-worker-3 正在执行任务
    ForkJoinPool.commonPool-worker-5 正在执行任务
    ForkJoinPool.commonPool-worker-1 正在执行任务
    获取其他信息耗时:406 毫秒
    Pool-Nexus3 正在执行任务
    获取用户姓名 耗时:454 毫秒
    Pool-Nexus2 正在执行任务
    获取用户姓名 耗时:656 毫秒
    ================调用第三方耗时:660 毫秒
    
    1个异步加多个completeFuture
    
    
    
    

    相关文章

      网友评论

          本文标题:学习使用CompleteFuture

          本文链接:https://www.haomeiwen.com/subject/kkfumltx.html