JAVA8之前的Future
public static void main(String[] args) throws InterruptedException, ExecutionException {
ExecutorService executorService = Executors.newCachedThreadPool();
Future<String> future = executorService.submit(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "callable finished";
});
//do something else
Thread.sleep(2000);
String callableResult = future.get();
System.out.println(callableResult);
}
CompletableFuture的优势
- 提供了异步程序执行的另一种方式:回调,不需要像future.get()通过阻塞线程来获取异步结果或者通过isDone来检测异步线程是否完成来执行后续程序。
- 能够管理多个异步流程,并根据需要选择已经结束的异步流程返回结果。
构建CompletableFuture
构造函数
/**
* Creates a new incomplete CompletableFuture.
*/
public CompletableFuture() {
}
纯翻译:构建一个不完整的CompletableFuture,为什么说不完整呢,请往下看
式例1
public static class test{
public static String getTestResult()
{
int i = 10/0;
return "test";
}
}
public static void main(String[] args) {
CompletableFuture<String> completableFuture = new CompletableFuture();
new Thread(()->{
try {
completableFuture.complete(test.getTestResult());
} catch (Exception e) {
System.out.println("get exception in side");
completableFuture.completeExceptionally(e);
}
}).start();
try {
String result = completableFuture.get();
System.out.println(result);
} catch (Exception e) {
System.out.println("get exception out side");
e.printStackTrace();
}
}
一般需要complete() 设置异步线程可能返回的值,以及completeExceptionally() 向上抛出异常
工厂方法
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
Executor executor)
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable,
Executor executor)
从入参可以看到,CompletableFuture 允许我们自定义执行器,在实际项目中我们可以选择合适的线程池来提高异步程序的效率。
CompletableFuture completableFuture = CompletableFuture.supplyAsync(() ->
{
try {Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "test";
}
);
CompletableFuture 流
java8 流式处理在CompletableFuture 也得到了完美的体现,流处理包含的中间操作,终端操作分别对应CompletableFuture 中以thenAccept开头返回CompletableFuture <Void>(也就是回调)的实例方法。中间操作对应thenApply,thenCompose等等返回非CompletableFuture <Void>的实例方法。
式例2
CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1);
System.out.println(Thread.currentThread().getName());
} catch (InterruptedException e) {
e.printStackTrace();
}
return "test ";
}).thenApply(u -> {
System.out.println(Thread.currentThread().getName());
return u + "in thenApply first";
})
.thenCompose(u -> CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName());
return u + "in thenCompose second";
})
).thenAccept(u -> {
System.out.println(Thread.currentThread().getName());
System.out.println(u + "in thenAccept last");
});
ForkJoinPool.commonPool-worker-1
main
ForkJoinPool.commonPool-worker-1
main
test in thenApply firstin thenCompose secondin thenAccept last
可以看到默认的异步线程池都是ForkJoinPool.commonPool,同步操作都在main线程中处理。
多说一句thenApply 和thenCompose的区别,thenCompose在调用外部接口返回CompletableFuture<>类型时更方便。
多个CompletableFuture任务的管理
现实应用中可能同时存在多个异步任务,有时候我们需要他们一起完成才能进行下面的操作,有时候我们又只需要在存在一个结果的情况下就返回。
private static final Random random = new Random();
public static String randomDelay()
{
int delay = 500 + random.nextInt(2000);
try {
System.out.println(String.format("%s sleep in %d",Thread.currentThread().getName(),delay));
Thread.sleep(delay);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(String.format("%s sleep in %s",Thread.currentThread().getName(),"end"));
return Thread.currentThread().getName()+" return";
}
public static void main(String[] args) {
CompletableFuture [] futures = {CompletableFuture.supplyAsync(()->randomDelay()),
CompletableFuture.supplyAsync(()->randomDelay()),CompletableFuture.supplyAsync(()->randomDelay())};
CompletableFuture.allOf(futures).join();
System.out.println("all timeout process end");
}
ForkJoinPool.commonPool-worker-2 sleep in 1957
ForkJoinPool.commonPool-worker-3 sleep in 2097
ForkJoinPool.commonPool-worker-1 sleep in 2422
ForkJoinPool.commonPool-worker-2 sleep in end
ForkJoinPool.commonPool-worker-3 sleep in end
ForkJoinPool.commonPool-worker-1 sleep in end
all timeout process end
上段代码展示了 CompletableFuture.allOf 的用法,可以看到所有的线程结束后打印了"all timeout process end",注意 allOf 接受的是数组类对象。如果把allOf改为 anyOf
CompletableFuture [] futures = {CompletableFuture.supplyAsync(()->randomDelay()),
CompletableFuture.supplyAsync(()->randomDelay()),CompletableFuture.supplyAsync(()->randomDelay())};
System.out.println(CompletableFuture.anyOf(futures).get());
System.out.println("all timeout process end");
ForkJoinPool.commonPool-worker-2 sleep in 529
ForkJoinPool.commonPool-worker-3 sleep in 759
ForkJoinPool.commonPool-worker-1 sleep in 1750
ForkJoinPool.commonPool-worker-2 sleep in end
ForkJoinPool.commonPool-worker-2 return
all timeout process end
可以看到只有一个线程结束时结果已经返回,另外CompletableFuture还提供了专为两个任务处理的方法
acceptEither
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(()->randomDelay());
completableFuture.acceptEither(completableFuture.supplyAsync(()->randomDelay()),u-> System.out.println(u)).join();
ForkJoinPool.commonPool-worker-2 sleep in 935
ForkJoinPool.commonPool-worker-1 sleep in 2422
ForkJoinPool.commonPool-worker-2 sleep in end
ForkJoinPool.commonPool-worker-2 return
CompletableFuture 异常处理
除了在get的时候通过 try catch 处理异常,CompletableFuture 提供了更优雅的方式 exceptionally()和 handle()。handle处理方法类似,都是把异常对象转为我们所需要的其他类型对象,然后处理。
public static String getTestResult()
{
int i = 10/0;
return "test";
}
public static void main(String[] args) {
CompletableFuture<String> completableFuture = new CompletableFuture();
new Thread(()->{
try {
completableFuture.complete(getTestResult());
} catch (Exception e) {
System.out.println("get exception in side");
completableFuture.completeExceptionally(e);
}
}).start();
completableFuture.exceptionally(e->"we hava a exception"+e.getMessage())
.thenAccept(u-> System.out.println(u));
}
总结
有关CompletableFuture 大部分内容已经讲完了,相信看了之后应该都会用了吧!
更多的细节还是参考JAVA8官方文档和源码吧!
http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletableFuture.html
网友评论