首先看ThreadPoolExecutor的核心方法execute
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {//workerCountOf(c) 就是当前线程数
if (addWorker(command, true))//步骤1
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {//步骤2
int recheck = ctl.get();
if (!isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);//这里注意为传入的task为null
}
else if (!addWorker(command, false))//步骤3
reject(command);
}
这个方法很简明,就是线程池的基本原理:
1.线程池数量小于核心线程池数量,则通过addWorker(稍后分析该方法)将任务加入线程池中,如果成功则返回。
2.如果步骤1返回失败,则看线程池是否在running状态,如果在则把任务送进等待队列。如果这一步成功,再检查一次线程池状态,如果线程池不是running状态并且当前任务从队列移除成功,则执行拒绝策略,否则如果worker数量等于0的话,则相当于新建一个线程。如果没有这个调用,当你把coreSize设置为0时,往线程池里添加任务,任务会被放在任务队列了,永远得不到执行。
3.如果addWoker失败,即超过了最大线程池数量,则执行拒绝策略。
BTW,线程池中的有个ctl变量,这个变量的定义是
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
道格老爷子用的这个变量的前3位定义线程池的状态,后29位作为worker的数量。
我们可以看出,整个流程的核心方法就是addWoker,在看它的代码前,先来看一看Worker这个类,该类定义如下
private final class Worker extends AbstractQueuedSynchronizer implements Runnable
先看它的成员变量和构造方法:
Worker的变量中有thread和runnable,那么大概可以猜出Worker就是一个Thread的包装类,负责运行任务。
构造方法中,就使用线程池的ThreadFactory来new了线程。
final Thread thread;
Runnable firstTask;
volatile long completedTasks;
Worker(Runnable firstTask) {
setState(-1); //不希望在runWorker之前中断
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
看到这个类也继承了AQS,看一下实现的方法,使用的互斥模式,也就是很正常的并发访问控制。state=0代表解锁状态,state=1代表上锁状态。但是要注意的是这个并不是像ReentrantLock一样的重入锁。因为当执行interruptIdleWorkers时(shutdown等会调用),会获取Worker的锁,而我们不希望这时候Worker能获取锁中断线程,因为会增大线程管理和中断控制的难度。
再看为什么初始化Worker时要setState(-1),就是要避免开始执行之前的Worker不会被中断。那么什么时候会中断Worker的线程?就是调用shutdownNow时(shutdownNow不像shutdown需要提前获取worker的锁才能中断线程),里面会调用interruptIfStarted方法,判断了state>=0才会被中断(见下interruptIfStarted方法)。
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {//state=-1不会被中断
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
ok,这个类也实现了Runnable接口,看下它的run方法,一个while循环,先运行自己传进来的任务,如果传进来的任务为null,则从队列里面取任务运行。代码中看到如果要走出这个循环的话,要么Worker的线程被中断,要么getTask=null。顺便一提的是,Worker本身不运行run,而是里面thread通过start运行这个方法。再进去看下getTask方法。
public void run() {
runWorker(this);
}
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // 因为Worker初始化state=-1,这里先设置为0,否则获取不了锁
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {//getTask是从队列取出任务,取到就继续运行。
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();//线程池shutdownNow后中断worker
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();//运行自己的业务方法
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
getTask方法就是不断从队列里面取任务。如果是核心线程,就一直取任务;如果是非核心线程,在keepAliveTime没取到任务就返回null。
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
//...省略代码
int wc = workerCountOf(c);
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; //看到如果当前线程数大于coreSize,则启动从队列取任务时采用超时的方法取。
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {//可以看到如果线程池只有一个线程,那么这个线程就算是非核心线程,也不会销毁的。
if (compareAndDecrementWorkerCount(c))
return null;//如果当前线程数大于coreSize,并且队列是空,返回null
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) ://这里终于看到keepAliveTime的作用了。
//如果是非核心线程,在keepAliveTime时间内没有任务进来,那么根据上面的runWorker方法,取到的值是null,那么就跳出循环,线程自动销毁。
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
分析完Worker后,又回到addWorker方法。每一步的注释如下
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 省略代码...
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))//这里就是看是否是添加的核心线程池数的任务还是核心线程之外的任务。
return false;
// 省略代码...
}
}
//经过一系列校验后
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);//这个就是线程池中的核心内部类。
final Thread t = w.thread;//取出new好的线程
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int rs = runStateOf(ctl.get());//线程池的状态
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {//再次检查线程池状态
if (t.isAlive()) //如果线程池SHUTDOWN或者RUNNING,但是线程被启动了,抛异常
throw new IllegalThreadStateException();
workers.add(w);//缓存worker
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;//更新池中最大线程数量
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {如果添加成功
t.start();//最终调用runWorker的地方。
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);//如果添加失败就会尝试关闭线程池。
}
return workerStarted;
}
总结:首先大致分析了线程池运行的基本流程,简单来说execute就是一直往队列中扔任务,创建好的Worker不断从中取任务运行。这里要注意的是,线程池在初始化时并没有将核心线程数的线程一起初始化,而是来一个任务,创建一个线程。还有就是当线程数超过核心线程数并且开始销毁多出核心线程数的线程时,有可能销毁的是在小于核心线程数时创建出来的旧线程。
然后是内部类Worker,这里Worker就是thread和task的一个包装类,它的职能就是控制中断和任务的运行。Worker是一个集成了AQS,实现了Runnable方法的内部类。Worker创建好后,通过new好的线程来运行任务。核心Worker通过while不断从队列中取出任务,任务队列为空线程就阻塞;非核心Worker也是通过while不断取任务,只是有个取任务时keepAliveTime的超时时间,在时间之内取不到的任务的话线程就跳出循环,自动销毁了。
网友评论