本文只介绍 ThreadPoolExecutor
源码的关键部分,开篇会先介绍 ThreadPoolExecutor
中的一些核心常量定义,然后选取线程池工作周期中的几个关键方法分析其源码实现。其实,看 JDK
源码的最好途径就是看类文件注释,作者把想说的全都写在里面了。
一些重要的常量
ThreadPoolExecutor
内部作者采用了一个 32bit
的 int
值来表示线程池的运行状态(runState)
和当前线程池中的线程数目(workerCount)
,这个变量取名叫 ctl
(control
的缩写),其中高 3bit
表示允许状态,低 29bit
表示线程数目(最多允许 2^29 - 1
个线程)。
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3; // 29 位
private static final int CAPACITY = (1 << COUNT_BITS) - 1; // 线程池最大容量
// runState is stored in the high-order bits
// 定义的线程池状态常量
// 111+29个0,值为 -4 + 2 + 1 = -1(不懂的面壁)
private static final int RUNNING = -1 << COUNT_BITS;
// 000+29个0
private static final int SHUTDOWN = 0 << COUNT_BITS;
// 001+29个0
private static final int STOP = 1 << COUNT_BITS;
// 010+29个0
private static final int TIDYING = 2 << COUNT_BITS;
// 011+29个0
private static final int TERMINATED = 3 << COUNT_BITS;
// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY; } // 得到线程池状态
private static int workerCountOf(int c) { return c & CAPACITY; } // 得到线程池线程数
private static int ctlOf(int rs, int wc) { return rs | wc; } // 反向构造 ctl 的值
因为代表线程池状态的常量可以通过值的大小来表示先后关系(order)
,因此后续源码中会有:
rs >= SHUTDOWN // 那就表示SHUTDOWN、 STOP or TIDYING or TERMINATED,反正不是 RUNNING
理解上述的常量意义有助于后面理解源码。
讨论线程池的状态转换
从第一节我们已经知道了线程池分为五个状态,下面我们聊聊这五个状态分别限制了线程池能执行怎样的行为:
-
RUNNING:
可以接受新任务,且执行Queue
中的任务 -
SHUTDOWN:
不再接受新的任务,但能继续执行Queue
中已有的任务 -
STOP:
不再接受新的任务,且也不再执行Queue
中已有的任务 -
TIDYING:
所有任务完成,workCount=0
,线程池状态转为TIDYING
且会执行hook method
,即terminated()
-
TERMINATED:``hook method
terminated()
执行完毕之后进入的状态
线程池的关键逻辑
上图总结了 ThreadPoolExecutor
源码中的关键性步骤,正好对应我们此次解析的核心源码(上图出处见水印)。
-
execute
方法用来向线程池提交task
,这是用户使用线程池的第一步。如果线程池内未达到corePoolSize
则新建一个线程,将该task
设置为这个线程的firstTask
,然后加入workerSet
等待调度,这步需要获取全局锁mainLock
- 已达到
corePoolSize
后,将task
放入阻塞队列 - 若阻塞队列放不下,则新建新的线程来处理,这一步也需要获取全局锁
mainLock
- 当前线程池
workerCount
超出maxPoolSize
后用rejectHandler
来处理
我们可以看到,线程池的设计使得在 2
步骤时避免了使用全局锁,只需要塞进队列返回等待异步调度就可以,仅剩下 1
和 3
创建线程时需要获取全局锁,这有利于线程池的效率提升,因为一个线程池总是大部分时间在步骤 2
上,否则这线程池也没什么存在的意义。
源码分析
本文只分析 execute
,addWorker
,runWorker
,三个核心方法和一个 Worker
类,看懂了这几个,其实其他的代码都能看懂。
Worker 类
// 继承自 AQS 实现简单的锁控制
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
// worker 运行所在的线程
final Thread thread;
// 赋予该线程的第一个 task,可能是 null,如果不是 null 就运行这个,
// 如果是 null 就通过 getTask 方法去 Queue 里取任务
Runnable firstTask;
// 线程完成的任务数量
volatile long completedTasks;
Worker(Runnable firstTask) {
// 限制线程直到 runWorker 方法前都不允许被打断
setState(-1);
this.firstTask = firstTask;
// 线程工厂创建线程
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker */
public void run() {
// 线程内部的 run 方法调用了 runWorker 方法
runWorker(this);
}
}
execute 方法
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// 如果当前线程数小于 corePoolSize
if (workerCountOf(c) < corePoolSize) {
// 调用 addWorker 方法新建线程,如果新建成功返回 true,那么 execute 方法结束
if (addWorker(command, true))
return;
// 这里意味着 addWorker 失败,向下执行,因为 addWorker 可能改变 ctl 的值,
// 所以这里重新获取下 ctl
c = ctl.get();
}
// 到这步要么是 corePoolSize 满了,要么是 addWorker 失败了
// 前者很好理解,后者为什么会失败呢?addWorker 中会讲
// 如果线程池状态为 RUNNING 且 task 插入 Queue 成功
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 如果已不处于 RUNNING 状态,那么删除已经入队的 task,然后执行拒绝策略
// 这里主要是担心并发场景下有别的线程改变了线程池状态,所以 double-check 下
if (! isRunning(recheck) && remove(command))
reject(command);
// 这个分支有点难以理解,意为如果当前 workerCount=0 的话就创建一个线程
// 那为什么方法开头的那个 addWorker(command, true) 会返回 false 呢,其实
// 这里有个场景就是 newCachedThreadPool,corePoolSize=0,maxPoolSize=MAX 的场景,
// 就会进到这个分支,以 maxPoolSize 为界创建临时线程,firstTask=null
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 这个分支很好理解,workQueue 满了那么要根据 maxPoolSize 创建线程了
// 如果没法创建说明 maxPoolSize 满了,执行拒绝策略
else if (!addWorker(command, false))
reject(command);
}
addWorker 方法
// core 表示以 corePoolSize 还是 maxPoolSize 为界
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 看看 addWorker 什么时候返回 false
// 这里的 if 逻辑有点难懂,用下数学上的分配率,将第一个逻辑表达式放进括号里就好懂了
// 1、rs >= SHUTDOWN && rs != SHUTDOWN 其实就表示当线程池状态是 STOP、TIDYING, 或 TERMINATED 的时候,当然不能添加 worker 了,任务都不执行了还想加 worker?
// 2、rs >= SHUTDOWN && firstTask != null 表示当提交一个非空任务,但线程池状态已经不是 RUNNING 的时候,当然也不能 addWorker,因为你最多只能执行完 Queue 中已有的任务
// 3、rs >= SHUTDOWN && workQueue.isEmpty() 如果 Queue 已经空了,那么不允许新增
// 需要注意的是,如果 rs=SHUTDOWN && firstTask=null 或者 rs=SHUTDOWN && workQueue 非空的情况下,还是可以新增 worker 的,需要创建临时线程处理 Queue 里的任务
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
// 这里也是一个返回 false 的情况,但很简单,就是数目溢出了
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// CAS 成功了,就跳出 loop
if (compareAndIncrementWorkerCount(c))
break retry;
// CAS 失败的话,check 下目前线程池状态,如果发生改变就回到外层 loop 再来一遍,这个也好理解,否则单纯 CAS 失败但是线程池状态不变的话,就只要继续内层 loop 就行了
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
// 这是全局锁,必须持有才能进行 addWorker 操作
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
// 启动线程
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
runWorker 方法
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// 循环直至 task = null,可能是由于线程池关闭、等待超时等
while (task != null || (task = getTask()) != null) {
w.lock();
// 下面这个 if 逻辑没怎么读懂。。。翻译了下注释
// 如果线程池停止,确保线程中断;
// 如果没有,确保线程不中断。这需要在第二种情况下进行重新获取ctl,以便在清除中断时处理shutdownNow竞争
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// 前置钩子函数,可以自定义
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 运行 run 方法
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
// 线程的 run 不允许抛出 Throwable,所以转换为 Error
thrown = x; throw new Error(x);
} finally {
// 后置钩子函数,也可以自定义
afterExecute(task, thrown);
}
} finally {
// 获取下一个任务
task = null;
// 增加完成的任务数目
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
总结
看完 ThreadPoolExecutor
的源码,不得不惊叹于代码写得真优雅,但是正因为写的太简洁优雅甚至找不到一句啰嗦的代码,所以让人有点难懂。看源码的建议是先仔细阅读一遍类注释,然后再配合 debug
,理清关键性的步骤在做什么,有些 corner case
夹杂在主逻辑里面,如果一开始看不懂可以直接略过,事后再来反思。
网友评论