Java线程池监控小结

作者: 程序熊大 | 来源:发表于2018-03-28 22:13 被阅读929次

    最近我们组杨青同学遇到一个使用线程池不当的问题:异步处理的线程池线程将主线程hang住了,分析代码发现是线程池的拒绝策略设置得不合理,设置为CallerRunsPolicy。当异步线程的执行效率降低时,阻塞队列满了,触发了拒绝策略,进而导致主线程hang死。

    从这个问题中,我们学到了两点:

    • 线程池的使用,需要充分分析业务场景后作出选择,必要的情况下需要自定义线程池;
    • 线程池的运行状况,也需要监控

    关于线程池的监控,我参考了《Java编程的艺术》中提供的思路实现的,分享下我的代码片段,如下:

    public class AsyncThreadExecutor implements AutoCloseable {
    
        private static final int DEFAULT_QUEUE_SIZE = 1000;
    
        private static final int DEFAULT_POOL_SIZE = 10;
    
        @Setter
        private int queueSize = DEFAULT_QUEUE_SIZE;
    
        @Setter
        private int poolSize = DEFAULT_POOL_SIZE;
    
        /**
         * 用于周期性监控线程池的运行状态
         */
        private final ScheduledExecutorService scheduledExecutorService =
            Executors.newSingleThreadScheduledExecutor(new BasicThreadFactory.Builder().namingPattern("async thread executor monitor").build());
    
        /**
         * 自定义异步线程池
         * (1)任务队列使用有界队列
         * (2)自定义拒绝策略
         */
        private final ThreadPoolExecutor threadPoolExecutor =
            new ThreadPoolExecutor(poolSize, poolSize, 0, TimeUnit.MILLISECONDS, new ArrayBlockingQueue(queueSize),
                                   new BasicThreadFactory.Builder().namingPattern("async-thread-%d").build(),
                                   (r, executor) -> log.error("the async executor pool is full!!"));
        private final ExecutorService executorService = threadPoolExecutor;
    
        @PostConstruct
        public void init() {
            scheduledExecutorService.scheduleAtFixedRate(() -> {
                /**
                 * 线程池需要执行的任务数
                 */
                long taskCount = threadPoolExecutor.getTaskCount();
                /**
                 * 线程池在运行过程中已完成的任务数
                 */
                long completedTaskCount = threadPoolExecutor.getCompletedTaskCount();
                /**
                 * 曾经创建过的最大线程数
                 */
                long largestPoolSize = threadPoolExecutor.getLargestPoolSize();
                /**
                 * 线程池里的线程数量
                 */
                long poolSize = threadPoolExecutor.getPoolSize();
                /**
                 * 线程池里活跃的线程数量
                 */
                long activeCount = threadPoolExecutor.getActiveCount();
    
                log.info("async-executor monitor. taskCount:{}, completedTaskCount:{}, largestPoolSize:{}, poolSize:{}, activeCount:{}",
                         taskCount, completedTaskCount, largestPoolSize, poolSize, activeCount);
            }, 0, 10, TimeUnit.MINUTES);
        }
    
        public void execute(Runnable task) {
            executorService.execute(task);
        }
    
        @Override
        public void close() throws Exception {
            executorService.shutdown();
        }
    }
    

    这里的主要思路是:(1)使用有界队列的固定数量线程池;(2)拒绝策略是将任务丢弃,但是需要记录错误日志;(3)使用一个调度线程池对业务线程池进行监控。

    在查看监控日志的时候,看到下图所示的监控日志:


    屏幕快照 2018-03-28 21.55.19.png

    这里我对largestPooSize的含义比较困惑,按字面理解是“最大的线程池数量”,但是按照线程池的定义,maximumPoolSize和coreSize相同的时候(在这里,都是10),一个线程池里的最大线程数是10,那么为什么largestPooSize可以是39呢?我去翻这块的源码:

        /**
         * Returns the largest number of threads that have ever
         * simultaneously been in the pool.
         *
         * @return the number of threads
         */
        public int getLargestPoolSize() {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                return largestPoolSize;
            } finally {
                mainLock.unlock();
            }
        }
    

    注释的翻译是:返回在这个线程池里曾经同时存在过的线程数。再看这个变量largestPoolSize在ThreadExecutor中的赋值的地方,代码如下:

    private boolean addWorker(Runnable firstTask, boolean core) {
            retry:
            for (;;) {
                int c = ctl.get();
                int rs = runStateOf(c);
    
                // Check if queue empty only if necessary.
                if (rs >= SHUTDOWN &&
                    ! (rs == SHUTDOWN &&
                       firstTask == null &&
                       ! workQueue.isEmpty()))
                    return false;
    
                for (;;) {
                    int wc = workerCountOf(c);
                    if (wc >= CAPACITY ||
                        wc >= (core ? corePoolSize : maximumPoolSize))
                        return false;
                    if (compareAndIncrementWorkerCount(c))
                        break retry;
                    c = ctl.get();  // Re-read ctl
                    if (runStateOf(c) != rs)
                        continue retry;
                    // else CAS failed due to workerCount change; retry inner loop
                }
            }
    
            boolean workerStarted = false;
            boolean workerAdded = false;
            Worker w = null;
            try {
                w = new Worker(firstTask);
                final Thread t = w.thread;
                if (t != null) {
                    final ReentrantLock mainLock = this.mainLock;
                    mainLock.lock();
                    try {
                        // Recheck while holding lock.
                        // Back out on ThreadFactory failure or if
                        // shut down before lock acquired.
                        int rs = runStateOf(ctl.get());
    
                        if (rs < SHUTDOWN ||
                            (rs == SHUTDOWN && firstTask == null)) {
                            if (t.isAlive()) // precheck that t is startable
                                throw new IllegalThreadStateException();
                            workers.add(w);
                            int s = workers.size();
                            if (s > largestPoolSize)
                                largestPoolSize = s;//这里这里!
                            workerAdded = true;
                        }
                    } finally {
                        mainLock.unlock();
                    }
                    if (workerAdded) {
                        t.start();
                        workerStarted = true;
                    }
                }
            } finally {
                if (! workerStarted)
                    addWorkerFailed(w);
            }
            return workerStarted;
        }
    

    发现两点:

    • largestPoolSize是worker集合的历史最大值,只增不减。largestPoolSize的大小是线程池曾创建的线程个数,跟线程池的容量无关;
    • largestPoolSize<=maximumPoolSize。

    PS:杨青同学是这篇文章的灵感来源,他做了很多压测。给了我很多思路,并跟我一起分析了一些代码。

    相关文章

      网友评论

      • Peter潘的博客:晕啊,largestPooSize为39是你写错了,你把完成的任务的数量写成了largePoolSize。 :cold_sweat: largePoolSize应该为线程池中线程创建到达最大值的数量。
        程序熊大:@Peter潘的博客 嗯,参考这个:http://flyfoxs.iteye.com/blog/2289931,又加深了理解,感谢
        Peter潘的博客:@杜琪 恩,那个理解没啥问题,不够我觉得描述为线程池中曾经创建线程到达的阈值,这样更加精确吧,而且这个是值是个只增不减的值,largestPoolSize和maximumPoolSize的关系是:largestPoolSize<=maximumPoolSize。
        程序熊大:确实如此,感谢指出,不过误打误撞我后面那个分析是对的吧?
      • daemon4wrm:大佬能分享下具体因为啥 hang 死了主线程吗,CallerRunsPolicy 这个策略一般到了最大线程池还有任务加进来的话会是主线程来执行,主线程执行完任务会继续原本主线程的工作,我这边也用过这种策略,貌似没毛病
        程序熊大:@deamon4wrm 你想复杂了,简单点:假设异步的任务是要将一个数据写入到另一个数据库,但是另一个数据库网络出问题,导致数据库操作超时了
        daemon4wrm:@杜琪 触发了CallerRunsPolicy拒绝策略之后,会由主线程来完成新的准备加入线程池的任务,您这边主线程 hang 死是因为主线程本来的工作还没做完,然后异步任务加入线程池的时候由于池满被执行拒绝策略,由主线程挂起当前主线程的工作执行异步任务,而主线程的工作是不能长时间挂起造成的吧?我的妈,这段话好长:smile:
        程序熊大:@deamon4wrm 主线程中,将一部分工作扔到线程池做的原因是这部分工作依赖的中间件是新准备上的一个,当该中间件性能出现问题时,就会导致异步线程池处理不过来,然后触发拒绝策略,就会hang死主线程。
      • IT人故事会:赞,找了好久才找到!

      本文标题:Java线程池监控小结

      本文链接:https://www.haomeiwen.com/subject/dmtpcftx.html