Tomcat 线程池

作者: 程序员札记 | 来源:发表于2022-03-14 08:30 被阅读0次

    tomcat的线程池扩展了jdk的executor,而且队列用的是自己的task queue,因此其策略与jdk的有所不同。JDK 原生线程池可以说功能比较完善,使用也比较简单,那为何 Tomcat/Jetty 却不选择这个方案,反而自己去动手实现那?

    image.png

    tomcat线程池策略

    image.png

    可以看到,tomcat 其实是分为两个线程池, 一个是acceptor 来接受外部的请求,一个是Executor 来执行请求。 为什么要分两个线程池,这个问题在前面已经讲过。 大家不理解可以看看前面的文章。

    Tomcat 作为一个老牌的 servlet 容器,处理多线程肯定得心应手,为了能保证多线程环境下的高效,必然使用了线程池。

    但是,Tomcat 并没有直接使用 j.u.c 里面的线程池,而是对线程池进行了扩展,首先我们回忆一下,j.u.c 中的线程池的几个核心参数是怎么配合的:

    • 如果当前运行的线程,少于corePoolSize,则创建一个新的线程来执行任务。
    • 如果运行的线程等于或多于 corePoolSize,将任务加入 BlockingQueue。
    • 如果 BlockingQueue 内的任务超过上限,则创建新的线程来处理任务。
    • 如果创建的线程超出 maximumPoolSize,任务将被拒绝策略拒绝。

    Tomcat 的线程池其实封装不厚重,只是对 jdk 线程池做了简单优化:

    • 任务执行失败时不会直接抛出错误,而是装回队列里再次尝试执行;
    • 当线程池没有达到最大执行线程的时候,会优先开线程再使用任务队列;
    • 扩展计数用于追踪任务的执行情况;
    • 将线程池融入 Catalina 的生命周期组件中。

    StandardThreadExecutor

    StandardThreadExecutor 是 Catalina 结构中的一部分,是 Tomcat 生命周期中的池化线程资源的封装。类总览:

    public class StandardThreadExecutor extends LifecycleMBeanBase
            implements Executor, ResizableExecutor {
    
        // 统一的 String 的管理类,用来防止编码等一系列的问题
        protected static final StringManager sm =
                StringManager.getManager(Constants.Package);
    
        // 创建出来的线程的优先级
        protected int threadPriority = Thread.NORM_PRIORITY;
    
        // 线程是否是守护线程,默认为是
        protected boolean daemon = true;
    
        // 线程名称
        protected String namePrefix = "tomcat-exec-";
    
        // 默认的最大线程数量
        protected int maxThreads = 200;
    
        // 默认的最小线程数量
        protected int minSpareThreads = 25;
    
        // 存在时间
        protected int maxIdleTime = 60000;
    
        // 真实工作的 ThreadPoolExecutor
        // 本质是 StandardThreadExecutor 只是 ThreadPoolExecutor 的装饰器
        // 此处的对象类型是 org.apache.tomcat.util.threads.ThreadPoolExecutor
        protected ThreadPoolExecutor executor = null;
    
        // 线程池名称
        protected String name;
    
        // 是否开启线程池最小线程数量,如果此处为 false 的话,minSpareThreads 就没有意义
        protected boolean prestartminSpareThreads = false;
    
        // 默认的任务队列长度
        protected int maxQueueSize = Integer.MAX_VALUE;
    
         // 重建线程的时间间隔
        protected long threadRenewalDelay =
            org.apache.tomcat.util.threads.Constants.DEFAULT_THREAD_RENEWAL_DELAY;
    
        // 任务队列
        private TaskQueue taskqueue = null;
        
        // 其它方法暂时忽略
    }
    

    生命周期

    /**
     * 初始化线程池
     */
    @Override
    protected void initInternal() throws LifecycleException {
        super.initInternal();
    }
    
    
    /**
     * 开始线程池
     */
    @Override
    protected void startInternal() throws LifecycleException {
    
        // 任务队列
        taskqueue = new TaskQueue(maxQueueSize);
        
        // 线程工厂
        TaskThreadFactory tf = new TaskThreadFactory(namePrefix,daemon,getThreadPriority());
        
        // 初始化线程池
        executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), maxIdleTime, TimeUnit.MILLISECONDS,taskqueue, tf);
        
        // 重建线程的时间间隔
        executor.setThreadRenewalDelay(threadRenewalDelay);
        
        // 设置线程池最小线程数量
        if (prestartminSpareThreads) {
            executor.prestartAllCoreThreads();
        }
        
        // 线程池任务队列的 parent
        taskqueue.setParent(executor);
    
        // 设置组件的生命周期状态
        setState(LifecycleState.STARTING);
    }
    
    
    /**
     * 关闭线程池
     */
    @Override
    protected void stopInternal() throws LifecycleException {
    
        setState(LifecycleState.STOPPING);
        if (executor != null) {
            executor.shutdownNow();
        }
        executor = null;
        taskqueue = null;
    }
    
    /**
     * 清除线程池
     */
    @Override
    protected void destroyInternal() throws LifecycleException {
        super.destroyInternal();
    }
    
    /**
     * 关闭线程池
     */
    public void contextStopping() {
        if (executor != null) {
            executor.contextStopping();
        }
    }
    
    
    

    任务执行

    /**
     * 加入一个带超时的任务
     **/
    @Override
    public void execute(Runnable command, long timeout, TimeUnit unit) {
    
        // 调用 executor 对象去执行
        // 如果 executor 对象是空的,则抛出异常
        if (executor != null) {
            executor.execute(command,timeout,unit);
        } else {
            throw new IllegalStateException(sm.getString("standardThreadExecutor.notStarted"));
        }
    }
    
    /**
     * 加入一个不带超时的任务
     **/
    @Override
    public void execute(Runnable command) {
        
        // 逻辑基本同上
        if (executor != null) {
            try {
                executor.execute(command);
            } catch (RejectedExecutionException rx) {
                
                // 此处会再尝试将任务加入一次等待队列中
                // TaskQueue.force(...) 方法底层会调用 Queue.offer(...) 方法
                // 如果仍然失败,会抛出异常
                if (!((TaskQueue) executor.getQueue()).force(command)) {
                    throw new RejectedExecutionException(sm.getString("standardThreadExecutor.queueFull"));
                }
            }
        } else {
            throw new IllegalStateException(sm.getString("standardThreadExecutor.notStarted"));
        }
    }
    
    

    其它方法基本都是 get / set 方法,可以忽略。

    ThreadPoolExecutor

    org.apache.tomcat.util.threads.ThreadPoolExecutor 是 Tomcat 中的线程池类。
    类和变量总览

    public class ThreadPoolExecutor extends java.util.concurrent.ThreadPoolExecutor {
        
        // 统一的 String 的管理类,用来防止编码等一系列的问题
        protected static final StringManager sm = StringManager
                .getManager("org.apache.tomcat.util.threads.res");
    
        // 任务提交数量的计数
        private final AtomicInteger submittedCount = new AtomicInteger(0);
    
    
        private final AtomicLong lastContextStoppedTime = new AtomicLong(0L);
    
         // 线程自杀计数
        private final AtomicLong lastTimeThreadKilledItself = new AtomicLong(0L);
    
         // 重建线程的时间间隔
        private long threadRenewalDelay = Constants.DEFAULT_THREAD_RENEWAL_DELAY;
    
        // 其它方法暂时忽略
        // ...
    

    构造器

    /**
     * 基本是沿用了 java.util.concurrent.ThreadPoolExecutor 的构造方法
     **/
    public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
        // 调用 juc 中的 ThreadPoolExecutor 进行线程池的初始化
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
        // 用于初始化常驻线程
        prestartAllCoreThreads();
    }
    
    // 下列的构造方法都差不多
    
    public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,
            RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
        prestartAllCoreThreads();
    }
    
    public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, new RejectHandler());
        prestartAllCoreThreads();
    }
    
    public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, new RejectHandler());
        prestartAllCoreThreads();
    }
    

    拒绝策略

    // ThreadPoolExecutor.class
    private static class RejectHandler implements RejectedExecutionHandler {
        @Override
        public void rejectedExecution(Runnable r,
        java.util.concurrent.ThreadPoolExecutor executor) {
            // 直接抛出错误
            throw new RejectedExecutionException();
        }
    
    }
    

    任务执行

    @Override
    public void execute(Runnable command) {
        execute(command,0,TimeUnit.MILLISECONDS);
    }
    
    public void execute(Runnable command, long timeout, TimeUnit unit) {
        // 提交计数加一
        submittedCount.incrementAndGet();
        try {
            // 此处调用 java.util.concurrent.ThreadPoolExecutor 中的 execute(...) 方法
            super.execute(command);
        } catch (RejectedExecutionException rx) {
            // 如果调用父类中的方法执行错误,会尝试将任务再一次放入到等待队列里
            if (super.getQueue() instanceof TaskQueue) {
                final TaskQueue queue = (TaskQueue)super.getQueue();
                try {
                    // 此处尝试放入等待队列
                    // 如果也失败了,就回滚提交计数,并抛出异常
                    if (!queue.force(command, timeout, unit)) {
                        submittedCount.decrementAndGet();
                        throw new RejectedExecutionException(sm.getString("threadPoolExecutor.queueFull"));
                    }
                } catch (InterruptedException x) {
                    submittedCount.decrementAndGet();
                    throw new RejectedExecutionException(x);
                }
            } else {
                submittedCount.decrementAndGet();
                throw rx;
            }
        }
    }
    
    
    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        // 计数加一
        submittedCount.decrementAndGet();
    
        // 如果没有报错,那么此处尝试关闭多余的线程
        // 抛出错误的方式停止线程
        if (t == null) {
            stopCurrentThreadIfNeeded();
        }
    }
    
    /**
     * 判断是否需要关闭线程
     **/
    protected void stopCurrentThreadIfNeeded() {
        // 如果线程存活时间超过了 delay 值,那么此处会抛出一个错误,使线程停止
        if (currentThreadShouldBeStopped()) {
            long lastTime = lastTimeThreadKilledItself.longValue();
            if (lastTime + threadRenewalDelay < System.currentTimeMillis()) {
                if (lastTimeThreadKilledItself.compareAndSet(lastTime,
                System.currentTimeMillis() + 1)) {
    
                    final String msg = sm.getString( "threadPoolExecutor.threadStoppedToAvoidPotentialLeak",
                    Thread.currentThread().getName());
    
                    throw new StopPooledThreadException(msg);
                }
            }
        }
    }
    
    protected boolean currentThreadShouldBeStopped() {
        // 如果当前线程并非工作线程,或者不存在线程存活 delay 值,那么此处返回 false
        // 如果当前线程是工作线程,且设置了 delay 时间,且当前线程的存活时间已经超过了设置值,那么此处返回 true
        if (threadRenewalDelay >= 0 
            && Thread.currentThread() instanceof TaskThread) {
            TaskThread currentTaskThread = (TaskThread) Thread.currentThread();
            if (currentTaskThread.getCreationTime() <
                this.lastContextStoppedTime.longValue()) {
                return true;
            }
        }
        return false;
    }
    
    

    从execute 方法可以看出,当提交线程的时候,如果被线程池拒绝了,Tomcat 的线程池,还会厚着脸皮再次尝试,调用 force() 方法"强行"的尝试向阻塞队列中添加任务。

    优雅关闭

    
    public void contextStopping() {
            this.lastContextStoppedTime.set(System.currentTimeMillis());
    
            // 保存 corePoolSize 的值
            int savedCorePoolSize = this.getCorePoolSize();
        
            // 获取队列
            TaskQueue taskQueue =
                    getQueue() instanceof TaskQueue ? (TaskQueue) getQueue() : null;
            // 将 taskQueue 中的 forcedRemainingCapacity 置为零
            // 不太清楚 forcedRemainingCapacity 有什么作用
            if (taskQueue != null) {
                taskQueue.setForcedRemainingCapacity(Integer.valueOf(0));
            }
    
            // corePoolSize 置为零
            this.setCorePoolSize(0);
    
            // 将 taskQueue 中的 forcedRemainingCapacity 置空
            if (taskQueue != null) {
                taskQueue.setForcedRemainingCapacity(null);
            }
        
            // 恢复 corePoolSize
            this.setCorePoolSize(savedCorePoolSize);
        }
    
    

    TaskQueue

    TaskQueue 是 Tomcat 中对任务队列的增强和封装:

    public class TaskQueue extends LinkedBlockingQueue<Runnable> {
    
        // 序列编码
        private static final long serialVersionUID = 1L;
        
        // 字符串管理类
        protected static final StringManager sm = StringManager
                .getManager("org.apache.tomcat.util.threads.res");
                
        // 任务队列关联的线程池
        private transient volatile ThreadPoolExecutor parent = null;
    
        // 不太清楚是做什么用的一个容量计数
        private Integer forcedRemainingCapacity = null;
    
        // 其它方法暂时忽略
        // ...
    

    加入、获取任务的相关方法

    // 不带超时的添加任务方法
    public boolean force(Runnable o) {
        // 关联线程池不可为空
        if (parent == null || parent.isShutdown())
            throw new RejectedExecutionException(sm.getString("taskQueue.notRunning"));
        // 调用 LinkedBlockingQueue 的 offer(...) 方法添加任务
        return super.offer(o);
    }
    
    // 带超时的添加任务方法
    public boolean force(Runnable o, long timeout, TimeUnit unit) throws InterruptedException {
        if (parent == null || parent.isShutdown())
            throw new RejectedExecutionException(sm.getString("taskQueue.notRunning"));
        return super.offer(o,timeout,unit);
    }
    
    // 本质上是调用父类的 offer(...) 方法
    // 非阻塞添加任务的方法
    @Override
    public boolean offer(Runnable o) {
    
        if (parent == null) 
            return super.offer(o);
        
        if (parent.getPoolSize() == parent.getMaximumPoolSize()) 
            return super.offer(o);
        
        if (parent.getSubmittedCount() <= (parent.getPoolSize())) 
            return super.offer(o);
        
        // 这种情况下线程池可以直接消费任务,无需放入任务队列等待
        if (parent.getPoolSize() < parent.getMaximumPoolSize()) 
            return false;
        
        return super.offer(o);
    }
    
    // 带超时的阻塞方式获取任务
    @Override
    public Runnable poll(long timeout, TimeUnit unit)
        throws InterruptedException {
        
        // 获取一个任务,如果获取到的为空,则停止当前线程
        // 能获取到就返回任务
        Runnable runnable = super.poll(timeout, unit);
        if (runnable == null && parent != null) {
            parent.stopCurrentThreadIfNeeded();
        }
        return runnable;
    }
    
    // 阻塞方式获取任务
    @Override
    public Runnable take() throws InterruptedException {
        // 线程池存在的情况下,会使用限时的方式去获取任务
        if (parent != null 
            && parent.currentThreadShouldBeStopped()) {
            return poll(parent.getKeepAliveTime(TimeUnit.MILLISECONDS),
                        TimeUnit.MILLISECONDS);
        }
        return super.take();
    }
    
    

    TaskQueue 在设计的时候都考虑了关联线程池存不存在的情况,笔者认为这应该是 Tomcat 的作者考虑到开发者可能会需要复用 TaskQueue 到其它的场景中。
    在提交任务的时候,增加了几个分支判断。

    首先我们看看 parent 是什么:

    private transient volatile ThreadPoolExecutor parent = null;
    这里需要特别注意这里的 ThreadPoolExecutor 并不是 jdk里面的 java.util.concurrent.ThreadPoolExecutor 而是 tomcat 自己实现的。

    我们分别来看 offer 中的几个 if 分支。

    首先我们需要明确一下,当一个线程池需要调用阻塞队列的 offer 的时候,说明线程池的核心线程数已经被占满了。(记住这个前提非常重要)

    要理解下面的代码,首先需要复习一下线程池的 getPoolSize() 获取的是什么?我们看源码:

    /**
     * Returns the current number of threads in the pool.
     *
     * @return the number of threads
     */
    public int getPoolSize() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // Remove rare and surprising possibility of
            // isTerminated() && getPoolSize() > 0
            return runStateAtLeast(ctl.get(), TIDYING) ? 0
                : workers.size();
        } finally {
            mainLock.unlock();
        }
    }
    

    需要注意的是,workers.size() 包含了 coreSize 的核心线程和临时创建的小于 maxSize 的临时线程。

    先看第一个 if

    // 如果线程池的工作线程数等于 线程池的最大线程数,这个时候没有工作线程了,就尝试加入到阻塞队列中
        if (parent.getPoolSize() == parent.getMaximumPoolSize()){
            return super.offer(o);
        }
    

    经过第一个 if 之后,线程数必然在核心线程数和最大线程数之间。

    if (parent.getSubmittedCount()<=(parent.getPoolSize())) {
        return super.offer(o);
    }
    

    对于 parent.getSubiitedCount() ,我们要先搞清楚 submiitedCount 是什么

    /**
     * The number of tasks submitted but not yet finished. This includes tasks
     * in the queue and tasks that have been handed to a worker thread but the
     * latter did not start executing the task yet.
     * This number is always greater or equal to {@link #getActiveCount()}.
     */
    private final AtomicInteger submittedCount = new AtomicInteger(0);
    

    这个数是一个原子类的整数,用于记录提交到线程中,且还没有结束的任务数。包含了在阻塞队列中的任务数和正在被执行的任务数两部分之和 。

    所以这行代码的策略是,如果已提交的线程数小于等于线程池中的线程数,表明这个时候还有空闲线程,直接加入阻塞队列中。为什么会有这种情况发生?其实我的理解是,之前创建的临时线程还没有被回收,这个时候直接把线程加入到队里里面,自然就会被空闲的临时线程消费掉了。

    我们继续往下看:

    //if we have less threads than maximum force creation of a new thread
    if (parent.getPoolSize()<parent.getMaximumPoolSize()) {
        return false;
    }
    

    由于上一个 if 条件的存在,走到这个 if 条件的时候,提交的线程数已经大于核心线程数了,且没有空闲线程,所以返回一个 false 标明,表示任务添加到阻塞队列失败。线程池就会认为阻塞队列已经无法继续添加任务到队列中了,根据默认线程池的工作逻辑,线程池就会创建新的线程直到最大线程数。

    回忆一下 jdk 默认线程池的实现,如果阻塞队列是无界的,任务会无限的添加到无界的阻塞队列中,线程池就无法利用核心线程数和最大线程数之间的线程数了。

    Tomcat 的实现就是为了,线程池即使核心线程数满了以后,且使用无界队列的时候,线程池依然有机会创建新的线程,直到达到线程池的最大线程数。

    总结一下:

    Tomcat 线程池的逻辑:

    • 如果当前运行的线程,少于corePoolSize,则创建一个新的线程来执行任务。
    • 如果线程数大于 corePoolSize了,Tomcat 的线程不会直接把线程加入到无界的阻塞队列中,而是去判断,submittedCount(已经提交线程数)是否等于 maximumPoolSize。
    • 如果等于,表示线程池已经满负荷运行,不能再创建线程了,直接把线程提交到队列,
    • 如果不等于,则需要判断,是否有空闲线程可以消费。
    • 如果有空闲线程则加入到阻塞队列中,等待空闲线程消费。
    • 如果没有空闲线程,尝试创建新的线程。(这一步保证了使用无界队列,仍然可以利用线程的 maximumPoolSize)。
    • 如果总线程数达到 maximumPoolSize,则继续尝试把线程加入 BlockingQueue 中。
    • 如果 BlockingQueue 达到上限(假如设置了上限),被默认线程池启动拒绝策略,tomcat 线程池会 catch 住拒绝策略抛出的异常,再次把尝试任务加入中 BlockingQueue 中。
    • 再次加入失败,启动拒绝策略。

    相关文章

      网友评论

        本文标题:Tomcat 线程池

        本文链接:https://www.haomeiwen.com/subject/jzrjdrtx.html