在程序开发中,为了实现异步和并发经常用到多线程,我们知道CPU能同时处理的资源是有限的,CPU处理任务通常是通过切换时间片来轮流执行的,如果无限制的使用多线程就会形成很多线程同时竞争CPU的资源,就会导致CPU经常切换时间片,如果线程执行的时间很短,那么CPU切换时间片的开销可能就比处理线程的还大,甚至会出现开的线程过多把CPU的资源占满了,导致系统卡死的现象。
所以就有了线程池,线程池就是用于管理和调度多线程的,它通过复用线程来实现处理多线程的任务。因为复用了线程所以减少了创建线程的开销以及减少CPU资源的开销。
下面通过解析线程池源码来探索它的实现原理。
线程池的实现类是ThreadPoolExecutor,它的类关系图如下所示:
类图中有几个比较重要的属性,它们是线程池的实现核心,分别是:
- ctl: 既表示线程池的状态,也表示线程池的容量
- workers: 用于创建线程并复用线程来调度缓存的线程任务
- workQueue: 它是用来缓存线程,等待worker进行调度的
- largestPoolSize: 线程池当前的最大容量
- threadFactory: 线程工厂类,用于创建线程
- handler: 当线程池满的时候的处理策略,是抛异常还是丢弃还是其他的逻辑等等
- keepAliveTime: 用于当线程池超过核心线程数时或启用了允许核心线程超时时,核心线程或工作线程即Worker从队列中获取下一个要执行的线程任务的超时时间
- allowCoreThreadTimeOut:是否允许核心线程超时,默认不允许,如果启用了表示核心线程从队列中获取下一个要执行的线程任务的超时时间
- corePoolSize: 核心线程的大小
- maximumPoolSize: 指定线程池的最大容量
我们平常中构建线程的时候一般可以指定corePoolSize、maximumPoolSize、keepAliveTime、workQueue、threadFactory、handler来实现自己期望的线程池策略。
在介绍线程池的实现原理之前,我们先了解下线程池的状态,线程池也有自己状态,线程池的调度会依据线程池当前的状态,下图是线程池的状态演变图:
4Znvt0.png - RUNNING: 运行中状态,默认是此状态。在此状态下线程池可以接收新的线程并可以调度处理队列中的任务
- SHUTDOWN:关闭中状态,路径: RUNNING—》SHUTDOWN,当线程池处于RUNNING状态下调用shutdown()方法之后,状态会变成SHUTDOWN。在此状态下线程池不会接收新的线程,但会处理队列中的任务
- STOP:停止状态,路径:RUNNING or SHUTDOWN—》STOP,当线程池处于RUNNING或者SHUTDOWN状态下调用shutdownNow()方法之后,状态会变为STOP。在此状态下线程池不会接收新的线程,且不会处理队列中的任务,并中断正在处理中的任务。
- TIDYING:资源释放状态(个人理解),路径:STOP—》TIDYING,或者 SHUTDOWN—》TIDYING,当STOP状态下线程池已为空时就会变为TIDYING状态,当SHUTDOWN状态下线程池为空并且任务队列为空时就会变为TIDYING状态。TIDYING的状态下就会调用terminated()方法。在此状态下线程池不会接收新的线程,且不会处理队列中的任务。
- TERMINATED:已终止状态,路径:TIDYING—》TERMINATED,terminated方法执行完之后状态就为变为TERMINATED。在此状态下线程池不会接收新的线程,且不会处理队列中的任务。
线程池的实现原理其实主要是将线程加入线程池时,先判断当前核心线程数是否已达最大值,如果核心线程数没有已达到最大值,那么创建任务work并直接调度并处理任务;如果核心线程数已达到最大值,那么就会加入队列中等待调度。这其中会涉及检测线程池的状态,依据不同状态做不同的处理。我们来看看调用线程池的execute方法发生了什么:
public void execute(Runnable command) {
if (command == null)
//如果线程任务为空,直接抛出异常
throw new NullPointerException();
//获取线程池状态
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
//如果线程池的当前工作线程即核心线程输入小于指定的核心线程数,
//则添加核心线程
if (addWorker(command, true))
return;
//添加核心线程失败,重新获取线程池状态
c = ctl.get();
}
//核心线程数已达最大值或添加核心线程失败时
if (isRunning(c) && workQueue.offer(command)) {
//如果线程池处于运行中,则将线程缓存到队列中
//重新获取线程池状态
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
//如果线程池不是运行中状态,那么尝试从队列中删除线程
//调用reject方法处理拒绝线程的策略
reject(command);
else if (workerCountOf(recheck) == 0)
//如果线程池处于运行中状态,且没有核心线程存在,那么添加一个工作线程去调度队列中的任务
addWorker(null, false);
}
else if (!addWorker(command, false))
//如果线程池不是运行中状态或者加入队列失败,
//调用reject方法处理线程无法加入线程池的策略
reject(command);
}
execute方法中先检查核心线程是否已达到最大,如果没有则调用addWoker添加核心线程并直接调度处理线程任务;如果线程是达到最大加加入队列中等待调度处理。大家可以看到在加入对了中是有对线程池的状态就绪判断的,如果不是运行中状态则不会加入队列中,在addWork中也是有对状态的判断的,我们看下下addWork的源码:
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
//获取线程池当前状态
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
//这里就是检测是否是运行中的状态或者SHUTDOWN 状态下队列是否为空
//如果非运行中状态,且是>SHUTDOWN状态,则不添加核心或工作线程,如果=SHUTDOWN 状态且队列为空那么也不添加核心或工作线程
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
//获取当前正在运行的任务(含工作线程和核心线程)的数量
int wc = workerCountOf(c);
//如果正在运行的任务的数量大于最大容量,或者当前添加的是核心线程,则如果大于核心线程允许的最大数量或者当前添加的是非核心线程,则如果大于线程池允许的最大数量,就直接返回添加失败
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//CAS自增运行中的任务的数量
if (compareAndIncrementWorkerCount(c))
//添加成功,退出循环,进行下一步操作
break retry;
//CAS自增运行中的任务的数量失败
//重新获取线程此状态
c = ctl.get(); // Re-read ctl
//如果状态发生了编号,则重新尝试
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
//任务已开始运行的标识
boolean workerStarted = false;
//任务已经添加的标识
boolean workerAdded = false;
//任务
Worker w = null;
try {
//创建一个线程任务
w = new Worker(firstTask);
//获取当前任务所属的线程,当前线程还未start
final Thread t = w.thread;
if (t != null) {
//线程不为空
//对任务容器的操作进行加锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
//获取线程池的状态
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
//如果线程池是运行中状态,或者=SHUTDOWN 且此次添加的线程为空(表示可以SHUTDOWN 单队列中还有线程在等待,可以调度处理)
if (t.isAlive()) // precheck that t is startable
//判断线程是否还活着,意味着线程已经start了,线程已经start的情况下不允许再次start
throw new IllegalThreadStateException();
//将工作任务加入容器中
workers.add(w);
//获取工作任务容器大小
int s = workers.size();
if (s > largestPoolSize)
//如果工作容器的大小比当前最大的线程池大小还大,更新最大的线程池大小
largestPoolSize = s;
//标识工作任务添加成功
workerAdded = true;
}
} finally {
//解锁
mainLock.unlock();
}
if (workerAdded) {
//工作任务已添加成功
//则启动线程
t.start();
//标识工作任务已经开始
workerStarted = true;
}
}
} finally {
if (! workerStarted)
//如果工作任务没有开始,那么从调用addWorkerFailed从容器中删除任务w,同时CAS自减工作任务的数量
addWorkerFailed(w);
}
//返回工作任务是否已开始
return workerStarted;
}
addWork中允许添加工作任务的条件是线程池是允许中状态或者是SHUTDOWN状态且队列中有任务在等待,如果条件不满足则返回添加失败,如果添加满足则创建工作任务Worker,将工作任务加入任务容器中,并通过Worker来创建的线程,并启动线程调度并处理任务。
addWork中我看到了创建Woker并通过Worker获取线程,然后再启动线程,我们并不知道线程是怎么创建,也看不到怎么调度队列,所以让我们来看看Worker的源码,让我们揭开这神秘的面纱:
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
//继承AQS,并实现Runnable
/** Thread this worker is running in. Null if factory fails. */
//任务所创建的线程
final Thread thread;
/** Initial task to run. Possibly null. */
//线程Runnable,代表工作任务中,当线程启动之后允许的第一个任务
Runnable firstTask;
/** Per-thread task counter */
//完成任务的数量
volatile long completedTasks;
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
//创建任务,并标识状态为-1
setState(-1); // inhibit interrupts until runWorker
//设置第一个任务
this.firstTask = firstTask;
//获取外部类线程池的线程工厂类来创建线程,创建线程池传递的Runnable是Worker自己,所以启动线程之后
//会调用Worker自己的run方法
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker */
public void run() {
//调用外部类的runWorker方法,并传递Worker自己的实例作为参数
runWorker(this);
}
// Lock methods
//
// The value 0 represents the unlocked state.
// The value 1 represents the locked state.
protected boolean isHeldExclusively() {
//判断是否持有排它锁,如果=1表示持有排它锁,如果=0表示解锁状态
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
//尝试获得锁
if (compareAndSetState(0, 1)) {
//CAS成功,设置排它锁的持有者为当前线程
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
//是否锁
setExclusiveOwnerThread(null);
setState(0);
return true;
}
//下面是加上的逻辑,可以参考Java锁机制的文章
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
Woker类继承AQS,并实现了Runnable。在构造函数设置线程执行的第一个任务未execute时传递的Runnable,通过线程工厂类创建了线程并传递给线程的Runnable为Worker自身,那么当在addWorker中调用线程t.start时就会回调Worker的run方法,Worker的run方法调用的是外部类的runWorker方法,我们可以猜测runWorker才是线程池调度和处理线程的核心,我们看下runWorker的源码:
inal void runWorker(Worker w) {
//获取当前允许的线程
Thread wt = Thread.currentThread();
//获取Worker中引用的第一个线程,作为线程启动的第一个执行任务
Runnable task = w.firstTask;
//设置Work的firstTask的null,防止Work一直持有导致内存泄漏
w.firstTask = null;
//解锁
w.unlock(); // allow interrupts
//调度发生意外的标识
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
//循环从队列中获取任务,我们知道核心线程的第一个任务不为空,所以第一次执行的始终是Worker中的firstTask
//加锁
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
//如果(线程池已经>=STOP或者(当前线程已经中断且线程池的状态>=STOP)且当前Woker还没有中断
//则中断Worker
wt.interrupt();
try {
//调用beforeExecute,表示开始执行线程task前需要处理的事情,此方法线程池中默认没有做任何事
beforeExecute(wt, task);
Throwable thrown = null;
try {
//调用加入线程池中的线程的Runnable的run方法,表示线程开始运行
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
//调用afterExecute,表示执行线程task后需要处理的事情,此方法线程池中默认没有做任何事
afterExecute(task, thrown);
}
} finally {
task = null;
//Worker调度处理的任务完成数加1
w.completedTasks++;
//解锁
w.unlock();
}
}
//标识没有发送意外
completedAbruptly = false;
} finally {
//调用processWorkerExit处理Worker退出时的逻辑
processWorkerExit(w, completedAbruptly);
}
}
runWorker的核心就是先执行Worker自身的引用的第一个线程,再通过循环不断调用getTask从队列中获取等待中的线程(达到了一个Worker可以调度多个线程的目的,即复用线程),获取到下一个要处理的线程之后,如果条件允许(即线程未中断,且线程池是运行中或者SHUTDOWN但队列不为空),那么就会调用线程的run方法运行线程。上面的代码我们看到在调用线程的run前调用了beforeExecute,run执行完之后调用了afterExecute,但是这两个方法都没有做任何事情,线程池为什么还要这两个方法呢,岂不是多余?非也,其实这是线程池留作拓展用的,你可以继承线程池,通过这两个方法可以实现线程在执行前后的一些逻辑。
上述是worker的整体工作流程,你以为到这里就完了吗?no。还有一个重要环节,那就是getTask,runWorker确实实现了复用线程的作用,但是如果getTask返回空那runWorker就结束了,如果很快又调用线程池的execute执行线程,又得要创建新的线程,如果经常发生这样的事,那就达不到真正的线程复用。所以这就是getTask的重要性了。我们接着来分析getTask的代码流程:
private Runnable getTask() {
//是否超时
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
//获取线程池的状态
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
//线程池状态>=STOP 或者 = SHUTDOWN 且队列为空
//工作任务数量自减1
decrementWorkerCount();
//返回空
return null;
}
//获取当前工作任务的数量
int wc = workerCountOf(c);
// Are workers subject to culling?
//是否要超时,条件是启用了允许核心线程或者工作任务的数量 > 核心线程池的最大数
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
//(如果工作任务的数量 > 允许的最大线程池数 或者 核心线程已经超时)并且(工作任务的数量>1 或者队列为空)
if (compareAndDecrementWorkerCount(c))
//CAS自减工作任务的数量成功,返回空
return null;
//CAS自减工作任务的数量成功,则继续循环
continue;
}
try {
//从队列中获取下一个要执行的线程任务
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
//获取到下一个要执行的任务直接返回
return r;
//获取不到,标识为超时
timedOut = true;
} catch (InterruptedException retry) {
//获取发生中断一次,标识为未超时
timedOut = false;
}
}
}
getTask的核心是通过循环不对的从线程缓存队列中获取下一个要执行的线程任务,获取到就直接返回,获取不到则继续循环,大家主要到一个细节没有:即只有启用允许核心线程池超时或者工作任务的数量大于核心线程池数时,队列中等待的线程才采用超时策略,否则是不会使用有超时的策略。这个超时策略是线程池从队列中取线程时多长没有取到线程的超时时间,而不是加入线程池中的线程多长时间没有得到调度运行的超时时间,这点要做好区分啊。我们拿LinkedBlockingDeque中的poll方法来做说明:
//----------------------------LinkedBlockingDeque
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
//调用pollFirst从队首开始获取
return pollFirst(timeout, unit);
}
public E pollFirst(long timeout, TimeUnit unit)
throws InterruptedException {
//计算超时时间
long nanos = unit.toNanos(timeout);
//加锁
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
E x;
//循环调用unlinkFirst从队首获取下一个任务,知道取到不为空为止
while ( (x = unlinkFirst()) == null) {
//取不到下一个任务,表示队列为空
if (nanos <= 0)
//已超时,直接返回空
return null;
//队列为空的情况下,通过Condition的awaitNanos方法,最终调用unsafe的park,挂起线程,等待队列不为空的信号
nanos = notEmpty.awaitNanos(nanos);
}
//返回取到的任务
return x;
} finally {
//解锁
lock.unlock();
}
}
private E unlinkFirst() {
// assert lock.isHeldByCurrentThread();
//队首节点
Node<E> f = first;
if (f == null)
//表示队列为空
return null;
//取到队首的下一个节点
Node<E> n = f.next;
//获取队首的引用的任务
E item = f.item;
//设置队首引用的任务未空,释放资源,防止内存泄漏
f.item = null;
//将队首设的下一个节点设置为队首
f.next = f; // help GC
//将n设置为队首,表示是否掉原来的队首
first = n;
if (n == null)
//队首为空,则置队尾也为空
last = null;
else
//设置队首的前一个节点为空,队首没有前一个节点
n.prev = null;
//数量建议
--count;
//发出取到任务即队列不为空的信号,唤醒等待获取下一个任务的线程
notFull.signal();
//返回取到的任务
return item;
}
public E take() throws InterruptedException {
//调用takeFirst取队首的任务
return takeFirst();
}
public E takeFirst() throws InterruptedException {
//加锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
E x;
//循环调用unlinkFirst获取队首的任务
while ( (x = unlinkFirst()) == null)
//取不到队首个任务,表示队列为空
//队列为空的情况下,通过Condition的await方法,最终调用unsafe的park,挂起线程,等待队列不为空的信号
notEmpty.await();
//返回取到的任务
return x;
} finally {
//解锁
lock.unlock();
}
}
通过LinkedBlockingDeque的poll和take我们知道,getTask通过workQueue取下一个要执行的线程任务时,如果队列不为空,那么就直接返回并开始调度执行;如果队列为空那么就会挂起等待队列不为空的信号。runWorker结合BlockingDeque的逻辑实现了真正意义上的线程复用。而且keepAliveTime是用于表示工作任务Work在启用了allowCoreThreadTimeOut之后或者当前核心线程数已满,等待从队列中获取下一个线程任务的超时时间,而不是表示线程加入线程池之后多久没有执行的超时时间。allowCoreThreadTimeOut表示核心线程在待从队列中获取下一个线程任务的超时时间。
线程池还有好几个知识点,比如线程池满了之后无法加入新的线程的处理策略,线程池中默认自带了4中策略,分别是:
直接在当前线程执行策略CallerRunsPolicy:
public static class CallerRunsPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code CallerRunsPolicy}.
*/
public CallerRunsPolicy() { }
/**
* Executes task r in the caller's thread, unless the executor
* has been shut down, in which case the task is discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}
抛出异常策略AbortPolicy:
public static class AbortPolicy implements RejectedExecutionHandler {
/**
* Creates an {@code AbortPolicy}.
*/
public AbortPolicy() { }
/**
* Always throws RejectedExecutionException.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
* @throws RejectedExecutionException always
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
丢弃什么都不做策略DiscardPolicy:
public static class DiscardPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardPolicy}.
*/
public DiscardPolicy() { }
/**
* Does nothing, which has the effect of discarding task r.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}
从队列中丢弃最早加入的任务,并重新加入线程池策略DiscardOldestPolicy:
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardOldestPolicy} for the given executor.
*/
public DiscardOldestPolicy() { }
/**
* Obtains and ignores the next task that the executor
* would otherwise execute, if one is immediately available,
* and then retries execution of task r, unless the executor
* is shut down, in which case task r is instead discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}
其他的知识点,如shutdown等逻辑感兴趣同学自行研读源码,这里只探索线程池的核心实现部分。
综上所述,我们可以大概做下线程池的总结:
线程池execute时,如果当前核心线程没满,则直接创建Work创建核心线程直接运行;如果当前核心线程已满且队列未则将线程放入队列中排队等待核心线程的调度。Work中创建线程并回答Work的run,然后调用runWorker循环不断的从队列中获取等待中的线程进行运行,如果从队列队列为空就会挂起等待队列不为空的信号,这样实现了线程的复用。
能加入接收新线程加入的只有RUNNING状态下;能调度并处理线程任务只有RUNNING状态或SHUTDOWN状态且队列不为空。
注意:keepAliveTime是表示的是在启用了allowCoreThreadTimeOut之后或者当前核心线程数已满,等待从队列中获取下一个线程任务的超时时间,而不是表示线程加入线程池之后多久没有执行的超时时间。allowCoreThreadTimeOut表示核心线程在待从队列中获取下一个线程任务的超时时间。
以上是草根个人对线程池探索的理解,如果不对的地方希望大家可以帮忙指正。
结合上面的探索:
对应面试来说可以问:
1、线程池的原理是什么?
2、线程池有哪些状态,每种状态分别能做什么?每种状态的走向是怎么样的?
3、线程池最大的容量是多少?
4、keepAliveTime用来干嘛?是线程多长时间没有得到调度的超时时间吗?
5、线程池是怎么实现线程的复用的?
6、线程池的核心线程是否是常住的?怎么取消核心线程的常住?
网友评论