美文网首页
线程池系列(4)tomcat实现ThreadPoolExecut

线程池系列(4)tomcat实现ThreadPoolExecut

作者: 小胖学编程 | 来源:发表于2021-11-01 22:31 被阅读0次

    高频面试题:tomcat的线程池和JDK的线程池有什么区别?解答这个问题前,我们需要分析下tomcat如何重写JDK的ThreadPoolExecutor的。

    源码分析:Tomcat8.5源码构建 可以去下载的是Tomcat9的版本。

    1. 概论

    1.1 tomcat线程池和juc线程池流程

    JDK线程池策略:

    1. 当线程池中线程数量小于corePoolSize,每来一个任务,就会创建一个线程执行这个任务。
    2. 当前线程池线程数量大于等于corePoolSize,则每来一个任务。会尝试将其添加到任务缓存队列中,若是添加成功,则该任务会等待线程将其取出去执行;若添加失败(一般来说任务缓存队列已满),则会尝试创建新的线程执行。
    3. 当前线程池线程数量等于maximumPoolSize,则会采取任务拒绝策略进行处理。

    tomcat线程池策略:

    1. 当前线程数小于corePoolSize,则去创建工作线程;
    2. 当前线程数大于corePoolSize,但小于maximumPoolSize,则去创建工作线程;
    3. 当前线程数大于maximumPoolSize,则将任务放入到阻塞队列中,当阻塞队列满了之后,则调用拒绝策略丢弃任务;

    1.2 tomcat线程池和juc线程池的区别

    tomcat线程池是在juc线程池的基础上,修改少量代码来实现。在1.1中,我们看到其和juc的线程池执行流程有着很大的区别。那么tomcat为什么要这样设计?

    实际上,还是tomcat和juc的场景不同:

    使用线程池的任务有两种:

    1. IO密集型任务(如调用接口、查询数据库);
    2. CPU密集型任务;

    JDK的线程池ThreadPoolExecutor主要目的解决的便是CPU密集型任务的并发处理。但是Tomcat若使用原生的JDK线程池,一旦接收的请求数量大于线程池的核心线程数,这些请求就会被放到队列中,等待核心线程处理。这样会降低请求的总体处理速度,所以Tomcat并没有使用JDK原生线程池的策略。

    JDK线程池:当线程数达到corePoolSize后,任务首先被放到queue。发挥CPU多核的并行优势,减少多个线程导致的上下文切换。适合的场景是:CPU密集型任务

    Tomcat线程池:当大量请求达到时,接收的请求数量大于核心线程池的corePoolSize时,会继续创建worker线程去处理请求。而后续请求量变少时,只会销毁maximumPoolSize线程数。适合的场景是IO密集型。

    1.3 IO密集型和CPU密集型任务核心参数的设置

    场景:JDK线程池执行IO密集型的任务:

    解决方案:可以提高corePoolSize的大小。

    引入问题:系统中线程数过多。

    线程数过多问题的解决方案:指定ThreadPoolExecutor的allowCoreThreadTimeout=true,那么核心线程若处于闲置状态的话,超过一定的时间(KeepAliveTime),就会销毁掉。

    引入问题:当核心线程数被销毁时,而有大量请求到达时系统重新创建worker线程,会使得前期接口响应时间变长。(这也是为什么系统上线需要预热)。

    所以JDK线程池无法完美的去处理IO密集型的任务,这也就是为什么Tomcat需要重写JDK线程池的原因。

    2. Tomcat线程池执行流程

    Tomcat是重写了JDK线程池(即改动的是少量的源码)实现的功能的增强。

    思想:主要流程还是JDK线程池流程,即先开启corePoolSize线程,然后在queue,最后在开启maximumPoolSize线程。Tomcat重点改造的是queue的offer()。即在向queue放入任务时,若发现未达到最大线程数,那么offer()返回false,即放入队列失败。此时,便继续开启maximumPoolSize线程。

    2.1 自定义队列

    Tomcat主要是通过实现自定义队列来完成逻辑的改造。

    /**
     * 实现Tomcat特有逻辑的自定义队列
     */
    public class TaskQueue extends LinkedBlockingQueue<Runnable> {
        private static final long serialVersionUID = 1L;
    
        private transient volatile ThreadPoolExecutor parent = null;
    
        private static final int DEFAULT_FORCED_REMAINING_CAPACITY = -1;
    
        /**
         * 强制遗留的容量
         */
        private int forcedRemainingCapacity = -1;
    
        /**
         * 队列的构建方法
         */
        public TaskQueue() {
        }
    
        public TaskQueue(int capacity) {
            super(capacity);
        }
    
        public TaskQueue(Collection<? extends Runnable> c) {
            super(c);
        }
    
        /**
         * 设置核心变量
         */
        public void setParent(ThreadPoolExecutor parent) {
            this.parent = parent;
        }
    
        /**
         * put:向阻塞队列填充元素,当阻塞队列满了之后,put时会被阻塞。
         * offer:向阻塞队列填充元素,当阻塞队列满了之后,offer会返回false。
         *
         * @param o 当任务被拒绝后,继续强制的放入到线程池中
         * @return 向阻塞队列塞任务,当阻塞队列满了之后,offer会返回false。
         */
        public boolean force(Runnable o) {
            if (parent == null || parent.isShutdown()) {
                throw new RejectedExecutionException("taskQueue.notRunning");
            }
            return super.offer(o);
        }
    
        /**
         * 带有阻塞时间的塞任务
         */
        @Deprecated
        public boolean force(Runnable o, long timeout, TimeUnit unit) throws InterruptedException {
            if (parent == null || parent.isShutdown()) {
                throw new RejectedExecutionException("taskQueue.notRunning");
            }
            return super.offer(o, timeout, unit); //forces the item onto the queue, to be used if the task is rejected
        }
    
        /**
         * 当线程真正不够用时,优先是开启线程(直至最大线程),其次才是向队列填充任务。
         *
         * @param runnable 任务
         * @return false 表示向队列中添加任务失败,
         */
        @Override
        public boolean offer(Runnable runnable) {
            if (parent == null) {
                return super.offer(runnable);
            }
            //若是达到最大线程数,进队列。
            if (parent.getPoolSize() == parent.getMaximumPoolSize()) {
                return super.offer(runnable);
            }
            //当前活跃线程为10个,但是只有8个任务在执行,于是,直接进队列。
            if (parent.getSubmittedCount() < (parent.getPoolSize())) {
                return super.offer(runnable);
            }
            //当前线程数小于最大线程数,那么直接返回false,去创建最大线程
            if (parent.getPoolSize() < parent.getMaximumPoolSize()) {
                return false;
            }
            //否则的话,将任务放入到队列中
            return super.offer(runnable);
        }
    
        /**
         * 获取任务
         */
        @Override
        public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException {
            Runnable runnable = super.poll(timeout, unit);
            //取任务超时,会停止当前线程,来避免内存泄露
            if (runnable == null && parent != null) {
                parent.stopCurrentThreadIfNeeded();
            }
            return runnable;
        }
    
        /**
         * 阻塞式的获取任务,可能返回null。
         */
        @Override
        public Runnable take() throws InterruptedException {
            //当前线程应当被终止的情况下:
            if (parent != null && parent.currentThreadShouldBeStopped()) {
                long keepAliveTime = parent.getKeepAliveTime(TimeUnit.MILLISECONDS);
                return poll(keepAliveTime, TimeUnit.MILLISECONDS);
            }
            return super.take();
        }
    
        /**
         * 返回队列的剩余容量
         */
        @Override
        public int remainingCapacity() {
            if (forcedRemainingCapacity > DEFAULT_FORCED_REMAINING_CAPACITY) {
                return forcedRemainingCapacity;
            }
            return super.remainingCapacity();
        }
    
    
        /**
         * 强制设置剩余容量
         */
        public void setForcedRemainingCapacity(int forcedRemainingCapacity) {
            this.forcedRemainingCapacity = forcedRemainingCapacity;
        }
    
        /**
         * 重置剩余容量
         */
        void resetForcedRemainingCapacity() {
            this.forcedRemainingCapacity = DEFAULT_FORCED_REMAINING_CAPACITY;
        }
    }
    

    2.2 自定义线程池ThreadPoolExecutor

    Tomcat线程池ThreadPoolExecutor是继承的AbstractExecutorService类。但是很多代码依旧使用的是JDK的ThreadPoolExecutor。只是稍微改造了一部分。

        public void execute(Runnable command, long timeout, TimeUnit unit) {
    
            /**
             * 提交任务的数量+1
             */
            submittedCount.incrementAndGet();
            try {
                /**
                 * 线程池内部方法,真正执行的方法。就是JDK线程池原生的方法。
                 *
                 * 因为重写了阻塞队列,才完成Tomcat特有逻辑的实现。
                 *
                 * 1. 重写队列方法;达到核心线程数,然后向阻塞队列中放置,阻塞队列直接返回false
                 * 2. 返回false,则开启maximum pool size;
                 * 3. maximum pool size到达极限时,会抛出RejectedExecutionException方法。
                 *
                 */
                executeInternal(command);
                /**
                 * 任务被拒绝
                 */
            } catch (RejectedExecutionException rx) {
                /**
                 * 在将被拒绝的任务放入到队列中。
                 */
                if (getQueue() instanceof TaskQueue) {
    
                    //如果Executor接近最大线程数,应该将任务添加到队列中,而不是拒绝。
                    final TaskQueue queue = (TaskQueue) getQueue();
                    try {
                        //强制的将任务放入到阻塞队列中
                        if (!queue.force(command, timeout, unit)) {
                            //放入失败,则继续抛出异常
                            submittedCount.decrementAndGet();
                            throw new RejectedExecutionException("threadPoolExecutor.queueFull");
                        }
                    } catch (InterruptedException x) {
                        //被中断也抛出异常
                        submittedCount.decrementAndGet();
                        throw new RejectedExecutionException(x);
                    }
                } else {
                    //不是这种队列,那么当任务满了之后,直接抛出去。
                    submittedCount.decrementAndGet();
                    throw rx;
                }
            }
        }
        
        /**
         * JDK线程池的任务执行的逻辑
         */
        private void executeInternal(Runnable command) {
            if (command == null) {
                throw new NullPointerException();
            }
            int c = ctl.get();
            //未达到corePoolSize数量,则去开启线程
            if (workerCountOf(c) < corePoolSize) {
                if (addWorker(command, true)) {
                    return;
                }
                c = ctl.get();
            }
            //开启到corePoolSize数量的工作线程,则将任务放入队列。
            //但是Tomcat重写了阻塞队列。
            //当放入workQueue.offer(command)返回false,则继续开启线程数
            if (isRunning(c) && workQueue.offer(command)) {
                int recheck = ctl.get();
                if (!isRunning(recheck) && remove(command)) {
                    reject(command);
                } else if (workerCountOf(recheck) == 0) {
                    addWorker(null, false);
                }
            } else if (!addWorker(command, false)) {
                //开启最大线程失败,则任务被拒绝。
                reject(command);
            }
        }
    

    注意事项:https://bz.apache.org/bugzilla/show_bug.cgi?id=65454

    注意事项.png

    tomcat的优化:

        private void processWorkerExit(Worker w, boolean completedAbruptly) {
            if (completedAbruptly) {
                decrementWorkerCount();
            }
    
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                completedTaskCount += w.completedTasks;
                workers.remove(w);
            } finally {
                mainLock.unlock();
            }
    
            tryTerminate();
    
            int c = ctl.get();
            if (runStateLessThan(c, STOP)) {
                if (!completedAbruptly) {
                    //获取剩余的最小线程数,若配置了允许最小线程数关闭的参数,min=0
                    int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                    //当配置了允许最小线程关闭参数,且队列不为空的情况下,允许保留的最小线程数为1
                    /**
                     * 这是关闭空闲核心线程数时的判断。
                     */
                    if (min == 0 && !workQueue.isEmpty()) {
                        min = 1;
                    }
                    // https://bz.apache.org/bugzilla/show_bug.cgi?id=65454
                    // If the work queue is not empty, it is likely that a task was
                    // added to the work queue between this thread timing out and
                    // the worker count being decremented a few lines above this
                    // comment. In this case, create a replacement worker so that
                    // the task isn't held in the queue waiting for one of the other
                    // workers to finish.
                    /**
                     * 若当前工作数量>允许的最小线程数,那么关闭改线程。
                     * Tomcat对此处进行了优化。
                     */
                    if (workerCountOf(c) >= min && workQueue.isEmpty()) {
                        return; // replacement not needed
                    }
                }
                /**
                 * 没有终止线程,又重新开启线程
                 */
                addWorker(null, false);
            }
        }
    

    2.3 测试代码

        public static void main(String[] args) {
            TaskQueue taskQueue = new TaskQueue(10);
    
            TaskThreadFactory tf = new TaskThreadFactory("tomcat-", false, 10);
    
            ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 8, 5000,
                    TimeUnit.MILLISECONDS, taskQueue, tf);
    
    
            //填充parent变量
            taskQueue.setParent(threadPoolExecutor);
    
            for (int i = 0; i < 10; i++) {
                CompletableFuture.runAsync(() -> {
                    try {
                        log.info("poolSize:" + threadPoolExecutor.getPoolSize() +
                                ",queueSize:" + threadPoolExecutor.getQueue().size() + "," + Thread.currentThread()
                                .getName());
                        Thread.sleep(5000);
                    } catch (Exception e) {
                    }
    
                }, threadPoolExecutor);
    
                threadPoolExecutor.execute(() -> {});
    
            }
        }
    

    推荐阅读

    Tomcat线程池与Java原生线程池的区别

    Tomcat 源码解析之线程池

    原生线程池这么强大,Tomcat 为何还需扩展线程池?

    https://bz.apache.org/bugzilla/show_bug.cgi?id=65454

    相关文章

      网友评论

          本文标题:线程池系列(4)tomcat实现ThreadPoolExecut

          本文链接:https://www.haomeiwen.com/subject/lkzialtx.html