线程池终止
线程池ThreadPoolExecutor提供了shutdown()和shutDownNow()用于关闭线程池。
shutdown()后线程池将变成shutdown状态,此时不接收新任务,但会处理完正在运行的 和 在阻塞队列中等待处理的任务。
shutdownNow()后线程池将变成stop状态,此时不接收新任务,不再处理在阻塞队列中等待的任务,还会尝试中断正在处理中的工作线程。
awaitTermination() 等待线程池终止
线程池的终止核心
1 变更线程池的状态。
2 修改线程池状态保证所有work线程能在getTask返回null,导致所有work线程退出
3 针对因为获取工作队列而等待的线程进行线程中断。(工作队列阻塞响应阻塞)
shutdown
/**
* 温柔的终止线程池
*/
public void shutdown() {
/** 获取主锁:mainLock **/
final ReentrantLock mainLock = this.mainLock;
/** 加锁 **/
mainLock.lock();
try {
/** 判断调用者是否有权限shutdown线程池 **/
checkShutdownAccess();
/** CAS+循环设置线程池状态为shutdown **/
advanceRunState(SHUTDOWN);
/** 找到断线程池中空闲的work,中断其工作线程 **/
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
/** 释放锁 **/
mainLock.unlock();
}
/** 尝试将线程池状态设置为Terminate **/
tryTerminate();
}
shutdownNow
/**
* 强硬的终止线程池
* 返回在队列中没有执行的任务
*/
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
/** 获取主锁:mainLock **/
final ReentrantLock mainLock = this.mainLock;
/** 加锁 **/
mainLock.lock();
try {
/** 判断调用者是否有权限shutdown线程池 **/
checkShutdownAccess();
/** CAS+循环设置线程池状态为shutdown **/
advanceRunState(STOP);
/** 找到断线程池中空闲的work,中断其工作线程 **/
interruptWorkers();
tasks = drainQueue();
} finally {
/** 释放锁 **/
mainLock.unlock();
}
/** 尝试将线程池状态设置为Terminate **/
tryTerminate();
return tasks;
}
检查调用者是否有权限shutdown线程池
/**
* 检查调用者是否有权限shutdown线程池
*/
private void checkShutdownAccess() {
SecurityManager security = System.getSecurityManager();
if (security != null) {
security.checkPermission(shutdownPerm);
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers)
security.checkAccess(w.thread);
} finally {
mainLock.unlock();
}
}
}
CAS+循环设置线程池状态为shutdown
/**
* CAS+循环设置线程池状态为shutdown
*/
private void advanceRunState(int targetState) {
for (;;) {
int c = ctl.get();
if (runStateAtLeast(c, targetState) ||
ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
break;
}
}
找到断线程池中空闲的work,中断其工作线程
private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers)
w.interruptIfStarted();
} finally {
mainLock.unlock();
}
}
将workQueue中的元素放入一个List并返回
**
* 将workQueue中的元素放入一个List并返回
*/
private List<Runnable> drainQueue() {
BlockingQueue<Runnable> q = workQueue;
ArrayList<Runnable> taskList = new ArrayList<Runnable>();
/** 将队列中的值全部从队列中移除,并赋值给对应集合 **/
q.drainTo(taskList);
/** 并发在判断 workQueue是否为空,将新添加加入到taskList**/
if (!q.isEmpty()) {
for (Runnable r : q.toArray(new Runnable[0])) {
if (q.remove(r))
taskList.add(r);
}
}
return taskList;
}
尝试将线程池状态设置为Terminate
/**
* 尝试将线程池状态设置为Terminate
*/
final void tryTerminate() {
for (;;) {
int c = ctl.get();
/**
* 判断线程池能否进入TERMINATED状态
* 如果以下3中情况任一为true,return,不进行终止
* 1、还在运行状态
* 2、状态是TIDYING、或 TERMINATED,已经终止过了
* 3、SHUTDOWN 且 workQueue不为空
* 4
*/
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
/** 线程池workQueue不为空 return,并中断workQueue其中一个work**/
/**
* 线程池为stop状态,且还存在work,中断唤醒一个正在等任务的空闲worker,
* 再次调用getTask(),线程池状态发生改变,返回null,work工作线程退出循环
*/
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE);
return;
}
/** 获取主锁:mainLock **/
final ReentrantLock mainLock = this.mainLock;
/** 加锁 **/
mainLock.lock();
try {
/** 将线程池状态设置为TIDYING **/
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
/** 释放子类实现 **/
terminated();
} finally {
/** 将线程池状态设置为TERMINATED **/
ctl.set(ctlOf(TERMINATED, 0));
/** 释放锁 **/
termination.signalAll();
}
return;
}
} finally {
/** 释放锁 **/
mainLock.unlock();
}
// else retry on failed CAS
}
}
网友评论