美文网首页
项目优化 - CompletableFuture-异步编排简单实

项目优化 - CompletableFuture-异步编排简单实

作者: 欢喜的看着书 | 来源:发表于2023-07-15 10:30 被阅读0次

    前言

    在之前的项目开发中,都没怎么使用过CompletableFuture的功能,只听说过和异步编程有关。为了能够在将来有需要的时候用得上,这两天花了点时间学习了一下,并简单地总结一下如何使用CompletableFuture完成异步任务编排。
    先创建一个自定义的线程池,后续所有代码都会使用到:

    private static final ThreadPoolExecutor THREAD_POOL_EXECUTOR = new ThreadPoolExecutor(3, 5, 30, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10), new ThreadFactory() {
        private final AtomicInteger THREAD_NUM = new AtomicInteger(1);
    
        @Override
        public Thread newThread(Runnable r) {
          Thread t = new Thread(r);
    //      设置为守护线程,main线程结束就跟着一起结束,否则main函数结束jvm还在
          t.setDaemon(true);
          t.setName("completable-future-test-Thread-" + THREAD_NUM.incrementAndGet());
          return t;
        }
      }, new ThreadPoolExecutor.AbortPolicy());
    

    同步串行代表任务1、任务2、任务3按时间先后顺序执行,并且都是同一个线程来执行。
    示例代码如下:

    CompletableFuture
            .supplyAsync(
                () -> {
                  Thread currentThread = Thread.currentThread();
                  String ThreadName = currentThread.getName();
                  String taskName = "task1";
                  System.out.println(ThreadName + "开始执行任务:" + taskName);
                  System.out.println("正在执行任务" + taskName);
                  System.out.println(taskName + "执行结束");
                  return taskName;
                }, THREAD_POOL_EXECUTOR)
            .thenApply(
                (task1Result) -> {
                  Thread currentThread = Thread.currentThread();
                  String ThreadName = currentThread.getName();
                  String taskName = "task2";
                  System.out.println(ThreadName + "开始执行任务:" + taskName);
                  System.out.println("正在执行任务" + taskName);
                  System.out.println("拿到上一个任务的返回值:" + task1Result);
                  System.out.println(taskName + "执行结束");
                  return taskName;
                })
            .thenAccept(
                 (task2Result) -> {
                 Thread currentThread = Thread.currentThread();
                 String ThreadName = currentThread.getName();
                 String taskName = "task3";
                 System.out.println(ThreadName + "开始执行任务:" + taskName);
                 System.out.println("正在执行任务" + taskName);
                 System.out.println("拿到上一个任务的返回值:" + task2Result);
                 System.out.println(taskName + "执行结束");
               });
    
    

    执行结果:

    completable-future-test-Thread-2开始执行任务:task1
    正在执行任务task1
    task1执行结束
    completable-future-test-Thread-2开始执行任务:task2
    正在执行任务task2
    拿到上一个任务的返回值:task1
    task2执行结束
    completable-future-test-Thread-2开始执行任务:task3
    正在执行任务task3
    拿到上一个任务的返回值:task2
    task3执行结束
    

    1.入口函数supplyAsync()代表一个异步的有返回值的函数,之所以异步,是与主线程区别,从线程池中的拿一个线程来执行。
    2.thenApply()和thenAccept()没有Async,意味着是和前面的任务共用一个线程,从执行结果上我们也可以看到线程名称相同。
    3.thenApply()需要接收上一个任务的返回值,并且自己也要有返回值。
    4.thenAccept()需要接收上一个任务的返回值,但是它不需要返回值。

    异步串行

    异步串行代表任务1、任务2、任务3按时间先后顺序执行,并由不同的线程来执行。
    示例代码如下:

        CompletableFuture
            // 有返回值
            .supplyAsync(
                () -> {
                  Thread currentThread = Thread.currentThread();
                  String ThreadName = currentThread.getName();
                  String taskName = "task1";
                  System.out.println(ThreadName + "开始执行任务:" + taskName);
                  System.out.println("正在执行任务" + taskName);
                  System.out.println(taskName + "执行结束");
                  return taskName;
                }, THREAD_POOL_EXECUTOR)
            // 需要上一个任务的返回值,并且自身有返回值
            .thenApplyAsync(
                (task1Result) -> {
                  Thread currentThread = Thread.currentThread();
                  String ThreadName = currentThread.getName();
                  String taskName = "task2";
                  System.out.println(ThreadName + "开始执行任务:" + taskName);
                  System.out.println("正在执行任务" + taskName);
                  System.out.println("拿到上一个任务的返回值:" + task1Result);
                  System.out.println(taskName + "执行结束");
                  return taskName;
                }, THREAD_POOL_EXECUTOR)
            // 不需要上一个任务的返回值,自身也没有返回值
            .thenRunAsync(
                () -> {
                  Thread currentThread = Thread.currentThread();
                  String ThreadName = currentThread.getName();
                  String taskName = "task3";
                  System.out.println(ThreadName + "开始执行任务:" + taskName);
                  System.out.println("正在执行任务" + taskName);
                  System.out.println("thenRunAsync()不需要上一个任务的返回值");
                  System.out.println(taskName + "执行结束");
                }, THREAD_POOL_EXECUTOR);
    

    执行结果如下:

    completable-future-test-Thread-2开始执行任务:task1
    正在执行任务task1
    task1执行结束
    completable-future-test-Thread-3开始执行任务:task2
    正在执行任务task2
    拿到上一个任务的返回值:task1
    task2执行结束
    completable-future-test-Thread-4开始执行任务:task3
    正在执行任务task3
    thenRunAsync()不需要上一个任务的返回值
    task3执行结束
    

    1.入口函数依然是supplyAsync(),需要传入一个有返回值的函数作为参数;如果想要没有返回值的函数传进来的话,可以使用CompletableFuture.runAsync();
    2.thenApplyAsync()和thenRunAsync()分别表示里面的任务都是异步执行的,和执行前面的任务不是同一个线程;
    3.thenRunAsync()需要传入一个既不需要参数,也没有返回值的任务;

    并行任务

        CompletableFuture<String> future1 = CompletableFuture
            .supplyAsync(
                () -> {
                  Thread currentThread = Thread.currentThread();
                  String ThreadName = currentThread.getName();
                  String taskName = "task1";
                  System.out.println(ThreadName + "开始执行任务:" + taskName);
                  System.out.println("正在执行任务" + taskName);
                  System.out.println(taskName + "执行结束");
                  return taskName;
                }, THREAD_POOL_EXECUTOR);
        CompletableFuture<Void> future2 = CompletableFuture
            .runAsync(
                () -> {
                  Thread currentThread = Thread.currentThread();
                  String ThreadName = currentThread.getName();
                  String taskName = "task2";
                  System.out.println(ThreadName + "开始执行任务:" + taskName);
                  System.out.println("正在执行任务" + taskName);
                  System.out.println(taskName + "执行结束");
                }, THREAD_POOL_EXECUTOR);
        CompletableFuture<String> future3 = CompletableFuture
            .supplyAsync(
                () -> {
                  Thread currentThread = Thread.currentThread();
                  String ThreadName = currentThread.getName();
                  String taskName = "task3";
                  System.out.println(ThreadName + "开始执行任务:" + taskName);
                  System.out.println("正在执行任务" + taskName);
                  System.out.println(taskName + "执行结束");
                  return taskName;
                }, THREAD_POOL_EXECUTOR);
    

    执行结果如下:

    completable-future-test-Thread-4开始执行任务:task3
    completable-future-test-Thread-2开始执行任务:task1
    completable-future-test-Thread-3开始执行任务:task2
    正在执行任务task3
    task3执行结束
    正在执行任务task2
    正在执行任务task1
    task2执行结束
    task1执行结束
    

    一看执行结果,明显是乱序的,并且三个任务分别由三个线程执行,符合咱们的预期;注意异步的方法后面都是带有Async关键字的;

    多任务结果合并计算

    • 两个任务结果的合并


    任务3的执行依赖于任务1、任务2的返回值,并且任务1和任务3由同一个线程执行,任务2单独一个线程执行;

    示例代码如下:

        CompletableFuture
            // 任务1
            .supplyAsync(
                () -> {
                  Thread currentThread = Thread.currentThread();
                  String ThreadName = currentThread.getName();
                  String taskName = "task1";
                  System.out.println(ThreadName + "开始执行任务:" + taskName);
                  System.out.println("正在执行任务" + taskName);
                  System.out.println(taskName + "执行结束");
                  return taskName;
                }, THREAD_POOL_EXECUTOR)
            .thenCombine(
                CompletableFuture
                    // 任务2
                    .supplyAsync(
                        () -> {
                          Thread currentThread = Thread.currentThread();
                          String ThreadName = currentThread.getName();
                          String taskName = "task2";
                          System.out.println(ThreadName + "开始执行任务:" + taskName);
                          System.out.println("正在执行任务" + taskName);
                          System.out.println(taskName + "执行结束");
                          return taskName;
                        }, THREAD_POOL_EXECUTOR),
                // 任务3
                (task1Result, task2Result) -> {
                  Thread currentThread = Thread.currentThread();
                  String ThreadName = currentThread.getName();
                  String taskName = "task3";
                  System.out.println(ThreadName + "开始执行任务:" + taskName);
                  System.out.println("task1结果:" + task1Result + "\ttask2结果:" + task2Result);
                  System.out.println("正在执行任务" + taskName);
                  System.out.println(taskName + "执行结束");
                  return taskName;
                });
    

    执行结果如下:

    completable-future-test-Thread-3开始执行任务:task2
    completable-future-test-Thread-2开始执行任务:task1
    正在执行任务task1
    正在执行任务task2
    task2执行结束
    task1执行结束
    completable-future-test-Thread-2开始执行任务:task3
    task1结果:task1 task2结果:task2
    正在执行任务task3
    task3执行结束
    

    CompletableFuture 提供了thenCombine()来合并另一个CompletableFuture的执行结果,所以thenCombine()需要两个参数,第一个参数是另一个CompletableFuture,第二个参数会收集前两个任务的返回值,类似下面这样:

    (result1,result2)->{
      // 执行业务逻辑
      return result3;
    }
    

    如果小伙伴们想要实现任务3也是单独的线程执行的话,可以使用thenCombineAsync()这个方法。代码如下:

        CompletableFuture
            // 任务1
            .supplyAsync(
                () -> {
                  Thread currentThread = Thread.currentThread();
                  String ThreadName = currentThread.getName();
                  String taskName = "task1";
                  System.out.println(ThreadName + "开始执行任务:" + taskName);
                  System.out.println("正在执行任务" + taskName);
                  System.out.println(taskName + "执行结束");
                  return taskName;
                }, THREAD_POOL_EXECUTOR)
    
            .thenCombineAsync(
                CompletableFuture
                    // 任务2
                    .supplyAsync(
                        () -> {
                          Thread currentThread = Thread.currentThread();
                          String ThreadName = currentThread.getName();
                          String taskName = "task2";
                          System.out.println(ThreadName + "开始执行任务:" + taskName);
                          System.out.println("正在执行任务" + taskName);
                          System.out.println(taskName + "执行结束");
                          return 2;
                        }, THREAD_POOL_EXECUTOR),
                // 任务3
                (task1Result, task2Result) -> {
                  Thread currentThread = Thread.currentThread();
                  String ThreadName = currentThread.getName();
                  String taskName = "task3";
                  System.out.println(ThreadName + "开始执行任务:" + taskName);
                  System.out.println("task1结果:" + task1Result + "\ttask2结果:" + task2Result);
                  System.out.println("正在执行任务" + taskName);
                  System.out.println(taskName + "执行结束");
                  return 2L;
                }, THREAD_POOL_EXECUTOR);
    

    如果任务3中不需要返回结果,可以使用thenAcceptBoth()thenAcceptBothAsync(),使用方式与thenCombineAsync()类似;

    • 多任务结果合并



      示例代码如下:

        CompletableFuture future1 = CompletableFuture
            // 任务1
            .supplyAsync(
                () -> {
                  Thread currentThread = Thread.currentThread();
                  String ThreadName = currentThread.getName();
                  String taskName = "task1";
                  System.out.println(ThreadName + "开始执行任务:" + taskName);
                  System.out.println("正在执行任务" + taskName);
                  System.out.println(taskName + "执行结束");
                  return taskName;
                }, THREAD_POOL_EXECUTOR);
        CompletableFuture future2 = CompletableFuture
            // 任务2
            .supplyAsync(
                () -> {
                  Thread currentThread = Thread.currentThread();
                  String ThreadName = currentThread.getName();
                  String taskName = "task2";
                  System.out.println(ThreadName + "开始执行任务:" + taskName);
                  System.out.println("正在执行任务" + taskName);
                  System.out.println(taskName + "执行结束");
                  return taskName;
                }, THREAD_POOL_EXECUTOR);
        CompletableFuture future3 = CompletableFuture
            // 任务3
            .supplyAsync(
                () -> {
                  Thread currentThread = Thread.currentThread();
                  String ThreadName = currentThread.getName();
                  String taskName = "task3";
                  System.out.println(ThreadName + "开始执行任务:" + taskName);
                  System.out.println("正在执行任务" + taskName);
                  System.out.println(taskName + "执行结束");
                  return taskName;
                }, THREAD_POOL_EXECUTOR);
        CompletableFuture[] futures = new CompletableFuture[]{future1, future2, future3};
        CompletableFuture.allOf(futures)
            // 任务4
            .whenCompleteAsync(
                (v, e) -> {
                  List<Object> values = new ArrayList<>();
                  for (CompletableFuture future : futures) {
                    try {
                      values.add(future.get());
                    } catch (Exception ex) {
                    }
                  }
                  Thread currentThread = Thread.currentThread();
                  String ThreadName = currentThread.getName();
                  String taskName = "task4";
                  System.out.println(ThreadName + "开始执行任务:" + taskName);
                  System.out.println("前面任务的处理结果:" + values);
                  System.out.println("正在执行任务" + taskName);
                  System.out.println(taskName + "执行结束");
                }, THREAD_POOL_EXECUTOR);
    

    执行结果如下:

    completable-future-test-Thread-3开始执行任务:task2
    completable-future-test-Thread-4开始执行任务:task3
    completable-future-test-Thread-2开始执行任务:task1
    正在执行任务task2
    正在执行任务task3
    正在执行任务task1
    task2执行结束
    task3执行结束
    task1执行结束
    completable-future-test-Thread-2开始执行任务:task4
    前面任务的处理结果:[task1, task2, task3]
    正在执行任务task4
    task4执行结束
    
    

    之所以最后任务4的线程是completable-future-test-Thread-2,那是因为线程池的核心线程数设置为3,线程数设置高一点就会创建新的线程处理;
    从上述代码示例中,我们可以收获到另一个知识点:allOf(),它的作用是要求所有的任务全部完成才能执行后面的任务。

    任一任务完成

    在一批任务中,只要有一个任务完成,那么就可以向后继续执行其他任务。

    为了代码演示无异议,后续代码中,我们把线程数提升到4。

    示例代码如下:

        CompletableFuture future1 = CompletableFuture
            // 任务1
            .supplyAsync(
                () -> {
                  Thread currentThread = Thread.currentThread();
                  String ThreadName = currentThread.getName();
                  String taskName = "task1";
                  System.out.println(ThreadName + "开始执行任务:" + taskName);
                  System.out.println("正在执行任务" + taskName);
                  System.out.println(taskName + "执行结束");
                  return taskName;
                }, THREAD_POOL_EXECUTOR);
        CompletableFuture future2 = CompletableFuture
            // 任务2
            .supplyAsync(
                () -> {
                  Thread currentThread = Thread.currentThread();
                  String ThreadName = currentThread.getName();
                  String taskName = "task2";
                  System.out.println(ThreadName + "开始执行任务:" + taskName);
                  System.out.println("正在执行任务" + taskName);
                  System.out.println(taskName + "执行结束");
                  return taskName;
                }, THREAD_POOL_EXECUTOR);
        CompletableFuture future3 = CompletableFuture
            // 任务3
            .supplyAsync(
                () -> {
                  Thread currentThread = Thread.currentThread();
                  String ThreadName = currentThread.getName();
                  String taskName = "task3";
                  System.out.println(ThreadName + "开始执行任务:" + taskName);
                  System.out.println("正在执行任务" + taskName);
                  System.out.println(taskName + "执行结束");
                  return taskName;
                }, THREAD_POOL_EXECUTOR);
        CompletableFuture.anyOf(future1, future2, future3)
            .thenApplyAsync((taskResult) -> {
              Thread currentThread = Thread.currentThread();
              String ThreadName = currentThread.getName();
              String taskName = "task4";
              System.out.println(ThreadName + "开始执行任务:" + taskName);
              System.out.println("前面任务的处理结果:" + taskResult);
              System.out.println("正在执行任务" + taskName);
              System.out.println(taskName + "执行结束");
              return taskName;
            }, THREAD_POOL_EXECUTOR);
    

    执行结果如下:

    completable-future-test-Thread-2开始执行任务:task1
    completable-future-test-Thread-4开始执行任务:task3
    completable-future-test-Thread-3开始执行任务:task2
    正在执行任务task3
    正在执行任务task2
    正在执行任务task1
    task1执行结束
    task3执行结束
    task2执行结束
    completable-future-test-Thread-5开始执行任务:task4
    前面任务的处理结果:task1
    正在执行任务task4
    task4执行结束
    

    可以看到,任务1第一个结束,所以任务4中接收到的执行结果就是任务1的返回值。

    快速失败

    在一批任务当中,只要有任意一个任务执行产生异常了,那么就直接结束;否则就要等待所有任务成功执行完毕。

    示例代码如下:

        CompletableFuture future1 = CompletableFuture
            // 任务1
            .supplyAsync(
                () -> {
                  Thread currentThread = Thread.currentThread();
                  String ThreadName = currentThread.getName();
                  String taskName = "task1";
                  System.out.println(ThreadName + "开始执行任务:" + taskName);
                  System.out.println("正在执行任务" + taskName);
                  System.out.println(taskName + "执行结束");
                  return taskName;
                }, THREAD_POOL_EXECUTOR);
        CompletableFuture future2 = CompletableFuture
            // 任务2
            .supplyAsync(
                () -> {
                  Thread currentThread = Thread.currentThread();
                  String ThreadName = currentThread.getName();
                  String taskName = "task2";
                  System.out.println(ThreadName + "开始执行任务:" + taskName);
                  System.out.println("正在执行任务" + taskName);
                  System.out.println(taskName + "执行结束");
    
                  throw new RuntimeException("任务2异常!");
                }, THREAD_POOL_EXECUTOR);
        CompletableFuture future3 = CompletableFuture
            // 任务3
            .supplyAsync(
                () -> {
                  Thread currentThread = Thread.currentThread();
                  String ThreadName = currentThread.getName();
                  String taskName = "task3";
                  System.out.println(ThreadName + "开始执行任务:" + taskName);
                  System.out.println("正在执行任务" + taskName);
                  System.out.println(taskName + "执行结束");
                  throw new RuntimeException("任务3异常!");
                }, THREAD_POOL_EXECUTOR);
        CompletableFuture[] futures = new CompletableFuture[]{future1, future2, future3};
        CompletableFuture allCompletableFuture = CompletableFuture.allOf(futures);
        // 创建一个任务来监听异常
        CompletableFuture<?> anyException = new CompletableFuture<>();
        for (CompletableFuture<?> completableFuture : futures) {
          completableFuture.exceptionally((t) -> {
            // 任何一个任务异常都会让anyException任务完成
            anyException.completeExceptionally(t);
            return null;
          });
        }
        // 要么allCompletableFuture全部成功,要么一个出现异常就结束任务
        CompletableFuture.anyOf(allCompletableFuture, anyException)
            .whenComplete((value, exception) -> {
              if (Objects.nonNull(exception)) {
                System.out.println("产生异常,提前结束!");
                exception.printStackTrace();
                return;
              }
              System.out.println("所有任务正常完成!");
            });
    

    执行结果如下:

    completable-future-test-Thread-2开始执行任务:task1
    completable-future-test-Thread-3开始执行任务:task2
    completable-future-test-Thread-4开始执行任务:task3
    正在执行任务task2
    正在执行任务task3
    正在执行任务task1
    task2执行结束
    task1执行结束
    task3执行结束
    产生异常,提前结束!
    java.util.concurrent.CompletionException: java.lang.RuntimeException: 任务2异常!
      at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:314)
      at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:319)
      at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1702)
      at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
      at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
      at java.base/java.lang.Thread.run(Thread.java:834)
    Caused by: java.lang.RuntimeException: 任务2异常!
      at com.example.awesomerocketmq.completable.CompletableFutureTest.lambda$t$1(CompletableFutureTest.java:53)
      at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)
      ... 3 more
    
    

    CompletableFuture没有现成的api实现快速失败的功能,所以我们只能结合allOf()和anyOf()来逻辑来自定义方法完成快速失败的逻辑;
    1.我们需要额外创建一个CompletableFuture来监听所有的CompletableFuture,一旦其中一个CompletableFuture产生异常,我们就设置额外的CompletableFuture立即完成。
    2.把所有的CompletableFuture和额外的CompletableFuture放在anyOf()方法中,这样一旦额外的CompletableFuture完成,说明产生异常了;否则就需要等待所有的CompletableFuture完成。

    注意

    • 异常处理
      最后需要注意的是,所有的CompletableFuture任务一定要加上异常处理:
        CompletableFuture
            // 任务1
            .supplyAsync(
                () -> {
                  Thread currentThread = Thread.currentThread();
                  String ThreadName = currentThread.getName();
                  String taskName = "task1";
                  System.out.println(ThreadName + "开始执行任务:" + taskName);
                  System.out.println("正在执行任务" + taskName);
                  System.out.println(taskName + "执行结束");
                  return taskName;
                }, THREAD_POOL_EXECUTOR)
            .whenComplete((v,e)->{
              if(Objects.nonNull(e)){
                // todo
                // 处理异常
              }
              if(Objects.nonNull(v)){
                // todo
              }
            });
    

    还可以通过另外两个方法处理:exceptionally()或者handle()

    自定义线程池

    CompletableFuture默认的线程池是ForkJoinThreadPool,建议大家在使用的时候尽可能地使用自定义线程池,这样方便后续的代码优化以及相关的日志查看。

    相关文章

      网友评论

          本文标题:项目优化 - CompletableFuture-异步编排简单实

          本文链接:https://www.haomeiwen.com/subject/poyvudtx.html