美文网首页
从github的一行代码改动来分析线程池原理

从github的一行代码改动来分析线程池原理

作者: mandypig | 来源:发表于2022-05-28 13:45 被阅读0次

    引言

    最近看了一个开源库的改动,其中里面的一个代码改动引起了我的好奇


    1.jpg

    ,可以看到作者将阻塞队列从LinkedBlockingQueue换成了SynchronousQueue。那么问题来了,作者为什么要进行这种更改。对于线程池的使用,相信大家即使没看过源码,面试中也会不可避免的总会被问到一些问题,背都背会了。但是欠下的技术债迟早哪天是要还的,不明白原理,用起来心里没底。

    再比如这个问题

    ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(0, 10, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
            for (int i = 0; i < 10; i++) {
                int finalI = i;
                poolExecutor.execute(new Runnable() {
                    @Override
                    public void run() {
                        Log.e("mandy", "run" + finalI);
                    }
                });
            }
    

    打印的结果是什么,如果你以为是随机打印,说明你对线程池的理解还没到位,运行下代码就知道结果。

    现在网上都这么多优秀的线程池源码分析文章了,为什么还要去写,其实之所以写这篇关于线程池的源码分析,主要有以下两个原因:
    (1) 证明我看过线程池的源码,没错就是装逼用,不是那种背两个面试题就说自己会线程池的老铁
    (2) 看别人的文章总是会有一些理解不了地方,还需要自己去翻源码进一步消化,不如写一篇给自己看的文章,好记性不如烂笔头。其实这不是我第一次看线程池的源码,一段时间后有些关键实现总会想不起来,把自己疑惑的点写出来,将来直接看自己的文章做到查漏补缺的作用,这才是写这篇文章的主要目的。

    线程池类之间的关系

    下面进入正题,来看下和线程池有关的类

    Executor:最底层的接口,内部只包含一个execute方法
    public interface Executor {
        void execute(Runnable command);
    }
    
    ExecutorService:接口,继承自Executor接口,内部包含了shutDown,submit等线程池使用的方法,看下它的继承关系
    public interface ExecutorService extends Executor
    
    AbstractExecutorService:实现了ExecutorService大部分方法,但没有实现execute方法,该类中提供了一系列的submit方法,线程池要提交任务最终都会调用这个方法,来随便看一个submit方法
    public <T> Future<T> submit(Runnable task, T result) {
            if (task == null) throw new NullPointerException();
            RunnableFuture<T> ftask = newTaskFor(task, result);
            execute(ftask);
            return ftask;
        }
    

    通过newTaskFor生成RunnableFuture,然后调用execute执行任务。newTaskFor最终会生成一个FutureTask类,关于FutureTask不就详细说了,之前写过一篇文章分析过Asynctask源码分析,不止于此,感兴趣的可以看看,你可以将FutrueTask当成一个类似Runnable的类可以执行任务。

    所以最核心的还是execute方法的实现,最终的实现类就是ThreadPoolExecutor了。而另一个和ThreadPoolExecutor相关的类就是Executors,内部提供了各种现成的线程池使用,本质上就是一个工具类。到此就将Executor,Executors,ExecutorService,AbstractExecutorService,ThreadPoolExecutor之间的关系表述清楚了。核心还是execute方法。

    ThreadPoolExecutor成员字段

    要想看明白execute方法的源码,就得先来理解下ThreadPoolExecutor中的一些成员变量以及成员字段的含义,等理解之后再看execute方法。看下关键的成员字段

    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
        private static final int COUNT_BITS = Integer.SIZE - 3;
        private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
    
        // runState is stored in the high-order bits
        private static final int RUNNING    = -1 << COUNT_BITS;
        private static final int SHUTDOWN   =  0 << COUNT_BITS;
        private static final int STOP       =  1 << COUNT_BITS;
        private static final int TIDYING    =  2 << COUNT_BITS;
        private static final int TERMINATED =  3 << COUNT_BITS;
    
        // Packing and unpacking ctl
        private static int runStateOf(int c)     { return c & ~CAPACITY; }
        private static int workerCountOf(int c)  { return c & CAPACITY; }
        private static int ctlOf(int rs, int wc) { return rs | wc; }
    

    线程池有5种状态,分别为RUNNING,SHUTDOWN,STOP,TIDYING,TERMINATED,所以需要3个比特位才能表示。一个Integer32个比特长度,线程池中通过一个Integer长度来表示工作线程数和线程池的状态,高3位表示状态后29位表示工作线程数量,这种方式类似于android中的MeasureSpec使用方式。

    runStateOf和workerCountOf就是通过位运算得到线程池状态和工作线程数量。ctlOf就是将状态和数量整合成一个Integer。可以看到初始化的时候,ctl为AtomicInteger(ctlOf(RUNNING, 0));,表示线程池处于运行状态,此时工作线程为0个。

    Worker

    上面介绍了线程池中有关的成员,接下来介绍下线程池中一个很重要的成员变量Worker,来看下源码

    private final class Worker
            extends AbstractQueuedSynchronizer
            implements Runnable
        {
            /**
             * This class will never be serialized, but we provide a
             * serialVersionUID to suppress a javac warning.
             */
            private static final long serialVersionUID = 6138294804551838833L;
    
            /** Thread this worker is running in.  Null if factory fails. */
            final Thread thread;
            /** Initial task to run.  Possibly null. */
            Runnable firstTask;
            /** Per-thread task counter */
            volatile long completedTasks;
    
            ......
    }
    

    thread成员变量:每一个Worker都可以看成一个thread,当Worker被创建时就会生成一个对应的thread保存到这个变量中。

            Worker(Runnable firstTask) {
                setState(-1); // inhibit interrupts until runWorker
                this.firstTask = firstTask;
                this.thread = getThreadFactory().newThread(this);
            }
    

    从Worker的继承关系可以看出它是一个AQS的子类,关于AQS可以说是实现并发的核心基础,这里不展开说,网上有很多优秀的分析文章,这里只需要知道AQS可以实现同步操作即可。Worker构造函数通过调用setState(-1)将标志位state设置为-1,在runWorker方法中又通过unlock操作重新将state设置为0。这么做的目的和shutDownNow有一定的关系。

    来看下shutDownNow源码

     public List<Runnable> shutdownNow() {
            List<Runnable> tasks;
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                checkShutdownAccess();
                advanceRunState(STOP);
                interruptWorkers();
                tasks = drainQueue();
            } finally {
                mainLock.unlock();
            }
            tryTerminate();
            return tasks;
        }
    

    里面调用了 interruptWorkers();关闭所有的Worker,而interruptWorkers源码就是通过worker的state来决定是否调用中断操作。在worker还没真正运行起来之前state为-1,那么interruptWorkers也就不需要中断worker,而如果运行起来后state设置为0满足关闭条件。

    Worker中的另一个成员变量firstTask,从命名可以看出表示是否属于Worker的第一个task。Worker运行起来后获取的task有两个途径,其一就是在Worker创建的时候通过构造函数自带过来,另一个途径就是从阻塞队列中去取。如果从第二种途径获取到那么firstTask即为null。

    execute源码

    搞明白线程池的成员字段和Worker的作用之后,再来看execute源码就比较清楚了。

    public void execute(Runnable command) {
            if (command == null)
                throw new NullPointerException();
            int c = ctl.get();
            if (workerCountOf(c) < corePoolSize) {
                if (addWorker(command, true))
                    return;
                c = ctl.get();
            }
            if (isRunning(c) && workQueue.offer(command)) {
                int recheck = ctl.get();
                if (! isRunning(recheck) && remove(command))
                    reject(command);
                else if (workerCountOf(recheck) == 0)
                    addWorker(null, false);
            }
            else if (!addWorker(command, false))
                reject(command);
        }
    

    可以看出道格.利大佬在写代码的时候真的就是言简意赅,每个函数都显得十分的短小精悍,这点在看ReentrantLock和AQS源码的时候也体现的非常明显。回到上述源码中,首先通过ctl获取到当前线程池的状态,然后通过workerCountOf判断worker的工作数量,如果小于corePoolSize则说明worker数量少于指定核心线程数,通过addWorker再去启动线程。

    关于addWorker的大致逻辑就是生成一个Woker对象,然后将worker内部的thread启动起来去处理task,详细源码等下分析,这里知道个大概即可。如果worker数量大于corePoolSize那么直接进入

    if (isRunning(c) && workQueue.offer(command)) {
                int recheck = ctl.get();
                if (! isRunning(recheck) && remove(command))
                    reject(command);
                else if (workerCountOf(recheck) == 0)
                    addWorker(null, false);
            }
    

    之前说过线程池有5种状态,只有在running状态并且阻塞队列可以存储元素的时候才会执行内部逻辑,在内部逻辑当中又会进行一次isRunning判断,这种双重判断和单例的doublecheck是不是有些类似,如果发现线程池不是running状态则直接调用拒绝策略对该command进行处理。否则进行workerCountOf(recheck) == 0的判断。这里可以看出只有在worker一个都没启动的时候才会执行addWorker操作,否则就是塞到阻塞队列就完事了。

    这段代码比较关键,可以看出只要worker启动了一个那么addWorker就不会被执行到。换句话说核心线程都已经启动的情况下,只要阻塞队列还能容纳command,那么永远不会addWorker去启动一个临时线程,或者说线程池不允许有核心线程,那么只会启动一个临时线程。直到阻塞队列offer失败才会进入到最后的那段逻辑,而阻塞队列是否能offer成功就和具体的阻塞队列实现有关系了。

     else if (!addWorker(command, false))
                reject(command);
    

    分析完execute之后就会发现addWorker才是问题关键,execute的三个分支都调用到了该方法,如果留心的话会发现一些地方调用的是addWorker(null)而另一些地方调用的是addWorker(command)那么接下来看下addWorker是如何启动一个线程,并源源不断的去执行task。

    addWorker

    先说下addWorker(null)和addWorker(command)的区别,如果阻塞队列还能容纳下的话,那么worker从队列取task执行即可,此时调用addWorker(null),如果队列满了或者说直接启动核心线程那么首个task就不会从队列去取,此时就需要调用addWorker(command)传入需要被执行的task。这也是Worker中firstTask的含义。搞明白这些区别后再看addWorker的源码

    retry:
    for (;;) {
                int c = ctl.get();
                int rs = runStateOf(c);
    
                // Check if queue empty only if necessary.
                if (rs >= SHUTDOWN &&
                    ! (rs == SHUTDOWN &&
                       firstTask == null &&
                       ! workQueue.isEmpty()))
                    return false;
    
                for (;;) {
                    int wc = workerCountOf(c);
                    if (wc >= CAPACITY ||
                        wc >= (core ? corePoolSize : maximumPoolSize))
                        return false;
                    if (compareAndIncrementWorkerCount(c))
                        break retry;
                    c = ctl.get();  // Re-read ctl
                    if (runStateOf(c) != rs)
                        continue retry;
                    // else CAS failed due to workerCount change; retry inner loop
                }
            }
    

    源码中会首先对线程池的状态进行一些判断,如果线程池调用了shutDown关闭了,那么直接返回,shutdown之后submit的任何task都不会被执行。如果没有shutdown那么继续下面的逻辑判断确保wc是正确的范围之内,然后通过CAS操作将worker的数量加1,表示将有一个新的线程马上要被启动了。最后判断下线程池的状态是否正确。一切正常之后接下来进行启动线程的操作。

     boolean workerStarted = false;
            boolean workerAdded = false;
            Worker w = null;
            try {
                w = new Worker(firstTask);
                final Thread t = w.thread;
                if (t != null) {
                    final ReentrantLock mainLock = this.mainLock;
                    mainLock.lock();
                    try {
                        // Recheck while holding lock.
                        // Back out on ThreadFactory failure or if
                        // shut down before lock acquired.
                        int rs = runStateOf(ctl.get());
    
                        if (rs < SHUTDOWN ||
                            (rs == SHUTDOWN && firstTask == null)) {
                            if (t.isAlive()) // precheck that t is startable
                                throw new IllegalThreadStateException();
                            workers.add(w);
                            int s = workers.size();
                            if (s > largestPoolSize)
                                largestPoolSize = s;
                            workerAdded = true;
                        }
                    } finally {
                        mainLock.unlock();
                    }
                    if (workerAdded) {
                        t.start();
                        workerStarted = true;
                    }
                }
            } finally {
                if (! workerStarted)
                    addWorkerFailed(w);
            }
            return workerStarted;
    

    就不一行行分析了,都能看懂,核心就是t.start的调用,启动了一个新的线程。新启动的线程会执行runnable中的run方法,而worker就是一个runnable实现,所以最终调用到了worker的runworker方法。

    runWorker

    final void runWorker(Worker w) {
            Thread wt = Thread.currentThread();
            Runnable task = w.firstTask;
            w.firstTask = null;
            w.unlock(); // allow interrupts
            boolean completedAbruptly = true;
            try {
                while (task != null || (task = getTask()) != null) {
                    w.lock();
                    // If pool is stopping, ensure thread is interrupted;
                    // if not, ensure thread is not interrupted.  This
                    // requires a recheck in second case to deal with
                    // shutdownNow race while clearing interrupt
                    if ((runStateAtLeast(ctl.get(), STOP) ||
                         (Thread.interrupted() &&
                          runStateAtLeast(ctl.get(), STOP))) &&
                        !wt.isInterrupted())
                        wt.interrupt();
                    try {
                        beforeExecute(wt, task);
                        Throwable thrown = null;
                        try {
                            task.run();
                        } catch (RuntimeException x) {
                            thrown = x; throw x;
                        } catch (Error x) {
                            thrown = x; throw x;
                        } catch (Throwable x) {
                            thrown = x; throw new Error(x);
                        } finally {
                            afterExecute(task, thrown);
                        }
                    } finally {
                        task = null;
                        w.completedTasks++;
                        w.unlock();
                    }
                }
                completedAbruptly = false;
            } finally {
                processWorkerExit(w, completedAbruptly);
            }
        }
    

    首先将firstTask复制给task,关于firstTask前面都已经分析了,不再累述,unlock方法将原本的-1重置回了0,已经在文章分析过了。核心就是启动的这个线程通过一个while循环去不断的执行task,task来源有两个地方,firstTask或者getTask方法。getTask等下分析,如果task为null则直接调用processWorkerExit结束该线程,内部将worker数量减1。

    重点看下task不为null的情况,如果线程池调用了shutDownNow方法,那么会将该线程设置为中断状态,所以接下来的task.run被执行时养成良好的习惯,判断下线程是否被中断才是正确的处理方式。每次while循环执行一遍后都会重新通过getTask去获取新的task,这也是线程池为什么能高效利用线程的关键。

    getTask

    getTask实际上就是一个可以阻塞的方法,有task直接返回,没有的话就会被阻塞住。阻塞的方法有两种分别为阻塞队列的take和poll,如果是核心线程阻塞那么调用take,该方法会一直阻塞,这也是核心线程为什么不会消亡的原因,而如果是临时线程则调用poll方法阻塞,该方法传入阻塞时间的参数,一定时间后没有获取到task就返回null,这也是临时线程为什么有存活时间的原理。j然后你再去看getTask的源码是不是就豁然开朗了。

    到此线程池的主要源码就分析完毕了,现在再去看创建线程池的那几个参数是不是印象更加深刻了,最后再回到文章开头的两段代码中,去好好理解下CC作者为什么会将LinkedBlockingQueue改成SynchronousQueue,以及下面那段代码的打印顺序为什么是顺序的。如果还不能解释的话,只能说明你在看源码的时候没有好好思考了。

    shutDown和shutDownNow

    最后说下shutDown和shutDonwNow的区别,前者是调用之后不再接受新的task,而原有线程池中的task还会继续被执行。而如果调用的是shutDownNow方法,不仅不会接受新的task,包括线程池原有的task执行都会被中断掉。这些不难理解,结合上述分析再看一次源码就明白了。

    最后再说一点,在我们通过线程池submit任务的时候,去判断下线程的isInterrupted,尽可能的避免被中断的线程执行多余的逻辑不失为一个好的习惯。AQS对于这块的处理就相当的用心。现在自己也写了一篇关于线程池的分析文章,以后想回顾下线程池的原理,再也不用到处去找别人的文章慢慢啃了。

    坚持写作不易,如果对你有帮助点个赞就是最大的支持

    相关文章

      网友评论

          本文标题:从github的一行代码改动来分析线程池原理

          本文链接:https://www.haomeiwen.com/subject/ermiprtx.html