二十、FutureTask 原理
阿里巴巴长期招聘Java研发工程师p6,p7,p8等上不封顶级别,有意向的可以发简历给我,注明想去的部门和工作地点:1064454834@qq.com
20.1 一个例子
static class Task implements Callable<Integer> {
@Override
public Integer call() throws Exception {
System.out.println("子线程在进行计算");
Thread.sleep(1000);
int sum = 0;
for (int i = 0; i < 100; i++)
sum += i;
return sum;
}
}
public static void main(String[] args) throws InterruptedException {
ExecutorService executor = Executors.newCachedThreadPool();
Task task = new Task();
FutureTask<Integer> futureTask = new FutureTask<Integer>(task);
executor.submit(futureTask);
System.out.println("主线程在执行任务");
try {
System.out.println("task运行结果" + futureTask.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
System.out.println("所有任务执行完毕");
executor.shutdown();
}
如上代码主线程会在futureTask.get()出阻塞直到task任务执行完毕,并且会返回结果。
20.1 原理
先看下类图结构
data:image/s3,"s3://crabby-images/ea88d/ea88dfd585c8780a7d0d02446ad127503ac36e54" alt=""
FutureTask 内部有一个state用来展示任务的状态,并且是volatile修饰的:
/** Possible state transitions:
* NEW -> COMPLETING -> NORMAL 正常的状态转移
* NEW -> COMPLETING -> EXCEPTIONAL 异常
* NEW -> CANCELLED 取消
* NEW -> INTERRUPTING -> INTERRUPTED 中断
*/
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
其中构造FutureTask实例时候状态为new
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW;
}
把FutureTask提交到线程池或者线程执行start时候会调用run方法:
public void run() {
//如果当前不是new状态,或者当前cas设置当前线程失败则返回,只有一个线程可以成功。
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
//当前状态为new 则调用任务的call方法执行任务
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);完成NEW -> COMPLETING -> EXCEPTIONAL 状态转移
}
//执行任务成功则保存结果更新状态,unpark所有等待线程。
if (ran)
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
protected void set(V v) {
//状态从new->COMPLETING
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
//状态从COMPLETING-》NORMAL
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
//unpark所有等待线程。
finishCompletion();
}
}
任务提交后,会调用 get方法获取结果,这个get方法是阻塞的。
public V get() throws InterruptedException, ExecutionException {
int s = state;
//如果当前状态是new或者COMPLETING则等待,因为位normal或者exceptional时候才说明数据计算完成了。
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {
//如果被中断,则抛异常
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
//组建单列表
int s = state;
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
else if (q == null)
q = new WaitNode();
else if (!queued)
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
else if (timed) {
nanos = deadline - System.nanoTime();
//超时则返回
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
//否者设置park超时时间
LockSupport.parkNanos(this, nanos);
}
else
//直接挂起当前线程
LockSupport.park(this);
}
}
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}
在submit任务后还可以调用futuretask的cancel来取消任务:
public boolean cancel(boolean mayInterruptIfRunning) {
//只有任务是new的才能取消
if (state != NEW)
return false;
//运行时允许中断
if (mayInterruptIfRunning) {
//完成new->INTERRUPTING
if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, INTERRUPTING))
return false;
Thread t = runner;
if (t != null)
t.interrupt();
//完成INTERRUPTING->INTERRUPTED
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); // final state
}
//不允许中断则直接new->CANCELLED
else if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, CANCELLED))
return false;
finishCompletion();
return true;
}
欢迎关注微信公众号:技术原始积累 获取更多技术干货_
data:image/s3,"s3://crabby-images/343a8/343a8f9a72604c73ffcdbd3956061da2839b9ed3" alt=""
网友评论