美文网首页
Java并发编程 -- 超通俗易懂的线程池源码分析

Java并发编程 -- 超通俗易懂的线程池源码分析

作者: XinAnzzZ | 来源:发表于2019-11-05 17:32 被阅读0次

    一、概述

    笔者在网上看了好多的关于线程池原理、源码分析相关的文章,但是说实话,没有一篇让我觉得读完之后豁然开朗,完完全全的明白线程池,要么写的太简单,只写了一点皮毛,要么就是是晦涩难懂,看完之后几乎都是一知半解。我想要么是笔者智商捉急,要么就是那些写博客的人以为我很懂所以就大概讲了讲,再或者是作者压根就没认真去讲述线程池。当然多线程以及并发这一块的知识点本身就比较晦涩难懂,但是也不至于找不到一篇文章解惑。于是笔者就下定决心,自己去网上收集资料,自己去买书看那些大神的讲解,然后收集百家之所长,整理一篇不仅适合初学者学习,还适合让老鸟查漏补缺的史上最通俗易懂的线程池知识相关的文章。

    写在最前

    在写文章之前我的大纲里面是有一节实战的,但是最后还是选择了删掉,不是笔者写不下去了,而是实在是文字有点太多了,我怕读者看到文章这么长望而生畏,所以删掉。又怕自己的文笔太差而让读者产生某些误解,所以很多东西都在重复的去说,以至于随便写了写就已经小一万字了,这里和各位读者说声抱歉。笔者写文章的理念就是精简、明确、表达清晰,但是这部分内容实在不太好分开来写,实际上也没有特别多的内容,而且每一小部分我都有相应的总结,如果认真看,看懂应该没问题。最后,还是希望读者能够耐心看完,能够有所收获。

    我知道还有有很多人没有那么多耐心,看不到最后,那么你们就先看总结吧,希望对你们能有一点帮助。

    本文首发于心安-XinAnzzZ 的个人博客,转载请注明出处~

    二、线程池简介

    1) 线程池是什么?

    线程池就是指管理一组同构工作线程的资源池。每次应用程序需要创建一个线程来执行任务的时候不会直接创建线程,而是从线程池中取出线程,线程结束之后也不会直接销毁线程,而是放回线程留给其他任务使用。通过重用现有的线程而不是创建新线程,这样可以避免反复的创建和销毁线程,从而达到节省系统资源的目的。

    2) 线程池的作用

    我们通过一个对比来看一下线程池的作用。假如应用程序需要同时做三件事:读取磁盘文件、分析文件内容、写入数据库。

    • 不使用线程池

      应用程序要手动的继承Thread类或者实现Runnable接口来创建三个线程,分别用于读文件、分析内容。写库。当事情完成的时候,线程结束被销毁。

    • 使用线程池

      应用程序在启动的时候创建线程池,然后这三个任务来了之后,新建三个线程放入到线程池,分别用于执行任务,任务完成不会销毁线程,而是继续放在池中,当其他任务在需要新线程来执行任务的时候可以复用这些线程。

    所以说,合理的使用线程池将会为我们带来以下好处:

    1. 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
    2. 提高响应速度。当任务到达时,任务可以不需要等到线程创建就可以立即执行。
    3. 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一分配、调优和监控。

    但是,要做到合理利用线程池,必须对其实现原理了如指掌。

    3) 线程池是如何实现的

    Java 中万物皆对象,线程池也是一个对象,在 Java 中使用java.util.concurrent.ThreadPoolExecutor这个类来实现线程池,这是线程池框架的最核心的类,也是后面我们分析线程池源码的核心对象,我们提前简单认识一下。既然是池,那就意味着它是一个容器,那么它是一个什么样的容器呢?阅读ThreadPoolExecutor类的源码可以发现它内部有一个类型为HashSet<Worker>workers字段,这个就是用来保存线程的容器。可以看见这个容器装的元素类型为Worker类型,这个是ThreadPoolExecutor的一个内部类,它实现了Runnable接口,也就是说它就是一个线程类。那么我们大体上就应该明白,每次需要新线程的时候就会创建一个Worker对象,然后加入到这个Set中。下面我说一下线程的工作流程再配以故事和图解:

    4) 线程池是如何工作的

    • 线程池的组成部分(最少具有以下四个部分)

      1. 线程池管理器:用于创建和并管理线程

      2. 工作线程:线程池中的线程

      3. 任务接口:每个任务必须实现的接口,用于工作线程调度执行

      4. 任务队列:用于存放待处理的任务,或者称为工作队列。

    • 再说几个常见的概念,如果觉得概念性的东西不清楚,可以先看下面的工作流程,结合实际来理解这些概念。

      1. corePoolSize:核心线程数(有些资料称为基本池大小,只是称呼问题而已),这个指的是在线程池创建的时候指定的线程数量。当提交一个任务到线程池的时候,线程池会创建一个线程来执行任务,即使其他空闲的基本线程能够执行新任务也会创建线程,等到需要执行的任务数大于基本大小就不会立即创建新的线程。
      2. maximumPoolSize:最大池大小,这个就是线程池最大的线程数量。它和基本大小的区别简单来说就是,正常情况下,池大小等于核心池数量,但是任务特别多,线程池特别忙的时候,就再多创建几个线程来"帮忙",但是无论如何都不能大于最大池大小,有些资料把这些"帮忙"的线程称之为扩展线程池。当线程池空闲的时候会销毁掉部分"帮忙"的线程使池大小恢复到核心池大小,我理解的意思就是卸磨杀驴233333。这里再多说一句,只有一个池,核心池和扩展池只是逻辑上的概念,实际上它们都在一个池中,也就是上面说的那个 HashSet
      3. ThreadFactory:线程工厂,可以通过线程工厂来给每个创建的线程设置有意义的名字。
      4. RejectedExecutionHandle:饱和策略,当线程池满了,并且任务队列满了,对于新提交的任务的处理策略,默认情况下是 AbortPolicy,表示无法处理新任务时抛异常。简单理解就是,活太多,老子要罢工了,那么罢工的方式是啥呢,默认情况抛异常,当然也提供了其他的策略,这个后面我们再详细了解。
    • 线程池的工作流程

      当线程池创建之后会有任务提交给线程池来处理,那么线程池是如何处理的呢?我们看一下具体流程:

      1. 提交一个新任务,判断池中线程数量是否小于线程池的核心池大小(corePoolSize),如果小于,就创建一个新的线程来执行这个任务。否则,也就是线程数量已经大于等于核心池大小,那么进入下一步。
      2. 判断任务队列是否是否已满,如果任务队列没满,就存放在任务队列中。等着工作线程一个一个的从任务队列中取出任务来执行。如果队列已满,进入下一步。
      3. 判断线程池大小是否达到最大池大小(maximumPoolSize),如果未达到,则创建新的线程来处理任务。否则,也就是线程池已达到最大大小,则采取饱和策略。
      4. 当任务被执行完,线程池比较空闲的时候就会把大于核心线程池数量的那部分线程池(扩展线程池)中的线程销毁掉。

      所以提交任务的顺序是:核心线程池—任务队列—扩展线程池。请看以下流程图:

      image
      上面的流程其实已经很简单明了了,但是为了方便读者理解,笔者再通过一个现实场景来模拟线程池的运行过程。
    • 外卖员送外卖

      小明家楼下有一家炸鸡店(读者可能会疑惑为什么是炸鸡店呢?难不成线程池和炸鸡之间有着某种不可告人的秘密?别想太多,单纯是因为笔者爱吃炸鸡),每天都有很多外卖单子需要外卖小哥来送。那么这里面"外卖"就是“任务”,外卖小哥就是"线程",外卖小哥送外卖就是线程执行任务,外卖送不完就在店里面排着队等着外卖小哥来送,这个外卖排着的队伍就叫"任务队列"。

      1. 最开始情况下,生意不是很好,偶尔来一个外卖就叫一个外卖小哥来送外卖(这就类比应用程序有一个任务,就起一个线程来执行任务)。外卖送完了,外卖小哥就下班了(线程销毁了)。
      2. 后面生意慢慢好起来了,老板发现,每次外面小哥送完就下班,再来单子又要雇一个外卖小哥,麻烦的要死,还花很多冤枉钱,于是老板就雇了一个外卖团队(线程池),然后老板为了节省钱,同时为了应对偶尔的外卖高峰期,决定了团队就 10 个人(corePoolSize),但是高峰期的时候允许请5个临时工,共 15 人(maximumPoolSize)。
      3. 这时候外卖的运营就是类似上面的流程了,来一个外卖单子,老板就看一下,外卖团队有闲人吗?有,那就去送外卖。如果这 10 个人去送外面了,那就把单子先放在店里排队排起来(这个就是任务队列),等着某个小哥送完了手里的单子,就从队列中取外卖单继续送。
      4. 到了中午,外卖单子越来越多,外卖单子的队伍也排的越来越长,老板觉得不能这样,这样用户等太久了会差评的,就规定,最多 50 个外卖单子排队,再多了就请临时工,但是上面规定了,最多再请 5 个临时工。
      5. 于是当再来单子的时候就请临时工来送,但是单子实在太多,就算 15 个人也送不过来,外卖小哥累成狗,决定要罢工,再来单子老子不干了。再来单子的时候,队伍也满了,外卖员人数也满了,没办法,老板只能打电话告诉买家,抱歉啊,暂时不接单了,麻烦申请退款一下吧(拒绝策略)。
      6. 高峰期过去了,外卖单子都送完了,好多外卖员也歇着了,老板说这也不能白养着这群人啊,把临时工辞退了吧。嗯,没错这就是上面我说的,卸磨杀驴。

    三、线程池源码分析

    通过上面的讲解,相信读者已经能够明白线程池是什么、能做什么以及如何做的。那么下面就结合源码来剖析线程池的工作原理。以下所有源码均来自java.util.concurrent包下,这个包通常被简称为J.U.C。本文使用源码版本为Java8.

    1) Execuor 框架

    image
    • Executor

      public interface Executor {
          void execute(Runnable command);
      }
      

      顶级接口,虽然只有一个简单的方法,但是它是 Executor 框架的基础,它将任务的提交和执行解耦。这个Execute方法就是用来提交任务,线程池需要重写这个方法来实现提交任务的逻辑。

    • ExecutorService

      它是对Executor的扩展,增加了一些管理线程生命周期的方法和任务生命周期的方法。

    • AbstractExecutorService

      它是对ExecutorService的抽象实现,不是本文分析的重点。

    • ThreadPoolExecutor

      Java 线程池的核心实现,本文分析的重点。

    2) ThreadPoolExecutor源码分析

    • 核心成员变量解读

      // 以下所有中文注释为笔者添加,英文注释为作者添加
      // ctl 打包了 runState 和 workerCount
      private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
      // Integer.SIZE = 32 - 3 = 29,29 个比特位
      private static final int COUNT_BITS = Integer.SIZE - 3;
      // 池最大线程数量,大概 5 亿
      private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
      
      // runState is stored in the high-order bits
      // 线程池运行状态存储在高三位中,作者注释真的很清晰
      private static final int RUNNING    = -1 << COUNT_BITS;
      private static final int SHUTDOWN   =  0 << COUNT_BITS;
      private static final int STOP       =  1 << COUNT_BITS;
      private static final int TIDYING    =  2 << COUNT_BITS;
      private static final int TERMINATED =  3 << COUNT_BITS;
      
      // Packing and unpacking ctl
      // 下面三个方法是用来打包和拆包 ctl
      // 拆包 runState
      private static int runStateOf(int c)     { return c & ~CAPACITY; }
      // 拆包 workerCount
      private static int workerCountOf(int c)  { return c & CAPACITY; }
      // 打包 runState 和 workerCount
      private static int ctlOf(int rs, int wc) { return rs | wc; }
      

    关于这些成员变量的含义,在ctl变量的注释中作者已经进行了详细的解释说明,如果你懂这些成员的意义并且你的英文能力不错的话,那么这个注释你读完一遍你就会发现,哇,作者写的真棒,但是如果你不懂或者你英文很烂,你就会发自肺腑的说一句,这什么破玩意。。

    ok,不扯淡,笔者来解读一下作者的注释。首先,这个ctl它"打包"(原文是"packing")了用来表示线程池工作线程数量线程池运行状态的两个值。如何打包的呢?一个int型数据有 32 位,ctl的高 3 位表示线程池状态,低 29 位表示线程数量29 位大概可以表示 5 亿个线程。为什么是 3,而不是别的值呢?因为线程池状态有 5 种,分别为RUNNINGSHUTDOWNSTOPTIDYING以及TERMINATED,如果小于 3 位则不够表示 5 种状态,大于 3 位又浪费。到这里前面的成员变量的意义没什么问题了。为什么用一个 int 表示两个状态呢?作者的解释是更快更简单,我觉得不仅如此,还更装逼,嗯没错。熟悉读写锁ReadWriteLock的大神肯定清楚,读写锁也是用一个int 来分别表示读写锁状态。

    再看一下下面三个方法,作者的注释翻译过来是:打包和拆包ctl,也就是我想获取runState咋获取,调用runStateOf,然后传入当前的ctl就可以了。再简单理解就是runStateworkerCount这俩玩意的gettersetter。下面是线程池不同状态对应的数值及其意义:

    runState 对应的高三位的数值 原文 翻译
    RUNNING 111 Accept new tasks and process queued tasks 接受新任务并且处理队列中的任务
    SHUTDOWN 000 Don't accept new tasks, but process queued tasks 不接受新任务,但是处理队列中的任务
    STOP 001 Don't accept new tasks, don't process queued tasks, and interrupt in-progress tasks 不接受新任务,不处理队列中的任务,并且会中断正在执行的任务
    TIDYING 010 All tasks have terminated, workerCount is zero, the thread transitioning to state TIDYING will run the terminated() hook method 所有任务都已经结束,workerCount = 0,线程转换到 TIDYING 状态,将会执行 terminated()钩子函数
    TERMINATED 011 terminated() has completed 钩子函数 terminated()执行完毕

    所以,总结一下,上面的成员变量和三个辅助函数就是为了表示线程数量和线程池状态。下面看一下构造方法:

    • 其他成员

      // 核心池大小
      private volatile int corePoolSize;
      // 最大池大小
      private volatile int maximumPoolSize;
      // 阻塞队列,存放任务的队列
      private final BlockingQueue<Runnable> workQueue;
      // 存放 worker 线程的集合
      private final HashSet<Worker> workers = new HashSet<Worker>();
      // 最大池大小,区分与 maximumPoolSize
      // largestPoolSize 只是记录池中的线程数量曾经达到的最大值
      // 而 maximumPoolSize 是创建线程池时候指定的对于池大小的限制
      private int largestPoolSize;
      // 线程空闲的时间,上面说的卸磨杀驴等待的时间
      private volatile long keepAliveTime;
      // 完成任务数量
      private long completedTaskCount;
      // 线程工厂
      private volatile ThreadFactory threadFactory;
      // 拒绝策略,默认提供了四种拒绝策略,都不太好用,不详细说了
      private volatile RejectedExecutionHandler handler;
      // 默认的拒绝策略,抛出运行时异常,实际生产还是要自己实现拒绝策略
      private static final RejectedExecutionHandler defaultHandler =
              new AbortPolicy();
      
    • 构造函数

      源码有四个构造函数,但是三个都是重载,看下面这一个就可以了:

      public ThreadPoolExecutor(int corePoolSize, // 核心池大小
                                int maximumPoolSize, // 池最大线程数
                                long keepAliveTime, // 存活时间
                                TimeUnit unit, // 时间单位
                                // 阻塞队列,也就是任务队列,称为工作队列也行,whatever
                                BlockingQueue<Runnable> workQueue, 
                                ThreadFactory threadFactory, // 线程工厂
                                RejectedExecutionHandler handler // 拒绝策略) {
            // 篇幅有限,省略若干代码
      }
      

      简单解释一下存活时间,就是前面所说的,线程池空闲之后会把超过核心线程池部分的线程干掉,但是不是立马干掉,还是有个缓冲期的,这个就是这个缓冲期,配合下面的时间单位使用。构造器内部也没有特别的逻辑,相信聪明的读者看一眼源码就懂。

    • 其他常见方法

      /**
       * 关闭线程池,调用后已提交的任务会继续执行,但是不再接受新任务
       * 也就是说,该方法被调用后线程池状态变为 SHUTDOWN 状态
       * 如果该方法被调用多次不会产生副作用
       */
      public void shutdown() { /* 省略方法体 */ }
      
      /**
       * 尝试停止所以正在执行的任务,并且将任务全部移除
       * 该方法被调用之后线程池状态变为 STOP
       */
      public void shutdownNow() { /* 省略方法体 */ }
      

    3) 核心方法源码分析

    根据前面的流程分析,线程池核心就是提交任务,然后添加核心线程,添加到任务队列等等,一切的一切都始于提交任务,所以我们最先要分析的就是提交任务的方法execute(Runnable command),但是大概看一眼源码可以发现,这个方法本身只有一些逻辑判断,然后根据不同的逻辑去调用其他逻辑方法,而最多调用的是添加worker的方法addWorker(Runnable firstTask, boolean core)

    所以想要理解线程池原理就要看懂execute方法,想看懂execute方法就要先看懂addWorker方法。下面我们就来分析一下addWorker方法,然后再分析execute方法。

    • addWorker()源码分析

      源码中作者为这个方法添加了很多的注释,这里笔者通过翻译软件以及结合源码说一下自己的理解:

      首先说一下方法的参数,第一个参数是firstTask,这个比较简单,就是新创建的worker,前文已经说过,他就是线程对象,那么它执行的第一个任务,通过这个参数来指定,可以指定为null。简单来说,当workerCount小于corePoolSize或者队列已满需要创建扩展线程时,都将新提交的任务直接指定给新创建的线程,而不是让这个任务去排队。

      第二个参数是Boolean core,也就是指定要创建的新的worker是不是核心线程。这个很简单,这个参数在源码中就用到一次。下面源码里面有介绍。

      然后方法的作用就是添加一个worker,作者在注释中写道,根据当前的池状态以及池大小边界(核心池大小或者最大池大小)来检查是否可以添加新的worker,如果可以就添加并且修改workerCount,同时如果可能的话,将firstTask作为这个worker的第一个任务来执行。如果因为池状态或者无法创建线程等原因创建失败,返回 false。下面结合源码分析:

      private boolean addWorker(Runnable firstTask, boolean core) {
          retry:
          for (; ; ) {
              int c = ctl.get();
              // 获取到池运行状态
              int rs = runStateOf(c);
      
              // Check if queue empty only if necessary.
              // 如果运行状态大于等于 SHUTDOWN,也就是说池处于非 RUNNING 状态
              // 并且 !(状态等于 SHUTDOWN的同时 fistTask 为空且队列不为空)
              // 符合以上条件,返回添加失败
              if (rs >= SHUTDOWN &&
                      !(rs == SHUTDOWN &&
                              firstTask == null &&
                              !workQueue.isEmpty()))
                  return false;
      
              for (; ; ) {
                  // 获取工作线程的数量
                  int wc = workerCountOf(c);
                  // 如果数量大于等于池最大线程数量
                  // 或者说,如果大于当前池限制的池大小(如果是核心池就是核心池大小,否则,就是最大池大小)
                  // 那么也返回 false 添加失败
                  // 也就是说你在调用这个方法的时候就需要判断当前池大小
                  // 如果当前池大小小于核心池大小,那么你添加的就是核心线程,传递 true,否则 false
                  // 这个最大数量和 maximumPoolSize 不一样,具体请看前面的源码解读
                  if (wc >= CAPACITY ||
                          wc >= (core ? corePoolSize : maximumPoolSize))
                      return false;
                  // CAS 尝试增加 workerCount,注意,是先增加数量,而实际上还没有增加 worker
                  // 这个也和作者注释中描述的一样,先检查能否添加,然后增加 worker count
                  // 并且如果可能,新建 worker 并且启动它
                  if (compareAndIncrementWorkerCount(c))
                      // 如果增加成功,那么就跳出 retry到第 41 行代码
                      break retry;
                  // 如果没增加成功,说明 ctl 被其他线程更改了,那就重试
                  c = ctl.get(); // Re-read ctl
                  if (runStateOf(c) != rs)
                      continue retry;
                  // else CAS failed due to workerCount change; retry inner loop
              }
          }
      
          // 上面增加 worker count成功,就走到了这儿,开始尝试创建 worker
          // 新建两个状态标记,分别表示 worker 是否已添加和已启动
          boolean workerStarted = false;
          boolean workerAdded = false;
          Worker w = null;
          try {
              w = new Worker(firstTask);
              final Thread t = w.thread;
              if (t != null) {
                  // 上锁,因为 HashSet 不是线程安全的
                  //如果不了解 ReentrantLock 可以简单认为这个try catch 被 synchronized块 包裹
                  final ReentrantLock mainLock = this.mainLock;
                  mainLock.lock();
                  try {
                      // Recheck while holding lock.
                      // Back out on ThreadFactory failure or if
                      // shut down before lock acquired.
                      int rs = runStateOf(ctl.get());
      
                      // 再次确认线程池状态 小于 SHUTDOWN 说明是 RUNNING 状态
                      // 或者已经是 SHUTDOWN 状态同时 firstTask 为 null
                      if (rs < SHUTDOWN ||
                              (rs == SHUTDOWN && firstTask == null)) {
                          if (t.isAlive()) // precheck that t is startable
                              throw new IllegalThreadStateException();
                          // 将新创建的线程添加到 workers 去
                          workers.add(w);
                          int s = workers.size();
                          if (s > largestPoolSize)
                              largestPoolSize = s;
                          workerAdded = true;
                      }
                  } finally {
                      mainLock.unlock();
                  }
                  if (workerAdded) {
                      t.start();
                      workerStarted = true;
                  }
              }
          } finally {
              if (!workerStarted)
                  // 如果因为某种原因启动失败,就调用这个方法
                  // 这个方法主要是将新添加的 worker 从池中移除并且将 workerCount 减一
                  addWorkerFailed(w);
          }
          return workerStarted;
      }
      

      上面的源码看起来很复杂,但是其实仔细看看很多都是循环操作、状态判断操作、加锁解锁操作。实际上核心操作总结起来就三步:

      1. 通过判断当前池状态以及传递的参数状态,来决定是否添加 worker,如果此时的状态不能够添加,返回false
      2. 可以添加就尝试使用CAS来增加workerCount,如果因为别的线程更改了ctl变量而导致增加失败,就回到第一步重试。如果增加成功,进入下一步。
      3. 数量增加成功,创建新的worker。为了保证线程安全,进行了加锁操作,可以忽略。然后继续各种判断,池状态、worker状态等等,如果都没问题,把worker添加到池中,并且尝试启动。这里面如果出现问题,那就把worker从池中移除,并且将workerCount减一,返回false
    • execute()源码分析

      上面的逻辑看明白了,这个方法也就没太多难点,直接看源码以及注释:

      public void execute(Runnable command) {
          if (command == null)
              throw new NullPointerException();
          int c = ctl.get();
          // 如果 当前的线程数量 < 核心池大小 就添加一个 Worker
          if (workerCountOf(c) < corePoolSize) {
              // 把正在提交的任务作为新建的 worker 的第一个任务,并且标识是核心线程
              if (addWorker(command, true))
                   // 添加成功就结束了
                   return;
              // 如果没添加成功,重新获取ctl
              c = ctl.get();
          }
          // 走到这儿说明核心池已满,按照最上面的流程分析,
          // 此时可能添加任务到任务队列,可能新建 “扩展线程池”的 worker 来处理
          // 也可能采取拒绝策略
      
          // 这里判断如果还是运行状态,并且成功添加到工作队列
          if (isRunning(c) && workQueue.offer(command)) {
              // 再次检查状态(笔者内心:多线程就是蛋疼,一直检查,就怕别人改了。。。)
              int recheck = ctl.get();
              // 如果不是运行状态,那么就把任务移除 并且拒绝掉这个任务
              if (!isRunning(recheck) && remove(command))
                  reject(command);
              // 代码走到这里就说明可能是运行状态或者移除任务失败,再次检查workerCount
              else if (workerCountOf(recheck) == 0)
                  // 上面源码已经分析过了
                  addWorker(null, false);
          } else if (!addWorker(command, false))
                reject(command);
      }
      

      到这里,该创建worker也创建了,该提交任务到队列也提交了,外卖员有了,外卖也"提交"了,下一步就应该是如何送外卖了,ok,我们来看看这个worker如何工作的。

    4) Worker:工人是如何工作的。

    前面已经简单的介绍了一下,WorkerRunnable的子类,也就是线程类。那么我们先看看它的结构。

    • 类结构

      image

      首先,Worker实现了两个接口,一个是AQS同步器接口,一个是RunnableAQS是为了实现自己的同步策略,这里思考一下,为什么不直接用ReentrantLock呢?答案是线程执行任务时是不允许其它锁重入进来的,而前者可重入,所以不可用。

      同步相关的方法不是我们讨论的核心,所以我们不用考虑,所以,就主要看run方法就行了。源码里面run方法调用了runWorker方法,下面分析一下这个方法。

    • runWorker 方法源码分析

      final void runWorker(Worker w) {
          Thread wt = Thread.currentThread();
          Runnable task = w.firstTask;
          w.firstTask = null;
          // 这里的操作是为什么呢?其实看一下`Worker`的构造方法可以发现
          // 构造的时候有一句"setState(-1);" 这个是 AQS,我这里不具体分析,只解释作用
          // 这句话后面,作者注释 inhibit interrupts until runWorker 意思是 禁止中断线程,直到 runWorker
          // 这里 unlock 后面注释 allow interrupts  就是允许中断
          // ok,这应该就明白了,创建 worker 的时候设置禁止中断,runWorker 之后设置允许中断
          w.unlock(); // allow interrupts
          boolean completedAbruptly = true;
          try {
              // 这段代码很重要,如果 task 不为空或者 getTask 不为空!!getTask 就是从任务队列取任务
              // 这就说明 worker 一直在循环从任务队列取任务来执行
              while (task != null || (task = getTask()) != null) {
                  w.lock();
                  // If pool is stopping, ensure thread is interrupted;
                  // if not, ensure thread is not interrupted.  This
                  // requires a recheck in second case to deal with
                  // shutdownNow race while clearing interrupt
                  // 这里不解释了,直接翻译作者的注释
                  // 如果线程池已经停止,确保线程已经被中断,如果没有停止,确保线程不被中断.
                  // 在第二种情况下需要重新检查来处理 因为调用了 shutdownNow 方法而产生的竞争
                  if ((runStateAtLeast(ctl.get(), STOP) ||
                          (Thread.interrupted() &&
                                  runStateAtLeast(ctl.get(), STOP))) &&
                          !wt.isInterrupted())
                      wt.interrupt();
                  try {
                      // 在执行任务之前要执行的操作,默认情况下是空实现,什么都不做,如果子类有特殊的需求可以重写
                      beforeExecute(wt, task);
                      Throwable thrown = null;
                      try {
                          // 这里调用的是 run!!不是 start 方法!!!我想读者在最开始学习创建线程的时候,应该都看过线程的 run 方法和 start 方法的区别
                          // run 方法只是一个普通的方法,而 start 才是启动线程的方法
                          // 所以这里需要注意,这里是 worker 来执行任务,而不是让 worker 来启动新的线程来执行任务
                          // 所以调用的是 run 而不是 start,如果调用 start,那岂不是等于说,线程池里面每个任务都会新创建一个线程?
                          task.run();
                      } catch (RuntimeException x) {
                          thrown = x;
                          throw x;
                      } catch (Error x) {
                          thrown = x;
                          throw x;
                      } catch (Throwable x) {
                          thrown = x;
                          throw new Error(x);
                      } finally {
                          // 默认空实现,什么都不做
                          afterExecute(task, thrown);
                      }
                  } finally {
                      task = null;
                      w.completedTasks++;
                      w.unlock();
                  }
              }
              completedAbruptly = false;
          } finally {
              // 到这里说明任务全部完成,结束线程
              processWorkerExit(w, completedAbruptly);
          }
      }
      

      总结起来三个步骤:

      1. 循环取任务来消费,调用getTask方法取任务,调用run方法执行任务。
      2. 如果线程池正在停止,则中断线程。
      3. 取到的任务为null,跳出循环,移除线程(processWorkerExit方法会执行相应的逻辑,具体不分析)。

    5) 总结

    在第二部分线程池简介的时候我们已经分析详细的描述了线程池的工作流程,但是那只是理论,这一节我们通过代码具体的了解到了线程池的运行原理。总结起来主要三个东西:

    1. 提交任务

      execute方法提交任务,然后根据池状态来判断是否接受任务,不接受采用拒绝策略;能够接受任务,判断是需要创建新的worker还是直接加入到任务队列;

    2. 添加 worker

      通过retry来不断地尝试,判断能否添加,不能返回false;能的话就尝试增加workerCount;然后创建worker,然后启动。

    3. 执行任务

      线程循环从任务队列取出任务来执行,直到队列为空。

    四、总结

    • 思维导图:

      思维导图
    • 线程池的作用:

      1. 降低资源消耗
      2. 提高响应速度
      3. 提高线程的可管理性
    • ThreadPoolExecutor 重要成员:

      1. 使用一个 AtomicInteger变量 ctl来表示 workerCount(工作线程数量)和 runState(线程池运行状态)。
      2. corePoolSize:核心线程数量。
      3. maximumPoolSize:线程池最多线程数量。
      4. workers:线程集合,存放工作线程。
      5. workQueue:工作队列,或称为任务队列。
      6. handler:类型为 RejectedExecutionHandler,拒绝策略。
      7. threadFactory:线程工厂。
      8. 线程池五种状态:RUNNING、SHUTDOWN、STOP、TIDYING、TERMINATED。
    • 线程池的整体运行流程:

      image
      1. 创建线程池,并且通过execute方法往线程池中提交多个任务。
      2. 此时线程池线程数量比较少,线程池不断创建核心线程来处理任务,直到线程数量等于corePoolSize
      3. 当任务很多,所有核心线程都在处理任务时,新提交的任务没有线程处理,则放入到工作队列等待工作线程来处理。
      4. 工作线程处理完成一个任务之后去工作队列取任务来执行,直到队列为空,结束线程。
      5. 如果一直往工作队列中提交任务导致工作队列满了,就继续创建线程来处理任务,直到线程数量等于maximumPoolSize
      6. 线程数量已经达到最大限制并且队列满了,就会采取拒绝策略,默认抛异常。

    五、参考

    相关文章

      网友评论

          本文标题:Java并发编程 -- 超通俗易懂的线程池源码分析

          本文链接:https://www.haomeiwen.com/subject/mqjebctx.html