问:谈谈你对 FutureTask 与线程池的理解?
答:FutureTask
是一个支持取消的异步处理器,一般在线程池中用于异步接受 callable
返回值。主要实现分三部分:
- 封装 callable,然后放到线程池中去异步执行->run。
- 获取结果->get。
- 取消任务->cancel。
一、FutureTask 在线程池中的使用
// 第一步,定义线程池,
ExecutorService executor = new ThreadPoolExecutor(
minPoolSize,
maxPollSize,
keepAliveTime,
TimeUnit.SECONDS,
new SynchronousQueue<>());
// 第二步,放到线程池中执行,返回FutureTask
FutureTask task = executor.submit(callable);
// 第三步,获取返回值
T data = task.get();
二、FutureTask 类属性
//以下是FutureTask的各种状态
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
private Callable<V> callable; //执行的任务
private Object outcome; //存储结果或者异常
private volatile Thread runner;//执行callable的线程
private volatile WaitNode waiters; //调用get方法等待获取结果的线程栈
其中各种状态存在 最终状态 status>COMPLETING
1)NEW -> COMPLETING -> NORMAL(有正常结果)
2) NEW -> COMPLETING -> EXCEPTIONAL(结果为异常)
3) NEW -> CANCELLED(无结果)
4) NEW -> INTERRUPTING -> INTERRUPTED(无结果)
三、FutureTask 类方法
3.1 run 方法
FutureTask 继承 runnable,ExecutorService submit 把提交的任务封装成 FutureTask 然后放到线程池 ThreadPoolExecutor 的 execute 执行。
public void run() {
//如果不是初始状态或者cas设置运行线程是当前线程不成功,直接返回
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
// 执行callable任务 这里对异常进行了catch
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex); // 封装异常到outcome
}
if (ran)
set(result);
}
} finally {
runner = null;
int s = state;
// 这里如果是中断中,设置成最终状态
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
以上是 run 方法源码实现很简单,解析如下:
-
如果不是初始状态或者 cas 设置运行线程是当前线程不成功,直接返回,防止多个线程重复执行。
-
执行 callable 的
call()
,即提交执行任务(这里做了 catch,会捕获执行任务的异常封装到 outcome 中)。 -
如果成功执行 set 方法,封装结果。
3.2 set 方法
protected void set(V v) {
//cas方式设置成completing状态,防止多个线程同时处理
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v; // 封装结果
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // 最终设置成normal状态
finishCompletion();
}
}
解析如下:
-
cas 方式设置成 completing 状态,防止多个线程同时处理。
-
封装结果到 outcome,然后设置到最终状态 normal。
-
执行 finishCompletion 方法。
3.3 finishCompletion 方法
// state > COMPLETING; 不管异常,中断,还是执行完成,都需要执行该方法来唤醒调用get方法阻塞的线程
private void finishCompletion() {
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;) {
// cas 设置waiters为null,防止多个线程执行。
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
// 循环唤醒所有等待结果的线程
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
//唤醒线程
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
//该方法为空,可以被重写
done();
callable = null; // to reduce footprint
}
遍历 waiters 中的等待节点,并通过 LockSupport 唤醒每一个节点,通知每个线程,该任务执行完成(可能是执行完成,也可能 cancel,异常等)。
3.4 get 方法
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}
public V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
if (unit == null)
throw new NullPointerException();
int s = state;
if (s <= COMPLETING &&
(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
throw new TimeoutException();
return report(s);
}
以上两个方法,原理一样,其中一个设置超时时间,支持最多阻塞多长时间。状态如果小于 COMPLETING,说明还没到最终状态,(不管是否是成功,还是异常,还是取消)调用 awaitDone
方法阻塞线程,最终调用 report
方法返回结果。
3.5 awaitDone 方法
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {
//线程可中断,如果当前阻塞获取结果线程执行interrupt()方法,则从队列中移除该节点,并抛出中断异常
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
int s = state;
// 如果已经是最终状态,退出返回
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
//这里做了个优化,competiting到最终状态时间很短,通过yield比挂起响应更快。
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
// 初始化该阻塞节点
else if (q == null)
q = new WaitNode();
// cas方式写到阻塞waiters栈中
else if (!queued)
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
// 这里做阻塞时间处理。
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
// 阻塞线程,有超时时间
LockSupport.parkNanos(this, nanos);
}
else
// 阻塞线程
LockSupport.park(this);
}
}
整体流程已写到注解中,整体实现是放在一个死循环中,唯一出口,是达到最终状态。然后是构建节点元素,并将该节点入栈,同时阻塞当前线程等待运行主任务的线程唤醒该节点。
3.6 report 方法
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}
然后是 report 方法,如果是正常结束,返回结果,如果不是正常结束,(取消,中断)抛出异常。
3.7 cancel 方法
public boolean cancel(boolean mayInterruptIfRunning) {
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try { // in case call to interrupt throws exception
if (mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null)
t.interrupt();
} finally { // final state
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
finishCompletion();
}
return true;
}
mayInterruptIfRunning
参数表示是否允许运行中被中断取消。
-
根据入参是否为 true,CAS 设置状态为 INTERRUPTING 或 CANCELLED,设置成功,继续第二步,否则直接返回 false。
-
如果允许运行中被中断取消,调用
runner.interupt()
进行中断取消,设置状态为 INTERRUPTED 唤醒所有在get()
方法等待的线程。此处有两种状态转换:如果mayInterruptIfRunning
为 true,status 状态转换为 new->INTERRUPTING->INTERRUPTED,主动去中断执行线程,然后唤醒所有等待结果的线程;如果mayInterruptIfRunning
为 false,status 状态转换为 new->CANCELLED,不会去中断执行线程,直接唤醒所有等待结果的线程,从awaitDone
方法中可以看到,唤醒等待线程后,直接跳转回 get 方法,然后把结果返回给获取结果的线程,当然此时的结果是 null。
以上就是 FutureTask 的源码简单解析,实现比较简单,FutureTask 是一个实现 Future 模式,支持取消的异步处理器。
网友评论