美文网首页
线程池源码剖析

线程池源码剖析

作者: 竺旭东 | 来源:发表于2020-10-24 18:51 被阅读0次

    ## 前置说明

    ```

    所有的源码基于JDK11.0.2

    ```

    ## 如何使用线程池呢?

    ```java

    public class WeChatBlogDemos {

    @Test

      public void useThreadPool() throws InterruptedException {

          // 创建线程池

          ExecutorService executorService = Executors.newFixedThreadPool(4);

          // 往线程池提交任务,等待异步执行

          executorService.submit(() -> System.out.println("hello world"));

          // 关闭线程池

          executorService.shutdown();

          // 阻塞 2 秒等待线程池终止完成

          executorService.awaitTermination(2,TimeUnit.SECONDS);

      }

    }

    ```

    ## 为什么使用线程池呢?

    ```

    1)每个Java线程都会对应一个操作系统的工作线程,频繁的创建和销毁线程会有很大的开销,线程池能提高线程的复用率,避免线程频繁的创建和销毁,

    2)线程池空闲时能自动缩小容量,防止消耗过多的系统资源,避免资源浪费。

    ```

    ## 当我往线程池里提交一个任务时,发生了什么【ThreadPoolExecutor.execute】?

    ```

    1)外部系统通过 shutdown 或 shutdownNow 显式触发了线程池的关闭流程,任务提交失败

    2)线程池处于 RUNNING 状态

        1)线程池的工作线程数 < 核心线程数,则新增工作者线程进行任务处理

        2)线程池的工作线程数 = 核心线程数,往任务队列里提交成功

        3)核心线程数 <= 线程池的工作线程数 < 最大线程数 && 往任务队列里提交失败,则新增工作者线程进行任务处理

        4)线程池的工作线程数 = 最大线程数 && 往任务队列里提交失败,通过拒绝策略处理任务

    ```

    ```java

    public class ThreadPoolExecutor extends AbstractExecutorService {

    /**

        * 用于创建工作者线程的线程工厂

    */

    private volatile ThreadFactory threadFactory;

    /**

        * 线程池饱和或关闭时,用于拒绝任务的拒绝执行处理器

    */

    private volatile RejectedExecutionHandler handler;

    /**

        * 空闲工作线程等待任务的最大纳秒数

    */

    private volatile long keepAliveTime;

    /**

        * true:核心工作者线程空闲时也会推出

        * false:核心工作者线程空闲时不退出

    */

    private volatile boolean allowCoreThreadTimeOut;

    /**

        * 核心工作线程数,限制数量为2^29-1

    */

    private volatile int corePoolSize;

    /**

        * 最大工作线程数,限制数量为2^29-1

    */

    private volatile int maximumPoolSize;

    /**

        * 默认的拒绝执行处理器

    */

    private static final RejectedExecutionHandler defaultHandler =

            new AbortPolicy();

    /**

        * 核心工作者线程数

    */

    private volatile int corePoolSize;

    /**

        * 最大工作者线程数

    */

    private volatile int maximumPoolSize;

    /**

        * 任务队列,

        * 1)如果工作者线程允许过期,则使用 workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) 读取任务

        * 2)否则使用 workQueue.take() 读取任务

    */

      private final BlockingQueue<Runnable>workQueue;

    /**

        * 添加工作者线程、关闭线程池、读取统计数据等操作中使用的互斥锁

    */

      private final ReentrantLock mainLock = new ReentrantLock();

    /**

        * 线程池中的工作者线程集合,只有在持有 mainLock 锁时才能访问

    */

      private final HashSet<Worker> workers = new HashSet<>();

    /**

        * 跟踪线程池同时存在的最大工作线程数

    * Accessed only under mainLock.

    */

    private int largestPoolSize;

    /**

        * 往线程池提交一个 Runnable 任务,

        * 如果线程池已满或线程池关闭则,该任务会交给拒绝处理器处理。

    */

    @Override

        public void execute(Runnable command) {

            if (command == null) {

                throw new NullPointerException();

            }

            // 读取控制变量

            int c = ctl.get();

            // 1)线程池工作线程数 < 核心线程数

            if (ThreadPoolExecutor.workerCountOf(c) < corePoolSize) {

                // 尝试创建一个新的工作者线程来处理这个任务

                if (addWorker(command, true)) {

                    // 创建成功则直接返回

    return;

                }

                // 创建失败,则重新读取控制变量

                c = ctl.get();

            }

    /**

            * 大前提:

            * 1)外部触发了线程池停止

            * 2)工作者线程本身创建失败了

            * 3)当前工作者线程数 >= 核心工作者线程

    *

            * 线程池处于 RUNNING 状态 && 尝试向任务队列中提交任务

    */

            if (ThreadPoolExecutor.isRunning(c) && workQueue.offer(command)) {

                final int recheck = ctl.get();

    /**

                * 任务成功提交到任务队列

                * 1)如果线程池正在停止,则尝试帮助终止线程池,并将任务从工作队列中移除

                * 2)线程池处于 RUNNING 状态,但是没有可用的工作者线程了,则尝试添加一个新的工作者线程

    */

                if (!ThreadPoolExecutor.isRunning(recheck) && remove(command)) {

                    // 执行拒绝处理器

                    reject(command);

                } else if (ThreadPoolExecutor.workerCountOf(recheck) == 0) {

                    // 尝试添加一个新的工作者线程

                    addWorker(null, false);

                }

    }

    /**

            * 大前提:

            * 1)外部触发了线程池停止

            * 2)线程池处于 RUNNING 状态 && 当前工作者线程数 >= 核心工作者线程 && 任务往队列中提交失败了【如队列已满】

    *

            * 尝试新增一个工作者来处理任务

    */

            else if (!addWorker(command, false)) {

    /**

                * 最大可能性

                * 1)外部触发了线程池停止

                * 2)线程池处于 RUNNING 状态 && 工作者线程数>=  maximumPoolSize

                * 执行拒绝策略

    */

                reject(command);

            }

    }

    /**

        * 读取线程池的工作线程数

    */

        private static int workerCountOf(int c)  { return c & ThreadPoolExecutor.COUNT_MASK; }

    /**

        * 尝试增加一个核心工作者线程来处理这个任务

        * 什么时候会新增失败?

        * 1)线程池状态在 STOP 及以上

        * 2)线程池处于 SHUTDOWN 状态并且提交的任务不为空

        * 3)线程池处于 SHUTDOWN,提交的任务为空,并且工作队列也为空

        * 4)core=true:工作者线程数 >= 核心线程数【corePoolSize】

        * 5)core=false: 工作者线程数 >= 最大线程数【maximumPoolSize】

        * 6)线程池本身原因工作者线程启动失败

    */

      private boolean addWorker(Runnable firstTask, boolean core) {

    retry:

          for (int c = ctl.get(); ; ) {

    /**

              * 外部系统通过 shutdown 或 shutdownNow 显式触发了线程池的关闭流程

              * 1)线程池状态在 STOP 及以上

                * 2)线程池处于 SHUTDOWN 状态并且提交的任务不为空

                * 3)线程池处于 SHUTDOWN,提交的任务为空,并且工作队列也为空

    */

            if (ThreadPoolExecutor.runStateAtLeast(c, ThreadPoolExecutor.SHUTDOWN)

                  && (ThreadPoolExecutor.runStateAtLeast(c, ThreadPoolExecutor.STOP)

    || firstTask != null

                  || workQueue.isEmpty())) {

                // 不允许创建新的工作者线程

    return false;

            }

            for (; ; ) {

    /**

                * 1)工作者线程数已经 >= 核心线程数【任务队列未满时】

                * 2)工作者线程数已经 >= 最大线程数【任务队列已满时】

    */

                if (ThreadPoolExecutor.workerCountOf(c)

                      >= ((core ? corePoolSize : maximumPoolSize) & ThreadPoolExecutor.COUNT_MASK)) {

                        // 不允许创建新的工作者线程

    return false;

                }

                // 尝试递增工作者线程数

                if (compareAndIncrementWorkerCount(c)) {

                  // 工作者线程数递增成功,退出循环

    break retry;

                }

                // 由于并发问题,其他线程优先递增了计数值,则重新读取计数值并重试

                c = ctl.get();

                // 线程池正在关闭,则重新进入循环后将直接退出

                if (ThreadPoolExecutor.runStateAtLeast(c, ThreadPoolExecutor.SHUTDOWN)) {

    continue retry;

    // else CAS failed due to workerCount change; retry inner loop

                }

    }

    }

          // 1)阶段一:工作者线程是否已经添加到 workers 集合中

    boolean workerAdded = false;

          // 2)阶段二:工作者线程是否成功启动

    boolean workerStarted = false;

    Worker w = null;

          try {

            // 创建工作者线程

            w = new Worker(firstTask);

            // 读取线程对象

    final Thread t = w.thread;

            if (t != null) {

    final ReentrantLock mainLock = this.mainLock;

                // 占用锁

                mainLock.lock();

                try {

    /**

    * Recheck while holding lock. Back out on ThreadFactory failure or if shut down before lock acquired.

                    * 读取控制变量再次进行校验

    */

                  final int c = ctl.get();

    /**

                    * 1)线程池处于 RUNNING 状态

                    * 2)线程池处于 SHUTDOWN 状态 && 提交任务为null

    */

                  if (ThreadPoolExecutor.isRunning(c)||

                        ThreadPoolExecutor.runStateLessThan(c, ThreadPoolExecutor.STOP) && firstTask == null) {

                      // 检测工作者线程是否异常启动了

                      if (t.isAlive()) {

                        throw new IllegalThreadStateException();

                      }

                      // 将工作者线程添加到集合中

                      workers.add(w);

                      // 尝试记录最大并发工作者线程数

                      final int s = workers.size();

                      if (s > largestPoolSize) {

    largestPoolSize = s;

                      }

                      // 工作者线程添加到 workers 成功

    workerAdded = true;

                  }

                } finally {

                  mainLock.unlock();

                }

                // 如果添加成功,则启动工作者线程

                if (workerAdded) {

                  t.start();

                  // 工作者线程启动成功

    workerStarted = true;

                }

    }

          } finally {

            // 如果工作者线程启动失败,则进行回退和清理

            if (!workerStarted) {

                addWorkerFailed(w);

            }

    }

    return workerStarted;

      }

        // 运行状态 c 大于等于指定状态s

        private static boolean runStateAtLeast(int c, int s) {

    return c >= s;

        }

    /**

        * 尝试原子的将工作者线程数+1

    */

      private boolean compareAndIncrementWorkerCount(int expect) {

          return ctl.compareAndSet(expect, expect + 1);

      }

    /**

        * 线程池是否在运行

    */

      private static boolean isRunning(int c) {

    return c < ThreadPoolExecutor.SHUTDOWN;

      }

      // 运行状态 c 小于指定状态s

      private static boolean runStateLessThan(int c, int s) {

    return c < s;

      }

      private void addWorkerFailed(Worker w) {

    final ReentrantLock mainLock = this.mainLock;

          mainLock.lock();

          try {

            // 1)从 workers 集合中移除工作者w

            if (w != null) {

                workers.remove(w);

            }

            // 递减总工作者线程数

            decrementWorkerCount();

            // 尝试停止线程池

            tryTerminate();

          } finally {

            mainLock.unlock();

          }

    }

    /**

        * 将工作者线程总数递减1

    */

      private void decrementWorkerCount() {

          ctl.addAndGet(-1);

      }

    /**

        * 将目标任务从队列中移除,并返回移除结果

    */

      public boolean remove(Runnable task) {

          final boolean removed = workQueue.remove(task);

          // 尝试终止线程池

          tryTerminate(); // In case SHUTDOWN and now empty

          // 返回移除结果

    return removed;

      }

    /**

        * 使用指定的拒绝执行处理器来处理该任务

    */

      final void reject(Runnable command) {

          handler.rejectedExecution(command, this);

      }

    }

    ```

    ## 工作者线程是如何工作的呢?

    ```

    1)如果初始化任务不为空,则先执行它

    2)从任务队列中循环拉取任务

        1)允许当前工作者线程超时退出:则通过 workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) 方式尝试在 keepAliveTime 纳秒内获取任务

        2)当前工作者线程数 <= corePoolSize 并且不允许超时退出:通过 workQueue.take() 阻塞读取任务

    3)如果拉取到了任务,则执行它,并循环步骤2

    4)任务拉取失败、阻塞时被中断、任务执行一次,则执行工作者退出流程。

    ```

    ```java

    public class ThreadPoolExecutor extends AbstractExecutorService {

    /**

        * 工作者线程的核心循环,重复的从任务队列中读取任务并执行。

    */

      final void runWorker(Worker w) {

          // 读取当前线程

          final Thread wt = Thread.currentThread();

          // 读取第一个任务

    Runnable task = w.firstTask;

          // 清理

    w.firstTask = null;

          w.unlock(); // 允许中断

    /**

          * 是否异常退出

          * 1)前置钩子函数抛出异常

          * 2)任务执行时抛出异常

          * 3)后置钩子函数抛出异常

    */

    boolean completedAbruptly = true;

          try {

            // 1)尝试从工作队列中读取任务

            while (task != null || (task = getTask()) != null) {

                w.lock();

    /**

    * If pool is stopping, ensure thread is interrupted;

    * if not, ensure thread is not interrupted.

    * This requires a recheck in second case to deal with shutdownNow race while clearing interrupt

    */

                if ((ThreadPoolExecutor.runStateAtLeast(ctl.get(), ThreadPoolExecutor.STOP)||

                      Thread.interrupted()&&

                            ThreadPoolExecutor.runStateAtLeast(ctl.get(), ThreadPoolExecutor.STOP))&&

                      !wt.isInterrupted()) {

                  wt.interrupt();

                }

                try {

    /**

                    * 线程池钩子函数,在每个任务执行之前触发

    */

                  beforeExecute(wt, task);

                  try {

                      task.run();

    /**

                      * 线程池钩子函数,在每个任务执行之后或执行异常时触发

    */

                      afterExecute(task, null);

                  } catch (final Throwable ex) {

                      afterExecute(task, ex);

    throw ex;

                  }

                } finally {

                  // 将当前任务置空

    task = null;

                  // 递增累积完成任务数,包括正常完成和异常完成

    w.completedTasks++;

                  w.unlock();

                }

    }

            // 标记是正常完成任务

    completedAbruptly = false;

          } finally {

    /**

              * 1)completedAbruptly=false:工作线程拉取不到任务正常退出

              * 2)completedAbruptly=true:工作线程执行任务时异常退出,包括前置钩子、核心 run 方法、后置钩子

    */

            processWorkerExit(w, completedAbruptly);

          }

    }

      private Runnable getTask() {

          // 上次拉取任务超时了吗?

    boolean timedOut = false;

          for (; ; ) {

            // 读取控制变量

            final int c = ctl.get();

    /**

              * 1)线程池正在停止,状态>= STOP

              * 2)线程池状态为 SHUTDOWN,并且任务队列为空

    */

            if (ThreadPoolExecutor.runStateAtLeast(c, ThreadPoolExecutor.SHUTDOWN)

                  && (ThreadPoolExecutor.runStateAtLeast(c, ThreadPoolExecutor.STOP) || workQueue.isEmpty())) {

                decrementWorkerCount();

    return null;

            }

    /**

              * 1)线程池处于 RUNNING 状态

              * 2)线程池处于 SHUTDOWN 状态,但是 workQueue 还未排空

    */

            // 计算当前工作者线程数

            final int wc = ThreadPoolExecutor.workerCountOf(c);

    /**

              * 是否允许当前工作者线程退出

              * 1)allowCoreThreadTimeOut=true:允许核心工作者线程退出

              * 2)allowCoreThreadTimeOut=false:当前工作者线程数 > 核心工作者线程数

    */

    final boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

    /**

              * 1)外部系统通过 setMaximumPoolSize 调小了最大线程数 && 当前工作线程数溢出了

              * 2)允许当前线程过期 && 上次拉取未得到任务

    *    &&

              * 1)工作者线程数> 1

              * 2)工作者线程数=1 && 任务队列为空

    *

              * 什么情况下线程池的所有工作者线程都会退出?

              * 1)allowCoreThreadTimeOut=true && workQueue 为空

    */

            if ((wc > maximumPoolSize || timed && timedOut)

                  && (wc > 1 || workQueue.isEmpty())) {

                // 拉取任务失败就直接递减工作者线程数了

                if (compareAndDecrementWorkerCount(c)) {

                  // 返回 null 以终止该工作者线程

    return null;

                }

                // 出现竞争,重新拉取任务

    continue;

            }

            try {

    /**

                * 1)允许当前工作者线程退出:则通过 workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) 方式尝试在 keepAliveTime 纳秒内获取任务

                * 2)当前工作者线程数 <= corePoolSize:通过 workQueue.take() 阻塞读取任务

    */

    final Runnable r = timed ?

                      workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS):

                      workQueue.take();

                if (r != null) {

                  // 成功获取到一个任务

    return r;

                }

                // 拉取超时了

    timedOut = true;

            } catch (final InterruptedException retry) {

                // 当前线程被中断,则继续循环拉取任务

    timedOut = false;

            }

    }

    }

      protected void beforeExecute(Thread t, Runnable r) {

    }

      protected void afterExecute(Runnable r, Throwable t) {

    }

      private void processWorkerExit(Worker w, boolean completedAbruptly) {

          // 如果是异常退出,则递减工作者线程数

          if (completedAbruptly) {

            decrementWorkerCount();

          }

    final ReentrantLock mainLock = this.mainLock;

          mainLock.lock();

          try {

            // 1)将当前工作者 w 完成的任务数累加到线程池已完成任务数中

    completedTaskCount += w.completedTasks;

            // 2)从工作者集合中删除该工作者

            workers.remove(w);

          } finally {

            mainLock.unlock();

          }

          // 尝试终止线程池

          tryTerminate();

          final int c = ctl.get();

          // 线程池处于 RUNNING 或 SHUTDOWN 状态

          if (ThreadPoolExecutor.runStateLessThan(c, ThreadPoolExecutor.STOP)) {

            // 1)如果不是异常退出

            if (!completedAbruptly) {

    /**

                * 计算需要保留的最小工作者线程数,

                * 1)如果允许核心工作者线程退出则为 0;

                * 2)否则为corePoolSize

    */

    int min = allowCoreThreadTimeOut ? 0 : corePoolSize;

                // 任务队列不为空,则至少保留一个工作者线程

                if (min == 0 && !workQueue.isEmpty()) {

    min = 1;

                }

                // 已有工作者线程 > 期望工作者线程数,则直接返回

                if (ThreadPoolExecutor.workerCountOf(c) >= min) {

    return; // replacement not needed

                }

    }

            // 2)异常退出则尝试新增工作者线程

            addWorker(null, false);

          }

    }

    private final class Worker

    extends AbstractQueuedSynchronizer

            implements Runnable {

    private static final long serialVersionUID = 6138294804551838833L;

          /** Worker 驻留线程,创建失败时为null */

    final Thread thread;

          /** 第一个运行的任务,可能为null */

    Runnable firstTask;

          /** 每个驻留线程完成的任务数,在线程退出时会累加到线程池中*/

    volatile long completedTasks;

    /**

          * 基于指定的初始任务和线程工厂创建工作者线程

    */

          Worker(Runnable firstTask) {

            // 禁止中断,直到工作者线程运行为止

            setState(-1); // inhibit interrupts until runWorker

    this.firstTask = firstTask;

    /**

              * Worker 本身实现了 Runnable 并且重写了 run 方法,

              * 基于 Worker 创建驻留线程,并启动运行。

    */

            thread = getThreadFactory().newThread(this);

          }

          /** 运行工作者线程*/

    @Override

          public void run() {

            runWorker(this);

          }

    }

    }

    ```

    ## 如何停止线程池呢?

    ```

    触发线程池关闭之后,提交到线程池的任务会被直接拒绝

    1)通过 shutdown 停止线程池时,线程池的状态会递进到 SHUTDOWN,并且活跃工作者线程还能处理剩余任务。

    2)通过 shutdownNow 停止线程池时,线程池的状态会递进到 STOP,并且活跃工作者线程不能处理剩余任务,拉取到的任务是 null。

    ```

    ```java

    public class ThreadPoolExecutor extends AbstractExecutorService {

    @Override

      public void shutdown() {

    final ReentrantLock mainLock = this.mainLock;

          mainLock.lock();

          try {

            // 当前线程是否允许关闭线程池

            checkShutdownAccess();

            // 将线程池状态更新为SHUTDOWN

            advanceRunState(ThreadPoolExecutor.SHUTDOWN);

            // 中断所有空闲工作者,正在处理任务的工作者线程可以继续运行

            interruptIdleWorkers();

            // 执行钩子函数

            onShutdown(); // hook for ScheduledThreadPoolExecutor

          } finally {

            mainLock.unlock();

          }

          // 尝试终止线程池

          tryTerminate();

      }

      private void checkShutdownAccess() {

    // assert mainLock.isHeldByCurrentThread();

          final SecurityManager security = System.getSecurityManager();

          if (security != null) {

            security.checkPermission(ThreadPoolExecutor.shutdownPerm);

            for (final Worker w : workers) {

                security.checkAccess(w.thread);

            }

    }

    }

    /**

        * 将线程池状态设置为目标状态targetState

    */

      private void advanceRunState(int targetState) {

          for (; ; ) {

            final int c = ctl.get();

            if (ThreadPoolExecutor.runStateAtLeast(c, targetState)||

                  // CAS 更新线程池状态

                  ctl.compareAndSet(c, ThreadPoolExecutor.ctlOf(targetState, ThreadPoolExecutor.workerCountOf(c)))) {

    break;

            }

    }

    }

    /**

        * 中断所有阻塞拉取任务的空闲线程

    */

      private void interruptIdleWorkers() {

          interruptIdleWorkers(false);

      }

      private void interruptIdleWorkers(boolean atMostOne) {

    final ReentrantLock mainLock = this.mainLock;

          mainLock.lock();

          try {

            // 遍历所有的工作者

            for (final Worker w : workers) {

                // 读取工作者驻留线程

    final Thread t = w.thread;

    /**

                * 当前线程还未被设置中断标志,则尝试锁定此 Worker【

                * 如果此 worker 已经获取到了任务正在执行,则锁已经被占用无法获取】

    */

                if (!t.isInterrupted() && w.tryLock()) {

                  try {

                      // 中断阻塞等待任务的空闲线程

                      t.interrupt();

                  } catch (final SecurityException ignore) {

                  } finally {

                      w.unlock();

                  }

    }

                // 随机获取的第一个线程正在处理任务 && atMostOne=true,此时一个线程都不会被中断退出

                if (atMostOne) {

    break;

                }

    }

          } finally {

            mainLock.unlock();

          }

    }

      void onShutdown() {

    }

      final void tryTerminate() {

          for (; ; ) {

            final int c = ctl.get();

    /**

              * 1)线程池处于 RUNNING 状态

              * 2)线程池处于 TIDYING、TERMINATED 状态

              * 3)线程池处于 SHUTDOWN 状态 && 任务队列不为空

              * 不允许终止

    */

            if (ThreadPoolExecutor.isRunning(c)||

                  ThreadPoolExecutor.runStateAtLeast(c, ThreadPoolExecutor.TIDYING)||

                  ThreadPoolExecutor.runStateLessThan(c, ThreadPoolExecutor.STOP) && !workQueue.isEmpty()) {

    return;

            }

    /**

              * 工作者线程数不为 0,尝试中断最多一个空闲工作者线程

    */

            if (ThreadPoolExecutor.workerCountOf(c) != 0) {// Eligible to terminate

                interruptIdleWorkers(ThreadPoolExecutor.ONLY_ONE);

    return;

            }

            // 所有的工作者线程都已退出,或最后一个工作者线程正在退出

    final ReentrantLock mainLock = this.mainLock;

            mainLock.lock();

            try {

                // 递进状态为TIDYING

                if (ctl.compareAndSet(c, ThreadPoolExecutor.ctlOf(ThreadPoolExecutor.TIDYING, 0))) {

                  try {

                      // 执行线程池的终止钩子函数

                      terminated();

                  } finally {

                      // 递进状态为TERMINATED

                      ctl.set(ThreadPoolExecutor.ctlOf(ThreadPoolExecutor.TERMINATED, 0));

                      // 唤醒通过 awaitTermination 阻塞的所有线程

                      termination.signalAll();

                  }

    return;

                }

            } finally {

                mainLock.unlock();

            }

    // else retry on failed CAS

          }

    }

      protected void terminated() {

    }

    @Override

      public List<Runnable> shutdownNow() {

          List<Runnable>tasks;

    final ReentrantLock mainLock = this.mainLock;

          mainLock.lock();

          try {

            // 当前线程是否允许关闭线程池

            checkShutdownAccess();

            // 将线程池状态更新为STOP

            advanceRunState(ThreadPoolExecutor.STOP);

            // 强制中断所有工作者线程,包括正在执行任务的线程

            interruptWorkers();

            // 读取所有未完成的任务

            tasks = drainQueue();

          } finally {

            mainLock.unlock();

          }

          // 尝试终止线程池

          tryTerminate();

          // 返回所有未完成的任务

    return tasks;

      }

      private List<Runnable> drainQueue() {

          final BlockingQueue<Runnable>q = workQueue;

          final ArrayList<Runnable> taskList = new ArrayList<>();

          q.drainTo(taskList);

          if (!q.isEmpty()) {

            for (final Runnable r : q.toArray(new Runnable[0])) {

                if (q.remove(r)) {

                  taskList.add(r);

                }

    }

    }

    return taskList;

      }

    }

    ```

    ## 等待线程池完全退出

    ```

    目标线程会最多阻塞 unit.toNanos(timeout) 时间来等待线程池完全销毁。

    ```

    ```java

    public class ThreadPoolExecutor extends AbstractExecutorService {

    @Override

      public boolean awaitTermination(long timeout, TimeUnit unit)

            throws InterruptedException {

          // 计算超时时间

          long nanos = unit.toNanos(timeout);

    final ReentrantLock mainLock = this.mainLock;

          mainLock.lock();

          try {

            // 如果线程池还未递进到 TERMINATED 状态【线程池还未退出】

            while (ThreadPoolExecutor.runStateLessThan(ctl.get(), ThreadPoolExecutor.TERMINATED)) {

                if (nanos <= 0L) {

    return false;

                }

                // 阻塞等待指定的纳秒数

                nanos = termination.awaitNanos(nanos);

            }

    return true;

          } finally {

            mainLock.unlock();

          }

    }

    }

    ```

    ## 线程池的逻辑最大线程数是多少呢?

    ```

    线程池的线程数保存在 ctl 控制变量的低 29 位中,因此线程池的逻辑最大线程数为 2^29-1。

    ```

    ```java

    public class ThreadPoolExecutor extends AbstractExecutorService {

    /**

        * 控制变量低 29 位为线程池的工作线程数

        * 控制变量高 3 位为线程池的生命周期状态

    */

        private final AtomicInteger ctl = new AtomicInteger(ThreadPoolExecutor.ctlOf(ThreadPoolExecutor.RUNNING, 0));

    /**

        * 工作线程数所占的位数,为 29 位,最大工作线程数为 2^29-1 个

    */

    private static final int COUNT_BITS = Integer.SIZE - 3;

    /**

        * 工作线程数掩码,低位 29 个1

    */

        private static final int COUNT_MASK = (1 << COUNT_BITS)- 1;

    /**

        * 读取线程池的工作线程数

    */

        private static int workerCountOf(int c)  { return c & ThreadPoolExecutor.COUNT_MASK; }

    }

    ```

    ## 线程池内置了哪些拒绝策略呢?

    ```

    1)CallerRunsPolicy:线程池未关闭,则交给提交任务的线程自己执行

    2)AbortPolicy:抛出 RejectedExecutionException 异常,默认拒绝策略。

    3)DiscardPolicy:静默丢弃

    4)DiscardOldestPolicy:静默丢弃最老的任务后,重新提交到线程池

    ```

    ```java

    public class ThreadPoolExecutor extends AbstractExecutorService {

      public static class CallerRunsPolicy implements RejectedExecutionHandler {

            public CallerRunsPolicy() { }

    @Override

            public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {

                if (!e.isShutdown()) {

                    // 线程池未关闭,则交给提交任务的线程自己执行

                    r.run();

                }

    }

    }

        public static class AbortPolicy implements RejectedExecutionHandler {

            public AbortPolicy() { }

    @Override

            public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {

                // 抛出异常

                throw new RejectedExecutionException("Task " + r.toString()+

    " rejected from " +

                        e.toString());

            }

    }

        public static class DiscardPolicy implements RejectedExecutionHandler {

            public DiscardPolicy() { }

    @Override

            public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {

                // 静默丢弃

            }

    }

        public static class DiscardOldestPolicy implements RejectedExecutionHandler {

            public DiscardOldestPolicy() { }

    @Override

            public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {

                if (!e.isShutdown()) {

                    // 丢弃最老的任务,再尝试提交到线程池中

                    e.getQueue().poll();

                    e.execute(r);

                }

    }

    }

    }

    ```

    ## 内置线程池有哪些?

    ```

    1)newFixedThreadPool:固定工作者线程池

    2)newCachedThreadPool:一个任务一个工作者线程池,无法存储任务

    3)newSingleThreadExecutor:单工作者线程池

    ```

    ```

    public class Executors {

    public static ExecutorService newFixedThreadPool(int nThreads) {

    return new ThreadPoolExecutor(nThreads, nThreads,

    0L, TimeUnit.MILLISECONDS,

    new LinkedBlockingQueue());

    }

    public static ExecutorService newCachedThreadPool() {

    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,

    60L, TimeUnit.SECONDS,

    new SynchronousQueue());

    }

    public static ExecutorService newSingleThreadExecutor() {

    return new FinalizableDelegatedExecutorService

    (new ThreadPoolExecutor(1, 1,

    0L, TimeUnit.MILLISECONDS,

    new LinkedBlockingQueue()));

    }

    }

    ```

    ## 线程池有哪几种运行状态呢?

    ```

    线程池有 5 种运行状态,状态值是顺序递增的,分别为 RUNNING、SHUTDOWN、STOP、TIDYING、TERMINATED。

    RUNNING: 接受新任务,同时处理工作队列中的任务

    SHUTDOWN: 不接受新任务,但是能处理工作队列中的任务

    STOP: 不接受新任务,不处理工作队列中的任务,并且强制中断正在运行的工作者线程

    TIDYING: 所有的工作者线程都已经停止,将运行 terminated() 钩子函数

    TERMINATED: terminated() 钩子函数运行完毕,线程池退出

    RUNNING -> SHUTDOWN【调用 shutdown 时触发状态流转】

    RUNNING -> STOP【调用 shutdownNow 时触发状态流转】

    SHUTDOWN -> TIDYING【队列和线程池都为空时】

    STOP -> TIDYING【队列和线程池都为空时】

    TIDYING -> TERMINATED【terminated 方法完成执行时】

    ```

    ```java

    public class ThreadPoolExecutor extends AbstractExecutorService {

    /**

        * 线程池运行状态通过 int 高 3 位进行区分

    * 11100000000000000000000000000000

    */

    private static final int RUNNING    = -1 << COUNT_BITS;

    // 00000000000000000000000000000000

    private static final int SHUTDOWN  =  0 << COUNT_BITS;

    // 00100000000000000000000000000000

    private static final int STOP      =  1 << COUNT_BITS;

    // 01000000000000000000000000000000

    private static final int TIDYING    =  2 << COUNT_BITS;

    // 01100000000000000000000000000000

    private static final int TERMINATED =  3 << COUNT_BITS;

    }

    ```

    ## 什么情况下线程池的所有工作者线程都会退出?

    ```

    allowCoreThreadTimeOut=true && workQueue.isEmpty()

    ```

    ## 工作者线程拉取任务时被中断了会发生什么?

    ```

    会继续拉取任务?

    ```

    ## 线程池停止过程中会中断空闲工作者线程,如何保证不会伤及运行中的工作者线程?

    ```

    工作者线程本身继承了 AbstractQueuedSynchronizer,是一个互斥锁,当其获取到任务时会对自己进行锁定,

    线程池中断空闲线程过程中由于无法获取锁,此工作者线程不会被中断。

    ```

    ## shutdown 和 shutdownNow 的区别是什么?

    ```

    1)shutdown 之后:线程池的工作线程能完成正在处理的任务,也能拉取到存留的任务,任务队列中的任务会被执行完毕。

    2)shutdownNow 之后:线程池的工作线程只能完成正在处理的任务,但是无法拉取到存留的任务,存留任务会通过方法返回。

    ```

    ## 使用线程池的最佳实践

    ```

    1)自定义 ThreadFactory 并在其创建线程时提供一个有效的名称,用于后续的跟踪分析。

    2)限定任务队列的长度,避免突发流量导致系统 OOM。

    3)线程池创建完毕可以通过预先启动工作者线程来缩短响应时间

        prestartCoreThread:预先启动一个工作者线程

        prestartAllCoreThreads:预先启动所有的核心工作者线程

    ```

    相关文章

      网友评论

          本文标题:线程池源码剖析

          本文链接:https://www.haomeiwen.com/subject/hbpimktx.html