概述
future-task.png FutureTask
实现了RunnableFuture
接口,它既可以作为Runnable
被提交给Executor
去执行,又可以作为Future
获取异步任务的执行结果,或者取消异步任务。一句话定义,FuntureTask
代表的是一个可取消的异步任务。
FutureTask的状态机模型
JDK1.8中,FutureTask
的实现采用了状态机模型,并且不同状态的数值是经过精心设计的。FutureTask
内部维护了一个状态变量,用来表示任务执行的各个阶段。
// 使用volatile禁用线程、cpu缓存
// 以便其它线程能立即观测到状态的改变
private volatile int state;
// 初始状态
private static final int NEW = 0;
// 等待完成中
private static final int COMPLETING = 1;
// 正常完成
private static final int NORMAL = 2;
// 异常结束
private static final int EXCEPTIONAL = 3;
// 已取消
private static final int CANCELLED = 4;
// 响应打断中
private static final int INTERRUPTING = 5;
// 已打断
private static final int INTERRUPTED = 6;
其中,COMPLETING
和INTERRUPTING
是中间状态,NORMAL
、EXCEPTIONAL
、CANCELLED
和INTERRUPTED
是终态,一旦到达终态后续就不能再发生状态转换了。同时状态的转换并不是任意的,比如不能从COMPLETING -> INTERRUPTING
,可能的状态转换是以下几种情形之一:
/**
* Possible state transitions:
*
* NEW -> COMPLETING -> NORMAL
* NEW -> COMPLETING -> EXCEPTIONAL
* NEW -> CANCELLED
* NEW -> INTERRUPTING -> INTERRUPTED
*/
FutureTask的实例变量
/** 实际被执行的Callable,因为Runnable不能返回结果 */
private Callable<V> callable;
/** 任务的执行结果,或者执行过程中出现的异常 */
private Object outcome;
/** 执行Callable的线程 */
private volatile Thread runner;
/**
* Treiber stack构成的等待线程栈,
* 因为可能会有多个线程调用Future#get()
* 来获取任务执行结果
*/
private volatile WaitNode waiters;
/** 单链表实现的Treiber stack */
static final class WaitNode {
volatile Thread thread;
volatile WaitNode next;
WaitNode() { thread = Thread.currentThread(); }
}
Treiber stack
这个词在整个java.util.concurrent
包中出现的次数特别多,那么它到底是怎样的一个数据结构呢?简单来说,Treiber stack
是利用细粒度并发原语(CAS)实现的一种无锁结构。Treiber stack
内部维护链表头结点,在push
和pop
时通过CAS来保证线程安全,下面是一个简单的Treiber stack
实现。
public class TreiberStack<E> {
/**
* 使用AtomicReference维护栈顶元素,用于后续CAS操作
*/
private final AtomicReference<Node<E>> top = new AtomicReference<>();
/**
* 入栈时首先检查栈顶元素,只有在确定其它线程
* 没有修改时才能入栈成功,否则进入重试
*/
public void push(E e) {
Node<E> old;
Node<E> cur = new Node<>(e);
do {
old = top.get();
cur.next = old;
} while (!top.compareAndSet(old, cur));
}
/**
* 出栈时也需要检查栈顶元素,其它线程没有修改时
* 才能出栈,否则进入重试
*/
public E pop() {
Node<E> old;
Node<E> cur;
do {
old = top.get();
if (old == null) {
return null;
}
cur = old.next;
} while (!top.compareAndSet(old, cur));
return old.value;
}
private static class Node<E> {
final E value;
Node<E> next;
Node(E value) {
this.value = value;
}
}
}
构造函数
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
FutureTask
实际上是通过将Runnable
包装成Callable
来获取异步任务的执行结果,而显示初始化state
则是利用了Happens-Before
中的程序次序规则和volatile
变量规则来保证Callable
的线程可见性:
- 程序次序规则:在一个线程内,按照控制流顺序,书写在前面的操作Happens-Before于书写在后面的操作。
- volatile变量规则:对一个volatile变量的写操作Happens-Before于后面对这个变量的读操作,这里的"后面"指时间上的先后顺序。
核心方法
run
/**
* run方法保证任务只会被执行一次
*/
public void run() {
// 1. 任务必须处于初始状态NEW
// 2. 当前没有分配线程来执行底层的callable
// 只有同时满足这两个条件,才能启动任务
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
// callable会在任务执行完成之后置空
// state可能在#runAndReset()中恢复到初始状态
// 这一步检查也是为了确保确实有必要继续执行
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
// 正常执行完成
ran = true;
} catch (Throwable ex) {
result = null;
// 执行过程中遇到异常
ran = false;
// 转换状态到EXCEPTIONAL
setException(ex);
}
// 如果正常执行完成
if (ran)
// 转换状态到NORMAL
set(result);
}
} finally {
// 整个任务执行期间,runner都是不为null的
// 保证了run()方法不会被多个线程同时执行
runner = null;
// run()方法结束之后,state必须到达终态
// 如果在执行过程中遭遇打断,此时状态可能
// 还是INTERRUPTING,没有到达INTERRUPTED
int s = state;
if (s >= INTERRUPTING)
// 自旋等待state到达终态INTERRUPTED
handlePossibleCancellationInterrupt(s);
}
}
/**
* #run()正常结束
*/
protected void set(V v) {
// CAS切换到中间状态COMPLETING
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
// 设置返回值
outcome = v;
// 设置为正常结束
// 到达终态后不能再继续转换,因此可以使用lazySet
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
// 唤醒执行期间因#get()阻塞的线程
finishCompletion();
}
}
/**
* #run()异常结束
*/
protected void setException(Throwable t) {
// CAS切换到中间状态COMPLETING
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
// 设置异常信息
outcome = t;
// 设置为异常结束
// 到达终态后不能再继续转换,因此可以使用lazySet
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
// 唤醒执行期间因#get()阻塞的线程
finishCompletion();
}
}
/**
* 公共的完成逻辑:清除并唤醒执行期间因#get()阻塞的线程
*/
private void finishCompletion() {
// assert state > COMPLETING;
// 获取链表头节点
for (WaitNode q; (q = waiters) != null;) {
// 清除表头,除了局部变量q再没有其它引用指向它了
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
// 循环处理每一个链表节点
for (;;) {
// 唤醒因#get()阻塞的线程
Thread t = q.thread;
if (t != null) {
q.thread = null;
LockSupport.unpark(t);
}
// 检查到达尾节点
WaitNode next = q.next;
if (next == null)
// 是则表示已完成
break;
// 断掉引用
// 这样GC时可以及时发现并清理
q.next = null; // unlink to help gc
// 转到下一个节点
q = next;
}
break;
}
}
// 调用钩子方法
done();
// callable在结束时置空
callable = null; // to reduce footprint
// 方法退出时,waiters实例变量为null,局部变量q超出作用域
// 这样就完成了对整个Treiber stack的清理
}
/**
* 确保在#run()退出时state到达终态
*/
private void handlePossibleCancellationInterrupt(int s) {
// 如果确实处于中间状态INTERRUPTING
if (s == INTERRUPTING)
// 那么就自旋等待
while (state == INTERRUPTING)
// 告诉调度器出让cpu时间
Thread.yield(); // wait out pending interrupt
// 按照约定,INTERRUPTING之后必然是INTERRUPTED
// assert state == INTERRUPTED;
}
cancel
/**
* 取消任务
* @param mayInterruptIfRunning 如果已经开始执行,是否需要打断执行中的任务
*/
public boolean cancel(boolean mayInterruptIfRunning) {
// 只有未达到终态的任务才能取消
if (!(state == NEW &&
// 如果已经开始执行,根据参数决定是否打断任务的执行
// 当然,这里也是使用CAS控制并发
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try { // in case call to interrupt throws exception
// 如果需要打断的话
if (mayInterruptIfRunning) {
try {
// 就打断它
Thread t = runner;
if (t != null)
t.interrupt();
} finally { // final state
// 打断完成进入终态
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
// 同样,取消成功以后需要进行清理
finishCompletion();
}
return true;
}
get
/**
* 获取任务执行结果
*/
public V get() throws InterruptedException, ExecutionException {
int s = state;
// 未开始或执行中的任务才有可能获取到结果
if (s <= COMPLETING)
// 阻塞当前线程,直到终态
s = awaitDone(false, 0L);
// 根据状态获取结果
return report(s);
}
public V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
if (unit == null)
throw new NullPointerException();
int s = state;
// 未开始或执行中的任务才有可能获取到结果
if (s <= COMPLETING &&
// 等待结束后,如果仍未到达终态,则为超时
(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
throw new TimeoutException();
// 根据状态获取结果
return report(s);
}
/**
* 等待任务产生执行结果(阻塞式),或者等待过程中被打断的话,抛出异常
* @param timed 是否是带有时间限制的等待
* @param nanos 最长等待时间
*/
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
// 计算等待的截止时间
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
// 根据LockSupport.parkXXX系列方法的文档
// park系列方法会因为
// 1. 其它线程调用unpark()
// 2. 到达parkXXX指定时间
// 3. 线程被打断
// 4. 无理由
// 以上4种情况返回,因此park系列方法需要保证在循环中,在方法返回时再次进行条件检测
for (;;) {
// 检测是否因为线程打断返回
if (Thread.interrupted()) {
// 是的话进行节点清理
removeWaiter(q);
// 抛出异常
throw new InterruptedException();
}
// 读取当前状态
int s = state;
// 状态机的值是经过精心设计的
// COMPLETING之后的状态要么是终态
// 要么是打断,而打断已经处理过了
// 因此大于COMPLETING也就是到达了终态
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
// 处于COMPLETING状态,说明任务很快就会结束了
// 这里的处理是不进行阻塞等待,而是调用Thread.yield()
// 出让cpu时间,等待任务执行完成,从而由上一步s > COMPLETING处理
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
// 来到这里,说明s == NEW,这是真正需要进行阻塞等待的状态
// 如果还没有节点,新建之
else if (q == null)
q = new WaitNode();
// 还没有入栈的话,入栈之
// 这两步就是一个普通的Treiber stack入栈操作了
// 经过这两步之后,调用#get()的线程就成功加入等待队列了
else if (!queued)
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
else if (timed) {
// 计算需要等待的时间
nanos = deadline - System.nanoTime();
// 小于0表示等待已经结束了
if (nanos <= 0L) {
// 清理节点
// 对应Treiber stack出栈操作
removeWaiter(q);
return state;
}
// 带有时间限制的等待
LockSupport.parkNanos(this, nanos);
}
else
// 无限期等待
LockSupport.park(this);
}
}
/**
* 报告任务的执行结果
*/
private V report(int s) throws ExecutionException {
Object x = outcome;
// NORMAL表示正常执行完成
if (s == NORMAL)
// outcome携带的是callable的返回值
return (V)x;
// 已取消的话,抛出异常
if (s >= CANCELLED)
throw new CancellationException();
// 剩下的就是执行异常的情况了
throw new ExecutionException((Throwable)x);
}
isCancelled/isDone
public boolean isCancelled() {
// 比CANCELLED大的只有INTERRUPTING和INTERRUPTED
// 而INTERRUPTING和INTERRUPTED同样是取消的状态之一
// 它表示任务已经开始执行的情况下的强制取消
return state >= CANCELLED;
}
public boolean isDone() {
return state != NEW;
}
得益于状态机数值的精心设计,isCancelled
和isDone
可以根据状态简单的判定。注意,isDone
并不仅仅表示任务正常执行结束或者执行遇到异常,任务被取消或者被打断同样也计入isDone
。FutureTask
的源码就分析到这里了,下次再分享一下线程池的源码分析吧。
网友评论