美文网首页
java多线程并发异步编程,高效利用JUC+lambda

java多线程并发异步编程,高效利用JUC+lambda

作者: 木木子丶 | 来源:发表于2020-12-28 17:04 被阅读0次

    前言

    当我们的业务变得复杂,需求导致我们需要开多线程去做业务执行,通常我们使用的是Executors去创建+RetreenLock+原子类atomic去操作多个线程间的协作,但当业务更具象化时我们需要对线程存在依赖关系,组合,排序,并发,串行等操作,这时候我们可以用RetreenLock的Condition,用变量来控制unlock去决定是否放开与执行,但终究还是太麻烦,所以今天我整理了CompletableFuture类来操作线程,极大的简化了我们的操作

    功能

    • 支持supplier(callable)与runnable的组合
    • 串行
    • 并行
    • 多任务组合
    • 取消线程任务
    • 任务结果获取与动态判断执行顺序
    • ..

    入门(创建)

     /**
         * Returns a new CompletableFuture that is asynchronously completed
         * by a task running in the {@link ForkJoinPool#commonPool()} with
         * the value obtained by calling the given Supplier.
         *
         * @param supplier a function returning the value to be used
         * to complete the returned CompletableFuture
         * @param <U> the function's return type
         * @return the new CompletableFuture
         */
        public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
            return asyncSupplyStage(asyncPool, supplier);
        }
    
         /**
         * Returns a new CompletableFuture that is asynchronously completed
         * by a task running in the {@link ForkJoinPool#commonPool()} after
         * it runs the given action.
         *
         * @param runnable the action to run before completing the
         * returned CompletableFuture
         * @return the new CompletableFuture
         */
        public static CompletableFuture<Void> runAsync(Runnable runnable) {
            return asyncRunStage(asyncPool, runnable);
        }   
        
    

    使用runnable或supplier来创建异步任务

        /**
         * 例子
         */
        @Test
        public void test() {
    
            CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
                System.out.println("线程A方法执行");
                return "线程A方法返回";
            });
    
    
             CompletableFuture.runAsync(() -> {
                System.out.println("线程B方法执行");
            });
        
        }
    

    串行

    串行执行任务,不关心之前任务结果

        /**
         * Returns a new CompletionStage that, when this stage completes
         * normally, executes the given action using this stage's default
         * asynchronous execution facility.
         *
         * See the {@link CompletionStage} documentation for rules
         * covering exceptional completion.
         *
         * @param action the action to perform before completing the
         * returned CompletionStage
         * @return the new CompletionStage
         */
        public CompletionStage<Void> thenRunAsync(Runnable action);
    
        /**
         * @param :
         * @desc : 串行,不关心上一个任务结果(无返回值)
         * @dete : 2020/12/29 3:01 下午
         * @Return:
         * @author: 徐子木
         */
        @Test
        public void test1() {
    
            CompletableFuture.supplyAsync(() -> {
                return "线程A返回";
            }).thenRunAsync(() -> {
                System.out.println("线程B执行");
            });
    
            //线程B执行
        }
    

    串行执行任务,依赖上一个任务结果(无返回值)

        /**
         * Returns a new CompletionStage that, when this stage completes
         * normally, is executed using this stage's default asynchronous
         * execution facility, with this stage's result as the argument to
         * the supplied action.
         *
         * See the {@link CompletionStage} documentation for rules
         * covering exceptional completion.
         *
         * @param action the action to perform before completing the
         * returned CompletionStage
         * @return the new CompletionStage
         */
        public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);
    
        /**
         * @param :
         * @desc : 串行,依赖上一个任务结果,无返回值
         * @dete : 2020/12/29 3:04 下午
         * @Return:
         * @author: 徐子木
         */
        @Test
        public void test2() {
            CompletableFuture.supplyAsync(() -> {
                return "线程A返回结果";
            }).thenAcceptAsync(a -> {
                System.out.println(a + "/ B线程执行");
            });
    
            //线程A返回结果/ B线程执行
    
    

    串行执行任务,依赖上一个任务结果(有返回值)

       /**
         * Returns a new CompletableFuture that is asynchronously completed
         * by a task running in the {@link ForkJoinPool#commonPool()} with
         * the value obtained by calling the given Supplier.
         *
         * @param supplier a function returning the value to be used
         * to complete the returned CompletableFuture
         * @param <U> the function's return type
         * @return the new CompletableFuture
         */
        public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
            return asyncSupplyStage(asyncPool, supplier);
        }
    
        /**
         * @param :
         * @desc : 串行,依赖上一个任务结果,有返回值
         * @dete : 2020/12/29 3:09 下午
         * @Return:
         * @author: 徐子木
         */
        @Test
        public void test3() {
    
            CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
                return "线程A返回结果";
            }).thenApplyAsync(a -> {
                return a + "/ b方法返回结果";
            });
    
            System.out.println(future.join());
    
            //线程A返回结果/ b方法返回结果
        }
    

    串行,依赖上一个结果,返回一个常量任务

        /**
         * Returns a new CompletionStage that, when this stage completes
         * normally, is executed using this stage's default asynchronous
         * execution facility, with this stage as the argument to the
         * supplied function.
         *
         * See the {@link CompletionStage} documentation for rules
         * covering exceptional completion.
         *
         * @param fn the function returning a new CompletionStage
         * @param <U> the type of the returned CompletionStage's result
         * @return the CompletionStage
         */
        public <U> CompletionStage<U> thenComposeAsync
            (Function<? super T, ? extends CompletionStage<U>> fn);
    
        /**
         * @param : thenCompose
         * @desc : 串行,依赖上一个结果,返回一个常量任务
         * @dete : 2020/12/29 3:14 下午
         * @Return: 类似thenApplay, 区别是返回值是completionStage, thenApply返回值是T, 提供该方法为了和其他Future更好的使用
         * @author: 徐子木
         */
        @Test
        public void test4() {
    
            CompletableFuture<String> f = CompletableFuture.completedFuture("常量值A");
    
            CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
                return "线程A返回结果";
            }).thenComposeAsync(a -> {
                System.out.println("线程B执行/ " + a);
                return f;
            });
    
            System.out.println(future.join());
    
            //线程B执行/ 线程A返回结果
            //常量值A
        }
    

    并行

    不依赖前面任务结果,无返回值

       /**
         * Returns a new CompletionStage that, when this and the other
         * given stage complete normally, executes the given action using
         * this stage's default asynchronous execution facility.
         *
         * See the {@link CompletionStage} documentation for rules
         * covering exceptional completion.
         *
         * @param other the other CompletionStage
         * @param action the action to perform before completing the
         * returned CompletionStage
         * @return the new CompletionStage
         */
        public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other,
                                                       Runnable action);
    
       /**
         * @param :
         * @desc : 并行,不依赖前任务结果,无返回值
         * @dete : 2020/12/29 3:21 下午
         * @Return:
         * @author: 徐子木
         */
        @Test
        public void test5() {
    
            CompletableFuture<String> f = CompletableFuture.completedFuture("常量值A");
    
            CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
                System.out.println("线程A执行");
                return "线程A返回结果";
            }).runAfterBothAsync(f, () -> {
                System.out.println("线程B执行");
            });
    
            //线程B执行
        }
    

    并行,多个线程选其最快

        /**
         * Returns a new CompletionStage that, when either this or the
         * other given stage complete normally, executes the given action.
         *
         * See the {@link CompletionStage} documentation for rules
         * covering exceptional completion.
         *
         * @param other the other CompletionStage
         * @param action the action to perform before completing the
         * returned CompletionStage
         * @return the new CompletionStage
         */
        public CompletionStage<Void> runAfterEither(CompletionStage<?> other,
                                                    Runnable action);
    
       /**
         * @param :
         * @desc : 并行, 谁先执行完则会触发下一任务(!二者选其一)
         * @dete : 2020/12/29 3:28 下午
         * @Return: 无返回值
         * @author: 徐子木
         */
        @Test
        public void test6() {
            CompletableFuture<Void> futureA = CompletableFuture.runAsync(() -> {
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("线程A任务执行");
            });
    
            CompletableFuture.runAsync(() -> {
                System.out.println("线程B任务执行");
            }).runAfterEither(futureA, () -> {
                System.out.println("最先执行完");
            });
            //线程B任务执行
            //最先执行完
        }
    
    

    并行,多个线程谁执行最快选其一,并执行他

      /**
         * Returns a new CompletionStage that, when either this or the
         * other given stage complete normally, is executed using this
         * stage's default asynchronous execution facility, with the
         * corresponding result as argument to the supplied action.
         *
         * See the {@link CompletionStage} documentation for rules
         * covering exceptional completion.
         *
         * @param other the other CompletionStage
         * @param action the action to perform before completing the
         * returned CompletionStage
         * @return the new CompletionStage
         */
        public CompletionStage<Void> acceptEitherAsync
            (CompletionStage<? extends T> other,
             Consumer<? super T> action);
    
        /**
         * @param :
         * @desc : 并行,谁执行最快则触发下一任务,并执行他
         * @dete : 2020/12/29 3:58 下午
         * @Return: 同理applyToEitherAsync 有返回值
         * @author: 徐子木
         */
        @Test
        public void test7() {
    
            CompletableFuture<String> futureA = CompletableFuture.supplyAsync(() -> {
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
    
                return "线程A执行结果";
            });
    
            CompletableFuture.supplyAsync(() -> {
                return "线程B执行结果";
            }).acceptEitherAsync(futureA, x -> {
                System.out.println(x);
            });
    
            //线程B执行结果
        }
    

    异常

    如果前有任务触发异常,则会拦截到并处理

        /**
         * Returns a new CompletableFuture that is completed when this
         * CompletableFuture completes, with the result of the given
         * function of the exception triggering this CompletableFuture's
         * completion when it completes exceptionally; otherwise, if this
         * CompletableFuture completes normally, then the returned
         * CompletableFuture also completes normally with the same value.
         * Note: More flexible versions of this functionality are
         * available using methods {@code whenComplete} and {@code handle}.
         *
         * @param fn the function to use to compute the value of the
         * returned CompletableFuture if this CompletableFuture completed
         * exceptionally
         * @return the new CompletableFuture
         */
        public CompletableFuture<T> exceptionally(
            Function<Throwable, ? extends T> fn) {
            return uniExceptionallyStage(fn);
        }
    
      /**
         * @param :
         * @desc : 如果之前的任务有异常,则调用exceptionally
         * @dete : 2020/12/29 4:04 下午
         * @Return: 同理handleAsync 可以处理上一任务的返回值
         * @author: 徐子木
         */
        @Test
        public void test8() {
    
            CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
                return "线程A执行结果";
            }).thenApplyAsync(a -> {
                if (true)
                    throw new RuntimeException("线程B抛出错误");
                return "线程B执行结果";
            }).exceptionally(e -> {
                e.printStackTrace();
                return "┭┮﹏┭┮";
            });
    
            System.out.println(future.join());
    
            //java.lang.RuntimeException: 线程B抛出错误
    
            Lock lock = new ReentrantLock();
    
    
        }
    

    任务组合

    allOf 所有任务需要执行完毕才会触发下一个任务的执行

        /**
         * Returns a new CompletableFuture that is completed when all of
         * the given CompletableFutures complete.  If any of the given
         * CompletableFutures complete exceptionally, then the returned
         * CompletableFuture also does so, with a CompletionException
         * holding this exception as its cause.  Otherwise, the results,
         * if any, of the given CompletableFutures are not reflected in
         * the returned CompletableFuture, but may be obtained by
         * inspecting them individually. If no CompletableFutures are
         * provided, returns a CompletableFuture completed with the value
         * {@code null}.
         *
         * <p>Among the applications of this method is to await completion
         * of a set of independent CompletableFutures before continuing a
         * program, as in: {@code CompletableFuture.allOf(c1, c2,
         * c3).join();}.
         *
         * @param cfs the CompletableFutures
         * @return a new CompletableFuture that is completed when all of the
         * given CompletableFutures complete
         * @throws NullPointerException if the array or any of its elements are
         * {@code null}
         */
        public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {
            return andTree(cfs, 0, cfs.length - 1);
        }
    

    anyOf 任意一任务执行完毕都会触发下一个任务的执行

      /**
         * Returns a new CompletableFuture that is completed when any of
         * the given CompletableFutures complete, with the same result.
         * Otherwise, if it completed exceptionally, the returned
         * CompletableFuture also does so, with a CompletionException
         * holding this exception as its cause.  If no CompletableFutures
         * are provided, returns an incomplete CompletableFuture.
         *
         * @param cfs the CompletableFutures
         * @return a new CompletableFuture that is completed with the
         * result or exception of any of the given CompletableFutures when
         * one completes
         * @throws NullPointerException if the array or any of its elements are
         * {@code null}
         */
        public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) {
            return orTree(cfs, 0, cfs.length - 1);
        }
    

    取消执行线程与其他功能

    cancel

        /**
         * If not already completed, completes this CompletableFuture with
         * a {@link CancellationException}. Dependent CompletableFutures
         * that have not already completed will also complete
         * exceptionally, with a {@link CompletionException} caused by
         * this {@code CancellationException}.
         *
         * @param mayInterruptIfRunning this value has no effect in this
         * implementation because interrupts are not used to control
         * processing.
         *
         * @return {@code true} if this task is now cancelled
         */
        public boolean cancel(boolean mayInterruptIfRunning) {
            boolean cancelled = (result == null) &&
                internalComplete(new AltResult(new CancellationException()));
            postComplete();
            return cancelled || isCancelled();
        }
    

    是否取消

        /**
         * Returns {@code true} if this CompletableFuture was cancelled
         * before it completed normally.
         *
         * @return {@code true} if this CompletableFuture was cancelled
         * before it completed normally
         */
        public boolean isCancelled() {
            Object r;
            return ((r = result) instanceof AltResult) &&
                (((AltResult)r).ex instanceof CancellationException);
        }
    

    ...

    本人选取了一些比较常用的,还有其他不完善的功能大家可以去看源码自行探索

    希望能对同学们能有帮助,随手点个赞再走吧,球球了

    相关文章

      网友评论

          本文标题:java多线程并发异步编程,高效利用JUC+lambda

          本文链接:https://www.haomeiwen.com/subject/ppqqoktx.html