## 前置说明
```
所有的源码基于JDK11.0.2
```
## 如何使用线程池呢?
```java
public class WeChatBlogDemos {
@Test
public void useThreadPool() throws InterruptedException {
// 创建线程池
ExecutorService executorService = Executors.newFixedThreadPool(4);
// 往线程池提交任务,等待异步执行
executorService.submit(() -> System.out.println("hello world"));
// 关闭线程池
executorService.shutdown();
// 阻塞 2 秒等待线程池终止完成
executorService.awaitTermination(2,TimeUnit.SECONDS);
}
}
```
## 为什么使用线程池呢?
```
1)每个Java线程都会对应一个操作系统的工作线程,频繁的创建和销毁线程会有很大的开销,线程池能提高线程的复用率,避免线程频繁的创建和销毁,
2)线程池空闲时能自动缩小容量,防止消耗过多的系统资源,避免资源浪费。
```
## 当我往线程池里提交一个任务时,发生了什么【ThreadPoolExecutor.execute】?
```
1)外部系统通过 shutdown 或 shutdownNow 显式触发了线程池的关闭流程,任务提交失败
2)线程池处于 RUNNING 状态
1)线程池的工作线程数 < 核心线程数,则新增工作者线程进行任务处理
2)线程池的工作线程数 = 核心线程数,往任务队列里提交成功
3)核心线程数 <= 线程池的工作线程数 < 最大线程数 && 往任务队列里提交失败,则新增工作者线程进行任务处理
4)线程池的工作线程数 = 最大线程数 && 往任务队列里提交失败,通过拒绝策略处理任务
```
```java
public class ThreadPoolExecutor extends AbstractExecutorService {
/**
* 用于创建工作者线程的线程工厂
*/
private volatile ThreadFactory threadFactory;
/**
* 线程池饱和或关闭时,用于拒绝任务的拒绝执行处理器
*/
private volatile RejectedExecutionHandler handler;
/**
* 空闲工作线程等待任务的最大纳秒数
*/
private volatile long keepAliveTime;
/**
* true:核心工作者线程空闲时也会推出
* false:核心工作者线程空闲时不退出
*/
private volatile boolean allowCoreThreadTimeOut;
/**
* 核心工作线程数,限制数量为2^29-1
*/
private volatile int corePoolSize;
/**
* 最大工作线程数,限制数量为2^29-1
*/
private volatile int maximumPoolSize;
/**
* 默认的拒绝执行处理器
*/
private static final RejectedExecutionHandler defaultHandler =
new AbortPolicy();
/**
* 核心工作者线程数
*/
private volatile int corePoolSize;
/**
* 最大工作者线程数
*/
private volatile int maximumPoolSize;
/**
* 任务队列,
* 1)如果工作者线程允许过期,则使用 workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) 读取任务
* 2)否则使用 workQueue.take() 读取任务
*/
private final BlockingQueue<Runnable>workQueue;
/**
* 添加工作者线程、关闭线程池、读取统计数据等操作中使用的互斥锁
*/
private final ReentrantLock mainLock = new ReentrantLock();
/**
* 线程池中的工作者线程集合,只有在持有 mainLock 锁时才能访问
*/
private final HashSet<Worker> workers = new HashSet<>();
/**
* 跟踪线程池同时存在的最大工作线程数
* Accessed only under mainLock.
*/
private int largestPoolSize;
/**
* 往线程池提交一个 Runnable 任务,
* 如果线程池已满或线程池关闭则,该任务会交给拒绝处理器处理。
*/
@Override
public void execute(Runnable command) {
if (command == null) {
throw new NullPointerException();
}
// 读取控制变量
int c = ctl.get();
// 1)线程池工作线程数 < 核心线程数
if (ThreadPoolExecutor.workerCountOf(c) < corePoolSize) {
// 尝试创建一个新的工作者线程来处理这个任务
if (addWorker(command, true)) {
// 创建成功则直接返回
return;
}
// 创建失败,则重新读取控制变量
c = ctl.get();
}
/**
* 大前提:
* 1)外部触发了线程池停止
* 2)工作者线程本身创建失败了
* 3)当前工作者线程数 >= 核心工作者线程
*
* 线程池处于 RUNNING 状态 && 尝试向任务队列中提交任务
*/
if (ThreadPoolExecutor.isRunning(c) && workQueue.offer(command)) {
final int recheck = ctl.get();
/**
* 任务成功提交到任务队列
* 1)如果线程池正在停止,则尝试帮助终止线程池,并将任务从工作队列中移除
* 2)线程池处于 RUNNING 状态,但是没有可用的工作者线程了,则尝试添加一个新的工作者线程
*/
if (!ThreadPoolExecutor.isRunning(recheck) && remove(command)) {
// 执行拒绝处理器
reject(command);
} else if (ThreadPoolExecutor.workerCountOf(recheck) == 0) {
// 尝试添加一个新的工作者线程
addWorker(null, false);
}
}
/**
* 大前提:
* 1)外部触发了线程池停止
* 2)线程池处于 RUNNING 状态 && 当前工作者线程数 >= 核心工作者线程 && 任务往队列中提交失败了【如队列已满】
*
* 尝试新增一个工作者来处理任务
*/
else if (!addWorker(command, false)) {
/**
* 最大可能性
* 1)外部触发了线程池停止
* 2)线程池处于 RUNNING 状态 && 工作者线程数>= maximumPoolSize
* 执行拒绝策略
*/
reject(command);
}
}
/**
* 读取线程池的工作线程数
*/
private static int workerCountOf(int c) { return c & ThreadPoolExecutor.COUNT_MASK; }
/**
* 尝试增加一个核心工作者线程来处理这个任务
* 什么时候会新增失败?
* 1)线程池状态在 STOP 及以上
* 2)线程池处于 SHUTDOWN 状态并且提交的任务不为空
* 3)线程池处于 SHUTDOWN,提交的任务为空,并且工作队列也为空
* 4)core=true:工作者线程数 >= 核心线程数【corePoolSize】
* 5)core=false: 工作者线程数 >= 最大线程数【maximumPoolSize】
* 6)线程池本身原因工作者线程启动失败
*/
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (int c = ctl.get(); ; ) {
/**
* 外部系统通过 shutdown 或 shutdownNow 显式触发了线程池的关闭流程
* 1)线程池状态在 STOP 及以上
* 2)线程池处于 SHUTDOWN 状态并且提交的任务不为空
* 3)线程池处于 SHUTDOWN,提交的任务为空,并且工作队列也为空
*/
if (ThreadPoolExecutor.runStateAtLeast(c, ThreadPoolExecutor.SHUTDOWN)
&& (ThreadPoolExecutor.runStateAtLeast(c, ThreadPoolExecutor.STOP)
|| firstTask != null
|| workQueue.isEmpty())) {
// 不允许创建新的工作者线程
return false;
}
for (; ; ) {
/**
* 1)工作者线程数已经 >= 核心线程数【任务队列未满时】
* 2)工作者线程数已经 >= 最大线程数【任务队列已满时】
*/
if (ThreadPoolExecutor.workerCountOf(c)
>= ((core ? corePoolSize : maximumPoolSize) & ThreadPoolExecutor.COUNT_MASK)) {
// 不允许创建新的工作者线程
return false;
}
// 尝试递增工作者线程数
if (compareAndIncrementWorkerCount(c)) {
// 工作者线程数递增成功,退出循环
break retry;
}
// 由于并发问题,其他线程优先递增了计数值,则重新读取计数值并重试
c = ctl.get();
// 线程池正在关闭,则重新进入循环后将直接退出
if (ThreadPoolExecutor.runStateAtLeast(c, ThreadPoolExecutor.SHUTDOWN)) {
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
}
// 1)阶段一:工作者线程是否已经添加到 workers 集合中
boolean workerAdded = false;
// 2)阶段二:工作者线程是否成功启动
boolean workerStarted = false;
Worker w = null;
try {
// 创建工作者线程
w = new Worker(firstTask);
// 读取线程对象
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
// 占用锁
mainLock.lock();
try {
/**
* Recheck while holding lock. Back out on ThreadFactory failure or if shut down before lock acquired.
* 读取控制变量再次进行校验
*/
final int c = ctl.get();
/**
* 1)线程池处于 RUNNING 状态
* 2)线程池处于 SHUTDOWN 状态 && 提交任务为null
*/
if (ThreadPoolExecutor.isRunning(c)||
ThreadPoolExecutor.runStateLessThan(c, ThreadPoolExecutor.STOP) && firstTask == null) {
// 检测工作者线程是否异常启动了
if (t.isAlive()) {
throw new IllegalThreadStateException();
}
// 将工作者线程添加到集合中
workers.add(w);
// 尝试记录最大并发工作者线程数
final int s = workers.size();
if (s > largestPoolSize) {
largestPoolSize = s;
}
// 工作者线程添加到 workers 成功
workerAdded = true;
}
} finally {
mainLock.unlock();
}
// 如果添加成功,则启动工作者线程
if (workerAdded) {
t.start();
// 工作者线程启动成功
workerStarted = true;
}
}
} finally {
// 如果工作者线程启动失败,则进行回退和清理
if (!workerStarted) {
addWorkerFailed(w);
}
}
return workerStarted;
}
// 运行状态 c 大于等于指定状态s
private static boolean runStateAtLeast(int c, int s) {
return c >= s;
}
/**
* 尝试原子的将工作者线程数+1
*/
private boolean compareAndIncrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect + 1);
}
/**
* 线程池是否在运行
*/
private static boolean isRunning(int c) {
return c < ThreadPoolExecutor.SHUTDOWN;
}
// 运行状态 c 小于指定状态s
private static boolean runStateLessThan(int c, int s) {
return c < s;
}
private void addWorkerFailed(Worker w) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 1)从 workers 集合中移除工作者w
if (w != null) {
workers.remove(w);
}
// 递减总工作者线程数
decrementWorkerCount();
// 尝试停止线程池
tryTerminate();
} finally {
mainLock.unlock();
}
}
/**
* 将工作者线程总数递减1
*/
private void decrementWorkerCount() {
ctl.addAndGet(-1);
}
/**
* 将目标任务从队列中移除,并返回移除结果
*/
public boolean remove(Runnable task) {
final boolean removed = workQueue.remove(task);
// 尝试终止线程池
tryTerminate(); // In case SHUTDOWN and now empty
// 返回移除结果
return removed;
}
/**
* 使用指定的拒绝执行处理器来处理该任务
*/
final void reject(Runnable command) {
handler.rejectedExecution(command, this);
}
}
```
## 工作者线程是如何工作的呢?
```
1)如果初始化任务不为空,则先执行它
2)从任务队列中循环拉取任务
1)允许当前工作者线程超时退出:则通过 workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) 方式尝试在 keepAliveTime 纳秒内获取任务
2)当前工作者线程数 <= corePoolSize 并且不允许超时退出:通过 workQueue.take() 阻塞读取任务
3)如果拉取到了任务,则执行它,并循环步骤2
4)任务拉取失败、阻塞时被中断、任务执行一次,则执行工作者退出流程。
```
```java
public class ThreadPoolExecutor extends AbstractExecutorService {
/**
* 工作者线程的核心循环,重复的从任务队列中读取任务并执行。
*/
final void runWorker(Worker w) {
// 读取当前线程
final Thread wt = Thread.currentThread();
// 读取第一个任务
Runnable task = w.firstTask;
// 清理
w.firstTask = null;
w.unlock(); // 允许中断
/**
* 是否异常退出
* 1)前置钩子函数抛出异常
* 2)任务执行时抛出异常
* 3)后置钩子函数抛出异常
*/
boolean completedAbruptly = true;
try {
// 1)尝试从工作队列中读取任务
while (task != null || (task = getTask()) != null) {
w.lock();
/**
* If pool is stopping, ensure thread is interrupted;
* if not, ensure thread is not interrupted.
* This requires a recheck in second case to deal with shutdownNow race while clearing interrupt
*/
if ((ThreadPoolExecutor.runStateAtLeast(ctl.get(), ThreadPoolExecutor.STOP)||
Thread.interrupted()&&
ThreadPoolExecutor.runStateAtLeast(ctl.get(), ThreadPoolExecutor.STOP))&&
!wt.isInterrupted()) {
wt.interrupt();
}
try {
/**
* 线程池钩子函数,在每个任务执行之前触发
*/
beforeExecute(wt, task);
try {
task.run();
/**
* 线程池钩子函数,在每个任务执行之后或执行异常时触发
*/
afterExecute(task, null);
} catch (final Throwable ex) {
afterExecute(task, ex);
throw ex;
}
} finally {
// 将当前任务置空
task = null;
// 递增累积完成任务数,包括正常完成和异常完成
w.completedTasks++;
w.unlock();
}
}
// 标记是正常完成任务
completedAbruptly = false;
} finally {
/**
* 1)completedAbruptly=false:工作线程拉取不到任务正常退出
* 2)completedAbruptly=true:工作线程执行任务时异常退出,包括前置钩子、核心 run 方法、后置钩子
*/
processWorkerExit(w, completedAbruptly);
}
}
private Runnable getTask() {
// 上次拉取任务超时了吗?
boolean timedOut = false;
for (; ; ) {
// 读取控制变量
final int c = ctl.get();
/**
* 1)线程池正在停止,状态>= STOP
* 2)线程池状态为 SHUTDOWN,并且任务队列为空
*/
if (ThreadPoolExecutor.runStateAtLeast(c, ThreadPoolExecutor.SHUTDOWN)
&& (ThreadPoolExecutor.runStateAtLeast(c, ThreadPoolExecutor.STOP) || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
/**
* 1)线程池处于 RUNNING 状态
* 2)线程池处于 SHUTDOWN 状态,但是 workQueue 还未排空
*/
// 计算当前工作者线程数
final int wc = ThreadPoolExecutor.workerCountOf(c);
/**
* 是否允许当前工作者线程退出
* 1)allowCoreThreadTimeOut=true:允许核心工作者线程退出
* 2)allowCoreThreadTimeOut=false:当前工作者线程数 > 核心工作者线程数
*/
final boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
/**
* 1)外部系统通过 setMaximumPoolSize 调小了最大线程数 && 当前工作线程数溢出了
* 2)允许当前线程过期 && 上次拉取未得到任务
* &&
* 1)工作者线程数> 1
* 2)工作者线程数=1 && 任务队列为空
*
* 什么情况下线程池的所有工作者线程都会退出?
* 1)allowCoreThreadTimeOut=true && workQueue 为空
*/
if ((wc > maximumPoolSize || timed && timedOut)
&& (wc > 1 || workQueue.isEmpty())) {
// 拉取任务失败就直接递减工作者线程数了
if (compareAndDecrementWorkerCount(c)) {
// 返回 null 以终止该工作者线程
return null;
}
// 出现竞争,重新拉取任务
continue;
}
try {
/**
* 1)允许当前工作者线程退出:则通过 workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) 方式尝试在 keepAliveTime 纳秒内获取任务
* 2)当前工作者线程数 <= corePoolSize:通过 workQueue.take() 阻塞读取任务
*/
final Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS):
workQueue.take();
if (r != null) {
// 成功获取到一个任务
return r;
}
// 拉取超时了
timedOut = true;
} catch (final InterruptedException retry) {
// 当前线程被中断,则继续循环拉取任务
timedOut = false;
}
}
}
protected void beforeExecute(Thread t, Runnable r) {
}
protected void afterExecute(Runnable r, Throwable t) {
}
private void processWorkerExit(Worker w, boolean completedAbruptly) {
// 如果是异常退出,则递减工作者线程数
if (completedAbruptly) {
decrementWorkerCount();
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 1)将当前工作者 w 完成的任务数累加到线程池已完成任务数中
completedTaskCount += w.completedTasks;
// 2)从工作者集合中删除该工作者
workers.remove(w);
} finally {
mainLock.unlock();
}
// 尝试终止线程池
tryTerminate();
final int c = ctl.get();
// 线程池处于 RUNNING 或 SHUTDOWN 状态
if (ThreadPoolExecutor.runStateLessThan(c, ThreadPoolExecutor.STOP)) {
// 1)如果不是异常退出
if (!completedAbruptly) {
/**
* 计算需要保留的最小工作者线程数,
* 1)如果允许核心工作者线程退出则为 0;
* 2)否则为corePoolSize
*/
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
// 任务队列不为空,则至少保留一个工作者线程
if (min == 0 && !workQueue.isEmpty()) {
min = 1;
}
// 已有工作者线程 > 期望工作者线程数,则直接返回
if (ThreadPoolExecutor.workerCountOf(c) >= min) {
return; // replacement not needed
}
}
// 2)异常退出则尝试新增工作者线程
addWorker(null, false);
}
}
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable {
private static final long serialVersionUID = 6138294804551838833L;
/** Worker 驻留线程,创建失败时为null */
final Thread thread;
/** 第一个运行的任务,可能为null */
Runnable firstTask;
/** 每个驻留线程完成的任务数,在线程退出时会累加到线程池中*/
volatile long completedTasks;
/**
* 基于指定的初始任务和线程工厂创建工作者线程
*/
Worker(Runnable firstTask) {
// 禁止中断,直到工作者线程运行为止
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
/**
* Worker 本身实现了 Runnable 并且重写了 run 方法,
* 基于 Worker 创建驻留线程,并启动运行。
*/
thread = getThreadFactory().newThread(this);
}
/** 运行工作者线程*/
@Override
public void run() {
runWorker(this);
}
}
}
```
## 如何停止线程池呢?
```
触发线程池关闭之后,提交到线程池的任务会被直接拒绝
1)通过 shutdown 停止线程池时,线程池的状态会递进到 SHUTDOWN,并且活跃工作者线程还能处理剩余任务。
2)通过 shutdownNow 停止线程池时,线程池的状态会递进到 STOP,并且活跃工作者线程不能处理剩余任务,拉取到的任务是 null。
```
```java
public class ThreadPoolExecutor extends AbstractExecutorService {
@Override
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 当前线程是否允许关闭线程池
checkShutdownAccess();
// 将线程池状态更新为SHUTDOWN
advanceRunState(ThreadPoolExecutor.SHUTDOWN);
// 中断所有空闲工作者,正在处理任务的工作者线程可以继续运行
interruptIdleWorkers();
// 执行钩子函数
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
// 尝试终止线程池
tryTerminate();
}
private void checkShutdownAccess() {
// assert mainLock.isHeldByCurrentThread();
final SecurityManager security = System.getSecurityManager();
if (security != null) {
security.checkPermission(ThreadPoolExecutor.shutdownPerm);
for (final Worker w : workers) {
security.checkAccess(w.thread);
}
}
}
/**
* 将线程池状态设置为目标状态targetState
*/
private void advanceRunState(int targetState) {
for (; ; ) {
final int c = ctl.get();
if (ThreadPoolExecutor.runStateAtLeast(c, targetState)||
// CAS 更新线程池状态
ctl.compareAndSet(c, ThreadPoolExecutor.ctlOf(targetState, ThreadPoolExecutor.workerCountOf(c)))) {
break;
}
}
}
/**
* 中断所有阻塞拉取任务的空闲线程
*/
private void interruptIdleWorkers() {
interruptIdleWorkers(false);
}
private void interruptIdleWorkers(boolean atMostOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 遍历所有的工作者
for (final Worker w : workers) {
// 读取工作者驻留线程
final Thread t = w.thread;
/**
* 当前线程还未被设置中断标志,则尝试锁定此 Worker【
* 如果此 worker 已经获取到了任务正在执行,则锁已经被占用无法获取】
*/
if (!t.isInterrupted() && w.tryLock()) {
try {
// 中断阻塞等待任务的空闲线程
t.interrupt();
} catch (final SecurityException ignore) {
} finally {
w.unlock();
}
}
// 随机获取的第一个线程正在处理任务 && atMostOne=true,此时一个线程都不会被中断退出
if (atMostOne) {
break;
}
}
} finally {
mainLock.unlock();
}
}
void onShutdown() {
}
final void tryTerminate() {
for (; ; ) {
final int c = ctl.get();
/**
* 1)线程池处于 RUNNING 状态
* 2)线程池处于 TIDYING、TERMINATED 状态
* 3)线程池处于 SHUTDOWN 状态 && 任务队列不为空
* 不允许终止
*/
if (ThreadPoolExecutor.isRunning(c)||
ThreadPoolExecutor.runStateAtLeast(c, ThreadPoolExecutor.TIDYING)||
ThreadPoolExecutor.runStateLessThan(c, ThreadPoolExecutor.STOP) && !workQueue.isEmpty()) {
return;
}
/**
* 工作者线程数不为 0,尝试中断最多一个空闲工作者线程
*/
if (ThreadPoolExecutor.workerCountOf(c) != 0) {// Eligible to terminate
interruptIdleWorkers(ThreadPoolExecutor.ONLY_ONE);
return;
}
// 所有的工作者线程都已退出,或最后一个工作者线程正在退出
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 递进状态为TIDYING
if (ctl.compareAndSet(c, ThreadPoolExecutor.ctlOf(ThreadPoolExecutor.TIDYING, 0))) {
try {
// 执行线程池的终止钩子函数
terminated();
} finally {
// 递进状态为TERMINATED
ctl.set(ThreadPoolExecutor.ctlOf(ThreadPoolExecutor.TERMINATED, 0));
// 唤醒通过 awaitTermination 阻塞的所有线程
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
protected void terminated() {
}
@Override
public List<Runnable> shutdownNow() {
List<Runnable>tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 当前线程是否允许关闭线程池
checkShutdownAccess();
// 将线程池状态更新为STOP
advanceRunState(ThreadPoolExecutor.STOP);
// 强制中断所有工作者线程,包括正在执行任务的线程
interruptWorkers();
// 读取所有未完成的任务
tasks = drainQueue();
} finally {
mainLock.unlock();
}
// 尝试终止线程池
tryTerminate();
// 返回所有未完成的任务
return tasks;
}
private List<Runnable> drainQueue() {
final BlockingQueue<Runnable>q = workQueue;
final ArrayList<Runnable> taskList = new ArrayList<>();
q.drainTo(taskList);
if (!q.isEmpty()) {
for (final Runnable r : q.toArray(new Runnable[0])) {
if (q.remove(r)) {
taskList.add(r);
}
}
}
return taskList;
}
}
```
## 等待线程池完全退出
```
目标线程会最多阻塞 unit.toNanos(timeout) 时间来等待线程池完全销毁。
```
```java
public class ThreadPoolExecutor extends AbstractExecutorService {
@Override
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
// 计算超时时间
long nanos = unit.toNanos(timeout);
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 如果线程池还未递进到 TERMINATED 状态【线程池还未退出】
while (ThreadPoolExecutor.runStateLessThan(ctl.get(), ThreadPoolExecutor.TERMINATED)) {
if (nanos <= 0L) {
return false;
}
// 阻塞等待指定的纳秒数
nanos = termination.awaitNanos(nanos);
}
return true;
} finally {
mainLock.unlock();
}
}
}
```
## 线程池的逻辑最大线程数是多少呢?
```
线程池的线程数保存在 ctl 控制变量的低 29 位中,因此线程池的逻辑最大线程数为 2^29-1。
```
```java
public class ThreadPoolExecutor extends AbstractExecutorService {
/**
* 控制变量低 29 位为线程池的工作线程数
* 控制变量高 3 位为线程池的生命周期状态
*/
private final AtomicInteger ctl = new AtomicInteger(ThreadPoolExecutor.ctlOf(ThreadPoolExecutor.RUNNING, 0));
/**
* 工作线程数所占的位数,为 29 位,最大工作线程数为 2^29-1 个
*/
private static final int COUNT_BITS = Integer.SIZE - 3;
/**
* 工作线程数掩码,低位 29 个1
*/
private static final int COUNT_MASK = (1 << COUNT_BITS)- 1;
/**
* 读取线程池的工作线程数
*/
private static int workerCountOf(int c) { return c & ThreadPoolExecutor.COUNT_MASK; }
}
```
## 线程池内置了哪些拒绝策略呢?
```
1)CallerRunsPolicy:线程池未关闭,则交给提交任务的线程自己执行
2)AbortPolicy:抛出 RejectedExecutionException 异常,默认拒绝策略。
3)DiscardPolicy:静默丢弃
4)DiscardOldestPolicy:静默丢弃最老的任务后,重新提交到线程池
```
```java
public class ThreadPoolExecutor extends AbstractExecutorService {
public static class CallerRunsPolicy implements RejectedExecutionHandler {
public CallerRunsPolicy() { }
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
// 线程池未关闭,则交给提交任务的线程自己执行
r.run();
}
}
}
public static class AbortPolicy implements RejectedExecutionHandler {
public AbortPolicy() { }
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
// 抛出异常
throw new RejectedExecutionException("Task " + r.toString()+
" rejected from " +
e.toString());
}
}
public static class DiscardPolicy implements RejectedExecutionHandler {
public DiscardPolicy() { }
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
// 静默丢弃
}
}
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
public DiscardOldestPolicy() { }
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
// 丢弃最老的任务,再尝试提交到线程池中
e.getQueue().poll();
e.execute(r);
}
}
}
}
```
## 内置线程池有哪些?
```
1)newFixedThreadPool:固定工作者线程池
2)newCachedThreadPool:一个任务一个工作者线程池,无法存储任务
3)newSingleThreadExecutor:单工作者线程池
```
```
public class Executors {
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue());
}
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue());
}
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue()));
}
}
```
## 线程池有哪几种运行状态呢?
```
线程池有 5 种运行状态,状态值是顺序递增的,分别为 RUNNING、SHUTDOWN、STOP、TIDYING、TERMINATED。
RUNNING: 接受新任务,同时处理工作队列中的任务
SHUTDOWN: 不接受新任务,但是能处理工作队列中的任务
STOP: 不接受新任务,不处理工作队列中的任务,并且强制中断正在运行的工作者线程
TIDYING: 所有的工作者线程都已经停止,将运行 terminated() 钩子函数
TERMINATED: terminated() 钩子函数运行完毕,线程池退出
RUNNING -> SHUTDOWN【调用 shutdown 时触发状态流转】
RUNNING -> STOP【调用 shutdownNow 时触发状态流转】
SHUTDOWN -> TIDYING【队列和线程池都为空时】
STOP -> TIDYING【队列和线程池都为空时】
TIDYING -> TERMINATED【terminated 方法完成执行时】
```
```java
public class ThreadPoolExecutor extends AbstractExecutorService {
/**
* 线程池运行状态通过 int 高 3 位进行区分
* 11100000000000000000000000000000
*/
private static final int RUNNING = -1 << COUNT_BITS;
// 00000000000000000000000000000000
private static final int SHUTDOWN = 0 << COUNT_BITS;
// 00100000000000000000000000000000
private static final int STOP = 1 << COUNT_BITS;
// 01000000000000000000000000000000
private static final int TIDYING = 2 << COUNT_BITS;
// 01100000000000000000000000000000
private static final int TERMINATED = 3 << COUNT_BITS;
}
```
## 什么情况下线程池的所有工作者线程都会退出?
```
allowCoreThreadTimeOut=true && workQueue.isEmpty()
```
## 工作者线程拉取任务时被中断了会发生什么?
```
会继续拉取任务?
```
## 线程池停止过程中会中断空闲工作者线程,如何保证不会伤及运行中的工作者线程?
```
工作者线程本身继承了 AbstractQueuedSynchronizer,是一个互斥锁,当其获取到任务时会对自己进行锁定,
线程池中断空闲线程过程中由于无法获取锁,此工作者线程不会被中断。
```
## shutdown 和 shutdownNow 的区别是什么?
```
1)shutdown 之后:线程池的工作线程能完成正在处理的任务,也能拉取到存留的任务,任务队列中的任务会被执行完毕。
2)shutdownNow 之后:线程池的工作线程只能完成正在处理的任务,但是无法拉取到存留的任务,存留任务会通过方法返回。
```
## 使用线程池的最佳实践
```
1)自定义 ThreadFactory 并在其创建线程时提供一个有效的名称,用于后续的跟踪分析。
2)限定任务队列的长度,避免突发流量导致系统 OOM。
3)线程池创建完毕可以通过预先启动工作者线程来缩短响应时间
prestartCoreThread:预先启动一个工作者线程
prestartAllCoreThreads:预先启动所有的核心工作者线程
```
网友评论