美文网首页js css htmljava 基础并发和多线程
Java异步任务编排—CompletableFuture(二)

Java异步任务编排—CompletableFuture(二)

作者: 雪飘千里 | 来源:发表于2023-02-12 18:51 被阅读0次

    CompletableFuture API

    • 默认情况下CompletableFuture会使用公共的ForkJoinPool线程池,这个线程池默认创建的线程数是 CPU 的核数(也可以通过 JVM option:-Djava.util.concurrent.ForkJoinPool.common.parallelism 来设置ForkJoinPool线程池的线程数)。如果所有CompletableFuture共享一个线程池,那么一旦有任务执行一些很慢的 I/O 操作,就会导致线程池中所有线程都阻塞在 I/O 操作上,从而造成线程饥饿,进而影响整个系统的性能。所以,强烈建议你要根据不同的业务类型创建不同的线程池,以避免互相干扰

    • 方法不以Async结尾,意味着Action使用相同的线程执行,而Async可能会使用其它的线程去执行(如果使用相同的线程池,也可能会被同一个线程选中执行),如果以Async结尾,却又没自定义线程池,则还是使用公共的ForkJoinPool线程池,

    1 创建异步任务 API

    CompletableFuture创建异步任务,一般有supplyAsync和runAsync两个方法:

    • supplyAsync执行CompletableFuture任务,支持返回值。
    • runAsync执行CompletableFuture任务,没有返回值。

    举个栗子:

    public static void main(String[] args) {
        //可以自定义线程池
        ExecutorService executor = Executors.newCachedThreadPool();
        //runAsync的使用
        CompletableFuture<Void> runFuture = CompletableFuture.runAsync(() -> System.out.println("runAsync,为了部落"), executor);
        //supplyAsync的使用
        CompletableFuture<String> supplyFuture = CompletableFuture.supplyAsync(() -> {
                    System.out.print("supplyAsync,为了联盟");
                    return "哈哈哈哈哈"; }, executor);
        //runAsync的future没有返回值,输出null
        System.out.println(runFuture.join());
        //supplyAsync的future,有返回值
        System.out.println(supplyFuture.join());
        executor.shutdown(); // 线程池需要关闭
    }
    
    
    
    //输出
    runAsync,为了部落
    null
    supplyAsync,为了联盟哈哈哈哈哈
    

    2 依赖关系

    • thenRun():不关心上一个任务的执行结果,无传参,无返回值

      做完第一个任务后,再做第二个任务,但是前后两个任务没有参数传递,第二个任务也没有返回值

      CompletableFuture<String> completableFuture   = CompletableFuture.supplyAsync(() -> "Hello");
      CompletableFuture<Void> future = completableFuture.thenRun(() -> System.out.println("Computation finished."));
       
      future.get();
      
    • thenApply():依赖上一个任务的结果,把前面任务的执行结果,交给后面的Function,有返回值

        第一个任务执行完成后,执行第二个回调方法任务,会将第一个任务的执行结果,作为入参,传递到回调方法中,并且回调方法是有返回值的。
      

      CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> "Hello");
      CompletableFuture<String> future = completableFuture.thenApply(s -> s + " World");
      assertEquals("Hello World", future.get());

    • thenAccept(): 依赖上一个任务的结果,把前面任务的执行结果,交给后面的Function,无返回值

      第一个任务执行完成后,执行第二个回调方法任务,会将该任务的执行结果,作为入参,传递到回调方法中,但是回调方法是没有返回值的

      CompletableFuture<String> completableFuture= CompletableFuture.supplyAsync(() -> "Hello");
      CompletableFuture<Void> future = completableFuture.thenAccept(s -> System.out.println("Computation returned: " + s));
       
      future.get();
      
    • thenCompose():用来连接两个有依赖关系的任务,结果由第二个任务返回

      在第一个任务执行完成后,将该任务的执行结果,作为方法入参,去执行指定的方法。该方法会返回一个新的CompletableFuture实例

      CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> "Hello");
      CompletableFuture<String> future   = completableFuture.thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " World"));
       
      assertEquals("Hello World", future.get());
      

    3 组合关系

    • and集合关系
      • thenCombine():执行两个独立的任务,并对其结果执行某些操作,有返回值
      • thenAccepetBoth():两个任务执行完成后,将结果交给thenAccepetBoth处理,无返回值
      • runAfterBoth():两个任务都执行完成后,执行下一步操作(Runnable类型任务),不会把执行结果当做方法入参,且没有返回值
    CompletableFuture<String> cfA = CompletableFuture.supplyAsync(() -> "resultA");
    CompletableFuture<String> cfB = CompletableFuture.supplyAsync(() -> "resultB");
    //想要使用两个Future结果时,但不需要将任何结果值进行返回时,可以用 thenAcceptBoth,它表示后续的处理不需要返回值,而 thenCombine 表示需要返回值
    cfA.thenCombine(cfB, (resultA, resultB) -> "result A + B");
    cfA.thenAcceptBoth(cfB, (resultA, resultB) -> {});
    
    • 聚合关系
      • applyToEither():两个任务哪个执行的快,就使用哪一个结果,有返回值
      • acceptEither():两个任务哪个执行的快,就消费哪一个结果,无返回值
      • runAfterEither():不会把执行结果当做方法入参,且没有返回值

    applyToEither / acceptEither / runAfterEither 都表示将两个CompletableFuture组合起来,只要其中一个执行完了,就会执行某个任务

    //第一个异步任务,休眠2秒,保证它执行晚点
            CompletableFuture<String> first = CompletableFuture.supplyAsync(()->{
                try{
                    Thread.sleep(2000L);
                    System.out.println("执行完第一个异步任务");}
                catch (Exception e){
                    return "第一个任务异常";
                }
                return "第一个异步任务";
            });
    
            //第二个异步任务
            CompletableFuture<String> second = CompletableFuture.supplyAsync(() -> {
                                System.out.println("执行完第二个任务");
                                return "第一个任务还在睡觉,这是第二个任务";}
                            );
                    
            CompletableFuture acceptEither =  second.acceptEitherAsync(first, result ->System.out.println(result+"==acceptEither"));
            CompletableFuture applyToEither = second.applyToEitherAsync(first,result->{
                System.out.println(result+"==applyToEither");
                return result;
            });
            CompletableFuture runAfterEither =  second.runAfterEitherAsync(first, () ->System.out.println("hello"));
    
    • 并行执行
      • allOf():当所有给定的 CompletableFuture 完成时,返回一个新的 CompletableFuture
    CompletableFuture<String> future1    = CompletableFuture.supplyAsync(() -> "Hello");
    CompletableFuture<String> future2   = CompletableFuture.supplyAsync(() -> "Beautiful");
    CompletableFuture<String> future3   = CompletableFuture.supplyAsync(() -> "World");
     
    CompletableFuture<Void> combinedFuture  = CompletableFuture.allOf(future1, future2, future3);
     
    combinedFuture.get();
     
    assertTrue(future1.isDone());
    assertTrue(future2.isDone());
    assertTrue(future3.isDone());
    

    allOf局限性在于它不会返回所有任务的综合结果。相反,你必须手动从Futures获取结果。幸运的是,CompletableFuture.join()方法和Java 8 Streams API可以解决:

    String combined = Stream.of(future1, future2, future3).map(CompletableFuture::join).collect(Collectors.joining(" "));
     
    assertEquals("Hello Beautiful World", combined);
    
    • anyOf():当任何一个给定的CompletablFuture完成时,返回一个新的CompletableFuture,如果执行的任务异常,anyOfCompletableFuture,执行get方法,会抛出异常
     CompletableFuture<String> future1    = CompletableFuture.supplyAsync(() -> {
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                return "Hello";
                }
            );
    
            CompletableFuture<String> future3   = CompletableFuture.supplyAsync(() -> "World");
            CompletableFuture<Object> combinedFuture  = CompletableFuture.anyOf(future1, future3);
    
            System.out.println(combinedFuture.get());
    
            System.out.println(future1.get());
            System.out.println(future3.get());
            
            //结果
            World
            Hello
            World
    

    4 结果处理 异常捕获

    • whenComplete:当任务完成时,将使用结果(或 null)和此阶段的异常(或 null如果没有)执行给定操作,无返回值;并且whenComplete方法返回的CompletableFutureresult是上个任务的结果
         CompletableFuture<String> future1    = CompletableFuture.supplyAsync(() -> "Hello");
         CompletableFuture<String> future3   = future1.whenComplete((a, throwable) -> {
                System.out.println("上个任务执行完啦,还把" + a + "传过来");
            });
    
        System.out.println(future3.get());
    
    • whenComplete:当任务完成时,将使用结果(或 null)和此阶段的异常(或 null如果没有)执行给定操作,有返回值;并且whenComplete方法返回的CompletableFutureresult是回调方法的结果
        CompletableFuture<String> future1    = CompletableFuture.supplyAsync(() -> "Hello");
        CompletableFuture<String> future3   = future1.handle((a, throwable) -> {
                System.out.println("上个任务执行完啦,还把" + a + "传过来");
                return "world";
            });
    
       System.out.println(future3.get());
    
    • exceptionally:某个任务执行异常时,执行的回调方法,并且会把抛出的异常作为参数,传递到回调方法
    CompletableFuture<String> orgFuture = CompletableFuture.supplyAsync(
                    ()->{
                        System.out.println("当前线程名称:" + Thread.currentThread().getName());
                        throw new RuntimeException();
                    }
            );
    
            CompletableFuture<String> exceptionFuture = orgFuture.exceptionally((e) -> {
                e.printStackTrace();
                return "歪歪歪?你的程序异常啦";
            });
    
            System.out.println(exceptionFuture.get());
    

    5 超时处理

    JDK 8 版本的CompletableFuture 没有timeout机制,timeout机制是指,如果forkjoin-pool(或者自定义线程池)中一个线程在规定时间内没有返回,那么就结束掉,而不是继续执行直到获取结果,比如main线程200ms内返回,但forkjoin-pool(或者自定义线程池)中某个执行线程执行400ms才返回,而其返回值根本没有被使用到。

    实现方案:启动一个 ScheduledThreadpoolExecutor 线程在 timeout 时间后直接调用 CompletableFuture.completeExceptionally(new TimeoutException()),然后用 acceptEither() 或者 applyToEither 看是先计算完成还是先超时:

    public class FutureUtil {
    
        /**
         * cpu 核心数
         */
        private static final int AVALIABLE_PROCESSORS = Runtime.getRuntime().availableProcessors();
    
        // 最大超时时间
        private static final int TIMEOUT_VALUE = 1500;
        // 时间单位
        private static final TimeUnit TIMEOUT_UNIT = TimeUnit.MILLISECONDS;
    
    
        /**
         * Singleton delay scheduler, used only for starting and * cancelling tasks.
         */
        public static final class Delayer {
    
            static final ScheduledThreadPoolExecutor delayer;
    
            /**
             * 异常线程,不做请求处理,只抛出异常
             */
            static {
                delayer = new ScheduledThreadPoolExecutor(1, new DaemonThreadFactory());
                delayer.setRemoveOnCancelPolicy(true);
            }
    
            static ScheduledFuture<?> delay(Runnable command, long delay, TimeUnit unit) {
                return delayer.schedule(command, delay, unit);
            }
    
            static final class DaemonThreadFactory implements ThreadFactory {
                @Override
                public Thread newThread(Runnable r) {
                    Thread t = new Thread(r);
                    t.setDaemon(true);
                    t.setName("CompletableFutureScheduler");
                    return t;
                }
            }
        }
    
        /**
         * 根据服务器cpu自定义线程池
         */
        private static ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                AVALIABLE_PROCESSORS,
                3 * AVALIABLE_PROCESSORS,
                3,
                TimeUnit.SECONDS,
                new LinkedBlockingDeque<>(20),
                new ThreadPoolExecutor.CallerRunsPolicy()
        );
    
        /**
         * 有返回值的异步
         * @param supplier
         * @param <T>
         * @return
         */
        public static  <T> CompletableFuture<T> supplyAsync(Supplier<T> supplier){
            return supplyAsync(TIMEOUT_VALUE,TIMEOUT_UNIT,supplier);
        }
    
        /**
         * 有返回值的异步 - 可设置超时时间
         * @param timeout
         * @param unit
         * @param supplier
         * @param <T>
         * @return
         */
        public static  <T> CompletableFuture<T> supplyAsync(long timeout, TimeUnit unit,Supplier<T> supplier){
            return CompletableFuture.supplyAsync(supplier, threadPoolExecutor)
                    .applyToEither(timeoutAfter(timeout,unit), Function.identity())
                    .exceptionally(throwable -> {
                        throwable.printStackTrace();
                        log.error(throwable.getMessage());
                        return null;
                    });
        }
    
        /**
         * 无返回值的异步
         * @param runnable
         * @return
         */
        public static CompletableFuture runAsync(Runnable runnable){
            return runAsync(TIMEOUT_VALUE,TIMEOUT_UNIT,runnable);
        }
    
        /**
         * 无返回值的异步 - 可设置超时时间
         * @param runnable
         * @return
         */
        public static CompletableFuture runAsync(long timeout, TimeUnit unit,Runnable runnable){
            return CompletableFuture.runAsync(runnable,threadPoolExecutor)
                    .applyToEither(timeoutAfter(timeout,unit), Function.identity())
                    .exceptionally(throwable -> {
                        throwable.printStackTrace();
                        log.error(throwable.getMessage());
                        return null;
                    });
        }
    
        /**
         * 统一处理异步结果
         * @param futures
         * @return
         */
        public static CompletableFuture allOf(CompletableFuture... futures){
            return allOf(TIMEOUT_VALUE,TIMEOUT_UNIT,futures);
        }
    
        /**
         * 统一处理异步结果 - 可设置超时时间
         * @param futures
         * @return
         */
        public static CompletableFuture allOf(long timeout, TimeUnit unit,CompletableFuture... futures){
            return CompletableFuture.allOf(futures)
                    .applyToEither(timeoutAfter(timeout,unit), Function.identity())
                    .exceptionally(throwable -> {
                        throwable.printStackTrace();
                        log.error(throwable.getMessage());
                        return null;
                    });
        }
    
        /**
         * 异步超时处理
         * @param timeout
         * @param unit
         * @param <T>
         * @return
         */
        public static <T> CompletableFuture<T> timeoutAfter(long timeout, TimeUnit unit) {
            CompletableFuture<T> result = new CompletableFuture<T>();
            // timeout 时间后 抛出TimeoutException 类似于sentinel / watcher
            Delayer.delayer.schedule(() -> result.completeExceptionally(new TimeoutException()), timeout, unit);
            return result;
        }
    
        public static <T> CompletableFuture<T> timeoutAfter() {
            CompletableFuture<T> result = new CompletableFuture<T>();
            // timeout 时间后 抛出TimeoutException 类似于sentinel / watcher
            Delayer.delayer.schedule(() -> result.completeExceptionally(new TimeoutException()), TIMEOUT_VALUE, TIMEOUT_UNIT);
            return result;
        }
    
    }
    
    

    使用demo

     CompletableFuture<String> future1    = FutureUtil.supplyAsync(10,TimeUnit.MILLISECONDS,() -> {
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                return "Hello";
            });
    
            CompletableFuture<String> future3   = future1.handle((a, throwable) -> {
                System.out.println("上个任务执行完啦,还把" + a + "传过来");
                return "world";
            });
    
            System.out.println(future3.get());
    
    image.png

    在 JDK 9,CompletableFuture 正式提供了 orTimeoutcompleteTimeout 方法,来准确实现异步超时控制。实现原理跟上面是一样的。

    6 线程阻塞问题

    要合理治理线程资源,最基本的前提条件就是要在写代码时,清楚地知道每一行代码都将执行在哪个线程上。下面我们看一下CompletableFuture的执行线程情况。

    CompletableFuture实现了CompletionStage接口,通过丰富的回调方法,支持各种组合操作,每种组合场景都有同步和异步两种方法。

    同步方法(即不带Async后缀的方法)有两种情况。

    • 如果注册时被依赖的操作已经执行完成,则直接由当前线程执行。
    • 如果注册时被依赖的操作还未执行完,则由回调线程执行。

    异步方法(即带Async后缀的方法):

    • 可以选择是否传递线程池参数Executor运行在指定线程池中;
    • 当不传递Executor时,会使用ForkJoinPool中的共用线程池CommonPool(CommonPool的大小是CPU核数-1,如果是IO密集的应用,线程数可能成为瓶颈)。

    例如:

    ExecutorService threadPool1 = new ThreadPoolExecutor(10, 10, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(100));
    CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
        System.out.println("supplyAsync 执行线程:" + Thread.currentThread().getName());
        //业务操作
        return "";
    }, threadPool1);
    //此时,如果future1中的业务操作已经执行完毕并返回,则该thenApply直接由当前main线程执行;否则,将会由执行以上业务操作的threadPool1中的线程执行。
    future1.thenApply(value -> {
        System.out.println("thenApply 执行线程:" + Thread.currentThread().getName());
        return value + "1";
    });
    //使用ForkJoinPool中的共用线程池CommonPool
    future1.thenApplyAsync(value -> {
    //do something
      return value + "1";
    });
    //使用指定线程池
    future1.thenApplyAsync(value -> {
    //do something
      return value + "1";
    }, threadPool1);
    

    7 线程池死锁问题

    前面提到,异步回调方法可以选择是否传递线程池参数Executor,这里我们建议强制传线程池,且根据实际情况做线程池隔离。

    当不传递线程池时,会使用ForkJoinPool中的公共线程池CommonPool,这里所有调用将共用该线程池,核心线程数=处理器数量-1(单核核心线程数为1),所有异步回调都会共用该CommonPool,核心与非核心业务都竞争同一个池中的线程,很容易成为系统瓶颈。手动传递线程池参数可以更方便的调节参数,并且可以给不同的业务分配不同的线程池,以求资源隔离,减少不同业务之间的相互干扰。

    线程池循环引用会导致死锁

    public Object doGet() {
      ExecutorService threadPool1 = new ThreadPoolExecutor(10, 10, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(100));
      CompletableFuture cf1 = CompletableFuture.supplyAsync(() -> {
      //do sth
        return CompletableFuture.supplyAsync(() -> {
            System.out.println("child");
            return "child";
          }, threadPool1).join();//子任务
        }, threadPool1);
      return cf1.join();
    }
    

    如上代码块所示,doGet方法第三行通过supplyAsync向threadPool1请求线程,并且内部子任务又向threadPool1请求线程。threadPool1大小为10,当同一时刻有10个请求到达,则threadPool1被打满,子任务请求线程时进入阻塞队列排队,但是父任务的完成又依赖于子任务,这时由于子任务得不到线程,父任务无法完成。主线程执行cf1.join()进入阻塞状态,并且永远无法恢复。

    为了修复该问题,需要将父任务与子任务做线程池隔离,两个任务请求不同的线程池,避免循环依赖导致的阻塞。

    8 异步RPC调用注意不要阻塞IO线程池

    服务异步化后很多步骤都会依赖于异步RPC调用的结果,这时需要特别注意一点,如果是使用基于NIO(比如Netty)的异步RPC,则返回结果是由IO线程负责设置的,即回调方法由IO线程触发,CompletableFuture同步回调(如thenApply、thenAccept等无Async后缀的方法)如果依赖的异步RPC调用的返回结果,那么这些同步回调将运行在IO线程上,而整个服务只有一个IO线程池,这时需要保证同步回调中不能有阻塞等耗时过长的逻辑,否则在这些逻辑执行完成前,IO线程将一直被占用,影响整个服务的响应。

    相关文章

      网友评论

        本文标题:Java异步任务编排—CompletableFuture(二)

        本文链接:https://www.haomeiwen.com/subject/ombdkdtx.html