
FutureTask.png

1561082003855.png

1561082025770.png
Runnable
run
public void run() {
// Task的状态必须是NEW才能开始run
// 将当前线程设置为task的runner,如果失败,直接返回
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
// 执行任务,并拿到返回结果
result = c.call();
ran = true;
} catch (Throwable ex) {
// 如果执行失败
result = null;
ran = false;
// 唤醒等待队列,并返回异常
setException(ex);
}
if (ran)
// 唤醒等待队列,并返回执行结果
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
// 最后将runner置空
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
// 再次查看任务状态,看是不是被人cancel了
if (s >= INTERRUPTING)
// 等待中断完毕
handlePossibleCancellationInterrupt(s);
}
}
setException
protected void setException(Throwable t) {
// 首先设置任务为COMPLETING的中间状态
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = t;
// 如果前面进入中间状态成功,那么直接设为EXCEPTIONAL,不再去做CAS
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
finishCompletion();
}
}
set
protected void set(V v) {
// 首先设置任务为COMPLETING的中间状态
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
// 保存执行结果
outcome = v;
// 如果前面进入中间状态成功,那么直接设为EXCEPTIONAL,不再去做CAS
// 也可以认为,在进入NORMAL前,必须是COMPLETING,所以也没必要再去做CAS
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
}
}
finishCompletion
private void finishCompletion() {
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;) {
// 首先先拿到等待队列的同时,清空列表
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
// 将等待队列依次唤醒,说明任务执行的结果有了
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
// 钩子,子类定义
done();
callable = null; // to reduce footprint
}
handlePossibleCancellationInterrupt
private void handlePossibleCancellationInterrupt(int s) {
// It is possible for our interrupter to stall before getting a
// chance to interrupt us. Let's spin-wait patiently.
// INTERRUPTING就代表该任务执行线程正在中断中,一直自旋到中断完毕才退出
// 否则让出CPU资源,等待下个调度再来查看是否中断完毕,如果往复
if (s == INTERRUPTING)
while (state == INTERRUPTING)
Thread.yield(); // wait out pending interrupt
// assert state == INTERRUPTED;
// We want to clear any interrupt we may have received from
// cancel(true). However, it is permissible to use interrupts
// as an independent mechanism for a task to communicate with
// its caller, and there is no way to clear only the
// cancellation interrupt.
//
// Thread.interrupted();
}
Future
cancel
public boolean cancel(boolean mayInterruptIfRunning) {
// 任务状态必须为NEW,且能更新为INTERRUPTING或CANCELLED,就往下继续
// 否则,cancel失败
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try { // in case call to interrupt throws exception
// 如果mayInterruptIfRunning为true,就表示当前在执行的任务会被中断
// 反之,会让任务执行完,也就是说只会设置状态为CANCELLED,并不会做其他操作
if (mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null)
// 中断任务执行线程
t.interrupt();
} finally { // final state
// 设置状态为INTERRUPTED,表示中断完成
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
// 唤醒等待列表
finishCompletion();
}
return true;
}
isDone
public boolean isDone() {
// 只要不是NEW,就认为任务已经完成,不管是成功,异常,还是取消。
// 因为只要不是NEW,最终都会流转成终态,而不是中间态
return state != NEW;
}
isCancelled
public boolean isCancelled() {
// 从前面的Cancel来看,只要是CANCELLED/INTERRUPTING/INTERRUPTED,都认为是被取消
return state >= CANCELLED;
}
get
public V get() throws InterruptedException, ExecutionException {
int s = state;
// 如果是NEW或COMPLETING,说明任务没有完成,那么开始等待
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}
awaitDone
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
// 自旋
for (;;) {
// 如果该线程已经被中断,那么移除该线程,抛出InterruptedException
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
int s = state;
// 如果大于COMPLETING,说明进入终态,也就是拿到了执行结果,返回s
// 且将执行线程置空
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
// 如果还在COMPLETING,说明正在执行中,暂停等待
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
// 如果当前的waitnode为空,新建一个
else if (q == null)
q = new WaitNode();
// 这里的意义是将新节点作为栈顶
else if (!queued)
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
// 超时处理
else if (timed) {
nanos = deadline - System.nanoTime();
// 如果已经到期,那么移除该waitnode,并返回任务状态
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
// 否则,超时等待
LockSupport.parkNanos(this, nanos);
}
// 否则,阻塞,等待任务执行完唤醒
else
LockSupport.park(this);
}
}
report
private V report(int s) throws ExecutionException {
Object x = outcome;
// 只有当任务状态时NORMAL,才返回执行结果
if (s == NORMAL)
return (V)x;
// 如果任务被取消,抛出CancellationException
if (s >= CANCELLED)
throw new CancellationException();
// 否则任务的状态是EXCEPTIONAL,再抛出捕获的执行异常
throw new ExecutionException((Throwable)x);
}
runAndReset
protected boolean runAndReset() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return false;
boolean ran = false;
int s = state;
try {
Callable<V> c = callable;
if (c != null && s == NEW) {
try {
c.call(); // don't set result
ran = true;
} catch (Throwable ex) {
setException(ex);
}
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
// 跟run不同的是,这里会重置任务状态为NEW,以便下次周期性的重复调用
return ran && s == NEW;
}
网友评论