前言
本文重点分析了ThreadPoolExecutor两个方法execute() 和 submit() 的执行原理,并说明Future如何实现阻塞返回。
继承关系图
关键方法介绍
构造方法
/**
* @param corePoolSize 核心线程数
* @param maximumPoolSize 最大线程数
* @param keepAliveTime 临时线程保留时间
* @param unit 临时线程保留时间单位
* @param workQueue 阻塞队列
* @param threadFactory 线程工程
* @param handler 拒绝策略
*/
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
为了方便区分,本文会将超过核心线程数创建的线程叫临时线程,本质上这两类线程没有任何区别,到期回收哪个线程完全是跟当时线程池哪个线程先被空闲有关,跟创建时间的先后无关
execute(Runnable command)
默认参数
先介绍主要方法实现前,先说明一些静态变量的含义和值。
ctl 官方给出的注释是The main pool control state,这个值包含了两部分,workerCount和runState。
int COUNT_BITS = Integer.SIZE - 3 = 29; 一共32位,高3位表示线程池的运行状态,低29位表示线程池中的线程数量。是一种高低位的实现。
用一个变量去存储两个值,可避免在做相关决策时,出现不一致的情况,不必为了维护两者的一致,而占用锁资源。通过阅读线程池源代码也可以发现,经常出现要同时判断线程池运行状态和线程数量的情况。
int CAPACITY = (1 << COUNT_BITS) - 1 = 536870912;也就是从的线程容量是536870912个。
RUNNING、SHUTDOWN、STOP、TIDYING、TERMINATED 都是用高3位表示不同的含义。低29位都是0
具体值参考下表:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); // 初始值 RUNNING | 0 = -536870912 , 1110 0000 + 24位0
private static final int COUNT_BITS = Integer.SIZE - 3; //29 高3位表示状态 低29表示线程数量
private static final int CAPACITY = (1 << COUNT_BITS) - 1; //536870912 0001 1111 + 24位1
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS; // -536870912 1110 0000 + 24位0
private static final int SHUTDOWN = 0 << COUNT_BITS; // 0 0000 0000 + 24位0
private static final int STOP = 1 << COUNT_BITS; // 536870912 0010 0000 + 24位0
private static final int TIDYING = 2 << COUNT_BITS; // 1073741824 0100 0000 + 24位0
private static final int TERMINATED = 3 << COUNT_BITS; // 1610612736 0110 0000 + 24位0
// Packing and unpacking ctl
// 如果c是默认值-536870912,
// runStateOf = (-536870912 & ~29) = -536870912,
// workerCountOf = (-536870912 & 29) = 0
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
源码分析
/**
* Executes the given task sometime in the future. The task
* may execute in a new thread or in an existing pooled thread.
*
* If the task cannot be submitted for execution, either because this
* executor has been shutdown or because its capacity has been reached,
* the task is handled by the current {@code RejectedExecutionHandler}.
*
* @param command the task to execute
* @throws RejectedExecutionException at discretion of
* {@code RejectedExecutionHandler}, if the task
* cannot be accepted for execution
* @throws NullPointerException if {@code command} is null
*/
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
//c = -536870912
int c = ctl.get();
// workerCountOf(c) = 0
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
首先,这个execute有三个主要的if判断:
//判断当前线程池中的线程数量有没有到核心线程数,没有就创建新的worker来处理任务。
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
//执行到此处,说明此时线程池的线程数已经超过了coolPoolSize。先判断线程池状态,且尝试将任务添加到阻塞队列里。
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 最后意味着此时阻塞队列已满,尝试创建新的worker来处理,不能创建则执行拒绝策略。
else if (!addWorker(command, false))
reject(command);
addWorker()
很长的一个方法,注释就不贴了,两个参数分别是当前要执行的任务和core(表示要创建的是核心线程还是临时线程)。
这里的worker是真正负责处理任务的对象,worker内部封装了所属线程和待执行的任务.
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
// ...
/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
接下来主要看addWorker方法的实现。
private boolean addWorker(Runnable firstTask, boolean core) {
//第一部分
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
//第二部分
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
第一部分:
rs表示是线程池的状态,先校验线程池状态和队列数量。前文已经提过,RUNNING的值是负数,SHOTDOWN是0,其他值都是正数。
之后是for循环,判断容量和是否超过了预设的线程数量。
如果成功增加了workerCount的值就跳出循环,开始执行任务。
如果失败,说明有并发情况,就重新获取ctl,判断rs状态是否变了,从而决定是重新执行一遍大或小循环。
for循环结束后,说明当前可以增加worker对象。此时就真正创建对象开始执行任务。
第二部分:
在创建worker对象时,构造方法中也创建了一个Thread。并通过lock来保证原子性,校验状态之后将worker对象add到HashSet中。
private final HashSet<Worker> workers = new HashSet<Worker>();
添加后,释放锁并start线程。
如果在addWorker过程中失败,且第一阶段顺利完成,就从hashSet中移除,并减少workerCount。
/**
* Rolls back the worker thread creation.
* - removes worker from workers, if present
* - decrements worker count
* - rechecks for termination, in case the existence of this
* worker was holding up termination
*/
private void addWorkerFailed(Worker w) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (w != null)
workers.remove(w);
decrementWorkerCount();
tryTerminate();
} finally {
mainLock.unlock();
}
}
如果添加任务顺利,则在t.start();执行完成后,主要任务就完成了并返回true。此时线程会执行worker对象内的run方法。
worker内 run()
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
runWorker真正执行,这个this只得是worker对象,task和线程都已经封装到worker内了。
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
// 如果线程池已经是STOP或TIDYING或TERMINATED,需要将线程也主动中断
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
这里说明一些最核心的逻辑
执行过程:
- 在while中判断当前的task和队列中的task,如果当前task != null,说明是线程是伴随着任务一起创建的,直接调用task.run来执行。
- 第一圈执行完成后,task=null,第二次执行while时,需要从getTask中取task来执行。
- 当getTask() 返回null时,while结束,设置completedAbruptly = false;表明任务时正常结束。最后调用processWorkerExit来退出线程。
这里提供了两个方法:beforeExecute 和 afterExecute,task.run()的切面,我们可以定义worker的子类,来实现扩展,比如加入一些监控等。
getTask() 返回null就代表着线程可以正常结束,那么什么情况下会返回null?
getTask()
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
getTask() 的主要任务是从阻塞队列中获取task。通过判断当前的wc 是否超过了核心线程数,来决定poll还是take来取任务。
如果超过了,说明此时已经创建过了临时线程,临时线程的有效期就是等待从队列返回的时间,超过这个时间没有取到,则设置timeOut表示已经超时,在下一次for循环的if判断中,返回null,让这个临时线程自动结束。
如果没超过,说明此时还处在核心线程的阶段,可以take长期等待。
至此,run方法的执行过程就此完成。
任务是如何添加到队列中的,还得回到execute方法。
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
如果已经达到核心线程数,就不能在继续addWorker,而是要offer到workQueue中,并再次检查线程池状态。
如果offer失败,说明阻塞队列已满,此时需要继续创建新的worker来完成任务。
else if (!addWorker(command, false))
reject(command);
这里的false代表 创建时和最大线程数进行比较,如果超过了最大线程数,则调用reject来执行拒绝策略。
reject()
/**
* Invokes the rejected execution handler for the given command.
* Package-protected for use by ScheduledThreadPoolExecutor.
*/
final void reject(Runnable command) {
handler.rejectedExecution(command, this);
}
4种默认的拒绝策略
AbortPolicy : 直接抛出异常(默认策略)
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
DiscardPolicy : 什么也不处理
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {}
DiscardOldestPolicy : 把当前最早在队列的任务丢弃,并将再次执行此任务(可能会直接执行,也可能被加到队列中)
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
CallerRunsPolicy : 由当前线程来直接执行run,不再交给线程池。
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
submit 源码分析
submit()
Future<?> future = Executors.newCachedThreadPool().submit(new Thread());
...
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
submit方法可用于带返回值的任务执行。可以返回Future来获取线程的执行结果,具体的实现定义在AbstractExecutorService中。
首先创建了一个FutureTask对象,传入了要执行的任务。把封装后的FutureTask交给execute来执行。
/**
* Returns a {@code RunnableFuture} for the given runnable and default
* value.
*
* @param runnable 要执行的任务
* @param 返回的默认值
* @param <T> the type of the given value
*/
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
FutureTask
继承关系图和构造方法
/**
* Possible state transitions:
* NEW -> COMPLETING -> NORMAL
* NEW -> COMPLETING -> EXCEPTIONAL
* NEW -> CANCELLED
* NEW -> INTERRUPTING -> INTERRUPTED
*/
// FutureTask 可能的状态列表
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
/** 执行的任务 */
private Callable<V> callable;
/** get() 的返回值,即最终的执行结果 */
private Object outcome; // non-volatile, protected by state reads/writes
/** The thread running the callable; CASed during run() */
private volatile Thread runner;
/** Treiber stack of waiting threads */
// 单项列表的node
private volatile WaitNode waiters;
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
FutureTask 既然将任务封装到了callable属性中,且它自身还是一个Runnable,那么真正执行一定在run方法中。而get() 是一个阻塞方法,当执行完成后,可以获取返回值,否则就等待。
那重点看下run() 和 get() 的实现。
get()
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}
state 有多种状态,用来标记当前任务的执行情况,如果已经是完成状态,通过report方法直接返回outcome即可。
如果还未到达完成态,就说明当前任务还在执行,此时需要await等待,也就是awaitDone。
awaitDone()
/**
* Awaits completion or aborts on interrupt or timeout.
*
* @param timed true if use timed waits
* @param nanos time to wait, if timed
* @return state upon completion
*/
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
int s = state;
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
else if (q == null)
q = new WaitNode();
else if (!queued)
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos);
}
else
LockSupport.park(this);
}
}
awaitDone的两个参数分别用于表示是否有等待时间,以及等待时间的纳秒数。
如果有等待时间,deadline就是截止时间。
下面则是主要逻辑:
一般来说,这里的for循环会执行3圈,(不考虑已经执行完成和中断的情况)。
- 第一圈:因为WaitNode q 最初被赋值为null,在run执行完之前,state是NEW,所以for循环会执行q=null的逻辑,先创建一个WaitNode对象。
- 第二圈:因为q此时有值,但queued是false,此时for循环执行! queued的逻辑,如果设置成功,则queued = true。
- 第三圈:LockSupport.park(this); (如果有deadline,就判断是否超时了)此时线程进入阻塞状态等待唤醒。
queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q);
//背景:
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> k = FutureTask.class;
waitersOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("waiters"));
/** Treiber stack of waiting threads */
private volatile WaitNode waiters;
重点就是这一句。
这一句是做了两个事情:
- 构建waiters的Node单向链表
- 如果添加队列成功就返回true。
这里为什么要构建单向链表?
一般来说,一个task通过一个get()方法等待获取就OK了,是一个单任务。但如果,同一个FutureTask的get() 方法被多个线程调用时,多个线程(可能)会同时处于阻塞状态,这时就需要一个存储介质来存储这些等待线程,这里是通过单链表来实现。
构建单向链表的过程如下:
- 第一次调用get():
当前waiters = null;q.next = waiters(null); waiters = q; 即waiters的头节点是q,q.next是null。 - 第二次调用get(); 如果当前的任务命名为p;
当前waiters = q; p.next = waiter(q); waiters = p; 即构建了一个 p -> q的链表结构,waiters是头节点p。 - 第三次调用get(); 如果当前的任务命名为r;
最后的效果是 r -> p -> q; 可以看出来是头插法。
run()
public void run() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
private void handlePossibleCancellationInterrupt(int s) {
// It is possible for our interrupter to stall before getting a
// chance to interrupt us. Let's spin-wait patiently.
if (s == INTERRUPTING)
while (state == INTERRUPTING)
Thread.yield(); // wait out pending interrupt
// We want to clear any interrupt we may have received from
// cancel(true). However, it is permissible to use interrupts
// as an independent mechanism for a task to communicate with
// its caller, and there is no way to clear only the
// cancellation interrupt.
//
// Thread.interrupted();
}
run() 比较简单,如果当前FutureTask是NEW的状态,就调用callable.call(),将执行完成的result通过set方法设置到outcome中。
且无论成功失败,都将runner线程置为null,并判断执行过程中是否被其他线程中断,如果因为中断而失败,则此线程一直交出时间片,直到状态从INTERRUPTING变成INTERRUPTED。
如果成功执行且没有被中断过,则通过set方法进行返回值的设置。
set()
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
}
}
先判断此时状态是NEW,则改成COMPLETING,设置outcome后,状态改成NORMAL(完成态),调用finishCompletion来唤醒等待中的线程。
finishCompletion()
/**
* Removes and signals all waiting threads, invokes done(), and
* nulls out callable.
*/
private void finishCompletion() {
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;) {
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
// done是一个空方法,给子类重写用。
done();
callable = null; // to reduce footprint
}
这个方法比较简单,可以看到就是在遍历waiters单链表,依次唤醒内部的阻塞线程。(阻塞的发起点是get方法)。
总结
execute()
实现思想:
- task因为交由线程池来执行,线程池的线程直接调用task中的run,而不是执行task.start()。
- 如果当前线程池中的线程数 < corePoolSize ,就创建新的线程添加到线程池中(HashSet存储)。
- 如果当前的线程数 > corePoolSize 就先存放到阻塞队列里
- 如果阻塞队列已满,且 < maximumPoolSize,就创建新的线程添加到线程池中(HashSet存储),当keepAliveTime的时间没有处理任务,则销毁(也就是让run方法结束)。
- 如果已经超过maximumPoolSize,则根据拒绝策略执行。
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
submit()
实现思想:
任务执行的思想还是execute,阻塞等待返回值的思想是通过Future完成。实现类是FutureTask。
- get()返回值时如果还未完成,将当前线程封装成WaiterNode,进行LockSupport.park,并将所有park的线程按照头插法构建一个单向链表。
- run() 执行完成后,将内部的outcome属性设置成当前FutureTask的返回值,并unpark单链表中的所有阻塞线程,这些线程的get()会直接返回outcome的值。
网友评论