completableFuture是JDK1.8版本新引入的类。下面是这个类:
data:image/s3,"s3://crabby-images/79478/7947893561527a0c132881ef13566cba09e20036" alt=""
实现了俩接口,本身是个class。这个是Future的实现类,使用 completionStage
接口去支持完成时触发的函数和操作。
一个 completetableFuture
就代表了一个任务,他能用Future的方法,还能做一些之前说的 executorService
配合 futures
做不了的。
之前future需要等待isDone为true才能知道任务跑完了,或者就是用get方法调用的时候会出现阻塞,而使用 completableFuture
的使用就可以用then,when等等操作来防止以上的阻塞和轮询isDone的现象出现。
1.创建 CompletableFuture
直接new对象。
一个 completableFuture
对象代表着一个任务,这个对象能跟这个任务产生联系。
下面用的 complete
方法意思就是这个任务完成了需要返回的结果,然后用 get()
方法可以获取到。
data:image/s3,"s3://crabby-images/24c1d/24c1de06d1ddcd1935f1a0608fbf8ed4f718ad14" alt=""
2.JDK1.8使用的接口类。
在本文的 CompletableFuture
中大量的使用了这些函数式接口。
注:这些声明大量应用于方法的入参中,像 thenApply
和 thenAccept
这俩就是一个用Function一个用Consumer
而lambda函数正好是可以作为这些接口的实现。例如 s->{return 1;}
这个就相当于一个Function。因为有入参和返回结果。
data:image/s3,"s3://crabby-images/223f4/223f4c806d4f2c4230469c62c00b21cc3a4b7de2" alt=""
(1)Function
data:image/s3,"s3://crabby-images/de475/de47593d6407a81ae1a39e656b8f3c356bc4db8e" alt=""
(2)Consumer
data:image/s3,"s3://crabby-images/5ae3a/5ae3a665f781b0320aa73a86cecc8b7b7e2b52b5" alt=""
对于前面有Bi的就是这样的,BiConsumer就是两个参数的。
data:image/s3,"s3://crabby-images/0585f/0585f83e53790ae16c34cabeaaded31ebdb0abfc" alt=""
(3)Predicate这个接口声明是一个入参,返回一个boolean。
data:image/s3,"s3://crabby-images/53a26/53a26ee85ba45ac71832dde3d94c2de531fa4c8e" alt=""
(4)supplier
data:image/s3,"s3://crabby-images/c8948/c89481af7038ead36133b9b307a5cbeee8e5be38" alt=""
3.下面是这个类的静态方法
带有Async就是异步执行的意思、也是一个 completableFuture
对象代表着一个任务这个原则。
这种异步方法都可以指定一个线程池作为任务的运行环境,如果没有指定就会使用 ForkJoinPool
线程池来执行
data:image/s3,"s3://crabby-images/c883b/c883b6f4f8f1afefae9d22aed248c71c9a45031e" alt=""
(1) supplyAsync&runAsync
的使用例子。
<pre class="prettyprint hljs livescript" style="padding: 0.5em; font-family: Menlo, Monaco, Consolas, "Courier New", monospace; color: rgb(68, 68, 68); border-radius: 4px; display: block; margin: 0px 0px 1.5em; font-size: 14px; line-height: 1.5em; word-break: break-all; overflow-wrap: break-word; white-space: pre; background-color: rgb(246, 246, 246); border: none; overflow-x: auto; font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-align: start; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-style: initial; text-decoration-color: initial;">public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executorService = Executors.newCachedThreadPool();
executorService.submit(new Callable<Object>() {
@Override
public Object call() throws Exception {
System.out.println("executorService 是否为守护线程 :" + Thread.currentThread().isDaemon());
return null;
}
});
final CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("this is lambda supplyAsync");
System.out.println("supplyAsync 是否为守护线程 " + Thread.currentThread().isDaemon());
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("this lambda is executed by forkJoinPool");
return "result1";
});
final CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
System.out.println("this is task with executor");
System.out.println("supplyAsync 使用executorService 时是否为守护线程 : " + Thread.currentThread().isDaemon());
return "result2";
}, executorService);
System.out.println(completableFuture.get());
System.out.println(future.get());
executorService.shutdown();
} </pre>
data:image/s3,"s3://crabby-images/bc586/bc5863efb63f4e8fcc825558f12589174635c19b" alt=""
这些任务中带有supply是持有返回值的,run是void返回值的,在玩supply时发现一个问题:如果使用supplyAsync任务时不使用任务的返回值,即 不用get方法阻塞主线程会导致任务执行中断。
注:跟get方法无关,后面有答案
data:image/s3,"s3://crabby-images/46a6a/46a6af67f8fca5ac39eac5e5aa7a0100553f5302" alt=""
data:image/s3,"s3://crabby-images/54840/54840fdb693308e96623581a6da0a8ac7d27d290" alt=""
然后我开始探索是否是只有 supplyAsync
是这样。我测试了 runAsync
发现也是这样。
data:image/s3,"s3://crabby-images/6a1f2/6a1f2db66bdf08e1df207c1552089d7f22bc8db1" alt=""
下图为与 supplyAsync
任务执行不全面一样的问题,我甚至测试了将lambda换成runnable发现无济于事。
data:image/s3,"s3://crabby-images/6f155/6f1553bfb3e4e846a5a6367ac9d2f9c33773f422" alt=""
答案:
造成这个原因是因为Daemon。因为 completableFuture
这套使用异步任务的操作都是创建成了守护线程,那么我们没有调用get方法不阻塞这个主线程的时候。主线程执行完毕,所有线程执行完毕就会导致一个问题,就是守护线程退出。
那么我们没有执行的代码就是因为主线程不再跑任务而关闭导致的,可能这个不叫问题,因为在开发中我们主线程常常是一直开着的。但是这个小问题同样让我想了好久。
下面我们开一个非守护线程,可以看到程序执行顺利。
data:image/s3,"s3://crabby-images/3b703/3b7033d71e4acd1eb767b0d31c00c6a69bb1252f" alt=""
下面证实守护线程在其他非守护线程全部退出的情况下不继续执行。
<pre class="prettyprint hljs livescript" style="padding: 0.5em; font-family: Menlo, Monaco, Consolas, "Courier New", monospace; color: rgb(68, 68, 68); border-radius: 4px; display: block; margin: 0px 0px 1.5em; font-size: 14px; line-height: 1.5em; word-break: break-all; overflow-wrap: break-word; white-space: pre; background-color: rgb(246, 246, 246); border: none; overflow-x: auto; font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-align: start; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-style: initial; text-decoration-color: initial;">final CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("this is lambda supplyAsync");
System.out.println("supplyAsync 是否为守护线程 " + Thread.currentThread().isDaemon());
try {
TimeUnit.SECONDS.sleep(1);
try(BufferedWriter writer = new BufferedWriter
(new OutputStreamWriter(new FileOutputStream(new File("/Users/zhangyong/Desktop/temp/out.txt"))))){
writer.write("this is completableFuture daemon test");
}catch (Exception e){
System.out.println("exception find");
}
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("this lambda is executed by forkJoinPool");
return "result1";
}); </pre>
这个代码就是操作本地文件,并且sleep了一秒。其他线程就一句控制台输出的代码,最终的结果是文件没有任何变化。
当我把主线程 sleep 5
秒时,本地文件会写入一句 this is completableFuture daemon test
验证成功。
(2)allOf&anyOf
这两个方法的入参是一个 completableFuture
组、allOf就是所有任务都完成时返回,但是是个Void的返回值。
anyOf是当入参的 completableFuture
组中有一个任务执行完毕就返回,返回结果是第一个完成的任务的结果。
<pre class="prettyprint hljs livescript" style="padding: 0.5em; font-family: Menlo, Monaco, Consolas, "Courier New", monospace; color: rgb(68, 68, 68); border-radius: 4px; display: block; margin: 0px 0px 1.5em; font-size: 14px; line-height: 1.5em; word-break: break-all; overflow-wrap: break-word; white-space: pre; background-color: rgb(246, 246, 246); border: none; overflow-x: auto; font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-align: start; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-style: initial; text-decoration-color: initial;">public static void otherStaticMethod() throws ExecutionException, InterruptedException {
final CompletableFuture<String> futureOne = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
System.out.println("futureOne InterruptedException");
}
return "futureOneResult";
});
final CompletableFuture<String> futureTwo = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(6000);
} catch (InterruptedException e) {
System.out.println("futureTwo InterruptedException");
}
return "futureTwoResult";
});
CompletableFuture future = CompletableFuture.allOf(futureOne, futureTwo);
System.out.println(future.get());
// CompletableFuture completableFuture = CompletableFuture.anyOf(futureOne, futureTwo);
// System.out.println(completableFuture.get());
} </pre>
data:image/s3,"s3://crabby-images/cb218/cb218c3da38dd5ffcbb57285909e88debaed3120" alt=""
data:image/s3,"s3://crabby-images/248e4/248e426f7191923dcf089a9d09b62fc36f53b924" alt=""
(3) completedFuture
这个方法我没懂他是干啥的,源码就是返回一个值。感觉没啥意义。
data:image/s3,"s3://crabby-images/861d9/861d9b74d515797f50c82f1372b64ea1f85fbd39" alt=""
(4)取值方法,除了get还有一个
getNow();
这个就比较特殊了。
这个方法是执行这个方法的时候任务执行完了就返回任务的结果,如果任务没有执行完就返回你的入参。
data:image/s3,"s3://crabby-images/75157/75157cf0aeccb5605b3fe55c81c274cd504a335b" alt=""
(5)join方法跟线程的join用法差不多。
data:image/s3,"s3://crabby-images/4bdaa/4bdaa3ae4426fa00a54789ff4a0340937bd802e9" alt=""
(6) whenXXX
,在一个任务执行完成之后调用的方法。
这个有三个名差不多的方法: whenComplete
、 whenCompleteAsync
、还有一个是 whenCompleteAsync
用自定义 Executor
data:image/s3,"s3://crabby-images/1209c/1209c3c5b564ed75e0bcf9f38890ab1e9dd3da95" alt=""
首先看一下这个 whenComplete
实例方法。这个就是任务执行完毕调用的,传入一个action,这个方法的执行线程是当前线程,意味着会阻塞当前线程。
下面图中test的输出跟 whenComplete
方法运行的线程有关,运行到main线程就会阻塞test的输出,运行的是 completableFuture
线程则不会阻塞住test的输出。
data:image/s3,"s3://crabby-images/c3413/c3413404e29bb7ec2f2bd249b6f3e2586fe97c02" alt=""
下面是任务执行的线程的探索。
data:image/s3,"s3://crabby-images/9ee7c/9ee7c4e9a3617978c119bc6034dfb373d379efa9" alt=""
data:image/s3,"s3://crabby-images/2100c/2100c4f6ac34ba085210c789c164bfd9dbf2be77" alt=""
根据测试得出的结论是:如果调用 whenComplete
的中途,还发生了其他事情,图中的主线程的 sleep(400);
导致 completableFuture
这个任务执行完毕了,那么就使用主线程调用。
如果调用的中途没有发生其他任务且在触碰到 whenComplete
方法时 completableFuture
这个任务还没有彻底执行完毕那么就会用 completableFuture
这个任务所使用的线程。
下面是 whenCompleteAsync
方法。这个方法就是新创建一个异步线程执行。所以不会阻塞。
data:image/s3,"s3://crabby-images/b81d2/b81d264b9630eafd020e50cc51e04734e9ff7f33" alt=""
(7) then方法瞅着挺多的,实际上就是异不异步和加不加自定义 Executor
data:image/s3,"s3://crabby-images/0d878/0d878e9dc2cdcaba1e8a07e75a9b65ae4cc645f9" alt=""
注: whenComplete
中出现的问题在then中测试不存在、使用的就是上一个任务的线程。这个 thenCompose
就是一个任务执行完之后可以用它的返回结果接着执行的方法,方法返回的是另一个你期盼泛型的结果。
compose
理解就是上一个任务结果是then的一部分。
data:image/s3,"s3://crabby-images/cd971/cd9716446fc84e1157cbc5a1e62c21e86851600b" alt=""
下面介绍一下 thenCombine
这个 combine
的理解就是结合两个任务的结果。
data:image/s3,"s3://crabby-images/0f8b8/0f8b8de85effd5f10d6e6879c82b5ee128b7824a" alt=""
综上:这个线程的问题并不是大问题,只要你不用线程来做判断条件,他并不会影响你的效率。试想pool线程都执行完了就用主线程跑呗。没跑完,而使你等了那你就用pool线程呗。
thenRun就是这个任务运行完,再运行下一个任务,感觉像是join了一下。
data:image/s3,"s3://crabby-images/9ac92/9ac92ce7e88e618e2d88e6f298f3fc124289fa44" alt=""
其余不再介绍,大同小异。
像 thenApply(Function);
这样的就是有入参有返回值类型的。
像 thenAccept(Consumer);
这样的就是有入参,但是没有返回值的。
网友评论