Introduction
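This question comes from a real-world case:
"On several nodes of one application, jstack shows a large number of TIMED_WAITING worker threads blocked in the task queue's poll(). That means no tasks are being handed to these workers, so why aren't they released? Is something wrong with the Spring Boot configuration? minSpareThreads defaults to 10 (which is also the corePoolSize), so when the system is at its idlest there should be only 10 worker threads in the WAITING state. Where do these '121 TIMED_WAITING worker threads during off-peak hours' come from?
Current settings:
acceptCount=200, maxThreads=800; connectionTimeout and keepAliveTimeout are both left at the default of 60s.
In other words, it is still not clear how the Tomcat thread pool decides that a thread is idle (the criterion for treating a pooled thread as idle), or how it shuts down idle threads that have timed out."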
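Analysis
To answer this, we need to dig into Tomcat's thread pool, its task queue and task wrapping, and the underlying JDK thread pool implementation. We also need to see how the two thread pools differ and relate. If you have no background on thread pools yet, first read 《动手实现一个简单的线程池来理解其原理》 ("Implementing a simple thread pool by hand to understand how it works") for a basic intuition. With that in place, let's start.
Overall, Tomcat's worker thread pool is built on the JDK's ThreadPoolExecutor. The blocking queue it uses is TaskQueue, which extends LinkedBlockingQueue with some optimizations.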
Creating the Tomcat thread pool
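public void createExecutor() {
    internalExecutor = true;
    TaskQueue taskqueue = new TaskQueue();
    TaskThreadFactory tf = new TaskThreadFactory(getName() + "-exec-", daemon, getThreadPriority());
    // corePoolSize = minSpareThreads, maximumPoolSize = maxThreads, keepAliveTime = 60 s (a literal here)
    executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), 60, TimeUnit.SECONDS, taskqueue, tf);
    // the queue needs a reference back to the pool so offer() can inspect its state
    taskqueue.setParent( (ThreadPoolExecutor) executor);
}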
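The task queue: TaskQueue
According to its Javadoc, TaskQueue is a task queue designed specifically to run with the thread pool executor. It is optimized so that the executor's threads are used properly. With an ordinary queue, the executor would spawn threads even when idle threads exist, and you would have no way to force items onto the queue itself.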
@Override
public boolean offer(Runnable o) {
    // we can't do any checks: no parent executor has been set yet
    if (parent==null) return super.offer(o);
    // we are maxed out on threads, simply queue the object
    // (the maximum thread count is reached, so all we can do is queue the task and let it wait)
    if (parent.getPoolSize() == parent.getMaximumPoolSize()) return super.offer(o);
    // we have idle threads, just add it to the queue
    // (submitted-but-unfinished tasks <= pool size means a thread is idle and will pick it up right away)
    if (parent.getSubmittedCount()<=(parent.getPoolSize())) return super.offer(o);
    // if we have less threads than maximum force creation of a new thread
    // (returning false makes the executor create a new thread instead of queuing)
    if (parent.getPoolSize()<parent.getMaximumPoolSize()) return false;
    // if we reached here, we need to add it to the queue
    // (none of the cases above applied, so queue the task)
    return super.offer(o);
}
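To see why returning false from offer() matters, here is a minimal sketch that copies the offer() decisions onto a plain JDK ThreadPoolExecutor. The class names SketchTaskQueue and SketchExecutor are hypothetical, and so is the submittedCount field that stands in for Tomcat's getSubmittedCount(). The sketch also skips prestartAllCoreThreads() and Tomcat's RejectHandler. With corePoolSize=2 and maximumPoolSize=4, five blocking tasks push the pool up to 4 threads before anything is queued.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class TaskQueueSketch {

    // Hypothetical stand-in for Tomcat's TaskQueue: same offer() decisions, simplified.
    static class SketchTaskQueue extends LinkedBlockingQueue<Runnable> {
        private volatile SketchExecutor parent;

        void setParent(SketchExecutor parent) { this.parent = parent; }

        @Override
        public boolean offer(Runnable o) {
            if (parent == null) return super.offer(o);                                    // no executor yet
            if (parent.getPoolSize() == parent.getMaximumPoolSize()) return super.offer(o); // maxed out: queue
            if (parent.submittedCount.get() <= parent.getPoolSize()) return super.offer(o); // an idle thread exists
            if (parent.getPoolSize() < parent.getMaximumPoolSize()) return false;         // force a new thread
            return super.offer(o);
        }
    }

    // Hypothetical executor that tracks submitted-but-unfinished tasks (like getSubmittedCount()).
    static class SketchExecutor extends ThreadPoolExecutor {
        final AtomicInteger submittedCount = new AtomicInteger();

        SketchExecutor(int core, int max, SketchTaskQueue queue) {
            super(core, max, 60, TimeUnit.SECONDS, queue);
        }

        @Override
        public void execute(Runnable command) {
            submittedCount.incrementAndGet();
            try {
                super.execute(command);
            } catch (RejectedExecutionException e) {
                submittedCount.decrementAndGet();
                throw e;
            }
        }

        @Override
        protected void afterExecute(Runnable r, Throwable t) {
            submittedCount.decrementAndGet(); // the task is finished
        }
    }

    // Submits five tasks that block until released; returns the pool size observed meanwhile.
    static int poolSizeWithFiveBlockedTasks() {
        SketchTaskQueue queue = new SketchTaskQueue();
        SketchExecutor pool = new SketchExecutor(2, 4, queue);
        queue.setParent(pool);
        CountDownLatch release = new CountDownLatch(1);
        for (int i = 0; i < 5; i++) {
            pool.execute(() -> {
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        int poolSize = pool.getPoolSize(); // tasks 1-4 each got a thread; only task 5 was queued
        release.countDown();
        pool.shutdown();
        return poolSize;
    }

    public static void main(String[] args) {
        System.out.println("poolSize = " + poolSizeWithFiveBlockedTasks()); // poolSize = 4
    }
}
```

The key design point is that the JDK executor only adds a non-core thread when offer() fails. TaskQueue makes offer() fail on purpose while the pool is below its maximum and no thread is idle.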
The thread pool constructor
Tomcat's thread pool (package org.apache.tomcat.util.threads) extends the JDK's ThreadPoolExecutor:
public class ThreadPoolExecutor extends java.util.concurrent.ThreadPoolExecutor {
    // ...
    public ThreadPoolExecutor(int corePoolSize,                  // number of core threads kept resident
                              int maximumPoolSize,               // maximum number of threads allowed
                              long keepAliveTime,                // max idle time for threads beyond corePoolSize before they are terminated
                              TimeUnit unit,                     // time unit of keepAliveTime
                              BlockingQueue<Runnable> workQueue, // the task queue: TaskQueue, an optimized LinkedBlockingQueue
                              ThreadFactory threadFactory        // thread factory
                              ) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, new RejectHandler());
        prestartAllCoreThreads(); // start all core threads immediately instead of on first use
    }
    // ...
}
The JDK Javadoc describes the constructor parameters as follows:
- corePoolSize: the number of threads to keep in the pool, even if they are idle, unless allowCoreThreadTimeOut is set
- maximumPoolSize: the maximum number of threads to allow in the pool
- keepAliveTime: when the number of threads is greater than the core, this is the maximum time that excess idle threads will wait for new tasks before terminating.
- unit: the time unit for the keepAliveTime argument
- workQueue: the queue to use for holding tasks before they are executed. This queue will hold only the Runnable tasks submitted by the execute method.
- threadFactory: the factory to use when the executor creates a new thread
- handler: the handler to use when execution is blocked because the thread bounds and queue capacities are reached
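One visible difference from a plain JDK pool is the prestartAllCoreThreads() call in Tomcat's constructor. Because of it, corePoolSize (minSpareThreads) workers exist before the first request arrives. The sketch below uses a plain java.util.concurrent.ThreadPoolExecutor with the numbers from the case (10 and 800), and nothing Tomcat-specific. It shows that the JDK creates threads lazily until prestartAllCoreThreads() is called.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PrestartSketch {

    // Returns {threads before prestart, threads started, threads after prestart}.
    static int[] prestartDemo() {
        // Numbers from the case: corePoolSize = minSpareThreads = 10, maximumPoolSize = maxThreads = 800
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                10, 800, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        int before = pool.getPoolSize();             // 0: a plain JDK pool creates threads lazily
        int started = pool.prestartAllCoreThreads(); // what Tomcat's constructor does up front
        int after = pool.getPoolSize();              // 10 idle core threads (they wait in take())
        pool.shutdown();                             // interrupts the idle workers so they exit
        return new int[] {before, started, after};
    }

    public static void main(String[] args) {
        int[] r = prestartDemo();
        System.out.println("before=" + r[0] + " started=" + r[1] + " after=" + r[2]);
        // before=0 started=10 after=10
    }
}
```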
How request tasks are submitted to the Tomcat thread pool
It is the Poller that hands the SocketProcessor to executor.execute():
Executor executor = getExecutor();
if (dispatch && executor != null) {
    executor.execute(sc);
} else {
    sc.run();
}
Here sc is a SocketProcessorBase<S>, which implements Runnable.
As shown above, once the pool has been built, running a Runnable starts with execute(). Its Javadoc reads:
Executes the given task sometime in the future. The task may execute in a new thread or in an existing pooled thread.
If the task cannot be submitted for execution, either because this executor has been shutdown or because its capacity has been reached, the task is handled by the current {@code RejectedExecutionHandler}.
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}
execute() proceeds in three steps (from the JDK's own source comments):
1. If fewer than corePoolSize threads are running, try to start a new thread with the given command as its first task. The call to addWorker atomically checks runState and workerCount, and so prevents false alarms that would add threads when it shouldn't, by returning false.
2. If a task can be successfully queued, then we still need to double-check whether we should have added a thread (because existing ones died since last checking) or that the pool shut down since entry into this method. So we recheck state and if necessary roll back the enqueuing if stopped, or start a new thread if there are none.
3. If we cannot queue task, then we try to add a new thread. If it fails, we know we are shut down or saturated and so reject the task.
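For contrast, here is what the three steps do with an ordinary unbounded LinkedBlockingQueue instead of TaskQueue. This is a plain JDK sketch and the numbers are arbitrary. Because offer() always succeeds, step 3 is never reached, and the pool never grows past corePoolSize regardless of maximumPoolSize. TaskQueue exists to change exactly this: its offer() returns false while the pool can still grow, which sends execute() into step 3.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PlainQueueSketch {

    // Returns {pool size, queued tasks} while five blocking tasks are in flight.
    static int[] demo() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 4, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>()); // unbounded plain queue
        CountDownLatch release = new CountDownLatch(1);
        for (int i = 0; i < 5; i++) {
            pool.execute(() -> {
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        // Step 1 started 2 core threads, step 2 queued the other 3, step 3 never ran.
        int[] snapshot = {pool.getPoolSize(), pool.getQueue().size()};
        release.countDown();
        pool.shutdown();
        return snapshot;
    }

    public static void main(String[] args) {
        int[] r = demo();
        System.out.println("poolSize=" + r[0] + " queued=" + r[1]); // poolSize=2 queued=3
    }
}
```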
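addWorker() starts a worker. Once started, the worker runs runWorker(), the core loop of every pool thread. It takes its firstTask, or otherwise a task from getTask(), and calls task.run(). When the condition task != null || (task = getTask()) != null no longer holds, the loop exits and processWorkerExit(w, completedAbruptly) runs. That method removes the worker from the pool and calls tryTerminate(), which only has an effect when the pool itself is shutting down. After that, the thread simply ends. The code: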
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted. This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}
getTask() is where tasks are taken from the blocking queue. The worker's loop continues only while it returns a non-null task:
private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }
        int wc = workerCountOf(c);
        // Are workers subject to culling? (i.e. may the current worker eventually be released?)
        // True if we are above corePoolSize, or if core threads are allowed to time out as well
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }
        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : // above core size: wait at most keepAliveTime
                workQueue.take();                                     // at or below core size: block indefinitely
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}
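The exit test can be lifted into a standalone function, which lets us check the reasoning case by case. shouldExit is a hypothetical helper, not a JDK method. It leaves out the run-state check and the compare-and-set, and mirrors only the timed/timedOut logic above.

```java
class CullingRuleSketch {

    // Hypothetical helper mirroring the exit test in getTask() (run-state check and CAS omitted).
    // Returns true when a worker should give up instead of waiting on the queue again.
    static boolean shouldExit(int workerCount, int corePoolSize, int maximumPoolSize,
                              boolean allowCoreThreadTimeOut, boolean timedOut, boolean queueEmpty) {
        boolean timed = allowCoreThreadTimeOut || workerCount > corePoolSize;
        return (workerCount > maximumPoolSize || (timed && timedOut))
                && (workerCount > 1 || queueEmpty);
    }

    public static void main(String[] args) {
        // 121 workers, core 10, max 800, last poll() timed out: this worker exits
        System.out.println(shouldExit(121, 10, 800, false, true, true));  // true
        // Same pool, but the last poll() returned a task: the worker keeps looping
        System.out.println(shouldExit(121, 10, 800, false, false, true)); // false
        // Another worker already brought the count down to core: timed is false, so it stays
        System.out.println(shouldExit(10, 10, 800, false, true, true));   // false
    }
}
```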
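This is where the answer to our case lies. Suppose timed is true, which means the thread count exceeds corePoolSize. A worker that finishes its previous task and then waits in poll() for up to keepAliveTime without getting a task sets timedOut = true. On the next iteration of the for loop, it reaches this block: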
if ((wc > maximumPoolSize || (timed && timedOut))
    && (wc > 1 || workQueue.isEmpty())) {
    if (compareAndDecrementWorkerCount(c))
        return null;
    continue;
}
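In this condition, (timed && timedOut) is true and workerCount > 1, so workQueue.isEmpty() does not change the result. The condition holds, compareAndDecrementWorkerCount(c) runs, and getTask() returns null. The loop in runWorker then exits and the thread terminates. If the CAS fails because another worker changed the count at the same moment, the loop re-reads the state and re-evaluates. Once the count has fallen back to corePoolSize, timed becomes false and the remaining workers block in take() instead of exiting.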
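The effect can be observed with a plain JDK pool and a very short keepAliveTime. This sketch uses a SynchronousQueue (as Executors.newCachedThreadPool() does) only to make the pool grow past corePoolSize; Tomcat does not use it. After the tasks finish, two of the four workers time out in poll() and exit. The remaining two see workerCount == corePoolSize, stop being timed, and block in take(). The result depends on timing, but the idle wait is 15 times keepAliveTime.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class KeepAliveSketch {

    // Returns {peak pool size, pool size after the pool has been idle for a while}.
    static int[] demo() {
        // SynchronousQueue (not TaskQueue) is used here only to force growth past core.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 4, 100, TimeUnit.MILLISECONDS, new SynchronousQueue<>());
        CountDownLatch release = new CountDownLatch(1);
        CountDownLatch finished = new CountDownLatch(4);
        for (int i = 0; i < 4; i++) {
            pool.execute(() -> {
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    finished.countDown();
                }
            });
        }
        int peak = pool.getPoolSize(); // 4: two core threads plus two extra ones
        release.countDown();
        try {
            finished.await();
            Thread.sleep(1500); // stay idle far longer than keepAliveTime (100 ms)
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        int afterIdle = pool.getPoolSize(); // 2: the excess workers' poll() timed out and they exited
        pool.shutdown();
        return new int[] {peak, afterIdle};
    }

    public static void main(String[] args) {
        int[] r = demo();
        System.out.println("peak=" + r[0] + " afterIdle=" + r[1]); // peak=4 afterIdle=2
    }
}
```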
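The answer
Back to the question in the case. Suppose there are more threads than corePoolSize, and jstack shows them all in TIMED_WAITING inside workQueue.poll. By the analysis above, each of these threads must have received a task within its last 60-second wait; otherwise it would already have been terminated. In other words, Tomcat is still receiving some traffic: within that minute, each of the 121 threads executed at least one task. This also explains why the state is TIMED_WAITING rather than WAITING. timed is computed from the pool-wide worker count, not per thread. So while the count exceeds corePoolSize, every idle worker, including the first ten started, waits in poll(keepAliveTime). There is no dedicated set of 10 core threads sitting in take(). Only when the count drops back to corePoolSize do the remaining workers switch to take() and show up as WAITING.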
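Do we need to do anything?
It depends on Tomcat's actual request volume. Every thread may be pulling tasks from the queue nonstop, and the workers just happened to look idle when jstack was taken. In that case the current setup can stay: once traffic drops, the thread count will drop with it. If each thread is fairly idle, for example it really ran only one or two tasks in that minute, there are two options. You can lower maximumPoolSize, for example back to the default of 200. Or you can shorten the pool's keepAliveTime, for example to 30s, so idle threads are released sooner. One caveat: in the createExecutor() code shown at the start, the 60-second keepAliveTime is a literal. For Tomcat's internal executor it is therefore not a connector setting. Shortening it means changing the executor instance itself or configuring a separate Executor, for example through its maxIdleTime attribute in server.xml.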
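Tomcat's ThreadPoolExecutor extends the JDK class, so both adjustments map onto standard JDK setters, setMaximumPoolSize and setKeepAliveTime, provided you have a reference to the executor instance. How to obtain that reference in an embedded Spring Boot setup is not covered here and depends on the versions involved. The sketch below uses a plain JDK pool as a stand-in, and tighten is a hypothetical helper.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class TuningSketch {

    // Hypothetical helper: apply the two adjustments suggested above to a running pool.
    static void tighten(ThreadPoolExecutor pool, int newMaxThreads, long keepAliveSeconds) {
        // The JDK rejects a maximum below corePoolSize with IllegalArgumentException.
        pool.setMaximumPoolSize(newMaxThreads);
        // A shorter keepAliveTime lets excess threads exit once they next sit idle that long.
        pool.setKeepAliveTime(keepAliveSeconds, TimeUnit.SECONDS);
    }

    public static void main(String[] args) {
        // Same shape as the case: core 10, max 800, keepAlive 60 s
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                10, 800, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        tighten(pool, 200, 30);
        System.out.println(pool.getMaximumPoolSize() + " " + pool.getKeepAliveTime(TimeUnit.SECONDS));
        // 200 30
        pool.shutdown();
    }
}
```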