美文网首页
重新审视JDK线程池

重新审视JDK线程池

作者: WhiteBase | 来源:发表于2019-07-15 21:21 被阅读0次

JDK 线程池使用过程中,很多人都知道有一些关键参数需要配置,

​    public ThreadPoolExecutor(int corePoolSize, 

​                              int maximumPoolSize, 

​                              long keepAliveTime, 

​                              TimeUnit unit, 

​                              BlockingQueue<Runnable> workQueue, 

​                              RejectedExecutionHandler handler)  

也大致知道线程池的大致原理,但是不一定能解释某些现象。

有个系统,设计大致是这样的:服务A 发送消息到 MQ(具体来说是 kafka),消费端调用服务 B 实际执行消费操作。公司的中间件对 kafka 做了一层封装,能够动态配置一些参数,动态重建 consumer 应用新的配置,其中有一个参数就是并行度 parallelCount ,含义是:对于同一个 partition 分配多少个线程并行处理消息。

系统设计之初,就考虑了使用这个配置来动态调整整个系统的承载能力,因为流量弹性比较高,少的时候一天没有调用量,多的时候可能需要在较短时间内处理几十万到上百万的消息,处理时间甚至可能需要根据下游系统性能调整。所以这个参数的动态调整至关重要。

Snipaste_2019-07-15_21-14-06.png

但是实际上线之后,一次大批量调用,观察到并行度调整似乎没有达到预期效果,默认 parallelCount = 1,如果业务能保证不依赖消息顺序,则可以调整并行度提高吞吐量

有一天线上收到告警,消息积压。于是赶紧调整并行度,

n=1,

n=2,

一切有序进行中,消息处理速度整体不断增加

n=8,

n=9,

n=10

n=16

但是观察线上监控,似乎处理能力不再增加了?这是怎么回事?

我记得下游系统 actual service 配置了最大 64 线程,这还差很多呢,怎么就不线性增长了?

Snipaste_2019-07-15_20-07-45.png

下游系统响应变慢?

开始的时候猜想,是不是下游系统处理能力不够了?请求的响应速度变慢,所以请求堆积起来了?

由于下游系统是个外部的 HTTP 服务,所以无从得知,但是从历史经验来看,远远达不到这个系统的瓶颈,因为这个系统其实有很多的外部调用方,我们的请求量不见得算很大。

而且从 actual service 的内部打点来看,实际执行 HTTP 调用的地方 TP99 并没有变慢,和平时一样。

下游系统限流?

这是有可能的,因为下游系统这个HTTP服务本身有对各个接入放有限流,但是查了文档,当前调用量还远没有达到限流阈值。

系统内部分析

那么问题只会出现在系统内部了

查看监控

consumer 应用内部,当时的线程堆栈采样可以看出来,kafka consumer 端线程数量确实有 16 个

这样也就排除了 consumer 并行度调整不生效的问题。

actual service 内部,查看当时的线程堆栈采样,对应 consumer 并行度 16 的时候,service 内部用于处理任务的专用线程池,thread-count == 10

起初很奇怪,细想一下终于明白了。JDK 线程池确实是这个逻辑

image

简单来说,就是 threadCount > coreSize ,先开始排队,队列满再扩充线程池

//java.util.concurrent.ThreadPoolExecutor#execute
     */
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

JDK 代码注释中有解释,代码短小精悍。

优化方案

知道问题所在了,那么怎么解决呢?

初步想,有两种方案。

方案一

直接把 coreSize 设置到一个足够大的值,比如 64,或者干脆配置一个 fixed size 的线程池

优点:简单直接,能解决问题

缺点:请求量低的时候大量线程闲置,浪费系统资源

方案二

这也是本篇的精髓所在了,改造 JDK 线程池。

既然缺陷在于先排队后扩容,延迟了扩容的时机,那就改成先扩容后排队,这样就能确保在一定空间下处理能力线性增长了。

怎么做呢?分析上面的代码,第二个 if 语句,isRunning(c) && workQueue.offer(command) 如果入队成功了就不会创建线程,所以只要重载 Queue,判断当前 threadCount > coreSize && threadCount < maxCount 的时候返回 false,就可以了,等到 threadCount > maxSize 的时候再实际执行入队操作。

其实这就是 tomcat 线程池的做法,细节上需要注意:queue 需要感知到 threadPool 当前的 count,需要做一些改造。

看源码:tomcat 8.0.30 版本

//org.apache.tomcat.util.threads.TaskQueue#offer
    @Override
    public boolean offer(Runnable o) {
      //we can't do any checks
        if (parent==null) return super.offer(o);
        //we are maxed out on threads, simply queue the object
        if (parent.getPoolSize() == parent.getMaximumPoolSize()) return super.offer(o);
        //we have idle threads, just add it to the queue
        if (parent.getSubmittedCount()<(parent.getPoolSize())) return super.offer(o);
        //if we have less threads than maximum force creation of a new thread
        if (parent.getPoolSize()<parent.getMaximumPoolSize()) return false;
        //if we reached here, we need to add it to the queue
        return super.offer(o);
    }

创建的时候持有 Pool 的引用

// org.apache.catalina.core.StandardThreadExecutor#startInternal
    @Override
    protected void startInternal() throws LifecycleException {

        taskqueue = new TaskQueue(maxQueueSize);
        TaskThreadFactory tf = new TaskThreadFactory(namePrefix,daemon,getThreadPriority());
        executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), maxIdleTime, TimeUnit.MILLISECONDS,taskqueue, tf);
        executor.setThreadRenewalDelay(threadRenewalDelay);
        if (prestartminSpareThreads) {
            executor.prestartAllCoreThreads();
        }
        taskqueue.setParent(executor);

        setState(LifecycleState.STARTING);
    }

剥离开 tomcat 的一些不相关的参数,简单改造一下就可以用了。

感谢 tomcat ,随便一看都是宝藏

相关文章

  • 重新审视JDK线程池

    JDK 线程池使用过程中,很多人都知道有一些关键参数需要配置, 也大致知道线程池的大致原理,但是不一定能解释某些现...

  • spring 线程池和java线程池

    jdk线程池就是使用jdk线程工具类ThreadPoolExecutor 创建线程池spring线程池就是使用自己...

  • 线程池

    线程池的文章:JDK线程池(一):体系结构JDK线程池(二):ThreadPoolExecutor深入分析java...

  • 线程以及java线程池实现分享

    线程以及java线程池实现分享 线程简介 JDK线程池的工作原理 JDK线程池的实现细节 1.线程简介-由来 1....

  • 线程池查漏补缺

    tomcat线程池和jdk线程池区别 概述 线程池是什么,为什么要线程池 jdk有哪些线程池和原理 第三方中间件的...

  • Spring线程池ThreadPoolTaskExecutor学

    ThreadPoolTaskExecutor线程是Spring的线程池,其底层是依据JDK线程池ThreadPoo...

  • 线程池

    JDK线程池 为什么要用线程池 线程池为什么这么设计 线程池原理 核心线程是否能被回收 如何回收空闲线程 Tomc...

  • 多线程知识点总结(3)

    线程池 基本线程池应用 JDK自带线程池Executor接口,子接口是ExecutorService,最常用的实现...

  • 2020-07-28JDK5.0之后新增的创建多线程的方式2

    新增方式二:使用线程池 线程池相关API JDK 5.0起提供了线程池相关API:ExecutorService ...

  • 线程池

    JDK自带线程池 Executors.newFixedThreadPool(int) :一池N线程 Executo...

网友评论

      本文标题:重新审视JDK线程池

      本文链接:https://www.haomeiwen.com/subject/ccgokctx.html