美文网首页
CompletableFuture异步编程

CompletableFuture异步编程

作者: 刘彦青 | 来源:发表于2019-10-16 23:59 被阅读0次

    CompletableFuture 有什么用

    • CompletableFuture是用来描述多线程任务的时序关系的:串行关系,并行关系,聚合关系。

    • CompletableFuture 是Java 8 新增加的Api,该类实现,Future和CompletionStage两个接口,提供了非常强大的Future的扩展功能,可以帮助我们简化异步编程的复杂性,提供了函数式编程的能力,可以通过回调的方式处理计算结果,并且提供了转换和组合CompletableFuture的方法。

    创建CompletableFuture对象

    方式一:使用默认线程池

    /**
     * 创建一个不带返回值得任务。
     */
    CompletableFuture<Void> f1 = CompletableFuture.runAsync(new Runnable() {
         @Override
         public void run() {
          //业务逻辑          
         }
     });
     /**
      * 创建一个带返回值的任务。
      */
    CompletableFuture<String> f2 = CompletableFuture.supplyAsync(new Supplier<String>() {
        @Override
        public String get() {
            //业务逻辑
            return null;
        }
    });
    

    方式二:使用自定义线程池(建议使用)

     //创建线程池
    ExecutorService executor =  Executors.newFixedThreadPool(10); 
            
    //创建一个不带返回值得任务。
    CompletableFuture<Void> f1 = CompletableFuture.runAsync(new Runnable() {
        @Override
        public void run() {
    
        }
    },executor);
    //创建一个带返回值的任务。
    CompletableFuture<String> f2 = CompletableFuture.supplyAsync(new Supplier<String>() {
        @Override
        public String get() {
            //业务逻辑
            return null;
        }
    },executor);    
    
    1. 默认情况下CompletableFuture会使用公共的ForkJoinPool线程池,这个线程池默认创建的线程数是CPU的 核数(也可以通过JVM option:-Djava.util.concurrent.ForkJoinPool.common.parallelism来设置ForkJoinPool 线程池的线程数)。如果所有CompletableFuture共享一个线程池,那么一旦有任务执行一些很慢的I/O操 作,就会导致线程池中所有线程都阻塞在I/O操作上,从而造成线程饥饿,进而影响整个系统的性能。所以,强烈建议你要根据不同的业务类型创建不同的线程池,以避免互相干扰。
    2. 创建完CompletableFuture对象之后,会自动地异步执行runnable.run()方法或者supplier.get()方法。因为CompletableFuture类实现了Future接口,所以这两个问题你都可以通过Future接口来解决。另外,CompletableFuture类还实现了CompletionStage接口。

    常用API

    • public T get():获取计算结果, 该方法为阻塞方法会等待计算结果完成。
    • public T get(long timeout,TimeUnit unit):有时间限制的阻塞方法
    • public T getNow(T valueIfAbsent)立即获取方法结果,如果没有计算结束则返回传的值
    • public T join()和 get() 方法类似也是主动阻塞线程,等待计算结果。和get() 方法有细微的差别。
    • public boolean complete(T value):立即完成计算,并把结果设置为传的值,返回是否设置成功
    • public boolean completeExceptionally(Throwable ex):立即完成计算,并抛出异常

    理解CompletionStage接口

    CompletionStage接口可以清晰地描述任务之间的这种时序关系,时序关系:串行,并行,汇聚。

    串行

    线程与线程之间的执行顺序是串行的。

    //Async代表的是异步执行fn
    CompletionStage<R>  thenApply(fn); 
    CompletionStage<R>  thenApplyAsync(fn); 
    CompletionStage<Void>   thenAccept(consumer); 
    CompletionStage<Void>   thenAcceptAsync(consumer); 
    CompletionStage<Void>   thenRun(action); 
    CompletionStage<Void>   thenRunAsync(action); 
    CompletionStage<R>  thenCompose(fn); 
    CompletionStage<R>  thenComposeAsync(fn);
    
    • CompletionStage接口里面描述串行关系,主要是thenApply、thenAccept、thenRun和thenCompose这四 个系列的接口。
      • thenApply系列函数里参数fn的类型是接口Function<T, R>,这个接口里与CompletionStage相关的方法是 R apply(T t),这个方法既能接收参数也支持返回值,所以thenApply系列方法返回的 是CompletionStage<R>。
    • 而thenAccept系列方法里参数consumer的类型是接口Consumer<T>,这个接口里与CompletionStage相关 的方法是 void accept(T t),这个方法虽然支持参数,但却不支持回值,所以thenAccept系列方法返回 的是CompletionStage<Void>。
      • thenRun系列方法里action的参数是Runnable,所以action既不能接收参数也不支持返回值,所以thenRun 系列方法返回的也是CompletionStage<Void>。
      • 这些方法里面Async代表的是异步执行fn、consumer或者action。其中,需要你注意的是thenCompose系列 方法,这个系列的方法会新创建出一个子流程,最终结果和thenApply系列是相同的。

    演示串行

    //supplyAsync()启动一个异步 流程
    CompletableFuture<String> f0 = CompletableFuture.supplyAsync(
        () -> "Hello World")                        //①     
        .thenApply(s -> s + "girl")     //②
        .thenApply(String::toUpperCase);//③
    System.out.println(f0.join());
    //输出结果 HELLO WORLD  girl
    

    虽然这是一个异步流程,但任务①②③却是 串行执行的,②依赖①的执行结果,③依赖②的执行结果。

    AND汇聚

    CompletionStage接口里面描述AND汇聚关系,主要是thenCombine、thenAcceptBoth和runAfterBoth系列的接口,这些接口的区别也是源自fn、consumer、action这三个核心参数不同。

    CompletionStage<R>  thenCombine(other,  fn); 
    CompletionStage<R>  thenCombineAsync(other, fn); 
    CompletionStage<Void>   thenAcceptBoth(other,   consumer); CompletionStage<Void>    thenAcceptBothAsync(other,  consumer); CompletionStage<Void>    runAfterBoth(other, action); 
    CompletionStage<Void>   runAfterBothAsync(other,    action);
    
    

    演示:

            // 启动一个异步流程f1
            CompletableFuture<Void> f1 = CompletableFuture.runAsync(()->{
                System.out.println("异步任务-1");
    
            });
            // 启动一个异步流程f2
            CompletableFuture<String> f2 = CompletableFuture.supplyAsync(()->{
                String s = "异步任务-2";
                System.out.println(s);
                return  s;
    
            });
            //启动一个汇聚流程f3
            CompletableFuture<String> f3 = f1.thenCombine(f2,(a,tf)->{
                return tf;//tf是f2的值
            });
    
            //等待任务3执行结果
            System.out.println(f3.join());
    
    OR汇聚

    CompletionStage接口里面描述OR汇聚关系,主要是applyToEither、acceptEither和runAfterEither系列的 接口,这些接口的区别也是源自fn、consumer、action这三个核心参数不同。

    CompletionStage applyToEither(other,    fn);
    CompletionStage applyToEitherAsync(other,   fn);
    CompletionStage acceptEither(other, consumer);
    CompletionStage acceptEitherAsync(other,    consumer);
    CompletionStage runAfterEither(other,   action); 
    CompletionStage runAfterEitherAsync(other,  action);
    
    

    功能演示:

             // 启动一个异步流程f1
            CompletableFuture<String> f1 = CompletableFuture.supplyAsync(()->{
                int t   =    new Random().nextInt(10);
                System.out.println("f1-t = "+t);
                sleep(t,    TimeUnit.SECONDS);
                return  String.valueOf(t);
            });
         // 启动一个异步流程f2
            CompletableFuture<String> f2 = CompletableFuture.supplyAsync(()->{
                int t   =    new Random().nextInt(10);
                System.out.println("f2-t = "+t);
                sleep(t,    TimeUnit.SECONDS);
                return  String.valueOf(t);
            });
            //将f1和f2的值汇总到f3
            CompletableFuture<String> f3 = f1.applyToEither(f2,s    ->
                        Integer.parseInt(f2.join())+Integer.parseInt(s)+""
            );
            System.out.println(f3.join());
    

    **** 码字不易如果对你有帮助请给个关注****

    **** 爱技术爱生活 QQ群: 894109590****

    相关文章

      网友评论

          本文标题:CompletableFuture异步编程

          本文链接:https://www.haomeiwen.com/subject/dwjbuctx.html