本文章出处 线程池ThreadPoolExecutor 了解
转载请说明
常用线程池类型
Java通过Executors
静态方法创建4种不同类型线程池。
- newSingleThreadExecutor 创建单例的线程池,保证执行任务顺序,超出线程任务将会在任务中等待,所有的任务都按照FIFO队列顺序执行。
- newFixedThreadPool 创建一个固定大小的线程组,指定工作线程数量,当任务超过指定工作数量时,在队列中排队等待执行。
- newCachedThreadPool 创建一个可以缓存线程池,这个线程池活动线程是0,最大线程Integer.MAX,当不断有新的任务添加到线程池中,池内线程数量不够时,可以立刻创建新的线程执行任务。当空闲的线程超过60s就被系统回收掉。
- newScheduleThreadPool 创建一个定长的线程池,而且支持定时的以及周期性的任务执行,类似于Timer。
- newWorkStealingPool 会创建一个含有足够多线程的线程池,来维持相应的并行级别,它会通过工作窃取的方式,使得多核的 CPU 不会闲置,总会有活着的线程让 CPU 去运行。
像newSingleThreadExecutor、newFixedThreadPool、newCachedThreadPool都时内部封装ThreadPoolExecutor
生成线程池的,下面具体分析ThreadPoolExecutor这个类。
ThreadPoolExecutor 构造函数
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
- corePoolSize 线程核心线程数,不会被回收的线程。
- maximumPoolSize 线程池能够申请最大线程数量
- workQueue 同步性队列转载执行的任务
- keepAliveTime 当线程数大于核心时,这是多余的空闲线程在终止之前等待新任务的最大时间。
- threadFactory 线程工厂
- handler 当任务数量超过队列容量时,需要处理这种情况,饱和策略,主要有4种处理策略
- AbortPolicy:直接抛出异常,这是默认策略;
- CallerRunsPolicy:使用调用者所在的线程来执行任务;
- DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;
- DiscardPolicy:直接丢弃任务;
线程池疑问
创建线程池基本核心构造参数我们已经知道了,但是我们还有很多问题没有搞明白的。怎么知道线程池内每个线程运行状态,是在工作中还是空闲呢?是不是有一个专门线程去标记空闲线程活动时间?线程是如何实现共用线程。 带着这些问题去阅读代码。
线程池内线程状态
以下内容都是来自ThreadPoolExecutor
代码注释。
线程池内的线程状态都是有一个AtomicInteger ctl
保持的,是一个原子整数,包装了两个领域含义。
-
workerCount
有效的线程数 ,线程总数2 ^ 29 -1 ,线程启动数量不包括线程停止的数量,而该值可能是
与活动线程的实际数量暂时不同。例如当ThreadFactory创建线程失败时,线程正在执行退出,统计线程数量依然包括退出的线程。 -
runState
线程状态-
RUNNING
正在接受新的任务并且处理队列中的任务 -
SHUTDOWN
不接受新的任务,但是能处理任务 -
STOP
不能接受新的任务,不能处理队列中的任务,但是可以中断正在执行的任务。 -
TIDYING
所有的任务终止,workerCount为0 ,线程全部过渡到TIDYING状态,即将运行terminated() 钩子方法 -
TERMINATED
terminated() 钩子方法执行完成
-
这些状态都有一个转换顺序
-
RUNNING -> SHUTDOWN
执行shutdown() -
(RUNNING or SHUTDOWN) -> STOP
执行shutdownNow() -
SHUTDOWN -> TIDYING
当任务队列和线程池都是空 -
STOP -> TIDYING
线程池都是空 -
TIDYING -> TERMINATED
当 terminated()钩子方法执行完
这些状态具体代码实现
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
execute 方法解析
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* 处理3个步骤
* 1. 如果正在运行的线程数量小于核心线程数,直接创建一个新的线程去执行任务
* 调用addWorker 方法自动检查 线程状态和数量,避免在不能添加线程时添加线程出现错误警报
*
* 2. 如果任务可以成功进入队列,我们仍然需要双重检查是否添加一个线程
* 因为存在上次检查时有线程死亡或者当我们进入方法时线程池正在关闭
* 因此,我们重新检查状态,如果停止,则回滚排队,如果没有,则启动新线程。
*
* 3. 添加任务失败,则尝试创建一个线程,如果失败了,使用拒绝策略
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) { //当前线程数量小于核心线程数
if (addWorker(command, true)) // 创建线程
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) { //线程池状态RUNNING 并且 任务添加成功
int recheck = ctl.get(); // 第二重检查
if (! isRunning(recheck) && remove(command)) //判断线程池状态 删除任务修改状态
reject(command);
else if (workerCountOf(recheck) == 0) //线程池数量为0
addWorker(null, false);
}
else if (!addWorker(command, false)) //线程池状态不为RUNNING 或者 队列已满,用于开启非核心线程拉取任务
reject(command);
}
下一步我们进入addWorker
创建线程的核心方法
private boolean addWorker(Runnable firstTask, boolean core) {
retry: //retry标记,第一次看到 😓
for (int c = ctl.get();;) {
// Check if queue empty only if necessary.
if (runStateAtLeast(c, SHUTDOWN) //至少SHUTDOWN
&& (runStateAtLeast(c, STOP) // 至少STOP 都是不合法
|| firstTask != null
|| workQueue.isEmpty()))
return false;
for (;;) { //状态合法
if (workerCountOf(c)
>= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK)) //大于核心线程或者最大线程都不需要创建线程,和掩码相与防止最大线程数超过2 ^ 29 - 1 细节啊
return false;
if (compareAndIncrementWorkerCount(c)) // ctl 自增成功,跳出整个循环
break retry;
c = ctl.get(); // Re-read ctl
if (runStateAtLeast(c, SHUTDOWN)) //状态至少SHUTDOWN 重新进入循环
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask); //创建线程
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//在加锁期间重新检查线程池状态
int c = ctl.get();
if (isRunning(c) ||
(runStateLessThan(c, STOP) && firstTask == null)) {
if (t.isAlive()) // 刚创建线程已经开始执行任务,这是有问题
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start(); //启动任务
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
addWorker()
主要流程检查线程池状态是否合法,创建新的线程,加入workers中,调用start()
执行任务。我们去了解下Worker 类
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
// TODO: switch to AbstractQueuedLongSynchronizer and move
// completedTasks into the lock word.
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker. */
public void run() {
runWorker(this);
}
}
Worker其实就是Runnable包装类,但是增加了任务中断功能,他的主要任务就是维护中断状态,继承AQS可以简化获取和释放围绕每个任务执行的锁定,防止旨在唤醒等待任务的工作线程的中断。
了解Worker怎么执行任务的进入runWorker()
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask; //取出任务
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) { //如果当前worker没有任务,从队列中获取任务,直到队列为空
w.lock();
//处理线程中断机制
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task); //前置处理,类似拦截器机制,需要子类去实现
try {
task.run(); //调用任务方法
afterExecute(task, null); //后置处理
} catch (Throwable ex) {
afterExecute(task, ex); //异常处理
throw ex;
}
} finally {
task = null;
w.completedTasks++; //执行任务数量+ 1
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly); //线程生命周期走完,执行回收工作
}
}
结合Worker构造函数,Worker在初始化就自己给自己上锁了,避免线程在任务还没有开始的情况下就被中断了 。启动线程执行runWorker方法,取出任务,释放锁,如果Worker中的任务为空,从队列中拉取任务。处理线程中断,主要依据第一线程状态已经至少STOP状态,然后清除中断状态,在判断线程没有中断信号了,再发送中断信号。按照作者注释的意思就是当线程池已经在停止过程中,线程应该中断,但是必须双重检查防止关闭过程中竞争发送中继信号。调用run方法执行任务。为什么要上锁执行任务,主要是执行任务过程,必须要获取锁才能中断线程的,但是Worker本身不支持重入锁的,只有在任务开始关闭过程才能中断。
在这里我们终于看到线程共用方式了,通过线程不断从队列中获取任务,然后再进行调用run方法执行任务,当线程退出获取队列循环,线程生命周期就结束了。
geTask()
private Runnable getTask() {
boolean timedOut = false; //上一次拉取是否超时
for (;;) {
int c = ctl.get();
//检查线程池状态是SHUTDOWN 不接受新的任务
// 任务队列为空
if (runStateAtLeast(c, SHUTDOWN)
&& (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {
decrementWorkerCount(); //核心线程数workerCount -1
return null;
}
int wc = workerCountOf(c);
// allowCoreThreadTimeOut 空闲情况下是否回收核心线程数 默认是false
// 当前线程数大于 核心线程数
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// wc 大于最大线程数 ,先处理线程数量
// 线程在存活的时间内没有获取到任务,则需要回收掉,上一个循环的,线程数-1
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) { //wc 不要为0,任务队列为空的情况
if (compareAndDecrementWorkerCount(c)) //线程-1成功没有其他线程竞争,没有新增任务
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : //超时会返回空
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) { //中断等待获取任务,放弃执行任务
timedOut = false;
}
}
}
这里我们知道空闲时间是怎么回收线程的,通过同步性队列poll() + 超时时间知道一个线程在这个时间内没有任务执行,线程池处于空闲状态的,返回null给调用方法,跳出while循环,结束整个线程的生命周期。
进入processWorkerExit()
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) //如果没有执行到任务,核心线程-1
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
workers.remove(w); //移除当前worker ,线程会被回收掉
} finally {
mainLock.unlock();
}
tryTerminate(); //判断线程池内状态,是否对线程池发出关闭信号
int c = ctl.get();
if (runStateLessThan(c, STOP)) { //线程池在RUNNABLE或者SHUTDOWN状态,线程池任然可以执行任务或者接受任务
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty()) //线程池内线程已经被回收完了并且任务还没有执行完
min = 1;
if (workerCountOf(c) >= min) //线程池内线程数量大于核心线程池,不需要新建线程去处理
return; // replacement not needed
}
addWorker(null, false); //创建新的线程处理任务
}
}
进入 tryTerminate()
在线程池SHUTDOWN状态线程为0和任务队列为空的情况,或者STOP状态核心队列为空情况,线程池状向TIDYING转移,传播关闭池信号。
final void tryTerminate() {
for (;;) {
int c = ctl.get();
if (isRunning(c) || //RUNNING 状态不需要处理
runStateAtLeast(c, TIDYING) || //已经进入TIDYING,也不做处理
(runStateLessThan(c, STOP) && ! workQueue.isEmpty())) //任务队列不为空,不满足条件
return;
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE); 尝试去中断一个worker
return;
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock(); //加锁修改线程池状态
try {
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { // 进入TIDYING状态
try {
terminated();
} finally {
ctl.set(ctlOf(TERMINATED, 0)); //执行完terminated() 进入TERMINATED状态
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
shutdown()
再去了解下线程池终止方法
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(SHUTDOWN); //修改线程池状态为SHUTDOWN
interruptIdleWorkers(); //中断线程
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}
进入interruptIdleWorkers() 怎么中断线程
private void interruptIdleWorkers() {
interruptIdleWorkers(false);
}
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock(); //加锁主要是workers 是一个不安全集合
try {
for (Worker w : workers) {
Thread t = w.thread;
if (!t.isInterrupted() && w.tryLock()) { //没有中断和 能够获取到锁,说明此线程池没有在执行任务,Worker 是不支持重入的
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
处理方法挺简单的,修改线程池状态不要接收新的任务,将works中空闲线程取出发出中断信号。
shutdownNow
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP);
interruptWorkers();
tasks = drainQueue(); //删除队列中的任务,返回给tasks
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
shutdownNow 会将队列中还没有来得及处理任务全部删除掉,直接调用tryTerminate()终止线程池生命周期。
总结
现在我们知道线程池内部机制是如何创建线程,共用线程,空闲回收,线程池的生命周期。调用execute()提交任务,如果当前线程池数量小于核心线程数,调用addWorker()创建一个新的线程池去执行任务,否则直接加入到队列中。在addWorker()启动一个线程去不断从队列拉取任务,直到一个队列存活时间没有任务执行或者队列为空,线程才会被回收掉。设置线程池时注意参数设置主要一些细节,核心线程数根据任务情况进行设置,大部分情况下都是核心数在处理任务,只有当任务队列超出容量大小的时候,才会创建新的任务去执行任务。所以在设置最大线程数时,注意设置队列容量大小,如果是Integer.MAX,线程数量永远不会超过核心线程数。只有当任务超出队列容量+线程最大值的情况才会触发饱和策略,根据任务需求选择合适处理方法。
网友评论