美文网首页
Java高并发系列——检视阅读(八)

Java高并发系列——检视阅读(八)

作者: 卡斯特梅的雨伞 | 来源:发表于2021-10-02 17:03 被阅读0次

    Java高并发系列——CompletableFuture

    JUC中工具类CompletableFuture

    CompletableFuture是java8中新增的一个类,算是对Future的一种增强,用起来很方便,也是会经常用到的一个工具类,熟悉一下。

    CompletionStage接口

    • CompletionStage代表异步计算过程中的某一个阶段,一个阶段完成以后可能会触发另外一个阶段
    • 一个阶段的计算执行可以是一个Function,Consumer或者Runnable。比如:stage.thenApply(x -> square(x)).thenAccept(x -> System.out.print(x)).thenRun(() -> System.out.println())
    • 一个阶段的执行可能是被单个阶段的完成触发,也可能是由多个阶段一起触发

    CompletableFuture类

    • 在Java8中,CompletableFuture提供了非常强大的Future的扩展功能,可以帮助我们简化异步编程的复杂性,并且提供了函数式编程的能力,可以通过回调的方式处理计算结果,也提供了转换和组合 CompletableFuture 的方法。
    • 它可能代表一个明确完成的Future,也有可能代表一个完成阶段( CompletionStage ),它支持在计算完成以后触发一些函数或执行某些动作。
    • 实现了Future和CompletionStage接口

    常见的方法,熟悉一下:

    runAsync 和 supplyAsync方法

    CompletableFuture 提供了四个静态方法来创建一个异步操作。

    public static CompletableFuture<Void> runAsync(Runnable runnable)
    public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
    public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
    public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
    

    没有指定Executor的方法会使用ForkJoinPool.commonPool() 作为它的线程池执行异步代码。如果指定线程池,则使用指定的线程池运行。以下所有的方法都类同。

    • runAsync方法不支持返回值。
    • supplyAsync可以支持返回值。

    示例:

    public class CompletableFutureTest1 {
    
        public static void main(String[] args) throws Exception {
            CompletableFutureTest1.runAsync();
            CompletableFutureTest1.supplyAsync();
        }
    
        //runAsync方法不支持返回值 Runnable
        public static void runAsync() throws Exception {
            CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                }
                System.out.println("run end ...");
            });
    
            future.get();
        }
    
        //supplyAsync可以支持返回值 Supplier<U>
        public static void supplyAsync() throws Exception {
            CompletableFuture<Long> future = CompletableFuture.supplyAsync(() -> {
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                }
                System.out.println("run end ...");
                return System.currentTimeMillis();
            });
      //如果没有future.get()阻塞等待结果的话,因为CompletableFuture.supplyAsync()方法默认是守护线程形式执行任务,在主线程结束后会跟着退出,
            // 如果传入的是线程池去执行,这不是守护线程,不会导致退出
            long time = future.get();
            System.out.println("time = "+time);
        }
    }
    

    输出:

    run end ...
    run end ...
    time = 1599556248764
    

    计算结果完成时的回调方法

    当CompletableFuture的计算结果完成,或者抛出异常的时候,可以执行特定的Action。主要是下面的方法:

    public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action)
    public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
    public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
    public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn)
    

    可以看到Action的类型是BiConsumer它可以处理正常的计算结果,或者异常情况。

    whenComplete 和 whenCompleteAsync 的区别:

    • whenComplete:当前任务的线程继续执行 whenComplete 的任务。
    • whenCompleteAsync:把 whenCompleteAsync 这个任务继续提交给线程池来进行执行

    示例:

    public class CompletableFutureTest1 {
    
        public static void main(String[] args) throws Exception {
            CompletableFutureTest1.whenComplete();
            CompletableFutureTest1.whenCompleteBySupply();
        }
    
        public static void whenComplete() throws Exception {
            CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                }
                if (new Random().nextInt() % 2 >= 0) {
                    int i = 12 / 0;
                    //run end ...
                    //执行完成!
                    //int i = 12 / 0;
                }
                System.out.println("run end ...");
            });
            //对执行成功或者执行异常做处理的回调方法
            future.whenComplete((avoid, throwable) -> {
                //先判断是否抛异常分开处理
                if (throwable != null) {
                    System.out.println("执行失败!" + throwable.getMessage());
                } else {
                    System.out.println("执行完成!");
                }
            });
            //对执行异常做处理的回调方法
            future.exceptionally(throwable -> {
                        System.out.println("执行失败!" + throwable.getMessage());
                        return null;
                    }
            );
            TimeUnit.SECONDS.sleep(2);
        }
    
        public static void whenCompleteBySupply() throws Exception {
            CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                }
                if (new Random().nextInt() % 2 >= 0) {
                    //int i = 12 / 0;
                    //run end ...
                    //执行完成!
                    //int i = 12 / 0;
                }
                System.out.println("run end ...");
                return "success";
            });
            //whenComplete在thenAccept之前执行
            future.thenAccept(result -> {
                System.out.println(result);
            });
            //对执行成功或者执行异常做处理的回调方法
            future.whenComplete((avoid, throwable) -> {
                //先判断是否抛异常分开处理
                if (throwable != null) {
                    System.out.println("执行失败!" + throwable.getMessage());
                } else {
                    System.out.println("执行完成!");
                }
            });
            //对执行异常做处理的回调方法
            future.exceptionally(throwable -> {
                        System.out.println("执行失败!" + throwable.getMessage());
                        return null;
                    }
            );
            TimeUnit.SECONDS.sleep(2);
        }
        }
    

    输出:

    执行失败!java.lang.ArithmeticException: / by zero
    执行失败!java.lang.ArithmeticException: / by zero
    run end ...
    执行完成!
    success
    

    thenApply 方法

    当一个线程依赖另一个线程时,可以使用 thenApply、thenApplyAsync 方法来把这两个线程串行化。

    public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
    public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
    public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
    
    Function<? super T,? extends U> 
    T:上一个任务返回结果的类型
    U:当前任务的返回值类型
    

    示例:

    public class CompletableFutureTest2 {
    
        public static void main(String[] args) throws Exception {
            CompletableFutureTest2.thenApply();
        }
        //多个CompletableFuture可以串行执行
        //当一个线程依赖另一个线程时,可以使用 thenApply 方法来把这两个线程串行化。
        //多个任务串行执行,第二个任务依赖第一个任务的结果。
        private static void thenApply() throws Exception {
            CompletableFuture<Long> future = CompletableFuture.supplyAsync(() -> {
                        long result = new Random().nextInt(100);
                        System.out.println("result1=" + result);
                        return result;
                    }
            ).thenApply((t -> {
                long result = t * 5;
                System.out.println("result2=" + result);
                return result;
            }));
            //方式一:阻塞等待结果
            long result = future.get();
            System.out.println("result2: " + result);
            //方式二:调用成功后接收任务的处理结果,并消费处理,无返回结果
            future.thenAccept((r) -> {
                System.out.println("result2: " + r);
            });
        }
    }
    

    输出:

    result1=41
    result2=205
    result2: 205
    result2: 205
    

    handle 方法——可以处理正常和异常情况的thenApply 方法

    handle 是执行任务完成时对结果的处理。
    handle 方法和 thenApply 方法处理方式基本一样。不同的是 handle 是在任务完成后再执行,还可以处理异常的任务。thenApply 只可以执行正常的任务,任务出现异常则不执行 thenApply 方法。

    public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);
    public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn);
    public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn,Executor executor);
    

    示例:在 handle 中可以根据任务是否有异常来进行做相应的后续处理操作。而 thenApply 方法,如果上个任务出现错误,则不会执行 thenApply 方法。

    public class CompletableFutureTest3 {
    
        public static void main(String[] args) throws Exception {
            CompletableFutureTest3.handle();
        }
    
        public static void handle() throws Exception {
            CompletableFuture<Integer> future = CompletableFuture.supplyAsync(new Supplier<Integer>() {
    
                @Override
                public Integer get() {
                    int i = 10 / 0;
                    return new Random().nextInt(10);
                }
            }).handle(
                    (param, throwable) -> {
                        int result = -1;
                        if (throwable == null) {
                            result = param * 2;
                        } else {
                            System.out.println(throwable.getMessage());
                        }
                        return result;
                    }
                    /*new BiFunction<Integer, Throwable, Integer>() {
                @Override
                public Integer apply(Integer param, Throwable throwable) {
                    int result = -1;
                    if(throwable==null){
                        result = param * 2;
                    }else{
                        System.out.println(throwable.getMessage());
                    }
                    return result;
                }
            }*/);
            System.out.println(future.get());
        }
    }
    

    输出:

    java.lang.ArithmeticException: / by zero
    -1
    

    thenAccept 消费处理结果——无返回结果

    接收任务的处理结果,并消费处理,无返回结果。

    public CompletionStage<Void> thenAccept(Consumer<? super T> action);
    public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);
    public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor);
    

    示例:

    public class CompletableFutureTest3 {
    
        public static void main(String[] args) throws Exception {
            //CompletableFutureTest3.handle();
            CompletableFutureTest3.thenAccept();
        }
    
        public static void thenAccept() throws Exception {
            CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
                        return new Random().nextInt(10);
                    }
            ).thenAccept(integer -> {
                System.out.println(integer);
            });
            future.get();
        }
    }
       //输出:5 
    

    thenRun 方法——继续执行下一个Runnable任务,不获取上一个任务的处理结果

    跟 thenAccept 方法不一样的是,不关心任务的处理结果。只要上面的任务执行完成,就开始执行 thenRun 。

    public CompletionStage<Void> thenRun(Runnable action);
    public CompletionStage<Void> thenRunAsync(Runnable action);
    public CompletionStage<Void> thenRunAsync(Runnable action,Executor executor);
    

    示例:

    public class CompletableFutureTest3 {
    
        public static void main(String[] args) throws Exception {
            CompletableFutureTest3.thenRun();
        }
    
        public static void thenRun() throws Exception{
            CompletableFuture<Void> future = CompletableFuture.supplyAsync(new Supplier<Integer>() {
                @Override
                public Integer get() {
                    try {
                        TimeUnit.SECONDS.sleep(2);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    return new Random().nextInt(10);
                }
            }).thenRun(() -> {
                System.out.println("thenRun ...");
            });
            future.get();
        }
    }
    //2秒后输出:thenRun ...
    

    thenCombine 合并任务

    thenCombine 会把 两个 CompletionStage 的任务都执行完成后,把两个任务的结果一块交给 thenCombine 来处理。

    public <U,V> CompletionStage<V> thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
    public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
    public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn,Executor executor);
    

    示例:

    public class CompletableFutureTest3 {
    
        public static void main(String[] args) throws Exception {
            CompletableFutureTest3.thenCombine();
        }
    
        private static void thenCombine() throws Exception {
            CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
                return "hello";
            });
            CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
                return "world";
            });
            CompletableFuture<String> result = future1.thenCombine(future2, (result1, result2) -> {
                return result1 + " " + result2;
            });
            System.out.println(result.get());
        }
    }
    //输出:hello world
    

    thenAcceptBoth

    当两个CompletionStage都执行完成后,把结果一块交给thenAcceptBoth来进行消耗。

    public <U> CompletionStage<Void> thenAcceptBoth(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action);
    public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action);
    public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action,     Executor executor);
    

    示例:

    public class CompletableFutureTest3 {
    
        public static void main(String[] args) throws Exception {
            CompletableFutureTest3.thenAcceptBoth();
            //等待守护进程执行完
            TimeUnit.SECONDS.sleep(5);
        }
    
        private static void thenAcceptBoth() throws Exception {
            CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(() -> {
                int t = new Random().nextInt(3);
                try {
                    TimeUnit.SECONDS.sleep(t);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("f1=" + t);
                return t;
            });
    
            CompletableFuture<Integer> f2 = CompletableFuture.supplyAsync(() -> {
                int t = new Random().nextInt(3);
                try {
                    TimeUnit.SECONDS.sleep(t);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("f2=" + t);
                return t;
            });
            f1.thenAcceptBoth(f2, (result1, result2) -> {
                System.out.println("f1=" + result1 + ";f2=" + result2 + ";");
            });
        }
     }
    
    

    输出:

    f1=1
    f2=1
    f1=1;f2=1;
    

    applyToEither 方法——有返回值消耗

    两个CompletionStage,谁执行返回的结果快,我就用那个CompletionStage的结果进行下一步的转化操作

    public <U> CompletionStage<U> applyToEither(CompletionStage<? extends T> other,Function<? super T, U> fn);
    public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn);
    public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn,Executor executor);
    

    示例:

    public class CompletableFutureTest3 {
        public static void main(String[] args) throws Exception {
            CompletableFutureTest3.applyToEither();
        }
    
        private static void applyToEither() throws Exception {
            CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(()->{
                int t = 1;
                try {
                    TimeUnit.SECONDS.sleep(t);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("f1="+t);
                return t;
            });
            CompletableFuture<Integer> f2 = CompletableFuture.supplyAsync(()->{
                int t = 2;
                try {
                    TimeUnit.SECONDS.sleep(t);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("f2="+t);
                return t;
            });
    
            CompletableFuture<Integer> result = f1.applyToEither(f2, (r)->{
                System.out.println(r);
                return r * 2;
            });
            System.out.println(result.get());
        }
    

    输出:

    f1=1
    1
    2
    

    acceptEither 方法——无返回值消耗

    两个CompletionStage,谁执行返回的结果快,我就用那个CompletionStage的结果进行下一步的消耗操作。注意,这时候其实两个CompletionStage都是会执行完的,只是我们只获取其中的一个比较快的结果而已,参考示例的输出。

    public CompletionStage<Void> acceptEither(CompletionStage<? extends T> other,Consumer<? super T> action);
    public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other,Consumer<? super T> action);
    public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other,Consumer<? super T> action,Executor executor);
    

    示例:

    public class CompletableFutureTest3 {
        public static void main(String[] args) throws Exception {
            //CompletableFutureTest3.applyToEither();
            CompletableFutureTest3.acceptEither();
            TimeUnit.SECONDS.sleep(5);
        }
    
        private static void acceptEither() throws Exception {
            CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(() -> {
                int t = new Random().nextInt(3);
                try {
                    TimeUnit.SECONDS.sleep(t);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("f1=" + t);
                return t;
            });
    
            CompletableFuture<Integer> f2 = CompletableFuture.supplyAsync(() -> {
                int t = new Random().nextInt(3);
                try {
                    TimeUnit.SECONDS.sleep(t);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("f2=" + t);
                return t;
            });
            f1.acceptEither(f2, (t) -> {
                System.out.println(t);
            });
        }
    }
    

    输出:

    f1=1
    1
    f2=2
    

    runAfterEither 方法

    两个CompletionStage,任何一个完成了都会执行下一步的操作(Runnable),两个CompletionStage都是会执行完的.

    public CompletionStage<Void> runAfterEither(CompletionStage<?> other,Runnable action);
    public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action);
    public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action,Executor executor);
    

    示例代码

    public class CompletableFutureTest3 {
        public static void main(String[] args) throws Exception {
            //CompletableFutureTest3.applyToEither();
            //CompletableFutureTest3.acceptEither();
            CompletableFutureTest3.runAfterEither();
            TimeUnit.SECONDS.sleep(5);
        }
    
        private static void runAfterEither() throws Exception {
            CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(new Supplier<Integer>() {
                @Override
                public Integer get() {
                    int t = new Random().nextInt(3);
                    try {
                        TimeUnit.SECONDS.sleep(t);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("f1=" + t);
                    return t;
                }
            });
    
            CompletableFuture<Integer> f2 = CompletableFuture.supplyAsync(new Supplier<Integer>() {
                @Override
                public Integer get() {
                    int t = new Random().nextInt(3);
                    try {
                        TimeUnit.SECONDS.sleep(t);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("f2=" + t);
                    return t;
                }
            });
            f1.runAfterEither(f2, ()->{
                System.out.println("上面有一个已经完成了。");
            });
        }
    }
    

    输出:

    f1=0
    上面有一个已经完成了。
    f2=1
    

    runAfterBoth

    两个CompletionStage,都完成了计算才会执行下一步的操作(Runnable),注意输出顺序,runAfterBoth方法要等两个CompletionStage都执行完了才会执行。

    public CompletionStage<Void> runAfterBoth(CompletionStage<?> other,Runnable action);
    public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action);
    public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action,Executor executor);
    

    示例代码

    public class CompletableFutureTest3 {
        public static void main(String[] args) throws Exception {
            //CompletableFutureTest3.applyToEither();
            //CompletableFutureTest3.acceptEither();
            //CompletableFutureTest3.runAfterEither();
            CompletableFutureTest3.runAfterBoth();
            TimeUnit.SECONDS.sleep(5);
        }
    
        private static void runAfterBoth() throws Exception {
            CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(new Supplier<Integer>() {
                @Override
                public Integer get() {
                    int t = new Random().nextInt(3);
                    try {
                        TimeUnit.SECONDS.sleep(t);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("f1="+t);
                    return t;
                }
            });
    
            CompletableFuture<Integer> f2 = CompletableFuture.supplyAsync(new Supplier<Integer>() {
                @Override
                public Integer get() {
                    int t = new Random().nextInt(3);
                    try {
                        TimeUnit.SECONDS.sleep(t);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("f2="+t);
                    return t;
                }
            });
            f1.runAfterBoth(f2, new Runnable() {
    
                @Override
                public void run() {
                    System.out.println("上面两个任务都执行完成了。");
                }
            });
        }
    }
    

    输出:

    f1=1
    f2=2
    上面两个任务都执行完成了。
    

    thenCompose 方法

    thenCompose 方法允许你对两个 CompletionStage 进行流水线操作,第一个操作完成时,将其结果作为参数传递给第二个操作。

    public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn);
    public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn) ;
    public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn, Executor executor) ;
    

    示例代码

    public class CompletableFutureTest3 {
        public static void main(String[] args) throws Exception {
            CompletableFutureTest3.thenCompose();
            TimeUnit.SECONDS.sleep(3);
        }
    
        private static void thenCompose() throws Exception {
            CompletableFuture<Integer> f = CompletableFuture.supplyAsync(() -> {
                int t = new Random().nextInt(3);
                System.out.println("t1=" + t);
                return t;
            }).thenCompose((param) -> {
                return CompletableFuture.supplyAsync(() -> {
                    int t = param * 2;
                    System.out.println("t2=" + t);
                    return t;
                });
            });
            System.out.println("thenCompose result : " + f.get());
        }
    }
    

    输出:

    t1=1
    t2=2
    thenCompose result : 2
    

    疑问:

    Q:thenAcceptBoth与thenCombine 的区别是什么?

    A:thenAcceptBoth无返回值消耗执行,thenCombine 会有返回值。一般accept都是没有返回值的,apply是有返回值的。

    Q:thenCompose 与thenApply 方法 的区别是什么?不都是串行执行下一个任务,并把第一个任务作为参数传递给第二个任务么?

    thApplythenCompose都是将一个CompletableFuture<Integer>转换为CompletableFuture<String>。不同的是,thenApply中的传入函数的返回值是String,而thenCompose的传入函数的返回值是CompletableFuture<String>

    thenApply与thenCompose的异同

    获取线程执行结果的6种方法

    方式1:Thread的join()方法实现

    代码中通过join方式阻塞了当前主线程,当thread线程执行完毕之后,join方法才会继续执行。

    join的方式,只能阻塞一个线程,如果其他线程中也需要获取thread线程的执行结果,join方法无能为力了。

    示例:

    public class ThreadJoinTest {
        //用于封装结果
        static class Result<T> {
            T result;
    
            public T getResult() {
                return result;
            }
    
            public void setResult(T result) {
                this.result = result;
            }
        }
        public static void main(String[] args) throws InterruptedException {
            Result<String> result = new Result<>();
            Thread t = new Thread(() -> {
                System.out.println("start thread!");
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                result.setResult("success");
                System.out.println("end thread!");
            });
            t.start();
            //让主线程等待thread线程执行完毕之后再继续,join方法会让当前线程阻塞
            t.join();
            System.out.println("main get result="+result.getResult());
        }
    }
    

    输出:

    start thread!
    end thread!
    main get result=success
    

    方式2:CountDownLatch实现

    使用CountDownLatch可以让一个或者多个线程等待一批线程完成之后,自己再继续.

    示例:

    public class CountDownLatchTest2 {
        static class Result<T>{
            private T result;
    
            public T getResult() {
                return result;
            }
    
            public void setResult(T result) {
                this.result = result;
            }
        }
    
        public static void main(String[] args) throws InterruptedException {
            Result<String> result = new Result<>();
            CountDownLatch latch = new CountDownLatch(1);
            Thread t = new Thread(() -> {
                System.out.println("start thread!");
                try {
                    TimeUnit.SECONDS.sleep(1);
                    result.setResult("success");
                    System.out.println("end thread!");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally {
                    latch.countDown();
                }
            });
            t.start();
            latch.await();
            System.out.println("main get result="+result.getResult());
        }
    }
    

    输出:

    start thread!
    end thread!
    main get result=success
    

    方式3:ExecutorService.submit方法实现——ThreadPoolExecutor

    使用ExecutorService.submit方法实现的,此方法返回一个Futurefuture.get()会让当前线程阻塞,直到Future关联的任务执行完毕。

    示例:

    public class ThreadPoolExecutorTest2 {
    
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            //自定义包含策略
            ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 5, 60,
                    TimeUnit.SECONDS, new LinkedBlockingQueue<>(5),
                    new DemoThreadFactory("订单创建组"), new ThreadPoolExecutor.AbortPolicy());
            Future<String> future = executor.submit(() -> {
                System.out.println("start thread!");
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("end thread!");
                return "success";
            });
            executor.shutdown();
            System.out.println("main get result="+future.get());
        }
    }
    

    输出同上。

    方式4:FutureTask方式1——作为Runnable传给Thread执行

    线程池的submit方法传入的Callable对象本质上也是包装成一个FutureTask来执行。

    代码中使用FutureTask实现的,FutureTask实现了Runnable接口,并且内部带返回值,所以可以传递给Thread直接运行,futureTask.get()会阻塞当前线程,直到FutureTask构造方法传递的任务执行完毕,get方法才会返回。

    示例:

    public class FutureTaskTest {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            //创建一个FutureTask
            FutureTask<String> futureTask = new FutureTask<>(() -> {
                System.out.println("start thread!");
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("end thread!");
                return "success";
            });
            //将futureTask传递一个线程运行
            new Thread(futureTask).start();
            //futureTask.get()会阻塞当前线程,直到futureTask执行完毕
            String result = futureTask.get();
            System.out.println("main get result=" + result);
        }
    }
    

    方式5:FutureTask方式2——构造FutureTask对象及执行内容,直接在Thread里面跑run方法

    当futureTask的run()方法执行完毕之后,futureTask.get()会从阻塞中返回。

    示例:

    public class FutureTaskTest1 {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            //创建一个FutureTask
            FutureTask<String> futureTask = new FutureTask<>(() -> {
                System.out.println("start thread!");
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("end thread!");
                return "success";
            });
            //将futureTask传递一个线程运行
            new Thread(()->futureTask.run()).start();
            //futureTask.get()会阻塞当前线程,直到futureTask执行完毕
            String result = futureTask.get();
            System.out.println("main get result=" + result);
        }
    }
    

    方式6:CompletableFuture方式实现

    CompletableFuture.supplyAsync可以用来异步执行一个带返回值的任务,调用completableFuture.get()

    会阻塞当前线程,直到任务执行完毕,get方法才会返回。

    public class CompletableFutureTest4 {
    
        public static void main(String[] args) throws Exception {
            CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
                System.out.println("start thread!");
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("end thread!");
                return "success";
            });
            // future.get()会阻塞当前线程直到获得结果
            System.out.println("main get result="+future.get());
        }
    }
    

    高并发中计数器的四种实现方式

    需求:一个jvm中实现一个计数器功能,需保证多线程情况下数据正确性。

    我们来模拟50个线程,每个线程对计数器递增100万次,最终结果应该是5000万。

    我们使用4种方式实现,看一下其性能,然后引出为什么需要使用LongAdderLongAccumulator

    方式一:使用加锁的方式实现——synchronized或Lock

    从示例输出结果看,ReentrantLock的效率明显比synchronized差了2-3倍。

    示例:

    public class SynchronizeCalculator {
        private static long count = 0;
        private static Lock lock = new ReentrantLock();
        public synchronized static void incrment() {
            count++;
        }
    
        public static void incrmentByLock() {
            lock.lock();
            try {
                count++;
            }finally {
                lock.unlock();
            }
        }
    
        public static void main(String[] args) throws InterruptedException {
            for (int i = 0; i < 5; i++) {
                count = 0;
                averageTest();
            }
        }
    
        public static void averageTest() throws InterruptedException {
            long t1 = System.currentTimeMillis();
            //自定义包含策略
            ThreadPoolExecutor executor = new ThreadPoolExecutor(50, 50, 60,
                    TimeUnit.SECONDS, new LinkedBlockingQueue<>(5),
                    new DemoThreadFactory("订单创建组"), new ThreadPoolExecutor.AbortPolicy());
            CountDownLatch latch = new CountDownLatch(50);
            for (int i = 0; i < 50; i++) {
                executor.execute(() -> {
                    try {
                        for (int j = 0; j < 1000000; j++) {
                            incrment();
                            //incrmentByLock();
                        }
                    } finally {
                        latch.countDown();
                    }
    
                });
            }
            latch.await();
            long t2 = System.currentTimeMillis();
            System.out.println(String.format("结果:%s,耗时(ms):%s", count, (t2 - t1)));
            executor.shutdown();
        }
    }
    

    输出:

    //synchronized
    结果:50000000,耗时(ms):490
    结果:50000000,耗时(ms):1574
    结果:50000000,耗时(ms):399
    结果:50000000,耗时(ms):395
    结果:50000000,耗时(ms):396
    //lock
    结果:50000000,耗时(ms):1289
    结果:50000000,耗时(ms):1239
    结果:50000000,耗时(ms):1224
    结果:50000000,耗时(ms):1219
    结果:50000000,耗时(ms):1246
    

    方式2:AtomicLong实现

    AtomicLong内部采用CAS的方式实现,并发量大的情况下,CAS失败率比较高,导致性能比synchronized还低一些。并发量不是太大的情况下,CAS性能还是可以的。

    示例:

    public class AtomicLongTest {
        private static AtomicLong count = new AtomicLong(0);
    
        public static void main(String[] args) throws InterruptedException {
            for (int i = 0; i < 5; i++) {
                count.set(0);
                averageTest();
            }
        }
    
        public static void averageTest() throws InterruptedException {
            long t1 = System.currentTimeMillis();
            //自定义包含策略
            ThreadPoolExecutor executor = new ThreadPoolExecutor(50, 50, 60,
                    TimeUnit.SECONDS, new LinkedBlockingQueue<>(5),
                    new DemoThreadFactory("订单创建组"), new ThreadPoolExecutor.AbortPolicy());
            CountDownLatch latch = new CountDownLatch(50);
            for (int i = 0; i < 50; i++) {
                executor.execute(() -> {
                    try {
                        for (int j = 0; j < 1000000; j++) {
                            count.getAndIncrement();
                        }
                    } finally {
                        latch.countDown();
                    }
    
                });
            }
            latch.await();
            long t2 = System.currentTimeMillis();
            System.out.println(String.format("结果:%s,耗时(ms):%s", count.get(), (t2 - t1)));
            executor.shutdown();
        }
    }
    

    输出:

    结果:50000000,耗时(ms):1018
    结果:50000000,耗时(ms):1442
    结果:50000000,耗时(ms):1033
    结果:50000000,耗时(ms):935
    结果:50000000,耗时(ms):1320
    

    方式3:LongAdder实现——相当于锁分段技术

    先介绍一下LongAdder,说到LongAdder,不得不提的就是AtomicLong,AtomicLong是JDK1.5开始出现的,里面主要使用了一个long类型的value作为成员变量,然后使用循环的CAS操作去操作value的值,并发量比较大的情况下,CAS操作失败的概率较高,内部失败了会重试,导致耗时可能会增加。

    LongAdder是JDK1.8开始出现的,所提供的API基本上可以替换掉原先的AtomicLong。LongAdder在并发量比较大的情况下,操作数据的时候,相当于把这个数字分成了很多份数字,然后交给多个人去管控,每个管控者负责保证部分数字在多线程情况下操作的正确性。当多线程访问的时,通过hash算法映射到具体管控者去操作数据,最后再汇总所有的管控者的数据,得到最终结果。相当于降低了并发情况下锁的粒度,所以效率比较高,看一下下面的图,方便理解:

    image.png

    示例:

    代码中new LongAdder()创建一个LongAdder对象,内部数字初始值是0,调用increment()方法可以对LongAdder内部的值原子递增1。reset()方法可以重置LongAdder的值,使其归0。

    public class LongAdderTest {
        private static LongAdder count = new LongAdder();
    
        public static void main(String[] args) throws InterruptedException {
            for (int i = 0; i < 5; i++) {
                count.reset();
                averageTest();
            }
        }
    
        public static void averageTest() throws InterruptedException {
            long t1 = System.currentTimeMillis();
            //自定义包含策略
            ThreadPoolExecutor executor = new ThreadPoolExecutor(50, 50, 60,
                    TimeUnit.SECONDS, new LinkedBlockingQueue<>(5),
                    new DemoThreadFactory("订单创建组"), new ThreadPoolExecutor.AbortPolicy());
            CountDownLatch latch = new CountDownLatch(50);
            for (int i = 0; i < 50; i++) {
                executor.execute(() -> {
                    try {
                        for (int j = 0; j < 1000000; j++) {
                            count.increment();
                        }
                    } finally {
                        latch.countDown();
                    }
    
                });
            }
            latch.await();
            long t2 = System.currentTimeMillis();
            System.out.println(String.format("结果:%s,耗时(ms):%s", count.sum(), (t2 - t1)));
            executor.shutdown();
        }
    }
    

    输出:

    结果:50000000,耗时(ms):209
    结果:50000000,耗时(ms):133
    结果:50000000,耗时(ms):149
    结果:50000000,耗时(ms):146
    结果:50000000,耗时(ms):148
    

    方式4:LongAccumulator实现

    LongAccumulator介绍

    LongAccumulator是LongAdder的功能增强版。LongAdder的API只有对数值的加减,而LongAccumulator提供了自定义的函数操作,其构造函数如下:

    /**
      * accumulatorFunction:需要执行的二元函数(接收2个long作为形参,并返回1个long)
      * identity:初始值
     **/
    public LongAccumulator(LongBinaryOperator accumulatorFunction, long identity) {
        this.function = accumulatorFunction;
        base = this.identity = identity;
    }
    

    示例:

    LongAccumulator的效率和LongAdder差不多,不过更灵活一些。

    调用new LongAdder()等价于new LongAccumulator((x, y) -> x + y, 0L)

    从上面4个示例的结果来看,LongAdder、LongAccumulator全面超越同步锁及AtomicLong的方式,建议在使用AtomicLong的地方可以直接替换为LongAdder、LongAccumulator吞吐量更高一些。

    public class LongAccumulatorTest {
        private static volatile LongAccumulator count = new LongAccumulator((x, y) -> x + y, 0);
    
        public static void main(String[] args) throws InterruptedException {
            for (int i = 0; i < 5; i++) {
                count.reset();
                averageTest();
            }
        }
    
        public static void averageTest() throws InterruptedException {
            long t1 = System.currentTimeMillis();
            //自定义包含策略
            ThreadPoolExecutor executor = new ThreadPoolExecutor(50, 50, 60,
                    TimeUnit.SECONDS, new LinkedBlockingQueue<>(5),
                    new DemoThreadFactory("订单创建组"), new ThreadPoolExecutor.AbortPolicy());
            CountDownLatch latch = new CountDownLatch(50);
            for (int i = 0; i < 50; i++) {
                executor.execute(() -> {
                    try {
                        for (int j = 0; j < 1000000; j++) {
                            count.accumulate(1);
                        }
                    } finally {
                        latch.countDown();
                    }
    
                });
            }
            latch.await();
            long t2 = System.currentTimeMillis();
            System.out.println(String.format("结果:%s,耗时(ms):%s", count.longValue(), (t2 - t1)));
            executor.shutdown();
        }
    }
    

    输出:

    结果:50000000,耗时(ms):152
    结果:50000000,耗时(ms):148
    结果:50000000,耗时(ms):137
    结果:50000000,耗时(ms):137
    结果:50000000,耗时(ms):144
    
    疑问:

    Q:LongAccumulator.reset方法并不能重置重置LongAccumulator的identity:初始值正确,使其恢复原来的初始值。当初始值为0是不会发生这个问题,而当我们设置初始值如1时,就会导致后续的计算操作增加了5份初始值,目前猜测原因是因为代码中LongAccumulator在并发量比较大的情况下,操作数据的时候,相当于把这个数字分成了很多份数字 ,而初始化的时候也是初始化了多份数据,导致初始值叠加了多份。不知道这是个bug么?待解惑。

    在此记录下来希望有遇到这种情况的同学注意。解决方法便是要么初始值identity=0不会有这种问题;或者有需要使用reset方法重置的改为重新创建个LongAccumulator处理。

    源码:

    public void reset() {
        Cell[] as = cells; Cell a;
        base = identity;
        if (as != null) {
            for (int i = 0; i < as.length; ++i) {
                if ((a = as[i]) != null)
                    //对多个cell进行初始值赋值导致后面计算叠加了多份初始值
                    a.value = identity;
            }
        }
    }
    

    示例:

    public class LongAccumulatorTest {
        //设置初始值为1查看输出结果
        private static volatile LongAccumulator count = new LongAccumulator((x, y) -> x + y, 1);
    
        public static void main(String[] args) throws InterruptedException {
            for (int i = 0; i < 5; i++) {
                count.reset();
                averageTest();
            }
        }
    
        public static void averageTest() throws InterruptedException {
            long t1 = System.currentTimeMillis();
            //自定义包含策略
            ThreadPoolExecutor executor = new ThreadPoolExecutor(50, 50, 60,
                    TimeUnit.SECONDS, new LinkedBlockingQueue<>(5),
                    new DemoThreadFactory("订单创建组"), new ThreadPoolExecutor.AbortPolicy());
            CountDownLatch latch = new CountDownLatch(50);
            for (int i = 0; i < 50; i++) {
                executor.execute(() -> {
                    try {
                        for (int j = 0; j < 1000000; j++) {
                            count.accumulate(1);
                        }
                    } finally {
                        latch.countDown();
                    }
    
                });
            }
            latch.await();
            long t2 = System.currentTimeMillis();
            System.out.println(String.format("结果:%s,耗时(ms):%s", count.longValue(), (t2 - t1)));
            executor.shutdown();
        }
    }
    

    输出:这时候你会发现只有第一次计算是正确的,只要使用了rest方法重置就会导致这个错误。

    结果:50000001,耗时(ms):185
    结果:50000005,耗时(ms):143
    结果:50000005,耗时(ms):139
    结果:50000005,耗时(ms):162
    结果:50000005,耗时(ms):142
    

    演示公平锁和非公平锁

    先理解一下什么是公平锁、非公平锁?

    公平锁和非公平锁体现在别人释放锁的一瞬间,如果前面已经有排队的,新来的是否可以插队,如果可以插队表示是非公平的,如果不可用插队,只能排在最后面,是公平的方式。

    示例:

    @Slf4j
    public class ReentrantLockTest2 {
    
        public static void main(String[] args) throws InterruptedException {
            ReentrantLockTest2.LockTest(false);
            TimeUnit.SECONDS.sleep(4);
            log.error("-------------------------------");
            ReentrantLockTest2.LockTest(true);
        }
    
        public static void LockTest(boolean fair) throws InterruptedException {
            ReentrantLock lock = new ReentrantLock(fair);
            new Thread(() -> {
                lock.lock();
                try {
                    log.error(Thread.currentThread().getName() + " start!");
                    TimeUnit.SECONDS.sleep(3);
                    new Thread(() -> {
                        //注意线程池要当前线程创建的才能使用,如果传给新开的线程会获取不到线程池
                        test("后到组",lock);
                    }).start();
                    //test(executorAfter,lock);
                    log.error(Thread.currentThread().getName() + "end!");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    lock.unlock();
                }
            }, "Hold Lock 4 Test Thread").start();
            test("先到组",lock);
            TimeUnit.SECONDS.sleep(3);
        }
    
        private static void test(String name,Lock lock){
            //自定义包含策略
            ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 10, 60,
                    TimeUnit.SECONDS, new LinkedBlockingQueue<>(5),
                    new DemoThreadFactory(name), new ThreadPoolExecutor.AbortPolicy());
            for (int i = 0; i < 10; i++) {
                executor.execute(() -> {
                    lock.lock();
                    try {
                        log.error("获取到锁!");
                    } finally {
                        lock.unlock();
                    }
                });
            }
            executor.shutdown();
        }
    }
    

    输出:

    14:45:23.204 [Hold Lock 4 Test Thread] ERROR com.self.current.ReentrantLockTest2 - Hold Lock 4 Test Thread start!
    14:45:26.211 [Hold Lock 4 Test Thread] ERROR com.self.current.ReentrantLockTest2 - Hold Lock 4 Test Threadend!
    14:45:26.211 [From DemoThreadFactory's 先到组-Worker-1] ERROR com.self.current.ReentrantLockTest2 - 获取到锁!
    14:45:26.211 [From DemoThreadFactory's 先到组-Worker-2] ERROR com.self.current.ReentrantLockTest2 - 获取到锁!
    14:45:26.212 [From DemoThreadFactory's 先到组-Worker-3] ERROR com.self.current.ReentrantLockTest2 - 获取到锁!
    14:45:26.212 [From DemoThreadFactory's 先到组-Worker-4] ERROR com.self.current.ReentrantLockTest2 - 获取到锁!
    14:45:26.212 [From DemoThreadFactory's 先到组-Worker-5] ERROR com.self.current.ReentrantLockTest2 - 获取到锁!
    14:45:26.212 [From DemoThreadFactory's 先到组-Worker-6] ERROR com.self.current.ReentrantLockTest2 - 获取到锁!
    14:45:26.212 [From DemoThreadFactory's 先到组-Worker-7] ERROR com.self.current.ReentrantLockTest2 - 获取到锁!
    14:45:26.212 [From DemoThreadFactory's 先到组-Worker-8] ERROR com.self.current.ReentrantLockTest2 - 获取到锁!
    14:45:26.212 [From DemoThreadFactory's 后到组-Worker-4] ERROR com.self.current.ReentrantLockTest2 - 获取到锁!
    14:45:26.212 [From DemoThreadFactory's 先到组-Worker-9] ERROR com.self.current.ReentrantLockTest2 - 获取到锁!
    14:45:26.213 [From DemoThreadFactory's 后到组-Worker-8] ERROR com.self.current.ReentrantLockTest2 - 获取到锁!
    14:45:26.218 [From DemoThreadFactory's 后到组-Worker-10] ERROR com.self.current.ReentrantLockTest2 - 获取到锁!
    14:45:26.218 [From DemoThreadFactory's 先到组-Worker-10] ERROR com.self.current.ReentrantLockTest2 - 获取到锁!
    14:45:26.218 [From DemoThreadFactory's 后到组-Worker-2] ERROR com.self.current.ReentrantLockTest2 - 获取到锁!
    14:45:26.218 [From DemoThreadFactory's 后到组-Worker-1] ERROR com.self.current.ReentrantLockTest2 - 获取到锁!
    14:45:26.219 [From DemoThreadFactory's 后到组-Worker-3] ERROR com.self.current.ReentrantLockTest2 - 获取到锁!
    14:45:26.219 [From DemoThreadFactory's 后到组-Worker-5] ERROR com.self.current.ReentrantLockTest2 - 获取到锁!
    14:45:26.219 [From DemoThreadFactory's 后到组-Worker-6] ERROR com.self.current.ReentrantLockTest2 - 获取到锁!
    14:45:26.219 [From DemoThreadFactory's 后到组-Worker-7] ERROR com.self.current.ReentrantLockTest2 - 获取到锁!
    14:45:26.219 [From DemoThreadFactory's 后到组-Worker-9] ERROR com.self.current.ReentrantLockTest2 - 获取到锁!
    14:45:30.205 [main] ERROR com.self.current.ReentrantLockTest2 - -------------------------------
    14:45:30.205 [Hold Lock 4 Test Thread] ERROR com.self.current.ReentrantLockTest2 - Hold Lock 4 Test Thread start!
    14:45:33.206 [Hold Lock 4 Test Thread] ERROR com.self.current.ReentrantLockTest2 - Hold Lock 4 Test Threadend!
    14:45:33.206 [From DemoThreadFactory's 先到组-Worker-1] ERROR com.self.current.ReentrantLockTest2 - 获取到锁!
    14:45:33.206 [From DemoThreadFactory's 先到组-Worker-2] ERROR com.self.current.ReentrantLockTest2 - 获取到锁!
    14:45:33.209 [From DemoThreadFactory's 先到组-Worker-3] ERROR com.self.current.ReentrantLockTest2 - 获取到锁!
    14:45:33.209 [From DemoThreadFactory's 先到组-Worker-4] ERROR com.self.current.ReentrantLockTest2 - 获取到锁!
    14:45:33.209 [From DemoThreadFactory's 先到组-Worker-5] ERROR com.self.current.ReentrantLockTest2 - 获取到锁!
    14:45:33.209 [From DemoThreadFactory's 先到组-Worker-6] ERROR com.self.current.ReentrantLockTest2 - 获取到锁!
    14:45:33.210 [From DemoThreadFactory's 先到组-Worker-7] ERROR com.self.current.ReentrantLockTest2 - 获取到锁!
    14:45:33.210 [From DemoThreadFactory's 先到组-Worker-8] ERROR com.self.current.ReentrantLockTest2 - 获取到锁!
    14:45:33.210 [From DemoThreadFactory's 先到组-Worker-9] ERROR com.self.current.ReentrantLockTest2 - 获取到锁!
    14:45:33.210 [From DemoThreadFactory's 先到组-Worker-10] ERROR com.self.current.ReentrantLockTest2 - 获取到锁!
    14:45:33.210 [From DemoThreadFactory's 后到组-Worker-2] ERROR com.self.current.ReentrantLockTest2 - 获取到锁!
    14:45:33.210 [From DemoThreadFactory's 后到组-Worker-1] ERROR com.self.current.ReentrantLockTest2 - 获取到锁!
    14:45:33.211 [From DemoThreadFactory's 后到组-Worker-6] ERROR com.self.current.ReentrantLockTest2 - 获取到锁!
    14:45:33.211 [From DemoThreadFactory's 后到组-Worker-7] ERROR com.self.current.ReentrantLockTest2 - 获取到锁!
    14:45:33.211 [From DemoThreadFactory's 后到组-Worker-5] ERROR com.self.current.ReentrantLockTest2 - 获取到锁!
    14:45:33.211 [From DemoThreadFactory's 后到组-Worker-4] ERROR com.self.current.ReentrantLockTest2 - 获取到锁!
    14:45:33.211 [From DemoThreadFactory's 后到组-Worker-3] ERROR com.self.current.ReentrantLockTest2 - 获取到锁!
    14:45:33.211 [From DemoThreadFactory's 后到组-Worker-9] ERROR com.self.current.ReentrantLockTest2 - 获取到锁!
    14:45:33.212 [From DemoThreadFactory's 后到组-Worker-10] ERROR com.self.current.ReentrantLockTest2 - 获取到锁!
    14:45:33.212 [From DemoThreadFactory's 后到组-Worker-8] ERROR com.self.current.ReentrantLockTest2 - 获取到锁!
    

    google提供的一些好用的并发工具类

    关于并发方面的,juc已帮我们提供了很多好用的工具,而谷歌在此基础上做了扩展,使并发编程更容易,这些工具放在guava.jar包中。

    guava maven配置

    <dependency>
        <groupId>com.google.guava</groupId>
        <artifactId>guava</artifactId>
        <version>27.0-jre</version>
    </dependency>
    

    guava中常用几个类

    MoreExecutors:提供了一些静态方法,是对juc中的Executors类的一个扩展。
    Futures:也提供了很多静态方法,是对juc中Future的一个扩展。

    案例1:异步执行任务完毕之后回调——相当于CompletableFuture的whenComplete方法

    ListeningExecutorService接口继承于juc中的ExecutorService接口,对ExecutorService做了一些扩展,看其名字中带有Listening,说明这个接口自带监听的功能,可以监听异步执行任务的结果。通过MoreExecutors.listeningDecorator创建一个ListeningExecutorService对象,需传递一个ExecutorService参数,传递的ExecutorService负责异步执行任务。

    ListeningExecutorServicesubmit方法用来异步执行一个任务,返回ListenableFutureListenableFuture接口继承于juc中的Future接口,对Future做了扩展,使其带有监听的功能。调用submit.addListener可以在执行的任务上添加监听器,当任务执行完毕之后会回调这个监听器中的方法。

    ListenableFutureget方法会阻塞当前线程直到任务执行完毕

    另一种回调方式是通过调用Futures的静态方法addCallback在异步执行的任务中添加回调,回调的对象是一个FutureCallback,此对象有2个方法,任务执行成功调用onSuccess,执行失败调用onFailure

    失败的情况可以将代码中int i = 10 / 0;注释去掉,执行一下可以看看效果。

    示例:

    @Slf4j
    public class GuavaTest {
    
        //相当于CompletableFuture的whenComplete方法
        public static void main1(String[] args) throws ExecutionException, InterruptedException {
            //创建一个线程池
            ExecutorService delegate = Executors.newFixedThreadPool(5);
            try {
                ListeningExecutorService executorService = MoreExecutors.listeningDecorator(delegate);
                //异步执行一个任务
                ListenableFuture<Integer> submit = executorService.submit(() -> {
                    log.error("{}", System.currentTimeMillis());
                    //休眠2秒,默认耗时
                    TimeUnit.SECONDS.sleep(2);
                    log.error("{}", System.currentTimeMillis());
                    return 10;
                });
                //当任务执行完毕之后回调对应的方法
                submit.addListener(() -> {
                    log.error("任务执行完毕了,我被回调了");
                }, MoreExecutors.directExecutor());
                log.error("{}", submit.get());
            } finally {
                delegate.shutdown();
            }
        }
    
        //相当于CompletableFuture的whenComplete方法
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 60,
                    TimeUnit.SECONDS, new LinkedBlockingQueue<>(10),
                    new DemoThreadFactory("订单创建组"), new ThreadPoolExecutor.AbortPolicy());
    
    
            ListeningExecutorService service = MoreExecutors.listeningDecorator(executor);
            try {
                ListenableFuture<Integer> future = service.submit(() -> {
                    log.error("{}", System.currentTimeMillis());
                    //休眠2秒,默认耗时
                    TimeUnit.SECONDS.sleep(2);
                    //int i = 10 / 0;
                    log.error("{}", System.currentTimeMillis());
                    return 10;
                });
                Futures.addCallback(future, new FutureCallback<Integer>() {
                    @Override
                    public void onSuccess(Integer integer) {
                        log.error("执行成功:{}", integer);
                    }
    
                    @Override
                    public void onFailure(Throwable throwable) {
                        log.error("执行失败:{}", throwable.getMessage());
                    }
                });
                log.error("{}", future.get());
            } finally {
                service.shutdown();
            }
    
        }
    }
    

    输出:

    15:32:54.480 [From DemoThreadFactory's 订单创建组-Worker-1] ERROR com.self.current.GuavaTest - 1599809574477
    15:32:56.487 [From DemoThreadFactory's 订单创建组-Worker-1] ERROR com.self.current.GuavaTest - 1599809576487
    15:32:56.488 [main] ERROR com.self.current.GuavaTest - 10
    15:32:56.488 [From DemoThreadFactory's 订单创建组-Worker-1] ERROR com.self.current.GuavaTest - 执行成功:10
    

    示例2:获取一批异步任务的执行结果——Futures.allAsList(futureList).get()

    结果中按顺序输出了6个异步任务的结果,此处用到了Futures.allAsList方法,看一下此方法的声明:

    public static <V> ListenableFuture<List<V>> allAsList(
          Iterable<? extends ListenableFuture<? extends V>> futures)
    

    传递一批ListenableFuture,返回一个ListenableFuture<List<V>>,内部将一批结果转换为了一个ListenableFuture对象。

    示例:

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        log.error("star");
        ExecutorService delegate = Executors.newFixedThreadPool(5);
        try {
            ListeningExecutorService executorService = MoreExecutors.listeningDecorator(delegate);
            List<ListenableFuture<Integer>> futureList = new ArrayList<>();
            for (int i = 5; i >= 0; i--) {
                int j = i;
                futureList.add(executorService.submit(() -> {
                    TimeUnit.SECONDS.sleep(j);
                    return j;
                }));
            }
            //把多个ListenableFuture转换为一个ListenableFuture
            //ListenableFuture<List<Integer>> listListenableFuture = Futures.allAsList(futureList);
            //获取一批任务的执行结果
            List<Integer> resultList = Futures.allAsList(futureList).get();
            //输出
            resultList.forEach(item -> {
                log.error("{}", item);
            });
        } finally {
            delegate.shutdown();
        }
    }
    

    输出:

    15:45:51.160 [main] ERROR com.self.current.GuavaTest - star
    15:45:56.185 [main] ERROR com.self.current.GuavaTest - 5
    15:45:56.185 [main] ERROR com.self.current.GuavaTest - 4
    15:45:56.185 [main] ERROR com.self.current.GuavaTest - 3
    15:45:56.185 [main] ERROR com.self.current.GuavaTest - 2
    15:45:56.185 [main] ERROR com.self.current.GuavaTest - 1
    15:45:56.185 [main] ERROR com.self.current.GuavaTest - 0
    

    示例3:一批任务异步执行完毕之后回调——包装futureList传递给Futures.addCallback 方法

    异步执行一批任务,最后计算其和,代码中异步执行了一批任务,所有任务完成之后,回调了上面的onSuccess方法,内部对所有的结果进行sum操作。

    示例:

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        log.error("start");
        ExecutorService delegate = Executors.newFixedThreadPool(5);
        try {
            ListeningExecutorService executorService = MoreExecutors.listeningDecorator(delegate);
            List<ListenableFuture<Integer>> futureList = new ArrayList<>();
            for (int i = 5; i >= 0; i--) {
                int j = i;
                futureList.add(executorService.submit(() -> {
                    TimeUnit.SECONDS.sleep(j);
                    return j;
                }));
            }
            //把多个ListenableFuture转换为一个ListenableFuture
            ListenableFuture<List<Integer>> listListenableFuture = Futures.allAsList(futureList);
            Futures.addCallback(listListenableFuture, new FutureCallback<List<Integer>>() {
                @Override
                public void onSuccess(List<Integer> result) {
                  log.error("result中所有结果之和:"+result.stream().reduce(Integer::sum).get());
                }
    
                @Override
                public void onFailure(Throwable throwable) {
                    log.error("执行任务发生异常:" + throwable.getMessage(), throwable);
                }
            });
        } finally {
            delegate.shutdown();
        }
    }
    

    输出:

    15:57:00.539 [main] ERROR com.self.current.GuavaTest - start
    15:57:05.560 [pool-2-thread-1] ERROR com.self.current.GuavaTest - result中所有结果之和:15
    
    其他疑问:

    Q:运行下面这个例子结束不了,debug倒是可以,这是为什么呢?Thread[Monitor Ctrl-Break,5,main]是哪来的?

    public class VolatileTest1 {
    
        public static volatile int num = 0;
    
        public  static void add(){
            num++;
        }
    
        public static void main(String[] args) throws InterruptedException {
            Thread[] threads = new Thread[10];
            for (Thread thread : threads) {
                thread = new Thread(()->{
                    for (int i = 0; i < 1000; i++) {
                        VolatileTest1.add();
                    }
                });
                thread.start();
                thread.join();
            }
            //2
            //java.lang.ThreadGroup[name=main,maxpri=10]
            //    Thread[main,5,main]
            //    Thread[Monitor Ctrl-Break,5,main]
            //结束不了,debug倒是可以,这是为什么呢?Thread[Monitor Ctrl-Break,5,main]是哪来的?
            while (Thread.activeCount() >1){
                Thread.yield();
                System.out.println(Thread.activeCount());
                ThreadGroup parent = Thread.currentThread().getThreadGroup();
                parent.list();
            }
            System.out.println("num="+num);
        }
    }
    

    其他

    AQS原理没讲,需要找资料补充。

    JUC中常见的集合原来没讲,比如ConcurrentHashMap最常用的,后面的都很泛,没有深入,虎头蛇尾。

    阻塞队列讲得不够深入。

    参考

    java高并发系列

    liaoxuefeng Java教程 CompletableFuture

    相关文章

      网友评论

          本文标题:Java高并发系列——检视阅读(八)

          本文链接:https://www.haomeiwen.com/subject/wmujnltx.html