一:前言
1.1:什么是线程
在讲些线程池前,我们需要说一下什么是线程,因为线程是进程中得一个实体,线程本身是不会独立存在的。进程是代码在数据集合上的一次运行活动,是系统进行资源分配和调度的基本单位,线程则是进程的一个执行路径,一个进程中至少有一线程,进程中的多个线程共享进程的资源。
1.2:线程池的作用
- 【降低线程的创建销毁开销】
- 【提供一种资源限制和管理的手段】
二:ThreadPoolExecutor构造函数
public ThreadPoolExecutor(int corePoolSize,//核心线程数
int maximumPoolSize,//最大线程限制
long keepAliveTime,//空闲存活时间
TimeUnit unit,//时间单位
BlockingQueue<Runnable> workQueue,//任务队列
ThreadFactory threadFactory,//线程工厂
//拒绝策略
RejectedExecutionHandler handler) {
//判断参数是否越界
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
//
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
线程池构造参数入参含义:
- corePoolSize :线程池中的核心线程数,当提交一个任务时,线程池创建一个新线程执行任务,直到当前线程数等于corePoolSize;如果当前线程数为corePoolSize,继续提交的任务被保存到阻塞队列中,等待被执行;如果执行了线程池的prestartAllCoreThreads()方法,线程池会提前创建并启动所有核心线程。
- maximumPoolSize|: 线程池中允许的最大线程数。如果当前阻塞队列满了,且继续提交任务,则创建新的线程执行任务,前提是当前线程数小于maximumPoolSize;
- keepAliveTime :线程空闲时的存活时间,即当线程没有任务执行时,继续存活的时间;默认情况下,该参数只在线程数大于corePoolSize时才有用;
- unit: 时间单位
- workQueue: 用于保存等执行的任务的阻塞队列,比如基于数组的有界队列ArrayBlockingQueue、基于链表的无界LinkedBlockingQueue、和最多只有一个元素的同步队列SynchronousQueue等
- threadFactory : 创建线程的工厂,通过自定义的线程工厂可以给每个新建的线程设置一个具有识别度的线程名。
-
handler : 线程池的拒绝策略,当阻塞队列满了,且没有空闲的工作线程,如果继续提交任务,必须采取一种策略处理该任务,线程池提供了4种策略:
1、AbortPolicy:直接抛出异常,默认策略;2、CallerRunsPolicy:用调用者所在的线程来执行任务;3、DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;4、DiscardPolicy:直接丢弃任务;当然也可以根据应用场景实现RejectedExecutionHandler接口,自定义拒绝策略,如记录日志或持久化存储不能处理的任务
三:线程池工作原理
通过上述构造函数实例化ThreadPoolExecutor后,当调用execute,首先会判断当线程池中线程数量是否小于核心线程数,如小于则创建线程。如果此时核心线程数已经等于corePoolSize,则将提交的任务存放到workQueue队列中。当队列已经存满,会进行线程创建,创建失败则会执行具体的拒绝策略。(线程池在运行中的前提下)
四:线程池运行状态
线程池运行状态转换,可以使用ThreadPoolExecutor#shutdown()和shutdownNow()两个方法进行改变。两者的区别在于shutdown执行 会将线程池状态设置为SHOTDOWN,然后当线程池线程数量和工作队列为空才会进入TIDYING状态。而shutdownNow执行后线程池状态会设置为STOP,然后当线程池线程数量为空时才会进入到TIDYING状态。如图:
五:源码分析
5.1:关键属性、方法含义说明
//高3位:表示线程池的运行状态 除去高3位之后的低位:表示当前线程池中所拥有的线程数量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//表示在ctl,低COUNT_BITS位 是用于存放当前线程数量的位
private static final int COUNT_BITS = Integer.SIZE - 3;
//低COUNT_BITS位 所能表达的最大数值 000 1111..111
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
//运行中 -1 (111...111) 左移29位,1110000..00 仍为负数
private static final int RUNNING = -1 << COUNT_BITS;
//0左移29位
private static final int SHUTDOWN = 0 << COUNT_BITS;
//0010 00..00
private static final int STOP = 1 << COUNT_BITS;
//0100 00..00
private static final int TIDYING = 2 << COUNT_BITS;
//0110 00..00
private static final int TERMINATED = 3 << COUNT_BITS;
//获取当前线程池运行状态
private static int runStateOf(int c) { return c & ~CAPACITY; }
//获取当前线程池线程数量
private static int workerCountOf(int c) { return c & CAPACITY; }
//用在重置线程池ctl值时会用到
//rs 表示线程池在状态 wc表示 worker数量(可以理解我线程池数量)
private static int ctlOf(int rs, int wc) { return rs | wc; }
//比较当前线程池ctl所表示的状态 是否小于某个状态
private static boolean runStateLessThan(int c, int s) {
return c < s;
}
//线程池全局锁 增加worker 减少 worker 时需要持有mainLock 修改线程池运行状态时 也需要
private final ReentrantLock mainLock = new ReentrantLock();
//包含池中所有工作线程的集合。仅在持有mainLock时访问。
private final HashSet<Worker> workers = new HashSet<Worker>();
//控制核心线程数量内的线程, 是否可以被回收 true 可以 false不可以
private volatile boolean allowCoreThreadTimeOut;
//任务队列,当线程池中线程达到核心线程数时 再提交任务时 就会直接提交到workQueue
private final BlockingQueue<Runnable> workQueue;
//条件队列 (本人简单理解为 线程进入到阻塞队列前的临时队列,主要作用就是为了辅助阻塞队列)
private final Condition termination = mainLock.newCondition();
5.2:execute方法分析
execute方法的作用是提交任务command到线程池进行执行。
public void execute(Runnable command) {
//判断是否为空
if (command == null)
throw new NullPointerException();
int c = ctl.get();
//workerCountOf(c) 获取任当前线程数量
// 条件成立 代表线程数量小于核心线程数量 此次提交 新建一个线程,对应线程池多了一个线程
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
//执行到这条语句说明addWorker一定是失败了
//失败原因:
// 1.存在并发现象, execute 方法是可能有多个线程同时调用的,当workerCountOf(c) <corePoolSize成立后
//其他线程也可能成立了,并向线程池中创建了worker.这个时候线程池中的核心线程已经达到
//2.当前线程池状态发生改变。 RUNNING SHUTDOWN STOP TIDYING TERMINATION
//当线程池非RUNNING状态时,addWorker(firstTask!=null,true|false) 一定会失败
//SHUTDOWN状态下 也有可能创建成功。 前提firstTask==null 而且当前 queue 不为空, 特殊情况
c = ctl.get();
}
//执行到此处的情况
//1.当前线程池线程数已经达到corePoolSize
//2.addWorker失败
//条件成立: 说明线程池处于 running状态,workQueue.offer(command) 则尝试将task放入到workQueue中
if (isRunning(c) && workQueue.offer(command)) {
//再次获取ctl 保存到recheck
int recheck = ctl.get();
//条件1: isRunning(recheck) 成立 说明你提交到队列之后,线程状态被外部线程给修改了, 比如 shutdown() shutdownNow() 这种情况需要把刚刚提交的任务给删除掉
//条件2:remove(command) 有可能失败
//成功:提交之后,线程池中的线程还未消费处理
//失败:提交之后,在shutdown() shutdownNow之前 就被线程池中的线程给处理了
if (! isRunning(recheck) && remove(command))
//提交之后线程池状态非 running 且任务出队成功 则走拒绝策略
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//执行到这里说明核心线程创建 和 队列追加失败 执行拒绝策略
else if (!addWorker(command, false))
reject(command);
}
具体执行链路图:
5.3:内部类 Worker分析
在了解addWorker方法前,我们先来了解一下线程池内部类Worker
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
// state: 0 表示未被 占用 > 0时表示被占用, <0 时表示初始化状态 这种情况下不能抢锁
//ExclusiveOwnerThread:表示独占锁的线程 (后续分析AQS是展开)
//Worker 内部封装的线程 linux 进程 win 线程
final Thread thread;
//假设firstTask不为空,那么当worker启动后 内部的线程启动,会优先执行firstTask,当执行完firstTask 回到queue中获取下一个人任务
Runnable firstTask;
//完成任务数量
volatile long completedTasks;
//firstTask 可以为null , 为null启动后回到queue中获取
Worker(Runnable firstTask) {
//设置AQS独占模式为初始化状态,这个时候不能被抢占锁
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
// 使用线程工厂创建一个线程,并且将当前的worker 指定为Runnable,也就是说当Thread启动的时候,会将worker.run()为入口
this.thread = getThreadFactory().newThread(this);
}
//当worker启动时 会执行run 方法
public void run() {
//ThreadPollExecutor ->runWorker() 这个是一核心方法 需要重点看
runWorker(this);
}
5.4:addworker方法分析
上面execute方法三处调用了addWorker,通过上述方法和内部类Worker可知addWorker主要负责创建新的线程并执行任务。
//firstTask 任务
//core true 代表使用核心线程数进行线程的判断 false 代表使用最大线程
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
//自旋 判断线程池状态是否允许创建线程的事情
for (;;) {
//获取ctl
int c = ctl.get();
//获取当前线程池状态
int rs = runStateOf(c);
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty())
)
//当线程池状态大于等于SHUTDOWN 但是队列中已经没有任务了 或者rs ==SHUTDOWN 且firstTask!=null
return false;
//上面代码 就是判断线程池状态是否允许添加线程
//内部自旋
for (;;) {
//获取当前线程池线程数量
int wc = workerCountOf(c);
//条件1:wc >= CAPACITY 永远不会成立
//条件2: wc >= (core ? corePoolSize : maximumPoolSize 判断线程数量限制
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
//达到线程限制
return false;
//通过cas操作 线程数+1 相当于申请了一块创建线程的令牌
if (compareAndIncrementWorkerCount(c))
//直接跳出外部自旋 来到了A处
break retry;
//CAS失败 没有成功的申请到令牌
c = ctl.get();
if (runStateOf(c) != rs)
//调到外层循环
continue retry;
}
}
//来到了A处 说明获取到了创建线程指令
//表示创建的workerStarted是否已经启动, false 未启动 true 启动
boolean workerStarted = false;
//表示创建的worker是否添加到池子中 默认false 未添加 true 添加
boolean workerAdded = false;
Worker w = null;
try {
//创建worker 执行完后 线程已经创建好了
w = new Worker(firstTask);
//将新创建的worker节点的线程 赋值给t
final Thread t = w.thread;
//t != null 判断 为例防止ThreadFactory 实现有bug ,因为ThreadFactory 是一个接口 谁都可以实现
if (t != null) {
//获取全局锁的引入
final ReentrantLock mainLock = this.mainLock;
//持有全局锁 可能会阻塞,直到获取成功为止,统一时刻操作线程池内部相关的操作 都必须持有锁
mainLock.lock();
try {
//再次获取线程池运行状态
int rs = runStateOf(ctl.get());
//条件1:rs < SHUTDOWN 代表当前线程为正常状态
//条件2:前置当前线程池不是running状态
//rs == SHUTDOWN && firstTask == null 线程池状态为SHUTDOWN firstTask为null
//其实判断的就是SHUTDOWN的特殊情况
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
//t.isAlive() 当线程start 后 线程isAlive会返回 true
//防止ThreadFactory 中直接start
if (t.isAlive())
throw new IllegalThreadStateException();
//将创建的worker 添加到线程池中
workers.add(w);
//获取线程数量
int s = workers.size();
//条件成立:说明当前 线程数量是一个新高 ,更新largestPoolSize
if (s > largestPoolSize)
largestPoolSize = s;
//表示线程已经追加到线程池中
workerAdded = true;
}
} finally {
//释放线程池全局锁
mainLock.unlock();
}
//添加成功 启动线程
//条件失败: 说明线程池在lock 之前,线程池状态发生了变化 导致添加失败
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
//条件成立 说明添加失败 做一下清理操作
if (! workerStarted)
//失败工作
//1.释放令牌
//2.将worker从集合中清理
addWorkerFailed(w);
}
return workerStarted;
}
使用ReentrantLock锁的一部分原因,是因为workers非线程安全
5.5:线程池内部类 Worker#runWorker分析
addWorker方法在添加woker到workers集合中后,会执行t.start();因为Worker重写了run方法,所以后许会执行到 Worker#runWorker。
//W 就是启动的worker
final void runWorker(Worker w) {
//wt == t.thread
Thread wt = Thread.currentThread();
//将初始化执行的task 赋值给task
Runnable task = w.firstTask;
//情况当前w.firstTask 引用
w.firstTask = null;
//调用原因: 就是为了初始化worker state== 0 和exclusiveOwnerThread == null
w.unlock(); // allow interrupts
//是否是突然退出 true ->发生异常,当前线程是突然退出的,需要再特殊处理
// false 正常退出
boolean completedAbruptly = true;
try {
//getTask这个是一个会阻塞线程的方法, getTask返回null 说明当前线程需要执行结束逻辑
while (task != null || (task = getTask()) != null) {
//加锁原因:shutDown 时会判断当前worker状态,根据独占锁是否空闲来判断线程是否空闲
w.lock();
// 如果池正在停止,请确保线程被中断; 如果没有,请确保线程没有中断。这需要在第二种情况下进行重新检查,以在清除中断的同时处理Shutdownow race
//条件1:runStateAtLeast(ctl.get(), STOP)
//成立:说明当前线程处于STOP/TIDYING/TERMINATION 此时线程一定会给他一个中断信号
//情况1:runStateAtLeast(ctl.get(), STOP) && !wt.isInterrupted()
//说明:当前线程池状态是》=STOP 且当前线程是中断状态,此时需要进入if 里面,给当前线程一个中断
//情况2:runStateAtLeast(ctl.get(), STOP)==false
// (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted())
// Thread.interrupted()获取中断状态,且设置中断为false ,连续调用两次interrupted 第二次一定返回false
// runStateAtLeast(ctl.get(), STOP)) 大概率任然为false
//其实 它在强制刷新线程中的中断标记为false,因为有可能上一次执行task时,业务代码里面将当前线程的中断标记位设置为了true,且没有处理,这里进行强制刷新一下 不会影响到后面的task
//假设:Thread.interrupted() == true 且 runStateAtLeast(ctl.get(), STOP))== true
//仍然有可能发生,因为外部线程在第一次 runStateAtLeast(ctl.get(), STOP))== false后,有机会调用shutDown.shutDownNow 方法,将线程池状态修改 这个时候 也会将当前线程中断标记位再次设置为中断状态
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
//钩子方法 留给子类实现
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
//钩子方法
afterExecute(task, thrown);
}
} finally {
//将局部变量task 设置为null
task = null;
//更新worker 完成任务数量
w.completedTasks++;
//worker 处理完成一个任务 会释放独占锁,然后再次去queue 获取任务
//1.正常情况下会再次回到 getTask()获取任务 while(getTask!=null ..)
//2.task.run 是内部抛出异常了‘。。
w.unlock();
}
}
//getTask 方法返回null 时,说明当前线程应该执行退出逻辑
//1.正常情况
completedAbruptly = false;
} finally {
//task.run ()时抛出异常时,直接从 w.unlock()调到这里
//正常退出:completedAbruptly === false
//异常退出:completedAbruptly== true
processWorkerExit(w, completedAbruptly);
}
}
执行任务期间加锁原因 是为了避免在任务运行期间,其他线程调用了shutdown后正在执行的任务被中断(shutdown只会中断当前被阻塞挂起的线程)
5.6:getTask方法分析
gettask方法主要的作用就是从workQueue中获取待执行任务
//返回null的情况
//1.(rs >= STOP )
//2.前置条件 状态是 SHUTDOWN 且 workQueue.isEmpty()
//3.线程池中的线程数量 超过最大限制,会有一部分返回null
//4.线程池中的线程数超过核心线程数量, 会有一部分线程 超时后返回null
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
//获取线程池状态
int rs = runStateOf(c);
//条件一 rs >= SHUTDOWN 说明线程 非RUNNING状态 可能是 SHUTDOWN STOP
//条件二 (rs >= STOP || workQueue.isEmpty()
// 2.1 rs >= STOP 说明当前的状态最毒也是STOP 一定返回null
//2.2 前置条件 状态是 SHUTDOWN 且 workQueue.isEmpty() 成立 说明当前线程池状态为 SHUTDOWN 且任务队列已经空
//返回 null runWorker方法将会返回 null 的线程执行线程退出的逻辑
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
//使用cas 将ctl 减一
decrementWorkerCount();
return null;
}
//执行到这里 几种情况
//1.线程running 状态
//2.线程池SHUTDOWN 但是队列不为空 此时可以创建线程
int wc = workerCountOf(c);
//timed 表示当前线程 获取task时 是否支持超时机制
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
//条件一:(wc > maximumPoolSize || (timed && timedOut)
//1.1: wc > maximumPoolSiz ? setMaximumPoolSize()方法 可能外部线程将线程池最大的线程数调小
//1.2: (timed && timedOut) 条件成立当前线程使用 poll 方式获取task .上一次循环使用poll获取任务已经超时
//条件一 为true 表示线程可以被回收,达到回收标准
//条件二:(wc > 1 || workQueue.isEmpty())
//2.1: wc>1 条件成立:说明线程池中还有其他线程, 当前线程可以直接回收,返回null
//2.2:workQueue.isEmpty() 前置条件wc==1 成立:说明当前任务队列已经空了,可以放心退出
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
5.7:processWorkerExit 线程退出分析
当线程调用getTask()无法再次获取任务,此时会进行线程销毁操作。
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
//获取全局锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
workers.remove(w);
} finally {
mainLock.unlock();
}
//尝试终止操作
tryTerminate();
//获取最新ctl
int c = ctl.get();
//条件成立: 当前状态是running 或者SHUTDOWN
if (runStateLessThan(c, STOP)) {
//线程正常退出
if (!completedAbruptly) {
//allowCoreThreadTimeOut == true 核心线程数内的线程也会超时被回收
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
//1.completedAbruptly ==true 线程异常退出
//2.当前任务队列有任务,当前状态为RUNNING 或SHUTDOWN
//3.当前线程数量《corePoolSize ,维护线程池线程数
addWorker(null, false);
}
}
5.8:tryTerminate分析
tryTerminate主要尝试线程池的中断操作。
final void tryTerminate() {
for (;;) {
//获取最新ctl 值
int c = ctl.get();
//判断线程池工作状态
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
//来到这里情况
//1.线程池状态》=STOP
//2.线程池状态是 SHUTDOWN 且 workQueue.isEmpty()
if (workerCountOf(c) != 0) {
//中断一个空闲线程
//空闲线程在: queue.task() | queue.poll()
//因为线程状态是 》=STOP || (SHUTDOWN && 队列为空) 最终调用addWorker 会失败
//最终空闲线程都会执行退出, 非空闲线程执行完当前TASK 时也会调用tryTerminate 有可能执行到这
interruptIdleWorkers(ONLY_ONE);
return;
}
//执行到这里的线程 此时workerCountOf(c) == 0
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
terminated();
} finally {
ctl.set(ctlOf(TERMINATED, 0));
//唤醒调用awaitTermination()方法线程
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
六:总结
线程池巧妙地使用一个Integer类型的原子变量来记录线程池状态和线程池中得线程个数。通过线程池状态来控制任务的执行,每个worker线程可以处理多个任务。线程池通过线程的复用减少了线程创建和销毁的开销
作者:海边奔跑的蜗牛
链接:https://juejin.cn/post/7079605753094340644
来源:稀土掘金
网友评论