美文网首页
你了解线程池吗

你了解线程池吗

作者: MxsQ | 来源:发表于2019-09-16 15:08 被阅读0次

    前言

    如果有人问我:“你了解Java线程池吗”,我不打算回答Java中常用的几种线程池,也记不住。从线程池的上层API来看,再多种的线程池,无非是参数的不同,让它们呈现出了不同的特性,那么这些特性到底依赖什么样的原理实现,就更值得去深究,也是本文的目的。

    试着回答以下几个问题:

    • 线程池如何实现
    • 非核心线程延迟死亡,如何做到
    • 核心线程为什么不会死
    • 如何释放核心线程
    • 非核心线程能成为核心线程吗
    • Runnable在线程池里如何执行
    • 线程数如何做选择
    • 常见的不同类型的线程池的功效如何做到

    如果以上问题回答不出一二三,可以借鉴本文。

    基础知识

    要了解线程池,必然涉及到ThreadPoolExecutor。ThreadPoolExecutors实现了线程池所需的最小功能集,已能hold住很多场景。常见的线程池类型,通过Executors提供的API,屏蔽了构造参数细节来创建ThreadPoolExecutors,因为不了解具体参数含义的话,可能拿到的线程池与设想的会有偏差。

    构造参数与对象成员变量

    • corePoolSize:核心线程数,期望保持的并发状态
    • maximumPoolSize:最大线程数,允许超载,虽然期望将并发状态保持在一定范围,但是在任务过多时,增加非核心线程来处理任务。非核心线程数 = maximumPoolSize - corePoolSize
    • workQueue:阻塞队列,存储线程任务Runnable
    • keepAliveTime:在没有任务时,线程存活时间
    • threadFactory:用来构建线程
    • handler:当任务已满,并且无法再增加线程数时,或拒绝添加任务时,所执行的策略

    Worker

    线程池中的工作线程以Worker作为体现,真正工作的线程为Worker的成员变量,Worker即是Runnable,又是同步器。Worker从工作队列中取出任务来执行,并能通过Worker控制任务状态。

    ctl

    ctl用来控制线程池的状态,并用来表示线程池线程数量。在线程池中,有以下五种状态

    • RUNNABLE:运行状态,接受新任务,持续处理任务队列里的任务
    • SHUTDOWN:不再接受新任务,但要处理任务队列里的任务
    • STOP:不接受新任务,不再处理任务队列里的任务,中断正在进行中的任务
    • TIDYING:表示线程池正在停止运作,中止所有任务,销毁所有工作线程
    • TERMINATED:表示线程池已停止运作,所有工作线程已被销毁,所有任务已被清空或执行完毕

    状态转换关系如下图


    状态转换.png

    ctl类型为AtomicInteger,那用一个基础如何表示以上五种状态以及线程池工作线程数量呢?int型变量占用4字节,共32位,因此采用位表示,可以解决上述问题。5种状态使用5种数值进行表示,需要占用3位,余下的29位就可以用来表示线程数。因此,高三位表示进程状态,低29位为线程数量,代码如下:

        // 值为29
        private static final int COUNT_BITS = Integer.SIZE - 3;
        // 高三位全为0,低29位全为1,因此线程数量的表示范围为 0 ~ 2^29
        private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
    
        /**
        因为ctl分位来表示状态和数量,下面几个状态仅看有效位的值
        */
        // 有效值为 111
        private static final int RUNNING    = -1 << COUNT_BITS;
        // 有效值为 000
        private static final int SHUTDOWN   =  0 << COUNT_BITS;
        // 有效值为 001
        private static final int STOP       =  1 << COUNT_BITS;
        // 有效值为 010
        private static final int TIDYING    =  2 << COUNT_BITS;
        // 有效值为 011
        private static final int TERMINATED =  3 << COUNT_BITS;
        
        // 默认状态为RUNNING,线程数量为0
        private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    

    既然采用了int分位表示线程池状态和线程数量,那么线程池自然提供了方法来获取状态与数量

    • runStateOf(): 获取线程池状态
    • workerCountOf(): 获取工作线程数量

    两函数均为二进制操作,代码不贴,可用下图说明:


    ctl结构与操作.png

    线程池实现

    添加任务

    线程池可以通过submit()、execute()提交线程任务,其中,submit()可以通过Future拿到执行结果,内部也是通过execute()向线程池提交线程任务.

        public void execute(Runnable command) {
            if (command == null)
                throw new NullPointerException();
            // 获取当前ctl值
            int c = ctl.get();
            // 当前线程数少于最大核心线程数
            if (workerCountOf(c) < corePoolSize) {
                // 添加核心线程,添加线程任务
                if (addWorker(command, true))
                    return;
                // 上面的过程期间,ctl可能已被更改,获取最新值
                c = ctl.get();
            }
            // 线程池状态为RUNNABLE,向工作队列添加任务
            if (isRunning(c) && workQueue.offer(command)) {
                // 再次检查用
                int recheck = ctl.get();
                // 线程不处于RUNNABLE状态,移除任务
                if (! isRunning(recheck) && remove(command))
                    // 执行拒绝任务策略
                    reject(command);
                else if (workerCountOf(recheck) == 0)
                    // 执行到这里说明已没有可用的工作线程,创建新的工作现线程
                    // ,并从任务队列里取任务。因为在这个时刻,存在所有工作线程
                    // 都被释放的可能,为了应对这个线程池“假死”的情况,所以创建
                    // 了新的工作线程
                    addWorker(null, false);
            }
            // 添加非核心队列来执行线程任务
            else if (!addWorker(command, false))
                // 说明线程池达到饱和,或者线程池shut down,执行拒绝策略
                reject(command);
        }
    

    当有任务到来时,按照如下策略进行:

    1. 如果当前核心线程数量没达到最大值corePoolSize,创建新线程来执行此任务
    2. 如果当前核心线程到达最大,向阻塞队列添加任务
    3. 如果核心线程已满,阻塞队列已满,尝试开启非核心线程来执行任务
    4. 如果线程池不处于RUNNABLE状态,或者处于饱和状态,执行任务拒绝策略

    线程池是按照上面123的顺序来处理新进的任务的,并且在每一个过程中,会检查ctl的最新值有效性,因为在处理过程中线程池的各种状态随时可能发生了改变。

    不过是通过添加核心或是通过添加非核心线程来执行任务,都是通过addWorker()来完成,下面是代码

       private boolean addWorker(Runnable firstTask, boolean core) {
            // 这个是类似 goto 的语法,代码有效片段是下面第一for循环
            retry:
            for (;;) {
                int c = ctl.get();
                // 获取程序状态
                int rs = runStateOf(c);
    
                /**
                 这一个条件需要仔细理解。
                 1. 当线程处于STOP、TIDYING、TERMINATED时,线程池是拒绝执行任务的
                 因此不需要任务,也不添加线程
                 2. 当线程处于SHUTDOWN状态时,线程池需要把任务处理完,才会到达后面的
                 TIDYING、TERMINATED状态。因此,如果阻塞队列还有任务的话,继续添加
                 线程来加快处理。
                */
                if (rs >= SHUTDOWN &&
                    ! (rs == SHUTDOWN &&
                       firstTask == null &&
                       ! workQueue.isEmpty()))
                    return false;
    
                for (;;) {
                    // 获取线程数
                    int wc = workerCountOf(c);
                    // 线程数超过或等于能表示的上限
                    // 或 比较 核心线程数达到上限,或比较线程池允许的最大线程数,取决于core
                    if (wc >= CAPACITY ||
                        wc >= (core ? corePoolSize : maximumPoolSize))
                        return false;
                    // CAS操作增加线程数,跳出循环
                    if (compareAndIncrementWorkerCount(c))
                        break retry;
                    c = ctl.get();  // Re-read ctl
                    // 上面的CAS操作没成功,检查线程池状态与开始是否一致,
                    // 如果一致,继续执行此for循环,否则重新执行retry代码块,
                    // 自旋以期CAS成功,后续才能添加线程
                    if (runStateOf(c) != rs)
                        continue retry;
                }
            }
    
            boolean workerStarted = false;
            boolean workerAdded = false;
            Worker w = null;
            try {
                // 将线程任务加入Worker,新增了Worker,就是新增了线程
                w = new Worker(firstTask);
                final Thread t = w.thread;
                if (t != null) {
                    final ReentrantLock mainLock = this.mainLock;
                    mainLock.lock();
                    try {
                        int rs = runStateOf(ctl.get());
    
                        // 再次检查线程池状态
                        // 1. 处于RUNNABLE状态,继续添加线程执行任务
                        // 2. 处于SHUTDOWN状态,到这里说明队列里还有任务要执行
                        // 增加线程期望让任务执行快一点
                        if (rs < SHUTDOWN ||
                            (rs == SHUTDOWN && firstTask == null)) {
                            // 这里说明发生了意外状况,新建的线程不可用
                            if (t.isAlive()) 
                                throw new IllegalThreadStateException();
                            // 添加worker进集合
                            workers.add(w);
                            int s = workers.size();
                            // largestPoolSize可以表示线程池达到的最大并发
                            if (s > largestPoolSize)
                                largestPoolSize = s;
                            // 添加线程成功    
                            workerAdded = true;
                        }
                    } finally {
                        mainLock.unlock();
                    }
                    if (workerAdded) {
                        // 启动新添加的线程
                        t.start();
                        // 线程启动成功
                        workerStarted = true;
                    }
                }
            } finally {
                if (! workerStarted)
                    // 线程启动失败,移除work,销毁线程
                    addWorkerFailed(w);
            }
            return workerStarted;
        }
    

    以上代码做了如下几件事:

    1. 线程池处于 RUNNBALE 或者处于 SHUTDOWN 并在阻塞队列里还有任务时,需要添加新线程。自旋确保 CAS 成功,然后添加新线程
    2. 线程存于Worker,线程池存有Worker信息,就能访问线程
    3. 线程启动失败,则移除Worker,销毁线程

    addWorkerFailed()操作就不进去看了,首先是将Worker移除,然后通过CAS操作更新ctl,最后调用tryTerminate()操作尝试中止线程池。

    执行任务

    之前的代码开启了新线程并让线程执行,但是没有看到有Runnable提交。之前说过Worker本身为Runnable,并且存有为Thread类型的成员变量。线程池执行的任务的线程,也就是Workder里的Thread。

        private final class Worker
            extends AbstractQueuedSynchronizer
            implements Runnable
        {
            Worker(Runnable firstTask) {
                setState(-1);
                // firstTask就是addWorker()带来的Runnable
                this.firstTask = firstTask;
                // 通过ThreadFactory创建线程,将自己作为Runnable提交
                this.thread = getThreadFactory().newThread(this);
            }
            ......
        }
    

    因此线程执行后,执行的是Worker.run(),run()则调用了ThreadPoolExecutor.runWorker()

       final void runWorker(Worker w) {
            Thread wt = Thread.currentThread();
            Runnable task = w.firstTask;
            w.firstTask = null;
            w.unlock();
            boolean completedAbruptly = true;
            try {
                // task一开始是firstTask, 后面就通过getTask()从阻塞队列里拿任务
                while (task != null || (task = getTask()) != null) {
                    w.lock();
                    // 线程池状态检查
                    if ((runStateAtLeast(ctl.get(), STOP) ||
                         (Thread.interrupted() &&
                          runStateAtLeast(ctl.get(), STOP))) &&
                        !wt.isInterrupted())
                        wt.interrupt();
                    try {
                        // 这个方法子类可以重写,在任务执行前有回调
                        beforeExecute(wt, task);
                        Throwable thrown = null;
                        try {
                            // 执行任务
                            task.run();
                        } catch (RuntimeException x) {
                            thrown = x; throw x;
                        } catch (Error x) {
                            thrown = x; throw x;
                        } catch (Throwable x) {
                            thrown = x; throw new Error(x);
                        } finally {
                            // 这个方法子类可以重写,在任务执行后有回调
                            afterExecute(task, thrown);
                        }
                    } finally {
                        task = null;
                        w.completedTasks++;
                        w.unlock();
                    }
                }
                completedAbruptly = false;
            } finally {
                // 线程池已没有任务了,工作线程达到了可退出的状态,Worker退出
                processWorkerExit(w, completedAbruptly);
            }
        }
    

    线程首个任务为firstTask,之后通过getTask()就从阻塞队列里任务。线程池提供了beforeExecute()和afterExecute()通知子类任务执行前后的回调,让子类有时机能执行自己的事情。如果线程池已没有任务了,工作线程达到了可退出的状态,则将线程退出。

    主要看getTask() 和 processWorkerExit()

        private Runnable getTask() {
            // 超时标志
            boolean timedOut = false; 
    
            for (;;) {
                int c = ctl.get();
                int rs = runStateOf(c);
    
                // 检查线程池和阻塞队列状态
                if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                    // 减少线程数
                    decrementWorkerCount();
                    return null;
                }
                
                // 获取线程数
                int wc = workerCountOf(c);
    
                // 线程等待方式标志位判断依据
                // allowCoreThreadTimeOut代表核心线程是不是能退出,如果核心线程能退出,就更别说非核心线程了
                // 另一个则是看是否存在非核心线程
                boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
                
                // 超时,或者并且线程超标超标,返回null,让上一层函数退出线程
                if ((wc > maximumPoolSize || (timed && timedOut))
                    && (wc > 1 || workQueue.isEmpty())) {
                    if (compareAndDecrementWorkerCount(c))
                        return null;
                    continue;
                }
    
                try {
                    // 如果timed为true,则使用poll等待最多keepAliveTime时间获取任务
                    // 如果timed为false,使用take()获取任务,阻塞线程,直到可以从阻塞队列拿到任务
                    Runnable r = timed ?
                        workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                        workQueue.take();
                    if (r != null)
                        return r;
                    // 超时
                    timedOut = true;
                } catch (InterruptedException retry) {
                    timedOut = false;
                }
            }
        }
    

    线程池里的线程从阻塞队列里拿任务,如果存在非核心线程,假设阻塞队列里没有任务,那么非核心线程也要在等到keepAliveTime时间后才会释放。如果当前仅有核心线程存在,如果允许释放核心线程的话,也就和非核线程的处理方式一样,反之,则通过take()一直阻塞直到拿到任务,这也就是线程池里的核心线程为什么不死的原因。

    从之前的代码一直看到这,并没有发现有明显的标志来标志核心线程与非核心线程,而是以线程数来表达线程身份。0 ~ corePoolSize 表示线程池里只有核心线程,corePoolSize ~ maximumPoolSize 表示线程池里核心线程满,存在非核心线程。然后,根据区间状态做有差异的处理。可以大胆猜测,线程池实际并不区分核心线程与非核心线程,是根据当前的总体并发状态来决定怎样处理线程任务。corePoolSize是线程池希望达到并保持的并发状态,而corePoolSize ~ maximumPoolSize则是线程池允许的并发的超载状态,不希望长期保持。

    释放线程

    在线程没有拿到任务后,退出线程,通过processWorkerExit()可以证实上述所言。

        private void processWorkerExit(Worker w, boolean completedAbruptly) {
            // 到这里说明线程中断,先通过decrementWorkerCount()减少线程数值
            // 否则,说明是线程没有从阻塞队列获取到线程
            if (completedAbruptly) 
                    decrementWorkerCount();
    
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // completedTaskCount记录线程池总共完成的任务
                // w.completedTasks则是线程完成的任务数
                completedTaskCount += w.completedTasks;
                // 移除Worker
                workers.remove(w);
            } finally {
                mainLock.unlock();
            }
            
            // 线程池状态改变,尝试中止线程池
            tryTerminate();
    
            int c = ctl.get();
            // 检查线程池状态,线程池处于RUNNABLE或者SHUTDOWN则进入
            if (runStateLessThan(c, STOP)) {
                if (!completedAbruptly) {
                    // 线程池最小数量,取决于是否能释放核心线程
                    int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                    // 如果任务队列还有线程,最起码都要有一个线程来处理任务
                    if (min == 0 && ! workQueue.isEmpty())
                        min = 1;
                    if (workerCountOf(c) >= min)
                        return; 
                }
                // 因为线程中断,可能导致没有线程来执行阻塞队列里的任务
                // 因此尝试创建线程去执行任务
                addWorker(null, false);
            }
        }
    

    释放工作线程也并没有区分核心与非核心,也是随机进行的。所谓随机,就是在前面所说的区间范围内,根据释放策略,哪个线程先达到获取不到任务的状态,就释放哪个线程。

    文中多次出现tryTerminate(),但不深入去看了。里边最主要的操作是,发现可以中止线程池时,中止,并调用terminated()进行通知。如果线程池处于RUNNABLE状态,什么也不做,否则尝试中断一个线程。 中断线程则是通过interruptIdleWorker()操作,就不展开了。

    到这里就能能明白线程池的原理的,如下图


    线程池工作原理(水滴篇).png

    线程池里有容纳一定的Worker,Worker中的线程就是线程池中用来执行任务的线程。当有任务加入线程时,根据线程池状态的不同,有不同的步骤。当核心线程未满时,创建新线程来执行;否则将任务加入到阻塞队列;否则创建非核心线程来执行。而线程获取任务的方式有两种,根据线程池容量区间,以及是否可以释放核心线程来使用take()或者poll()来获取任务,其中poll()在一定时间内获取不到任务,则当前线程会被释放。

    当然,在addWorker()方法来有任务添加失败的策略,也就是RejectedExecutionHandler。ThreadPoolExecutor实现了四种策略来进行处理,简单了解即可:

    • CallerRunsPolicy: 如果线程池没有SHUTODOWN的话,直接执行任务
    • AbortPolicy: 抛出异常,说明当前情况的线程池不希望得到接收不了任务的状态
    • DiscardOldestPolicy: 丢弃阻塞队列最旧的任务
    • DiscardPolicy: 什么也不做

    需要注意的是,默认情况下策略为AbortPolicy。

    总结

    做个总结:

    1. 线程池倾向于使用核心线程来处理任务,从任务的添加策略可以看出,先考虑创建核心线程处理,再考虑放到阻塞队列,再考虑创建非核心线程处理。以上都不行,则使用任务拒绝策略
    2. 通过向阻塞队列取任务的不同操作,能确保线程的存活,take()保证核心线程不死,poll()保证非核心线程存活等待一定时间
    3. 线程池不区分核心线程和非核心线程,线程池是期望达到corePoolSize的并发状态,并允许在不得已情况下超载,达到corePoolSize ~ maximumPoolSize 的并发状态
    4. 线程池状态和线程数量用ctl表示,高三位为状态,低29位为当前线程池数量
    5. 线程池对状态的检测非常苛刻,几乎在所有稍微耗时或影响下一步操作正确性的代码前都校验ctl

    线程池中有很多值得学习的东西,线程容量调整的设计、ctl的设计、任务调度的设计等。也有需要更深的储备才能看懂的实现,这里点出,以备近一步学习,如同步器的使用,并发场景的考虑与应用等。

    下面回答开篇提出的问题。

    问答

    线程池如何实现
    总结就是这个问题的答案

    非核心线程延迟死亡,如何实现
    通过阻塞队列poll(),让线程阻塞等待一段时间,如果没有取到任务,则线程死亡

    核心线程为什么不死
    通过阻塞队列take(),让线程一直等待,直到获取到任务

    如何释放核心线程
    将allowCoreThreadTimeOut设置为true。可用下面代码实验

    // 伪代码
    {
        // 允许释放核心线程,等待时间为100毫秒
        es.allowCoreThreadTimeOut(true);
        for(......){
            // 向线程池里添加任务,任务内容为打印当前线程池线程数
            Thread.currentThread().sleep(200);
        }
    }
    

    线程数会一直为1。 如果allowCoreThreadTimeOut为false,线程数会逐渐达到饱和,然后大家一起阻塞等待。

    非核心线程能成为核心线程吗
    线程池不区分核心线程于非核心线程,只是根据当前线程池容量状态做不同的处理来进行调整,因此看起来像是有核心线程于非核心线程,实际上是满足线程池期望达到的并发状态。

    Runnable在线程池里如何执行
    线程执行Worker,Worker不断从阻塞队列里获取任务来执行。如果任务加入线程池失败,则在拒绝策略里,还有处理机会。

    线程数如何做选择
    这就要看任务类型是计算密集型任务还是IO密集型任务了,区别在于CPU占用率。计算密集型任务涉及内存数据的存取,CPU处于忙绿状态,因此并发数相应要低一些。而IO密集型任务,因为外部设备速度不匹配问题,CPU更多是处于等待状态,因此可以把时间片分给其他线程,因此并发数可以高一些。

    常见的不同类型的线程池的功效如何做到
    常见的线程池有:

    • CachedThreadPool:适合异步任务多,但周期短的场景
    • FixedThreadPool: 适合有一定异步任务,周期较长的场景,能达到有效的并发状态
    • SingleThreadExecutor: 适合任务串行的场景
    • ScheduledThreadPool: 适合周期性执行任务的场景

    对于如何选择线程池就要看具体的场景,其中的差异通过构造参数可以到达效果,通过之前的分析,就能知道参数的具体作用以及为什么能达到效果。取FixedThreadPool来看,抛砖引玉。

        public static ExecutorService newFixedThreadPool(int nThreads) {
            return new ThreadPoolExecutor(nThreads, nThreads,
                                          0L, TimeUnit.MILLISECONDS,
                                          new LinkedBlockingQueue<Runnable>());
        }
    

    nThreads个数量核心线程持续并发任务,没有非核心线程,如果没有任务,则通过take()阻塞等待,不允许核心线程死亡。并且阻塞队列为LinkedBlockingQueue,容量为Integer.MAX_VALUE,可以视为无界队列,更难走到拒绝添加线程逻辑。

    参考

    线程池原理
    彻底理解Java线程池原理篇
    Java线程池---ThreadPoolExecutor中的ctl变量
    JUC锁框架_AbstractQueuedSynchronizer详细分析

    相关文章

      网友评论

          本文标题:你了解线程池吗

          本文链接:https://www.haomeiwen.com/subject/pkesyctx.html