美文网首页
CompletableFuture理解

CompletableFuture理解

作者: 多喝水JS | 来源:发表于2019-04-01 23:15 被阅读0次

简介

Future是Java 5添加的类,用来描述一个异步计算的结果。你可以使用isDone方法检查计算是否完成,或者使用get阻塞住调用线程,直到计算完成返回结果,你也可以使用cancel方法停止任务的执行。
虽然Future以及相关使用方法提供了异步执行任务的能力,但是对于结果的获取却是很不方便,只能通过阻塞或者轮询的方式得到任务的结果。阻塞的方式显然和我们的异步编程的初衷相违背,轮询的方式又会耗费无谓的CPU资源,而且也不能及时地得到计算结果。
同时它很难直接表述多个Future 结果之间的依赖性。实际开发中,我们经常需要达成以下目的:

  • 将多个异步计算的结果合并成一个

  • 等待Future集合中的所有任务都完成

  • Future完成事件(即,任务完成以后触发执行动作)

所以java一些框架,比如Netty,自己扩展了Java的 Future接口,提供了addListener观察者设计模式,当计算结果完成及时通知监听者。
在Java 8中, 新增加了一个包含50个方法左右的类: CompletableFuture,提供了非常强大的Future的扩展功能,可以帮助我们简化异步编程的复杂性,提供了函数式编程的能力,可以通过回调的方式处理计算结果,并且提供了转换和组合CompletableFuture的方法。

CompletableFuture的使用

public class CompletableFutureTest {

    public static void main(String[] args) {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("运行");
            return "success";
        });
        try {
            System.out.println(future.get());
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }
}
输出:
运行
success

CompletableFuture类实现了CompletionStageFuture接口
所以可以通过get()方法获取结果

创建CompletableFuture对象

CompletableFuture.completedFuture是一个静态辅助方法,用来返回一个已经计算好的CompletableFuture。

public static <U> CompletableFuture<U> completedFuture(U value)

而以下四个静态方法用来为一段异步执行的代码创建CompletableFuture对象:

public static CompletableFuture<Void>   runAsync(Runnable runnable)
public static CompletableFuture<Void>   runAsync(Runnable runnable, Executor executor)
public static <U> CompletableFuture<U>  supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U>  supplyAsync(Supplier<U> supplier, Executor executor)

以Async结尾并且没有指定Executor的方法会使用ForkJoinPool.commonPool()作为它的线程池执行异步代码。

runAsync方法也好理解,它以Runnable函数式接口类型为参数,所以CompletableFuture的计算结果为空。

supplyAsync方法以Supplier<U>函数式接口类型为参数,CompletableFuture的计算结果类型为U。

计算结果完成时的处理

当CompletableFuture的计算结果完成,或者抛出异常的时候,我们可以执行特定的Action。主要是下面的方法:

public CompletableFuture<T>     whenComplete(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T>     whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T>     whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
public CompletableFuture<T>     exceptionally(Function<Throwable,? extends T> fn)

可以看到Action的类型是BiConsumer<? super T,? super Throwable>,它可以处理正常的计算结果,或者异常情况。
方法不以Async结尾,意味着Action使用相同的线程执行,而Async可能会使用其它的线程去执行(如果使用相同的线程池,也可能会被同一个线程选中执行)。

注意这几个方法都会返回CompletableFuture,当Action执行完毕后它的结果返回原始的CompletableFuture的计算结果或者返回异常。

exceptionally方法返回一个新的CompletableFuture,当原始的CompletableFuture抛出异常的时候,就会触发这个CompletableFuture的计算,调用function计算值,否则如果原始的CompletableFuture正常计算完后,这个新的CompletableFuture也计算完成,它的值和原始的CompletableFuture的计算的值相同。也就是这个exceptionally方法用来处理异常的情况。

更多API查看
https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletableFuture.html https://colobu.com/2016/02/29/Java-CompletableFuture/

原理解析

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("运行");
            return "success";
        });

创建一个CompletableFuture,并且CompletableFuture结果类型为String

 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
        return asyncSupplyStage(asyncPool, supplier);
    }

 private static final Executor asyncPool = useCommonPool ?
        ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();

使用ForkJoinPool.commonPool()作为它的线程池执行异步代码。

   static <U> CompletableFuture<U> asyncSupplyStage(Executor e,
                                                     Supplier<U> f) {
        if (f == null) throw new NullPointerException();
        CompletableFuture<U> d = new CompletableFuture<U>();
        e.execute(new AsyncSupply<U>(d, f));
        return d;
    }

static final class AsyncSupply<T> extends ForkJoinTask<Void>
            implements Runnable, AsynchronousCompletionTask {
        CompletableFuture<T> dep; Supplier<T> fn;
        AsyncSupply(CompletableFuture<T> dep, Supplier<T> fn) {
            this.dep = dep; this.fn = fn;
        }

        public final Void getRawResult() { return null; }
        public final void setRawResult(Void v) {}
        public final boolean exec() { run(); return true; }

        public void run() {
            CompletableFuture<T> d; Supplier<T> f;
            if ((d = dep) != null && (f = fn) != null) {
                dep = null; fn = null;
                if (d.result == null) {
                    try {
                        //f.get()获取结果
                        d.completeValue(f.get());
                    } catch (Throwable ex) {
                        d.completeThrowable(ex);
                    }
                }
                d.postComplete();
            }
        }
    }

AsyncSupply<T> 继承 ForkJoinTask,交给ForkJoinPool执行,最终执行run()方法获取结果

相关文章

网友评论

      本文标题:CompletableFuture理解

      本文链接:https://www.haomeiwen.com/subject/sghibqtx.html