美文网首页java
CompletableFuture常见用法,Completabl

CompletableFuture常见用法,Completabl

作者: 运气爆棚lsw | 来源:发表于2022-02-07 10:32 被阅读0次
    CompletableFuture常见用法,CompletableFuture使用示例,CompletableFuture等待所有异步任务返回,包括常见的用法,结合自定义线程池使用。结合实际业务Demo代码运行效果如下,可直接运行查看效果,具体请参考如下代码

    1.创建CompletableFuture对象

    方法名  功能描述
    completedFuture(U value)    返回一个已经计算好的CompletableFuture
    runAsync(Runnable runnable) 使用ForkJoinPool.commonPool()作为线程池执行任务,没有返回值
    runAsync(Runnable runnable, Executor executor)  使用指定的线程池执行任务,没有返回值
    supplyAsync(Supplier<U> supplier)   使用ForkJoinPool.commonPool()作为线程池执行任务,有返回值
    supplyAsync(Supplier<U> supplier, Executor executor)    使用指定的线程池执行任务,有返回值
    

    各种使用示例如下(可直接复制运行):

    public class Test {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            // 1.创建CompletableFuture对象
            //  方法名  功能描述
            //  completedFuture(U value)    返回一个已经计算好的CompletableFuture
            //  runAsync(Runnable runnable) 使用ForkJoinPool.commonPool()作为线程池执行任务,没有返回值
            //  runAsync(Runnable runnable, Executor executor)  使用指定的线程池执行任务,没有返回值
            //  supplyAsync(Supplier<U> supplier)   使用ForkJoinPool.commonPool()作为线程池执行任务,有返回值
            //  supplyAsync(Supplier<U> supplier, Executor executor)    使用指定的线程池执行任务,有返回值
    
            CompletableFuture<Integer> intFuture = CompletableFuture.completedFuture(100);
            // 100
            System.out.println(intFuture.get());
    
            CompletableFuture<Void> voidFuture = CompletableFuture.runAsync(() -> System.out.println("hello"));
            // null
            System.out.println(voidFuture.get());
    
            CompletableFuture<String> stringFuture = CompletableFuture.supplyAsync(() -> "hello");
            // hello
            System.out.println("stringFuture="+stringFuture.get());
    
    
            // 2.计算结果完成时.可以执行的方法
            // 方法名
            // whenComplete(BiConsumer<? super T,? super Throwable> action)
            // whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
            // whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
    
            CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
                return "hello";
            }).whenComplete((v, e) -> {
                // hello
                System.out.println(v);
            });
            // hello
            System.out.println(future.get());
    
            // 3.转换,消费,执行
            // 方法名      功能描述
            // thenApply    获取上一个任务的返回,并返回当前任务的值
            // thenAccept   获取上一个任务的返回,单纯消费,没有返回值
            // thenRun      上一个任务执行完成后,开始执行thenRun中的任务
            CompletableFuture.supplyAsync(() -> {
                return "hello ";
            }).thenAccept(str -> {
                // hello world
                System.out.println(str + "world");
            }).thenRun(() -> {
                // task finish
                System.out.println("task finish");
            });
    
            // 4.组合(两个任务都完成)
            // 方法名  描述
            // thenCombine  组合两个future,获取两个future的返回结果,并返回当前任务的返回值
            // thenAcceptBoth   组合两个future,获取两个future任务的返回结果,然后处理任务,没有返回值
            // runAfterBoth 组合两个future,不需要获取future的结果,只需两个future处理完任务后,处理该任务
            CompletableFuture<String> futureA = CompletableFuture.supplyAsync(() -> {
                return "欢迎关注 ";
            }).thenApply(t -> {
                return t + "微信公众号 ";
            }).thenCombine(CompletableFuture.completedFuture("Java"), (t, u) -> {
                return t + u;
            }).whenComplete((t, e) -> {
                // 欢迎关注 微信公众号 Java
                System.out.println(t);
            });
    
            // 5.组合(只需要一个任务完成)
            // 方法名  描述
            // applyToEither    两个任务有一个执行完成,获取它的返回值,处理任务并返回当前任务的返回值
            // acceptEither 两个任务有一个执行完成,获取它的返回值,处理任务,没有返回值
            // runAfterEither   两个任务有一个执行完成,不需要获取future的结果,处理任务,也没有返回值
            CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
                return "欢迎关注微信公众号";
            });
            CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
                return "Java";
            });
            CompletableFuture<String> future3 = future1.applyToEither(future2, str -> str);
    
            // 欢迎关注微信公众号 Java 随机输出
            System.out.println(future3.get());
    
    
            //  6.多任务组合
            //  方法名 描述
            //  allOf   当所有的CompletableFuture完成后执行计算
            //  anyOf   任意一个CompletableFuture完成后执行计算
            //  allOf的使用
            CompletableFuture<String> future1A = CompletableFuture.supplyAsync(() -> {
                sleepRandom();
                return "欢迎关注";
            });
            CompletableFuture<String> future2A = CompletableFuture.supplyAsync(() -> {
                sleepRandom();
                return "微信公众号";
            });
            CompletableFuture<String> future3A = CompletableFuture.supplyAsync(() -> {
                sleepRandom();
                return "Java识堂";
            });
    
            // 欢迎关注 微信公众号 Java
            CompletableFuture.allOf(future1A, future2A, future3A)
                    .thenApply(v -> {
                                List<Object> collect = Stream.of(future1, future2, future3A)
                                        .map(CompletableFuture::join)
                                        .collect(Collectors.toList());
                                return collect;
                            }
                    )
                    .thenAccept(System.out::print);
    
            // anyOf的使用
            CompletableFuture<String> future1C = CompletableFuture.supplyAsync(() -> {
                sleepRandom();
                return "欢迎关注";
            });
            CompletableFuture<String> future2C = CompletableFuture.supplyAsync(() -> {
                sleepRandom();
                return "微信公众号";
            });
            CompletableFuture<String> future3C = CompletableFuture.supplyAsync(() -> {
                sleepRandom();
                return "Java";
            });
            CompletableFuture<Object> resultFuture = CompletableFuture.anyOf(future1C, future2C, future3C);
            // 欢迎关注 微信公众号 Java 随机输出
            System.out.println(resultFuture.get());
    
    
            // 7.异常处理
            // exceptionally    捕获异常,进行处理
            CompletableFuture<Integer> futureC = CompletableFuture.supplyAsync(() -> {
                return 100 / 0;
            }).thenApply(num -> {
                return num + 10;
            }).exceptionally(throwable -> {
                return 0;
            });
            // 0
            System.out.println(future.get());
            // 当然有一些接口能捕获异常
    
            CompletableFuture futureAAA = CompletableFuture.supplyAsync(() -> {
                String str = null;
                return str.length();
            }).whenComplete((v, e) -> {
                if (e == null) {
                    System.out.println("正常结果为" + v);
                } else {
                    // 发生异常了java.util.concurrent.CompletionException: java.lang.NullPointerException
                    System.out.println("发生异常了" + e.toString());
                }
            });
    
    
            // 8.集合业务使用示例,假设stringList为业务执行集合
            ExecutorService executor = new ThreadPoolExecutor(10, 16, 10, TimeUnit.MICROSECONDS, new ArrayBlockingQueue<>(1000));
    
            List<String> stringList = new ArrayList<>();
            for (int i = 0; i < 100; i++) {
                stringList.add("a" + i);
            }
    
            CompletableFuture<Void> all = null;
            CompletableFuture<Void> all1 = null;
            // 开始我们的业务处理
            for (String personName : stringList) {
                CompletableFuture<Object> stringCompletableFuture = CompletableFuture.supplyAsync(() -> {
                    return null;
                }).whenComplete((v, e) -> {
                    if (e == null) {
                        System.out.println("正常结果为" + v);
                    } else {
                        // 发生异常了java.util.concurrent.CompletionException: java.lang.NullPointerException
                        System.out.println("发生异常了" + e.toString());
                    }
                });
    
                all = CompletableFuture.allOf(stringCompletableFuture);
    
                CompletableFuture<String> stringCompletableFuture1 = CompletableFuture.supplyAsync(() -> {
                    // 模拟业务逻辑,say hello world
                    System.out.println(personName + ": Hello World!");
                    return "task finished!";
                });
                all1 = CompletableFuture.allOf(stringCompletableFuture1);
            }
    
            // 开始等待所有任务执行完成
            all.join();
            all1.join();
    
            // 使用JDK 1.8的特性,stream()和Lambda表达式: (参数) -> {表达式}
            long start = System.currentTimeMillis();
    
            if (CollectionUtils.isEmpty(stringList)) {
                return;
            }
            final CompletableFuture[] completableFutures = stringList.stream().
                    map(t -> CompletableFuture
                            .supplyAsync(() -> pause(t), executor)
                            .whenComplete((result, th) -> {
                                System.out.println("hello" + result);
                            })).toArray(CompletableFuture[]::new);
    
            // 开始等待所有任务执行完成
            System.out.println("start block");
            CompletableFuture.allOf(completableFutures).join();
            System.out.println("block finish, consume time:" + (System.currentTimeMillis() - start));
    
            stringList.forEach(name -> CompletableFuture.supplyAsync(() -> {
                // 封装了业务逻辑
                System.out.println("name = " + name);
                return "success";
            }).exceptionally(e -> {
                System.out.println(e);
                return "false";
            }));
    
            // 关闭线程池
            executor.shutdown();
        }
    
        private static void sleepRandom() {
            System.out.println("测试方法");
        }
    
        public static String pause(String name) {
            try {
                Thread.sleep(300);
            } catch (Exception e) {
                e.printStackTrace();
            }
            return name;
        }
    }
    
    

    简约写法CompletableFuture执行任务并返回

    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;
    import java.util.stream.Collectors;
    
    /**
     * @author lisanwei24282
     */
    public class CompletableFutureProblem {
        public void batchOperation() {
            /* 集合任务,统一处理 */
            List<String> stringList = new ArrayList<>();
            stringList.add("task1");
            stringList.add("task2");
    
            /* 任务提交汇总 */
            List<CompletableFuture<String>> futures = new ArrayList<>();
            stringList.parallelStream().forEach(str -> {
                /* 调用业务方法 */
                CompletableFuture<String> response = restApiCall(str);
                futures.add(response);
            });
    
            /* 等待所有返回 */
            CompletableFuture<Void> result = CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));
            CompletableFuture<List<String>> convertedResult = result.thenApply(v ->
                    futures.stream().map(CompletableFuture::join).collect(Collectors.toList())
            );
    
            /* 获取返回数据结果 */
            try {
                List<String> finishedTask = convertedResult.get();
                System.out.println(finishedTask.toString());
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        }
    
        /* 假设执行具体业务逻辑功能 */
        public CompletableFuture<String> restApiCall(String str) {
            return CompletableFuture.supplyAsync(() -> {
                return "Complete-" + str;
            });
        }
    
        public static void main(String[] args) {
            CompletableFutureProblem problem = new CompletableFutureProblem();
            problem.batchOperation();
        }
    }
    
    

    相关文章

      网友评论

        本文标题:CompletableFuture常见用法,Completabl

        本文链接:https://www.haomeiwen.com/subject/blqmkrtx.html