这几天梳理了一下java线程池的设计思想,又看了一遍源码,最大的感触就是其实官方的源码设计上确实都很牛逼,但是开发者看起来多少有点费劲,主要原因是他们会严格遵守代码规范和设计原则,而导致出现很多封装的类或者接口,这就给我们这种平时写代码不规范的开发者(我就是- -)造成很多阅读障碍,从而看的过程中很容易忽略一些东西,看不到重点,每次看完了觉得好像懂了但又感觉没抓到要点,但是还好,多看一些优秀的源码和三方库就能慢慢跟上他们的设计思路,同时我们自己要多想多自我提问哪些是要点哪些还是迷糊的东西,然后重复看反复研究,我觉得这是一个提高代码能力和思维架构很好的途径。
废话不多说,在一般开发中,开启线程我们一般是继承Thread类(或直接new)并重写run()方法,然后调用start()方法,这样就开启了一个会执行run方法里面代码的异步线程:
//方法一
public class MyThread extends Thread {
public void run() {
LogUtils.d("hello thread");
}
}
new MyThread().start();
//方法二
new Thread(new Runnable() {
@Override
public void run() {
LogUtils.d("hello thread");
}
}).start();
从中我们看出来,代码的关键是要用到Thread类,这个类就是java的描述线程的类,创建一个Thread对象代码创建了一个线程,调用了start()方法就是让这个线程跑起来。所以这里就引发了一个思考,正常如果只是开一个线程做个简单的事情这样写是没任何问题,如果涉及到多线程大并发,就会频繁的创建线程开启线程,从而加大程序的性能内存损耗,由此就提出了线程池的概念。
而java线程池号称可以解决以上的问题,他的好处概括起来有以下三点:
第一:降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
第二:提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。
第三:提高线程的可管理性。
ok,那我们就带着这些疑问去看看java线程池是怎么运行的,他是怎么做到线程复用降低资源消耗的,首先看看线程池的使用步骤,假设要用到异步线程执行多个任务,用线程池是这么做的:
ExecutorService pool = Executors.newFixedThreadPool(10);//创建一个核心线程数为10的线程池
for (int i =0;i<100;i++){
//执行100个线程任务
pool.execute(new Runnable() {
@Override
public void run() {
LogUtils.d("test");
}
});
}
如果不用线程池,就直接for循环100次新建100个Thrad去执行了,这样就做了100次线程的创建和执行,我们在上面的代码看不到线程的创建和执行,仅仅看到把100个Runnable放到pool里面,说明肯定是线程池维护了一个或几个固定线程去做这件事。他是怎么设计的呢,看看源码吧。
Executors类是线程池的一个创建工厂类,newFixedThreadPool()是其中一种线程池的创建方式:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
调用的还是ThreadPoolExecutor线程池的构造方法:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
//核心线程数,核心线程的概念是这个线程池里重点使用的线程,一般情况下(有个全局变量allowCoreThreadTimeOut控制,默认false)就算空闲的时候,核心线程也不会被回收
this.corePoolSize = corePoolSize;
//线程池允许的最大线程数
this.maximumPoolSize = maximumPoolSize;
//工作队列,暂时没有空闲线程的时候任务会放在这里
this.workQueue = workQueue;
//空闲线程超时时间,过了这个时间会回收线程
this.keepAliveTime = unit.toNanos(keepAliveTime);
//线程创建工厂,用这个类来创建开发者想要的线程
this.threadFactory = threadFactory;
//拒绝任务时的处理策略
this.handler = handler;
}
再看看他的成员变量:
public class ThreadPoolExecutor extends AbstractExecutorService {
/**
* 这个ctl就是用来保存 线程池的状态(runState) 和 线程数(workerCount) 的
* 这里使用AtomicInteger 来保证原子操作
* 这里的ctl的初始值其实就是-1左移29位,即3个1和29个0,
* 111 00000000000000000000000000000
*/
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// COUNT_BITS值为29,代表着低29位用于存储线程数,高3位用于存储线程池的状态
private static final int COUNT_BITS = Integer.SIZE - 3;
// 线程池最大的容量,值为3个 0和29个1。也就是536870911
// 000 11111111111111111111111111111
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// 下面这5个值代表线程池的状态,存储在高3位中
// 3个1,29个0 111 00000000000000000000000000000
private static final int RUNNING = -1 << COUNT_BITS;
// 全是0 000 00000000000000000000000000000
private static final int SHUTDOWN = 0 << COUNT_BITS;
// 001 00000000000000000000000000000
private static final int STOP = 1 << COUNT_BITS;
// 010 00000000000000000000000000000
private static final int TIDYING = 2 << COUNT_BITS;
// 011 00000000000000000000000000000
private static final int TERMINATED = 3 << COUNT_BITS;
// ~就是按位取反,CAPACITY按位取反得到111 00000000000000000000000000000,
// 再和c按位与,其实就是得到高3位,代表线程池的状态
private static int runStateOf(int c) { return c & ~CAPACITY; }
// CAPACITY就是000 11111111111111111111111111111,
// 直接按位与,其实就是得到低29位,代表线程池中的线程数
private static int workerCountOf(int c) { return c & CAPACITY; }
// 按位或
private static int ctlOf(int rs, int wc) { return rs | wc; }
private static boolean runStateLessThan(int c, int s) {
return c < s;
}
private static boolean runStateAtLeast(int c, int s) {
return c >= s;
}
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}
private boolean compareAndIncrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect + 1);
}
private boolean compareAndDecrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect - 1);
}
private void decrementWorkerCount() {
do {} while (! compareAndDecrementWorkerCount(ctl.get()));
}
/**
* 任务队列
*/
private final BlockingQueue<Runnable> workQueue;
/**
* JAVA线程池的全局锁
*/
private final ReentrantLock mainLock = new ReentrantLock();
/**
* 这个HashSet用于存放线程池中所有的工作线程,
* 只有在持有全局锁(mainLock)的前提下,才能对这个集合进行存取操作
*/
private final HashSet<Worker> workers = new HashSet<Worker>();
/**
* 这个condition是用于支持awaitTermination的
*/
private final Condition termination = mainLock.newCondition();
/**
* largestPoolSize记录了线程池中线程数曾经达到的最大值
*/
private int largestPoolSize;
/**
* 已完成任务的数量
*/
private long completedTaskCount;
/**
* 线程工厂
*/
private volatile ThreadFactory threadFactory;
/**
* 拒绝策略
*/
private volatile RejectedExecutionHandler handler;
/**
* 空闲线程存活时间
*/
private volatile long keepAliveTime;
/**
* 如果这个参数为true,
* 那么核心线程数内的空闲线程 空闲时间超过keepAliveTime后,也可以被回收。
*/
private volatile boolean allowCoreThreadTimeOut;
/**
* 核心线程数
*/
private volatile int corePoolSize;
/**
* 最大线程数
*/
private volatile int maximumPoolSize;
/**
* 默认的拒绝策略为AbortPolicy
*/
private static final RejectedExecutionHandler defaultHandler =
new AbortPolicy();
......
}
重点关注一下线程池的5个工作状态:
1.RUNNING
能够接收新任务,以及对已添加的任务进行处理,代表是一个可用的正常的状态,线程池的初始化状态。
2.SHUTDOWN
不接收新任务,但能处理已添加的任务,即已经放在队列workQueue的任务还是会被执行。调用线程池的shutdown()接口时,线程池由RUNNING >> SHUTDOWN
3.STOP
不接收新任务,不处理已添加的任务,并且会中断正在处理的任务。调用线程池的shutdownNow()接口时,线程池由(RUNNING or SHUTDOWN ) >> STOP。
4.TIDYING
当所有的任务已终止,ctl变量记录的"任务数量"为0,线程池会变为TIDYING状态。当线程池在SHUTDOWN状态下,阻塞队列为空并且线程池中执行的任务也为空时,就会由 SHUTDOWN >> TIDYING。当线程池在STOP状态下,线程池中执行的任务为空时,就会由STOP >> TIDYING。
5. TERMINATED
线程池彻底终止,就变成TERMINATED状态。线程池处在TIDYING状态时,执行完terminated()之后,就会由 TIDYING >> TERMINATED。
以上状态需要明确其正确的含义,光看状态名可能会误解,从而达不到自己想要的效果。
大致了解了这些变量名的含义后开始看重点方法,execute(Runnable command),正是这个方法开启了线程任务的分发和执行之旅:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* 这个ctl就是用来存储线程池状态和线程数的
*/
int c = ctl.get();
// workerCountOf(),获取线程池中的线程数。
// 如果当前线程数小于核心线程数,那么添加一个Worker来执行任务。
// 所以这里会开启一个新的线程,并给线程分配这个方法传入的runnable任务(command)
if (workerCountOf(c) < corePoolSize) {
// 如果提交任务成功,代表线程池已经接到了任务,这个时候直接return
if (addWorker(command, true))
return;
// 如果提交任务失败,再次获取ctl的值
c = ctl.get();
}
// 走到这里有两种情况:
// 1.当前线程数>=corePoolSize,核心线程数满了,还可以尝试添加非核心线程
// 2.上面addWorker提交任务失败了
// 如果线程池处于RUNNING状态,将runnable任务加入到任务队列workQueue中, workQueue.offer(command)添加成功返回true
if (isRunning(c) && workQueue.offer(command)) {
// 再次获取ctl的值
int recheck = ctl.get();
// 如果线程池不处于RUNNING状态,那么移除这个已入队的任务,执行对应的拒绝策略
if (! isRunning(recheck) && remove(command))
reject(command);
// 如果线程是处于RUNNING状态,并且当前线程池中的线程数为0,开启一个新的线程,这时候创建的是非核心线程
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 如果队列已满,执行addWorker尝试创建新的线程,
// 如果成功,说明当前线程数<maximumPoolSize。
// 如果失败,说明当前线程数已达到maximumPoolSize,需要执行拒绝策略
else if (!addWorker(command, false))
reject(command);
}
// 这个方法会创建线程并且执行任务
// 以下几种情况这个方法会返回false:
// 1.传入的core这个参数为true,代表此时要创建的是核心线程,线程数的上限为corePoolSize, 如果当前线程数已达到corePoolSize,返回false
// 2.传入的core这个参数为false,代表此时要创建的是非核心线程,线程数的上限为maximumPoolSize,如果当前线程数已达到maximumPoolSize,返回false
// 3.线程池stopped或者shutdown
// 4.使用ThreadFactory创建线程失败,或者ThreadFactory返回的线程为null
// 5.或者线程启动出现异常
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
// 这个rs就是线程池的状态
int rs = runStateOf(c);
// 这里的if说的是以下3种情况,直接返回false,不会创建新的线程:
// 1.rs大于SHUTDOWN,说明线程状态是STOP,TIDYING, 或者TERMINATED,
// 这几种状态下,不接受新的任务,并且会中断正在执行的任务。所以直接返回false
// 2.线程池状态处于SHUTDOWN,并且firstTask!=null。
// 因为SHUTDOWN状态下,是不接收新的任务的。所以返回false。
// 3.线程池处于SHUTDOWN并且firstTask为null,但是workQueue是空的。
// 因为SHUTDOWN虽然不接收新的任务,但是已经进入workQueue的任务还是要执行的,
// 恰巧workQueue中没有任务。所以也是返回false,不需要创建线程
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) { // 注意:这里是个for循环
// 获取线程池中线程的数量
int wc = workerCountOf(c);
// 这里传入的core为true代表线程数上限为corePoolSize,
// false代表线程数上限为maximumPoolSize,如果线程数超出上限,直接返回false
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 使用compareAndIncrementWorkerCount()对线程计数+1,如果成功,说明已经满足创建线程的条件了,跳出循环
if (compareAndIncrementWorkerCount(c))
break retry;
// 如果上面的compareAndIncrementWorkerCount()失败,说明有并发,再次获取ctl的值
c = ctl.get(); // Re-read ctl
// 如果线程池的状态发生了变化,例如线程池已经关闭了,
// 导致的compareAndIncrementWorkerCount()失败,那么回到外层的for循环(retry)
// 否则,说明是正常的compareAndIncrementWorkerCount()失败,这个时候进入里面的循环
if (runStateOf(c) != rs)
continue retry;
}
}
// 能到这里,说明已经做好创建线程的准备了
// worker是否已经启动的标志位
boolean workerStarted = false;
// 我们前面说了workers这个HashSet用于存储线程池中的所有线程,
// 所以这个变量是代表当前worker是否已经存放到workers这个HashSet中
boolean workerAdded = false;
Worker w = null;
try {
// 传入firstTask这个任务构造一个Worker
w = new Worker(firstTask);
// Worker的构造方法中会使用ThreadFactory创建新的线程,
// 所以这里可以直接获取到对应的线程
final Thread t = w.thread;
// 如果创建线程成功
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
// 获取线程池的全局锁,下面涉及线程池的操作都需要在持有全局锁的前提下进行
mainLock.lock();
try {
// 获取线程池的状态
int rs = runStateOf(ctl.get());
// 如果rs<SHUTDOWN,说明线程池处于RUNNING状态
// 或者 线程池处于SHUTDOWN状态并且没有新的任务
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
// 如果线程已经启动,抛出异常
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// 将包装线程的worker加入到workers这个HashSet中
workers.add(w);
int s = workers.size();
// 我们前面说了,largestPoolSize记录的是线程池中线程数曾经到达的最大值
// 线程池中worker的数量是会变化的,所以记录下worker数的最大值
if (s > largestPoolSize)
largestPoolSize = s;
// 修改标志,代表当前worker已经加入到workers这个HashSet中
workerAdded = true;
}
} finally {
// 释放全局锁
mainLock.unlock();
}
// 如果worker添加成功,启动线程执行任务
if (workerAdded) {
// 启动线程,这里才是真正的启动线程动作!!!!!
t.start();
// 代表worker已经启动
workerStarted = true;
}
}
} finally {
// 如果线程没有启动,这里还需要进行一些清理工作
if (! workerStarted)
addWorkerFailed(w);
}
// 返回线程是否成功启动
return workerStarted;
}
// 这个方法做下面几件事:
// 1.将worker从workers中移除
// 2.worker的数量-1
// 3.检查termination
private void addWorkerFailed(Worker w) {
final ReentrantLock mainLock = this.mainLock;
// 要操作workers这个HashSet,先获取java线程池全局锁
mainLock.lock();
try {
if (w != null)
// 从worker中移除
workers.remove(w);
// WorkerCount -1
decrementWorkerCount();
// 处理TERMINATED状态
tryTerminate();
} finally {
mainLock.unlock();
}
}
上面的代码引入了一个重要的类,Worker类,这个类是线程池维护线程实例的,线程就封装在这个类里面,上面代码启动了线程,就会执行thread的:
/**
* 继承了AbstractQueuedSynchronizer(是一个用于构建锁和同步器的框架,许多同步器都可以通过AQS很容易并且高效地构造出来。这里不详谈)同时实现了Runnable,看到这里我是有点懵圈的,因为他还有个Thread类型的成员变量,那才是真正执行任务的线程,那它搞出这么多Runnable来干嘛呢?而且线程池接受的任务也是Runnable,说实话我当时有点被绕晕,越是这时候越要挺住,真相往往就在眼前
*/
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
private static final long serialVersionUID = 6138294804551838833L;
/** 这个才是真正执行任务的线程 ,我自己一直被Runnable这个接口类蛊惑了,一看到这个就联想到线程,但其实不是,这只是个带run()方法的接口,这里的Worker和execute(Runnable runnable)方法传进来的任务,都只是实现了Runnable的封装而已*/
final Thread thread;
/** 每个线程要执行的第一个任务,这个值可以为null,线程会在BlokingQueue队列里中获取任务来执行 */
Runnable firstTask;
/** 记录每个线程已完成的任务数 */
volatile long completedTasks;
/**
* 这个构造方法传入这个线程第一个要执行的任务,当然也可以传入null
*/
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
// firstTask赋值
this.firstTask = firstTask;
// 通过ThreadFactory工厂创建线程,注意:这里传入的Runnable是this,代表当前的Worker对象,所以上面addWorker()方法里最后执行t.start()启动线程的时候,会执行worker里的run()方法。
// Worker实现了Runnable所以执行任务的时候最终会调用Worker.run()
this.thread = getThreadFactory().newThread(this);
}
/** 实现了Run方法,里面会调用runWorker */
public void run() {
runWorker(this);
}
// 下面这些方法都是Worker对AQS同步控制的实现了,要获取线程的执行权,需要先获取独占锁
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
通过上面分析知道,任务最终会在Worker里的run()方法里执行,而run()里调用的是 runWorker(this),接下来继续看看这个方法具体怎么做的:
public void run() {
// 这里调用的runWorker方法
runWorker(this);
}
// 这里就是执行任务的代码了,有一个while循环不断从队列中取出任务并执行,正是这个循环,保证了线程的复用性,只要外面添加进来任务,都会在Worker里维护的线程里执行。
// 退出循环的条件是获取不到要执行的任务
final void runWorker(Worker w) {
// 当前线程
Thread wt = Thread.currentThread();
// 前面说了new Worker的时候可以指定firstTask,代表Worker的第一个任务
Runnable task = w.firstTask;
// 这一步就已经将firstTask置为null了
w.firstTask = null;
// 释放Worker的独占锁,这里它释放锁的操作一定会成功,也就是将AQS中state设置为0
w.unlock(); // allow interrupts
// completedAbruptly这个标志位代表当前Worker是否因为执行任务出现异常而停止的
boolean completedAbruptly = true;
try {
// while循环;如果firstTask不为null那就直接执行firstTask,
// 否则就要调用getTask()从队列中获取队列,这是个很重要的方法。
// 也就是说Worker的第一个任务是不需要从队列中获取的
while (task != null || (task = getTask()) != null) {
// 给这个worker上独占锁
// Worker加锁的意义在于,在线程池的其他方法中可能会中断Worker,
// 为了保证Worker安全的完成任务,必须要在获取到锁的情况下才能中断Worker,
// 如tryTerminate(),shutdown()等都会关闭worker。
w.lock();
// 如果ctl的值大于等于STOP,说明线程池的状态是STOP,TIDYING或TERMINATED。
// 这个时候需要确保该线程已中断,否则就应该确保线程没有中断
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
// 其实这里只是设置线程的中断状态
wt.interrupt();
try {
// 这个方法留给子类去实现的
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 真正执行任务的地方
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 后置处理,留给子类去实现的
afterExecute(task, thrown);
}
} finally {
// 最后将task置为null,准备接受下一个任务
task = null;
// 这个worker已完成任务数+1
w.completedTasks++;
// 释放独占锁
w.unlock();
}
}
// 到这一步说明没抛出异常
completedAbruptly = false;
} finally {
// 执行到这里说明:要么队列中已经没有任务了,要么执行任务出现了异常。
// 这个时候需要调用processWorkerExit关闭闲置线程
processWorkerExit(w, completedAbruptly);
}
}
看看获取任务的方法getTask():
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
//又是个循环,在关于线程调度的很多源码里都能看到这种死循环,要特别注意它的使用
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 不是running状态并且(大于或等于stop的状态或者工作队列为空了)返回null, 跳出循环
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();//计数减一
return null;
}
int wc = workerCountOf(c);
// timed 的意思可以理解为 具备超时的条件
//核心线程允许超时或者当前线程数大于核心线程数(说明多出来的是非核心线程)就为true
//其实就是在这区分出了核心线程和非核心线程,本身worker并没有标记核心线程的字段,线程数
//大于核心线程的时候,超时机制筛选下留下来的就是核心线程
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
//(当前线程数大于线程池线程最大值||超时了)&&(当前线程数大于1||任务队列为空)则返回null
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))//防止并发,计数减一
return null;
continue;
}
//下面的代码就是超时机制了,workQueue的poll()方法有个入参就是超时时间,如果在时间内没有
//拿到值,则会返回空,take()方法则没有超时机制,直接拿
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
//为空则超时了,然后继续循环,如果满足闲置线程回收条件上面的代码就会返回null,否则就一直阻塞循环
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
再来看看他是怎么销毁线程的:
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // 异常导致的销毁,要在这里做计数减一
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
//从线程列表里移除,之后会被垃圾回收机制回收,这就是清理超时空闲线程的操作
workers.remove(w);
} finally {
mainLock.unlock();
}
//尝试终止线程池,因为可能没有任务了
tryTerminate();
//再次获取线程池的状态
int c = ctl.get();
//如果此时线程池是RUNNING和SHUTDOWM
if (runStateLessThan(c, STOP)) {
//并且不是异常进来这个方法的
if (!completedAbruptly) {
//如果核心线程也会超时回收那么min为0,否则为corePoolSize
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
//如果为0且任务列表不为空,则置为1
if (min == 0 && ! workQueue.isEmpty())
min = 1;
//如果此时的线程数不小于min ,则不需要新增线程
if (workerCountOf(c) >= min)
return; // replacement not needed
}
//否则会新增一个线程来继续执行任务
addWorker(null, false);
}
}
继续看tryTerminate()方法,这个方法也很重要,在切换线程池状态的很多地方都有用到:
//方法名很有意思,尝试终止
final void tryTerminate() {
//for循环!!!
for (;;) {
int c = ctl.get();
//线程池状态是RUNNING || 状线程池态是TIDYING或SHUTDOWN ||状线程池态是SHUTDOWN并且
//任务列表不为空,这几种情况就不继续走下去了,返回
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
//线程池线程数量不为0
if (workerCountOf(c) != 0) { // Eligible to terminate
//中断一个线程就退出循环,这个方法也要重点看
interruptIdleWorkers(ONLY_ONE);
return;
}
//拿到全局锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//尝试设置线程池状态为TIDYING
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
//成功则调用terminated(),是个空方法,给子类实现
terminated();
} finally {
//设置线程池状态为TERMINATED
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
// 这个方法其实就是用来中断正在等待任务的线程,方法名直译:中断空闲线程
// 注意这里说的中断其实也只是将线程的状态置为“中断”,并不是说线程在这里就真的停止了
// 如果onlyOne为true,这里最多会关闭一个worker,因为shutdown()方法需要中断所有的worker,
// 这里中断一个worker能够帮助shutdown迅速的完成,而不用等待一些还在等待任务的worker结束
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
// 一样要获取全局锁
mainLock.lock();
try {
// 遍历所有的Worker,如果传入的onlyOne为true,那最多会终止一个Worker。
// 如果传入的onlyOne为false,终止所有的Worker
for (Worker w : workers) {
Thread t = w.thread;
// 这里要获取到worker的独占锁后才能够中断线程
//这个锁在线程拿到task执行的时候会获取,所以他无法中断正在执行任务的线程
if (!t.isInterrupted() && w.tryLock()) {
try {
//Thread的中断方法interrupt()只是做了个中断标记,真正相应中断的地方是在BlockingQueue
//的poll()和take()方法,他们会抛出中断异常来达到中断的目的。
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
至此线程池的正常使用方法就走完了,当然还有其他的一些辅助性的知识点这里没有详细讲,有兴趣的小伙伴可以自行再深究一下。这时候回过头来看看线程池的优点:
第一:降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
第二:提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。
第三:提高线程的可管理性。
是不是就更能理解其背后意义了呢。
部分参考CSDN博主「Epoch-Elysian」的文章,原文链接:https://blog.csdn.net/Epoch_Elysian/article/details/107282186
网友评论