一、Future计算
Future异步计算很难操作,通常我们希望将任何计算逻辑视为一系列步骤。但是在异步计算的情况下,表示为回调的方法往往分散在代码中或者深深地嵌套在彼此内部。但是当我们需要处理其中一个步骤中可能发生的错误时,情况可能会变得更复杂。
Futrue接口是Java 5中作为异步计算而新增的,但它没有任何方法去进行计算组合或者处理可能出现的错误。
当处理一个任务时,总会遇到以下几个阶段:
-
提交任务
-
执行任务
-
任务完成的后置处理
1.1 Future示例代码
接下来我们先用最简单的例子迅速对 Future 有个直观理解。
1.1.1 Future异步执行,不需要取返回值
代码
public class FutureTest {
/**
* Future异步执行,不需要取返回值
*/
public static void main(String[] args) {
System.out.println(Thread.currentThread() + " 主线程-开始,time->" + getTime());
ExecutorService executor = Executors.newFixedThreadPool(3);
executor.submit(() -> {
System.out.println("子线程-是否为守护线程:" + Thread.currentThread().isDaemon());
System.out.println(Thread.currentThread() + " 子线程-开始,time->" + getTime());
Thread.sleep(2000);
System.out.println(Thread.currentThread() + " 子线程-退出,time->" + getTime());
return "AlanChen";
});
System.out.println(Thread.currentThread() + " 主线程-退出,time->" + getTime());
}
private static String getTime() {
SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss SSS");
return formatter.format(new Date());
}
}
执行结果
Thread[main,5,main] 主线程-开始,time->2022-02-16 16:44:29 719
子线程-是否为守护线程:false
Thread[main,5,main] 主线程-退出,time->2022-02-16 16:44:29 756
Thread[pool-1-thread-1,5,main] 子线程-开始,time->2022-02-16 16:44:29 756
Thread[pool-1-thread-1,5,main] 子线程-退出,time->2022-02-16 16:44:31 761
1.1.2 Future异步执行,取返回值
代码
public class FutureTest2 {
/**
* Future异步执行,取返回值
*/
public static void main(String[] args) {
System.out.println(Thread.currentThread() + " 主线程-开始,time->" + getTime());
ExecutorService executor = Executors.newFixedThreadPool(3);
Future future = executor.submit(() -> {
System.out.println("子线程-是否为守护线程:" + Thread.currentThread().isDaemon());
System.out.println(Thread.currentThread() + " 子线程-开始,time->" + getTime());
Thread.sleep(2000);
System.out.println(Thread.currentThread() + " 子线程-退出,time->" + getTime());
return "AlanChen";
});
// 中间可以先执行一堆逻辑,再阻塞获取返回结果
System.out.println("中间可以先执行一堆逻辑,再阻塞获取返回结果");
try {
//阻塞获取返回结果
System.out.println("异步执行返回结果:" + future.get());
} catch (Exception e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread() + " 主线程-退出,time->" + getTime());
}
private static String getTime() {
SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss SSS");
return formatter.format(new Date());
}
}
执行结果
Thread[main,5,main] 主线程-开始,time->2022-02-16 16:49:19 027
中间可以先执行一堆逻辑,再阻塞获取返回结果
子线程-是否为守护线程:false
Thread[pool-1-thread-1,5,main] 子线程-开始,time->2022-02-16 16:49:19 070
Thread[pool-1-thread-1,5,main] 子线程-退出,time->2022-02-16 16:49:21 079
异步执行返回结果:AlanChen
Thread[main,5,main] 主线程-退出,time->2022-02-16 16:49:21 080
1.2 Future 如何被构建的
Future 是如何被创建的呢? 生产者线程提交给消费者线程池任务时,线程池会构造一个实现了 Future 接口的对象 FutureTask 。该对象相当于是消费者和生产者的桥梁,消费者通过 FutureTask 存储任务的处理结果,更新任务的状态:未开始、正在处理、已完成等。而生产者拿到的 FutureTask 被转型为 Future 接口,可以阻塞式获取任务的处理结果,非阻塞式获取任务处理状态。以上两个示例代码,可以使用FutureTask来实现
1.2.1 FutureTask异步执行,不需要取返回值
代码
public class FutureTaskTest {
/**
* FutureTask异步执行,不需要取返回值
*/
public static void main(String[] args) {
System.out.println(Thread.currentThread() + " 主线程-开始,time->" + getTime());
ExecutorService executor = Executors.newFixedThreadPool(3);
executor.submit(new FutureTask<>(() -> {
System.out.println("子线程-是否为守护线程:" + Thread.currentThread().isDaemon());
System.out.println(Thread.currentThread() + " 子线程-开始,time->" + getTime());
Thread.sleep(2000);
System.out.println(Thread.currentThread() + " 子线程-退出,time->" + getTime());
return "AlanChen";
}));
System.out.println(Thread.currentThread() + " 主线程-退出,time->" + getTime());
}
private static String getTime() {
SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss SSS");
return formatter.format(new Date());
}
}
执行结果
Thread[main,5,main] 主线程-开始,time->2022-02-16 16:55:57 436
Thread[main,5,main] 主线程-退出,time->2022-02-16 16:55:57 472
子线程-是否为守护线程:false
Thread[pool-1-thread-1,5,main] 子线程-开始,time->2022-02-16 16:55:57 472
Thread[pool-1-thread-1,5,main] 子线程-退出,time->2022-02-16 16:55:59 476
1.2.2 FutureTask异步执行,取返回值
代码
public class FutureTaskTest2 {
/**
* FutureTask异步执行,取返回值
*/
public static void main(String[] args) {
System.out.println(Thread.currentThread() + " 主线程-开始,time->" + getTime());
ExecutorService executor = Executors.newFixedThreadPool(3);
FutureTask<String> futureTask = new FutureTask<>(() -> {
System.out.println("子线程-是否为守护线程:" + Thread.currentThread().isDaemon());
System.out.println(Thread.currentThread() + " 子线程-开始,time->" + getTime());
Thread.sleep(2000);
System.out.println(Thread.currentThread() + " 子线程-退出,time->" + getTime());
return "AlanChen";
});
executor.submit(futureTask);
// 中间可以先执行一堆逻辑,再阻塞获取返回结果
System.out.println("中间可以先执行一堆逻辑,再阻塞获取返回结果");
try {
//阻塞获取返回结果
System.out.println("异步执行返回结果:" + futureTask.get());
} catch (Exception e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread() + " 主线程-退出,time->" + getTime());
}
private static String getTime() {
SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss SSS");
return formatter.format(new Date());
}
}
执行结果
Thread[main,5,main] 主线程-开始,time->2022-02-16 17:02:05 705
中间可以先执行一堆逻辑,再阻塞获取返回结果
子线程-是否为守护线程:false
Thread[pool-1-thread-1,5,main] 子线程-开始,time->2022-02-16 17:02:05 739
Thread[pool-1-thread-1,5,main] 子线程-退出,time->2022-02-16 17:02:07 750
异步执行返回结果:AlanChen
Thread[main,5,main] 主线程-退出,time->2022-02-16 17:02:07 751
二、CompletableFuture
在Java 8中,引入了CompletableFuture类。与Future接口一起,它还实现了CompletionStage接口。此接口定义了可与其他Future组合成异步计算契约。例如,CompletableFuture源码
public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
// 省略相关源码
}
2.1 runAsync 和 supplyAsync方法
CompletableFuture 提供了四个静态方法来创建一个异步操作
CompletableFuture.runAsync(Runnable runnable);
CompletableFuture.runAsync(Runnable runnable, Executor executor);
CompletableFuture.supplyAsync(Supplier<U> supplier);
CompletableFuture.supplyAsync(Supplier<U> supplier, Executor executor);
- runAsync 方法接收的是 Runnable 的实例,但是它没有返回值
- supplyAsync 方法是JDK8函数式接口,无参数,会返回一个结果
- 这两个方法是 executor 的升级,表示让任务在指定的线程池中执行,不指定的话,通常任务是在 ForkJoinPool.commonPool() 线程池中执行的。
2.2 runAsync
runAsync表示创建无返回值的异步任务,相当于ExecutorService submit(Runnable task)
方法。
2.2.1 runAsync异步执行,取不到返回值-指定线程池
代码
public class RunAsyncTest {
/**
* runAsync异步执行,取不到返回值-指定线程池
*/
public static void main(String[] args) {
System.out.println(Thread.currentThread() + " 主线程-开始,time->" + getTime());
CompletableFuture.runAsync(() -> {
System.out.println("子线程-是否为守护线程:" + Thread.currentThread().isDaemon());
System.out.println(Thread.currentThread() + " 子线程-开始,time->" + getTime());
try {
Thread.sleep(2000);
} catch (Exception e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread() + " 子线程-退出,time->" + getTime());
}, GlobalThreadPool.getExecutor());
System.out.println(Thread.currentThread() + " 主线程-退出,time->" + getTime());
}
private static String getTime() {
SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss SSS");
return formatter.format(new Date());
}
}
执行结果
Thread[main,5,main] 主线程-开始,time->2022-02-16 17:28:06 524
Thread[main,5,main] 主线程-退出,time->2022-02-16 17:28:06 577
子线程-是否为守护线程:false
Thread[pool-1-thread-1,5,main] 子线程-开始,time->2022-02-16 17:28:06 577
Thread[pool-1-thread-1,5,main] 子线程-退出,time->2022-02-16 17:28:08 580
Process finished with exit code 0
以上代码调用runAsync
时,指定了线程池GlobalThreadPool.getExecutor()
,通过执行结果我们可以看到,子线程不是守护线程,且执行结果正常。如果我们不指定线程池,用默认的线程池ForkJoinPool.commonPool()
,会有什么不一样吗?
2.2.2 runAsync异步执行,取不到返回值-不指定线程池
代码
public class RunAsyncTest2 {
/**
* runAsync异步执行,取不到返回值-不指定线程池
*/
public static void main(String[] args) {
System.out.println(Thread.currentThread() + " 主线程-开始,time->" + getTime());
CompletableFuture.runAsync(() -> {
System.out.println("子线程-是否为守护线程:" + Thread.currentThread().isDaemon());
System.out.println(Thread.currentThread() + " 子线程-开始,time->" + getTime());
try {
Thread.sleep(2000);
} catch (Exception e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread() + " 子线程-退出,time->" + getTime());
});
System.out.println(Thread.currentThread() + " 主线程-退出,time->" + getTime());
}
private static String getTime() {
SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss SSS");
return formatter.format(new Date());
}
}
执行结果
Thread[main,5,main] 主线程-开始,time->2022-02-16 17:37:03 196
Thread[main,5,main] 主线程-退出,time->2022-02-16 17:37:03 229
子线程-是否为守护线程:true
Thread[ForkJoinPool.commonPool-worker-1,5,main] 子线程-开始,time->2022-02-16 17:37:03 229
Process finished with exit code 0
通过执行结果,我们可以看到,子线程为守护线程,且子线程休眠2秒后,子线程-退出
的打印语句没有被执行,即runAsync()
方法里的逻辑代码没有被完全执行,如果这段代码在项目中使用,就会出现重大BUG。
出现这个问题的原因在于子线程为守护线程,当所有的非守护线程结束时,程序也就终止了,同时会杀死进程中的所有守护线程。
2.2.3 守护线程(Daemon Thread)
在Java中有两类线程:
- 用户线程 (User Thread)
- 守护线程 (Daemon Thread)。
所谓守护 线程,是指在程序运行的时候在后台提供一种通用服务的线程,比如垃圾回收线程就是一个很称职的守护者,并且这种线程并不属于程序中不可或缺的部分。因此,当所有的非守护线程结束时,程序也就终止了,同时会杀死进程中的所有守护线程。反过来说,只要任何非守护线程还在运行,程序就不会终止。
用户线程和守护线程两者几乎没有区别,唯一的不同之处就在于虚拟机的离开:如果用户线程已经全部退出运行了,只剩下守护线程存在了,虚拟机也就退出了。 因为没有了被守护者,守护线程也就没有工作可做了,也就没有继续运行程序的必要了。
Java程序入口就是由JVM启动main
线程,main
线程又可以启动其他线程。当所有线程都运行结束时,JVM退出,进程结束。
将线程转换为守护线程可以通过调用Thread对象的setDaemon(true)方法来实现。在使用守护线程时需要注意一下几点:
-
thread.setDaemon(true)必须在thread.start()之前设置,否则会跑出一个IllegalThreadStateException异常。你不能把正在运行的常规线程设置为守护线程。
-
在Daemon线程中产生的新线程也是Daemon的。
-
守护线程应该永远不去访问固有资源,如文件、数据库,因为它会在任何时候甚至在一个操作的中间发生中断。
2.3 supplyAsync
supplyAsync表示创建带返回值的异步任务的,相当于ExecutorService submit(Callable<T> task)
方法。
2.3.1 supplyAsync异步执行,不取返回值-指定线程池
代码
public class SupplyAsyncTest {
/**
* supplyAsync异步执行,不取返回值-指定线程池
*/
public static void main(String[] args) {
System.out.println(Thread.currentThread() + " 主线程-开始,time->" + getTime());
CompletableFuture.supplyAsync(() -> {
System.out.println("子线程-是否为守护线程:" + Thread.currentThread().isDaemon());
System.out.println(Thread.currentThread() + " 子线程-开始,time->" + getTime());
try {
Thread.sleep(2000);
} catch (Exception e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread() + " 子线程-退出,time->" + getTime());
return "AlanChen";
}, GlobalThreadPool.getExecutor());
System.out.println(Thread.currentThread() + " 主线程-退出,time->" + getTime());
}
private static String getTime() {
SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss SSS");
return formatter.format(new Date());
}
}
执行结果
Thread[main,5,main] 主线程-开始,time->2022-02-16 17:58:52 557
Thread[main,5,main] 主线程-退出,time->2022-02-16 17:58:52 594
子线程-是否为守护线程:false
Thread[pool-1-thread-1,5,main] 子线程-开始,time->2022-02-16 17:58:52 595
Thread[pool-1-thread-1,5,main] 子线程-退出,time->2022-02-16 17:58:54 600
Process finished with exit code 0
supplyAsync()
方法和runAsync()
方法一样有守护线程的问题,因此我们调用时指定线程池,而不用默认的线程池。
2.3.2 supplyAsync异步执行,同步取返回值-get
代码
public class SupplyAsyncTest2 {
/**
* supplyAsync异步执行,同步取返回值
*/
public static void main(String[] args) {
System.out.println(Thread.currentThread() + " 主线程-开始,time->" + getTime());
CompletableFuture<String> completableFuture =CompletableFuture.supplyAsync(() -> {
System.out.println("子线程-是否为守护线程:" + Thread.currentThread().isDaemon());
System.out.println(Thread.currentThread() + " 子线程-开始,time->" + getTime());
try {
Thread.sleep(2000);
} catch (Exception e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread() + " 子线程-退出,time->" + getTime());
return "AlanChen";
}, GlobalThreadPool.getExecutor());
// 中间可以先执行一堆逻辑,再阻塞获取返回结果
System.out.println("中间可以先执行一堆逻辑,再阻塞获取返回结果");
try {
//阻塞获取返回结果
System.out.println("异步执行返回结果:" + completableFuture.get());
} catch (Exception e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread() + " 主线程-退出,time->" + getTime());
}
private static String getTime() {
SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss SSS");
return formatter.format(new Date());
}
}
执行结果
Thread[main,5,main] 主线程-开始,time->2022-02-16 18:02:45 956
中间可以先执行一堆逻辑,再阻塞获取返回结果
子线程-是否为守护线程:false
Thread[pool-1-thread-1,5,main] 子线程-开始,time->2022-02-16 18:02:46 000
Thread[pool-1-thread-1,5,main] 子线程-退出,time->2022-02-16 18:02:48 008
异步执行返回结果:AlanChen
Thread[main,5,main] 主线程-退出,time->2022-02-16 18:02:48 009
2.3.3 supplyAsync异步执行,异步取返回值-whenCompleteAsync
代码
public class SupplyAsyncTest3 {
/**
* supplyAsync异步执行,异步取返回值
*/
public static void main(String[] args) {
System.out.println(Thread.currentThread() + " 主线程-开始,time->" + getTime());
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("子线程-是否为守护线程:" + Thread.currentThread().isDaemon());
System.out.println(Thread.currentThread() + " 子线程-开始,time->" + getTime());
try {
Thread.sleep(2000);
} catch (Exception e) {
e.printStackTrace();
}
//int a=1/0;
System.out.println(Thread.currentThread() + " 子线程-退出,time->" + getTime());
return "AlanChen";
}, GlobalThreadPool.getExecutor());
// 中间可以执行一堆逻辑
System.out.println("中间可以执行一堆逻辑");
//异步回调获取返回值
completableFuture.whenCompleteAsync((result, e) -> {
System.out.println("-------异步执行返回结果:" + result);
System.out.println("-------e=" + e);
}).exceptionally(f -> {
System.out.println("-----exception:" + f.getMessage());
return "出现异常后的返回值";
});
System.out.println(Thread.currentThread() + " 主线程-退出,time->" + getTime());
}
private static String getTime() {
SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss SSS");
return formatter.format(new Date());
}
}
执行结果
Thread[main,5,main] 主线程-开始,time->2022-02-16 18:12:45 109
中间可以执行一堆逻辑
子线程-是否为守护线程:false
Thread[pool-1-thread-1,5,main] 子线程-开始,time->2022-02-16 18:12:45 149
Thread[main,5,main] 主线程-退出,time->2022-02-16 18:12:45 150
Thread[pool-1-thread-1,5,main] 子线程-退出,time->2022-02-16 18:12:47 159
-------异步执行返回结果:AlanChen
-------e=null
2.4 supplyAsync-异步回调thenApply / thenApplyAsync
thenApply 表示某个任务执行完成后执行的动作,即回调方法,会将该任务的执行结果即方法返回值作为入参传递到回调方法中。
2.4.1 thenApply同步调用
代码
public class SupplyAsyncTest5 {
public static void main(String[] args) {
System.out.println(Thread.currentThread() + " 主线程-开始,time->" + getTime());
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("子线程1-是否为守护线程:" + Thread.currentThread().isDaemon());
System.out.println(Thread.currentThread() + " 子线程1-开始,time->" + getTime());
try {
Thread.sleep(2000);
} catch (Exception e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread() + " 子线程1-退出,time->" + getTime());
return "AlanChen";
}, GlobalThreadPool.getExecutor());
try {
Thread.sleep(3000);
} catch (Exception e) {
e.printStackTrace();
}
// 中间可以执行一堆逻辑
System.out.println("中间可以执行一堆逻辑");
//同步执行
CompletableFuture<String> completableFuture2=completableFuture.thenApply((result)->{
System.out.println("子线程2-是否为守护线程:" + Thread.currentThread().isDaemon());
System.out.println(Thread.currentThread() + " 子线程2-开始,time->" + getTime());
try {
Thread.sleep(2000);
} catch (Exception e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread() + " 子线程2-退出,time->" + getTime());
return "Hello "+result;
});
//异步回调获取返回值
completableFuture2.whenCompleteAsync((result, e) -> {
System.out.println("-------异步执行返回结果:" + result);
System.out.println("-------e=" + e);
});
System.out.println(Thread.currentThread() + " 主线程-退出,time->" + getTime());
}
private static String getTime() {
SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss SSS");
return formatter.format(new Date());
}
}
执行结果
Thread[main,5,main] 主线程-开始,time->2022-02-16 18:31:07 833
子线程1-是否为守护线程:false
Thread[pool-1-thread-1,5,main] 子线程1-开始,time->2022-02-16 18:31:07 874
Thread[pool-1-thread-1,5,main] 子线程1-退出,time->2022-02-16 18:31:09 877
中间可以执行一堆逻辑
子线程2-是否为守护线程:false
Thread[main,5,main] 子线程2-开始,time->2022-02-16 18:31:10 878
Thread[main,5,main] 子线程2-退出,time->2022-02-16 18:31:12 892
Thread[main,5,main] 主线程-退出,time->2022-02-16 18:31:12 893
-------异步执行返回结果:Hello AlanChen
-------e=null
Process finished with exit code 0
2.4.2 thenApplyAsync异步调用
代码
public class SupplyAsyncTest6 {
public static void main(String[] args) {
System.out.println(Thread.currentThread() + " 主线程-开始,time->" + getTime());
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("子线程1-是否为守护线程:" + Thread.currentThread().isDaemon());
System.out.println(Thread.currentThread() + " 子线程1-开始,time->" + getTime());
try {
Thread.sleep(2000);
} catch (Exception e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread() + " 子线程1-退出,time->" + getTime());
return "AlanChen";
}, GlobalThreadPool.getExecutor());
try {
Thread.sleep(3000);
} catch (Exception e) {
e.printStackTrace();
}
// 中间可以执行一堆逻辑
System.out.println("中间可以执行一堆逻辑");
//异步执行
CompletableFuture<String> completableFuture2 = completableFuture.thenApplyAsync((result) -> {
System.out.println("子线程2-是否为守护线程:" + Thread.currentThread().isDaemon());
System.out.println(Thread.currentThread() + " 子线程2-开始,time->" + getTime());
try {
Thread.sleep(2000);
} catch (Exception e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread() + " 子线程2-退出,time->" + getTime());
return "Hello " + result;
}, GlobalThreadPool.getExecutor());
//异步回调获取返回值
completableFuture2.whenCompleteAsync((result, e) -> {
System.out.println("-------异步执行返回结果:" + result);
System.out.println("-------e=" + e);
});
System.out.println(Thread.currentThread() + " 主线程-退出,time->" + getTime());
}
private static String getTime() {
SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss SSS");
return formatter.format(new Date());
}
}
执行结果
Thread[main,5,main] 主线程-开始,time->2022-02-16 18:31:10 701
子线程1-是否为守护线程:false
Thread[pool-1-thread-1,5,main] 子线程1-开始,time->2022-02-16 18:31:10 741
Thread[pool-1-thread-1,5,main] 子线程1-退出,time->2022-02-16 18:31:12 752
中间可以执行一堆逻辑
子线程2-是否为守护线程:false
Thread[pool-1-thread-1,5,main] 子线程2-开始,time->2022-02-16 18:31:13 748
Thread[main,5,main] 主线程-退出,time->2022-02-16 18:31:13 749
Thread[pool-1-thread-1,5,main] 子线程2-退出,time->2022-02-16 18:31:15 752
-------异步执行返回结果:Hello AlanChen
-------e=null
Process finished with exit code 0
网友评论