美文网首页
Java:CompletableFuture一文搞定

Java:CompletableFuture一文搞定

作者: byamao1 | 来源:发表于2018-09-17 18:39 被阅读0次

    Prefix

    CompletableFuture本身是为了解决任务的执行和衔接。在后面我们可以看到,CompletableFuture的功能就是分派任务到线程,并以近乎于pipeline的概念将各任务进行编排,下游任务可以获取上游任务的结果。从方法名上我们可见pipeline

    complete -> whenComplete -> then* 
    

    后面我们会从例子探索pipeline的概念。

    注意以Async结尾的方法:

    不以Async结尾的方法由原来的线程计算;
    以Async结尾的方法由默认的线程池ForkJoinPool.commonPool()或者指定的线程池executor运行。

    About CompletableFuture

    同步式
    尽管Future可以代表在另外的线程中执行的一段异步代码,但是你还是可以在本身线程(例如main)中执行:

    CompletableFuture<Integer> future = new CompletableFuture<>();
    

    上面的代码中future没有关联任何的Callback、线程池、异步任务等,如果客户端调用future.get就会一直傻等下去。

    future.get(); 
    //一直等下去。源码可以看到get调用了waitingGet,是循环获取结果。
    //因为没有调用complete这种方法生成结果,它就要一直循环
    

    你可以通过下面的代码完成一个计算,触发客户端的等待:

    future.complete(100);
    

    CompletableFuture.complete()只能调用一次,后续调用将被忽略。但也有一个后门叫做CompletableFuture.obtrudeValue(…)覆盖一个新Future之前的价值,请小心使用。
    Java 8:CompletableFuture终极指南

    异步式
    以下四个静态方法用来为一段异步执行的代码创建CompletableFuture对象:

    public static CompletableFuture<Void>   runAsync(Runnable runnable)
    public static CompletableFuture<Void>   runAsync(Runnable runnable, Executor executor)
    public static <U> CompletableFuture<U>  supplyAsync(Supplier<U> supplier)
    public static <U> CompletableFuture<U>  supplyAsync(Supplier<U> supplier, Executor executor)
    
    

    当运行完成时的回调操作

    • 当运行时出现了异常,可以通过exceptionally进行补偿(消化)
    • whenComplete,当计算完成,或者抛出异常的时候,可以对其结果或异常进行消费
    • handle,当计算完成,或者抛出异常的时候,可以对其结果或异常进行转换

    handle使用场景:
    exceptionally消化任务抛出的异常后,我们会有机会将此异常转换为和Future类型的兼容的一些值来进行恢复。safe进一步的转换将不再产生一个异常而是从提供功能的函数返回一个String值。
    Java 8:CompletableFuture终极指南

    这里有个关于任务的概念需要了解:

    whenCompletehandleexceptionally里面的运算代码算不算任务?可能在某些业务语境中是的,但至少在CompletableFuture的语义中我觉得不算。它们仅仅是任务的附属,而不能独立成为任务。从下面的例子就能反映:

        @Test
        public void testNoExceptionally() throws Exception {
            CompletableFuture<String> future = new CompletableFuture();
            // task1
            future.completeExceptionally(new Exception("测试抛异常"));
            future.whenComplete((s, t) -> {
                // 1
                log.info("1:{}", s);
                if(t != null)
                    log.error(t.getMessage());})
            .whenComplete((s, t) -> {
                // 2
                log.info("2:{}", s);
                if(t != null)
                    log.error(t.getMessage());})
            .thenApply(s -> {  // task2
                // 3
                log.info("3:{}", s);
                return s;})
            .exceptionally(e -> {
                // 4       异常中断了task2,在这里被消化
                log.error("4:{}", e.getMessage());
                return e.getMessage();
            })
            .join();
        }
    

    task1抛出异常,2个whenCompleteexceptionally都执行了,但是task2并没有被执行。说明了2个问题:

    • whenComplete实际上是归于task1
    • 只有exceptionally可以进行任务中异常的消化,否则异常会传递下去,中断后面的任务被执行,直到有exceptionally消化异常

    任务之间的承接

    注意:下游任务并不承接上游任务的异常。如果上游任务抛出异常而没有exceptionally消化,则下游任务不会被执行,所以这里的方法并没有像上面whenCompletehandle有异常参数:

    public CompletableFuture<T>     whenComplete(BiConsumer<? super T,? super Throwable> action)
    

    A->B

    • thenApply,针对上一步(A)结果进行转换(B)。
    • thenAccept,针对上一步结果进行消费
    • thenRun,对上一步的计算结果不关心

    A,B -> C

    • thenCombine,结合上一步(A)结果和本次(B)的结果,然后进行转换
    • thenAcceptBoth,结合上一步结果和本次的结果,然后进行消耗。消耗计算是在本次线程(本次线程就是上文提到的原来的线程)上执行;而Async是在线程池上执行。下面以此类推
    • runAfterBoth,不关心上一步和本次的结果,只关心它们运算完毕,然后进行下步操作
    • acceptEither,上一步和本次哪个快就用哪个的结果,然后消费

    辅助方法 allOf 和 anyOf

    • allOf方法是当所有的CompletableFuture都执行完后执行计算
    • anyOf方法是当任意一个CompletableFuture执行完后就会执行计算,计算的结果相同。

    探索一下pipeline的概念

    下面的代码中,我先后执行2个任务:task1task2

    task1中抛出异常
    1.task1如果抛异常,则打印异常信息;
    2.等task1完成后,task后处理打印task1的输出字符串或异常信息;
    3.task后处理如果抛异常,则打印异常信息;
    4.task2task后处理完成后,打印task1的输出字符串

        @Test
        public void testPipeline() throws Exception {
            CompletableFuture<String> future = new CompletableFuture();
            // task1
            future.completeExceptionally(new Exception("测试抛异常"));
            future.exceptionally(e -> {
                // 1
                log.error("1:{}", e.getMessage());
                return e.getMessage();
            }).whenComplete((s, t) -> { 
                // 2
                log.info("2:{}", s);
                if(t != null)   // 注意,t如果为null,会导致抛异常
                    log.error(t.getMessage());
            }).exceptionally(e -> {
                // 3    1将异常消化了,所以不会触发3
                log.error("3:{}", e.getMessage());
                return e.getMessage();
            }).thenAccept(s -> {  // task2
                // 4
                log.info("4:{}", s+" world");})
             .join();     //main线程等任务执行完再结束
        }
    

    结果是1、2、4 会被触发。
    下面我们做个修改:task1不抛出异常。

        @Test
        public void testPipeline2() throws Exception {
            CompletableFuture<String> future = new CompletableFuture();
            // task1
            future.complete("正常");
            future.exceptionally(e -> {
                // 1
                log.error("1:{}", e.getMessage());
                return e.getMessage();
            }).whenComplete((s, t) -> {   
                // 2
                log.info("2:{}", s);
                if(t != null) 
                    log.error(t.getMessage());
            }).exceptionally(e -> {
                // 3
                log.error("3:{}", e.getMessage());
                return e.getMessage();
            }).thenAccept(s -> {  // task2
                // 4
                log.info("4:{}", s+" world");})
                    .join(); 
        }
    

    那么只有2、4被触发。

    About thread

    allOf

    Q:allOfthenApply中的任务哪个线程执行?

    可以清楚的是,thenApply中的任务是在allOf返回的CompletableFuture的线程上执行。那么allOf返回的CompletableFuture的线程是什么?allOf返回的CompletableFuture的线程一般是allOf中最后一个完成的任务的线程。

    不过凡事总有例外。实际上无论是new CompletableFuture()还是CompletableFuture.supplyAsync都是立刻执行任务。如果main执行到allOf时,像下面的例子里allOf内的任务都已经结束了,即使future1是异步的,thenApply仍然是同步的(此处为main线程)。
    比如:

        @Test
        public void testCmopletableFuture() throws Exception {
    
            CompletableFuture<String> future1 = CompletableFuture.supplyAsync(this::getThreadName);
            CompletableFuture<String> future2 = CompletableFuture.supplyAsync(this::getThreadName);
    
            sleep(3000);
            CompletableFuture.allOf(future1, future2)  //  执行到allof,future1和 future2已经结束
                            .thenApply(t ->{
                                log.info(Thread.currentThread().getName());
                                return getThreadName(future1, future2);})
                            .exceptionally(e->{
                                log.info(Thread.currentThread().getName());
                                return Lists.newArrayList();})
                            .join();
        }
        private String getThreadName(){
            sleep(1000);
            String res = Thread.currentThread().getName();
            return res;
        }
    
        private List<String> getThreadName(CompletableFuture<String> future1, CompletableFuture<String> future2){
            List<String> res = Lists.newLinkedList();
            try{
                res.add(future1.get());
                res.add(future2.get());
            }catch(Exception e){
    
            }
            log.info("{}", res);
            return res;
        }
    
        private void sleep(long millis){
            try {
                Thread.sleep(millis);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    

    Ref

    简明扼要:CompletableFuture 详解 - 简书
    全面:Java CompletableFuture 详解 | 鸟窝

    相关文章

      网友评论

          本文标题:Java:CompletableFuture一文搞定

          本文链接:https://www.haomeiwen.com/subject/plirnftx.html