美文网首页
CompletableFuture流水线工作

CompletableFuture流水线工作

作者: 不知名的蛋挞 | 来源:发表于2018-08-02 19:48 被阅读38次

CompletableFuture流水线工作

将CompletableFuture与其他CompletableFuture组合,以及与其他函数组合,能够为多项任务构建类似流水线的方案,这样能够控制同步和异步执行以及它们之间的依赖。

现在我们来实现下面需求来说明CompletableFuture流水线式的处理:一个shop里面有ID:1-10的production,现在想获取这些production的价格再乘以10倍输出。

用多线程去做的话就需要10个线程去运行任务,还要等待所有结果完成返回到一个list中去。

public class CompletableFutureInAction3 {

    private static Random RAMDOM = new Random(System.currentTimeMillis());

    /**
     * 这是一个高并发的执行过程
     */
    public static void main(String[] args) {

        ExecutorService executor = Executors.newFixedThreadPool(2,r->{
            Thread t = new Thread();
            t.setDaemon(false);
            return t;
        });

        List<Integer> productionIDs = Arrays.asList(1,2,3,4,5,6,7,8,9,10);

        // 开启一个CompletableFuture去进行商品价格查询
        Stream<CompletableFuture<Double>> cfStream = productionIDs.stream().map(i -> CompletableFuture.supplyAsync(() -> queryProduction(i), executor));
        // 实现价格翻倍
        Stream<CompletableFuture<Double>> multiplyFuture = cfStream.map(future -> future.thenApply(CompletableFutureInAction3::multiply));
        // 整合结果 join()
        List<Double> result = multiplyFuture.map(CompletableFuture::join).collect(Collectors.toList());
        System.out.print(result);
    }

    private static double get(){
        try {
            Thread.sleep(10000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return RAMDOM.nextDouble();
    }

    private static double multiply(double value){
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return value*10;
    }

    // 参数i没有用,只不过模拟ID去查询商品
    private static double queryProduction(int i){
        return get();
    }
}

我们实现了多个任务去执行最终汇总到一个结果中去,而且我们不用考虑多线程的交互和控制。

CompletableFuture常用流水线工作API

1. 运行完成的代码

  • thenAccept(Consumer<? super T> block):接收上一阶段的输出作为本阶段的输入,无返回值。
  • thenRun(Runnable action):如果处理完成还想要做后续的操作可以使用thenRun。thenRun不关心前一阶段的输出和计算结果,因为它不需要输入参数,也无返回值。
  • runAfterBoth(CompletionStage<?> other,Runnable action):工作方式与thenRun()类似,但是是等待两个Future而不是一个。
// thenAccept
CompletableFuture.supplyAsync(()->1)
                .thenAccept(System.out::println);  // 输出1

StringBuilder result = new StringBuilder();
CompletableFuture.supplyAsync(()->"aaa")
                .thenAccept(s->result.append(s));  // 无任何输出

// thenRun
CompletableFuture.supplyAsync(()->1)
                .thenRun(System.out::println);  // 无任何输出,因为thenRun不接受参数

 /**
   * 这里的两个Stage都是同步运行的,两个运行结束才会输出done
   * 
   * 输出结果:
   * ForkJoinPool-commomPool-worker1 is running
   * ForkJoinPool-commomPool-worker1 is running
   * done
 */
CompletableFuture.supplyAsync(()->{
            System.out.println(Thread.currentThread().getName()+" is running");
            return 1;
        }).runAfterBoth(CompletableFuture.supplyAsync(()->{
            System.out.println(Thread.currentThread().getName()+" is running");
            return 2;
        }),()->System.out.println("done"));

2. 转换和作用于CompletableFuture

  • thenApply(Function<? super T,? extends U> fn):如果想持续的在多个回调之间传递值,可以用 thenApply 。 thenApply() 的参数 Function,接受一个值,并且返回一个值。其功能相当于将 CompletableFuture<T> 转换成CompletableFuture<U>。
  • handle(BiFunction<? super T,Throwable,? extends U> fn):handle()与thenApply()的区别在于handle()方法会处理正常计算值和异常,因此它可以屏蔽异常,避免异常继续抛出,而thenApply()只能处理正常值,因此一旦有异常就会抛出。
  • thenCompose (Function<? super T, ? extends CompletionStage<U>> fn):允许你对两个异步操作进行流水线,第一个操作完成时,将其结果作为参数传递给第二个操作。和thenApply的不同类似于map和flatMap的不同:thenCompose在异步操作完成的时候对异步操作的结果进行一些操作,并且仍然返回CompletionStage类型。
// thenApply
CompletableFuture.supplyAsync(()->1)
       .thenApply(i->Integer.sum(i,10))
       .whenComplete((v,t)->System.out.println(v));  // 输出11

// handle
CompletableFuture.supplyAsync(()->1)
       .handle((v,t)->{
            return t!=null?"error":Integer.sum(v,10);
        })  // 多了对异常的考虑
       .whenComplete((v,t)->System.out.println(v)); // 输出11

// thenCompose
CompletableFuture.supplyAsync(()->1)
       .thenCompose(i->CompletableFuture.supplyAsync(()->10*i)) //把i作为入参,交给另外一个CompletableFuture处理
       .thenAccept(System.out::println);  //输出10

3. 结合(链接)两个futures

  • thenCombine (CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn):适用于将两个完全不相干的 CompletableFuture 对象的结果整合起来,而且你也不希望等到第一个任务完全结束才开始第二项任务的情况。
  • thenAcceptBoth(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action):用于组合两个并发的任务,产生新的Future没有返回值。工作方式与thenAccept()类似,但是是等待两个Future而不是一个。
// thenCombine
CompletableFuture.supplyAsync(()->1)
     .thenCombine(CompletableFuture.supplyAsync(()->2.0d),(r1,r2)->r1+r2)
     .thenAccept(System.out::println);  //输出 3.0

// thenAcceptBoth
CompletableFuture.supplyAsync(()->1)
     .thenAcceptBoth(CompletableFuture.supplyAsync(()->2.0d),(r1,r2)->{
      // 不会把计算结果再传递下去,就是一个消费者
                    System.out.println(r1);
                    System.out.println(r2);
                    System.out.println(r1+r2);
      }); 

4. 将Function作用于两个已完成Stage的结果之一

CompletableFuture 还能等待第一个 Future 而不是所有 Future 计算完毕,当你有两个任务,两个都是同样类型的结果,你只关心响应时间,谁快用谁的结果,不关心具体哪个任务首先返回。比如有两个数据一样的数据库,你同时去查询,谁快用谁的数据。

  • applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn):取2个Future中最先返回的,Function作用于两个Future中完成最快的那一个。
  • acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action):和applyToEither类似,不同的是acceptEither参数为consumer,不会有返回值,也就是会消费掉。
  • runAfterEither(CompletionStage<?> other,Runnable action):两个Future,任何一个完成了都会执行下一步的操作(Runnable)。
/**
  * 输出:
  * I am future 2
  * I am future 1
  * 2
  * 20
  * 1
*/
CompletableFuture.supplyAsync(()->{
            System.out.println("I am future 1");
            return 1;
        }).applyToEither(CompletableFuture.supplyAsync(()->{
            System.out.println("I am future 2");
            return 2;
        }),v->v*10)
           .thenAccept(System.out::print);

/**
  * 输出:
  * I am future 2
  * I am future 1
  * 2
  * 1
 */
CompletableFuture.supplyAsync(()->{
            System.out.println("I am future 1");
            return 1;
        }).acceptEither(CompletableFuture.supplyAsync(()->{
            System.out.println("I am future 2");
            return 2;
        }),System.out::println);

/**
  * 输出:
  * I am future 2
  * done
  * I am future 1
*/
CompletableFuture.supplyAsync(()->{
            System.out.println("I am future 1");
            return 1;
        }).runAfterEither(CompletableFuture.supplyAsync(()->{
            System.out.println("I am future 2");
            return 2;
        }),()->System.out.println("done"));

Thread.currentThread().join();  // 为了让多个任务都执行完,这里添加一个join方法

5. allOf与anyOf

  • allOf(CompletableFuture<?>... cfs):该方法接收一个由CompletableFuture 构成的数组,数组中的所有 Completable-Future 对象执行完成之后,它返回一个 CompletableFuture<Void> 对象。
  • anyOf():该方法接收一个 CompletableFuture 对象构成的数组,返回由第一个执行完毕的 CompletableFuture 对象的返回值构成的 CompletableFuture<Object> ,也就是只要 CompletableFuture 对象数组中有任何一个执行完毕就不再等待。
 /**
   * 输出:
   * 3.12
   * 3.12
   * 3.12
   * 3.12
   * done
   */
List<CompletableFuture<Double>> collect = Arrays.asList(1, 2, 3, 4)
            .stream()
            .map(i -> CompletableFuture.supplyAsync(() -> 3.12))
            .collect(Collectors.toList());
CompletableFuture.allOf(collect.toArray(new CompletableFuture[collect.size()]))
            .thenRun(()->System.out.println("done"));
public static void test4() throws Exception {

        CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(()->{
            //模拟执行耗时任务
            System.out.println("task1 doing...");
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            //返回结果
            return "result1";
        });

        CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(()->{
            //模拟执行耗时任务
            System.out.println("task2 doing...");
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            //返回结果
            return "result2";
        });

        CompletableFuture<Object> anyResult=CompletableFuture.anyOf(cf1,cf2);

        System.out.println("第一个完成的任务结果:"+anyResult.get());

        CompletableFuture<Void> allResult=CompletableFuture.allOf(cf1,cf2);

        // 阻塞等待所有任务执行完成
        // join等同于get,唯一区别是前者不会扔出任何检查意外exception
        allResult.join();
        System.out.println("所有任务执行完成");
}

相关文章

网友评论

      本文标题:CompletableFuture流水线工作

      本文链接:https://www.haomeiwen.com/subject/nvddvftx.html