概述
引入
在上面讲CountDownLatch
时我们介绍了几种方法实现线程的先后执行,这种存在是很必要的,很多时候我们在执行某些操作前需要预处理。但是很多时候我们不仅需要子线程做一些处理,主线程还需要子线程处理的结果进行后续操作。于是我们就有了Callable
接口,他和Runnable
接口一样,实现此接口的类可以被用来创建一个Thread
并在其中被执行。我们为了方便的取回Callable
计算出的值,我们又有了Future
接口,专门负责协调主线程【一条或者多条线程,可以在多个线程中从一个子线程中同时取值】和子线程的执行,以达到主线程的等待效果。
我们本文介绍的FutureTask
实现了上面的Future
、Callable
接口,实现了上面我们的需求:主线程获得子线程执行结果。
摘要
本文内容如下:
- 介绍了
FutureTask
的实现原理 - 介绍了
FutureTask
的用法 - 对
FutureTask
的工作过程中的具体机理进行了概述
类介绍
类定位
FutureTask
实现了Future
、Callable
两个接口。它实现的功能罗列如下:
- 完成了以
Runnable/Callable
为入参,经过内部机制重新包装成自己的实例的统一化过程。【自己也实现了Runnable
,可以被执行】 - 完成了协调主线程对子线程的等待的功能
- 完成了对多个等待线程的协调功能
- 完整了取消、查询状态等基础功能
注意
FutureTask
实现的功能中并不包括启动一个子线程执行计算,需要根据项目环境依赖线程池或者自己创建一个子线程完成操作。
源码解读
对Runnable/Callable
的统一包装
数据保存
/** 要调用的逻辑,不管入参是 Callable 还是 Runnable 都最终封装成这个*/
private Callable<V> callable;
/**
* callable 执行的结果,get()方法根据状态直接从这里取值返回。
* 这里保存的或者是 V 类型的返回值、或者是抛出的异常。
* 在取用处理时要结合状态值
**/
private Object outcome;
/** 执行逻辑的子线程,在调用完成后将此值置空 */
private volatile Thread runner;
入参转化
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
其实即使入参是Runnable
,转化也很简单,只是将result
记录下来,在调用run()
完成后即使返回即可。
注意
我们介绍的里面很容明白:
-
callable
用来存储执行的逻辑 -
outcome
用来存储返回的结果 -
runner
用来记录执行任务的子线程
其中,因为我们初始化FutureTask
是在主线程中,为了记录下runner
,需要我们在FutureTask.run()
方法中对runner
进行赋值。毕竟执行run()
方法的才是子线程,才能通过Thread.currentThread()
获得目标线程并进行保存。
根据使用场景:
上面三个变量严格意义上都是只有一个线程来修改的,即使可能多个线程同时读取,但是是在我们修改结束后才读,所以不需要进行加锁和volatile
标注。
协调等待
总览
我们在协调等待中主要依据两个主题介绍:一个是子线程的执行及结果赋值操作,一个是主线程的等待及取值操作。
子线程的执行及结果赋值主要的实现原理其实就是通过Thread.start()
调用Runnable.run()
方法执行其中的逻辑,FutureTask.run()
对callable
进行了执行及对一些执行状态进行了封装、对一些执行结果进行了保存。执行结束后唤醒阻塞的栈中的线程让其进行取值。
主线程的等待及取值操作主要是对state
进行检验,直到其指向结束返回,否则进栈阻塞等待。【考虑到多个线程取值,所以用的栈】
基础数据结构
/**
* 任务的执行状态
*
* 使用 volatile 标注的原因:
* 1. 等待的主线程和执行任务的子线程共享 state ,要对其进行不断的访问以进行状态,不能让编译器
* 修改 state 相关操作顺序
* 2. 修改 state 值使用了 Unsafe.putOrderedInt ,这个要求变量为 volatile
*
* state 标识了任务执行的状态,在创建 FutureTask 时状态为 NEW ,在执行完成但是还没有对结果赋值
* 保存时状态为 COMPLETING 。在有线程调用取消时,正在打断时状态为 INTERRUPTING ,其余的都是结束
* 状态。状态流转如下:
* NEW -> COMPLETING -> NORMAL
* NEW -> COMPLETING -> EXCEPTIONAL
* NEW -> CANCELLED
* NEW -> INTERRUPTING -> INTERRUPTED
**/
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
/** 调用了 get() 线程的阻塞的栈 */
private volatile WaitNode waiters;
static final class WaitNode {
volatile Thread thread;
volatile WaitNode next;
WaitNode() { thread = Thread.currentThread(); }
}
子线程方面:执行
介绍
主要介绍FutureTask
对run()
的实现,以及任务完成后的后续处理操作,如:结果保存、状态流转、主线程唤醒等等。
源码
任务逻辑操作
public void run() {
// 如果
// 状态不是初始状态【被提前取消了】
// runner 不是空,是不是有多个 Thread 借用这个实例并启动了,拒绝重复操作
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
// 调用 caller.call()
// 正常结束将结果保存
// 抛出异常就将异常保存
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
} finally {
runner = null;
int s = state;
if (s >= INTERRUPTING) // 执行过程中调用了取消函数
handlePossibleCancellationInterrupt(s);
}
}
/**
* 保存调用结果,并修改状态值
*
* 保存结束后调用 finishCompletion() 通知等待线程
**/
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
}
}
/**
* 保存调用抛出的 Exception,并修改状态值
*
* 保存结束后调用 finishCompletion() 通知等待线程
**/
protected void setException(Throwable t) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = t;
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
finishCompletion();
}
}
后续处理操作
/**
* 唤醒栈中所有的等待线程
*
**/
private void finishCompletion() {
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;) {
// 直接把栈顶用一个临时变量记录下来然后去掉栈顶的引用
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
LockSupport.unpark(t); // 不断唤醒栈中有意义的等待节点
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
// 线程结束后的额外操作,提供此方法以方便用户定制
done();
callable = null; // 防止被重复调用子线程逻辑
}
/**
* 子线程执行结束并唤醒所有等待线程后执行的方法
* 专门用来方便开发者根据环境定制
*
**/
protected void done() { }
private void handlePossibleCancellationInterrupt(int s) {
// 相应中断,放弃CPU轮转
if (s == INTERRUPTING)
while (state == INTERRUPTING)
Thread.yield();
// 我们假定我们放弃时间片等待之后 state == INTERRUPTED;
}
主线程方面:等待及取值
介绍
主要介绍FutureTask
提供的获得子线程操作结果的get()
方法。
源码
/**
* 检测状态,如果还未完成就调用 awaitDone()等待。如果是完成状态,直接调用 report() 返回结果
*
**/
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}
/**
* 检测状态,限时,没得说
*
**/
public V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
if (unit == null)
throw new NullPointerException();
int s = state;
if (s <= COMPLETING &&
(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
throw new TimeoutException();
return report(s);
}
/**
* 返回结果,根据状态看是应该返回 V 还是抛出取消的异常,还是抛出线程中自己抛出的异常
**/
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}
/**
* 等待子线程完成任务。等待的思路很简单:
* 1. 如果中断了或者有限时且超时了就停止等待
* 2. 如果快完成了 COMPLETING ,就放弃时间片等一下再来检查
* 3. 进队、阻塞,等待被唤醒
*
* 这里有个问题:
* “中断了或者有限时且超时了就停止等待”会专门调用方法遍历链表并删除多余节点,以方便垃圾回收,中断
* 超时可能会存在大量的废弃节点,在大型应用场景中如果不即使处理,会造成内存的浪费。
*
* “进队、阻塞,等待被唤醒”,如果进队后还没来得及阻塞发现ok了,那不会对节点进行删除,原因很简单,
* 因为状态ok了,马上要进行整个队列的所有节点的唤醒、销毁,这种工作就直接委托
* 给了 finishCompletion() 方法。
*
*
**/
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
int s = state;
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
else if (q == null)
q = new WaitNode();
else if (!queued)
// 其实就是一个 CAS 的进栈操作
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos);
}
else
LockSupport.park(this);
}
}
/**
* 删除废弃的节点
**/
private void removeWaiter(WaitNode node) {
if (node != null) {
node.thread = null;
retry:
for (;;) { // restart on removeWaiter race
for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
s = q.next;
if (q.thread != null)
pred = q;
else if (pred != null) {
pred.next = s;
if (pred.thread == null) // check for race
continue retry;
}
else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,
q, s))
continue retry;
}
break;
}
}
}
取消及查询结果功能
很简单,直接上源码得了。
public boolean isCancelled() {
return state >= CANCELLED;
}
public boolean isDone() {
return state != NEW;
}
public boolean cancel(boolean mayInterruptIfRunning) {
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try { // in case call to interrupt throws exception
if (mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null)
t.interrupt();
} finally { // final state
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
finishCompletion();
}
return true;
}
使用示例
自己创建子线程并使用
/**
* @author lipengcheng3 Created date 2019-02-13 10:05
*/
public class FutureLearn {
static class TestCallable implements Callable<Integer> {
@Override
public Integer call() throws Exception {
int sum=0;
for (int i=0;i<10;i++){
Thread.sleep(100);
sum += i;
}
return sum;
}
}
public static void main(String[] args){
FutureTask<Integer> t1 = new FutureTask<>(new TestCallable());
new Thread(t1).start();
try {
System.out.println(t1.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
借助线程池
/**
* @author lipengcheng3 Created date 2019-02-13 10:05
*/
public class FutureLearn1 {
static class TestCallable implements Callable<Integer> {
@Override
public Integer call() throws Exception {
int sum=0;
for (int i=0;i<10;i++){
Thread.sleep(100);
sum += i;
}
return sum;
}
}
public static void main(String[] args){
ExecutorService es = Executors.newCachedThreadPool();
FutureTask<Integer> t1 = new FutureTask<>(new TestCallable());
es.submit(t1);
try {
System.out.println(t1.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
} finally {
es.shutdown();
}
}
}
总结
唯一的区别就是生成调用FutureTask
的子线程的方法,数量少就自己new
,数量多尽量用线程池,方便线程的充分利用,减少频繁进行线程创建回收引起的开销。
当然,你也可以自行继承FutureTask
并对钩子done()
进行实现以实现根据自己的调用环境定制FutureTask
。
问题
用了volatile
,为什么呢?等后续解答。
网友评论