作者: 一字马胡
转载标志 【2017-12-07】
更新日志
日期 | 更新内容 | 备注 |
---|---|---|
2017-12-07 | 学习Future的总结 | 关于Future的深入学习内容 |
2017-12-14 | 深度学习Java Future (二) | 补充相关内容 |
Future
* A {@code Future} represents the result of an asynchronous
* computation. Methods are provided to check if the computation is
* complete, to wait for its completion, and to retrieve the result of
* the computation. The result can only be retrieved using method
* {@code get} when the computation has completed, blocking if
* necessary until it is ready. Cancellation is performed by the
* {@code cancel} method. Additional methods are provided to
* determine if the task completed normally or was cancelled. Once a
* computation has completed, the computation cannot be cancelled.
* If you would like to use a {@code Future} for the sake
* of cancellability but not provide a usable result, you can
* declare types of the form {@code Future<?>} and
* return {@code null} as a result of the underlying task.
上面这段文字已经说明了Future的本质,一个Future代表一个异步计算的结果,并且它提供一些方法来让调用者检测异步过程是否完成,或者取得异步计算的结果,或者取消正在执行的异步任务。本文将分析总结Future的一些实现细节,希望可以弄明白Future的原理。
为了有一定的目标性,本文将选取Future的一个基本实现FutureTask来进行分析总结,其他更为复杂丰富的Future实现日后再进行分析总结。下面的图片展示了FutureTask的类关系图,下文会对FutureTask进行详细的分析:
从类图可以看出,FutureTask实现了Runnable接口和Future接口,下面的图片展示了FutureTask提供的一些接口,下文中将对其中的一些接口做详细分析。
FutureTask
首先来看一下FutureTask的一些关键字段,第一个需要注意的是state字段,看下面的代码:
/*
* Possible state transitions:
* NEW -> COMPLETING -> NORMAL
* NEW -> COMPLETING -> EXCEPTIONAL
* NEW -> CANCELLED
* NEW -> INTERRUPTING -> INTERRUPTED
*/
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
state代表当前任务的状态,NEW代表当前以及获取到任务,准备开始执行任务。COMPLETING状态代表正在进行任务处理中,NORMAL表示任务执行结束,并且任务处理结果正常,没有异常出现,EXCEPTIONAL则表示执行任务的过程中出现了异常,CANCELLED表示任务被取消了,INTERRUPTING表示任务在执行过程中被中断,是一个中间状态,INTERRUPTED表示中断结束。这些状态的可能转换关系在上面的注释中可以看到,可以发现总共只有四种状态转移路径,在下文的某些方法分析中还会提到state。
第二个需要关注的字段是callable字段,这就是实际需要执行的任务,而结果将被设置到outcome字段中去,runner字段代表了运行任务的线程。waiters字段则代表阻塞在该Future上的线程链表,可以看一下waiters的数据结构:
/**
* Simple linked list nodes to record waiting threads in a Treiber
* stack. See other classes such as Phaser and SynchronousQueue
* for more detailed explanation.
*/
static final class WaitNode {
volatile Thread thread;
volatile WaitNode next;
WaitNode() { thread = Thread.currentThread(); }
}
可以看到是一个非常简单的单链表数据结构。下面来看一下FutureTask的构造函数,FutureTask有两个构造函数,分别如下:
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
可以看到,第一个构造函数传递了一个Callable类型的参数,构造函数将该Callable参数初始化给类的callback字段,并且初始化state为NEW。第二个构造函数传递了两个参数,一个是Runnable类型的runnable,代表需要执行的任务,以及任务的返回结果,其实还是使用这两个参数来构造出一个callback,并且执行和第一个构造函数一样的逻辑,下面是如何使用Runnable和result来构造出一个callback的剩下细节:
public static <T> Callable<T> callable(Runnable task, T result) {
if (task == null)
throw new NullPointerException();
return new RunnableAdapter<T>(task, result);
}
static final class RunnableAdapter<T> implements Callable<T> {
final Runnable task;
final T result;
RunnableAdapter(Runnable task, T result) {
this.task = task;
this.result = result;
}
public T call() {
task.run();
return result;
}
}
下面来看一下Future的一个核心方法get的实现细节,下面是get方法的具体代码:
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}
首先获取到任务的当前状态,如果状态小于等于COMPLETING,那么根据最开始的定义,可以知道目前的状态只可能是NEW或者COMPLETING,也就是任务还没有开始执行,或者还在继续执行没有结束,那么就调用awaitDone方法来进行等待任务执行完成,否则,也就是说,当任务的当前状态大于COMPLETING的时候,那么当前状态可能为:
- NORMAL 正常结束
- EXCEPTIONAL 异常结束
- CANCELLED 被取消
- INTERRUPTING 正在中断
- INTERRUPTED 中断结束
先来看第一条分支,也就是调用awaitDone的具体流程。
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
int s = state;
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
else if (q == null)
q = new WaitNode();
else if (!queued)
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos);
}
else
LockSupport.park(this);
}
}
这个方法还是很复杂的,下面根据分支来分析一下这个方法都在做什么事情:
- 如果当前线程被中断,那么就将所有等待在该Future上的线程都从阻塞链表移除
- 如果发现任务的状态变为某种final态的时候,也就是state大于COMPLETING的时候,说明任务执行已经结束了(无论是怎么结束的都不再运行中),那么返回任务的状态,并且清除等待在该Future上的线程
- 如果发现任务的当前状态为COMPLETING,那么说明任务正在执行过程中,需要等待一下
- 否则,就需要等待了,如果配置了不等待,那么不会将当前线程添加到等待链表中,否则将当前线程添加到等待链表中去
- 如果配置了等待时间,那么就需要判断是否超时了,如果超时了,那么就阻塞等待,如果没有设置超时时间,那么就会一直阻塞等待下去直到任务处理完成(不管怎么完成)
上面分析完了当状态小于等于COMPLETING的时候的处理流程,下面来看一下当任务的状态大于COMPLETING的时候的处理流程:
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}
需要判断任务的当前状态,其实现在可以知道目前的状态可能为什么,任务肯定已经不再运行了,要么正常结束,要么异常结束,要么被中断,根据不同的情况进行不同的处理,比如当发现状态为NORMAL的时候,就判断为任务正常结束,处理结果应该保存在outcome中,所以返回outcom就可以了,当发现任务时被取消的时候,get操作会抛出异常,其他情况也会抛出异常来告知调用者发生的情况。Future除了提供不带参数的get方法外,还提供了一个带参数的get方法,也就是可以设置等待时间,超时会抛出异常,下面是带参数的get方法的细节:
public V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
if (unit == null)
throw new NullPointerException();
int s = state;
if (s <= COMPLETING &&
(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
throw new TimeoutException();
return report(s);
}
该方法的实现细节与不带参数的get方法一样,只是增加了超时机制,等待时间超过了设定的时间之后就会抛出TimeoutException异常。该方法和不带参数的get方法是共用一个awaitDone方法来实现任务结果的等待获取的,所以就不再往下赘述了。
下面再来看一个Future的比较重要的方法cancel,也就是取消任务的执行,它的具体实现细节看下面的代码:
public boolean cancel(boolean mayInterruptIfRunning) {
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try { // in case call to interrupt throws exception
if (mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null)
t.interrupt();
} finally { // final state
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
finishCompletion();
}
return true;
}
首先判断任务的当前状态,如果不为NEW或者试图将任务的状态设置为NEW失败之后,就会返回false告诉用户cancel失败了,否则,就调用负责执行任务的线程的interrupt方法来结束任务的运行,并且会更新任务的状态。在finally中会调用一个方法finishCompletion,下面是这个方法的具体实现细节:
private void finishCompletion() {
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;) {
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
done();
callable = null; // to reduce footprint
}
这个方法做的事情就是告诉所有等待在该Future上的线程,让他们别等了,任务已经被cancel了,再等下去也不会有结果了。
文章开头给出了FutureTask的类关系图,并且知道了FutureTask继承了Runnable,我们在创建了一个FutureTask之后,会使用线程池来执行这个FutureTask,最后会执行FutureTask的run方法,所以最为重要的就是FutureTask的run方法,下面开始分析FutureTask的run方法的具体实现细节:
public void run() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
首选,如果任务的当前状态不是NEW,或者试图将任务的状态变为NEW失败的时候,或者试图设置runner字段为当前的线程的时候遇到失败,也就是获取到了执行任务的具体Thread,但是设置字段失败,run方法都将直接返回,这也就说明了为什么说Future是不可逆的,只能执行一次。接着,run方法获取到了具体的任务,并且再次判断该任务的状态是否为NEW,以及判断任务是否为null,如果这些判断都通过的话,那么就可以执行任务了,具体的执行就是调用Callable的call方法来获取结果,如果在执行过程中抛出异常,那么就需要调用setException来设置具体的异常,否则调用set方法来设置任务的执行结果,下面先来看setException的具体细节:
protected void setException(Throwable t) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = t;
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
finishCompletion();
}
}
这个方法会设置任务的状态为EXCEPTIONAL,并且调用finishCompletion来做一些任务的收尾工作。下面来看一下正常结束时候的set细节:
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
}
}
和setException一样,只是将任务的状态设置为了NORMAL而不是EXCEPTIONAL,这样调用线程在进行get方法调用的时候就可以获取到正常的结果了。
到此,本文分析了Future的基本实现,并且基于FutureTask来进行具体的分析,思路更加清晰,再次需要说明的是,一个Future代表一个异步计算的结果,我们可以取消任务,可以等待任务,并且可以设置一个超时时间以控制等待时间,当然,本文的目的是初步理解Future的原理,为了深刻理解Future的原理,需要做更为复杂丰富的分析总结,下一步可以借助CompletableFuture来深入学习Future,关于CompletableFuture的一些基础知识,可以参考文章Java CompletableFuture,对于CompletableFuture的更为深入的学习总结将在未来适宜的时候进行。
网友评论
removeWaiter(q);
throw new InterruptedException();
}
当前线程中断后为什么要把所有等待在该Future上的线程都从阻塞链表移除?
private volatile Thread runner;
尝试执行一次这个CAS,来将当前的ThreadPool中对应的Worker的thread设置为当前的FutureTask的 runner失败,这次就直接放弃执行了,是这个意思吗?? 如果是这样的话,如果竞争,FutureTask的执行岂不是没有任何保证?