6、FutureTask
可取消的异步计算。该类提供了一个Future的基本实现,提供了启动和取消计算、查询计算是否完成以及检索计算结果的方法。结果只能在计算完成后才能检索; 如果计算尚未完成,则get方法将阻塞。一旦计算完成,则无法重新启动或取消计算(除非使用runAndReset()调用计算 )。
FutureTask可用于包装Callable或Runnable对象。因为FutureTask实现Runnable,一个FutureTask可以提交到一个Executor执行。除了作为独立类之外,此类还提供了protected功能,在创建自定义任务类时可能很有用。
以上根据JDK注释翻译而来,读起来有些拗口。FutureTask可以与Thread绑定、或提交到线程池执行。下面来看一个具体的例子:
public class FutureTaskDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
FutureTask<String> future = new FutureTask<>(new Runnable() {
@SneakyThrows
@Override
public void run() {
TimeUnit.SECONDS.sleep(5);
}
}, searchByKey("name"));
new Thread(future).start();
doSomething();
System.out.println(future.get());
}
@SneakyThrows
private static String searchByKey(String key) {
return "小明";
}
private static void doSomething() {
System.out.println("异步任务提交后,可以做一些别的事情了。。。");
}
}
6.1、FutureTask生命周期
private volatile int state; // 维护生命周期
private static final int NEW = 0; // 任务初始状态
private static final int COMPLETING = 1; // 任务执行完、或抛出异常,尚未设置结果
private static final int NORMAL = 2; // 任务执行完,结果设置完毕
private static final int EXCEPTIONAL = 3; // 任务抛出异常,结果(异常信息)设置完毕
private static final int CANCELLED = 4; // 任务取消
private static final int INTERRUPTING = 5; // 取消任务,且线上尚未被中断
private static final int INTERRUPTED = 6; // 取消任务,且任务已被中断
6.2、创建FutureTask
FutureTask构造函数有两种类型:
- 参数为Callable类型。
- 参数为Runnable类型,并通过RunnableAdapter适配其返回值。
// Callable
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
// Runnable
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
6.3、run()方法
public void run() {
if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset,null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
// 任务运行期间是否抛出异常
boolean ran;
try {
// 若任务为Callable,直接执行;若任务为Runnable,通过RunnableAdapter执行
result = c.call();
ran = true;
} catch (Throwable ex) {
// 执行任务失败,设置异常结果
result = null;
ran = false;
setException(ex);
}
if (ran)
// 执行任务成功,设置结果
set(result);
}
} finally {
// runner must be non-null until state is settled to prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
- 执行任务
- 若任务为Callable,直接执行。
- 若任务为Runnable,通过RunnableAdapter执行。
- 记录任务是否抛出异常
- 无异常,调用set()方法设置结果。
- 有异常,调用setException()方法设置结果。
在set()、setException()方法中可以看到FutureTask生命周期的变化。
6.3.1 set() 设置结果
protected void set(V v) {
// state NEW-->COMPLETING
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
// 设置结果
outcome = v;
// state COMPLETING-->NORMAL
// 任务正常执行完,则NORMAL为最终状态
UNSAFE.putOrderedInt(this, stateOffset, NORMAL);
finishCompletion();
}
}
6.3.1 setException() 设置异常结果
protected void setException(Throwable t) {
// state NEW-->COMPLETING
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
// 设置异常结果
outcome = t;
// state COMPLETING-->EXCEPTIONAL
// 任务执行过程中抛出异常,EXCEPTIONAL为最终状态
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL);
finishCompletion();
}
}
6.3.1 finishCompletion()
无论run()是否抛出异常,最终都会执行finishCompletion()方法。finishCompletion()可以移除队列中已经执行完任务的线程、唤醒其它阻塞的线程。该方法需要与get()方法共同分析。
private void finishCompletion() {
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;) {
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
// 唤醒因调用get()方法而阻塞的线程
LockSupport.unpark(t);
}
// 回收当前节点,并继续唤醒下一个节点
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
// 模板方法
done();
callable = null; // to reduce footprint
}
6.4、get()方法
get()方法以阻塞的形式等待任务执行完成,然后检索结果。并提供了无限期阻塞、带超时时间阻塞两种方式。
// 两种方式最终均调用awaitDone()方法,并通过timed参数指定是否超时获取
private int awaitDone(boolean timed, long nanos) throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
// 自旋
for (;;) {
// 若线程被中断,抛出异常
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
int s = state;
// 任务完成,回收线程,返回结果
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
// 任务完成,尚未设置结果,让出线程调度权
else if (s == COMPLETING)
Thread.yield();
// 构建节点
else if (q == null)
q = new WaitNode();
// 节点加入链表
else if (!queued)
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,q.next = waiters, q);
// 超时获取
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos);
}
// 阻塞
else
LockSupport.park(this);
}
}
awaitDone()以自旋的方式不断处理获取任务的线程,并将其构建成一个WaitNode(FutureTask内部类)节点,从而形成一个简单的链表。
若调用get()方法之后,run()方法尚未执行完成,则阻塞、或超时阻塞获取任务的线程,直至run()方法完成后,在finishCompletion()方法中将其唤醒。
awaitDone()方法执行完成后,调用report()方法检索计算结果、或异常结果。
6.4、cancel()方法
FutureTask一个显著的特点,是可以取消任务。
// mayInterruptIfRunning,是否中断线程
public boolean cancel(boolean mayInterruptIfRunning) {
// 取消任务的前提是FutureTask的状态为NEW
// 若状态符合取消条件,则根据mayInterruptIfRunning的值
// 将任务的装态改为INTERRUPTING、或CANCELLED
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try {
// 允许中断
if (mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null)
t.interrupt();
} finally { // final state
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
}
finally {
finishCompletion();
}
return true;
}
取消任务的前提是FutureTask的状态为NEW,即任务尚未开始运行。此方法返回后,再调用isDone()、isCancelled()则将返回true。
6.4、runAndReset()
runAndReset()方法执行计算而不设置其结果,然后将此将来重置为初始状态,如果计算遇到异常或被取消,则不执行此操作。
protected boolean runAndReset() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
return false;
boolean ran = false;
int s = state;
try {
Callable<V> c = callable;
if (c != null && s == NEW) {
try {
// 执行任务,注意这里没有设置运行结果
c.call(); // don't set result
ran = true;
} catch (Throwable ex) {
setException(ex);
}
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
return ran && s == NEW;
}
在前文提到的ScheduledThreadPoolExecutor执行周期行任务中就有对runAndReset()的应用。
public void run() {
boolean periodic = isPeriodic();
if (!canRunInCurrentRunState(periodic))
cancel(false);
else if (!periodic)
ScheduledFutureTask.super.run();
// 周期任务
else if (ScheduledFutureTask.super.runAndReset()) {
setNextRunTime();
reExecutePeriodic(outerTask);
}
}
网友评论