前言
本文是承接上文[Java源码][并发J.U.C]---解析线程池之ThreadPoolExecutor(1)的思路继续进行分析的, 主要会涉及到线程池的关闭等方法.
关闭线程池主要有两个方法分别为
shutdown()
和shutdownNow()
,他们的区别在于shutdown()
让线程池不接受新任务并且要处理队列中的任务, 然而shutdownNow()
让线程池不接受新任务也不处理队列中的任务,并且中断正在执行任务的线程.(只是名义上的中断,关于中断的话题可以参考我的另一篇博客. [并发J.U.C] 用例子理解线程中断)
辅助方法
在讨论这两个方法之前,我们先看看几个辅助方法.
interruptIdleWorkers(boolean onlyOne)
该方法代码比较容易理解, 此时也可以看到为什么需要将
Worker
包装成一个互斥不可重入锁了. 因为需要先获得锁然后才去中断此线程.
onlyOne
:true
只中断一个空闲线程,false
中断所有空闲线程.
作用: 中断空闲的线程(正在等待任务的线程也就是那些没有被锁住的线程).
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
advanceRunState(int targetState)
作用: 将
runState
转换为给定目标,或者如果已经至少已经给定目标则将其保留.
private void advanceRunState(int targetState) {
for (;;) {
int c = ctl.get();
if (runStateAtLeast(c, targetState) ||
ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
break;
}
}
interruptWorkers()
作用: 对所有的线程池中的线程发出中断.
private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers)
w.interruptIfStarted();
} finally {
mainLock.unlock();
}
}
tryTerminate()
作用: 尝试去
terminate
线程池. 主要是通过线程池的状态判断是否需要terminate
线程池, 如果条件满足,但是线程池中含有线程,则尝试对其线程发出中断, 反之更新线程池状态从TIDYING
到TERMINATED
.
final void tryTerminate() {
for (;;) {
int c = ctl.get();
/**
* 从线程池的状态来进行分析
* 1. 如果线程池处于RUNNING状态 直接return 因为不需要terminate线程池
* 2. 如果线程池大于等于TIDYING状态(也就是TIDYING或者TERMINATION) 此时已经terminate过了所以直接返回
* -> 不是1或者2 就表明线程池状态只能是SHUTDOWN或者是STOP
* 如果是STOP 此时则需要terminate线程池 所以不能直接return
* 如果是SHUTDOWN
* a. 阻塞队列不为空,则不能terminate线程池,因为要处理阻塞队列的任务 所以不能直接return
* b. 阻塞队列为空, 则需要terminate线程池
*/
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
/**
* 进入到这里有两种可能性
* 1. 线程池状态为STOP
* 2. 线程池状态为SHUTDOWN并且队列为空
*
* 如果workerCount大于0,则中断一个线程 进而引发的操作是
* 1. 运行getTask()中的某个线程会返回null
* 2. 进而该线程返回到runWorker中退出while循环
* 3. 进而进入到processWorkerExit方法中
* 4. processWorkerExit中会调用tryTerminate方法进而进入了一个循环效应
*
* 当线程池状态是SHUTDOWN或者STOP时会慢慢得到线程池的最终终止状态.
* 因为getTask()方法中数量会减1.
*/
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE);
return;
}
/**
* 进入到这里有两种可能性
* 1. 线程池状态为STOP 并且workerCount为0
* 2. 线程池状态为SHUTDOWN并且队列为空并且workerCount为0
*
* 在该两种情况下就可以把线程池的状态设置为TIDYING继而设置为TERMINATED
*/
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
/** 先设置成TIDYING状态 然后调用子类实现的terminated方法
* 最后设置成TERMINATED状态 并唤醒那些在等待线程池TERMINATED的线程 */
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
terminated();
} finally {
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
processWorkerExit(Worker w, boolean completedAbruptly)
作用: 处理因为某种原因退出的
Worker w
并且尝试终止线程池进而判断是否需要给线程池增加线程.
/** 只被runWorker调用 */
private void processWorkerExit(Worker w, boolean completedAbruptly) {
/**
* 1. 如果异常退出runWorker,那么正常工作的worker线程数量需要减少1
* 2. 如果正常退出runWorker,表明getTask()==null,已经在getTask()返回null前减1了
*/
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
workers.remove(w);
} finally {
mainLock.unlock();
}
/** 做一个尝试终止线程池的操作 只是有益于早发现线程池的终止指令进而真正去终止线程池 */
tryTerminate();
/**
* 因为有一个线程因为异常或者正常退出,所以检查线程池是否需要增加worker线程
* 1. 如果线程池状态>=STOP 则肯定不需要添加worker了
* 2. 线程池只能是RUNNING或者SHUTDOWN状态的时候来检查
* 2.1 如果是异常退出,则直接增加一个worker进入到线程池中
* 2.2 如果是正常退出,则判断一下是否需要增加worker到线程池中
*/
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false);
}
}
关闭线程池
shutdown()
作用: 将线程池状态调整为
SHUTDOWN
,进而中断所有空闲线程并尝试终止线程池.
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
/** 判断调用者是否有权限shutdown线程池 */
checkShutdownAccess();
/** 使用CAS将线程池状态设置为SHUTDOWN,此时线程池将不会接受新任务不处理队列任务 */
advanceRunState(SHUTDOWN);
/** 中断所有空闲线程 */
interruptIdleWorkers();
/** hook for ScheduledThreadPoolExecutor 暂时无实现 */
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
/** 尝试终止线程池 */
tryTerminate();
}
shutdownNow()
作用: 将线程池状态调整为
STOP
,进而中断线程池中所有线程并尝试终止线程池. 最终返回所有未处理的任务.
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
/** 判断调用者是否有权限shutdown线程池 */
checkShutdownAccess();
/** 使用CAS将线程池状态设置为STOP,此时线程池将不会接受新任务不处理队列任务并且中断所有线程 */
advanceRunState(STOP);
/** 尝试中断线程池中所有线程 */
interruptWorkers();
/** 取出队列中所有的任务 */
tasks = drainQueue();
} finally {
mainLock.unlock();
}
/** 尝试终止线程池 */
tryTerminate();
/** 返回所有未处理的任务 */
return tasks;
}
参考
1.
java 1.8
源码
网友评论