美文网首页java 基础
CompletableFuture实践

CompletableFuture实践

作者: 小le罗 | 来源:发表于2020-09-19 22:29 被阅读0次

    前言

      开发任务中,我们常常会开启一个新的子线程去专门执行较为耗时的代码,这也是最简单的异步调用。随着业务的场景的不断发展,JDK5新增了Future接口,用于描述异步计算的接口,然而Future用于获取异步结果的方法较少:
      1.通过轮询isDone判断任务执行完成后调用get方法获取结果,
      2.直接调用get方法,并设置超时时间,get方法会阻塞调用线程直到结果返回。
    不管哪种方式都会消耗额外的CPU资源,有违异步编程的初衷。
      CompletableFuture作为Future的一种强有力的扩展,提供了函数式编程能力,简化了异步编程的复杂性,通过回调的方式处理计算结果,并提供了完善的异常处理手段。默认依靠fork/join框架启动新的线程实现异步与并发。
      本文通过比较Future和CompletableFuture用法上的区别,领略CompletableFuture异步编程的优势,为了更好的理解,学习网上大神的生活场景举例法贯穿全文,最后以实际工作中遇到的异步使用场景进行实践分析,欢迎大家批评指正。

    生活场景举例

      如果想制作一道番茄炒鸡蛋盖饭,一般会对制作流程进行细化和分工。最常见的就是对主食(米饭)和菜品(番茄炒鸡蛋)进行分工处理,我们将做饭任务细化为准备电饭煲、淘米、蒸煮米饭、处理番茄、打散鸡蛋、炒制番茄鸡蛋、番茄炒鸡蛋盖饭。其中番茄炒鸡蛋盖饭成品的制作要等到蒸煮米饭和炒制番茄鸡蛋均完成后才能执行,。


    番茄炒鸡蛋任务细化

    Future实现

      Future接口有5个方法,其中get(timeout, unit) 支持超时机制,从方法描述中可以发现Future接口不但能够获取任务结果还能取消任务。注意:这里的get方法是阻塞的。

    // 取消任务
    boolean cancel(
      boolean mayInterruptIfRunning);
    // 判断任务是否已取消  
    boolean isCancelled();
    // 判断任务是否已结束
    boolean isDone();
    // 获得任务执行结果
    get();
    // 获得任务执行结果,支持超时
    get(long timeout, TimeUnit unit);
    

      为了更好的实现线程的复用,避免大量线程回收,我们采用线程池来完成任务。
      对于做饭这个任务来说,可以用两个形成实现并发,也就是对主食和菜品进行分工处理。线程t1负责米饭制作的三道工序以及最后的盖饭工序,线程t2负责番茄炒鸡蛋的三道工序。


    番茄炒鸡蛋任务多线程实现
        @Test
        public void doJobFuture() throws ExecutionException, InterruptedException {
            // FutureTask 实现了 Runnable 和 Future 接口,由于实现了 Runnable 接口,
            // 可以将 FutureTask 对象作为任务提交给 ThreadPoolExecutor 去执行,也可以直接被 Thread 执行;
            // 又因为实现了 Future 接口,所以也能用来获得任务的执行结果。
            FutureTask<String> futureTask2 = new FutureTask<>(new ThreadTask2());
            FutureTask<String> futureTask1 = new FutureTask<>(new ThreadTask1(futureTask2));;
            // 构造一个线程池
            ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2, 4, 3,
                    TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(3),
                    new ThreadPoolExecutor.DiscardOldestPolicy());
            threadPool.prestartAllCoreThreads(); // 预启动所有核心线程
            threadPool.submit(new Thread(futureTask1));
            threadPool.submit(new Thread(futureTask2));
            System.out.println(futureTask1.get());
        }
    
        // Task1 负责的任务:准备电饭煲、淘米、蒸煮米饭、番茄炒鸡蛋盖饭
        static class ThreadTask1 implements Callable<String> {
            FutureTask<String> futureTask2;
            // Task1 任务需要 Task2 任务的执行结果
            ThreadTask1(FutureTask<String> futureTask2) {
                this.futureTask2 = futureTask2;
            }
            @Override
            public String call() throws Exception {
                System.out.println("t1: run task : 准备电饭煲");
                TimeUnit.SECONDS.sleep(5);
                System.out.println("t1: run task : 淘米");
                TimeUnit.SECONDS.sleep(3);
                System.out.println("t1: run task : 蒸煮米饭");
                TimeUnit.SECONDS.sleep(40);
                // 获取Task2执行线程的菜品
                String tf = futureTask2.get();
                return tf + "盖饭制作完成";
            }
        }
    
        // Task2 负责的任务:处理番茄、打散鸡蛋、炒制番茄鸡蛋
        static class ThreadTask2 implements Callable<String> {
            @Override
            public String call() throws Exception {
                System.out.println("t2: run task : 处理番茄");
                TimeUnit.SECONDS.sleep(5);
                System.out.println("t2: run task : 打散鸡蛋");
                TimeUnit.SECONDS.sleep(2);
                System.out.println("t2: run task : 炒制番茄鸡蛋");
                TimeUnit.SECONDS.sleep(10);
                return "番茄炒鸡蛋";
            }
        }
    // 随机执行结果:
    t1: run task : 准备电饭煲
    t2: run task : 处理番茄
    t1: run task : 淘米
    t2: run task : 打散鸡蛋
    t2: run task : 炒制番茄鸡蛋
    t1: run task : 蒸煮米饭
    番茄炒鸡蛋盖饭制作完成
    

      Future实现中ThreadTask1在执行番茄炒鸡蛋盖饭这个最终任务前,需要等待ThreadTask2完成菜品任务,所以内部引用了futureTask2,并调用get方法阻塞等待结果。上述代码还参考了《阿里巴巴java开发手册》中的线程强制使用规范:
      1.线程资源必须通过线程池提供,不允许在应用中自行显示创建线程,好处是减少在创建和销毁线程上所花的时间以及系统资源的开销,解决资源不足的问题。
      2.线程池不允许使用Executors去创建,而要通过ThreadPoolExecutor方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险,主要是因为Executors创建的线程池允许创建的最大线程数量为Integer.MAX_VALUE,可能会创建大量的线程,从而导致OOM。

    CompletableFuture实现

      为了凸显CompletableFuture异步编程的优势,我们重新实现上面的番茄炒鸡蛋盖饭任务。这次我们将任务细分为三个子任务。

    番茄炒鸡蛋任务CompletableFuture细分方案
        @Test
        public void doJobCompletableFuture() throws ExecutionException, InterruptedException {
            // 构造一个线程池
            ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2, 4, 3,
                    TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(3),
                    new ThreadPoolExecutor.DiscardOldestPolicy());
            threadPool.prestartAllCoreThreads(); // 预启动所有核心线程
            // Task1 负责的任务:准备电饭煲、淘米、蒸煮米饭
            CompletableFuture<String> cf1 =
                    CompletableFuture.supplyAsync(() -> {
                        try {
                            System.out.println("t1: run task : 准备电饭煲");
                            TimeUnit.SECONDS.sleep(5);
                            System.out.println("t1: run task : 淘米");
                            TimeUnit.SECONDS.sleep(3);
                            System.out.println("t1: run task : 蒸煮米饭");
                            TimeUnit.SECONDS.sleep(40);
                        } catch (InterruptedException e) {
                            throw new BizException(e.getMessage());
                        }
                        return "Task1执行结束";
                    }, threadPool).exceptionally(ex -> {
                        System.out.println(ex.getMessage());
                        return "发生异常: " + ex.getMessage();
                    });
            // Task2 负责的任务:处理番茄、打散鸡蛋、炒制番茄鸡蛋
            CompletableFuture<String> cf2 =
                    CompletableFuture.supplyAsync(() -> {
                        try {
                            System.out.println("t2: run task : 处理番茄");
                            TimeUnit.SECONDS.sleep(5);
                            System.out.println("t2: run task : 打散鸡蛋");
                            TimeUnit.SECONDS.sleep(2);
                            System.out.println("t2: run task : 炒制番茄鸡蛋");
                            TimeUnit.SECONDS.sleep(10);
                        } catch (InterruptedException e) {
                            throw new BizException(e.getMessage());
                        }
                        return "番茄炒鸡蛋";
                    }, threadPool).exceptionally(ex -> {
                        System.out.println(ex.getMessage());
                        return "发生异常: " + ex.getMessage();
                    });
            // Task3 负责的任务:制作番茄炒鸡蛋盖饭,等待Task1和Task2的执行结果
            CompletableFuture<String> cf3 =
                    cf1.thenCombineAsync(cf2, (c1, c2) -> {
                        System.out.println("Task1任务完成 : " + c1);
                        System.out.println("Task2任务完成 : " + c2);
                        return c2 + "盖饭制作完成";
                    }, threadPool);
            System.out.println(cf3.join());
        }
    // 随机执行结果
    t1: run task : 准备电饭煲
    t2: run task : 处理番茄
    t1: run task : 淘米
    t2: run task : 打散鸡蛋
    t2: run task : 炒制番茄鸡蛋
    t1: run task : 蒸煮米饭
    Task1任务完成 : Task1执行结束
    Task2任务完成 : 番茄炒鸡蛋
    番茄炒鸡蛋盖饭制作完成
    
    CompletableFuture vs Future

      1.并且提供了函数式编程的能力,完美结合了Java8流的新特性,代码更简练,语义更清晰。
      2.支持回调函数,极大丰富了Future的功能。
      3.支持合并相互独立的异步计算结果、等待所有异步任务完成或者等待其中一个异步任务完成就返回等多种场景。
      4.提供了原生的异常处理API。

    函数式编程

      在阐述CompletableFuture的详细用法之前有必要先了解一下Java8的新特性——函数式编程。

    行为参数化

      行为参数化是指通过API传递代码的能力,它比匿名类更加清晰,简洁。我们通过一个例子在具体描述行为参数化。在会员卡的业务中,经常出现筛选的场景,比如:筛选卡余额大于500的会员卡信息。

    /**
     * 会员卡对象
     */
    @Data
    public class Card implements Serializable {
    
        private static final long serialVersionUID = -457098909047177539L;
    
        /**
         * 卡号
         */
        private String code;
    
        /**
         * 密码
         */
        private String pwd;
    
        /**
         * 余额(单位:分)
         */
        private Long balance;
    
        /**
         * 0:未使用 1:正常 2:挂失 3:注销 4:换卡
         */
        private Short status;
    
        /**
         * 消费累计(单位:分)
         */
        private Long consumeAmount;
        
        /**
         * 卡消费次数
         */
        private Integer consumeNum;
    }
    

      会员卡实体对象如上所示。所以可以用方法findCardByBalance筛选出余额大于500的会员卡信息。那如果现在要筛选出余额大于1000的会员卡,很简单只要把if逻辑中的500改成1000即可,但是会造成大量重复代码。所以最简单的方法就是重写findCardByBalance,把筛选条件余额作为参数传递。

        // 筛选出余额大于500元的会员卡
        public static List findCardByBalance(List<Card> cardList) {
            List list = new ArrayList<>();
            for (Card card : cardList) {
                if (card.getBalance() > 500.00d) {
                    list.add(card);
                }
            }
            return list;
        }
    
        // 将筛选条件作为参数传递到方法中
        public static List findCardByBalance(List<Card> cardList, Double balance) {
            List list = new ArrayList<>();
            for (Card card : cardList) {
                if (card.getBalance() > balance) {
                    list.add(card);
                }
            }
            return list;
        }
    

      随着业务的发展,筛选条件会变得越来越多,甚至越来越复杂,所以按照修改或增加入参来满足需求将会使代码编码变得非常繁琐,调用时也会异常复杂。那么我们首先想到的就是提取出条件判断部分,也就是筛选条件的部分,并将其抽象成接口方法。

        public interface CardPredicate {
            boolean checkCondition(Card Card);
        }
        // 传递筛选行为
        public static List findCardByBalance(List<Card> cardList, CardPredicate predicate) {
            List list = new ArrayList<>();
            for (Card card : cardList) {
                if (predicate.checkCondition(card)) {
                    list.add(card);
                }
            }
            return list;
        }
    

      因此,我们就可以让findCardByBalance方法来接受cardPredicate中封装的筛选条件,在真正执行之前,我们并不知道checkCondition的具体实现。那当我们在main函数中调用以上方法时,我们可以采用匿名内部类的方式。

            List<Card> cardList = new ArrayList<>();
            // ...省略card的赋值过程
            // 匿名内部类实现
            List returnCardList = findCardByBalance(cardList, new CardPredicate() {
                public boolean checkCondition(Card card) {
                    return card.getBalance() > balance;
                }
            });
    
            // Lambda实现
            returnCardList = findCardByBalance(cardList, card2 -> card2.getBalance() > balance);
    

      上述代码在idea编译器中会有一个警告,Anonymous new CardPredicate can be replaced with lambda,所以我们用lambda表达式简化上述匿名内部类的实现方式。
      到此为止,可以简单总结一下:CardPredicate接口定义了一个checkCondition的行为动作,checkCondition只有在真正执行的时候才会传递具体的行为,这也是最简单的行为传递。行为参数化是指方法能够接收多种不同的行为作为参数,并在内部使用它们,以完成不同行为的能力。

    Lambda表达式

      由于本文篇幅有限,这里只是简单介绍一下Lambda表达式。Java Lambda表达式是一个匿名函数,是一种没有声明的方法,即没有访问修饰符,返回值和方法名,也可以简单的理解为一种“语法糖”,由编译器推断并帮你转换包装为常规的代码,可以使用更少的代码来实现同样的功能。
      Java中的Lambda表达式通常使用语法是 (argument) -> {body},其中body可以是expression表达式或者statements变量值,以(card2 -> card2.getBalance() > balance) 这个表达式为例,Lambda表达式可以分解为三个部分:
      1.参数列表:card2
      2.操作符:->
      3.主体内容:card2.getBalance() > balance
      推荐一篇讲解Lambda表达式的文章:Lambda 表达式有何用处?如何使用?

    函数式接口

      在Java 8里面,所有的Lambda的类型都是一个接口,而Lambda表达式本身就是这个接口的实现。简而言之就是,Lambda表达式本身就是一个接口的实现。比如我们上面提到的Lamdba表达式的完整声明为:

        public static void main(String[] args) {
            // Lambda表达式完整声明
            CardPredicate cardPredicate = card2 -> card2.getBalance() > balance;
            // 传统的java接口实现
            CardPredicate = new CardPredicateImpl(balance);
        }
    
        public static class CardPredicateImpl implements CardPredicate {
            private Double balance;
            public CardPredicateImpl(Double balance) {
                this.balance = balance;
            }
            @Override
            public boolean checkCondition(Card card) {
                return card.getBalance() > balance;
            }
        }
    

      上述的两种实现本质上完全相同,这是Lambda的写法更加优雅简洁。而类似于CardPredicate这种有且仅有一个抽象方法,但是可以有多个非抽象方法的接口被称为函数式接口(Functional Interface)。这里要注意几个点:
      1.定义一个函数式接口需要给接口加上注解@FunctionalInterface。
      2.如果某接口未添加@FunctionalInterface注解,但是仍然只有一个抽象方法,那么编译器依旧会当作函数式接口进行处理。
      3.重写的方法是Object类的方法时,并不会算在“抽象方法”内,不受有且仅有一个抽象方法的约束。
      4.在JDK1.8版本之前,接口中的所有方法必须是非静态的,方法默认为public和abstract。1.8版本之后接口支持default关键词标识的默认方法,默认方法允许添加新的功能到现有库的接口中,并能确保与采用旧版本接口编写的代码的二进制兼容性。

    JDK1.8之前已有的函数式接口:

    • java.lang.Runnable
    • java.util.concurrent.Callable
    • java.security.PrivilegedAction
    • java.util.Comparator
    • java.io.FileFilter
    • java.nio.file.PathMatcher
    • java.lang.reflect.InvocationHandler
    • java.beans.PropertyChangeListener
    • java.awt.event.ActionListener
    • javax.swing.event.ChangeListener

    JDK1.8新增的函数式接口都被放置在java.util.function包下,这些接口按照功能主要是四大核心函数式接口:

    • Consumer<T> 消费型
    • Supplier <T> 供给型
    • Function <T, R> 函数型
    • Predicate <T> 断言型
    接口 参数类型 返回类型 用途
    Consumer T void 接受输入参数T,并且无返回的操作
    Supplier T 无参数,返回T类型结果
    Function T R 接受输入参数T,返回T类型结果参
    Predicate T boolean 接接受输入参数T,返回布尔类型结果

    CompletionStage接口

      CompletionStage定义了一组接口用于在一个阶段执行结束之后,要么继续执行下一个阶段,要么对结果进行转换产生新的结果等等,一般来说要执行下一个阶段都需要上一个阶段正常完成,当然也提供了对异常结果的处理接口。CompletionStage的接口一般都返回新的CompletionStage,表示执行完一些逻辑后,生成新的CompletionStage,构成链式的阶段型的操作。目前只有CompletableFuture一个实现类。
      CompletionStage中的方法主要用来定义一个行为,行为的入参也大量的使用了函数式接口的命名方式,可以是Consumer、Function、Runable等。相关方法比如有apply,accept,run等,这些方法的区别在于它们有些是需要传入参,有些则会产生“结果”。

    • Funtion会产生结果
    • Comsumer会消耗结果
    • Runable既不产生结果也不消耗结果

      举例说明thenAcceptBothAsync接口会在当前CompletionStage和other对应的CompletionStage正常执行完成后以前两个CompletionStage作为入参执行action函数,并在入参executor线程池中执行action函数,由于BiConsumer是消费型的函数式接口,因为该接口没有返回值。类似的CompletionStage接口的分类与描述请参考:Java并发包之阶段执行之CompletionStage接口

    public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action, Executor executor);
    

      简单来说CompletionStage确保了CompletableFuture能够进行链式调用。

    CompletableFuture

      CompletableFuture作为CompletionStage接口的官方实现类,实现了Future和CompletionStage接口,这意味着它即满足CompletableStage的阶段执行,也提供了Future中获取该执行结果的方法。
      以supplyAsync(Supplier<U> supplier)接口为例,该接口返回一个新的CompletableFuture,默认在ForkJoinPool.commonPool()中异步运行任务。

        public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
            return asyncSupplyStage(asyncPool, supplier);
        }
    
        static <U> CompletableFuture<U> asyncSupplyStage(Executor e,
                                                         Supplier<U> f) {
            if (f == null) throw new NullPointerException();
            CompletableFuture<U> d = new CompletableFuture<U>();// 表示执行结果的CompletableFuture返回对象
            e.execute(new AsyncSupply<U>(d, f));
            return d;
        }
    
        // 静态内部类
        static final class AsyncSupply<T> extends ForkJoinTask<Void>
                implements Runnable, AsynchronousCompletionTask {
            CompletableFuture<T> dep; Supplier<T> fn; // 具体的任务执行逻辑
            AsyncSupply(CompletableFuture<T> dep, Supplier<T> fn) {
                this.dep = dep; this.fn = fn;
            }
    
            public final boolean exec() { run(); return true; }
    
            public void run() {
                CompletableFuture<T> d; Supplier<T> f;
                if ((d = dep) != null && (f = fn) != null) {
                    dep = null; fn = null;
                    if (d.result == null) {  // 判断当前任务是否已结束,任务结束的情况下,result对象会保存执行结果result != null.
                        try {
                            d.completeValue(f.get());  //设置结果并等待执行结果
                        } catch (Throwable ex) {
                            d.completeThrowable(ex);
                        }
                    }
                    d.postComplete();
                }
            }
        }
    
        final void postComplete() {
            CompletableFuture<?> f = this; Completion h;// 并发栈,保存依赖当前CompletableFuture的任务
            while ((h = f.stack) != null ||
                   (f != this && (h = (f = this).stack) != null)) {
                CompletableFuture<?> d; Completion t;
                if (f.casStack(h, t = h.next)) {
                    if (t != null) {
                        if (f != this) {
                            pushStack(h);
                            continue;
                        }
                        h.next = null;    // detach
                    }
                    f = (d = h.tryFire(NESTED)) == null ? this : d;
                }
            }
        }
    
        @SuppressWarnings("serial")
        abstract static class Completion extends ForkJoinTask<Void>
            implements Runnable, AsynchronousCompletionTask {
            volatile Completion next;      // Treiber stack link
            abstract CompletableFuture<?> tryFire(int mode); // 钩子方法
            abstract boolean isLive();
    
            public final void run()                { tryFire(ASYNC); }
            public final boolean exec()            { tryFire(ASYNC); return true; }
            public final Void getRawResult()       { return null; }
            public final void setRawResult(Void v) {}
        }
    

    supplyAsync -> asyncSupplyStage -> AsyncSupply -> postComplete -> tryFire

      从上述源码中可以清晰的看出代码的逻辑线,CompletableFuture在执行时会封装一个用于表示执行结果的CompletableFuture d对象,并将其作为返回值,并根据不同的入参进入不同的处理逻辑。AsyncSupply作为CompletableFuture的静态内部类继承了ForkJoinTask,是一个可以在ForkJoinPool中执行的任务。run()结束后会调用postComplete()来执行所有依赖当前任务的其他任务。
      其中postComplete方法比较复杂,在了解该方法前需要了解CompletableFuture的结构,可以发现每个CompletableFuture持有一个Completion栈stack, 每个Completion持有一个CompletableFuture -> dep, 如此递归循环下去,是层次很深的树形结构。如下图所示(图片引用自【CompletableFuture源码分析】一文)。

    CompletableFuture树形结构
      postComplete主要作用是将当前CompletableFuture的栈中元素逐个出栈并tryFile,发现新的CompletableFuture,将它的元素反向压入本CompletableFuture的栈中,压入结束后,继续对栈中元素逐个出栈并tryFire,发现非空CompletableFuture则继续上述过程。直到本CompletableFuture的栈中不再有元素为止。
      postComplete()最后调用的是Completion的tryFire()。tryFire作为一个钩子方法在Completion的子类中得到了具体实现。tryFire返回的CompletableFuture将在当前执行的CompletableFuture的栈中反向执行。tryFile正常运行结束时会调用postFire方法进行遍历栈并去除死亡任务cleanStack等操作。由于篇幅有限详细的源码分析可以参考博客:【JUC源码解析】CompletableFuture以及ForkJoin框架之CompletableFuture

    CompletableFuture API介绍

    static方法实例化CompletableFuture

    // runAsync方法接收的是Runnable实例,没有返回值
    CompletableFuture.runAsync(Runnable runnable);
    // 参数executor表示让任务在指定的线程池中执行,不指定的话任务是在 ForkJoinPool.commonPool() 线程池中执行的。
    CompletableFuture.runAsync(Runnable runnable, Executor executor);
    
    // supplyAsync方法是JDK8的函数式接口,无参数,会返回结果
    CompletableFuture.supplyAsync(Supplier<U> supplier);
    // 参数executor表示让任务在指定的线程池中执行,不指定的话任务是在 ForkJoinPool.commonPool() 线程池中执行的。
    CompletableFuture.supplyAsync(Supplier<U> supplier, Executor executor)
    

    描述顺序执行关系

      CompletionStage API中主要有thenApply、thenAccept、thenRun和 thenCompose 这四个系列的接口用于描述穿行关系。

        // 返回一个新的CompletionStage,当前阶段正常完成时,将以该阶段的结果作为所提供函数的参数执行。
        public <U> CompletionStage<U> thenApply(Function<? super T,? extends U> fn);
        // 返回一个新的CompletionStage,当前阶段正常完成时,将使用此阶段的默认异步执行工具执行此阶段的结果作为所提供函数的参数。
        public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn);
        // 返回一个新的CompletionStage,当前阶段正常完成时,将以该阶段的结果作为提供的操作的参数执行。
        public CompletionStage<Void> thenAccept(Consumer<? super T> action);
        // 返回一个新的CompletionStage,当前阶段正常完成时,将使用此阶段的默认异步执行工具执行,此阶段的结果作为提供的操作的参数。
        public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);
        // 返回一个新的CompletionStage,当前阶段正常完成时,执行给定的操作。
        public CompletionStage<Void> thenRun(Runnable action);
        // 返回一个新的CompletionStage,当前阶段正常完成时,使用此阶段的默认异步执行工具执行给定的操作。
        public CompletionStage<Void> thenRunAsync(Runnable action);
        // 返回一个新的CompletionStage,当前阶段正常完成时,这个阶段将作为提供函数的参数执行。
        public <U> CompletionStage<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn);
        // 返回一个新的CompletionStage,当前阶段正常完成时,将使用此阶段的默认异步执行工具执行,此阶段作为提供的函数的参数。
        public <U> CompletionStage<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn);
    

    描述合并执行关系

      CompletionStage接口里面描述合并执行关系,主要是thenCombine、thenAcceptBoth 和 runAfterBoth系列的接口。

        // 返回一个新的CompletionStage,当前和另一个给定的阶段都正常完成时,两个结果作为提供函数的参数执行。
        public <U,V> CompletionStage<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn);
        // 返回一个新的CompletionStage,当前和另一个给定阶段正常完成时,将使用此阶段的默认异步执行工具执行,其中两个结果作为提供函数的参数。
        public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn);
        // 返回一个新的CompletionStage,当前和另一个给定的阶段都正常完成时,两个结果作为提供的操作的参数被执行
        public <U> CompletionStage<Void> thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action);
        // 返回一个新的CompletionStage,当前和另一个给定阶段正常完成时,将使用此阶段的默认异步执行工具执行,其中两个结果作为提供的操作的参数。
        public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action);
        // 返回一个新的CompletionStage,当前和另一个给定的阶段都正常完成时,执行给定的动作。
        public CompletionStage<Void> runAfterBoth(CompletionStage<?> other, Runnable action);
        // 返回一个新的CompletionStage,当前和另一个给定阶段正常完成时,使用此阶段的默认异步执行工具执行给定的操作。
        public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other, Runnable action);
    

    描述选择执行关系

      CompletionStage接口里面描述选择执行关系,主要是applyToEither、acceptEither 和 runAfterEither系列的接口。

        // 返回一个新的CompletionStage,当前或另一个给定阶段正常完成时,执行相应的结果作为提供的函数的参数。
        public <U> CompletionStage<U> applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn);
        // 返回一个新的CompletionStage,当前或另一个给定阶段正常完成时,将使用此阶段的默认异步执行工具执行,其中相应的结果作为提供函数的参数。
        public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn);
        // 返回一个新的CompletionStage,当前或另一个给定阶段正常完成时,执行相应的结果作为提供的操作的参数。
        public CompletionStage<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action);
        // 返回一个新的CompletionStage,当前或另一个给定阶段正常完成时,将使用此阶段的默认异步执行工具执行,其中相应的结果作为提供的操作的参数。
        public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action);
        // 返回一个新的CompletionStage,当前或另一个给定阶段正常完成时,执行给定的操作。
        public CompletionStage<Void> runAfterEither(CompletionStage<?> other, Runnable action);
        // 返回一个新的CompletionStage,当前或另一个给定阶段正常完成时,使用此阶段的默认异步执行工具执行给定的操作。
        public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other, Runnable action);
    

    异常处理

      对于核心方法中抛出的运行时异常,CompletionStage接口提供了相应的异常处理接口。

        // 返回一个新的CompletableFuture,当发生异常时,触发执行指定方法。
        public CompletionStage<T> exceptionally(Function<Throwable, ? extends T> fn);
        // 返回此阶段正常执行的结果或异常执行新的CompletionStage。
        public CompletionStage<T> whenComplete(BiConsumer<? super T, ? super Throwable> action);
        // 返回此阶段正常执行的结果或异常执行新的CompletionStage,执行给定操作将使用此阶段的默认异步执行工具执行给定操作。
        public CompletionStage<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action);
        // 返回一个新的CompletionStage,当此阶段正常完成或发生异常时,将结果作为参数执行。
        public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);
        // 返回一个新的CompletionStage,当此阶段正常完成或发生异常时,将结果作为参数执行,执行给定操作将使用此阶段的默认异步执行工具执行给定操作。
        public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn);
    

      whenComplete() 和 handle() 系列方法就类似于 try{}finally{}中的 finally{},无论是否发生异常都会执行 whenComplete() 中的回调函数 consumer 和 handle() 中的回调函数 fn。两者的区别在于 whenComplete() 不支持返回结果,而 handle() 是支持返回结果的。
      由于篇幅有限这里详细的实例可参考深入学习java源码之CompletableFuture.reportGet()与CompletableFuture.supplyAsync()以及Java并发包之阶段执行之CompletionStage接口

    应用实例

      最后以一个实际应用场景结束本文,楼主在会员卡券的业务小组中经常会遇到各种卡信息的查询功能,甚至会包含一些依赖第三方的卡信息查询,为了保证查询接口的顺利返回防止超时调用失败,会采用CompletableFuture并发查询结果。

            // 多线程方式并发获取会员卡信息
            List<FutureRequest> futureRequestList = new ArrayList<>();
            // ....封装多线程并发查询会员卡信息时的请求对象
            // 查询返回的会员卡列表
            List<CardDataVo> tempList = new ArrayList<>();
            CompletableFuture[] cf = futureRequestList.stream().map(futureRequest -> CompletableFuture.supplyAsync(
                    () -> queryCardDataList(futureRequest.getXxx(), futureRequest.getYyy(), futureRequest.getZzz),
                    threadPoolTaskExecutor)
                    .whenComplete((value, e) -> {
                        if (CollectionUtils.isNotEmpty(value)) {
                            tempList.addAll(value);
                        }
                    }))
                    .toArray(CompletableFuture[]::new);
            // allOf方法在当前方法中等待所有CompletableFuture执行完成。join完成后返回结果值,如果完成异常,则返回异常。
            CompletableFuture.allOf(cf).join();
        /**
         * 查询各类卡列表接口
         *
         * @return 会员卡列表信息
         */
        private List<CardDataVo> queryCardDataList(String Xxx, String Yyy, Long Zzz) {
            // ....省略具体查询方法实现
        }
    

      采用自定义线程池一方面更符合线程池开发规范,另一方面是为了避免所有的CompletableFuture共享同一个线程池,一旦有任务执行一些很慢的 I/O 操作,就会导致线程池中所有线程都阻塞在 I/O 操作上,从而造成线程饥饿,进而影响整个系统的性能。

    参考文章(尊重他人劳动成果)

    1.深入学习java源码之CompletableFuture.reportGet()与CompletableFuture.supplyAsync()
    2.Java并发包之阶段执行之CompletionStage接口
    3.CompletableFuture源码分析
    4.ForkJoin框架之CompletableFuture
    5.Java8的CompletableFuture进阶之道
    6.理解 CompletableFuture 的任务与回调函数的线程
    7.Java回调实现异步
    8.CompletableFuture: 异步编程
    9.Java8函数式编程之三:函数式接口

    相关文章

      网友评论

        本文标题:CompletableFuture实践

        本文链接:https://www.haomeiwen.com/subject/nvduuctx.html