美文网首页
Java多线程之CompletableFuture

Java多线程之CompletableFuture

作者: 上善若泪 | 来源:发表于2022-04-28 21:12 被阅读0次

    1 CompletableFuture

    此文章字数多,实际是例子多,多把例子写几遍,受益颇丰

    1.1 简介

    CompletableFuturejava.util.concurrent库在java 8中新增的主要工具,同传统的Future相比,其支持流式计算、函数式编程、完成通知、自定义异常处理等很多新的特性

    CompletableFuture实现了CompletionStage接口和Future接口,前者是对后者的一个扩展,增加了异步回调、流式处理、多个Future组合处理的能力,使Java在处理多任务的协同工作时更加顺畅便利
    点击此处了解JMM线程池讲解

    CompletableFutureFutureTask 同属于 Future 接口的实现类,都可以获取线程的执行结果

    image.png

    1.2 创建CompletableFuture

    CompletableFuture 在创建时,如果传入线程池,那么会去指定的线程池工作。如果没传入,那么回去默认的 ForkJoinPool
    ForkJoinPool的优势在于,可以充分利用多cpu,多核cpu的优势,把一个任务拆分成多个小任务,把多个小任务放到多个处理器核心上并行执行;当多个小任务执行完成之后,再将这些执行结果合并起来即可。

    ForkJoinPoolExecutorService的实现类,因此是一种特殊的线程池。
    使用方法:创建了ForkJoinPool实例之后,就可以调用ForkJoinPoolsubmit(ForkJoinTask<T> task)invoke(ForkJoinTask<T> task)方法来执行指定任务了。
    其中ForkJoinTask代表一个可以并行、合并的任务。ForkJoinTask是一个抽象类,它还有两个抽象子类:RecusiveAction和RecusiveTask。其中RecusiveTask代表有返回值的任务,而RecusiveAction代表没有返回值的任务

    1.2.1 构造函数创建

    最简单的方式就是通过构造函数创建一个CompletableFuture实例。如下代码所示。由于新创建的CompletableFuture还没有任何计算结果,这时调用join,当前线程会一直阻塞在这里。

    CompletableFuture<String> future = new CompletableFuture();
    String result = future.join();
    System.out.println(result);
    

    此时,如果在另外一个线程中,主动设置该CompletableFuture的值,则上面线程中的结果就能返回。

    future.complete("test");
    

    1.2.2 supplyAsync创建

    CompletableFuture.supplyAsync()也可以用来创建CompletableFuture实例。通过该函数创建的CompletableFuture实例会异步执行当前传入的计算任务。在调用端,则可以通过get或join获取最终计算结果。

    supplyAsync有两种签名:

    public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) 
    public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
    

    第一种只需传入一个Supplier实例(一般使用lamda表达式),此时框架会默认使用ForkJoin线程池来执行被提交的任务。
    第二种可以指定自定义的线程池,然后将任务提交给该线程池执行。
    下面为使用supplyAsync创建CompletableFuture的示例:

    CompletableFuture<String> future 
        = CompletableFuture.supplyAsync(()->{
          System.out.println("compute test");
          return "test";
    });
     
    String result = future.join();
    System.out.println("get result: " + result);
    

    在示例中,异步任务中会打印出compute test,并返回test作为最终计算结果。所以,最终的打印信息为get result: test

    1.2.3 runAsync创建

    CompletableFuture.runAsync()也可以用来创建CompletableFuture实例。与supplyAsync()不同的是,runAsync()传入的任务要求是Runnable类型的,所以没有返回值。因此,runAsync适合创建不需要返回值的计算任务。同supplyAsync()类似,runAsync()也有两种签名:

    public static CompletableFuture<Void> runAsync(Runnable runnable)
     
    public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
    

    下面为使用runAsync()的例子:

    CompletableFuture<Void> future = CompletableFuture.runAsync(()->{
                System.out.println("compute test");
            });
    
    System.out.println("get result: " + future.join());
    
    由于任务没有返回值, 所以最后的打印结果是"get result: null"。
    

    1.3 异步回调方法

    Future相比,CompletableFuture最大的不同是支持流式(Stream)的计算处理,多个任务之间,可以前后相连,从而形成一个计算流。比如:任务1产生的结果,可以直接作为任务2的入参,参与任务2的计算,以此类推。

    CompletableFuture中常用的流式连接函数包括:

    • thenApply——有入参有返回
      thenApplyAsync——有入参有返回
    • thenAccept——有入参无返回
      thenAcceptAsync——有入参无返回
    • thenRun——无入参无返回
      thenRunAsync——无入参无返回
    • thenCombine
      thenCombineAsync
    • thenCompose
      thenComposeAsync
    • whenComplete
      whenCompleteAsync
    • handle
      handleAsync

    其中,带Async后缀的函数表示需要连接的后置任务会被单独提交到线程池中,从而相对前置任务来说是异步运行的。除此之外,两者没有其他区别。因此,为了快速理解,在接下来的介绍中,我们主要介绍不带Async的版本。

    1.3.1 thenApply / thenAccept / thenRun互相依赖

    这里将thenApply / thenAccept / thenRun放在一起讲,因为这几个连接函数之间的唯一区别是提交的任务类型不一样 :

    • thenApply提交的任务类型需遵从Function签名,也就是有入参和返回值,其中入参为前置任务的结果
    • thenAccept提交的任务类型需遵从Consumer签名,也就是有入参但是没有返回值,其中入参为前置任务的结果
    • thenRun提交的任务类型需遵从Runnable签名,即没有入参也没有返回值
    CompletableFuture<Integer> future1 
        = CompletableFuture.supplyAsync(()->{
            System.out.println("compute 1");
            return 1;
     });
     CompletableFuture<Integer> future2 
        = future1.thenApply((p)->{
             System.out.println("compute 2");
             return p+10;
     });
     System.out.println("result: " + future2.join());
    

    在上面的示例中,future1通过调用thenApply将后置任务连接起来,并形成future2。该示例的最终打印结果为11,可见程序在运行中,future1的结果计算出来后,会传递给通过thenApply连接的任务,从而产生future2的最终结果为1+10=11。当然,在实际使用中,我们理论上可以无限连接后续计算任务,从而实现链条更长的流式计算。

    需要注意的是,通过thenApply / thenAccept / thenRun连接的任务,当且仅当前置任务计算完成时,才会开始后置任务的计算。因此,这组函数主要用于连接前后有依赖的任务链。

    1.3.1.1 thenApply

    thenApply表示某个任务执行完成后执行的动作,即回调方法,会将该任务的执行结果即方法返回值作为入参传递到回调方法中,测试用例如下:

    @Test
    public void test5() throws Exception {
            ForkJoinPool pool=new ForkJoinPool();
            // 创建异步执行任务:
            CompletableFuture<Double> cf = CompletableFuture.supplyAsync(()->{
                System.out.println(Thread.currentThread()+" start job1,time->"+System.currentTimeMillis());
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                }
                System.out.println(Thread.currentThread()+" exit job1,time->"+System.currentTimeMillis());
                return 1.2;
            },pool);
            //cf关联的异步任务的返回值作为方法入参,传入到thenApply的方法中
            //thenApply这里实际创建了一个新的CompletableFuture实例
            CompletableFuture<String> cf2=cf.thenApply((result)->{
                System.out.println(Thread.currentThread()+" start job2,time->"+System.currentTimeMillis());
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                }
                System.out.println(Thread.currentThread()+" exit job2,time->"+System.currentTimeMillis());
                return "test:"+result;
            });
            System.out.println("main thread start cf.get(),time->"+System.currentTimeMillis());
            //等待子任务执行完成
            System.out.println("run result->"+cf.get());
            System.out.println("main thread start cf2.get(),time->"+System.currentTimeMillis());
            System.out.println("run result->"+cf2.get());
            System.out.println("main thread exit,time->"+System.currentTimeMillis());
        }
    
    image.png

    job1执行结束后,将job1的方法返回值作为入参传递到job2中并立即执行job2。thenApplyAsync与thenApply的区别在于,前者是将job2提交到线程池中异步执行,实际执行job2的线程可能是另外一个线程,后者是由执行job1的线程立即执行job2,即两个job都是同一个线程执行的。将上述测试用例中thenApply改成thenApplyAsync后,执行结果如下:


    image.png

    从输出可知,执行job1和job2是两个不同的线程。thenApplyAsync有一个重载版本,可以指定执行异步任务的Executor实现,如果不指定,默认使用ForkJoinPool.commonPool()。 下述的多个方法,每个方法都有两个以Async结尾的方法,一个使用默认的Executor实现,一个使用指定的Executor实现,不带Async的方法是由触发该任务的线程执行该任务,带Async的方法是由触发该任务的线程将任务提交到线程池,执行任务的线程跟触发任务的线程不一定是同一个

    1.3.1.2 thenAccept / thenRun

    thenAcceptthenApply 接收上一个任务的返回值作为参数,但是无返回值;thenRun 的方法没有入参,也买有返回值,测试用例如下:

    @Test
        public void test6() throws Exception {
            ForkJoinPool pool=new ForkJoinPool();
            // 创建异步执行任务:
            CompletableFuture<Double> cf = CompletableFuture.supplyAsync(()->{
                System.out.println(Thread.currentThread()+" start job1,time->"+System.currentTimeMillis());
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                }
                System.out.println(Thread.currentThread()+" exit job1,time->"+System.currentTimeMillis());
                return 1.2;
            },pool);
            //cf关联的异步任务的返回值作为方法入参,传入到thenApply的方法中
            CompletableFuture cf2=cf.thenApply((result)->{
                System.out.println(Thread.currentThread()+" start job2,time->"+System.currentTimeMillis());
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                }
                System.out.println(Thread.currentThread()+" exit job2,time->"+System.currentTimeMillis());
                return "test:"+result;
            }).thenAccept((result)-> { //接收上一个任务的执行结果作为入参,但是没有返回值
                System.out.println(Thread.currentThread()+" start job3,time->"+System.currentTimeMillis());
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                }
                System.out.println(result);
                System.out.println(Thread.currentThread()+" exit job3,time->"+System.currentTimeMillis());
            }).thenRun(()->{ //无入参,也没有返回值
                System.out.println(Thread.currentThread()+" start job4,time->"+System.currentTimeMillis());
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                }
                System.out.println("thenRun do something");
                System.out.println(Thread.currentThread()+" exit job4,time->"+System.currentTimeMillis());
            });
            System.out.println("main thread start cf.get(),time->"+System.currentTimeMillis());
            //等待子任务执行完成
            System.out.println("run result->"+cf.get());
            System.out.println("main thread start cf2.get(),time->"+System.currentTimeMillis());
            //cf2 等待最后一个thenRun执行完成
            System.out.println("run result->"+cf2.get());
            System.out.println("main thread exit,time->"+System.currentTimeMillis());
        }
    

    执行结果


    image.png

    1.3.2 exceptionally有返回

    exceptionally方法指定某个任务执行异常时执行的回调方法,会将抛出异常作为参数传递到回调方法中,如果该任务正常执行则会exceptionally方法返回的CompletionStageresult就是该任务正常执行的结果

    @Test
        public void test2() throws Exception {
            ForkJoinPool pool=new ForkJoinPool();
            // 创建异步执行任务:
            CompletableFuture<Double> cf = CompletableFuture.supplyAsync(()->{
                System.out.println(Thread.currentThread()+"job1 start,time->"+System.currentTimeMillis());
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                }
                if(true){
                    throw new RuntimeException("test");
                }else{
                    System.out.println(Thread.currentThread()+"job1 exit,time->"+System.currentTimeMillis());
                    return 1.2;
                }
            },pool);
            //cf执行异常时,将抛出的异常作为入参传递给回调方法
            CompletableFuture<Double> cf2= cf.exceptionally((param)->{
                 System.out.println(Thread.currentThread()+" start,time->"+System.currentTimeMillis());
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                }
                System.out.println("error stack trace->");
                param.printStackTrace();
                System.out.println(Thread.currentThread()+" exit,time->"+System.currentTimeMillis());
                 return -1.1;
            });
            //cf正常执行时执行的逻辑,如果执行异常则不调用此逻辑
            CompletableFuture cf3=cf.thenAccept((param)->{
                System.out.println(Thread.currentThread()+"job2 start,time->"+System.currentTimeMillis());
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                }
                System.out.println("param->"+param);
                System.out.println(Thread.currentThread()+"job2 exit,time->"+System.currentTimeMillis());
            });
            System.out.println("main thread start,time->"+System.currentTimeMillis());
            //等待子任务执行完成,此处无论是job2和job3都可以实现job2退出,主线程才退出,如果是cf,则主线程不会等待job2执行完成自动退出了
            //cf2.get时,没有异常,但是依然有返回值,就是cf的返回值
            System.out.println("run result->"+cf2.get());
            System.out.println("main thread exit,time->"+System.currentTimeMillis());
        }
    
    image.png

    1.3.3 whenComplete无返回

    whenComplete主要用于注入任务完成时的回调通知逻辑。这个解决了传统future在任务完成时,无法主动发起通知的问题。前置任务会将计算结果或者抛出的异常作为入参传递给回调通知函数。

    以下为示例:

    CompletableFuture<Integer> future1 
        = CompletableFuture.supplyAsync(()->{
            System.out.println("compute 1");
            return 1;
    });
    CompletableFuture future2 
        = future1.whenComplete((r, e)->{
            if(e != null){
                System.out.println("compute failed!");
            } else {
                System.out.println("received result is " + r);
            }
    });
    System.out.println("result: " + future2.join());
    

    需要注意的是,future2获得的结果是前置任务的结果,whenComplete中的逻辑不会影响计算结果。

    @Test
        public void test10() throws Exception {
            // 创建异步执行任务:
            CompletableFuture<Double> cf = CompletableFuture.supplyAsync(()->{
                System.out.println(Thread.currentThread()+"job1 start,time->"+System.currentTimeMillis());
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                }
                if(false){
                    throw new RuntimeException("test");
                }else{
                    System.out.println(Thread.currentThread()+"job1 exit,time->"+System.currentTimeMillis());
                    return 1.2;
                }
            });
            //cf执行完成后会将执行结果和执行过程中抛出的异常传入回调方法,如果是正常执行的则传入的异常为null
            CompletableFuture<Double> cf2=cf.whenComplete((a,b)->{
                System.out.println(Thread.currentThread()+"job2 start,time->"+System.currentTimeMillis());
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                }
                if(b!=null){
                    System.out.println("error stack trace->");
                    b.printStackTrace();
                }else{
                    System.out.println("run succ,result->"+a);
                }
                System.out.println(Thread.currentThread()+"job2 exit,time->"+System.currentTimeMillis());
            });
            //等待子任务执行完成
            System.out.println("main thread start wait,time->"+System.currentTimeMillis());
            //如果cf是正常执行的,cf2.get的结果就是cf执行的结果
            //如果cf是执行异常,则cf2.get会抛出异常
            System.out.println("run result->"+cf2.get());
            System.out.println("main thread exit,time->"+System.currentTimeMillis());
        }
    

    1.3.4 handle有返回

    handlewhenComplete的作用有些类似,但是handle接收的处理函数有返回值,而且返回值会影响最终获取的计算结果。handle方法返回的CompletableFutureresult是回调方法的执行结果或者回调方法执行期间抛出的异常,与原始CompletableFutureresult无关了

    以下为示例:

    CompletableFuture<Integer> future1
         = CompletableFuture.supplyAsync(()->{
             System.out.println("compute 1");
             return 1;
     });
     CompletableFuture<Integer> future2
         = future1.handle((r, e)->{
             if(e != null){
                 System.out.println("compute failed!");
                 return r;
             } else {
                 System.out.println("received result is " + r);
                 return r + 10;
             }
     });
     System.out.println("result: " + future2.join());
    

    在以上示例中,打印出的最终结果为11。说明经过handle计算后产生了新的结果

     @Test
        public void test10() throws Exception {
            // 创建异步执行任务:
            CompletableFuture<Double> cf = CompletableFuture.supplyAsync(()->{
                System.out.println(Thread.currentThread()+"job1 start,time->"+System.currentTimeMillis());
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                }
                if(true){
                    throw new RuntimeException("test");
                }else{
                    System.out.println(Thread.currentThread()+"job1 exit,time->"+System.currentTimeMillis());
                    return 1.2;
                }
            });
            //cf执行完成后会将执行结果和执行过程中抛出的异常传入回调方法,如果是正常执行的则传入的异常为null
            CompletableFuture<String> cf2=cf.handle((a,b)->{
                System.out.println(Thread.currentThread()+"job2 start,time->"+System.currentTimeMillis());
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                }
                if(b!=null){
                    System.out.println("error stack trace->");
                    b.printStackTrace();
                }else{
                    System.out.println("run succ,result->"+a);
                }
                System.out.println(Thread.currentThread()+"job2 exit,time->"+System.currentTimeMillis());
                if(b!=null){
                    return "run error";
                }else{
                    return "run succ";
                }
            });
            //等待子任务执行完成
            System.out.println("main thread start wait,time->"+System.currentTimeMillis());
            //get的结果是cf2的返回值,跟cf没关系了
            System.out.println("run result->"+cf2.get());
            System.out.println("main thread exit,time->"+System.currentTimeMillis());
        }
    

    1.4 异步组合方法

    1.4.1 thenCombine / thenAcceptBoth / runAfterBoth互相不依赖

    这三个方法都是将两个CompletableFuture组合起来,只有这两个都正常执行完了才会执行某个任务,区别在于,thenCombine会将两个任务的执行结果作为方法入参传递到指定方法中,且该方法有返回值;thenAcceptBoth同样将两个任务的执行结果作为方法入参,但是无返回值;runAfterBoth没有入参,也没有返回值。注意两个任务中只要有一个执行异常,则将该异常信息作为指定任务的执行结果

    thenCombine最大的不同是连接任务可以是一个独立的CompletableFuture(或者是任意实现了CompletionStage的类型),从而允许前后连接的两个任务可以并行执行(后置任务不需要等待前置任务执行完成),最后当两个任务均完成时,再将其结果同时传递给下游处理任务,从而得到最终结果。

    CompletableFuture<Integer> future1 
        = CompletableFuture.supplyAsync(()->{
         System.out.println("compute 1");
         return 1;
     });
     CompletableFuture<Integer> future2 
        = CompletableFuture.supplyAsync(()->{
         System.out.println("compute 2");
         return 10;
     });
     CompletableFuture<Integer> future3 
        = future1.thenCombine(future2, (r1, r2)->r1 + r2);
     System.out.println("result: " + future3.join());
    

    上面示例代码中,future1future2为独立的CompletableFuture任务,他们分别会在各自的线程中并行执行,然后future1通过thenCombinefuture2连接,并且以lamda表达式传入处理结果的表达式,该表达式代表的任务会将future1与future2的结果作为入参并计算他们的和。
    因此,上面示例代码中,最终的打印结果是11。

    一般,在连接任务之间互相不依赖的情况下,可以使用thenCombine来连接任务,从而提升任务之间的并发度。

    注意,thenAcceptBoth、thenAcceptBothAsync、runAfterBoth、runAfterBothAsync的作用与thenConbime类似,唯一不同的地方是任务类型不同,分别是BiConumser、Runnable

    @Test
        public void test7() throws Exception {
            ForkJoinPool pool=new ForkJoinPool();
            // 创建异步执行任务:
            CompletableFuture<Double> cf = CompletableFuture.supplyAsync(()->{
                System.out.println(Thread.currentThread()+" start job1,time->"+System.currentTimeMillis());
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                }
                System.out.println(Thread.currentThread()+" exit job1,time->"+System.currentTimeMillis());
                return 1.2;
            });
            CompletableFuture<Double> cf2 = CompletableFuture.supplyAsync(()->{
                System.out.println(Thread.currentThread()+" start job2,time->"+System.currentTimeMillis());
                try {
                    Thread.sleep(1500);
                } catch (InterruptedException e) {
                }
                System.out.println(Thread.currentThread()+" exit job2,time->"+System.currentTimeMillis());
                return 3.2;
            });
            //cf和cf2的异步任务都执行完成后,会将其执行结果作为方法入参传递给cf3,且有返回值
            CompletableFuture<Double> cf3=cf.thenCombine(cf2,(a,b)->{
                System.out.println(Thread.currentThread()+" start job3,time->"+System.currentTimeMillis());
                System.out.println("job3 param a->"+a+",b->"+b);
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                }
                System.out.println(Thread.currentThread()+" exit job3,time->"+System.currentTimeMillis());
                return a+b;
            });
     
            //cf和cf2的异步任务都执行完成后,会将其执行结果作为方法入参传递给cf3,无返回值
            CompletableFuture cf4=cf.thenAcceptBoth(cf2,(a,b)->{
                System.out.println(Thread.currentThread()+" start job4,time->"+System.currentTimeMillis());
                System.out.println("job4 param a->"+a+",b->"+b);
                try {
                    Thread.sleep(1500);
                } catch (InterruptedException e) {
                }
                System.out.println(Thread.currentThread()+" exit job4,time->"+System.currentTimeMillis());
            });
     
            //cf4和cf3都执行完成后,执行cf5,无入参,无返回值
            CompletableFuture cf5=cf4.runAfterBoth(cf3,()->{
                System.out.println(Thread.currentThread()+" start job5,time->"+System.currentTimeMillis());
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                }
                System.out.println("cf5 do something");
                System.out.println(Thread.currentThread()+" exit job5,time->"+System.currentTimeMillis());
            });
     
            System.out.println("main thread start cf.get(),time->"+System.currentTimeMillis());
            //等待子任务执行完成
            System.out.println("cf run result->"+cf.get());
            System.out.println("main thread start cf5.get(),time->"+System.currentTimeMillis());
            System.out.println("cf5 run result->"+cf5.get());
            System.out.println("main thread exit,time->"+System.currentTimeMillis());
        }
    

    1.4.2 applyToEither / acceptEither / runAfterEither

    这三个方法都是将两个CompletableFuture组合起来,只要其中一个执行完了就会执行某个任务(其他线程依然会继续执行),其区别在于applyToEither会将已经执行完成的任务的执行结果作为方法入参,并有返回值;acceptEither同样将已经执行完成的任务的执行结果作为方法入参,但是没有返回值;runAfterEither没有方法入参,也没有返回值。注意两个任务中只要有一个执行异常,则将该异常信息作为指定任务的执行结果。测试用例如下

    @Test
        public void test8() throws Exception {
            // 创建异步执行任务:
            CompletableFuture<Double> cf = CompletableFuture.supplyAsync(()->{
                System.out.println(Thread.currentThread()+" start job1,time->"+System.currentTimeMillis());
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                }
                System.out.println(Thread.currentThread()+" exit job1,time->"+System.currentTimeMillis());
                return 1.2;
            });
            CompletableFuture<Double> cf2 = CompletableFuture.supplyAsync(()->{
                System.out.println(Thread.currentThread()+" start job2,time->"+System.currentTimeMillis());
                try {
                    Thread.sleep(1500);
                } catch (InterruptedException e) {
                }
                System.out.println(Thread.currentThread()+" exit job2,time->"+System.currentTimeMillis());
                return 3.2;
            });
            //cf和cf2的异步任务都执行完成后,会将其执行结果作为方法入参传递给cf3,且有返回值
            CompletableFuture<Double> cf3=cf.applyToEither(cf2,(result)->{
                System.out.println(Thread.currentThread()+" start job3,time->"+System.currentTimeMillis());
                System.out.println("job3 param result->"+result);
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                }
                System.out.println(Thread.currentThread()+" exit job3,time->"+System.currentTimeMillis());
                return result;
            });
     
            //cf和cf2的异步任务都执行完成后,会将其执行结果作为方法入参传递给cf3,无返回值
            CompletableFuture cf4=cf.acceptEither(cf2,(result)->{
                System.out.println(Thread.currentThread()+" start job4,time->"+System.currentTimeMillis());
                System.out.println("job4 param result->"+result);
                try {
                    Thread.sleep(1500);
                } catch (InterruptedException e) {
                }
                System.out.println(Thread.currentThread()+" exit job4,time->"+System.currentTimeMillis());
            });
     
            //cf4和cf3都执行完成后,执行cf5,无入参,无返回值
            CompletableFuture cf5=cf4.runAfterEither(cf3,()->{
                System.out.println(Thread.currentThread()+" start job5,time->"+System.currentTimeMillis());
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                }
                System.out.println("cf5 do something");
                System.out.println(Thread.currentThread()+" exit job5,time->"+System.currentTimeMillis());
            });
     
            System.out.println("main thread start cf.get(),time->"+System.currentTimeMillis());
            //等待子任务执行完成
            System.out.println("cf run result->"+cf.get());
            System.out.println("main thread start cf5.get(),time->"+System.currentTimeMillis());
            System.out.println("cf5 run result->"+cf5.get());
            System.out.println("main thread exit,time->"+System.currentTimeMillis());
        }
    

    1.4.3 thenCompose互相依赖

    前面讲了thenCombine主要用于没有前后依赖关系之间的任务进行连接。那么,如果两个任务之间有前后依赖关系,但是连接任务又是独立的CompletableFuture,该怎么实现呢?

    先来看一下直接使用thenApply实现:

    CompletableFuture<Integer> future1 
        = CompletableFuture.supplyAsync(()->{
             System.out.println("compute 1");
             return 1;
     });
     CompletableFuture<CompletableFuture<Integer>> future2 
        =  future1.thenApply(
            (r)->CompletableFuture.supplyAsync(()->r+10));
     System.out.println(future2.join().join());
    

    可以发现,上面示例代码中,future2的类型变成了CompletableFuture嵌套,而且在获取结果的时候,也需要嵌套调用join或者get。这样,当连接的任务越多时,代码会变得越来越复杂,嵌套获取层级也越来越深。因此,需要一种方式,能将这种嵌套模式展开,使其没有那么多层级。thenCompose的主要目的就是解决这个问题(这里也可以将thenCompose的作用类比于stream接口中的flatMap,因为他们都可以将类型嵌套展开)。

    看一下通过thenCompose如何实现上面的代码:

    CompletableFuture<Integer> future1 
      = CompletableFuture.supplyAsync(()->{
          System.out.println("compute 1");
          return 1;
      });
    CompletableFuture<Integer> future2 
        = future1.thenCompose(
            (r)->CompletableFuture.supplyAsync(()->r+10));
    System.out.println(future2.join());
    

    通过示例代码可以看出来,很明显,在使用了thenCompose后,future2不再存在CompletableFuture类型嵌套了,从而比较简洁的达到了我们的目的。

    thenCompose方法会在某个任务执行完成后,将该任务的执行结果作为方法入参然后执行指定的方法,该方法会返回一个新的CompletableFuture实例,如果该CompletableFuture实例的result不为null,则返回一个基于该result的新的CompletableFuture实例;如果该CompletableFuture实例为null,则,然后执行这个新任务

        @Test
        public void test9() throws Exception {
            // 创建异步执行任务:
            CompletableFuture<Double> cf = CompletableFuture.supplyAsync(()->{
                System.out.println(Thread.currentThread()+" start job1,time->"+System.currentTimeMillis());
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                }
                System.out.println(Thread.currentThread()+" exit job1,time->"+System.currentTimeMillis());
                return 1.2;
            });
            CompletableFuture<String> cf2= cf.thenCompose((param)->{
                System.out.println(Thread.currentThread()+" start job2,time->"+System.currentTimeMillis());
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                }
                System.out.println(Thread.currentThread()+" exit job2,time->"+System.currentTimeMillis());
                return CompletableFuture.supplyAsync(()->{
                    System.out.println(Thread.currentThread()+" start job3,time->"+System.currentTimeMillis());
                    try {
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                    }
                    System.out.println(Thread.currentThread()+" exit job3,time->"+System.currentTimeMillis());
                    return "job3 test";
                });
            });
            System.out.println("main thread start cf.get(),time->"+System.currentTimeMillis());
            //等待子任务执行完成
            System.out.println("cf run result->"+cf.get());
            System.out.println("main thread start cf2.get(),time->"+System.currentTimeMillis());
            System.out.println("cf2 run result->"+cf2.get());
            System.out.println("main thread exit,time->"+System.currentTimeMillis());
        }
    

    1.4.4 allOf / anyOf

    allOf返回的CompletableFuture是多个任务都执行完成后才会执行,只要有一个任务执行异常,则返回的CompletableFuture执行get方法时会抛出异常,如果都是正常执行,则get返回null
    anyOf返回的CompletableFuture是多个任务只要其中一个执行完成就会执行,其get返回的是已经执行完成的任务的执行结果,如果该任务执行异常,则抛出异常。

     @Test
        public void test11() throws Exception {
            // 创建异步执行任务:
            CompletableFuture<Double> cf = CompletableFuture.supplyAsync(()->{
                System.out.println(Thread.currentThread()+" start job1,time->"+System.currentTimeMillis());
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                }
                System.out.println(Thread.currentThread()+" exit job1,time->"+System.currentTimeMillis());
                return 1.2;
            });
            CompletableFuture<Double> cf2 = CompletableFuture.supplyAsync(()->{
                System.out.println(Thread.currentThread()+" start job2,time->"+System.currentTimeMillis());
                try {
                    Thread.sleep(1500);
                } catch (InterruptedException e) {
                }
                System.out.println(Thread.currentThread()+" exit job2,time->"+System.currentTimeMillis());
                return 3.2;
            });
            CompletableFuture<Double> cf3 = CompletableFuture.supplyAsync(()->{
                System.out.println(Thread.currentThread()+" start job3,time->"+System.currentTimeMillis());
                try {
                    Thread.sleep(1300);
                } catch (InterruptedException e) {
                }
    //            throw new RuntimeException("test");
                System.out.println(Thread.currentThread()+" exit job3,time->"+System.currentTimeMillis());
                return 2.2;
            });
            //allof等待所有任务执行完成才执行cf4,如果有一个任务异常终止,则cf4.get时会抛出异常,都是正常执行,cf4.get返回null
            //anyOf是只有一个任务执行完成,无论是正常执行或者执行异常,都会执行cf4,cf4.get的结果就是已执行完成的任务的执行结果
            CompletableFuture cf4=CompletableFuture.allOf(cf,cf2,cf3).whenComplete((a,b)->{
               if(b!=null){
                   System.out.println("error stack trace->");
                   b.printStackTrace();
               }else{
                   System.out.println("run succ,result->"+a);
               }
            });
     
            System.out.println("main thread start cf4.get(),time->"+System.currentTimeMillis());
            //等待子任务执行完成
            System.out.println("cf4 run result->"+cf4.get());
            System.out.println("main thread exit,time->"+System.currentTimeMillis());
        }
    

    join()与get()区别在于join()返回计算的结果或者抛出一个unchecked异常(CompletionException),而get() 返回一个具体的异常
    get() 可以指定超时时间

    例子转载于 :
    https://blog.csdn.net/qq_31865983/article/details/106137777
    https://blog.csdn.net/tongtest/article/details/107549749

    相关文章

      网友评论

          本文标题:Java多线程之CompletableFuture

          本文链接:https://www.haomeiwen.com/subject/xxtjyrtx.html