线程池是java中的重要知识点,今天研究下,首先来看下线程池是怎么使用的,然后在使用的基础上再进行原理剖析:
public class MyTask implements Runnable{
private int taskNum;
public MyTask(int num) {
this.taskNum = num;
}
@Override
public void run() {
System.out.println("正在执行task "+taskNum);
try {
Thread.currentThread().sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("task "+taskNum+"执行完毕");
}
}
上面首先创建一个线程子类,线程池就是首先预创建指定的数量,当我们需要执行某个逻辑的时候,把这些逻辑封装到一个Runnable对象里面,然后丢给线程池,线程池就会自动去执行,所以首先要创建一个线程子类,封装我们的业务逻辑。
public class Test {
public static void main(String[] args) {
//创建一个线程池:第一个参数表示核心线程数;第二个参数表示最大线程数
//第三个参数表示线程存活的时间;第四个参数是存活的时间单位;最后一个
//数表示一个任务队列,当没有足够线程去执行任务时,任务将被添加到队列
ThreadPoolExecutor pool = new ThreadPoolExecutor(5, 10, 30, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(5));
//循环将一个个Runnable对象丢给线程池去执行
for(int i=0;i<15;i++){
MyTask myTask = new MyTask(i);
pool.execute(myTask);
System.out.println("线程池中线程数目:"+pool.getPoolSize()+",队列中等待执行的任务数目:"+
pool.getQueue().size()+",已执行完的任务数目:" + pool.getCompletedTaskCount());
}
//关闭线程池
pool.shutdown();
}
}
上面首先创建一个线程池对象pool,然后丢给线程池对象pool的execute方法区执行,输出结果如下:
//核心线程执行任务
正在执行task 0
线程池中线程数目:1,队列中等待执行的任务数目:0,已执行完的任务数目:0
线程池中线程数目:2,队列中等待执行的任务数目:0,已执行完的任务数目:0
正在执行task 1
线程池中线程数目:3,队列中等待执行的任务数目:0,已执行完的任务数目:0
正在执行task 2
线程池中线程数目:4,队列中等待执行的任务数目:0,已执行完的任务数目:0
正在执行task 3
线程池中线程数目:5,队列中等待执行的任务数目:0,已执行完的任务数目:0
正在执行task 4
//一共就5个核心线程,已经用完了,别的任务只能去队列里面等待了
线程池中线程数目:5,队列中等待执行的任务数目:1,已执行完的任务数目:0
线程池中线程数目:5,队列中等待执行的任务数目:2,已执行完的任务数目:0
线程池中线程数目:5,队列中等待执行的任务数目:3,已执行完的任务数目:0
线程池中线程数目:5,队列中等待执行的任务数目:4,已执行完的任务数目:0
线程池中线程数目:5,队列中等待执行的任务数目:5,已执行完的任务数目:0
//队列的容量是5,现在已经满了,所以要创建新的非核心线
//程执行任务;队列不满,是不会创建非核心线程去执行的
线程池中线程数目:6,队列中等待执行的任务数目:5,已执行完的任务数目:0
//非核心线程执行任务
正在执行task 10
线程池中线程数目:7,队列中等待执行的任务数目:5,已执行完的任务数目:0
正在执行task 11
线程池中线程数目:8,队列中等待执行的任务数目:5,已执行完的任务数目:0
正在执行task 12
线程池中线程数目:9,队列中等待执行的任务数目:5,已执行完的任务数目:0
正在执行task 13
线程池中线程数目:10,队列中等待执行的任务数目:5,已执行完的任务数目:0
正在执行task 14
task 1执行完毕
task 13执行完毕
task 2执行完毕
正在执行task 6
task 10执行完毕
正在执行task 8
task 11执行完毕
task 4执行完毕
task 3执行完毕
正在执行task 9
正在执行task 7
task 0执行完毕
正在执行task 5
task 14执行完毕
task 12执行完毕
通过上面的例子可以看出,简单的使用线程池还是很方便的,下面从线程类ThreadPoolExecutor开始剖析线程池的原理,先从ThreadPoolExecutor的类信息开始看:
public class ThreadPoolExecutor extends AbstractExecutorService {
//原子整数,他是一个复合型数据,低29位表示线程池中活动的线程数量,
//用workerCount表示;高3位代表线程池运行状态:RUNNING、
//SHUTDOWN、STOP、TIDYING和TERMINATED,用runState表示
//这个变量特别重要,后面很多操作都要获取他才能决定下一步
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//线程池中活动的线程数量workerCount占的位数:32 - 3 = 29
private static final int COUNT_BITS = Integer.SIZE - 3;
//线程池中获得的线程的最大数量,2的29次幂 - 1
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
//线程池状态,RUNNING表示正在运行,接受新任务并处理队列中的任务;-1在底层
//由32个1表示,左移29位就是111 00000 00000000 00000000 00000000
//也就是低29位全部为0;高3位全部为1的话,表示SHUTDOWN状态
private static final int RUNNING = -1 << COUNT_BITS;
//线程池状态,不接受新任务,但是会处理队列里面的任务。0不管怎么
//左移都是0所以低29位是0,高3位全部是0的话,表示SHUTDOWN状态
private static final int SHUTDOWN = 0 << COUNT_BITS;
//线程池状态,不接受新任务,不会处理队列里面的任务;而且会中断掉正在处理的任务;
//1由31个0和1个1组成,左移29位就是001 00000 00000000 00000000 00000000,
//也就是低29位全部是0的,高3位是001的话,表示STOP状态
private static final int STOP = 1 << COUNT_BITS;
//线程池状态,表示所有任务已经结束,workerCount为0,线程池过渡到TIDYING
//状态;2在底层由30个0和一个10组成,左移29位就是010 00000 00000000
// 00000000 00000000,也就是低29位全部为0,高3位为010就是TIDYING状态
private static final int TIDYING = 2 << COUNT_BITS;
//线程池状态,表示terminated()方法已经完成;3在底层由30个0和一个11组成
//左移29位就是011 00000 00000000 00000000 00000000,也就是低29位
//为0,高3位是011的话,线程池此时是TERMINATED状态
private static final int TERMINATED = 3 << COUNT_BITS;
//任务队列,当线程池的核心线程不够用时,新添加
//的任务将会被放入此队列,待以后执行这些任务
private final BlockingQueue<Runnable> workQueue;
//重入锁
private final ReentrantLock mainLock = new ReentrantLock();
//一个Worker类型的集合,Worker实现了Runnable接口,可以猜测
//线程池中的运行单元是不是就是他呢?
private final HashSet<Worker> workers = new HashSet<Worker>();
//一个Worker类型的集合,Worker实现了Runnable接口,可以猜测
//线程池中的运行单元是不是就是他呢?
private final HashSet<Worker> workers = new HashSet<Worker>();
//线程池中曾经创建过的最大的线程数量
private int largestPoolSize;
//任务已经完成的数量
private long completedTaskCount;
//线程工厂类,创建线程用的
private volatile ThreadFactory threadFactory;
//拒绝策略。当任务队列已满,而且线程数量达到
//最大值时,如果还添加任务,就会采取此拒绝策略
private volatile RejectedExecutionHandler handler;
//空闲线程存活的时间,时间一到,直接终止
private volatile long keepAliveTime;
//是否终止处于空闲的核心线程,如果是true,时间到了就终止核心线程,否则不终止
private volatile boolean allowCoreThreadTimeOut;
//核心线程数量
private volatile int corePoolSize;
//此线程池最大的线程数,创建线程池的时候手动指定
private volatile int maximumPoolSize;
//此线程池最大的线程数,创建线程池的时候手动指定
private volatile int maximumPoolSize;
}
上面就是ThreadPoolExecutor类的一些成员变量。下面分析下他的构造函数,ThreadPoolExecutor类有4个构造函数,3个构造函数最终调的是第4个,所以直接来看第四个构造函数:
/**
* 参数解析
* @param corePoolSize 核心线程数
* @param maximumPoolSize 最大线程数
* @param keepAliveTime 空闲线程存活的时间
* @param unit 上面那个时间的单位
* @param workQueue 任务队列
* @param threadFactory 线程工厂对象
* @param handler 拒绝策略
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
//一波判断,各个参数必须要在正常,不能残疾
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
//一波赋值,不解释
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
线程池里面有两个线程非常重要,一个是核心线程corePoolSize,另一是最大线程maximumPoolSize;当有任务到来时,先用核心线程去执行;如果核心线程用完了,那么任务放入队列里面存放起来,此时就算设置了非核心线程,非核心线程也是懒得动的;当队列满了以后,才会起非核心线程去执行任务(非核心线程有点飘)。其执行流程如下:
线程池执行流程
下面就来看下线程池执行的方法execute吧:
public void execute(Runnable command) {
//非空判断,不解释
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
//获取原子整数的值,这个整数聚合了当前活动的线程总数和线程池状态
int c = ctl.get();
//如果核心线程没有用完,那么用核心线程执行任务
if (workerCountOf(c) < corePoolSize) {
//addWorker就是创建一个Worker对象来执行此任务,众多
//的worker是放在集合里面统一管理的,如果新建的worker
//成功加入此集合,那么说明addWorker成功;注意addWorker
//成功并不代表此任务就执行成功了。Worker对象是Runnable
//的子类,里面有个Thread的成员变量,我们传进来的任务就是
//由这个变量来运行,所以Worker是任务执行的载体,很重要
if (addWorker(command, true))
return;
//addWorker失败,说明worker创建失败,或者
//没有添加进集合里面,这里再次获取线程池的信息
c = ctl.get();
}
//如果线程池处于运行状态,而且任务成功被添加进队列,重新检查。因
//为在判断线程池状态和添加任务之后,线程池的状态可能再次发生变化
//workQueue.offer就是把我们传进来的任务添加进任务队列
if (isRunning(c) && workQueue.offer(command)) {
//再次获取线程池状态
int recheck = ctl.get();
//若干线程池状态发生变化,不再运行了,
//那么从任务队列移除此任务,相当于回退操作
if (! isRunning(recheck) && remove(command))
//执行拒绝策略
reject(command);
//如果线程池正在运行,但是活跃线程数量为0,那么再addWorker一次
else if (workerCountOf(recheck) == 0)
//这里传入null的原因是目标任务在外层if已经添加进去了。注意
//最后一个参数,没有核心线程了,说明只能用非核心线程执行任务
addWorker(null, false);
}
//如果添加队列失败,那么手动新增线程去执行;如果还是失败
//说明线程池挂了或者处于饱和状态,没得救了,执行拒绝策略
else if (!addWorker(command, false))
reject(command);
}
execute就是这样了,调用addWorker去新增一个Worker对象,然后通过此对象来运行我们的任务,所以先简要分析下Worker类,他是ThreadPoolExecutor的内部类:
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;
/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
//设置同步状态,-1代表在调用runWorker前禁止中断
setState(-1); // inhibit interrupts until runWorker
//给成员变量赋值,这个值可能是空,如果是空
//,就从任务队列里面获取(当然不是在这里获取)
this.firstTask = firstTask;
//构建Thread对象,最终就是他去执行我们传进来的任务
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker */
//这里才是执行任务的地方
public void run() {
runWorker(this);
}
......
}
Worker本身不是很难,可以简单的理解成我们常说的线程线程,简单了解下即可,下面研究重要的方法addWorker:
//根据线程池当前的状态和边界值(核心线程数和最大线程数)来就检查一个Worker是否
//可以添加到线程池。如果可以,workerCount将会增加;如果有可能,此Worker将被
//创建和运行传进来的任务。如果线程池挂了,或者线程数量超出边界值,此方法将会返
//回false;如果线程工厂创建线程失败,那么也会返回false,然后回退
private boolean addWorker(Runnable firstTask, boolean core) {
//for循环的标记位
retry:
//双重for循环,外层for循环用来判断线程池
//状态;内层for循环用来增加线程数的CAS操作
for (;;) {
//获取线程池信息
int c = ctl.get();
//获取线程池活动运行状态
int rs = runStateOf(c);
// Check if queue empty only if necessary.
// 如果线程池当前状态大于等于SHUTDOWN,也就是SHUTDOWN
// 、STOP、TIDYING和TERMINATED其中之一,那么继续后面的
// 判断,后面的判断不好理解,主要在于外层有个取反操作,可以
// 转换如下:(rs!=SHUTDOWN || firstTask!=null ||
// workQueue.isEmpty())。分析:1.如果线程池状态大于SHUTDOWN
//,说明不宜接受新任务,也不会处理队列里面的任务,返回false;2.如
//果传进来的firstTask不为空,说明是添加新任务,此时不会处理,返回
//false,注意firstTask是可以为空的,为空代表着新起线程执行队列里面
//的任务;workQueue.isEmpty()代表没有等待的任务了,此时线程池的状
//是大于等于SHUTDOWN的,当然不会新起线程去执行任务了。
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
//获取线程数量
int wc = workerCountOf(c);
//如果线程数量超出边界,那么返回false,Worker添加失败
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//如果没有超出边界,原子操作增加线程数量
if (compareAndIncrementWorkerCount(c))
//增加完成后,跳出外层for循环,添加worker
//成功,addWorker方法成功了一半,可喜可贺
break retry;
//如果自增线程数量失败,再次获取线程池运行时信息
c = ctl.get(); // Re-read ctl
//如果当前的状态不等于之前的状态,跳出内层循环,执行外循环,再次判断
//状态;因为执行下次循环的时候,线程池的状态可能会发生变化。反正这里
//就是往死里去增加线程池记录的线程数量,不成功誓不罢休,除非超出边界了
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
// CAS操作失败要么是因为线程池状态改变了,要么是因为workCount改变了
// ,如果workCount改变了,说明存在竞争,继续内循环自增workerCount
}
}
//流程执行到这里,说明活跃的线程数已经成功自增
//定义两个变量,
boolean workerStarted = false;
boolean workerAdded = false;
//线程池中的一个线程对象就是一个Worker对象
Worker w = null;
try {
//构建一个Worker对象
w = new Worker(firstTask);
//拿到Worker对象里面的线程类
final Thread t = w.thread;
//如果线程不为空
if (t != null) {
//重入锁
final ReentrantLock mainLock = this.mainLock;
//先锁起来
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
//拿到锁有重新检查线程池状态
int rs = runStateOf(ctl.get());
//如果线程池处于运行状态,或者线程池SHUTDOWN了,同时
//firstTask为空,说明要起线程去执行任务队列里面的任务
//,其他情况一律不执行。这也说明了SHUTDOWN状态下的线程
//池,是会去执行任务队列里面的任务的,SHUTDOWN比较厚道
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
//检查刚起的线程是不是已经被执行了,如果执行了,死给你看
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
//workers是一个HashSet,里面保存的是Worker对象
workers.add(w);
//HashSet的元素个数
int s = workers.size();
//如果HashSet里面的Worker数量大于曾经创建过
//的最大的线程数量,那么对这个数量进行重新赋值
if (s > largestPoolSize)
largestPoolSize = s;
//Worker对象添加成功
workerAdded = true;
}
} finally {
//释放锁
mainLock.unlock();
}
//如果Worker添加成功,启动线程去执行
if (workerAdded) {
t.start();
//设置状态
workerStarted = true;
}
}
} finally {
//任务执行失败的话,把Worker从HashSet里面移除
if (! workerStarted)
//调用addWorkerFailed处理失败的后续操作
addWorkerFailed(w);
}
return workerStarted;
}
addWorker方法有点难。这个方法上来就是一个双重for死循环,外层for循环的作用是判断线程池的状态,如果当前线程池的状态大于SHUTDOWN,那么此线程池不接受新任务;如果大于等于SHUTDOWN,而且传进来的任务不为空(说明是新任务),那么也不接受;如果线程池状态大于等于SHUTDOWN,而且任务队列为空,没的说,也不接受;三种不接受都导致返回false,这就是外层for循环到的作用;一旦线程池状态校验通过,那么就进入第二层for循环,增加线程池活跃线程的数量,通过死循环,气而不馁的增加数量,除非数量超过线程池的边界,此时才返回false。如果线程池的线程数量自增成功,那么构建一个Worker对象,此时再次检查线程池状态,因为上面两个for循环是线程不安全的,一个线程通过了上面两个for循环检查状态和自增数量后,另外一个线程也可以去自增,此时可能导致线程池的状态发生变化,所以需要再次检查;这次检查通过后,将新建的Woker对象加入集合workers,接着就执行Worker对象里面的Thread的run方法;如果失败,调用addWorkerFailed。这段代码逻辑很简单,但是实现起来是比较复杂的。下面看下Worker的Thread的run方法是怎么执行的:
/** Delegates main run loop to outer runWorker */
//这里才是执行任务的地方
public void run() {
runWorker(this);
}
简单的调用runWorker,runWorker方法是一个核心方法,很难,下面尽量分析:
final void runWorker(Worker w) {
//拿到当前的线程,也就是调用runWorker方法的线程
Thread wt = Thread.currentThread();
//拿到Task
Runnable task = w.firstTask;
//将firstTask置空
w.firstTask = null;
//遥想当年创建Worker的时候设置了一个状态-1,代表
//不允许打断,这里设置成1代表可以打断
w.unlock(); // allow interrupts
//标记线程是不是异常终止的
boolean completedAbruptly = true;
try {
//如果Task不为空,说明用户传入了一个任务;如果为空,说
//明要从任务队列里面获取任务来执行,否则跳出while循环
while (task != null || (task = getTask()) != null) {
//加锁,目的是为了在线程池shutdown
//的时候,对于正在执行的任务不被中断
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
//如果线程池正在stop,那么要保证当前线程是中断状态
//如果不是,要保证当前先不是中断状态
//runStateAtLeast(ctl.get(), STOP)的作用就是拿到线程池
//的运行状态,然后和STOP比对,大于等于STOP就返回true;
//Thread.interrupted是设置调用runWorker方法的线程的状态为中断,仅仅是设置
//线程中断状态,不会真的中断运行runWorker方法的线程,除非runWorker方法的线程
//里面有sleep、wait或者join,此时就抛出InterruptedException异常
//!wt.isInterrupted()是指任务线程没有被中断
//所以if的作用是如果线程池的状态大于等于STOP,而且任务线程没有中断,那么给任务线程设置一个中断
//标记位;或者运行runWorker方法的线程别设置了中断位,而且线程池的状态大于等于STOP,那么任务线程也要设置标记位
//否则就不会给任务线程设置标记位,这就是为什么线程池shutdown之后,任务还能运行的原因
if ((runStateAtLeast(ctl.get(), STOP)
|| (Thread.interrupted()
&& runStateAtLeast(ctl.get(), STOP)))
&& !wt.isInterrupted())
//设置任务线程的中断状态,也就是说interrupt会发出中断信号,这个信号只能被wait()、sleep()和join()方法
//捕捉并产生中断。interrupt本身仅仅只会设置线程中断标记位,并不会真正产生中断
wt.interrupt();
try {
//任务执行前需要做的工作,默认空实现,可自己扩展
//beforeExecute可能会导致线程异常而死亡
beforeExecute(wt, task);
//创建一个Throwable对象,捕获异常用
Throwable thrown = null;
try {
//执行任务
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
//任务执行后需要做的工作,默认空实现,可自己扩展
//这里把抛出的异常都传给了afterExecute
afterExecute(task, thrown);
}
} finally {
//任务置空
task = null;
//完成的任务自增
w.completedTasks++;
//释放锁
w.unlock();
}
}
//不是异常中断的
completedAbruptly = false;
} finally {
//退出Worker
processWorkerExit(w, completedAbruptly);
}
}
runWorker方法不太好理解,主要是在锁的那里理解的有偏差。这个方法的主要任务就是执行我们传进来的Runnable对象的run方法。接下来看下getTask方法:
//获取执行任务
private Runnable getTask() {
//上次从队列里面获取任务是否超时导致未能获取到任务
boolean timedOut = false; // Did the last poll() time out?
//死循环
for (;;) {
//获取线程池运行时状态
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
//如果线程池状态大于等于SHUTDOWN,任务队列是空的,那么返回null同时将
//workerCount自减(CAS操作),因为addWorker中已经将他自增了,所以这里要退货。
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
//拿到活跃线程数量
int wc = workerCountOf(c);
// Are workers subject to culling?
//allowCoreThreadTimeOut默认是false,代表核心线程是否有超时限制
//如果wc > corePoolSize,说明用到了非核心线程,此时就要有超时限制了
//timed的主要用于获取任务,如果有超时限制,那么调用poll方法获取任务
//在keepAliveTime时间内没有获取到任务,就继续下次循环获取任务;如果
//是false,那么调用take方法获取任务,此时线程就会卡在这,阻塞式获取
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
//这个判断分解后更好理解,他的判断依据是如果当前活跃线程数大于最大线程数,那么直接返回,不搞了;
//不管任务队列是不是空的,也不管wc到底是不是大于1,统统不玩了。如果获取任务有超时限制,而且上次
//获取任务还搞砸了,超时了,那么如果活跃线程数大于1,或者队列是空的,那么也不玩了;队列是空的时候
//不玩很好理解,那么如果允许超时,而且上次还超时了,那么也不玩了呢,应该是队列里面没有任务了吧(不确定)
if ( (wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty()) ) {
//将活跃线程数自减
if (compareAndDecrementWorkerCount(c))
//返回空
return null;
//如果自减失败,继续for循环,因为下次for循环的时候,
//线程池的状态可能有所改变,如果没变,那么再次自减
continue;
}
try {
//允许超时,那么调用poll获取任务,否则调用take阻塞式获取,也就是会一直卡在这
//poll只在keepAliveTime这个时间内获取任务,如果获取不到,那么进行下一次循环
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();//阻塞式方式获取,一直等到队列有任务
if (r != null)
//返回结果
return r;
//如果没有拿到任务,说明获取超时,修改状态,继续下次循环
timedOut = true;
} catch (InterruptedException retry) {
//如果被打算,那么继续下次循环
timedOut = false;
}
}
}
getTask方法也比较难,主要是if条件写的太简洁了,分析起来有点吃力。下面分析线程退出的时候线程池做了什么:
//处理线程的退出。两种场景,一种是由于异常而退出,一种是由于任务执行完毕而退出
private void processWorkerExit(Worker w, boolean completedAbruptly) {
//如果是异常退出,那么将线程池的线程数量自减
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//正常退出的话,那么将执行成功的任务自增
completedTaskCount += w.completedTasks;
//从集合里面删除此worker
workers.remove(w);
} finally {
mainLock.unlock();
}
//尝试终止线程池,不一定能成功
tryTerminate();
//获取线程池运行时信息
int c = ctl.get();
//如果此时状态小于STOP
if (runStateLessThan(c, STOP)) {
//如果不是异常退出的话
if (!completedAbruptly) {
//计算核心线程的最小数量
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
//如果没有核心线程了,但是任务队列里面还有任务,那么将min置为1
if (min == 0 && ! workQueue.isEmpty())
min = 1;
//如果线程数量不小于最小值,那么退出,队
//列里面的任务就由现有的线程去负责执行
if (workerCountOf(c) >= min)
return; // replacement not needed
}
//若干是异常退出的话,那么再增加一个非核心线程
//如果队列里面还有任务,但是线程数不够,也会走这里
addWorker(null, false);
}
}
这个方法还算比较简单,首先判断此线程是不是异常退出,是的话将线程数量自减。接着,不管是正常退出还是异常退出,都要将线程从worker集合里面移除掉;接着判断线程池状态,如果小于STOP,说明线程池此时正在正常运行,此时判断核心线程的最小数量,如果核心线程的最小数量为0,但是此时队列里面还有任务的话,就将核心线程的最少数量置成1;如果此时线程池的线程数量大于等于这个核心线程池的最小数量,那些队列里面的任务就让这些线程去执行吧。如果是异常退出的线程,或者此时线程池的线程数量小于核心线程的最小值,那么就需要添加一个线程进去。
下面分析线程池退出机制:
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//安全检查,线程池不是谁都可以关掉的,必须具备相应的权限,不管
checkShutdownAccess();
//设置线程池状态为SHUTDOWN
advanceRunState(SHUTDOWN);
//中断空闲中的线程,注意,正在运行的线程是不会中断的
interruptIdleWorkers();
//shutdown后的回调,空实现,根据自己的需求扩展
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
//最后尝试终止线程池
tryTerminate();
}
接着分析tryTerminate:
final void tryTerminate() {
//又是死循环,这么多死循环
for (;;) {
//拿到线程池的运行时信息
int c = ctl.get();
//如果正在运行,或者状态比TIDYING大,或者
//状态是SHUTDOWN但是队里里面还有任务,那么就不终止
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
//如果活跃线程数不等于0,那么终止空闲的线程,此时就不能直接退出了,要返回
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE);
return;
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//if里面的逻辑是将状态TIDYING和线程数0聚合在一起
//然后更新到ctl,如果状态是TIDYING而且线程数是0
//了,那么这个线程就没什么用了,可以退出了
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
//空实现,根据自己的需要扩展
terminated();
} finally {
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
接下来看下空闲线程是怎么退出的:
private void interruptIdleWorkers(boolean onlyOne) {
//上锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//遍历Worker
for (Worker w : workers) {
//拿到线程
Thread t = w.thread;
//如果线程没有别中断,而且能够获取worker的锁,那么
//就将此线程的中断标记位置为true,如果获取不到,说明
//此worker正在执行,此时就不能中断这些线程了。
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
//如果终止空闲线程的操作只执行一次的话,那么就退出
//否则就往死里循环,一直到所有的执行任务的线程都空闲
//然后终止为止
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
自此,线程池的源码磕磕碰碰的分析了一遍,有些地方理解的不到位,待功力大增了再修改。
网友评论