FutureTask的继承结构为:
- FutureTask实现RunnableFuture接口,RunnableFuture接口实现了Runnable接口和Future接口,Future接口为实现异步线程的接口其他提供多种方法,cancel()取消线程,isDone(),判断线程是否执行完毕,get()获取线程执行结构,get(long timeout, TimeUnit unit)在规定时间内获取线程返回结果。
- 对future理解可为:我有一个任务,提交给了Future,Future替我完成这个任务。期间我自己可以去做任何想做的事情。一段时间之后,我就便可以从Future那儿取出结果。
- FutureTask解析:
当我们用Callable方法去创建线程的时候,步zou为,创建一个callable接口的实现类然后将其作为参数构造出FuturnTask对象,将FutureTask对象作为具体任务的承载体交给Thread去执行,当调用Thread的run方法的时候,执行FuturnTask的run方法
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
futurnTask中定义了线程任务执行的7个状态,分别为NEW(0新建),COMPLETING(1完成)NORMAL(2普通),EXCEPTIONAL(3异常),CANCELLED(4取消),INTERRUPTING(5中断),INTERRUPTED(6中断)。
当执行run方法的时候先进行任务状态的判断,不是新建状态代表已经执行完毕或者发生异常,直接renturn,否为继续向下,取当前FutureTask承载的callable对象,继续进行判空处理,然后调用承载对象的call方法进行执行。当执行完毕之后执行set方法
将当前执行任务的接口赋值给outcome然后使用CAS替换当前FutureTask对象的状态.
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
}
之后执行finishCompletion()方法进行后续处理,该方法主要作用为使用 LockSupport.unpark(t);方法唤醒因为调用get方法获取任务返回值结果而被阻塞的线程(被阻塞的线程存在链表之中)
for (WaitNode q; (q = waiters) != null;) {
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
done();
callable = null; // to reduce footprint
- get()方法
FutureTask为我们提供两个get方法一个定时获取一个阻塞等待获取,本文以get()为例.
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}
同样先是判断状态,判断当前FutureTask承载的任务是否执行完毕,如果没有执行完毕,则执行awaitDone方法
先判断是否设置超时时间 timed为false为没有设置 true为设置了等待时间然后进行循环,如果当前任务在执行过程中被中断则直接移除当前调用get方法的线程抛出异常,然后就是进行状态判断如果小于完成状态则表示为新建,将当前线程包装为一个WaitNode节点使用CAS方式存入阻塞队列之中(之后继续进行循环,如果状态不满足完成状态则 LockSupport.park(this)阻塞当前线程,当执行完毕的时候会run方法会进行一系列操作将状态设置为完成状态,然后唤醒被阻塞的线程,否则会一直阻塞在这里)被唤醒后,如果等于完成状态表示任务已经执行完毕,在进行后续处理,使当前线程让步即可,无需等待阻塞,如果大于完成状态则表示执行完毕,返回状态。回到get()方法执行report(s)方法。
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null; //包装当前调用get方法的线程为WaitNode节点
boolean queued = false; //调用get()方法线程的阻塞队列
for (;;) {
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
int s = state;
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
else if (q == null)
q = new WaitNode();
else if (!queued)
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos);
}
else
LockSupport.park(this);
}
}
该方法的判断任务执行状态如果为普通则代表没有发生异常状态(将返回值包装为对应的泛型对象返回),如不是普通则代表被取消或者执行异常,直接抛出异常
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}
网友评论