在上一篇文章 线程池源码-任务提交 中,我主要讲解了:
- 线程池初始化参数
- 线程池处理任务的整体流程
- worker 线程的创建
小伙伴们可能对线程池任务执行机制有疑问,今天我们就来探索以下几个问题:
- worker 线程的启动方式
- 线程池如何区分核心线程,非核心线程
- 线程池中的线程如何复用
worker 对象
前面提到,在任务提交之后,线程池可能会创建 worker 对象去处理该任务。
private boolean addWorker(Runnable firstTask, boolean core) {
// 省略代码
Worker w = null;
try {
// 1.创建 worker 对象
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 2.更新 worker 列表,代码省略
} finally {
mainLock.unlock();
}
if (workerAdded) {
// 3.开启 worker 线程
t.start();
workerStarted = true;
}
}
} finally {
// 4.如果 worker 线程开启失败,则将该 worker 对象从列表中移除
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
通过上面这段代码,可知:
- worker 对象内部持有一个 Thread 类型的成员变量 t
- 成功将 worker 更新到线程池 worker 列表之后,会启动它的线程 t
开启的这个线程,都做了些什么呢?接着往下。
来看一下 Worker 这个类的声明和构造方法 :
- Worker 类实现了 Runnable 接口,意味着它可以作为一个任务放到线程中执行的
- 在初始化成员变量 thread 时,把自身作为一个任务放到了 thread 中
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
// 省略代码
final Thread thread;
Runnable firstTask;
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
}
看到这也许大家都明白了,Worker 的成员变量 t 线程开启之后,又会反过来执行自身,这便是 worker 线程的启动方式。

worker 对象的 run() 方法中都做了什么,这是今天的重点——线程池中的线程如何复用。
线程复用
终于看到了期待已久的 while 循环,省略了大量代码,为了让流程看起来更清晰。
public void run() {
runWorker(this);
}
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
try {
while (task != null || (task = getTask()) != null) {
// 省略代码
try {
// 执行任务前的操作
beforeExecute(wt, task);
// 执行提交的任务
task.run();
// 执行任务后的操作
afterExecute(task, thrown);
} finally {
// 将执行完的任务置空,方便后续线程处理任务队列中的任务
task = null;
}
}
} finally {
// 省略代码
}
}
可以看到在执行任务的前后都加入了 hook 操作,后面会讲到线程池提供给外部的可扩展接口,其中就包括 beforeExecute(Runnable r, Throwable t), afterExecute(Runnable r, Throwable t) 两个方法。
猜测 while 循环的条件是线程复用的关键,条件成立只需满足下面任意一种情况:
- 当前 task 不为空
- getTask() 方法返回的结果不为空
第一种情况容易理解,如果当前提交的任务涉及创建新的 worker 时,task 就不为空,并且作为这个 worker 的 firstTask 被执行。
第二种情况就需要看一下 getTask() 方法代码,发现控制线程「死活」的逻辑都在这里。
主要操作如下:
- 如果线程池状态 SHUTDOWN,并且任务队列为空,停止当前 worker 线程,线程池停止处理任务
- 如果线程池状态 >= STOP,停止当前 worker 线程,线程池停止处理任务
- 判断线程是否会受 keepAliveTime 超时时间的影响,从这里也可以看出,线程池是根据线程数量来定义核心线程的,如果线程数量 >= 核心线程数,则认为超出的线程是非核心线程
- allowCoreThreadTimeOut = true 意味着核心线程过了 keepAliveTime 也会被干掉
- 如果当前线程受 keepAliveTime 影响,则调用 workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) 方法从队列中获取任务,如果超时,则返回 null,并将 timeOut 变量设为 true,那么在下一次 timeOut 时间判定时,就会认为该线程过了超时时间,可能会尝试将其停止
- 如果不受 keepAliveTime 影响,则直接阻塞式获取任务,直到队列中存在任务,则返回该任务,以此来达到线程的复用
流程图:

源码阅读笔记:
private Runnable getTask() {
boolean timedOut = false;
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 如果线程池状态 SHUTDOWN,并且任务队列为空,停止当前 worker 线程
// 如果线程池状态 >= STOP,停止当前 worker 线程,线程池停止处理任务
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// 判断线程是否会受 keepAliveTime 超时时间的影响
// allowCoreThreadTimeOut = true 意味着核心线程过了 keepAliveTime 也会被干掉
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 再次判断是否可以停止当前线程,其中就包含 timed 条件判定
// 如果 timed 条件满足,意味着只要任务队列空了,或者有足够的线程应付队列中的任务,就会停止当前线程
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// 如果 timed = true,就意味着当前线程会受 keepAliveTime,因此调用 workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) 方法从队列中获取任务,如果超时,则返回 null,并将 timeOut 变量设为 true,那么在下一次 timeOut 时间判定时,就会认为该线程过了超时时间,可能会尝试将其停止
// 如果 timed = false,则不管 keepAliveTime 时间,阻塞式获取任务,直到队列中存在任务,则返回该任务
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
总结
通过今天这篇文章,我们了解了:
- worker 线程的启动方式
- 线程池如何区分核心线程,非核心线程
- 线程池线程复用机制
到这里,相信大家对线程池又有了一个新的认识,如果觉得文章对你有帮助,欢迎留言点赞。
网友评论