线程池源码分析
在线程池执行任务的时候,都是主线程(可能是tomcat线程),将任务放到线程池中,再执行线程池的execute方法,那我们就从execute入手分析线程池的源码.因为是多线程,无法能分析所有的情况,只能顺着主线程分析源码
public void execute(Runnable command) {
//如果传进来的为null抛出异常
if (command == null)
throw new NullPointerException();
//获取当先正在运行的线程数
int c = ctl.get();
//如果小于核心线程数,这里可以看出如果线程池中有闲着的线程,但是每次都是先走这段逻辑,也就是不管是否有空余线程,也是会创建一个新的线程
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
//如果大于核心线程数,并且线程池是Running状态,并尝试将任务放到阻塞队列中,如果成功则进入if条件
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//创建非核心线程,false就是非核心线程的意思,如果创建失败则进入到拒绝策略
else if (!addWorker(command, false))
reject(command);
}
主要就是addwork方法,在介绍addword方法之前,先介绍一下Worker的构造函数, Worker实现了Runnable接口
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask;
//线程工厂类创建线程的时候,是将Worker本身传进来,而不是firstTask任务
this.thread = getThreadFactory().newThread(this); }
下面分析addWorker方法,firstTask是我们要执行的任务,core代表是否为核心线程,true为核心线程
private boolean addWorker(Runnable firstTask, boolean core) {
//这段for代码的主要意思就是,先判断线程池的状态,在判断数量是否已经超过了 CAPACITY 极限或者根据
//core为true/false来判断是否超过了核心线程或者超过了最大线程数,如果超过则返回false,如果没有超过,则将
//ctl利用cas加一就是线程数加一,cas一般都是配合死循环使用,然后跳出for循环,因为这里还没有创建线程执行
//任务就先加一,所以在后续的代码中执行错误的话,会在finally的addWorkerFailed方法,将ctl减一
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
//下面是创建线程执行任务的代码
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
//创建一个worker,
w = new Worker(firstTask);
//这里的t的Runnable为w本身
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
//这里的t.start()执行的是w本身,执行Worker的run方法
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
执行到了t.start()方法,就是执行Worker的run方法,最终执行到了runWorker方法
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
//真正的任务
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
//下面就是最核心的方法,通过一个while循环达到线程的重用,在最后的finally中task赋值为null,
//task != null为false,那么跳出循环的条件主要集中在(task = getTask()) != null,如果为null则跳出while循环,
//当while循环跳出的时候,线程执行结束, 还要经过runWorker的最后一个方法processWorkerExit(w, completedAbruptly);
//最后决定是否线程执行结束,再等待被GC回收,getTask是从队列中获取的任务,后续会分析到,
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
//出现异常继续抛出
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
//执行任务后将task赋值为null
task = null;
//任务完成的总数
w.completedTasks++;
w.unlock();
}
}
//为flase和true在下面的processWorkerExit方法中使用到,是线程回收还是放到线程池的首要条件,
//如果在task.run()出现异常时,没有catch捕获,这段代码是不会执行,
//执行下一个finally通过processWorkerExit方法中的 addWorker(null, false);
//添加空任务的worker,就是添加空闲线程,等待从队列中获取任务
completedAbruptly = false;
} finally {
//推出while寻呼那后,执行该方法,看是否还能留在线程池中.
processWorkerExit(w, completedAbruptly);
}
}
getTask方法
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
//查看线程池中线程数
int wc = workerCountOf(c);
// Are workers subject to culling?
//是否允许回收线程,大于corePoolSize好理解,就是多余核心线程是要回收的, allowCoreThreadTimeOut的意
//思是是否回收核心线程,默认为false,可以手动设置为true,如果为true,就是不管是否线程数大于核心线程
//数,timed都为true
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// wc > maximumPoolSize || (timed && timedOut)主要看timeOut是否为true, wc > maximumPoolSize基本都
//为false
if ((wc > maximumPoolSize || (timed && timedOut))
//wc是否大于1, workQueue.isEmpty()队列是否为空,如果队列为空没有执行的任务时
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
//返回null的时候,在上面的runWorker方法的while循环就会跳出,则线程执行结束,还要经过runWorker的最后一
//个方法processWorkerExit(w, completedAbruptly);最后决定是否线程执行结束,再等待GC的回收,线程也就销
//毁了,核心线程和非核心线程都在while循环中,而返回null的线程是不确定的,就是被回收的线程可能是核心线程,也可能是非核心
//线程
return null;
continue;
}
try {
//如果timed为true执行poll,从队列中获取任务是有时间要求的,如果超过规定时间则返回null
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
//一直阻塞,直到有任务
workQueue.take();
//为null,则是在执行poll的时候会返回null
if (r != null)
return r;
//返回null时timedOut为true,就回到了上面说的判断(timed && timedOut)如果timed为true,则结果就返回true
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
processWorkExit方法
private void processWorkerExit(Worker w, boolean completedAbruptly) {
//将ctl减一,线程池的线程数减一
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//已完成的线程数加一
completedTaskCount += w.completedTasks;
//任务从workers去除
workers.remove(w);
} finally {
mainLock.unlock();
}
tryTerminate();
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
//当任务发生异常的时候completedAbruptly为true,跳过判断
if (!completedAbruptly) {
//如果允许回收核心线程,min为0,否则win为核心线程数
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
//如果队列不为空,min还为0 赋值为1,就是队列中有任务的时候,至少保证有一个线程在线程池中
if (min == 0 && ! workQueue.isEmpty())
min = 1;
//如果大于了,就return 这回就线程就真的结束了,等待被GC回收
if (workerCountOf(c) >= min)
return; // replacement not needed
}
//添加没有任务的worker,就是将线程重新放到线程池中,最终是否真的放进去,还要经过addWorker方法的一系列操作
addWorker(null, false);
}
}
线程池的几种状态
https://blog.csdn.net/qq_36031640/article/details/91448439
网友评论