美文网首页
CompletableFuture入门

CompletableFuture入门

作者: 不知名的蛋挞 | 来源:发表于2018-08-02 10:51 被阅读79次

    CompletableFuture介绍

    Future 接口可以构建异步应用,但依然有其局限性。它很难直接表述多个Future 结果之间的依赖性。实际开发中,我们经常需要达成以下目的:

    • 将两个异步计算合并为一个——这两个异步计算之间相互独立,同时第二个又依赖于第一个的结果。
    • 仅等待 Future 集合中最快结束的任务完成(有可能因为它们试图通过不同的方式计算同一个值),并返回它的结果。
    • 等待 Future 集合中的所有任务都完成。
    • 当 Future 的完成事件发生时会收到通知,并能使用 Future 计算的结果进行下一步的操作,不只是简单地阻塞等待操作的结果。

    等等之类的,新的CompletableFuture将使得这些成为可能。

    CompletableFuture实现了Future<T>, CompletionStage<T>两个接口。所以还是可以像以前一样通过阻塞或轮询的方式获得结果,尽管这种方式不推荐使用。

    虽说 CompletableFuture 实现了 Future 接口,但它多数方法源自于 CompletionStage。CompletionStage是一个接口,从命名上看得知是一个完成的阶段,它里面的方法也标明是在某个运行阶段得到了结果之后要做的事情。

    如下我们使用new的方式创建一个CompletableFuture(这种方式不常用来创建一个CompletableFuture,此处仅为了说明情况),用阻塞的方式得到了结果:

    public class CompletableFutureInAction {
    
        private static Random RAMDOM = new Random(System.currentTimeMillis());
    
        public static void main(String[] args)
                throws ExecutionException, InterruptedException {
            CompletableFuture<Double> completableFuture = new CompletableFuture();
            new Thread(()->{
                double value = get();
                completableFuture.complete(value);
            }).start();
    
            System.out.print("===========no==block");
            Optional.ofNullable(completableFuture.get()).ifPresent(System.out::println);
        }
    
        private static double get(){
            try {
                Thread.sleep(10000L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return RAMDOM.nextDouble();
        }
    }
    

    运行之后会立刻输出:

    ===========no==block
    

    过了n秒就会输出一个随机数:

    0.39273128973
    

    如果我们想实现方法完成时会收到通知,而不是一直阻塞在结果的获取上面,可以这样写:

    public class CompletableFutureInAction {
    
        private static Random RAMDOM = new Random(System.currentTimeMillis());
    
        public static void main(String[] args)
                throws ExecutionException, InterruptedException {
            CompletableFuture<Double> completableFuture = new CompletableFuture();
            new Thread(()->{
                double value = get();
                completableFuture.complete(value);
            }).start();
    
            System.out.print("===========no==block");
            completableFuture.whenComplete((v,t)->{
                Optional.ofNullable(v).ifPresent(System.out::println);
                Optional.ofNullable(t).ifPresent(x->x.printStackTrace());
            });
        }
    
        private static double get(){
            try {
                Thread.sleep(10000L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return RAMDOM.nextDouble();
        }
    }
    

    CompletableFuture的基本使用

    【创建CompletableFuture对象】

    CompletableFuture.completedFuture是一个静态辅助方法,用来返回一个已经计算好的CompletableFuture。

    public static <U> CompletableFuture<U> completedFuture(U value)
    

    而以下四个静态方法用来为一段异步执行的代码创建CompletableFuture对象:

    public static CompletableFuture<Void>   runAsync(Runnable runnable)
    public static CompletableFuture<Void>   runAsync(Runnable runnable, Executor executor)
    public static <U> CompletableFuture<U>  supplyAsync(Supplier<U> supplier)
    public static <U> CompletableFuture<U>  supplyAsync(Supplier<U> supplier, Executor executor)
    

    以Async结尾并且没有指定Executor的方法会使用ForkJoinPool.commonPool() 作为它的线程池执行异步代码。

    • runAsync方法:它以Runnabel函数式接口类型为参数,所以CompletableFuture的计算结果为空。
    • supplyAsync方法以Supplier<U>函数式接口类型为参数,CompletableFuture的计算结果类型为U。

    这四个方法的参数类型都是函数式接口,所以可以使用lambda表达式实现异步任务。

    【计算结果完成时的处理】

    当CompletableFuture的计算结果完成,或者抛出异常的时候,可以执行特定的Action。主要是下面的方法:

    public CompletableFuture<T>     whenComplete(BiConsumer<? super T,? super Throwable> action)
    public CompletableFuture<T>     whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
    public CompletableFuture<T>     whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
    public CompletableFuture<T>     exceptionally(Function<Throwable,? extends T> fn)
    

    可以看到Action的类型是BiConsumer<? super T,? super Throwable>它可以处理正常的计算结果,或者异常情况。

    方法不以Async结尾,意味着Action使用相同的线程执行,而如果以Async结尾且没有指定Executor,则从ForkJoinPool.commonPool()中获取线程执行(如果是使用相同的线程池,也可能会被同一个线程选中执行)。

    如果你用过Future,就会知道糟糕的时候有多糟糕。幸运的是,CompletableFuture有一个漂亮的对应手段,通过使用exceptionally。exceptionally给我们一个机会恢复,通过执行当异步执行的计算抛出exception时备选的方法(alternative method)。

    代码示例:

    public class CompletableFutureInAction2 {
    
        private static Random RAMDOM = new Random(System.currentTimeMillis());
    
        public static void main(String[] args)
                throws ExecutionException, InterruptedException {
            CompletableFuture.supplyAsync(CompletableFutureInAction2::get)
                             .whenComplete((v,t)->{
                                 Optional.ofNullable(v).ifPresent(System.out::println);
                                 Optional.ofNullable(t).ifPresent(x->x.printStackTrace());
                             });
            System.out.print("===========no==block");
        }
    
        private static double get(){
            try {
                Thread.sleep(10000L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return RAMDOM.nextDouble();
        }
    }
    

    输出结果:

    ===========no==block
    

    为什么没有结果输出呢?因为whenComplete里面的线程是守护线程,所有的“非后台线程”结束时,程序也就终止了,同时会杀死进程中所有后台线程:main就是一个非后台线程。所以运行之后会直接输出===========no==block,但方法没有执行完那当然没有任何结果输出。如果我们想得到结果可以在 System.out.print("===========no==block");后面加上一句:

    Thread.currentThread().join();
    

    这样main线程会等待CompletableFuture的线程结束之后才能继续运行。

    对于线程池中的所有线程默认都为非后台线程,这样主线程退出时不会直接退出JVM,而是等待线程池结束。所以线程池的生命周期要比创建它们的程序生命周期要长,我们必须使用shutdown()方法手动退出。我们知道Executors可以用来创建一个线程池,如果我们想让Executors创建的线程池中的线程自动结束,可以使用如下方法:

     public static void main(String[] args) {
            // ExecutorService 默认不是守护线程 默认false setDaemon(false)
            ExecutorService executor = Executors.newFixedThreadPool(2,r->{
                // 对线程池里面的线程进行初始化设置
                Thread t = new Thread();
                t.setDaemon(true);  // 设置线程为守护线程
                return t;
            });
            
            executor.execute(()->System.out.print("test"));
    
            // 不用执行shutdown,线程池中的线程随着main()方法执行完也随之退出
        }
    

    执行supplyAsync的线程是守护线程,所以main()函数执行完了这个线程也随之被杀死。而如果我们把线程池中的所有线程默认都转换为非后台线程,然后用这个线程池中的县城去执行supplyAsync,我们就不用依赖join(),这样主线程退出时不会直接退出JVM,我们就可以等到方法输出结果。

    public class CompletableFutureInAction2 {
        private static Random RAMDOM = new Random(System.currentTimeMillis());
    
        public static void main(String[] args)
                throws ExecutionException, InterruptedException {
            AtomicBoolean finished = new AtomicBoolean(false);
    
            ExecutorService executor = Executors.newFixedThreadPool(2,r->{
                Thread t = new Thread();
                t.setDaemon(false);
                return t;
            });
            
            // CompletableFuture.supplyAsync(executor)  whenComplete
            // 上面两步都是 [pool-1-thread-1]执行 即两个步骤是同一个线程执行
            CompletableFuture.supplyAsync(CompletableFutureInAction2::get,executor)
                    .whenComplete((v,t)->{
                        Optional.ofNullable(v).ifPresent(System.out::println);
                        finished.set(true);
                    });
            System.out.print("===========no==block");
    
           /* while(!finished.get()){
                Thread.sleep(1);
            }*/
        }
    
        private static double get(){
            try {
                Thread.sleep(10000L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return RAMDOM.nextDouble();
        }
    }
    

    做好了上面的准备,下面我们来看一下方法使用Async结尾和不使用有什么区别:

    public class CompletableFutureInAction2 {
        private static Random RAMDOM = new Random(System.currentTimeMillis());
    
        public static void main(String[] args) {
            ExecutorService executorPool = Executors.newFixedThreadPool(2, run -> {
                Thread t = new Thread(run);
                t.setDaemon(false);
                return t;
            });
    
            //###1.0 CompletableFuture.supplyAsync(executorPool)  thenApply  whenComplete
            CompletableFuture.supplyAsync(CompletableFutureInAction2::get,executorPool)
                    .thenApply(t->{
                        System.out.println("##1.0## [thenApplyThreadName]=["+Thread.currentThread().getName()+"]");
                        return t*10;
                    })
                    .whenComplete((v,t)->{
                        System.out.println("##1.0## [whenCompleteThreadName]=["+Thread.currentThread().getName()+"]");
                        System.out.println(v);
                        t.printStackTrace();
                    });
            //### 上面三步都是 [pool-1-thread-1]执行 即三个步骤是同一个线程执行
    
            //###2.0 CompletableFuture.supplyAsync(executorPool)  thenApplyAsync  whenCompleteAsync
            CompletableFuture.supplyAsync(CompletableFutureInAction2::get,executorPool)
                    .thenApplyAsync(t->{
                        System.out.println("##2.0## [thenApplyThreadName]=["+Thread.currentThread().getName()+"]");
                        return t*10;
                    }).
                    whenCompleteAsync((v,t)->{
                       System.out.println("##2.0## [whenCompleteThreadName]=["+Thread.currentThread().getName()+"]");
                       System.out.println(v);
                       t.printStackTrace();
                    });
            //### 上面三步supplyAsync=[pool-1-thread-2]  后面两个都是[ForkJoinPool.commonPool-worker-1]
            //### 原因由于调用这个时thenApplyAsync,没有指定Executor executor,然后又是因为异步,默认采用ForkJoin的连接池
            //### 由于工作量不大,ForkJoinPool并没有分配两个线程,ForkJoinPool.commonPool-worker-1承担了两份工作
    
            //###3.0 CompletableFuture.supplyAsync(executorPool)  thenApplyAsync  whenCompleteAsync(executorpool)
            CompletableFuture.supplyAsync(CompletableFutureInAction2::get,executorPool)
                    .thenApply(t->{
                        System.out.println("##3.0## [thenApplyThreadName]=["+Thread.currentThread().getName()+"]");
                        return t*10;
                    }).
                    whenCompleteAsync((v,t)->{
                        System.out.println("##3.0## [whenCompleteThreadName]=["+Thread.currentThread().getName()+"]");
                        System.out.println(v);
                        t.printStackTrace();},executorPool);
            //### 上面三步supplyAsync和thenApply 都是[pool-1-thread-3]  whenCompleteAsync[pool-1-thread-4]
            //### 原因由于调用这个时thenApply和前者同一线程  whenCompleteAsync指定了线程
        }
    
        private static double get(){
            try {
                Thread.sleep(10000L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return RAMDOM.nextDouble();
        }
    }
    

    相关文章

      网友评论

          本文标题:CompletableFuture入门

          本文链接:https://www.haomeiwen.com/subject/adwkvftx.html