概述
引入
我们之前介绍了AbstractExecutorService
,它将批量调用等操作进行了介绍,但是对最底层的单任务执行的详细细节并没有定义。我们今天了解到ThreadPoolExecutor
就对一个线程池的绝大部分机制进行了实现,包括任务的传递、线程的管理、数据的收集、线程池状态的轮转等。通过对ThreadPoolExecutor
的梳理,我们将详细了解线程池的工作原理,因为ThreadPoolExecutor
是我们后面接触的各种形式的线程池的一个基础实现类。
摘要
本文详细介绍了ThreadPoolExecutor
的实现机理,内容如下:
- 总览:对线程池的操作策略做宏观梳理和介绍
- 对线程池基础数据结构及相关操作进行介绍
- 根据角色进行介绍
- 角色A——线程池服务调用者
- 角色B——线程池中的工作线程
- 总结,介绍线程池的使用方法,重点介绍线程池可定制的方法【那几个钩子】
- 使用示例
- 自行定制线程池
- 使用工厂方法构建线程池
- 问题罗列
- 扩展、科普
总览
ThreadPoolExecutor
的服务策略如下:
线程池状态介绍:
线程池分为五个状态:
RUNNING
:正常运行状态,线程池创建后就是这种状态,可以正常执行任务的。SHUTDOWN
:准备关闭的状态,此状态线程池仍然在运行之前提交到队列中的任务,但是不再接受新任务。STOP
:准备关闭的状态,此状态线程池不光不接受新任务,原有队列中的任务也不再执行。TIDYING
:线程池中的任务、线程都已经清空,正在执行关闭的回调函数。TERMINATED
:已经关闭,不再进行任何操作。几种状态下线程池会有不同的表现,一个线程池的生命周期一般如下:
1.png
另外,线程池在接受到来的任务时,进行了以下的安排,以合理的使用占有的资源并在合理的范围内针对到来的任务数量进行占用资源和响应速度之间的平衡:
线程池规定了以下几个变量,用来在线程池占用资源和子任务响应速度之间“平衡”:
coorPoolSize
:核心线程数,线程数日常保持的线程数量maximumPoolSize
:最大线程数,在来的任务特别多的时候允许超过核心线程数多创建线程,但是不能超过这个数量workQueue
:任务队列,任务到来时可选择进入队列等待当一个任务到来时,会经过以下流转:
2.png
线程池中的线程工作原理和我们之前自行实现的差不多,不同的是:
线程池更贴心的做了一些修饰,使得线程池中线程在执行任务时更“专注”,比如说:
- 在等待获取任务时允许打断等待以重新
check
线程池状态- 在获取到任务之前主动清除线程打断【运行状态一定不打断、关闭状态一定打断】、清除线程变量【需自行实现】
以上,我们大概了解了大方向的线程池的操作策略。
基础数据结构介绍
这里主要介绍线程池的一些状态流转的控制数据结构。
线程池状态
介绍
线程池标识量,使用线程安全的 AtomicInteger ,32 位 int的前3位表示线程池的状态,后29位表示线程池中的线程(worker
)数。所以线程池中最大的线程数量应该是 ,但是不清楚为什么定义的最大变量是,可能是本身数量就很大无所谓了?或者是把二进制位全都是1的情况空出来留给CAPACITY
?emmmm,不清楚了,影响不大,就不纠结了。
源码
/**
* 表示状态、线程数的变量
*
**/
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// 设置状态的几个常量
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// 分别获得状态、线程数的几个方法
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
/*
* 因为我们在设置状态时明显根据线程池的生命周期从小到大的设计,而且表示状态的都是前几位,
* 所以很多时候,我们可以通过直接比较两个int 大小完成一些判断。
* 比如:
* c < SHUTDOWN --> RUNNING
* c >= SHUTDOWN --> 开始走向关闭,不清楚关到了哪个步骤,但是不接受新任务是肯定的
*
*/
private static boolean runStateLessThan(int c, int s) {
return c < s;
}
private static boolean runStateAtLeast(int c, int s) {
return c >= s;
}
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}
/**
* 对线程池中线程的增减操作,注意了:
*
* 为什么增加时必须要有个入参?
* 因为线程池有线程上限的,不能无限多,如果也用
* do {} while (! compareAndIncrementWorkerCount(ctl.get()));
* 的话可能会出现越界异常【在竞争很激烈的情况下,差3个超出,结果4个线程在判断可以后调用了此方法,
* 最后就溢出了】
*/
private boolean compareAndIncrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect + 1);
}
/**
* 线程数量-1
*/
private boolean compareAndDecrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect - 1);
}
/**
* 线程数量-1,不做边界检测,强制性-1【当然,它仍然是线程安全的】
*/
private void decrementWorkerCount() {
do {} while (! compareAndDecrementWorkerCount(ctl.get()));
}
任务队列及线程集
介绍
任务队列: 任务的一个缓存队列。线程安全。具体用法参见之前的博客BlockingQueue
线程集: 保存创建的子线程的集合
源码
/**
* 在本类中不通过 poll() 返回 null 判断队空,通过 isEmpty() 判断,所以在自己定义 BlockingQueue
* 时不必纠结。比如你可以使用 `DelayQueues`,现在返回了 null ,一会儿可以返回非 null。
*/
private final BlockingQueue<Runnable> workQueue;
/**
* Set containing all worker threads in pool. Accessed only when
* holding mainLock.
*/
private final HashSet<Worker> workers = new HashSet<Worker>();
一些线程集合数据统计量
介绍
统计线程池的工作情况,即:
-
completedTaskCount
: 线程池已经结束的线程一共做的任务【还健在的线程(正在执行任务的线程、空闲等待任务的线程)的信息不统计】 -
largestPoolSize
:线程池生命周期中最多的一刻存在了多少个子线程
源码
private int largestPoolSize;
private long completedTaskCount;
用户控制的、影响处理策略的变量
介绍
本节介绍一些和线程池处理策略相关的变量,比如:
-
threadFactory
:如何为线程池创建子线程,在此变量引用的实例中实现 -
handler
:上面我们介绍了线程池对新任务的安排策略,这个是任务最中被拒绝后的处理策略 -
keepAliveTime
:线程池的非核心线程在空闲状态下的存活时间【非核心线程空闲时间超过此值时就结束此线程】 -
allowCoreThreadTimeOut
:上面的过期杀死策略是否对核心线程也适用 -
corePoolSize
:核心线程数量 -
maximumPoolSize
:线程池允许的最大线程数量
还针对上面的引用定义了一个常量:
-
defaultHandler
: 默认的拒绝策略
源码
/*
* 我们把这些用户自设的变量都设置成 volatile ,来保证我们读取到的值都是最新的,而且在读取时不需要加锁
*/
/**
* 用来创建新的线程的,调用此工厂创建子线程时注意可能创建失败,创建失败可能导致任务被拒绝或者任务阻在
* 队列中得不到执行。
*
* 由于创建的新线程执行 Thread.start() 时需要创建新的线程堆栈,占用不少内存,故在创建新线程时可能会抛出
* OutOfMemoryError , 在面对这个情况时我们会保证线程池中的变量不变,在后续会执行清理代码完成坏死线程
* 的清理,一般情况下执行清理操作的内存还是可以分配的到的。
*/
private volatile ThreadFactory threadFactory;
/**
* 在为任务分配执行失败后就执行拒绝策略
*/
private volatile RejectedExecutionHandler handler;
/**
* 非核心线程空闲时最大等待时间
*/
private volatile long keepAliveTime;
/**
* 是否对核心线程也采取非核心线程的“空闲超过制定时间就杀死”的策略
*/
private volatile boolean allowCoreThreadTimeOut;
/**
* 核心线程边界。
* 当线程池中的线程数量少于此值时执行核心线程池的策略
* 当线程池中的线程数量大于此值时执行非核心线程池的策略,直到数量降下来
*
* 正常情况下【allowCoreThreadTimeOut = false】,线程池中的线程数应该维持在 corePoolSize ,
* 保证来任务不用新建线程,还不用在维护线程上产生太多系统开销。
*
* 当然,你设置成核心线程也能空闲超时后关闭的话,线程池中的最小线程数量可能就是0了
*/
private volatile int corePoolSize;
/**
* 在任务繁忙时可以多创建线程以达到快速响应,但是线程数不能超过此值。
*
* 这个值作为 int 最大是 2^(31)-1,但是由于我们取了前三位做线程池状态标志,所以最大是 2^(29)【注释中
* 写的2^(29)-1】,知道就好,不必纠结
*/
private volatile int maximumPoolSize;
/**
* 定义了一个默认的拒绝策略,当没有任何要求时可以用这个拒绝策略
*/
private static final RejectedExecutionHandler defaultHandler =
new AbortPolicy();
线程池变量访问控制锁
介绍
用于控制和线程池中线程集合、线程集合数据统计相关的访问。
我们使用可重入锁而非线程安全集合的原因:
- 我们还要控制线程集合数据统计的线程安全维护,就是上面的
largestPoolSize
,completedTaskCount
,线程安全集合只是维护集合的线程安全,不能管到集合外面的东西。 - 使用线程安全集合,在我们进行线程中断时能保证中断有序进行,(一个操作线程一个操作线程的执行对子线程的中断),避免了不必要的中断风暴(尤其是在关闭线程池时)
另外,我们在执行shutdown()/shutdownNow()
时也保持此锁,以实现关闭过程中线程集合的平稳过渡【保证不会有多个用户线程同时调用关闭,也保证在关闭时不会有其他的针对子线程的操作】
源码
private final ReentrantLock mainLock = new ReentrantLock();
private final Condition termination = mainLock.newCondition();
一些权限控制集
介绍
介绍一下Java的安全权限管控,在扩展里统一介绍吧。
源码
/**
* 在调用 shutdown()/shutdownNow() 时,需要检测调用者是否有权限修改线程(包括线程中断、睡眠唤醒等等)
* 所有校验都通过之后才会执行对应的关闭操作。
*
*
* 在调用线程中断时【中断空闲子线程/中断所有子线程】一般不会检测是否有权限,所以有时线程中断可能没有什么显示
* 就直接操作失败了。
* 但是在关闭时是不允许失败的,所以要明确检测一下,因为如果线程中断失败而没有响应的话,可能会造成关闭时响应
* 时间过长
*/
private static final RuntimePermission shutdownPerm =
new RuntimePermission("modifyThread");
/* 执行 finalize() 进行垃圾回收时使用的上下文【虽然我也不知道有什么用】*/
private final AccessControlContext acc;
线程池详细介绍
线程池的调用线程
角色介绍
线程池的调用线程,一般指的就是我们编码的线程,主要用来进行线程池的一些使用的操作,包括但不限于:
- 创建线程池、初始化、后续的业务逻辑参数调整
- 提交任务
- 关闭线程池、等待线程池关闭
我们主要还是从这三个方面进行源码解析
源码解析
线程池创建、初始化及调整
// constructor
/**
* 创建一个新的线程池,使用默认的线程工厂和拒绝策略
*
* @param corePoolSize 核心线程数
* @param maximumPoolSize 最大线程数
* @param keepAliveTime 非核心线程最大闲置时间【超时关闭】
* @param unit 上面时间的单位
* @param workQueue 任务队列
* @throws IllegalArgumentException 存在以下情况时抛出
* corePoolSize < 0
* keepAliveTime < 0
* maximumPoolSize <= 0
* maximumPoolSize < corePoolSize
* @throws NullPointerException workQueue为空时抛出
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
/**
* 入参增加线程工厂,其他相同
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, defaultHandler);
}
/**
* 入参增加拒绝策略,其他相同
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}
/**
* 入参增加线程工厂、拒绝策略,其他相同
*
* 是 ThreadPoolExecutor 构造函数中有主要实现逻辑的方法
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
// getter & setter
public void setThreadFactory(ThreadFactory threadFactory) {
if (threadFactory == null)
throw new NullPointerException();
this.threadFactory = threadFactory;
}
public ThreadFactory getThreadFactory() {
return threadFactory;
}
public void setRejectedExecutionHandler(RejectedExecutionHandler handler) {
if (handler == null)
throw new NullPointerException();
this.handler = handler;
}
public RejectedExecutionHandler getRejectedExecutionHandler() {
return handler;
}
/**
* 这个setter有一些额外的操作在里面:
*
* 1. 校验入参合法性
* 2. 设置 corePoolSize
* 3. 设置后看门槛是变高了还是低了
* 高了的话就根据需要创建线程
* 低了的话就打断空闲线程以使其重读配置参数,自行调整数量
*
* @param corePoolSize the new core size
* @throws IllegalArgumentException if {@code corePoolSize < 0}
* @see #getCorePoolSize
*/
public void setCorePoolSize(int corePoolSize) {
if (corePoolSize < 0)
throw new IllegalArgumentException();
int delta = corePoolSize - this.corePoolSize;
this.corePoolSize = corePoolSize;
if (workerCountOf(ctl.get()) > corePoolSize)
interruptIdleWorkers();
else if (delta > 0) {
// 我们并不清楚到底需要多少新的线程,我们做一个启发式的判断,在任务数和可增加的核心线程数中选个
// 少的并创建线程。
//
// 因为任务队列中的任务是每时每刻都在变的,所以我们每次都检查,当任务队列为空时,再创建新线程
// 也不会提高现有任务的处理速度【搞不好由于cpu的轮转还会降低现有任务的处理速度】,那就不再创建
// 新线程
int k = Math.min(delta, workQueue.size());
while (k-- > 0 && addWorker(null, true)) {
if (workQueue.isEmpty())
break;
}
}
}
public int getCorePoolSize() {
return corePoolSize;
}
public boolean allowsCoreThreadTimeOut() {
return allowCoreThreadTimeOut;
}
public void allowCoreThreadTimeOut(boolean value) {
if (value && keepAliveTime <= 0)
throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
if (value != allowCoreThreadTimeOut) {
allowCoreThreadTimeOut = value;
if (value)
interruptIdleWorkers();
}
}
/**
* 设置完后打断空闲线程使各个子线程根据新的参数调整策略
*
**/
public void setMaximumPoolSize(int maximumPoolSize) {
if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize)
throw new IllegalArgumentException();
this.maximumPoolSize = maximumPoolSize;
if (workerCountOf(ctl.get()) > maximumPoolSize)
interruptIdleWorkers();
}
public int getMaximumPoolSize() {
return maximumPoolSize;
}
/**
* 设置完之后如果闲置最大时间变短了,打断空闲线程使其根据最新的参数自行调整
* 当然,如果设置时间更长了,那无所谓,后面我们介绍 getTask()时会详细介绍
* 子线程读取参数的机理。
*/
public void setKeepAliveTime(long time, TimeUnit unit) {
if (time < 0)
throw new IllegalArgumentException();
if (time == 0 && allowsCoreThreadTimeOut())
throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
long keepAliveTime = unit.toNanos(time);
long delta = keepAliveTime - this.keepAliveTime;
this.keepAliveTime = keepAliveTime;
if (delta < 0)
interruptIdleWorkers();
}
public long getKeepAliveTime(TimeUnit unit) {
return unit.convert(keepAliveTime, TimeUnit.NANOSECONDS);
}
/* Statistics */
/**
* 注意: 涉及线程集合的操作要使用锁【如上面的介绍】
*
* 问题: 为什么不直接使用 ctl 的后29位得到子线程数量呢?是为了更确切的数字么?
* 个人感觉没必要,毕竟在 ctl 和 workers.size() 不一致时往往处在加/减线程 的过程中,
* 说哪个对都随便,而且本身这就是随时间一直变化的,取出来到你用的时候都不一定有效。不必较真。
*/
public int getPoolSize() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Remove rare and surprising possibility of
// isTerminated() && getPoolSize() > 0
return runStateAtLeast(ctl.get(), TIDYING) ? 0
: workers.size();
} finally {
mainLock.unlock();
}
}
/**
* 获得正在执行任务的线程数量。
* 注意:这里的 active 和我们之前例子里面的 active 不一样,我们这里的 active 特指正在执行任务,没有闲
* 置的;我们之前的调用 Thread.isAlive() 表示线程是否还存活【start()了但是还没stop()】。
*
* 我们用 tryLock() 进行判断,具体原因我们在后面介绍子线程工作原理时再细谈。
*
* 记得使用主锁
*/
public int getActiveCount() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int n = 0;
for (Worker w : workers)
if (w.isLocked())
++n;
return n;
} finally {
mainLock.unlock();
}
}
/**
* 记得使用主锁
*/
public int getLargestPoolSize() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
return largestPoolSize;
} finally {
mainLock.unlock();
}
}
/**
* 获得所有,包括正在运行的,所以判断线程如果正在运行就+1
*/
public long getTaskCount() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
long n = completedTaskCount;
for (Worker w : workers) {
n += w.completedTasks;
if (w.isLocked())
++n;
}
return n + workQueue.size();
} finally {
mainLock.unlock();
}
}
/**
* 获得已经完成的任务,所以正在运行任务的线程不用+1
*/
public long getCompletedTaskCount() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
long n = completedTaskCount;
for (Worker w : workers)
n += w.completedTasks;
return n;
} finally {
mainLock.unlock();
}
}
子线程、任务的相关操作
/**
* 提交任务,这个任务会在以后被执行,可能使用池中已有的线程或者为此任务新创建线程。
*
* 如果无法提交,可能是因为线程池状态为关闭或者线程池已经饱和【线程加满了、任务队列也满了】,这样就执行拒绝
* 策略。
*
* @param command 提交的任务
* @throws RejectedExecutionException 在执行拒绝策略时可能抛出【看使用什么策略了】
* @throws NullPointerException 提交的任务为 null 时抛出
*/
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* 参照我最开始画的图,整个处理过程大概分3步:
*
* 1. 线程池中线程数 < corePoolSize,新加线程,成功则返回
* 2. 失败了,可能是资源有限导致的创建失败、也可能是因为线程池处于关闭状态,看看,如果状态正常且能排队就
* 排队
* 3.如果不能排队,可能是因为队满或者线程池处于关闭状态,反正创建线程会检查状态,你就当超过核心线程数,创
* 建线程即可,如果失败了则表示线程池处于关闭状态或者线程池资源已经满了,就执行拒绝操作
*
* 其实如果要我来写的话,可能就是:
* 1. 在开始先判断线程池状态,有问题直接执行拒绝操作
* 2. 根据线程池中的数量看是否可以创建核心线程
* 3. 看是否可以进队列
* 4. 看是否可以创建非核心线程池
*
* 虽然 addWorker() 方法会自动检测创建操作是否合理【包括是否核心、线程池状态】,但是我认为这种检测是
* 一种防错机制,最好不要依赖吧。当然,如果你依赖它,就全依赖呀,为什么最开始还要判断线程池中的线程数看
* 能否创建核心线程呢?
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {// 因为入队列没有检测线程池状态,所以要自行判断
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))// 复查一下,看是不是你刚加进去就被关了,个人
reject(command) // 感觉没必要,反正 offer(),非阻塞,而且你之前判断过了,也不用
// 卡这么死
else if (workerCountOf(recheck) == 0) // 判断队列线程数是否为空,这个很必要,
addWorker(null, false); // 可以封装一下任务队列,新加任务时自动检测,并在需要时添加,多方便
}
else if (!addWorker(command, false))
reject(command);
}
/*
* 子线程的创建、清理方法【不包括运行,运行我们后面专门说】
*/
/**
* 创建线程。
*
* 我们先检测创建线程操作是否合理,包括:
* 1. 检测线程池的状态是否允许创建线程【如果线程池处在关闭状态就看看是否需要创建线程来执行队列中的任务】
* 2. 检测创建线程的要求是否合理,比如核心线程数还没满就创建非核心线程了,比如核心线程数已经满了还在创建
* 核心线程,比如都满了还创建
*
* 如果经检测,请求合理则执行创建,如果请求不合理则不创建,返回false。
*
*
*
* @param firstTask 新创建的线程应该立刻执行的任务,可以为 null , 表示就是为了创建线程而创建线程,比如:
* 1. 采用预加载机制,创建线程池后不等任务来,自行创建核心线程。
* 2. 比如在 SHUTDOWN 状态下没有线程执行任务队列中的任务了,需要创建新线程。
* 3. 比如子线程执行的任务抛出了异常导致任务挂了,要重新申请空闲线程来替换。
* 这个一般是不空的,表示新任务绕过排队直接创建核心线程或者队满了且核心线程数也满了,
* 新来的任务导致增加非核心线程以增加处理速度。
*
* @param core 表示此次创建的线程是否是核心线程【用 corePoolSize 限制还是 maximumPoolSize 限制】
* @return true 线程创建成功
*/
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 检测线程池状态是否允许增加线程【专门将特殊情况筛出来】
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
// 尝试增加线程数的计数器
for (;;) {
// 检测创建的请求是否合理【是否核心/线程数是否满了】
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs) // 竞争失败时要重复竞争,直到竞争完成,反正也是要重读 ctl 的
continue retry; // 线程数计数器的,因为是一直循环,所以也顺手校验状态变没变,
// 如果状态变了就要重新判断新状态能不能增加线程
}
}
// 走到这里说明经检查此次创建线程的要求合理且已经将线程数计数器+1了,接下来进行 new Thread()
// 的操作
boolean workerStarted = false; // 记录新线程是否已经启动
boolean workerAdded = false; // 记录新线程是否已经创建并加入线程集合
Worker w = null;
try {
w = new Worker(firstTask); // 创建一个记录新线程的对象【创建对象时会调用其封装好的方法完成
//子线程的创建,封装的不错】
final Thread t = w.thread;
if (t != null) { // 创建成功,且分配到的对应的子线程
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 重新检测线程池状态,和最开始的检测一样,状态不允许就不新创建了
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // 新创建的线程池是没有启动的,判断一下状态【这个后面详述】
throw new IllegalThreadStateException();
workers.add(w); //加入线程集合
int s = workers.size();
if (s > largestPoolSize) // 更新数据统计值
largestPoolSize = s;
workerAdded = true; // 子线程成功加入集合
}
} finally {
mainLock.unlock();
}
if (workerAdded) { // 如果成功加入集合,就启动新线程,然后记下成功启动
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted) // 如果没成功启动,就执行添加失败的善后操作
addWorkerFailed(w);
}
return workerStarted;
}
/**
* 添加线程失败之后的善后操作,失败可能是就没加入线程集合,也可能是加入集合后没启动成功。
* 我们执行的操作如下:
* 1. 从线程集合中移除对应的记录线程对象
* 2. 线程数计数器-1
* 3. 检查是否可关闭线程池,万一是这个线程在阻止关闭线程池这样就顺手关上它了
*/
private void addWorkerFailed(Worker w) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (w != null)
workers.remove(w);
decrementWorkerCount();
tryTerminate();
} finally {
mainLock.unlock();
}
}
/* 用户使用的任务队列相关操作 */
/**
* 返回此线程池中的任务队列的引用,此方法主要用来调试和监控。
*
* @return the task queue
*/
public BlockingQueue<Runnable> getQueue() {
return workQueue;
}
/**
* 从任务队列中删除指定任务【如果它在队列中存在的话】,注意,如果该任务已经从队列中被取出执行,此操作就没啥
* 用了。
*
* 此方法可以用作取消任务,但是请注意,有的任务可能会在提交时做包装,比如我们之前 submit()的 Runnable
* 任务包装成我们自己的 Runnable【其实是 RunnableFuture 】 然后进入任务队列、返回用户。记得传包装后
* 的就好。
*
* 当然,你也可以在用 Future.cancel() 之后,使用下面的 purge() 统一清理队列中已取消的任务。
*
* @param task 要移除的任务
* @return 移除成功则返回 true
*/
public boolean remove(Runnable task) {
boolean removed = workQueue.remove(task);
tryTerminate(); // 因为只是一个小操作不确定具体状态,所以所有的使任务减少、线程减少的操作都要检测
// 一下看能否关闭
return removed;
}
/**
* 尝试删除任务队列中所有取消的任务,可以使用这个方法来释放线程池任务队列中的垃圾,此操作对线程池的正常
* 运行没有影响。
* 当然,如果有其他线程的干扰,此线程可能没法删除某些任务。【影响不大,反正执行到了也会丢弃的,只是没清
* 干净多占点内存而已】
*
* 注意:
* 并不是从任务队列中取出来的都是包装好的,只有通过 submit()调用的任务是 RunnableFuture ,如果用户
* 自己调用 execute() 传入 Runnable ,那么从队列中得到的就是 Runnable 。故我们只判断任务是否是
* Future ,因为只有 Future 我们可以判断是否取消,也只有 Future 的实现类才能动态取消任务。
*/
public void purge() {
final BlockingQueue<Runnable> q = workQueue;
try {
Iterator<Runnable> it = q.iterator();
while (it.hasNext()) { // 一一遍历,删除已被取消的任务
Runnable r = it.next();
if (r instanceof Future<?> && ((Future<?>)r).isCancelled())
it.remove();
}
} catch (ConcurrentModificationException fallThrough) {
// 我们在移除任务时发现有多个线程都在执行此队列的写操作,我们就把任务队列复制一份
// 遍历复制的,然后在原队列中做删除,反正删除操作重复删除也无所谓是吧
for (Object r : q.toArray())
if (r instanceof Future<?> && ((Future<?>)r).isCancelled())
q.remove(r);
}
tryTerminate(); // 所有使任务减少/线程减少的操作都要检测一下,看能不能关闭
}
/**
* 如果线程池情况允许,就把线程池状态转化成 TERMINATED。【例如:SHUTDOWN 状态下,但是把任务队列中的任
* 务执行完了,也把线程集合清空了;STOP 状态喜爱,把线程集合清空了。】
*
* 当然,如果线程的状态对得上,但是线程池中的线程数不为0,就打断闲置线程来促使线程陆续关闭。【在闲置线程
* 关闭时,又会调用此方法检测情况,并继续传播线程关闭信号,从而实现线程池中线程的安全稳定的关闭】
*
* 在所有的可能造成线程池关闭的操作之后都要调用此方法,【减少线程、减少任务等操作】,以保证线程池能及时的
* 走向 TERMINATED 状态。
*/
final void tryTerminate() {
for (;;) {
int c = ctl.get();
if (isRunning(c) || // 正常运行,不用转换状态
runStateAtLeast(c, TIDYING) || // 已经有线程调用了 tryTerminate() ,不必重复调用
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) // 还有要执行的任务,无法转换
return;
// 走到这一步的有以下可能:
// 1. STOP 状态
// 2. SHUTDOWN 状态且任务队列中的任务已经执行完
if (workerCountOf(c) != 0) { // 如果线程还没全部关闭,就打断空闲线程将信号传递下去
// 不必担心没有空闲线程导致信息传不过去,因为每当有线程执行完任务时都会调用 tryTerminate()
// 方法
interruptIdleWorkers(ONLY_ONE);
return;
}
// 所有条件全满足,开始转换状态,调用关闭的钩子
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
terminated();
} finally {
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
}
}
/**
* 打断正在等待任务的线程,让他们重新检测线程池当前的状态,从而使他们作出针对当前线程池状态的判断和对应的
* 行为,具体怎么判断,作出什么行为我们后面详细介绍。我们没有做权限检查,这样的话某些线程可能无法打断而调
* 用者也不知道。
*
* @param onlyOne 是否只打断一个空闲线程
* 如果是 true : 只打断一个线程,一般在 tryTerminate() 中发现就差把线程集合置空就能关闭了的时候调用,
* 这时候就调用此方法用来传递关闭的信号。使线程有序的关闭。
*
* 如果是 false: 打断所有空闲线程, shutdown() 方法调用时就传 false ,打断所有空闲的线程以求尽快关闭。
*/
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
/**
* 封装了一层,专门传false,用来提供 shutdown() 使用
*/
private void interruptIdleWorkers() {
interruptIdleWorkers(false);
}
private static final boolean ONLY_ONE = true;
// 打断所有线程,后面详细介绍
private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers)
w.interruptIfStarted();
} finally {
mainLock.unlock();
}
}
线程池关闭
/**
* 调用此方法使线程池进入 SHUTDOWN 状态,不再接受新来的任务,执行完任务队列中的现有任务后会陆续关闭
* 子线程,关闭结束后会关闭线程池。
*
* 此方法只是调整状态,不会阻塞等待关闭完成。【其实我们在看 ThreadPoolExecutor 时会发现它是通过线程池
* 的状态量完成和子线程的沟通的,而子线程的动作完全是子线程检测状态量后自发的行为,调用线程池的用户线程和
* 执行任务的子线程之间没有直接的交互(除了执行 execute() 时是用户线程创建的子线程)】
*
* 如果想阻塞等待关闭完成,请调用awaitTermination()方法。
*
* @throws SecurityException {@inheritDoc}
*/
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(SHUTDOWN);
interruptIdleWorkers();
onShutdown(); // 调用 shutdown 时的钩子
} finally {
mainLock.unlock();
}
tryTerminate();
}
/**
* 尝试打断所有正在执行任务的线程,移除所有排队的任务,然后将未执行的任务返回。
*
* 此方法即时返回,不会阻塞,想等待完成请调用 awaitTermination() 。
*
* 我们通过打断线程阿里使其停止执行,当然,如果你写的代码考虑的很少,不会对中断作出反应,那么可能就不能及时
* 结束你的任务执行了。如有问题可以看看前面记录的线程中断机制的东西。
*
* @throws SecurityException {@inheritDoc}
*/
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP);
interruptWorkers();
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
public boolean isShutdown() {
return ! isRunning(ctl.get());
}
/**
* 判断是否正在结束中
*
* @return {@code true} if terminating but not yet terminated
*/
public boolean isTerminating() {
int c = ctl.get();
return ! isRunning(c) && runStateLessThan(c, TERMINATED);
}
/**
* 判断是否已结束
**/
public boolean isTerminated() {
return runStateAtLeast(ctl.get(), TERMINATED);
}
/**
* 使用 Lock / Condition 完成检测状态和阻塞等待
*
*
**/
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (;;) {
if (runStateAtLeast(ctl.get(), TERMINATED))
return true;
if (nanos <= 0)
return false;
nanos = termination.awaitNanos(nanos);
}
} finally {
mainLock.unlock();
}
}
/**
* 覆写了 Object.finalize()
*/
protected void finalize() {
SecurityManager sm = System.getSecurityManager();
if (sm == null || acc == null) {
shutdown();
} else {
PrivilegedAction<Void> pa = () -> { shutdown(); return null; };
AccessController.doPrivileged(pa, acc);
}
}
调用原理总结
线程池创建、初始化及调整
没啥说的,总结起来其实就是一个套路:
- 该检查入参就检查
- 设置
- 设置完了如果
- 会影响线程池中现有线程的操作策略,就该打断打断,让子线程自己刷新;
- 涉及新加线程的话,已有线程无法操作,就自己加线程。
子线程、任务的相关操作
对任务的处理总结起来也就是我们开始时介绍的套路了,对线程的套路也很明确,因为子线程是打散各干各的,所以每次线程终结都会调用tryTerminate()
方法做线程池状态检测及关闭信息的传递。
线程池关闭相关操作
线程池的关闭很简单:
- 对任务队列来说,该清空就清空即可,很简单
- 对线程集合来说,修改了线程池状态信息后,子线程会自己根据阅读到的线程池状态作出应对,但是阻塞等待任务的可能不能及时反应,我们就使用打断机制进行信息传递而已。
线程池中的工作线程
角色介绍
我们本节主要介绍线程池中的工作线程是如何:
- 闲置等待任务、期间响应线程池状态/参数变化,并能合理根据线程池状态调整自己生命周期
- 获得任务
- 执行任务并记录
源码解析
/**
* Worker 主要用来维持子线程的中断状态,并进行一些运行数据的维护。
*
* 本类继承自 AQS ,通过锁来协调:
* 1. 本 worker 中对应线程的阻塞等待、运行子任务的操作
* 2. 其他线程对本 worker 的操作
*
* 协调如下:
* * 本 worker 中的线程在获得锁的情况下才能运行取到的任务,否则保持在闲置的阻塞等待状态
* 【方便接收通知更改自身行为】
* * 其他线程在通知本 worker 的线程更新操作策略时,如果在阻塞状态就打断并通知,否则如果在运行新任务
* 状态,为防止线程中断对子任务的执行逻辑造成影响,就不打断。从代码上看就是其他线程如果想判断此线程在中
* 断状态,就竞争到锁即可,还能保证在你通知之前不放弃锁而使得本线程不开始运行任务。
*
*
*
*/
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
private static final long serialVersionUID = 6138294804551838833L;
final Thread thread;
Runnable firstTask;
volatile long completedTasks;
/**
* 创建任务
*
* 之所以初始化状态为-1是为了防止在 ThreadFactory 还没创建完时有人来打断
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
public void run() {
runWorker(this);
}
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
/**
* 子线程一直在循环的逻辑,主要思路就是不停的从任务队列中取出任务并执行,同时也处理一些问题:
* 1. 初始化时会带进来一个 firstTask,如果不为空就先执行这个,否则调用 getTask() 从任务队列中取,
* 退出的原因可能如下:
* a). 如果 getTask() 取到的任务是 null , 则结束本线程。
* 【可能是因为线程池状态变化或者配置参数变化】
* b). 在执行外部代码时抛出了异常,此时我们将创建一个新线程来替换原来的线程。
* 2. 我们在执行任务前获得本线程对应的锁,并保证除非调用了 shutdownNow(),否则不能在运行任务期间打断
* 本线程。【通过 Worker 的锁来控制】
* 3. 我们会先调用 beforeExecute() 钩子,如果这个报错会直接导致任务执行失败。
* 4. 我们在执行完任务后会调用 afterExecute() ,用来处理调用任务时的抛出异常,因为 Runnable 的调用
* 线程是子线程,我们没法通过线程堆栈把错误抛出去,所以采用 Error 包装一下,抛出给 Thread 的
* UncaughtExceptionHandler 。
* 5. 在执行完 task.run() 后,执行 afterExecute() 处理错误时会将错误包装成 Error 抛出,虽然保证能
* 通过 UncaughtExceptionHandler 打印堆栈信息展示出用户代码的错误地方,但是会导致线程死亡,需要
* 有新的线程来替补。
*
*
*
*
* @param w the worker
*/
final void runWorker(Worker w) {
Thread wt = Thread.currentThread(); // 也可以用 w.thread()
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // 解开线程的锁,后面可以通过获取锁来进行执行任务、打断线程了
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) { // 获取任务
w.lock();
// 取到任务了,判断一下:如果线程池是 STOP ,不想让本任务跑的话确保线程被打断;否则的
// 话确认线程没有被打断。
//
// 目的是:
// 1. 如果该让此任务停,就给内部的代码足够的暗示【中断】
// 2. 如果该让此任务正常执行,就帮他清理好环境
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
// catch 中确保一定包装成 Error 再抛出,是为了让线程
// 能够停止,使错误信息能从报错堆栈信息中正常打出来
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown); // 处理task.run()抛出的异常
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
/**
* 阻塞等待任务,可能限时,根据线程池中的任务数和核心线程是否有闲置限制时间决定。此方法可能返回 null ,原
* 因总结起来就一句话: 通过对线程池当前状态的判断,你这个线程不要继续存在下去了。具体来说可能是:
* 1. 重新设置了边界,要减少线程数,恰好你符合要求,就让你这个线程关闭
* 2. 线程池已经是 STOP 状态
* 3. 线程池是 SHUTDOWN 状态而且任务队列中的任务都执行完了
* 4. 这个队列有闲置时间限制,而且时间到了
*
* 注意,在此 getTask() 返回空时已经宣判了一个线程的死刑,所以顺手就把线程计数器-1了。
*/
private Runnable getTask() {
boolean timedOut = false; // 用来判断是否超时
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 判断是否符合上面的2,3情况
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// 为4的判断做一下计算
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 判断是否符合1,4
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
// 阻塞等待,同时关注4的问题
// 注意,这里是有一点问题的,最大空闲时间是专只空闲,这里默认把中断唤醒不算做是空闲了,所以每中断
// 一次就重新开始空闲时间的计时。
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
/**
* 执行线程自行退出后的清理操作,注意是自行退出,所以调用此方法的都是要退出的线程自己。
*
* 如果入参 completedAbruptly 设置成 true ,表示此线程的死亡是在执行任务时【可能是 Task.run() ,
* beforeExecute(), afterExecute()】抛出了异常。如果入参是 false ,表示是由于 getTask() 返回了
* null 导致线程退出。
* 这两者情况是不一样的,如果是抛出异常,我们需要重新添加一个线程来补充本线程的空;如果是 getTask()
* 返回空,我们就不再补充新线程,因为这种情况就是想关闭此线程。
* 也由此,我们可以明白,在向线程池提交代码时尽量不要大量提交抛异常/错误的代码,这样线程池要不断的执行线程
* 退出——新建,会大大降低其执行效率。
*
* 因为本线程涉及线程减少,所以也会调用 tryTerminate() ,如果条件合适的话有可能造成线程池状态转
* 到 TERMINATED
*
* @param w 要退出的线程
* @param completedAbruptly 本线程退出的原因【如果是执行时抛异常则为 true 】
*/
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // 抛异常导致退出,先-1,后面加线程时再加回来
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks; // 完成信息归档
workers.remove(w); // 删除本 worker 的引用
} finally {
mainLock.unlock();
}
tryTerminate(); // 看是否符合条件关闭
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
// 如果不是异常抛出导致的关闭,理论上是不需要加线程来实现替换的,但是担心特殊情况下影响线程池
// 运行:假如线程池中的线程数比我们计算出的最小值还小,就加一个线程,从关闭操作改成替换操作
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // 不必替换
}
addWorker(null, false); // 增加新线程完成退换
}
}
调用原理总结
其实核心就是自己实现一个Runnable
,内容就是不断从队列中取任务并执行,同时监控线程池中的一些控制变量,并完成相应。
总结
线程池的实现原理概述
线程池的实现原理其实很简单:就是封装了一些子线程和任务队列,用户通过调用线程池暴露出的方法来操作任务并修改变量设置值,并通过中断机制传递信息给需要的子线程;子线程就是一个个的死循环,在从任务队列中取任务时检测那些控制线程池状态的标志量,并根据自身情况和线程池的状态选择自己的操作。
和自己的实现例子比较
自己实现的例子和线程池还是有出入的,主要在于:
- 捕获所有线程中断作为子线程退出的信号,比较宽泛,容易导致线程无意中退出【毕竟使用中断机制的很多】
- 线程池中线程数量不可变化
- 子线程支持捕获异常后的恢复
总体看来,我们自定义的线程池除了粗糙一些,其他的问题不大。
线程池定制的方向及注意的问题
我们统一介绍一下线程池中的钩子:
-
onShutdown()
:调用shutdown()
时用(状态:RUNNINT—>SHUTDOWN) -
beforeExecute()
:线程池中的每个线程执行取到的 task 之前用 -
afterExecute()
:线程池中的每个线程执行取到的 task 之后用 -
terminated()
:线程池完成关闭时用(状态:TIDYING—>TERMINATED)
使用示例
示例一、自行定制线程池
用构造函数
示例二、使用工厂方法
用Executors.xxxxxx
,具体自己了解吧。
问题
扩展
Java的安全管控
后面详细介绍吧,我们现在还是把主要目标放在多线程上,等后面看JVM时再细讲。
网友评论