美文网首页
java线程池原理简析

java线程池原理简析

作者: 蛋花汤汤 | 来源:发表于2022-04-12 18:53 被阅读0次

    今天看到别人写的@Async注解的文章,发现自己对java线程池的工作原理有点记不太清了,再翻出源码记录一下吧。
    jdk版本:1.8.0_191
    核心代码在ThreadPoolExecutor#execute()方法中。
    先上此方法的注释:

            /*
             * Proceed in 3 steps:
             *
             * 1. If fewer than corePoolSize threads are running, try to
             * start a new thread with the given command as its first
             * task.  The call to addWorker atomically checks runState and
             * workerCount, and so prevents false alarms that would add
             * threads when it shouldn't, by returning false.
             *
             * 2. If a task can be successfully queued, then we still need
             * to double-check whether we should have added a thread
             * (because existing ones died since last checking) or that
             * the pool shut down since entry into this method. So we
             * recheck state and if necessary roll back the enqueuing if
             * stopped, or start a new thread if there are none.
             *
             * 3. If we cannot queue task, then we try to add a new
             * thread.  If it fails, we know we are shut down or saturated
             * and so reject the task.
             */
    

    这个注释把方法入参command的工作流程写的非常清楚,看完再读代码就非常轻松了。简单的翻译一下。

    task 通过以下三步被处理:
    1. 如果线程池中当前线程数量小于 corePoolSize 的值,那么尝试开启一个新的线程,并将入参command作为它的firt task。调用addWorker方法时,会自动检查runState以及workerCount, 当它不能新增线程的时候,就返回false来避免错误。
    2. 如果一个任务可以成功的进入等待队列,那么我们仍然需要再次检查到底是需要新增一条线程(因为存在现存的线程挂掉的情况)还是说自从接收这个任务后线程池关闭了。所以再次检查state,如果线程池关闭了,就回滚入队操作,拒绝任务;如果线程数为0,就新建一条线程。
    3. 如果task并不能成功入队,那么就尝试新建一条线程。如果这个操作失败了,那么线程池就是处于关闭或者饱和的状态,所以就拒绝掉这个task。
    

    核心代码如下:

            int c = ctl.get();
            if (workerCountOf(c) < corePoolSize) { // 对应step1
                if (addWorker(command, true)) // 二参为true,以corePoolSize为边界
                    return;
                c = ctl.get();
            }
            if (isRunning(c) && workQueue.offer(command)) {  // 对应step2
                int recheck = ctl.get();
                if (! isRunning(recheck) && remove(command))
                    reject(command);
                else if (workerCountOf(recheck) == 0)
                    addWorker(null, false); // 注意这里firstTask是null。因为走到这里时,task已经在queue里
            }
            else if (!addWorker(command, false)) // 对应step3。二参为false,以maximumPoolSize为边界
                reject(command);
    

    可以简单的总结下流程:
    当提交一个新的task时,

    1. 当前线程数小于corePoolSize时,尝试addWorker;
    2. 大于或者addWorker失败,且线程池没关闭就尝试入队
      入队成功,二次检查:
      2.1 线程池已关闭,回滚入队操作并拒绝task;
      2.2 线程池中没有线程就新增一条线程到线程池
    3. 入队失败则尝试新建一个线程接纳任务,还是不行就拒绝任务。

    关键方法:addWorker(Runnable firstTask, boolean core)

    分成两部分来看:

    1. retry 部分
    2. 成功增加线程数后的操作

    前半部分

    retry:
            for (;;) {
                int c = ctl.get();
                int rs = runStateOf(c);
    
                // Check if queue empty only if necessary.
                if (rs >= SHUTDOWN &&
                    ! (rs == SHUTDOWN &&
                       firstTask == null &&
                       ! workQueue.isEmpty()))
                    return false; // 简单理解为线程池已关闭吧。还要再去看看线程池的生命周期
    
                for (;;) {
                    int wc = workerCountOf(c);
                    if (wc >= CAPACITY ||
                        wc >= (core ? corePoolSize : maximumPoolSize))
                        return false;
                    if (compareAndIncrementWorkerCount(c))
                        break retry;
                    c = ctl.get();  // Re-read ctl
                    if (runStateOf(c) != rs)
                        continue retry;
                    // else CAS failed due to workerCount change; retry inner loop
                }
            }
    

    注释也挺清楚的,也可以分为内外两个循环来看。
    外层循环主要是获取线程池的生命周期状态并判断是否直接返回false;
    内层循环主要是cas来设置线程数,因为要+1嘛。如果cas成功,跳出retry执行后半部分的代码;如果cas失败,则判断下线程池的状态是否发生过变化,没变就继续内层循环cas,变化了就去外层循环判断线程池状态是否应该退出。

    后半部分

     boolean workerStarted = false;
            boolean workerAdded = false;
            Worker w = null;
            try {
                w = new Worker(firstTask);
                final Thread t = w.thread;
                if (t != null) {
                    final ReentrantLock mainLock = this.mainLock;
                    mainLock.lock();
                    try {
                        // Recheck while holding lock.
                        // Back out on ThreadFactory failure or if
                        // shut down before lock acquired.
                        int rs = runStateOf(ctl.get());
    
                        if (rs < SHUTDOWN ||
                            (rs == SHUTDOWN && firstTask == null)) {
                            if (t.isAlive()) // precheck that t is startable
                                throw new IllegalThreadStateException();
                            workers.add(w);
                            int s = workers.size();
                            if (s > largestPoolSize)
                                largestPoolSize = s;
                            workerAdded = true;
                        }
                    } finally {
                        mainLock.unlock();
                    }
                    if (workerAdded) {
                        t.start();
                        workerStarted = true;
                    }
                }
            } finally {
                if (! workerStarted)
                    addWorkerFailed(w);
            }
            return workerStarted;
    

    这部分就是在线程数加完1后,做的后续操作。封装Worker,启动线程。注意这部分为了保证多线程下的正确性,使用了reentrantLock进行同步。有一点要说一下,按理来说workerCount+1的操作和创建worker并加入workerSet理论上应该做成原子性的过程,但这里是分开的。但是,这里通过finally中的addWorkerFailed方法进行回滚,保证了在后半部分产生异常时的正确性。
    另外我想这里这么搞,主要是为了减小锁的粒度,尽可能多的把不需要放在同步块里的代码剥离出来,提高性能。毕竟jdk并不仅仅为了能用或者正确就行。

    相关文章

      网友评论

          本文标题:java线程池原理简析

          本文链接:https://www.haomeiwen.com/subject/ydknsrtx.html