在FutureTask对于get()方法容易造成阻塞,所以在其基础上诞生了CompletableFuture。他们的关系就像i和i++的关系,FutureTask能做的,CompletableFuture也能做,并且更加高效,功能更加扩展。
创建CompletableFuture
在CompletableFuture源码注释中,作者并不希望开发人员直接使用实例化去创建CompletableFuture,而是使用四大静态方法。
实例化创建示例:
CompletableFuture completableFuture = new CompletableFuture();
CompletableFuture的四大静态方法
supplyAsync(Supplier<U> supplier) ----有返回值
创建带有返回值的异步任务,类似方法ExecutorService
的 submit(Callable<T> task)
方法
CompletableFuture<Integer> supplyAsync = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + " come in.....");
int result = ThreadLocalRandom.current().nextInt(10);
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
return result;
});
supplyAsync(Supplier<U> supplier,Executor executor)
ExecutorService threadPool = Executors.newFixedThreadPool(3);
// 如果自己没有创建线程池,则使用默认的ForkJoinPool线程池
CompletableFuture<Integer> supplyAsync = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + " come in.....");
int result = ThreadLocalRandom.current().nextInt(10);
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
return result;
}, threadPool);
runAsync(Runnable runnable)----没有返回值
创建没有返回值的异步任务,类似ExecutorService
submit(Runnable task)
方法
CompletableFuture<Void> runAsync = CompletableFuture.runAsync(() -> {
System.out.println(Thread.currentThread().getName() + " come in.....");
try {
TimeUnit.MILLISECONDS.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
System.out.println(runAsync.get());
runAsync(Runnable runnable,Executor executor)
ExecutorService threadPool = Executors.newFixedThreadPool(3);
// 如果没有指定线程池,则使用默认的ForkJoinPool线程池
CompletableFuture<Void> runAsync = CompletableFuture.runAsync(() -> { System.out.println(Thread.currentThread().getName() + " come in.....");
try {
TimeUnit.MILLISECONDS.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}
}, threadPool);
System.out.println(runAsync.get());
CompletableFuture获取值
get()
最直接的获值方法,但会抛出异常。源码如下
/**
* Waits if necessary for this future to complete, and then
* returns its result.
* 等待线程计算完成以后获取结果
* @return the result value
* @throws CancellationException if this future was cancelled
* @throws ExecutionException if this future completed exceptionally
* @throws InterruptedException if the current thread was interrupted
* while waiting
*/
public T get() throws InterruptedException, ExecutionException {
Object r;
return reportGet((r = result) == null ? waitingGet(true) : r);
}
get(long timeout, TimeUnit unit)
如上,但可以加入等待时间,超过等待时间,抛出Timeout异常。源码如下
/**
* Waits if necessary for at most the given time for this future
* to complete, and then returns its result, if available.
* 最多等待给定的时间以完成此计算,然后返回其结果(如果可用)
* @param timeout the maximum time to wait
* @param unit the time unit of the timeout argument
* @return the result value
* @throws CancellationException if this future was cancelled
* @throws ExecutionException if this future completed exceptionally
* @throws InterruptedException if the current thread was interrupted
* while waiting
* @throws TimeoutException if the wait timed out
*/
public T get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
Object r;
long nanos = unit.toNanos(timeout);
return reportGet((r = result) == null ? timedGet(nanos) : r);
}
join()
直接获取值,类似于get(),但与get()不同的是,join()不需要手动抛出异常。代码如下
/**
* Returns the result value when complete, or throws an
* (unchecked) exception if completed exceptionally. To better
* conform with the use of common functional forms, if a
* computation involved in the completion of this
* CompletableFuture threw an exception, this method throws an
* (unchecked) {@link CompletionException} with the underlying
* exception as its cause.
* 完成时返回结果值,如果异常完成则抛出(未选中)异常。
* 为了更好地符合通用函数形式的使用,如果完成此CompletableFuture所涉及的计算引发异常,
* 则此方法将引发(未选中){@link CompletionException},其原因是基础异常。
*
* @return the result value
* @throws CancellationException if the computation was cancelled
* @throws CompletionException if this future completed
* exceptionally or a completion computation threw an exception
*/
public T join() {
Object r;
return reportJoin((r = result) == null ? waitingGet(false) : r);
}
getNow(T valueIfAbsent)
和join()相同,不需要手动抛出异常,在执行过程中会返回遇见的异常。不同的是可以给定默认值,如果执行错误或未完成,返回给定的默认值。源码如下
/**
* Returns the result value (or throws any encountered exception)
* if completed, else returns the given valueIfAbsent.
* 如果计算完成,则返回结果值(或引发任何遇到的异常),否则返回给定的默认值
* @param valueIfAbsent the value to return if not completed
* @return the result value, if completed, else the given valueIfAbsent
* @throws CancellationException if the computation was cancelled
* @throws CompletionException if this future completed
* exceptionally or a completion computation threw an exception
*/
public T getNow(T valueIfAbsent) {
Object r;
return ((r = result) == null) ? valueIfAbsent : reportJoin(r);
}
complete
通俗来说,在complete之前设定一个等待时间,如果在等待时间内没有计算出结果,则返回true,并返回complete给定的默认值。反之为false,返回原定计算的值。源码如下
/**
* If not already completed, sets the value returned by {@link
* #get()} and related methods to the given value.
*
* @param value the result value
* @return {@code true} if this invocation caused this CompletableFuture
* to transition to a completed state, else {@code false}
*/
public boolean complete(T value) {
boolean triggered = completeValue(value);
postComplete();
return triggered;
}
示例:
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(2);
} catch (Exception e) {
e.printStackTrace();
}
return "123";
});
try {
TimeUnit.SECONDS.sleep(1);
} catch (Exception e) {
e.printStackTrace();
}
System.out.println(completableFuture.complete("给定的默认值") + "\t" + completableFuture.join());
返回结果: true 给定的默认值
CompletableFuture的回调
其中,带Async
后缀的函数表示需要连接的后置任务会被单独提交到线程池中
,从而相对前置任务来说是异步运行
的。除此之外,两者没有其他区别。
thenApply / thenAccept / thenRun互相依赖
thenApply()-----有入参有返回
表示获取上一个任务的执行结果作为新任务的执行参数,有返回值。但遇见错误时会终止后面所有的线程。
通俗来说,任务A执行完执行B,B需要A的结果,并且B有返回值
thenApply
也是有三个方法重载
// 后一个任务与前一个任务在同一线程执行
public <U> CompletableFuture<U> thenApply(
Function<? super T,? extends U> fn) {
return uniApplyStage(null, fn);
}
// 后一个任务与前一个任务在不同线程中执行
public <U> CompletableFuture<U> thenApplyAsync(
Function<? super T,? extends U> fn) {
return uniApplyStage(defaultExecutor(), fn);
}
//后一个任务使用自定义线程池执行
public <U> CompletableFuture<U> thenApplyAsync(
Function<? super T,? extends U> fn, Executor executor) {
return uniApplyStage(screenExecutor(executor), fn);
}
ps:示例
ExecutorService threadPool = Executors.newFixedThreadPool(3);
CompletableFuture<Integer> supplyAsync = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + " come in.....");
int result = ThreadLocalRandom.current().nextInt(10);
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
return result;
}, threadPool);
// 承接上一个多线程,有返回值
CompletableFuture<Integer> thenApply = supplyAsync.thenApply((result) -> {
System.out.println("上一线程结果" + result);
return ThreadLocalRandom.current().nextInt(10);
});
System.out.println(thenApply.get());
threadPool.shutdown();
如果没用使用thenApplyAsync()指定自己的线程池,线程池依旧使用的是默认的ForkJoinPool线程池。
thenAccept() ----有入参无返回
消费型回调。接受上一个任务的结果作为参数,但是没有返回值。
通俗来说,任务A执行完执行B,B需要A的结果,B没有返回值
// 后一个任务与前一个任务在同一线程执行
public CompletableFuture<Void> thenAccept(Consumer<? super T> action) {
return uniAcceptStage(null, action);
}
// 后一个任务与前一个任务在不同线程中执行
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) {
return uniAcceptStage(defaultExecutor(), action);
}
//后一个任务使用自定义线程池执行
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action,
Executor executor) {
return uniAcceptStage(screenExecutor(executor), action);
}
示例:
public static void main(String[] args) {
ExecutorService threadPool = Executors.newFixedThreadPool(2);
CompletableFuture<Void> async = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(2);
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("111111");
return 1;
}, threadPool).thenApplyAsync((f) -> {
// 手动创建异常
System.out.println("222222");
f += 2;
return f;
}, threadPool).thenAcceptAsync(f -> {
f += 3;
System.out.println("最终的值:" + f);
}, threadPool);
System.out.println(Thread.currentThread().getName() + " 先去做其他事情了");
async.join();
threadPool.shutdown();
}
ps:只消费
thenRun()----无入参无返回
thenRun
的方法没有入参,也没有返回值。
通俗来说,任务A执行完执行B,并且B不需要A的结果,B没有返回值
示例:
public static void main(String[] args) {
ExecutorService threadPool = Executors.newFixedThreadPool(2);
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(2);
} catch (Exception e) {
e.printStackTrace();
}
return "123";
},threadPool);
completableFuture.thenRunAsync(() -> {
System.out.println("thenRun 无入参,无返回值");
},threadPool);
completableFuture.join();
threadPool.shutdown();
}
小结
thenApply()、thenAccept()、thenRun()的区别,示例代码如下:
System.out.println(CompletableFuture.supplyAsync(()->"resultA").thenRun(()->{}).join());
System.out.println("《-----------------------------》");
System.out.println(CompletableFuture.supplyAsync(()->"resultA").thenAccept(System.out::println).join());
System.out.println("《-----------------------------》");
System.out.println(CompletableFuture.supplyAsync(()->"resultA").thenApply((f)-> "resultB").join());
结果:
null
《------------------------------------------》
resultA
null
《-------------------------------------------》
resultB
exceptionally()
如果执行任务出现异常,则执行 exceptionally 中的代码块,并且需要一个返回值。
示例:
public static void main(String[] args) throws Exception {
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + " come in.....");
int a = 10 / 0;
return 1;
});
//whenComplete:参数列表 (T t, U u),如果执行过程正常完成,则执行该分支
System.out.println(completableFuture.whenComplete((t, u) -> {
if (u==null) {
System.out.println(t);
}
//exceptionally :如果执行过程出现异常,则走该分支
}).exceptionally((f) -> {
System.out.println("异常详细信息:" + f.getMessage());
return 500;
}).get());
}
whenComplete()
whenComplete
算是 exceptionally
和thenApply
的结合,将任务执行的结果和异常作为回到方法的参数,如果没有发生异常则异常参数为null。源码如下:
public CompletableFuture<T> whenComplete(
BiConsumer<? super T, ? super Throwable> action) {
return uniWhenCompleteStage(null, action);
}
public CompletableFuture<T> whenCompleteAsync(
BiConsumer<? super T, ? super Throwable> action) {
return uniWhenCompleteStage(asyncPool, action);
}
public CompletableFuture<T> whenCompleteAsync(
BiConsumer<? super T, ? super Throwable> action, Executor executor) {
return uniWhenCompleteStage(screenExecutor(executor), action);
}
示例:
ExecutorService threadPool = Executors.newFixedThreadPool(3);
// 自定义线程池,可以防止主线程立刻结束,导致守护线程forkjoin的问题
try {
CompletableFuture<Integer> supplyAsync = CompletableFuture.supplyAsync(() ->{
System.out.println(Thread.currentThread().getName() + " come in.....");
int result = ThreadLocalRandom.current().nextInt(10);
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
return result;
}, threadPool).whenComplete((v,e) -> {
if (e==null) {
System.out.println(Thread.currentThread().getName() + " come in.....");
System.out.println("上一个线程执行的结果:" + v);
}
}).exceptionally(e -> {
e.printStackTrace();
System.out.println("上一个线程执行的异常" + e);
return null;
});
System.out.println(Thread.currentThread().getName() + " 忙其他的去了.....");
} catch (Exception e) {
e.printStackTrace();
} finally {
threadPool.shutdown();
}
执行结果:
handle()
表示获取上一个任务的执行结果作为新任务的执行参数,有返回值。相对于thenApply(),handle()可以将异常带入下一个线程处理。源码如下
public <U> CompletableFuture<U> handle(
BiFunction<? super T, Throwable, ? extends U> fn) {
return uniHandleStage(null, fn);
}
示例:
public static void main(String[] args) {
ExecutorService threadPool = Executors.newFixedThreadPool(2);
CompletableFuture<Integer> handle = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(2);
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("111111");
return 1;
}, threadPool).handle((f, n) -> {
// 手动创建异常,跳过此线程后面的动作
int i = 10/0;
System.out.println("222222");
f += 2;
return f;
}).handle((f, n) -> {
// 如果检测到上一线程有异常,可以处理
if (n!=null) {
f=10;
}
System.out.println("333333");
f += 3;
return f;
}).whenComplete((f,e) -> {
if (e==null) {
System.out.println("计算结果为:" + f);
}
}).exceptionally(e -> {
e.printStackTrace();
System.out.println(e.getMessage());
return null;
});
System.out.println(Thread.currentThread().getName() + " 先去做其他事情了");
handle.join();
threadPool.shutdown();
}
运行结果:
main 先去做其他事情了
111111
333333
计算结果为:13
CompletableFuture对计算速度的选用
使用applyToEither
可对两个线程执行速度进行比较,获取速度最快的执行结果。源码如下
public <U> CompletableFuture<U> applyToEither(
CompletionStage<? extends T> other, Function<? super T, U> fn) {
return orApplyStage(null, other, fn);
}
public <U> CompletableFuture<U> applyToEitherAsync(
CompletionStage<? extends T> other, Function<? super T, U> fn) {
return orApplyStage(asyncPool, other, fn);
}
public <U> CompletableFuture<U> applyToEitherAsync(
CompletionStage<? extends T> other, Function<? super T, U> fn,
Executor executor) {
return orApplyStage(screenExecutor(executor), other, fn);
}
示例:
CompletableFuture<String> supplyAsyncA = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(2);
} catch (Exception e) {
e.printStackTrace();
}
return "playA";
});
CompletableFuture<String> supplyAsyncB = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (Exception e) {
e.printStackTrace();
}
return "playB";
});
CompletableFuture<String> applyToEither = supplyAsyncA.applyToEither(supplyAsyncB, f -> {
return f + " is winner!";
});
System.out.println(Thread.currentThread().getName() + "----" + applyToEither.join());
applyToEither
、acceptEither
、runAfterEither
的区别与前面回调函数的区别一致,在于是否有返回值
thenAcceptBoth
当两个CompletionStage都执行完成后,把结果一块交给thenAcceptBoth来进行消耗
applyToEither
两个CompletionStage,谁执行返回的结果快,我就用那个CompletionStage的结果进行下一步的转化操作。
acceptEither
两个CompletionStage,谁执行返回的结果快,我就用那个CompletionStage的结果进行下一步的消耗操作。
runAfterEither
两个CompletionStage,任何一个完成了都会执行下一步的操作(Runnable)
runAfterBoth
两个CompletionStage,都完成了计算才会执行下一步的操作(Runnable)
CompletableFuture多任务合并
thenCombine
thenCompose 方法允许你对两个 CompletionStage 进行流水线操作,第一个操作完成时,将其结果作为参数传递给第二个操作。,源码如下
public <U,V> CompletableFuture<V> thenCombine(
CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn) {
return biApplyStage(null, other, fn);
}
public <U,V> CompletableFuture<V> thenCombineAsync(
CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn) {
return biApplyStage(asyncPool, other, fn);
}
public <U,V> CompletableFuture<V> thenCombineAsync(
CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn, Executor executor) {
return biApplyStage(screenExecutor(executor), other, fn);
}
示例:
CompletableFuture<Integer> integerCompletableFuture1 = CompletableFuture.supplyAsync(() -> {
System.out.println("线程一。。。。。。启动");
try {
TimeUnit.SECONDS.sleep(2);
} catch (Exception e) {
e.printStackTrace();
}
return 10;
});
CompletableFuture<Integer> integerCompletableFuture2 = CompletableFuture.supplyAsync(() -> {
System.out.println("线程二。。。。。。启动");
try {
TimeUnit.SECONDS.sleep(1);
} catch (Exception e) {
e.printStackTrace();
}
return 20;
});
CompletableFuture<Integer> result = integerCompletableFuture1.thenCombine(integerCompletableFuture2, (x, y) -> {
System.out.println("开始合并。。。。。。。。");
return x + y;
});
System.out.println(result.join());
结果:
线程一。。。。。。启动
线程二。。。。。。启动
开始合并。。。。。。。。
30
allof
等待所有任务完成。源码如下:
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {
return andTree(cfs, 0, cfs.length - 1);
}
示例:
ExecutorService threadPool = Executors.newFixedThreadPool(2);
CompletableFuture<String> integerCompletableFuture1 = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(2);
System.out.println("线程一。。。。。。启动");
} catch (Exception e) {
e.printStackTrace();
}
return "华为";
},threadPool);
CompletableFuture<String> integerCompletableFuture2 = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(1);
System.out.println("线程二。。。。。。启动");
} catch (Exception e) {
e.printStackTrace();
}
return "小米";
},threadPool);
CompletableFuture<Void> allOf = CompletableFuture.allOf(integerCompletableFuture1, integerCompletableFuture2);
System.out.println(integerCompletableFuture1.join());
System.out.println(integerCompletableFuture2.join());
System.out.println("main..........end.........");
threadPool.shutdown();
因为allof
需要等待所有线程执行完毕,所以会先打印线程二。并等待线程一执行完毕。
结果:
线程二。。。。。。启动
线程一。。。。。。启动
华为
小米
main..........end.........
anyof
等待其中一个任务完成。源码如下:
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) {
return orTree(cfs, 0, cfs.length - 1);
}
示例(在任务合并时不同,将allof改为anyof):
CompletableFuture<Object> anyOf = CompletableFuture.anyOf(integerCompletableFuture1, integerCompletableFuture2);
System.out.println(anyOf.join());
System.out.println("main..........end.........");
threadPool.shutdown();
因为线程二的执行比线程一快,所以直接打印线程二。anyof返回的CompletableFuture,存储的时先完成线程的返回结果。
结果:
线程二。。。。。。启动
小米
main..........end.........
线程一。。。。。。启动
ps:本文将仅限学习使用。本人才疏学浅,如有错误,请指出,不胜感激。
网友评论