Future 表示异步计算的结果。提供了一些方法来检查计算是否完成,等待其完成以及检索计算结果。只有在计算完成后才可以使用get方法检索结果,必要时将其阻塞,直到准备就绪为止。取消通过 cancel 方法执行。提供了其他方法来确定任务是正常完成还是被取消。一旦计算完成,就不能取消计算。如果出于可取消性的目的而使用 Future 而不提供可用的结果,则可以声明 Future<?>
形式的类型,并作为基础任务的结果返回null。
示例:
interface ArchiveSearcher { String search(String target); }
class App {
ExecutorService executor = ...
ArchiveSearcher searcher = ...
void showSearch(final String target)
throws InterruptedException {
Future<String> future
= executor.submit(new Callable<String>() {
public String call() {
return searcher.search(target);
}});
displayOtherThings(); // do other things while searching
try {
displayText(future.get()); // use future
} catch (ExecutionException ex) { cleanup(); return; }
}
}
FutureTask
类是Future的实现,该实现类实现 Runnable 接口,因此可以由执行程序执行。例如,上面带有 Submit 的构造可以替换为:
FutureTask<String> future =
new FutureTask<String>(new Callable<String>() {
public String call() {
return searcher.search(target);
}});
executor.execute(future);
内存一致性影响:异步计算采取的操作发生在另一个线程中相应的 Future.get() 之后的操作之前。
接口结构
public interface Future<V> {
//尝试取消执行此任务。如果任务已经完成,已经被取消或由于某些其他原因而无法取消,则此尝试将失败。
//如果成功,并且在调用 cancel 时此任务尚未开始,则该任务永远无法运行。
//如果任务已经开始,则 mayInterruptIfRunning 参数确定是否应中断执行该任务的线程以尝试停止该任务。
//mayInterruptIfRunning == true, 表示中断执行中的线程,false 表示让线程正常完成
boolean cancel(boolean mayInterruptIfRunning);
//如果此任务在正常完成之前被取消,则返回true。
boolean isCancelled();
//如果此任务完成,则返回true。完成可能是由于正常终止,异常或取消引起的,在所有这些情况下,此方法都将返回true。
boolean isDone();
//必要时等待计算完成,然后检索其结果
V get() throws InterruptedException, ExecutionException;
//必要时最多等待给定时间以完成计算,然后检索其结果(如果有)。
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
Future 有很多实现类,这里主要讲下他的一个实现类 FutureTask。
实现类 FutureTask
一个可取消的异步计算。此类提供 Future 的基本实现,其中包含启动和取消计算,查询以查看计算是否完成以及检索计算结果的方法。只有在计算完成后才能检索结果;如果计算尚未完成,则get方法将阻塞。一旦计算完成,就不能重新启动或取消计算(除非使用 runAndReset() 方法调用计算)。
FutureTask 可用于包装 Callable 或 Runnable 对象。由于 FutureTask 实现 Runnable,因此 FutureTask 可以提交给执行程序以执行。
除了用作独立类之外,此类还提供受保护的功能,这些功能在创建自定义任务类时可能很有用。
类图:
FutureTask.png
可以看到 FutureTask 实现了 RunableFuture,RunableFuture 顾名思义就是可执行的 Future,它是 Runable 和 Future 的结合。RunableFuture 重写了 Runable 的 run 方法,它的其中一个作用就是把执行结果放入返回结果中,具体到 FutureTask 的话就是把 Callable (Runable 也会被包装成 Callable)的返回值由 run -> set(result) 调用路径放入 outcome 成员变量中。
一般的执行过程:
- 主线程调用 submit 执行 FutureTask,FutureTask 执行中
- 主线程调用 get 方法,判断是否已执行完。如果已执行完,则直接获取结果。如果未执行完,则主线程阻塞直到 run 方法执行完成后去唤醒主线程。其中阻塞和唤醒用的是 LockSupport 的方法。
FutureTask 有一个成员变量 waiters,类型是 WaitNode,是一个静态内部类,它表示等待获取结果的线程链表,它是一个从尾部开始的单向链表,主要作用是存储等待获取结果的线程,在完成时唤醒这些线程:
static final class WaitNode {
volatile Thread thread;
volatile WaitNode next;
WaitNode() { thread = Thread.currentThread(); }
}
重点方法解析
1. run
run 方法实现自 Runable 接口,所以当线程启动的时候会首先运行这个方法来。
public void run() {
// 1. 如果 state != NEW 说明 run 方法已经运行过,直接 return
// 2. 如果 state == NEW && CAS 竞争 设置 runner 失败,说明已经有别的线程在运行,直接 return
// NEW 的状态由构造方法初始化,runner 是运行该 Callable 的线程
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;// 表示运行中是否出现异常
try {
result = c.call();// 调用实际自己实现的程序
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);// 出现异常,调用 setException
}
// 中间出现异常,则掉过 set
if (ran)
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
// 如果最终状态 >= INTERRUPTING,则处理中断
// cancel 方法会通过参数 mayInterruptIfRunning 来设置 state 的值
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
可以看出,润 run 方法只能运行一次。
1.1 setException
运行时出现异常。
protected void setException(Throwable t) {
// 这里为什么要用 CAS 因为可能会和 cancel 方法产生竞争。
// 如果竞争失败,说明取消竞争成功,在 cancel 方法承担唤醒的工作,所以直接跳过。
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
// 竞争成功
outcome = t; // outcome 为一个 Throwable
// 把最终状态改为 EXCEPTIONAL
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
finishCompletion();
}
}
1.2. finishCompletion
//删除并发出所有等待线程的信号,调用done(),并取消进行中的方法
private void finishCompletion() {
// assert state > COMPLETING;
// 从 waiters 末尾开始遍历,for 自旋直到 CAS 成功。
for (WaitNode q; (q = waiters) != null;) {
// 使用 CAS 把 waiters 设置为 null,和 awaitDone 和 removeWatier 方法竞争
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
// 自旋唤醒所有线程
for (;;) {
Thread t = q.thread;
// 唤醒线程
if (t != null) {
q.thread = null;
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
// 这个一个空方法,主要用于继承重写,这也是模板模式的体现
done();
callable = null; // to reduce footprint
}
done 是一个模板方法,之前的一篇文章中讲到的 ExecutorCompletionService 有用到它。
1.3. handlePossibleCancellationInterrupt
它的作用就是当接收来自 cancel 的中断时,确保 task 还在运行,具体为什么我也不太清除。
/**
* Ensures that any interrupt from a possible cancel(true) is only
* delivered to a task while in run or runAndReset.
*/
private void handlePossibleCancellationInterrupt(int s) {
// It is possible for our interrupter to stall before getting a
// chance to interrupt us. Let's spin-wait patiently.
if (s == INTERRUPTING)
while (state == INTERRUPTING)
Thread.yield(); // wait out pending interrupt
// assert state == INTERRUPTED;
// We want to clear any interrupt we may have received from
// cancel(true). However, it is permissible to use interrupts
// as an independent mechanism for a task to communicate with
// its caller, and there is no way to clear only the
// cancellation interrupt.
//
// Thread.interrupted();
}
2. runAndReset
顾名思义这个方法有两个操作 run 和 reset。它有一个 boolean 的返回值,只有在两个操作都成功的情况下才会返回 true,如果返回了 false ,有一种情况是它还是会执行了一次 task。
runAndReset 和 run 方法最大的区别是它不处理结果,也没有唤醒等待中的线程的操作。所以使用 runAndReset 的 task 最好没有返回值。还有就是使用了 runAndReset 就不要用 get 方法了(此时 get 的作用只剩下阻塞这个功能)。
一旦 cancel 就不能在次运行了。它被设计出来的用意就是执行多次的任务。
protected boolean runAndReset() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return false;
boolean ran = false;
int s = state;
try {
Callable<V> c = callable;
if (c != null && s == NEW) {
try {
c.call(); // don't set result
ran = true;
} catch (Throwable ex) {
setException(ex);
}
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
return ran && s == NEW;
}
3. get 和 get(time)
get 和 get(time) 的差别只是增加了等待时间。在代码层面,这两者几乎一样。
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}
public V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
if (unit == null)
throw new NullPointerException();
int s = state;
if (s <= COMPLETING &&
(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
throw new TimeoutException();
return report(s);
}
// 根据 awaitDone 返回状态返回结果或抛出异常
private V report(int s) throws ExecutionException {
Object x = outcome;
// 顺利运行
if (s == NORMAL)
return (V)x;
// 取消
if (s >= CANCELLED)
throw new CancellationException();
// task 执行过程中出现异常
throw new ExecutionException((Throwable)x);
}
可以看到几乎一模一样。最重要的方法是 awaitDone 方法。awaitDone 的作用就是把获取结果的线程放入等待链表,然后阻塞,直至被中断,计算完成或出现异常。
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;//用于计时
WaitNode q = null;
boolean queued = false;
for (;;) {//自旋
//如果已经被中断,则 removeWaiter,抛出中断异常
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
int s = state;//获取该任务的运行状态,state 有7个值
// s > COMPLETING,说明 task 已经结束
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
// 马上就要结束,则让出cpu
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
// 初始化 WaitNode
else if (q == null)
q = new WaitNode();
// 是否已入队,没有则把 WaitNode 接到末尾
else if (!queued)
// 和 finishCompletion 和 removeWaiter 竞争
// 1. finishCompletion 竞争成功,说明 state 已经 > COMPLETING 则下次循环就会退出
// 2. removeWaiter 竞争成功,说明 waiters 变化了,下一次循环再次竞争
queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q);
// 如果使用了计时,则判断是否超时,如果超时则以出 WaitNode 并立即返回无需等待结果,否则阻塞 nanos
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos);
}
else
//阻塞,直到被唤醒(正常完成 || 异常 || 中断)
LockSupport.park(this);
}
}
其中 state 有7个值,仅在set,setException和cancel方法中转换为中止状态。在完成时,它是一个瞬时值 COMPLETING(在 outcome 设置时)或 INTERRUPTING (中断时)。从这些中间状态到最终状态的转换使用便宜的有序/惰性写入,因为值是唯一的,并且无法进一步修改。分别是:
state | 描述 |
---|---|
1. NEW = 0 | 构造方法创建时 |
2. COMPLETING = 1 | 这是一个中间态,完成时和出现异常时有使用到 |
3. NORMAL = 2 | 完成运行时的最终状态 |
4. EXCEPTIONAL = 3 | 异常时的最终状态 |
5. CANCELLED = 4 | 已取消 |
6. INTERRUPTING = 5 | 中断中 |
7. INTERRUPTED = 6 | 已中断 |
可能的 state 转换:
- NEW -> COMPLETING -> NORMAL
- NEW -> COMPLETING -> EXCEPTIONAL
- NEW -> CANCELLED
- NEW -> INTERRUPTING -> INTERRUPTED
3.1. removeWaiter
// 半机翻:尝试取消链接超时或中断的等待节点以避免堆积垃圾。 内部节点的拼接
// 没有CAS,因为这对释放者无论如何遍历都没有影响。 为了避免因未拼接而造成的影响
// 删除的节点,如果出现明显的情况,则遍历该列表根。 当节点很多时,这很慢,但是我们没有
// 期望列表足够长以超过更高的开销
计划。
private void removeWaiter(WaitNode node) {
if (node != null) {
node.thread = null;
retry:
for (;;) { // restart on removeWaiter race
//遍历整个链表
for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
s = q.next;
// q.thread != null,则把 q 当作前一个节点,遍历下一个节点
if (q.thread != null)
pred = q;
// q.thread == null && pred != null,表示当前节点不是第一个节点,是一个中间节点
// 这里没有使用 CAS,如果出现多个线程同时遍历,前一个节点变为null,则重新从头遍历
// 为什么没有使用 CAS 因为作者的想法时这个链表不会太长,所以我们使用时不应该使这个量表太长
// 操作:把下一个节点连接到前一个节点的后面
else if (pred != null) {
pred.next = s;// 把 s 连接到 pred 后面
if (pred.thread == null) // check for race
continue retry;
}
// q.thread == null && pred == null,表示第一个节点的 thread == null,
// 这里使用 CAS,因为可能多个线程在操作
// 操作:把下一个节点设置为末尾节点,如果竞争失败则重新从头遍历
else if (!UNSAFE.compareAndSwapObject(this, waitersOffset, q, s))
continue retry;
}
break;
}
}
}
这个方法告诉我们使用这个链表时,不应该使得链表太长。
既然前面说到 Future 是要给可取消的计算,我们接下来就来看下 cancel 方法:
4. cancel
中断运行中(state == NEW)的线程,但是真正的中断还是要配合你的 callable 的实现,就是是否对中断信号进行了检测并作出相应的处理。
public boolean cancel(boolean mayInterruptIfRunning) {
// state != NEW 或 state == NEW CAS 竞争失败就直接返回 false 取消失败,说明线程已经成功运行或出现了异常
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try { // in case call to interrupt throws exception
if (mayInterruptIfRunning) {
try {
Thread t = runner;
// runner 在 run 方法的最后会被置为 null
if (t != null)
t.interrupt();// 发出中断信号
} finally { // final state
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
// 唤醒线程,请看1.2节
finishCompletion();
}
return true;
}
好了 Future 到这里就讲完了,如果有错误的请指正。
网友评论