美文网首页
线程池queueCapacity踩坑了

线程池queueCapacity踩坑了

作者: 丑人林宗己 | 来源:发表于2022-05-21 21:11 被阅读0次

    最近上线的某个项目突然开始偶现线程拒绝。

    背景

    内部有个框架主要用于做最终一致性,通过分布式JOB+DB的方式来处理一些存在分布式事务,比如DB+MQ。方案的思路是运行时先往数据库落一个任务,然后异步执行任务。

    通过定时任务异步的捞取任务来检查状态决定是否重试,以此达到最终一致性。

    任务的处理通过线程池来控制。

    排查

    先看线程池的参数设置:

    • 核心线程数 x
    • 队列长度为 x/2
    • 最大线程数 x + x/2
    • keepAliveSeconds 10h

    粗略一看,感觉对于当前的任务处理没什么问题,但是其实埋藏着一些风险。

    检查是否有运行阻塞的任务

    通过arthas trace跟踪,得到的结果非常正常。

    看线程堆栈

    通过jstask 打印出线程堆栈来查看,发现所有的线程都处于阻塞,等待任务的状态。(线程是x + x/2)

    java.lang.Thread.State: TIMED_WAITING (parking)
            at sun.misc.Unsafe.park(Native Method)
            - parking to wait for  <0x00000000821ce058> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
            at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
            at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
            at java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:467)
            at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1073)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
            at java.lang.Thread.run(Thread.java:748)
    --
            at sun.misc.Unsafe.park(Native Method)
            - parking to wait for  <0x00000000842cf2b8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
            at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
            at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
            at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
            at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
            at java.lang.Thread.run(Thread.java:748)
    

    这明显跟现在的任务运行不符合,现在的任务是每次定时任务触发时,会触发x个任务,但是会有一定数量的异常,具体多少个没有细算。

    下意识的假定,现在所有的线程都处于等待任务的状态,说明队列是空的,那么此时任务至少应该是x+x/2+x+x/2 的数量才会开始抛出拒绝状态。

    因为这个错误的假定,耗费了非常多的时间去排查其他问题。后来反复看堆栈,才摸到了思路,源自于parking to wait for这个提示。

    如果线程池是刚创建出来的,那么毫无疑问,假定是正确的,可是运行了一段时间的线程池,如果没有及时回收的话,线程会处于阻塞,等待队列任务的状态,看线程池的提交任务的代码:

    public void execute(Runnable command) {
            if (command == null)
                throw new NullPointerException();
            int c = ctl.get();
            if (workerCountOf(c) < corePoolSize) { // 如果线程数量小于核心线程数
                if (addWorker(command, true)) // 创建工作线程,该任务为此工作线程的第一个任务执行
                    return;
                c = ctl.get();
            }
            if (isRunning(c) && workQueue.offer(command)) { // 添加到队列
                int recheck = ctl.get();
                if (! isRunning(recheck) && remove(command))
                    reject(command);
                else if (workerCountOf(recheck) == 0)
                    addWorker(null, false);
            }
            else if (!addWorker(command, false)) // 尝试创建工作线程,false表示已最大线程数来作为条件
                reject(command);
        }
    

    所以当工作线程都存在时,任务首先要做的是入队列。队列为阻塞队列:

    protected BlockingQueue<Runnable> createQueue(int queueCapacity) {
            return (BlockingQueue)(queueCapacity > 0 ? new LinkedBlockingQueue(queueCapacity) : new SynchronousQueue());
    }
    

    offer方法都有啥,核心包括入队列,还有唤醒阻塞在队列上的线程,而线程在运行时,从队列中poll任务时,需要获取锁。

    意味着,不是所有的任务进去就能立刻被线程消费,而回去看队列的值queueCapacity = x/2,显而易见,每次进来x个任务,队列肯定放不下,于是就会看到每次都有一定数量的任务被拒绝。

    设置队列值queueCapacity = 2x,异常已没有出现,暂定为已处理。

    简单的问题也容易一叶障目。

    相关文章

      网友评论

          本文标题:线程池queueCapacity踩坑了

          本文链接:https://www.haomeiwen.com/subject/ybxhprtx.html