Java高并发系列——CompletableFuture
JUC中工具类CompletableFuture
CompletableFuture是java8中新增的一个类,算是对Future的一种增强,用起来很方便,也是会经常用到的一个工具类,熟悉一下。
CompletionStage接口
- CompletionStage代表异步计算过程中的某一个阶段,一个阶段完成以后可能会触发另外一个阶段
- 一个阶段的计算执行可以是一个Function,Consumer或者Runnable。比如:stage.thenApply(x -> square(x)).thenAccept(x -> System.out.print(x)).thenRun(() -> System.out.println())
- 一个阶段的执行可能是被单个阶段的完成触发,也可能是由多个阶段一起触发
CompletableFuture类
- 在Java8中,CompletableFuture提供了非常强大的Future的扩展功能,可以帮助我们简化异步编程的复杂性,并且提供了函数式编程的能力,可以通过回调的方式处理计算结果,也提供了转换和组合 CompletableFuture 的方法。
- 它可能代表一个明确完成的Future,也有可能代表一个完成阶段( CompletionStage ),它支持在计算完成以后触发一些函数或执行某些动作。
- 它实现了Future和CompletionStage接口
常见的方法,熟悉一下:
runAsync 和 supplyAsync方法
CompletableFuture 提供了四个静态方法来创建一个异步操作。
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
没有指定Executor的方法会使用ForkJoinPool.commonPool() 作为它的线程池执行异步代码。如果指定线程池,则使用指定的线程池运行。以下所有的方法都类同。
- runAsync方法不支持返回值。
- supplyAsync可以支持返回值。
示例:
public class CompletableFutureTest1 {
public static void main(String[] args) throws Exception {
CompletableFutureTest1.runAsync();
CompletableFutureTest1.supplyAsync();
}
//runAsync方法不支持返回值 Runnable
public static void runAsync() throws Exception {
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
}
System.out.println("run end ...");
});
future.get();
}
//supplyAsync可以支持返回值 Supplier<U>
public static void supplyAsync() throws Exception {
CompletableFuture<Long> future = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
}
System.out.println("run end ...");
return System.currentTimeMillis();
});
//如果没有future.get()阻塞等待结果的话,因为CompletableFuture.supplyAsync()方法默认是守护线程形式执行任务,在主线程结束后会跟着退出,
// 如果传入的是线程池去执行,这不是守护线程,不会导致退出
long time = future.get();
System.out.println("time = "+time);
}
}
输出:
run end ...
run end ...
time = 1599556248764
计算结果完成时的回调方法
当CompletableFuture的计算结果完成,或者抛出异常的时候,可以执行特定的Action。主要是下面的方法:
public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn)
可以看到Action的类型是BiConsumer它可以处理正常的计算结果,或者异常情况。
whenComplete 和 whenCompleteAsync 的区别:
- whenComplete:当前任务的线程继续执行 whenComplete 的任务。
- whenCompleteAsync:把 whenCompleteAsync 这个任务继续提交给线程池来进行执行。
示例:
public class CompletableFutureTest1 {
public static void main(String[] args) throws Exception {
CompletableFutureTest1.whenComplete();
CompletableFutureTest1.whenCompleteBySupply();
}
public static void whenComplete() throws Exception {
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
}
if (new Random().nextInt() % 2 >= 0) {
int i = 12 / 0;
//run end ...
//执行完成!
//int i = 12 / 0;
}
System.out.println("run end ...");
});
//对执行成功或者执行异常做处理的回调方法
future.whenComplete((avoid, throwable) -> {
//先判断是否抛异常分开处理
if (throwable != null) {
System.out.println("执行失败!" + throwable.getMessage());
} else {
System.out.println("执行完成!");
}
});
//对执行异常做处理的回调方法
future.exceptionally(throwable -> {
System.out.println("执行失败!" + throwable.getMessage());
return null;
}
);
TimeUnit.SECONDS.sleep(2);
}
public static void whenCompleteBySupply() throws Exception {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
}
if (new Random().nextInt() % 2 >= 0) {
//int i = 12 / 0;
//run end ...
//执行完成!
//int i = 12 / 0;
}
System.out.println("run end ...");
return "success";
});
//whenComplete在thenAccept之前执行
future.thenAccept(result -> {
System.out.println(result);
});
//对执行成功或者执行异常做处理的回调方法
future.whenComplete((avoid, throwable) -> {
//先判断是否抛异常分开处理
if (throwable != null) {
System.out.println("执行失败!" + throwable.getMessage());
} else {
System.out.println("执行完成!");
}
});
//对执行异常做处理的回调方法
future.exceptionally(throwable -> {
System.out.println("执行失败!" + throwable.getMessage());
return null;
}
);
TimeUnit.SECONDS.sleep(2);
}
}
输出:
执行失败!java.lang.ArithmeticException: / by zero
执行失败!java.lang.ArithmeticException: / by zero
run end ...
执行完成!
success
thenApply 方法
当一个线程依赖另一个线程时,可以使用 thenApply、thenApplyAsync 方法来把这两个线程串行化。
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
Function<? super T,? extends U>
T:上一个任务返回结果的类型
U:当前任务的返回值类型
示例:
public class CompletableFutureTest2 {
public static void main(String[] args) throws Exception {
CompletableFutureTest2.thenApply();
}
//多个CompletableFuture可以串行执行
//当一个线程依赖另一个线程时,可以使用 thenApply 方法来把这两个线程串行化。
//多个任务串行执行,第二个任务依赖第一个任务的结果。
private static void thenApply() throws Exception {
CompletableFuture<Long> future = CompletableFuture.supplyAsync(() -> {
long result = new Random().nextInt(100);
System.out.println("result1=" + result);
return result;
}
).thenApply((t -> {
long result = t * 5;
System.out.println("result2=" + result);
return result;
}));
//方式一:阻塞等待结果
long result = future.get();
System.out.println("result2: " + result);
//方式二:调用成功后接收任务的处理结果,并消费处理,无返回结果
future.thenAccept((r) -> {
System.out.println("result2: " + r);
});
}
}
输出:
result1=41
result2=205
result2: 205
result2: 205
handle 方法——可以处理正常和异常情况的thenApply 方法
handle 是执行任务完成时对结果的处理。
handle 方法和 thenApply 方法处理方式基本一样。不同的是 handle 是在任务完成后再执行,还可以处理异常的任务。thenApply 只可以执行正常的任务,任务出现异常则不执行 thenApply 方法。
public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn,Executor executor);
示例:在 handle 中可以根据任务是否有异常来进行做相应的后续处理操作。而 thenApply 方法,如果上个任务出现错误,则不会执行 thenApply 方法。
public class CompletableFutureTest3 {
public static void main(String[] args) throws Exception {
CompletableFutureTest3.handle();
}
public static void handle() throws Exception {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(new Supplier<Integer>() {
@Override
public Integer get() {
int i = 10 / 0;
return new Random().nextInt(10);
}
}).handle(
(param, throwable) -> {
int result = -1;
if (throwable == null) {
result = param * 2;
} else {
System.out.println(throwable.getMessage());
}
return result;
}
/*new BiFunction<Integer, Throwable, Integer>() {
@Override
public Integer apply(Integer param, Throwable throwable) {
int result = -1;
if(throwable==null){
result = param * 2;
}else{
System.out.println(throwable.getMessage());
}
return result;
}
}*/);
System.out.println(future.get());
}
}
输出:
java.lang.ArithmeticException: / by zero
-1
thenAccept 消费处理结果——无返回结果
接收任务的处理结果,并消费处理,无返回结果。
public CompletionStage<Void> thenAccept(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor);
示例:
public class CompletableFutureTest3 {
public static void main(String[] args) throws Exception {
//CompletableFutureTest3.handle();
CompletableFutureTest3.thenAccept();
}
public static void thenAccept() throws Exception {
CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
return new Random().nextInt(10);
}
).thenAccept(integer -> {
System.out.println(integer);
});
future.get();
}
}
//输出:5
thenRun 方法——继续执行下一个Runnable任务,不获取上一个任务的处理结果
跟 thenAccept 方法不一样的是,不关心任务的处理结果。只要上面的任务执行完成,就开始执行 thenRun 。
public CompletionStage<Void> thenRun(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action,Executor executor);
示例:
public class CompletableFutureTest3 {
public static void main(String[] args) throws Exception {
CompletableFutureTest3.thenRun();
}
public static void thenRun() throws Exception{
CompletableFuture<Void> future = CompletableFuture.supplyAsync(new Supplier<Integer>() {
@Override
public Integer get() {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
return new Random().nextInt(10);
}
}).thenRun(() -> {
System.out.println("thenRun ...");
});
future.get();
}
}
//2秒后输出:thenRun ...
thenCombine 合并任务
thenCombine 会把 两个 CompletionStage 的任务都执行完成后,把两个任务的结果一块交给 thenCombine 来处理。
public <U,V> CompletionStage<V> thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn,Executor executor);
示例:
public class CompletableFutureTest3 {
public static void main(String[] args) throws Exception {
CompletableFutureTest3.thenCombine();
}
private static void thenCombine() throws Exception {
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
return "hello";
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
return "world";
});
CompletableFuture<String> result = future1.thenCombine(future2, (result1, result2) -> {
return result1 + " " + result2;
});
System.out.println(result.get());
}
}
//输出:hello world
thenAcceptBoth
当两个CompletionStage都执行完成后,把结果一块交给thenAcceptBoth来进行消耗。
public <U> CompletionStage<Void> thenAcceptBoth(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action);
public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action);
public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action, Executor executor);
示例:
public class CompletableFutureTest3 {
public static void main(String[] args) throws Exception {
CompletableFutureTest3.thenAcceptBoth();
//等待守护进程执行完
TimeUnit.SECONDS.sleep(5);
}
private static void thenAcceptBoth() throws Exception {
CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(() -> {
int t = new Random().nextInt(3);
try {
TimeUnit.SECONDS.sleep(t);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("f1=" + t);
return t;
});
CompletableFuture<Integer> f2 = CompletableFuture.supplyAsync(() -> {
int t = new Random().nextInt(3);
try {
TimeUnit.SECONDS.sleep(t);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("f2=" + t);
return t;
});
f1.thenAcceptBoth(f2, (result1, result2) -> {
System.out.println("f1=" + result1 + ";f2=" + result2 + ";");
});
}
}
输出:
f1=1
f2=1
f1=1;f2=1;
applyToEither 方法——有返回值消耗
两个CompletionStage,谁执行返回的结果快,我就用那个CompletionStage的结果进行下一步的转化操作。
public <U> CompletionStage<U> applyToEither(CompletionStage<? extends T> other,Function<? super T, U> fn);
public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn);
public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn,Executor executor);
示例:
public class CompletableFutureTest3 {
public static void main(String[] args) throws Exception {
CompletableFutureTest3.applyToEither();
}
private static void applyToEither() throws Exception {
CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(()->{
int t = 1;
try {
TimeUnit.SECONDS.sleep(t);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("f1="+t);
return t;
});
CompletableFuture<Integer> f2 = CompletableFuture.supplyAsync(()->{
int t = 2;
try {
TimeUnit.SECONDS.sleep(t);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("f2="+t);
return t;
});
CompletableFuture<Integer> result = f1.applyToEither(f2, (r)->{
System.out.println(r);
return r * 2;
});
System.out.println(result.get());
}
输出:
f1=1
1
2
acceptEither 方法——无返回值消耗
两个CompletionStage,谁执行返回的结果快,我就用那个CompletionStage的结果进行下一步的消耗操作。注意,这时候其实两个CompletionStage都是会执行完的,只是我们只获取其中的一个比较快的结果而已,参考示例的输出。
public CompletionStage<Void> acceptEither(CompletionStage<? extends T> other,Consumer<? super T> action);
public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other,Consumer<? super T> action);
public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other,Consumer<? super T> action,Executor executor);
示例:
public class CompletableFutureTest3 {
public static void main(String[] args) throws Exception {
//CompletableFutureTest3.applyToEither();
CompletableFutureTest3.acceptEither();
TimeUnit.SECONDS.sleep(5);
}
private static void acceptEither() throws Exception {
CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(() -> {
int t = new Random().nextInt(3);
try {
TimeUnit.SECONDS.sleep(t);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("f1=" + t);
return t;
});
CompletableFuture<Integer> f2 = CompletableFuture.supplyAsync(() -> {
int t = new Random().nextInt(3);
try {
TimeUnit.SECONDS.sleep(t);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("f2=" + t);
return t;
});
f1.acceptEither(f2, (t) -> {
System.out.println(t);
});
}
}
输出:
f1=1
1
f2=2
runAfterEither 方法
两个CompletionStage,任何一个完成了都会执行下一步的操作(Runnable),两个CompletionStage都是会执行完的.
public CompletionStage<Void> runAfterEither(CompletionStage<?> other,Runnable action);
public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action);
public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action,Executor executor);
示例代码
public class CompletableFutureTest3 {
public static void main(String[] args) throws Exception {
//CompletableFutureTest3.applyToEither();
//CompletableFutureTest3.acceptEither();
CompletableFutureTest3.runAfterEither();
TimeUnit.SECONDS.sleep(5);
}
private static void runAfterEither() throws Exception {
CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(new Supplier<Integer>() {
@Override
public Integer get() {
int t = new Random().nextInt(3);
try {
TimeUnit.SECONDS.sleep(t);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("f1=" + t);
return t;
}
});
CompletableFuture<Integer> f2 = CompletableFuture.supplyAsync(new Supplier<Integer>() {
@Override
public Integer get() {
int t = new Random().nextInt(3);
try {
TimeUnit.SECONDS.sleep(t);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("f2=" + t);
return t;
}
});
f1.runAfterEither(f2, ()->{
System.out.println("上面有一个已经完成了。");
});
}
}
输出:
f1=0
上面有一个已经完成了。
f2=1
runAfterBoth
两个CompletionStage,都完成了计算才会执行下一步的操作(Runnable),注意输出顺序,runAfterBoth方法要等两个CompletionStage都执行完了才会执行。
public CompletionStage<Void> runAfterBoth(CompletionStage<?> other,Runnable action);
public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action);
public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action,Executor executor);
示例代码
public class CompletableFutureTest3 {
public static void main(String[] args) throws Exception {
//CompletableFutureTest3.applyToEither();
//CompletableFutureTest3.acceptEither();
//CompletableFutureTest3.runAfterEither();
CompletableFutureTest3.runAfterBoth();
TimeUnit.SECONDS.sleep(5);
}
private static void runAfterBoth() throws Exception {
CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(new Supplier<Integer>() {
@Override
public Integer get() {
int t = new Random().nextInt(3);
try {
TimeUnit.SECONDS.sleep(t);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("f1="+t);
return t;
}
});
CompletableFuture<Integer> f2 = CompletableFuture.supplyAsync(new Supplier<Integer>() {
@Override
public Integer get() {
int t = new Random().nextInt(3);
try {
TimeUnit.SECONDS.sleep(t);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("f2="+t);
return t;
}
});
f1.runAfterBoth(f2, new Runnable() {
@Override
public void run() {
System.out.println("上面两个任务都执行完成了。");
}
});
}
}
输出:
f1=1
f2=2
上面两个任务都执行完成了。
thenCompose 方法
thenCompose 方法允许你对两个 CompletionStage 进行流水线操作,第一个操作完成时,将其结果作为参数传递给第二个操作。
public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn);
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn) ;
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn, Executor executor) ;
示例代码
public class CompletableFutureTest3 {
public static void main(String[] args) throws Exception {
CompletableFutureTest3.thenCompose();
TimeUnit.SECONDS.sleep(3);
}
private static void thenCompose() throws Exception {
CompletableFuture<Integer> f = CompletableFuture.supplyAsync(() -> {
int t = new Random().nextInt(3);
System.out.println("t1=" + t);
return t;
}).thenCompose((param) -> {
return CompletableFuture.supplyAsync(() -> {
int t = param * 2;
System.out.println("t2=" + t);
return t;
});
});
System.out.println("thenCompose result : " + f.get());
}
}
输出:
t1=1
t2=2
thenCompose result : 2
疑问:
Q:thenAcceptBoth与thenCombine 的区别是什么?
A:thenAcceptBoth无返回值消耗执行,thenCombine 会有返回值。一般accept都是没有返回值的,apply是有返回值的。
Q:thenCompose 与thenApply 方法 的区别是什么?不都是串行执行下一个任务,并把第一个任务作为参数传递给第二个任务么?
thApply
和thenCompose
都是将一个CompletableFuture<Integer>
转换为CompletableFuture<String>
。不同的是,thenApply
中的传入函数的返回值是String
,而thenCompose
的传入函数的返回值是CompletableFuture<String>
。
获取线程执行结果的6种方法
方式1:Thread的join()方法实现
代码中通过join方式阻塞了当前主线程,当thread线程执行完毕之后,join方法才会继续执行。
join的方式,只能阻塞一个线程,如果其他线程中也需要获取thread线程的执行结果,join方法无能为力了。
示例:
public class ThreadJoinTest {
//用于封装结果
static class Result<T> {
T result;
public T getResult() {
return result;
}
public void setResult(T result) {
this.result = result;
}
}
public static void main(String[] args) throws InterruptedException {
Result<String> result = new Result<>();
Thread t = new Thread(() -> {
System.out.println("start thread!");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
result.setResult("success");
System.out.println("end thread!");
});
t.start();
//让主线程等待thread线程执行完毕之后再继续,join方法会让当前线程阻塞
t.join();
System.out.println("main get result="+result.getResult());
}
}
输出:
start thread!
end thread!
main get result=success
方式2:CountDownLatch实现
使用CountDownLatch
可以让一个或者多个线程等待一批线程完成之后,自己再继续.
示例:
public class CountDownLatchTest2 {
static class Result<T>{
private T result;
public T getResult() {
return result;
}
public void setResult(T result) {
this.result = result;
}
}
public static void main(String[] args) throws InterruptedException {
Result<String> result = new Result<>();
CountDownLatch latch = new CountDownLatch(1);
Thread t = new Thread(() -> {
System.out.println("start thread!");
try {
TimeUnit.SECONDS.sleep(1);
result.setResult("success");
System.out.println("end thread!");
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
latch.countDown();
}
});
t.start();
latch.await();
System.out.println("main get result="+result.getResult());
}
}
输出:
start thread!
end thread!
main get result=success
方式3:ExecutorService.submit方法实现——ThreadPoolExecutor
使用ExecutorService.submit
方法实现的,此方法返回一个Future
,future.get()
会让当前线程阻塞,直到Future关联的任务执行完毕。
示例:
public class ThreadPoolExecutorTest2 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//自定义包含策略
ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 5, 60,
TimeUnit.SECONDS, new LinkedBlockingQueue<>(5),
new DemoThreadFactory("订单创建组"), new ThreadPoolExecutor.AbortPolicy());
Future<String> future = executor.submit(() -> {
System.out.println("start thread!");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("end thread!");
return "success";
});
executor.shutdown();
System.out.println("main get result="+future.get());
}
}
输出同上。
方式4:FutureTask方式1——作为Runnable传给Thread执行
线程池的submit方法传入的Callable对象本质上也是包装成一个FutureTask来执行。
代码中使用FutureTask
实现的,FutureTask实现了Runnable
接口,并且内部带返回值,所以可以传递给Thread直接运行,futureTask.get()
会阻塞当前线程,直到FutureTask
构造方法传递的任务执行完毕,get方法才会返回。
示例:
public class FutureTaskTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//创建一个FutureTask
FutureTask<String> futureTask = new FutureTask<>(() -> {
System.out.println("start thread!");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("end thread!");
return "success";
});
//将futureTask传递一个线程运行
new Thread(futureTask).start();
//futureTask.get()会阻塞当前线程,直到futureTask执行完毕
String result = futureTask.get();
System.out.println("main get result=" + result);
}
}
方式5:FutureTask方式2——构造FutureTask对象及执行内容,直接在Thread里面跑run方法
当futureTask的run()方法执行完毕之后,futureTask.get()
会从阻塞中返回。
示例:
public class FutureTaskTest1 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//创建一个FutureTask
FutureTask<String> futureTask = new FutureTask<>(() -> {
System.out.println("start thread!");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("end thread!");
return "success";
});
//将futureTask传递一个线程运行
new Thread(()->futureTask.run()).start();
//futureTask.get()会阻塞当前线程,直到futureTask执行完毕
String result = futureTask.get();
System.out.println("main get result=" + result);
}
}
方式6:CompletableFuture方式实现
CompletableFuture.supplyAsync
可以用来异步执行一个带返回值的任务,调用completableFuture.get()
会阻塞当前线程,直到任务执行完毕,get方法才会返回。
public class CompletableFutureTest4 {
public static void main(String[] args) throws Exception {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
System.out.println("start thread!");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("end thread!");
return "success";
});
// future.get()会阻塞当前线程直到获得结果
System.out.println("main get result="+future.get());
}
}
高并发中计数器的四种实现方式
需求:一个jvm中实现一个计数器功能,需保证多线程情况下数据正确性。
我们来模拟50个线程,每个线程对计数器递增100万次,最终结果应该是5000万。
我们使用4种方式实现,看一下其性能,然后引出为什么需要使用LongAdder
、LongAccumulator
。
方式一:使用加锁的方式实现——synchronized或Lock
从示例输出结果看,ReentrantLock的效率明显比synchronized差了2-3倍。
示例:
public class SynchronizeCalculator {
private static long count = 0;
private static Lock lock = new ReentrantLock();
public synchronized static void incrment() {
count++;
}
public static void incrmentByLock() {
lock.lock();
try {
count++;
}finally {
lock.unlock();
}
}
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 5; i++) {
count = 0;
averageTest();
}
}
public static void averageTest() throws InterruptedException {
long t1 = System.currentTimeMillis();
//自定义包含策略
ThreadPoolExecutor executor = new ThreadPoolExecutor(50, 50, 60,
TimeUnit.SECONDS, new LinkedBlockingQueue<>(5),
new DemoThreadFactory("订单创建组"), new ThreadPoolExecutor.AbortPolicy());
CountDownLatch latch = new CountDownLatch(50);
for (int i = 0; i < 50; i++) {
executor.execute(() -> {
try {
for (int j = 0; j < 1000000; j++) {
incrment();
//incrmentByLock();
}
} finally {
latch.countDown();
}
});
}
latch.await();
long t2 = System.currentTimeMillis();
System.out.println(String.format("结果:%s,耗时(ms):%s", count, (t2 - t1)));
executor.shutdown();
}
}
输出:
//synchronized
结果:50000000,耗时(ms):490
结果:50000000,耗时(ms):1574
结果:50000000,耗时(ms):399
结果:50000000,耗时(ms):395
结果:50000000,耗时(ms):396
//lock
结果:50000000,耗时(ms):1289
结果:50000000,耗时(ms):1239
结果:50000000,耗时(ms):1224
结果:50000000,耗时(ms):1219
结果:50000000,耗时(ms):1246
方式2:AtomicLong实现
AtomicLong
内部采用CAS的方式实现,并发量大的情况下,CAS失败率比较高,导致性能比synchronized还低一些。并发量不是太大的情况下,CAS性能还是可以的。
示例:
public class AtomicLongTest {
private static AtomicLong count = new AtomicLong(0);
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 5; i++) {
count.set(0);
averageTest();
}
}
public static void averageTest() throws InterruptedException {
long t1 = System.currentTimeMillis();
//自定义包含策略
ThreadPoolExecutor executor = new ThreadPoolExecutor(50, 50, 60,
TimeUnit.SECONDS, new LinkedBlockingQueue<>(5),
new DemoThreadFactory("订单创建组"), new ThreadPoolExecutor.AbortPolicy());
CountDownLatch latch = new CountDownLatch(50);
for (int i = 0; i < 50; i++) {
executor.execute(() -> {
try {
for (int j = 0; j < 1000000; j++) {
count.getAndIncrement();
}
} finally {
latch.countDown();
}
});
}
latch.await();
long t2 = System.currentTimeMillis();
System.out.println(String.format("结果:%s,耗时(ms):%s", count.get(), (t2 - t1)));
executor.shutdown();
}
}
输出:
结果:50000000,耗时(ms):1018
结果:50000000,耗时(ms):1442
结果:50000000,耗时(ms):1033
结果:50000000,耗时(ms):935
结果:50000000,耗时(ms):1320
方式3:LongAdder实现——相当于锁分段技术
先介绍一下LongAdder
,说到LongAdder,不得不提的就是AtomicLong,AtomicLong是JDK1.5开始出现的,里面主要使用了一个long类型的value作为成员变量,然后使用循环的CAS操作去操作value的值,并发量比较大的情况下,CAS操作失败的概率较高,内部失败了会重试,导致耗时可能会增加。
LongAdder是JDK1.8开始出现的,所提供的API基本上可以替换掉原先的AtomicLong。LongAdder在并发量比较大的情况下,操作数据的时候,相当于把这个数字分成了很多份数字,然后交给多个人去管控,每个管控者负责保证部分数字在多线程情况下操作的正确性。当多线程访问的时,通过hash算法映射到具体管控者去操作数据,最后再汇总所有的管控者的数据,得到最终结果。相当于降低了并发情况下锁的粒度,所以效率比较高,看一下下面的图,方便理解:
image.png示例:
代码中new LongAdder()
创建一个LongAdder对象,内部数字初始值是0,调用increment()
方法可以对LongAdder内部的值原子递增1。reset()
方法可以重置LongAdder
的值,使其归0。
public class LongAdderTest {
private static LongAdder count = new LongAdder();
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 5; i++) {
count.reset();
averageTest();
}
}
public static void averageTest() throws InterruptedException {
long t1 = System.currentTimeMillis();
//自定义包含策略
ThreadPoolExecutor executor = new ThreadPoolExecutor(50, 50, 60,
TimeUnit.SECONDS, new LinkedBlockingQueue<>(5),
new DemoThreadFactory("订单创建组"), new ThreadPoolExecutor.AbortPolicy());
CountDownLatch latch = new CountDownLatch(50);
for (int i = 0; i < 50; i++) {
executor.execute(() -> {
try {
for (int j = 0; j < 1000000; j++) {
count.increment();
}
} finally {
latch.countDown();
}
});
}
latch.await();
long t2 = System.currentTimeMillis();
System.out.println(String.format("结果:%s,耗时(ms):%s", count.sum(), (t2 - t1)));
executor.shutdown();
}
}
输出:
结果:50000000,耗时(ms):209
结果:50000000,耗时(ms):133
结果:50000000,耗时(ms):149
结果:50000000,耗时(ms):146
结果:50000000,耗时(ms):148
方式4:LongAccumulator实现
LongAccumulator介绍
LongAccumulator是LongAdder的功能增强版。LongAdder的API只有对数值的加减,而LongAccumulator提供了自定义的函数操作,其构造函数如下:
/**
* accumulatorFunction:需要执行的二元函数(接收2个long作为形参,并返回1个long)
* identity:初始值
**/
public LongAccumulator(LongBinaryOperator accumulatorFunction, long identity) {
this.function = accumulatorFunction;
base = this.identity = identity;
}
示例:
LongAccumulator
的效率和LongAdder
差不多,不过更灵活一些。
调用new LongAdder()
等价于new LongAccumulator((x, y) -> x + y, 0L)
。
从上面4个示例的结果来看,LongAdder、LongAccumulator
全面超越同步锁及AtomicLong
的方式,建议在使用AtomicLong
的地方可以直接替换为LongAdder、LongAccumulator
,吞吐量更高一些。
public class LongAccumulatorTest {
private static volatile LongAccumulator count = new LongAccumulator((x, y) -> x + y, 0);
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 5; i++) {
count.reset();
averageTest();
}
}
public static void averageTest() throws InterruptedException {
long t1 = System.currentTimeMillis();
//自定义包含策略
ThreadPoolExecutor executor = new ThreadPoolExecutor(50, 50, 60,
TimeUnit.SECONDS, new LinkedBlockingQueue<>(5),
new DemoThreadFactory("订单创建组"), new ThreadPoolExecutor.AbortPolicy());
CountDownLatch latch = new CountDownLatch(50);
for (int i = 0; i < 50; i++) {
executor.execute(() -> {
try {
for (int j = 0; j < 1000000; j++) {
count.accumulate(1);
}
} finally {
latch.countDown();
}
});
}
latch.await();
long t2 = System.currentTimeMillis();
System.out.println(String.format("结果:%s,耗时(ms):%s", count.longValue(), (t2 - t1)));
executor.shutdown();
}
}
输出:
结果:50000000,耗时(ms):152
结果:50000000,耗时(ms):148
结果:50000000,耗时(ms):137
结果:50000000,耗时(ms):137
结果:50000000,耗时(ms):144
疑问:
Q:LongAccumulator.reset方法并不能重置重置LongAccumulator的identity:初始值正确,使其恢复原来的初始值。当初始值为0是不会发生这个问题,而当我们设置初始值如1时,就会导致后续的计算操作增加了5份初始值,目前猜测原因是因为代码中LongAccumulator在并发量比较大的情况下,操作数据的时候,相当于把这个数字分成了很多份数字 ,而初始化的时候也是初始化了多份数据,导致初始值叠加了多份。不知道这是个bug么?待解惑。
在此记录下来希望有遇到这种情况的同学注意。解决方法便是要么初始值identity=0不会有这种问题;或者有需要使用reset方法重置的改为重新创建个LongAccumulator处理。
源码:
public void reset() {
Cell[] as = cells; Cell a;
base = identity;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
//对多个cell进行初始值赋值导致后面计算叠加了多份初始值
a.value = identity;
}
}
}
示例:
public class LongAccumulatorTest {
//设置初始值为1查看输出结果
private static volatile LongAccumulator count = new LongAccumulator((x, y) -> x + y, 1);
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 5; i++) {
count.reset();
averageTest();
}
}
public static void averageTest() throws InterruptedException {
long t1 = System.currentTimeMillis();
//自定义包含策略
ThreadPoolExecutor executor = new ThreadPoolExecutor(50, 50, 60,
TimeUnit.SECONDS, new LinkedBlockingQueue<>(5),
new DemoThreadFactory("订单创建组"), new ThreadPoolExecutor.AbortPolicy());
CountDownLatch latch = new CountDownLatch(50);
for (int i = 0; i < 50; i++) {
executor.execute(() -> {
try {
for (int j = 0; j < 1000000; j++) {
count.accumulate(1);
}
} finally {
latch.countDown();
}
});
}
latch.await();
long t2 = System.currentTimeMillis();
System.out.println(String.format("结果:%s,耗时(ms):%s", count.longValue(), (t2 - t1)));
executor.shutdown();
}
}
输出:这时候你会发现只有第一次计算是正确的,只要使用了rest方法重置就会导致这个错误。
结果:50000001,耗时(ms):185
结果:50000005,耗时(ms):143
结果:50000005,耗时(ms):139
结果:50000005,耗时(ms):162
结果:50000005,耗时(ms):142
演示公平锁和非公平锁
先理解一下什么是公平锁、非公平锁?
公平锁和非公平锁体现在别人释放锁的一瞬间,如果前面已经有排队的,新来的是否可以插队,如果可以插队表示是非公平的,如果不可用插队,只能排在最后面,是公平的方式。
示例:
@Slf4j
public class ReentrantLockTest2 {
public static void main(String[] args) throws InterruptedException {
ReentrantLockTest2.LockTest(false);
TimeUnit.SECONDS.sleep(4);
log.error("-------------------------------");
ReentrantLockTest2.LockTest(true);
}
public static void LockTest(boolean fair) throws InterruptedException {
ReentrantLock lock = new ReentrantLock(fair);
new Thread(() -> {
lock.lock();
try {
log.error(Thread.currentThread().getName() + " start!");
TimeUnit.SECONDS.sleep(3);
new Thread(() -> {
//注意线程池要当前线程创建的才能使用,如果传给新开的线程会获取不到线程池
test("后到组",lock);
}).start();
//test(executorAfter,lock);
log.error(Thread.currentThread().getName() + "end!");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}, "Hold Lock 4 Test Thread").start();
test("先到组",lock);
TimeUnit.SECONDS.sleep(3);
}
private static void test(String name,Lock lock){
//自定义包含策略
ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 10, 60,
TimeUnit.SECONDS, new LinkedBlockingQueue<>(5),
new DemoThreadFactory(name), new ThreadPoolExecutor.AbortPolicy());
for (int i = 0; i < 10; i++) {
executor.execute(() -> {
lock.lock();
try {
log.error("获取到锁!");
} finally {
lock.unlock();
}
});
}
executor.shutdown();
}
}
输出:
14:45:23.204 [Hold Lock 4 Test Thread] ERROR com.self.current.ReentrantLockTest2 - Hold Lock 4 Test Thread start!
14:45:26.211 [Hold Lock 4 Test Thread] ERROR com.self.current.ReentrantLockTest2 - Hold Lock 4 Test Threadend!
14:45:26.211 [From DemoThreadFactory's 先到组-Worker-1] ERROR com.self.current.ReentrantLockTest2 - 获取到锁!
14:45:26.211 [From DemoThreadFactory's 先到组-Worker-2] ERROR com.self.current.ReentrantLockTest2 - 获取到锁!
14:45:26.212 [From DemoThreadFactory's 先到组-Worker-3] ERROR com.self.current.ReentrantLockTest2 - 获取到锁!
14:45:26.212 [From DemoThreadFactory's 先到组-Worker-4] ERROR com.self.current.ReentrantLockTest2 - 获取到锁!
14:45:26.212 [From DemoThreadFactory's 先到组-Worker-5] ERROR com.self.current.ReentrantLockTest2 - 获取到锁!
14:45:26.212 [From DemoThreadFactory's 先到组-Worker-6] ERROR com.self.current.ReentrantLockTest2 - 获取到锁!
14:45:26.212 [From DemoThreadFactory's 先到组-Worker-7] ERROR com.self.current.ReentrantLockTest2 - 获取到锁!
14:45:26.212 [From DemoThreadFactory's 先到组-Worker-8] ERROR com.self.current.ReentrantLockTest2 - 获取到锁!
14:45:26.212 [From DemoThreadFactory's 后到组-Worker-4] ERROR com.self.current.ReentrantLockTest2 - 获取到锁!
14:45:26.212 [From DemoThreadFactory's 先到组-Worker-9] ERROR com.self.current.ReentrantLockTest2 - 获取到锁!
14:45:26.213 [From DemoThreadFactory's 后到组-Worker-8] ERROR com.self.current.ReentrantLockTest2 - 获取到锁!
14:45:26.218 [From DemoThreadFactory's 后到组-Worker-10] ERROR com.self.current.ReentrantLockTest2 - 获取到锁!
14:45:26.218 [From DemoThreadFactory's 先到组-Worker-10] ERROR com.self.current.ReentrantLockTest2 - 获取到锁!
14:45:26.218 [From DemoThreadFactory's 后到组-Worker-2] ERROR com.self.current.ReentrantLockTest2 - 获取到锁!
14:45:26.218 [From DemoThreadFactory's 后到组-Worker-1] ERROR com.self.current.ReentrantLockTest2 - 获取到锁!
14:45:26.219 [From DemoThreadFactory's 后到组-Worker-3] ERROR com.self.current.ReentrantLockTest2 - 获取到锁!
14:45:26.219 [From DemoThreadFactory's 后到组-Worker-5] ERROR com.self.current.ReentrantLockTest2 - 获取到锁!
14:45:26.219 [From DemoThreadFactory's 后到组-Worker-6] ERROR com.self.current.ReentrantLockTest2 - 获取到锁!
14:45:26.219 [From DemoThreadFactory's 后到组-Worker-7] ERROR com.self.current.ReentrantLockTest2 - 获取到锁!
14:45:26.219 [From DemoThreadFactory's 后到组-Worker-9] ERROR com.self.current.ReentrantLockTest2 - 获取到锁!
14:45:30.205 [main] ERROR com.self.current.ReentrantLockTest2 - -------------------------------
14:45:30.205 [Hold Lock 4 Test Thread] ERROR com.self.current.ReentrantLockTest2 - Hold Lock 4 Test Thread start!
14:45:33.206 [Hold Lock 4 Test Thread] ERROR com.self.current.ReentrantLockTest2 - Hold Lock 4 Test Threadend!
14:45:33.206 [From DemoThreadFactory's 先到组-Worker-1] ERROR com.self.current.ReentrantLockTest2 - 获取到锁!
14:45:33.206 [From DemoThreadFactory's 先到组-Worker-2] ERROR com.self.current.ReentrantLockTest2 - 获取到锁!
14:45:33.209 [From DemoThreadFactory's 先到组-Worker-3] ERROR com.self.current.ReentrantLockTest2 - 获取到锁!
14:45:33.209 [From DemoThreadFactory's 先到组-Worker-4] ERROR com.self.current.ReentrantLockTest2 - 获取到锁!
14:45:33.209 [From DemoThreadFactory's 先到组-Worker-5] ERROR com.self.current.ReentrantLockTest2 - 获取到锁!
14:45:33.209 [From DemoThreadFactory's 先到组-Worker-6] ERROR com.self.current.ReentrantLockTest2 - 获取到锁!
14:45:33.210 [From DemoThreadFactory's 先到组-Worker-7] ERROR com.self.current.ReentrantLockTest2 - 获取到锁!
14:45:33.210 [From DemoThreadFactory's 先到组-Worker-8] ERROR com.self.current.ReentrantLockTest2 - 获取到锁!
14:45:33.210 [From DemoThreadFactory's 先到组-Worker-9] ERROR com.self.current.ReentrantLockTest2 - 获取到锁!
14:45:33.210 [From DemoThreadFactory's 先到组-Worker-10] ERROR com.self.current.ReentrantLockTest2 - 获取到锁!
14:45:33.210 [From DemoThreadFactory's 后到组-Worker-2] ERROR com.self.current.ReentrantLockTest2 - 获取到锁!
14:45:33.210 [From DemoThreadFactory's 后到组-Worker-1] ERROR com.self.current.ReentrantLockTest2 - 获取到锁!
14:45:33.211 [From DemoThreadFactory's 后到组-Worker-6] ERROR com.self.current.ReentrantLockTest2 - 获取到锁!
14:45:33.211 [From DemoThreadFactory's 后到组-Worker-7] ERROR com.self.current.ReentrantLockTest2 - 获取到锁!
14:45:33.211 [From DemoThreadFactory's 后到组-Worker-5] ERROR com.self.current.ReentrantLockTest2 - 获取到锁!
14:45:33.211 [From DemoThreadFactory's 后到组-Worker-4] ERROR com.self.current.ReentrantLockTest2 - 获取到锁!
14:45:33.211 [From DemoThreadFactory's 后到组-Worker-3] ERROR com.self.current.ReentrantLockTest2 - 获取到锁!
14:45:33.211 [From DemoThreadFactory's 后到组-Worker-9] ERROR com.self.current.ReentrantLockTest2 - 获取到锁!
14:45:33.212 [From DemoThreadFactory's 后到组-Worker-10] ERROR com.self.current.ReentrantLockTest2 - 获取到锁!
14:45:33.212 [From DemoThreadFactory's 后到组-Worker-8] ERROR com.self.current.ReentrantLockTest2 - 获取到锁!
google提供的一些好用的并发工具类
关于并发方面的,juc已帮我们提供了很多好用的工具,而谷歌在此基础上做了扩展,使并发编程更容易,这些工具放在guava.jar包中。
guava maven配置
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>27.0-jre</version>
</dependency>
guava中常用几个类
MoreExecutors:提供了一些静态方法,是对juc中的Executors类的一个扩展。
Futures:也提供了很多静态方法,是对juc中Future的一个扩展。
案例1:异步执行任务完毕之后回调——相当于CompletableFuture的whenComplete方法
ListeningExecutorService
接口继承于juc中的ExecutorService
接口,对ExecutorService
做了一些扩展,看其名字中带有Listening,说明这个接口自带监听的功能,可以监听异步执行任务的结果。通过MoreExecutors.listeningDecorator
创建一个ListeningExecutorService
对象,需传递一个ExecutorService
参数,传递的ExecutorService
负责异步执行任务。
ListeningExecutorService
的submit
方法用来异步执行一个任务,返回ListenableFuture
,ListenableFuture
接口继承于juc中的Future
接口,对Future
做了扩展,使其带有监听的功能。调用submit.addListener
可以在执行的任务上添加监听器,当任务执行完毕之后会回调这个监听器中的方法。
ListenableFuture
的get
方法会阻塞当前线程直到任务执行完毕。
另一种回调方式是通过调用Futures
的静态方法addCallback
在异步执行的任务中添加回调,回调的对象是一个FutureCallback
,此对象有2个方法,任务执行成功调用onSuccess
,执行失败调用onFailure
。
失败的情况可以将代码中int i = 10 / 0;
注释去掉,执行一下可以看看效果。
示例:
@Slf4j
public class GuavaTest {
//相当于CompletableFuture的whenComplete方法
public static void main1(String[] args) throws ExecutionException, InterruptedException {
//创建一个线程池
ExecutorService delegate = Executors.newFixedThreadPool(5);
try {
ListeningExecutorService executorService = MoreExecutors.listeningDecorator(delegate);
//异步执行一个任务
ListenableFuture<Integer> submit = executorService.submit(() -> {
log.error("{}", System.currentTimeMillis());
//休眠2秒,默认耗时
TimeUnit.SECONDS.sleep(2);
log.error("{}", System.currentTimeMillis());
return 10;
});
//当任务执行完毕之后回调对应的方法
submit.addListener(() -> {
log.error("任务执行完毕了,我被回调了");
}, MoreExecutors.directExecutor());
log.error("{}", submit.get());
} finally {
delegate.shutdown();
}
}
//相当于CompletableFuture的whenComplete方法
public static void main(String[] args) throws ExecutionException, InterruptedException {
ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 60,
TimeUnit.SECONDS, new LinkedBlockingQueue<>(10),
new DemoThreadFactory("订单创建组"), new ThreadPoolExecutor.AbortPolicy());
ListeningExecutorService service = MoreExecutors.listeningDecorator(executor);
try {
ListenableFuture<Integer> future = service.submit(() -> {
log.error("{}", System.currentTimeMillis());
//休眠2秒,默认耗时
TimeUnit.SECONDS.sleep(2);
//int i = 10 / 0;
log.error("{}", System.currentTimeMillis());
return 10;
});
Futures.addCallback(future, new FutureCallback<Integer>() {
@Override
public void onSuccess(Integer integer) {
log.error("执行成功:{}", integer);
}
@Override
public void onFailure(Throwable throwable) {
log.error("执行失败:{}", throwable.getMessage());
}
});
log.error("{}", future.get());
} finally {
service.shutdown();
}
}
}
输出:
15:32:54.480 [From DemoThreadFactory's 订单创建组-Worker-1] ERROR com.self.current.GuavaTest - 1599809574477
15:32:56.487 [From DemoThreadFactory's 订单创建组-Worker-1] ERROR com.self.current.GuavaTest - 1599809576487
15:32:56.488 [main] ERROR com.self.current.GuavaTest - 10
15:32:56.488 [From DemoThreadFactory's 订单创建组-Worker-1] ERROR com.self.current.GuavaTest - 执行成功:10
示例2:获取一批异步任务的执行结果——Futures.allAsList(futureList).get()
结果中按顺序输出了6个异步任务的结果,此处用到了Futures.allAsList
方法,看一下此方法的声明:
public static <V> ListenableFuture<List<V>> allAsList(
Iterable<? extends ListenableFuture<? extends V>> futures)
传递一批ListenableFuture
,返回一个ListenableFuture<List<V>>
,内部将一批结果转换为了一个ListenableFuture
对象。
示例:
public static void main(String[] args) throws ExecutionException, InterruptedException {
log.error("star");
ExecutorService delegate = Executors.newFixedThreadPool(5);
try {
ListeningExecutorService executorService = MoreExecutors.listeningDecorator(delegate);
List<ListenableFuture<Integer>> futureList = new ArrayList<>();
for (int i = 5; i >= 0; i--) {
int j = i;
futureList.add(executorService.submit(() -> {
TimeUnit.SECONDS.sleep(j);
return j;
}));
}
//把多个ListenableFuture转换为一个ListenableFuture
//ListenableFuture<List<Integer>> listListenableFuture = Futures.allAsList(futureList);
//获取一批任务的执行结果
List<Integer> resultList = Futures.allAsList(futureList).get();
//输出
resultList.forEach(item -> {
log.error("{}", item);
});
} finally {
delegate.shutdown();
}
}
输出:
15:45:51.160 [main] ERROR com.self.current.GuavaTest - star
15:45:56.185 [main] ERROR com.self.current.GuavaTest - 5
15:45:56.185 [main] ERROR com.self.current.GuavaTest - 4
15:45:56.185 [main] ERROR com.self.current.GuavaTest - 3
15:45:56.185 [main] ERROR com.self.current.GuavaTest - 2
15:45:56.185 [main] ERROR com.self.current.GuavaTest - 1
15:45:56.185 [main] ERROR com.self.current.GuavaTest - 0
示例3:一批任务异步执行完毕之后回调——包装futureList传递给Futures.addCallback 方法
异步执行一批任务,最后计算其和,代码中异步执行了一批任务,所有任务完成之后,回调了上面的onSuccess
方法,内部对所有的结果进行sum操作。
示例:
public static void main(String[] args) throws ExecutionException, InterruptedException {
log.error("start");
ExecutorService delegate = Executors.newFixedThreadPool(5);
try {
ListeningExecutorService executorService = MoreExecutors.listeningDecorator(delegate);
List<ListenableFuture<Integer>> futureList = new ArrayList<>();
for (int i = 5; i >= 0; i--) {
int j = i;
futureList.add(executorService.submit(() -> {
TimeUnit.SECONDS.sleep(j);
return j;
}));
}
//把多个ListenableFuture转换为一个ListenableFuture
ListenableFuture<List<Integer>> listListenableFuture = Futures.allAsList(futureList);
Futures.addCallback(listListenableFuture, new FutureCallback<List<Integer>>() {
@Override
public void onSuccess(List<Integer> result) {
log.error("result中所有结果之和:"+result.stream().reduce(Integer::sum).get());
}
@Override
public void onFailure(Throwable throwable) {
log.error("执行任务发生异常:" + throwable.getMessage(), throwable);
}
});
} finally {
delegate.shutdown();
}
}
输出:
15:57:00.539 [main] ERROR com.self.current.GuavaTest - start
15:57:05.560 [pool-2-thread-1] ERROR com.self.current.GuavaTest - result中所有结果之和:15
其他疑问:
Q:运行下面这个例子结束不了,debug倒是可以,这是为什么呢?Thread[Monitor Ctrl-Break,5,main]是哪来的?
public class VolatileTest1 {
public static volatile int num = 0;
public static void add(){
num++;
}
public static void main(String[] args) throws InterruptedException {
Thread[] threads = new Thread[10];
for (Thread thread : threads) {
thread = new Thread(()->{
for (int i = 0; i < 1000; i++) {
VolatileTest1.add();
}
});
thread.start();
thread.join();
}
//2
//java.lang.ThreadGroup[name=main,maxpri=10]
// Thread[main,5,main]
// Thread[Monitor Ctrl-Break,5,main]
//结束不了,debug倒是可以,这是为什么呢?Thread[Monitor Ctrl-Break,5,main]是哪来的?
while (Thread.activeCount() >1){
Thread.yield();
System.out.println(Thread.activeCount());
ThreadGroup parent = Thread.currentThread().getThreadGroup();
parent.list();
}
System.out.println("num="+num);
}
}
其他
AQS原理没讲,需要找资料补充。
JUC中常见的集合原来没讲,比如ConcurrentHashMap最常用的,后面的都很泛,没有深入,虎头蛇尾。
阻塞队列讲得不够深入。
网友评论