FutureTask是一个支持取消行为的异步任务执行器。该类实现了Future接口的方法。
如:
- 取消任务执行
- 查询任务是否执行完成
- 获取任务执行结果(”get“任务必须得执行完成才能获取结果,否则会阻塞直至任务完成)。
注意:一旦任务执行完成或取消任务,则不能执行取消任务或者重新启动任务。(除非一开始就使用runAndReset模式运行任务)
FutureTask实现了Runnable接口和Future接口,因此FutureTask可以传递到线程对象Thread或Excutor(线程池)来执行。
如果在当前线程中需要执行比较耗时的操作,但又不想阻塞当前线程时,可以把这些作业交给FutureTask,另开一个线程在后台完成,当当前线程将来需要时,就可以通过FutureTask对象获得后台作业的计算结果或者执行状态。
示例
public class FutureTaskDemo {
public static void main(String[] args) throws InterruptedException{
FutureTask<Integer> ft = new FutureTask<>(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
int num = new Random().nextInt(10);
TimeUnit.SECONDS.sleep(num);
return num;
}
});
Thread t = new Thread(ft);
t.start();
//这里可以做一些其它的事情,跟futureTask任务并行,等需要futureTask的运行结果时,可以调用get方法获取。
try {
//等待任务执行完成,获取返回值
Integer num = ft.get();
System.out.println(num);
} catch (Exception e) {
e.printStackTrace();
}
}
}
FutureTask 源码分析
JDK1.7及之前,FutureTask 通过使用内部类Sync继承AQS来实现。
内部使用的AQS的共享锁。
AQS具体实现可参考 AbstractQueuedSynchronizer 源码分析
JDK1.8没有使用AQS,而是自己实现了一个同步等待队列,在结果返回之前,所有的线程都被阻塞,存放到等待队列中。
下面我们来分析下JDK1.8的FutureTask 源码
FutureTask 类结构
public class FutureTask<V> implements RunnableFuture<V> {
/**
* 当前任务的运行状态。
*
* 可能存在的状态转换
* NEW -> COMPLETING -> NORMAL(有正常结果)
* NEW -> COMPLETING -> EXCEPTIONAL(结果为异常)
* NEW -> CANCELLED(无结果)
* NEW -> INTERRUPTING -> INTERRUPTED(无结果)
*/
private volatile int state;
private static final int NEW = 0; //初始状态
private static final int COMPLETING = 1; //结果计算完成或响应中断到赋值给返回值之间的状态。
private static final int NORMAL = 2; //任务正常完成,结果被set
private static final int EXCEPTIONAL = 3; //任务抛出异常
private static final int CANCELLED = 4; //任务已被取消
private static final int INTERRUPTING = 5; //线程中断状态被设置ture,但线程未响应中断
private static final int INTERRUPTED = 6; //线程已被中断
//将要执行的任务
private Callable<V> callable;
//用于get()返回的结果,也可能是用于get()方法抛出的异常
private Object outcome; // non-volatile, protected by state reads/writes
//执行callable的线程,调用FutureTask.run()方法通过CAS设置
private volatile Thread runner;
//栈结构的等待队列,该节点是栈中的最顶层节点。
private volatile WaitNode waiters;
....
FutureTask实现的接口信息如下:
RunnableFuture 接口
public interface RunnableFuture<V> extends Runnable, Future<V> {
void run();
}
RunnableFuture 接口基础了Runnable和Future接口
Future 接口
public interface Future<V> {
//取消任务
boolean cancel(boolean mayInterruptIfRunning);
//判断任务是否已经取消
boolean isCancelled();
//判断任务是否结束(执行完成或取消)
boolean isDone();
//阻塞式获取任务执行结果
V get() throws InterruptedException, ExecutionException;
//支持超时获取任务执行结果
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
run 方法
public void run() {
//保证callable任务只被运行一次
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
//执行任务
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
} finally {
runner = null;
int s = state;
//判断该任务是否正在响应中断,如果中断没有完成,则等待中断操作完成
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
1.如果state状态不为New或者设置运行线程runner失败则直接返回false,说明线程已经启动过,保证任务在同一时刻只被一个线程执行。
2.调用callable.call()方法,如果调用成功则执行set(result)方法,将state状态设置成NORMAL。如果调用失败抛出异常则执行setException(ex)方法,将state状态设置成EXCEPTIONAL,唤醒所有在get()方法上等待的线程。
3.如果当前状态为INTERRUPTING(步骤2已CAS失败),则一直调用Thread.yield()直至状态不为INTERRUPTING
set方法
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
}
}
- 首先通过CAS把state的NEW状态修改成COMPLETING状态。
- 修改成功则把v值赋给outcome变量。然后再把state状态修改成NORMAL,表示现在可以获取返回值。
- 最后调用finishCompletion()方法,唤醒等待队列中的所有节点。
setException 方法
protected void setException(Throwable t) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = t;
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
finishCompletion();
}
}
同上 set(V v) 方法
- 首先通过CAS把state的NEW状态修改成COMPLETING状态。
- 修改成功则把v值赋给outcome变量。然后再把state状态修改成EXCEPTIONAL,表示待返回的异常信息设置成功。
- 最后调用finishCompletion()方法,唤醒等待队列中的所有节点。
handlePossibleCancellationInterrupt方法
private void handlePossibleCancellationInterrupt(int s) {
if (s == INTERRUPTING)
while (state == INTERRUPTING)
Thread.yield(); // wait out pending interrupt
}
该方法是如果正在响应中断(EXCEPTIONAL),则等待响应中断结束(INTERRUPTED)。
finishCompletion方法
private void finishCompletion() {
for (WaitNode q; (q = waiters) != null;) {
//通过CAS把栈顶的元素置为null,相当于弹出栈顶元素
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
done();
callable = null; // to reduce footprint
}
把栈中的元素一个一个弹出,并通过 LockSupport.unpark(t)唤醒每一个节点,通知每个线程,该任务执行完成(可能是执行完成,也可能cancel,异常等)
runAndReset 方法 (可被子类重写,外部无法直接调用)
protected boolean runAndReset() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
return false;
boolean ran = false;
int s = state;
try {
Callable<V> c = callable;
if (c != null && s == NEW) {
try {
c.call(); // don't set result
ran = true;
} catch (Throwable ex) {
setException(ex);
}
}
} finally {
runner = null;
s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
return ran && s == NEW;
}
该方法和run方法的区别是,run方法只能被运行一次任务,而该方法可以多次运行任务。而runAndReset这个方法不会设置任务的执行结果值,如果该任务成功执行完成后,不修改state的状态,还是可运行(NEW)状态,如果取消任务或出现异常,则不会再次执行。
而只是执行任务完之后,
get方法
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}
如果state状态小于等于COMPLETING,说明任务还没开始执行或还未执行完成,然后调用awaitDone方法阻塞该调用线程。如果state的状态大于COMPLETING,则说明任务执行完成,或发生异常、中断、取消状态。直接通过report方法返回执行结果。
get(long timeout, TimeUnit unit) 方法
public V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
if (unit == null)
throw new NullPointerException();
int s = state;
if (s <= COMPLETING &&
(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
throw new TimeoutException();
return report(s);
}
同上面get方法,该get方法支持阻塞等待多长时间,如果超时直接抛出TimeoutException异常。
report方法
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}
如果state的状态为NORMAL,说明任务正确执行完成,直接返回计算后的值。
如果state的状态大于等于CANCELLED,说明任务被成功取消执行、或响应中断,直接返回CancellationException异常
否则返回ExecutionException异常。
awaitDone 方法
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {
//如果该线程执行interrupt()方法,则从队列中移除该节点,并抛出异常
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
int s = state;
//如果state状态大于COMPLETING 则说明任务执行完成,或取消
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
//如果state=COMPLETING,则使用yield,因为此状态的时间特别短,通过yield比挂起响应更快。
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
//构建节点
else if (q == null)
q = new WaitNode();
//把当前节点入栈
else if (!queued)
queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q);
//如果需要阻塞指定时间,则使用LockSupport.parkNanos阻塞指定时间
//如果到指定时间还没执行完,则从队列中移除该节点,并返回当前状态
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos);
}
//阻塞当前线程
else
LockSupport.park(this);
}
}
构建栈链表的节点元素,并将该节点入站,同时阻塞当前线程等待运行主任务的线程唤醒该节点。
JDK1.7版本是使用AQS的双向链表队列实现的。
removeWaiter 方法
private void removeWaiter(WaitNode node) {
if (node != null) {
node.thread = null;
retry:
for (;;) { // restart on removeWaiter race
for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
s = q.next;
if (q.thread != null)
pred = q;
else if (pred != null) {
pred.next = s;
if (pred.thread == null) // check for race
continue retry;
}
else if (!UNSAFE.compareAndSwapObject(this, waitersOffset, q, s))
continue retry;
}
break;
}
}
}
移除栈中的节点元素,需要使用CAS自旋来保障移除成功。
cancel方法
public boolean cancel(boolean mayInterruptIfRunning) {
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try { // in case call to interrupt throws exception
if (mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null)
t.interrupt();
} finally { // final state
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
finishCompletion();
}
return true;
}
- 根据mayInterruptIfRunning是否为true,CAS设置状态为INTERRUPTING或CANCELLED,设置成功,继续第二步,否则返回false
- 如果mayInterruptIfRunning为true,调用runner.interupt(),设置状态为INTERRUPTED
- 唤醒所有在get()方法等待的线程
总结:状态为NEW时,cancel和run方法才可以被运行。
- 任务开始运行后,不能在次运行,保证只运行一次(runAndReset 方法除外)
- 任务还未开始,或者任务已被运行,但未结束,这两种情况下都可以取消; 如果任务已经结束,则不可以被取消 。
想了解更多精彩内容请关注我的公众号
网友评论