线程池基础概念
线程池是什么
- 线程池是一种基于池化思想的线程管理工具
线程池解决了哪些问题
- 降低资源消耗(操作系统创建和销毁线程,维护线程状态需要很大的开销,会影响系统性能)
- 提高响应速度(任务被执行不需要重新创建线程)
- 提高线程的可管理性(线程的统一分配,统一管理)
线程池类图
截屏2021-09-21 下午7.30.34.png线程池关键变量
- ctl是一个AtomicInteger, 用这一个变量维护线程的状态和数量, 前三位是状态后29位是线程数量
- Worker是核心操作的对象,里面维护了一个thread和一个task, 它继承与AQS,实现了一个独占的不可重入锁,所以可以使用tryLock来判断当前线程状态。
- corePoolSize 代表核心线程数,线程池并不是维护特定的几个线程让他们不被释放而是选择性的留下小于核心线程数的Worker的thread。
- maximumPoolSize最大线程数,限制整个线程池里线程的最大数量,超过了就要采用拒绝策略。
- 每次执行完任务之后所有Worker需要从消息队列里取Task,如果为空就要被销毁,从Workers这个HashSet中移除,然后交给jvm回收。线程池会判断当前的核心线程数量,然后选择性的去将一部分线程彻底wait,一部分wait一部分时间,从而实现了核心线程不会被销毁的特性。
-
线程池任务调度流程图
截屏2021-09-22 上午1.49.59.png
线程的状态
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
####线程池源码分析
execute(执行任务)
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
//1.检查线程个数是否小于核心线程数
if (workerCountOf(c) < corePoolSize) {
//2.以核心线程添加的方式加入添加任务
if (addWorker(command, true))
return;
c = ctl.get();
}
//3.检查线程池运行状态,如果为运行状态则将任务添加到队列中,如果任务队列没有满则添加成功
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
//4.重新检查线程池状态,如果是没有在运行(线程池的shutdown逻辑可能在另一个线程被调用)则将任务移除,同时拒绝该任务
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
//5.如果当前的线程数量为0,添加一个持有空任务的工人(也就是不知道具体要做啥的消费者,后面会对这个做具体介绍)
addWorker(null, false);
}
//6.说明线程池不在运行状态或者任务队列已经满了,尝试去执行addWorker()
else if (!addWorker(command, false))
//7.如果添加失败则执行拒绝策略
reject(command);
}
- 注意:
- 为了保证并发高效执行任务的同时,线程池的内部逻辑不会错乱,使用了ctl(AtomicInteger)这个线程安全的变量来维护线程池状态和线程数量,在每一步操作之前需要先判断线程池状态和线程的数量是否超过阈值
addWorker(添加工人,也就是添加消费者)
- 接上面的执行任务,在判断完线程的执行状态和线程的数量之后如果符合条件都会执行到这个方法,不符合的都采取了拒绝的策略
/**
* @param core 是否为核心线程
* @param firstTask 要执行的第一个任务,因为线程(消费者线程)是可复用的,后面这个task会被替换掉,所以这个消费者线程也会去执行其他的任务
*/
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
1.开启一个死循环
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
// 2. 检查线程池状态不符合条件的都返回false(如果当前线程为shutdown状态并且,当前的?worker所持有的任务为空,但是任务队列不为空,也会返回false,表示的意思是,如果当前线程池已经shutdown了,就不允许在添加新新的消费线程了)
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
//3. 在开启一个死循环
for (;;) {
//4.检查线程数量是否大于阈值,如果没有超过的话,通过CAS的方式去改变线程池的数量,成功了则跳出循环,否则的话一直内部循环,也是为了防止并发场景出现
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
//5.校验通过,初始化一个工人,Worker的内部实现逻辑下面介绍
w = new Worker(firstTask);
final Thread t = w.thread;
//6.获取到工人的工作线程
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
//7. 将工人添加到集合中
workers.add(w);
int s = workers.size();
//8.更新线程数量
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
//9.执行工人的消费线程
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
- 注意
1.这一段代码主要是使用了一个HashSet将工人维护起来,添加成功之后启动工人的消费线程
Worker实现
- Worker构造方法和成员变量
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;
/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
//worker的第一个任务,可以为null,上面我们说道runnable为空的情况
this.firstTask = firstTask;
//worker的私有线程,使用线程工厂创建,并将自己作为runnable传进去
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker. */
public void run() {
//工人开始消费,执行到这里其实已经是是在子线程执行了,相当于是是在worker的消费线程执行我们提交进来的runnable,这样做的目的也是为了解耦合,把我们提交进来的任务和工人的消费线程解耦
runWorker(this);
}
}
- 注意:
- Worker继承自AbstractQueuedSynchronizer, 这样可以使Worker本身实现一把不可重入锁,同时也可以通过tryLock的方式去检测当前worker的消费线程是否正在执行。
Worker runWorker()
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
1.当任务非空时直接执行任务,否则需要通过getTask去取任务
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
1. 不符合执行条件触发中断,需要开发这自己去响应和判断这个中断,否则线程不会结束
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
2.触发在执行任务之前的操作,给开发这回调
beforeExecute(wt, task);
Throwable thrown = null;
try {
//3.执行任务
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
//4.将工人从工作集中删除,导致的原因可能是任务异常或者任务正常执行结束,正常执行结束的话,如果线程池状态没有stop, 需要在替换一个worker。
processWorkerExit(w, completedAbruptly);
}
}
- 注意:
- 在1处的while循环处如果task为空,需要通过getTask方法去重新获取一个task,重新获取的task为空的条件有
1)线程池的线程数量超过了maximumPoolSize
2)线程池的状态为STOP
3)线程池的状态为SHUT_DOWN并且任务队列为空
4)工人的消费线程超时未取到任务,allowCoreThreadTimeOut字段决定了是否核心线程和普通线程一样会有超时逻辑(超时未取到任务也会退出),采用的逻辑是{@code allowCoreThreadTimeOut || workerCount > corePoolSize}),如果当前的线程数量小于核心线程数,在根据allowCoreThreadTimeOut配置决定时候要执行核心线程的超时策略,源码如下:
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
//如果不允许核心线程超时的话,当前线程是否采取超时策略取决于当前线程是否小于核心线程数,否则的话就都采取超时策略
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
//定时任务采用的poll,在keepAliveTime时间之后如果没有取到任务就退出
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
//take会无限期的wait当前线程,在allowCoreThreadTimeOut为false且当前的线程数量小于核心线程数的时候,直到有新任务进入队列才能重新signal(具体要看队列的实现)。
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
shutDown
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(SHUTDOWN);
//中断所有工人的消费线程
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}
- 使用interruptIdleWorkers来尝试中断线程
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
- interruptIdleWorkers中会通过tryLock来判断Worker是否正在执行
- 与shutDownNow不同的是,shutDownNow使用的是下面的方法
private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers)
w.interruptIfStarted();
} finally {
mainLock.unlock();
}
}
- 没有判断当前工人的消费线程是否在执行,直接中断。
网友评论