线程池终止主要依靠以下2个命令:
- shutdown()
- shutdownNow()
首先看一下shutdown方法:
shutdown
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
// 获取独占锁
mainLock.lock();
try {
// 检查各worker是否可操作
checkShutdownAccess();
// 将线程池状态更新为SHUTDOWN
advanceRunState(SHUTDOWN);
// 中断空闲线程
interruptIdleWorkers();
// 响应shutdown操作,由ThreadPoolExecutor的继承类来具体实现
onShutdown();
} finally {
mainLock.unlock();
}
// 执行tryTerminate()操作
tryTerminate();
}
advanceRunState
private void advanceRunState(int targetState) {
// 阻塞执行
for (;;) {
int c = ctl.get();
// 1. 若当前线程池状态>=targetState,直接break
// 2. 将线程池的状态更新为targetState,并在ctl中保留当前的工作线程数,若成功,则直接break
if (runStateAtLeast(c, targetState) ||
ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
break;
}
}
执行完advanceRunState方法后,当前线程池状态肯定已>=targetState。
interruptIdleWorkers
该方法用于中断线程池中的空闲线程。
在前面系列文章中我们讲过,worker在执行任务前会先获取锁,执行完任务则释放锁,所以处于锁定状态的worker(state为1)为工作线程,而处于无锁状态的worker(state为0)为空闲线程。
private void interruptIdleWorkers() {
interruptIdleWorkers(false);
}
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
// 获取线程池独占锁
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
// 若当前worker持有的线程未被中断过,且获取worker锁成功,则执行线程中断操作
// 若获取worker锁不成功,证明该线程为工作线程,不执行线程中断操作
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
// 仅执行1次即退出
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
待线程池状态更新为shutdown,且所有空闲线程被中断,则执行onShutdown方法来响应shutdown操作。
上述过程执行完之后,执行tryTerminate()操作来终止线程池。
tryTerminate
final void tryTerminate() {
for (;;) {
int c = ctl.get();
// 出现下述3种情况,直接return:
// 1. 当前线程池处于running状态
// 2. 当前线程池处于tidying或terminated状态
// 3. 当前线程池处于shutdown状态,且workQueue不为空,此时只是拒绝提交新任务,但workQueue中的任务还需要继续执行完
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
// 代码走到这,说明为以下2种情况:
// 1. 当前线程池处于stop状态
// 2. 当前线程池处于shutdown状态,且workQueue为空
if (workerCountOf(c) != 0) {
// 中断唤醒1个正在等任务的空闲worker
interruptIdleWorkers(ONLY_ONE);
return;
}
// 此时,线程池状态为shutdown,workQueue为空,且正在运行的worker也没有了,开始terminated()
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 将线程池状态更新为tidying
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
// 此方法由线程池子类来定义
terminated();
} finally {
// 将线程池状态更新为terminated
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
}
}
假设如下情景:
当发出shutdown命令时,线程池还有3个核心工作线程,由于interruptIdleWorkers()仅会中断空闲线程,所以这3个核心工作线程不会被中断。
上述3个核心工作线程执行完自身携带和阻塞队列中的任务后,会变为空闲核心线程。由于此时线程状态已变为shutdown,所以会从runWorker()的while循环中(线程池状态为shutdown时,getTask会返回null)跳出,执行finally代码块中的processWorkerExit方法。
而processWorkerExit又会执行tryTerminate方法。所以tryTerminate会不断循环传递调用,tryTerminate方法的interruptIdleWorkers(ONLY_ONE)看起来仅是中断一个空闲线程,但其实这个中断信号会引发一个"循环中断风暴",直到线程池中所有worker被中断。
不得不说,Doug Lea是真大神呀!!!这种设计真是巧妙到极点!!!
梳理完shutdown整个代码后可以发现,线程池状态的变化过程如下:
running-->shutdown-->tidying-->terminated。
执行完shutdown,线程池状态首先会更新为shutdown,然后中断所有空闲线程,当剩余工作线程执行完持有的任务,且将阻塞队列中的任务也执行完毕,变为空闲线程时,执行tryTerminate()操作将线程池状态更新为tidying,待线程池完成terminated()操作后,线程池状态最终变为terminated。
趁热打铁,接着看一下shutdownNow方法:
shutdownNow
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
// 获取线程池独占锁
mainLock.lock();
try {
// 检查各worker是否可操作
checkShutdownAccess();
// 将线程池状态更新为STOP
advanceRunState(STOP);
// 尝试中断所有已启动的worker
interruptWorkers();
// 将阻塞队列中的任务清空
tasks = drainQueue();
} finally {
mainLock.unlock();
}
// 执行tryTerminate()操作
tryTerminate();
// 返回任务集合
return tasks;
}
interruptWorkers
private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers)
// 中断已启动的线程
w.interruptIfStarted();
} finally {
mainLock.unlock();
}
}
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
可以发现,中断worker时,会首先执行判断getState() >= 0,前面我们已经说过了,worker除了工作线程(state为1),就是空闲线程(state为0),那getState() >= 0恒成立呀,为啥多此一举呢?
其实worker刚初始化未启动前,其状态为-1,执行到runWorker时,会执行w.unlock()操作将状态修改为0,此时线程才可被中断。
方法如其名,interruptIfStarted仅能中断已启动的worker。
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
drainQueue
private List<Runnable> drainQueue() {
BlockingQueue<Runnable> q = workQueue;
ArrayList<Runnable> taskList = new ArrayList<Runnable>();
// 首先通过阻塞队列的drainTo方法将队列中的Runnable转移到taskList中
q.drainTo(taskList);
// 如果阻塞队列是DelayQueue或者阻塞队列执行poll或drainTo操作失败,则需要通过遍历的方法完成Runnable转移操作
if (!q.isEmpty()) {
for (Runnable r : q.toArray(new Runnable[0])) {
if (q.remove(r))
taskList.add(r);
}
}
// 返回阻塞队列中的任务集合
return taskList;
}
综上,drainQueue主要完成2种操作:
- 清空阻塞队列中的元素;
- 将阻塞队列中的元素保存到List中返回。
梳理完shutdownNow整个代码后可以发现,线程池状态的变化过程如下:
running-->stop-->tidying-->terminated。
执行完shutdownNow,线程池状态首先会更新为stop,接着中断所有已启动worker,然后执行tryTerminate()操作将线程池状态更新为tidying,待线程池完成terminated()操作后,线程池状态最终变为terminated。
可以发现,对线程池执行shutdown或shutdownNow后,线程池状态均需要一系列的流程才能将线程池状态更新为terminated。
那如何确认当前线程池已处于terminated状态呢?
线程池为此提供了awaitTermination方法:
awaitTermination
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
// 将时间换算为纳秒
long nanos = unit.toNanos(timeout);
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (;;) {
// 如果线程池状态已更新为terminated,则直接返回true
if (runStateAtLeast(ctl.get(), TERMINATED))
return true;
// 若nanos <= 0,则直接返回false
if (nanos <= 0)
return false;
// 阻塞等待nanos,并返回新的nanos
nanos = termination.awaitNanos(nanos);
}
} finally {
mainLock.unlock();
}
}
所以线程池终止的最佳实践是:
- 首先对线程池执行shutdown操作;
- 然后通过awaitTermination方法判断当前线程池状态是否为terminated。
网友评论