本文总结一下对线程池源码的学习,基于jdk 1.8
什么是线程池
顾名思义线程池就是一个可以提供一组可复用线程的对象。线程池内部有阻塞队列,用来存放等待执行的任务。然后内部的线程来执行这些任务,线程会不断的从阻塞队列中获取任务来执行,而不是执行完一个任务就销毁。
线程池的作用
在高并发场景下,如果给每个任务都去创建一个线程来执行,结果就是大量的线程创建与销毁,系统的开销将会很大,影响应用的执行效率。
同时,线程池可以有效的限制应用程序中同一时刻运行的线程数量,避免CPU资源不足,造成阻塞。
线程池的使用
定义一个线程池
ExecutorService executor = new ThreadPoolExecutor(1, 4, 20,
TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));
该线程池,核心线程数为1,最大线程数为4,非核心线程空闲存活时间20s,阻塞队列是长度为10的 ArrayBlockingQueue,线程工厂和饱和拒绝策略没有定义,采用默认实现
线程池中添加任务
Executor接口提供了execute方法,传入Runnable接口的实现(任务),线程池将会调度执行这些任务
for (int i = 0; i < 12; i++) {
executor.execute(() ->
System.out.println(Thread.currentThread().getName()));
}
ThreadPoolExecutor源码分析
定义线程池
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
这是线程池最终执行的构造方法,共有7个参数,分别是
- 核心线程数
- 最大线程数(核心线程+非核心线程)
- 非核心线程空闲存活时间
- 空闲存活时间单位
- 阻塞队列
- 线程工厂
- 饱和拒绝策略
在定义时前5个参数是必须传递的,后两个参数不传递表示使用默认提供
注意看第三个参数,默认它是作用在非核心线程上的,如果希望同时作用在核心线程上,可以调用如下方法设置
allowCoreThreadTimeOut(true);
线程池的状态
下面来看一下线程池内部的一些状态,以及工作线程数的封装
/**
* The main pool control state, ctl, is an atomic integer packing
* two conceptual fields
* workerCount, indicating the effective number of threads
* runState, indicating whether running, shutting down etc
*
* In order to pack them into one int, we limit workerCount to
* (2^29)-1 (about 500 million) threads rather than (2^31)-1 (2
* billion) otherwise representable.
*/
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
节选了部分关键注释说明
ctl是一个原子的Integer类型,包含了workerCount和runState,为了把这个两个值拼到一个int中,限制了workerCount最大为2^29 -1,大约为500多万,而不是2^31-1。
也就是说作者把工作线程数和状态值拼接到了一个int中,这些属性含义如下
属性 | 含义 | 值 |
---|---|---|
COUNT_BITS | 2进制计数位数 | 29 |
CAPACITY | 线程数容量 | (2^29)-1 |
RUNNING | 运行状态 | -2^29 |
SHUTDOWN | 关闭状态(不接受新任务,把已有任务执行完) | 0 |
STOP | 停止(不接受新任务,终止正在执行的) | 2^29 |
TIDYING | 所有任务终止,工作线程数为0 | 2^30 |
TERMINATED | terminated()方法执行完成 | 2^29 + 2^30 |
通过上表可以看到,线程池的5个状态数值是递增的
所以只要状态是>=SHUTDOWN,就代表线程池不会再接受新的任务
三个静态方法解释
- ctlOf(int rs, int wc)
线程池状态与线程数拼成一个int,高3位为状态,低29位为工作线程数 - runStateOf(int c)
获取线程池状态 - workerCountOf(int c)
获取工作线程数
执行任务—execute
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
可以看到,Doug Lea老爷子已经将该方法的流程注释的很清晰了,我这里就通俗的描述一下:
- 如果运行的线程数小于核心线程数,那么就新启动一个线程,并将该任务作为此线程的firstTask
- 若线程池的核心线程数已经满了,就将任务添加到阻塞队列中,需要二次检查(因为有可能在上一次检查之后死掉,或者是进入该方法时线程池被关闭),若线程池不是运行状态,则将该任务从队列中移除,并进行拒绝处理。如果二次检查后没有工作的线程了,那么就新启动一个线程执行该任务
- 如果阻塞队列也满了,就新启动一个非核心线程,如果失败的话,说明线程池被shutdown或者是队列容量和最大线程数都已达到上限,将此任务拒绝掉
添加工作线程—addWorker
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
// 获取工作线程数
int wc = workerCountOf(c);
// 如果大于CAPACITY最大容量,或者core为true,与corePoolSize比,
// 否则与maximumPoolSize比较,如果大于允许的线程数则返回 false
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// worker + 1成功,跳出retry外层循环
if (compareAndIncrementWorkerCount(c))
break retry;
// cas操作失败,如果线程池状态改变,跳出内层循环,继续判断状态
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 拿到锁以后二次检查
int rs = runStateOf(ctl.get());
// 如果在运行状态,或者是SHUTDOWN状态且firstTask为空(取queue中任务)
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
// 线程已经启动,抛出异常
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// 添加到workers中
workers.add(w);
// 记录最大的worker数量
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
// 启动线程
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
// 添加失败
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
firstTask
addWorker方法的第一个参数是firstTask,firstTask是线程池中Worker对象的一个属性,该对象代表新启动线程的第一个任务。
在execute方法源码中可以看到,只有在新增线程时才会给firstTask赋值,如果任务被添加到queue中,将其置为null,线程会去阻塞队列中获取任务执行。
在添加worker前,会在有必要的情况下检查阻塞队列是否为空
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
1、如果状态大于SHUTDOWN,不接受新任务,直接返回false;
2、如果状态等于SHUTDOWN,firstTask != null,返回false,不允许新增任务;
2、如果状态等于SHUTDOWN,firstTask == null,说明该线程会去队列中取任务执行,如果此时workQueue.isEmpty(),则返回false;
addWorkerFailed
添加线程失败时,会将刚创建的worker对象移除掉
private void addWorkerFailed(Worker w) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// HashSet中移除worker
if (w != null)
workers.remove(w);
// 线程数减一
decrementWorkerCount();
// 尝试关闭线程池
tryTerminate();
} finally {
mainLock.unlock();
}
}
内部线程包装对象—Worker
Worker是ThreadPoolExecutor中的内部类,包含了执行任务的线程(节选了部分属性和方法)
private final class Worker extends AbstractQueuedSynchronizer
implements Runnable {
/** 执行任务的线程 */
final Thread thread;
/** 初始化运行的任务,可能为空 */
Runnable firstTask;
/** 每个worker完成任务的计数器 */
volatile long completedTasks;
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
// 无法获取锁,从而禁止 interrupt worker
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
// 线程工厂初始化线程
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
}
Worker对象通过继承AbstractQueuedSynchronizer队列同步器,来控制worker的同步状态,
新建worker时,setState(-1) ,设置状态为 -1 使得其他线程无法获取到worker的锁,禁止interrupt该线程(只有当前状态为 0 时才有机会获得锁)
执行任务——runWorker
线程池真正执行任务的地方就在这里了
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
// 标记线程是否是因为发生异常中断的
boolean completedAbruptly = true;
try {
// 获取要执行的任务, firstTask为空就从队列中取
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// 钩子函数,可以在执行前自定义些操作
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 执行任务
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 钩子函数,任务执行完成后调用的方法
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
// 执行成功,将异常标记置为 false
completedAbruptly = false;
} finally {
// 执行worker退出操作
processWorkerExit(w, completedAbruptly);
}
}
线程池状态检查
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
1、如果runState >= stop(stop状态线程池要中断正在运行的任务),且线程未被设置为中断,则interrupt线程
2、如果runState < stop,进行二次检查(有可能在第一次获取状态后,调用了shutdownNow方法),此时线程如果有中断标记,则清除(Thread.interrupted()返回线程中断状态,并将其清除),再次查看状态,runSate >= stop 则interrupt线程
获取任务——getTask
private Runnable getTask() {
// 上一次循环取task是否超时
boolean timedOut = false;
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 检查线程池状态
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// 线程空闲了是否需要退出
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 检查线程池workerCount
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
1、循环取任务,直到取到任务,或者是不需要返回任务为止;
2、如果线程池是 > stop状态,则workerCount减1,返回null,
如果是shutdown状态,且队列为空,则workerCount减1,返回null
3、(wc > maximumPoolSize || (timed && timedOut))
wc > 最大线程数 或者是 线程空闲了keepAliveTime 且 空闲需被销毁
(wc > 1 || workQueue.isEmpty())
wc > 1 或者 队列为空
同时满足上述两个条件,说明该线程不需要获取任务来执行,则workerCount减1,返回null
- timedOut代表上一次循环中,取task时候是否超时(代表了该线程空闲了keepAliveTime时间)
- timed代表该线程空闲了是否需要销毁
4、如果timed == true,则调用poll方法,等待keepAliveTime时间,
否则调用take方法阻塞直到获取到任务(到达这一步,说明线程池状态为running或者是shutdown且workQueue不为空)
worker退出——processWorkerExit
任务执行完成后,在finally语句中执行worker的退出操作
private void processWorkerExit(Worker w, boolean completedAbruptly) {
// 如果线程异常退出,则workerCount减 1
if (completedAbruptly)
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
// set 中异常执行完的worker对象
workers.remove(w);
} finally {
mainLock.unlock();
}
// 尝试停止线程池
tryTerminate();
int c = ctl.get();
// 状态为running或者shutdown
if (runStateLessThan(c, STOP)) {
// 异常退出直接新增加一个worker
if (!completedAbruptly) {
// 计算最小线程数
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
// 当前工作线程大于min,则无需新增,直接返回
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false);
}
}
该方法中有个很重要的操作就是调用tryTerminate方法,尝试终止线程池
接下来就来分析线程池的关闭操作
tryTerminate + awaitTermination
final void tryTerminate() {
for (;;) {
int c = ctl.get();
// 1、stop状态,则往下执行
// 2、shutdown且队列为空则往下执行,其余情况直接return
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
// 如果工作线程数不为空,则中断一个worker线程
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE);
return;
}
// 执行到这里,说明worker为0,且没有任务需要执行
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 设置线程池状态为tidying
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
// 调用钩子函数,需继承在子类中实现
terminated();
} finally {
// 设置线程池状态为terminated
ctl.set(ctlOf(TERMINATED, 0));
// 线程池终止完成信号通知,通知awaitTermination方法
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
// 设置的阻塞超时时间
long nanos = unit.toNanos(timeout);
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (;;) {
// 如果线程池已经关闭,直接返回
if (runStateAtLeast(ctl.get(), TERMINATED))
return true;
if (nanos <= 0)
return false;
// 阻塞,如果tryTerminate方法关闭成功的话,会唤醒这里
nanos = termination.awaitNanos(nanos);
}
} finally {
mainLock.unlock();
}
}
该方法中比较重要的一步操作就是中断空闲线程interruptIdleWorkers(ONLY_ONE)
/**
* @param onlyOne If true, interrupt at most one worker. This is
* called only from tryTerminate when termination is otherwise
* enabled but there are still other workers. In this case, at
* most one waiting worker is interrupted to propagate shutdown
* signals in case all threads are currently waiting.
* Interrupting any arbitrary thread ensures that newly arriving
* workers since shutdown began will also eventually exit.
* To guarantee eventual termination, it suffices to always
* interrupt only one idle worker, but shutdown() interrupts all
* idle workers so that redundant workers exit promptly, not
* waiting for a straggler task to finish.
*/
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
// 如果线程未中断,且可以获取到锁,则interrupt
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
// 如果该值为true,则跳出循环
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
粗略翻译一下方法上面的注释,如果onlyOne参数被设置为true的话,该方法最多只会中断一个worker线程,为了把shutdown信号传播下去,保证线程池最终的关闭,最多就只中断一个空闲线程。
线程阻塞的话就是阻塞在getTask方法中,这里中断一个线程后,getTask --> processWorkerExit --> tryTerminate --> interruptIdleWorkers --> getTask
其实tryTerminate方法中,为什么要设置onlyOne为true,如果那个地方是false会是什么结果,没有思考的很明白,后续多查阅些资料实践一下。
其实上面已经涉及到了线程池的关闭流程,下面还有两个比较重要的方法来分析一下
关闭线程池——shutdown
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
// 将线程池状态设置为shutdown
advanceRunState(SHUTDOWN);
// 中断所有空闲的线程
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
// 尝试关闭线程池
tryTerminate();
}
这个地方的巧妙之处就在于最后的tryTerminate方法,因为线程池shutdown状态时,是要把剩下的任务执行完的,如果调shutdown方法的时候恰好所有线程都在执行任务,那么就无法中断。
关闭线程池——shutdownNow
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
// 设置线程池状态为stop
advanceRunState(STOP);
// 中断所有worker
interruptWorkers();
// 取出队列中的任务并返回
tasks = drainQueue();
} finally {
mainLock.unlock();
}
// 尝试关闭线程池
tryTerminate();
return tasks;
}
stop状态需要把所有线程中断,任务也放弃,所有shutdownNow会中断所有worker线程
private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers)
w.interruptIfStarted();
} finally {
mainLock.unlock();
}
}
// 此方法是内部类Worker中的方法,提到这里来便于阅读
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
持有worker锁时,state 为 1,未持有锁时为 0,所以这里就可以看出区别,shutdown方法是只能中断空闲的worker线程,而shutdownNow则是把所有worker线程都中断。
线程池的基本流程就到这里了,如果有理解的不对的地方,或者需要补充的地方,还望各位小伙伴不吝赐教 ^-^ ~
网友评论