美文网首页异步&线程池
三、CompletableFuture

三、CompletableFuture

作者: 紫荆秋雪_文 | 来源:发表于2020-06-20 18:15 被阅读0次

    一、场景

    当在比较复杂的情况下,在需要远程调用,必然需要花费更多的时间

    • 1、获取sku的基本信息 0.5s
    • 2、获取sku的图片信息 1.0s
    • 3、获取sku的促销信息 2s
      执行完上面的任务需要3.5s,而当有更多的请求时任务时,消耗的时间会更多。如果有多个线程同时执行上面的任务,也许只需要2s就可以完成响应

    二、创建异步对象

    一、runAsync创建异步对象

        // 定义线程池
        public static ExecutorService executor = Executors.newFixedThreadPool(10);
    
        public static void main(String[] args) {
            System.out.println("main。。。。。start。。。。。");
    
            CompletableFuture<Void> voidCompletableFuture = CompletableFuture.runAsync(() -> {
                System.out.println("当前线程:" + Thread.currentThread().getName());
                int i = 10 / 2;
                System.out.println("运行结果:" + i);
            }, executor);
    
            System.out.println("main。。。。。end。。。。。");
        }
    
    • runAsync没有返回结果

    二、supplyAsync创建异步对象

        public static void main(String[] args) throws ExecutionException, InterruptedException {
            System.out.println("main。。。。。start。。。。。");
    
            CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
                System.out.println("当前线程:" + Thread.currentThread().getName());
                int i = 10 / 2;
                System.out.println("运行结果:" + i);
                return i;
            }, executor);
    
            System.out.println("main。。。。。end。。。。。" + future.get());
        }
    
    • supplyAsync可以返回结果

    三、计算完成时回调方法

    一、 whenComplete 获取结果

        public static void main(String[] args) throws ExecutionException, InterruptedException {
            System.out.println("main。。。。。start。。。。。");
    
            CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
                System.out.println("当前线程:" + Thread.currentThread().getName());
                int i = 10 / 2;
                System.out.println("运行结果:" + i);
                return i;
            }, executor).whenCompleteAsync((res, ex) -> {
                System.out.println("异步任务完成。。。。。结果是:" + res);
            }, executor);
    
            System.out.println("main。。。。。end。。。。。" + future.get());
        }
    
    • 运行结果
    main。。。。。start。。。。。
    当前线程:pool-1-thread-1
    运行结果:5
    异步任务完成。。。。。结果是:5
    main。。。。。end。。。。。5
    
    • 获取异常
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            System.out.println("main。。。。。start。。。。。");
    
            CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
                System.out.println("当前线程:" + Thread.currentThread().getName());
                int i = 10 / 0;
                System.out.println("运行结果:" + i);
                return i;
            }, executor).whenCompleteAsync((res, ex) -> {
                System.out.println("异步任务完成。。。。。结果是:" + res);
                System.out.println("异步任务完成。。。。。异常是:" + ex);
            }, executor);
    
            System.out.println("main。。。。。end。。。。。" + future.get());
        }
    
    • 运行结果
    main。。。。。start。。。。。
    当前线程:pool-1-thread-1
    异步任务完成。。。。。结果是:null
    异步任务完成。。。。。异常是:java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
    

    二、 exceptionally感知到异常时,返回默认值

        public static void main(String[] args) throws Exception {
            System.out.println("main。。。。。start。。。。。");
    
            CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
                System.out.println("当前线程:" + Thread.currentThread().getName());
                int i = 10 / 0;
                System.out.println("运行结果:" + i);
                return i;
            }, executor).whenCompleteAsync((res, ex) -> {
                System.out.println("异步任务完成。。。。。结果是:" + res);
                System.out.println("异步任务完成。。。。。异常是:" + ex);
            }, executor).exceptionally(ex -> 100);
    
            System.out.println("main。。。。。end。。。。。" + future.get());
        }
    
    • 运行结果
    main。。。。。start。。。。。
    当前线程:pool-1-thread-1
    异步任务完成。。。。。结果是:null
    异步任务完成。。。。。异常是:java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
    main。。。。。end。。。。。100
    
    • 方法不以 Async 结尾,意味着 Action 使用相同的线程执行,而 Async 可能会使用其他线程执行

    三、handle方法

        public static void main(String[] args) throws Exception {
            System.out.println("main。。。。。start。。。。。");
            CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
                System.out.println("当前线程:" + Thread.currentThread().getName());
                int i = 10 / 2;
                System.out.println("运行结果:" + i);
                return i;
            }, executor).handleAsync((res, ex) -> {
                if (ex != null) {
                    return res * 10;
                }
                return 100;
            }, executor);
    
            System.out.println("main。。。。。end。。。。。" + future.get());
        }
    
    • 运行结果
    main。。。。。start。。。。。
    当前线程:pool-1-thread-1
    运行结果:5
    main。。。。。end。。。。。100
    

    四、线程串行化方法

    1、thenRunAsync

    
        public static void main(String[] args) throws Exception {
            System.out.println("main。。。。。start。。。。。");
            CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
                System.out.println("当前线程:" + Thread.currentThread().getName());
                int i = 10 / 2;
                System.out.println("运行结果:" + i);
                return i;
            }, executor).thenRunAsync(() -> {
                System.out.println("任务2启动了。。。。。");
            }, executor);
    
            System.out.println("main。。。。。end。。。。。" + future.get());
        }
    
    • 运行结果
    main。。。。。start。。。。。
    当前线程:pool-1-thread-1
    运行结果:5
    任务2启动了。。。。。
    main。。。。。end。。。。。null
    
    • 无法获取上个任务结果

    2、thenAcceptAsync

        public static void main(String[] args) throws Exception {
            System.out.println("main。。。。。start。。。。。");
            CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
                System.out.println("当前线程:" + Thread.currentThread().getName());
                int i = 10 / 2;
                System.out.println("运行结果:" + i);
                return i;
            }, executor).thenAcceptAsync((res) -> {
                System.out.println("任务2启动了。。。。。,上一次任务结果:" + res);
            }, executor);
    
            System.out.println("main。。。。。end。。。。。" + future.get());
        }
    
    • 运行结果
    main。。。。。start。。。。。
    当前线程:pool-1-thread-1
    运行结果:5
    任务2启动了。。。。。,上一次任务结果:5
    main。。。。。end。。。。。null
    
    • 可以获取上一个任务结果

    3、thenApplyAsync

        public static void main(String[] args) throws Exception {
            System.out.println("main。。。。。start。。。。。");
            CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
                System.out.println("当前线程:" + Thread.currentThread().getName());
                int i = 10 / 2;
                System.out.println("运行结果:" + i);
                return i;
            }, executor).thenApplyAsync((res) -> {
                System.out.println("任务2启动了。。。。。,上一次任务结果:" + res);
                return "任务2返回结果结果:" + res;
            }, executor);
    
            System.out.println("main。。。。。end。。。。。" + future.get());
        }
    
    • 运行结果
    main。。。。。start。。。。。
    当前线程:pool-1-thread-1
    运行结果:5
    任务2启动了。。。。。,上一次任务结果:5
    main。。。。。end。。。。。任务2返回结果结果:5
    
    • thenApplyAsync:既可以获得上个任务结果又可以返回结果

    • 小结

    • thenApply方法:当一个线程依赖另一个线程时,获取上一个任务返回的结果,并返回当前任务的返回值

    • thenAccept方法:消费处理结果。接收任务的处理结果,并消费处理,无返回结果

    • thenRun方法:只要上面的任务执行完成,就开始执行thenRun,只是处理完任务后,执行thenRun的后续操作

    五、两任务组合

    runAfterBothAsync

        public static void main(String[] args) throws Exception {
            System.out.println("main。。。。。start。。。。。");
    
            CompletableFuture<Integer> future01 = CompletableFuture.supplyAsync(() -> {
                System.out.println("任务1线程。。。" + Thread.currentThread().getName());
                int i = 10 / 2;
                System.out.println("任务1线程结束。。。");
                return i;
            }, executor);
    
            CompletableFuture<String> future02= CompletableFuture.supplyAsync(() -> {
                System.out.println("任务2线程。。。" + Thread.currentThread().getName());
                try {
                    Thread.sleep(10000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("任务2线程结束。。。");
                return "Raven";
            }, executor);
    
            future01.runAfterBothAsync(future02, () -> {
                System.out.println("任务3线程开始。。。");
            }, executor);
    
            System.out.println("main。。。。。end。。。。。");
        }
    
    • 运行结果
    main。。。。。start。。。。。
    任务1线程。。。pool-1-thread-1
    任务1线程结束。。。
    任务2线程。。。pool-1-thread-2
    main。。。。。end。。。。。
    任务2线程结束。。。
    任务3线程开始。。。
    

    thenAcceptBothAsync

        public static void main(String[] args) throws Exception {
            System.out.println("main。。。。。start。。。。。");
    
            CompletableFuture<Integer> future01 = CompletableFuture.supplyAsync(() -> {
                System.out.println("任务1线程。。。" + Thread.currentThread().getName());
                int i = 10 / 2;
                System.out.println("任务1线程结束。。。");
                return i;
            }, executor);
    
            CompletableFuture<String> future02= CompletableFuture.supplyAsync(() -> {
                System.out.println("任务2线程。。。" + Thread.currentThread().getName());
                try {
                    Thread.sleep(10000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("任务2线程结束。。。");
                return "Raven";
            }, executor);
    
            future01.thenAcceptBothAsync(future02, (f1, f2) -> {
                System.out.println("任务3线程开始。。。" + f1 + " ---- " + f2);
            }, executor);
    
            System.out.println("main。。。。。end。。。。。");
        }
    
    • 运行结果
    main。。。。。start。。。。。
    任务1线程。。。pool-1-thread-1
    任务1线程结束。。。
    任务2线程。。。pool-1-thread-2
    main。。。。。end。。。。。
    任务2线程结束。。。
    任务3线程开始。。。5 ---- Raven
    

    thenCombineAsync

        public static void main(String[] args) throws Exception {
            System.out.println("main。。。。。start。。。。。");
    
            CompletableFuture<Integer> future01 = CompletableFuture.supplyAsync(() -> {
                System.out.println("任务1线程。。。" + Thread.currentThread().getName());
                int i = 10 / 2;
                System.out.println("任务1线程结束。。。");
                return i;
            }, executor);
    
            CompletableFuture<String> future02= CompletableFuture.supplyAsync(() -> {
                System.out.println("任务2线程。。。" + Thread.currentThread().getName());
                try {
                    Thread.sleep(10000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("任务2线程结束。。。");
                return "Raven";
            }, executor);
    
            CompletableFuture<String> future = future01.thenCombineAsync(future02, (f1, f2) -> {
                System.out.println("任务3线程开始。。。" + f1 + " ---- " + f2);
                return f1 + ":----" + f2 + "--->Haha";
            }, executor);
    
            System.out.println("main。。。。。end。。。。。" + future.get());
        }
    
    • 运行结果
    main。。。。。start。。。。。
    任务1线程。。。pool-1-thread-1
    任务1线程结束。。。
    任务2线程。。。pool-1-thread-2
    任务2线程结束。。。
    任务3线程开始。。。5 ---- Raven
    main。。。。。end。。。。。5:----Raven--->Haha
    
    • 小结
    • 1、runAfterBoth:组合两个 future ,不需要获取 future 结果,只需两个 future 处理完任务后,处理该任务。
    • 2、thenAcceptBoth:组合两个 future ,获取两个 future 任务的返回结果 ,然后处理任务,没有返回值
    • 3、thenCombine:组合两个 future ,获取两个 future 任务的返回结果 ,并返回当前任务的返回值

    六、两任务组合-任何一个完成

    runAfterEitherAsync

        public static void main(String[] args) throws Exception {
            System.out.println("main。。。。。start。。。。。");
    
            CompletableFuture<Integer> future01 = CompletableFuture.supplyAsync(() -> {
                System.out.println("任务1线程。。。" + Thread.currentThread().getName());
                int i = 10 / 2;
                System.out.println("任务1线程结束。。。");
                return i;
            }, executor);
    
            CompletableFuture<String> future02= CompletableFuture.supplyAsync(() -> {
                System.out.println("任务2线程。。。" + Thread.currentThread().getName());
                try {
                    Thread.sleep(10000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("任务2线程结束。。。");
                return "Raven";
            }, executor);
    
            CompletableFuture<Void> future = future01.runAfterEitherAsync(future02, () -> {
                System.out.println("任务3线程开始。。。");
            }, executor);
    
    
            System.out.println("main。。。。。end。。。。。" + future.get());
        }
    
    
    • 运行结果
    main。。。。。start。。。。。
    任务1线程。。。pool-1-thread-1
    任务1线程结束。。。
    任务2线程。。。pool-1-thread-2
    任务3线程开始。。。
    main。。。。。end。。。。。null
    任务2线程结束。。。
    

    acceptEitherAsync

        public static void main(String[] args) throws Exception {
            System.out.println("main。。。。。start。。。。。");
    
            CompletableFuture<Integer> future01 = CompletableFuture.supplyAsync(() -> {
                System.out.println("任务1线程。。。" + Thread.currentThread().getName());
                int i = 10 / 2;
                System.out.println("任务1线程结束。。。");
                return i;
            }, executor);
    
            CompletableFuture<Integer> future02= CompletableFuture.supplyAsync(() -> {
                System.out.println("任务2线程。。。" + Thread.currentThread().getName());
                try {
                    Thread.sleep(10000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("任务2线程结束。。。");
                return 22;
            }, executor);
    
            CompletableFuture<Void> future = future01.acceptEitherAsync(future02, (res) -> {
                System.out.println("任务3线程开始。。。" + res);
            }, executor);
    
    
            System.out.println("main。。。。。end。。。。。" + future.get());
        }
    
    • 运行结果
    main。。。。。start。。。。。
    任务1线程。。。pool-1-thread-1
    任务1线程结束。。。
    任务2线程。。。pool-1-thread-2
    任务3线程开始。。。5
    main。。。。。end。。。。。null
    任务2线程结束。。。
    

    applyToEitherAsync

        public static void main(String[] args) throws Exception {
            System.out.println("main。。。。。start。。。。。");
    
            CompletableFuture<Integer> future01 = CompletableFuture.supplyAsync(() -> {
                System.out.println("任务1线程。。。" + Thread.currentThread().getName());
                int i = 10 / 2;
                System.out.println("任务1线程结束。。。");
                return i;
            }, executor);
    
            CompletableFuture<Integer> future02= CompletableFuture.supplyAsync(() -> {
                System.out.println("任务2线程。。。" + Thread.currentThread().getName());
                try {
                    Thread.sleep(10000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("任务2线程结束。。。");
                return 22;
            }, executor);
    
            CompletableFuture<String> future = future01.applyToEitherAsync(future02, (res) -> {
                System.out.println("任务3线程开始。。。" + res);
                return "任务3线程开始。。。" + res;
            }, executor);
    
    
            System.out.println("main。。。。。end。。。。。" + future.get());
        }
    
    • 运行结果
    main。。。。。start。。。。。
    任务1线程。。。pool-1-thread-1
    任务1线程结束。。。
    任务2线程。。。pool-1-thread-2
    任务3线程开始。。。5
    main。。。。。end。。。。。任务3线程开始。。。5
    任务2线程结束。。。
    
    • 小结
    • applyToEither:两个任务有一个执行完成,获取它的返回值,处理任务并有新的返回值
    • acceptEither:两个任务有一个任务完成,获取它的返回值,处理任务,没有新的返回值
    • runAfterEither:两个任务有一个任务完成,不需要获取 future 的结果,处理任务,也没有返回值。

    七、多任务组合

    allOf:等待所有任务完成

       public static void main(String[] args) throws Exception {
            System.out.println("main。。。。。start。。。。。");
    
            CompletableFuture<Integer> future01 = CompletableFuture.supplyAsync(() -> {
                System.out.println("任务1线程。。。" + Thread.currentThread().getName());
                int i = 10 / 2;
                System.out.println("任务1线程结束。。。");
                return i;
            }, executor);
    
            CompletableFuture<Integer> future02= CompletableFuture.supplyAsync(() -> {
                System.out.println("任务2线程。。。" + Thread.currentThread().getName());
                try {
                    Thread.sleep(10000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("任务2线程结束。。。");
                return 22;
            }, executor);
    
            CompletableFuture<Integer> future03= CompletableFuture.supplyAsync(() -> {
                System.out.println("任务3线程。。。" + Thread.currentThread().getName());
                try {
                    Thread.sleep(10000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("任务3线程结束。。。");
                return 23;
            }, executor);
    
            CompletableFuture<Void> future = CompletableFuture.allOf(future01, future02, future03);
            // 等待所有结果完成
            future.get();
            System.out.println("main。。。。。end。。。。。");
        }
    
    • 运行结果
    main。。。。。start。。。。。
    任务1线程。。。pool-1-thread-1
    任务1线程结束。。。
    任务2线程。。。pool-1-thread-2
    任务3线程。。。pool-1-thread-3
    任务2线程结束。。。
    任务3线程结束。。。
    main。。。。。end。。。。。
    

    anyOf:只要有一个任务完成

        public static void main(String[] args) throws Exception {
            System.out.println("main。。。。。start。。。。。");
    
            CompletableFuture<Integer> future01 = CompletableFuture.supplyAsync(() -> {
                System.out.println("任务1线程。。。" + Thread.currentThread().getName());
                int i = 10 / 2;
                System.out.println("任务1线程结束。。。");
                return i;
            }, executor);
    
            CompletableFuture<Integer> future02= CompletableFuture.supplyAsync(() -> {
                System.out.println("任务2线程。。。" + Thread.currentThread().getName());
                try {
                    Thread.sleep(10000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("任务2线程结束。。。");
                return 22;
            }, executor);
    
            CompletableFuture<Integer> future03= CompletableFuture.supplyAsync(() -> {
                System.out.println("任务3线程。。。" + Thread.currentThread().getName());
                System.out.println("任务3线程结束。。。");
                return 23;
            }, executor);
    
            CompletableFuture<Object> future = CompletableFuture.anyOf(future01, future02, future03);
            // 等待所有结果完成
            Object obj = future.get();
            System.out.println("main。。。。。end。。。。。" + obj);
        }
    
    • 运行结果
    main。。。。。start。。。。。
    任务1线程。。。pool-1-thread-1
    任务1线程结束。。。
    任务2线程。。。pool-1-thread-2
    main。。。。。end。。。。。5
    任务3线程。。。pool-1-thread-3
    任务3线程结束。。。
    任务2线程结束。。。
    
    

    相关文章

      网友评论

        本文标题:三、CompletableFuture

        本文链接:https://www.haomeiwen.com/subject/krqgxktx.html