美文网首页Java学习笔记
java8线程池源码解析

java8线程池源码解析

作者: 白袜子先生 | 来源:发表于2018-04-21 15:23 被阅读227次

    1.对线程池的理解

    1.1 艰辛摸索

    看过许多关于线程池的介绍和讲解,看过方腾飞的 《并发编程的艺术》 也看过很多博客关于线程池的讲解,但是总觉得自己理解的不太好,总觉得哪里缺点。后来自己也花时间查看源码,自己去琢磨,多问为什么,结合一些博客的讲解。现在我想把这段时间的成果记录下来,也是为了加深自己的理解吧。
    下面所有的源码都是java8 源码。

    1.2 线程池的好处

    • 降低性能消耗、提高响应速度:对于应用需要频繁创建线程,而且线程任务都比较简单,比如一些IO任务,线程的生命周期都很短;而线程的创建需要花费一定的CPU时间,所以当任务到来时如果线程已经准备就绪了,而不是重新创建,则会大大提高系统的响应速度。
    • 对线程的集中管理监控:将创建的线程规约在线程池里,则可以对线程的数量和运行状态进行管理并进行监控,可对系统的线程资源进行集中管理。

    2.线程池内的一些属性

    2.1 线程池参数

    看下面的这个线程池的构造器,他有许多参数,这些参数都代表什么意思?

    public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  ThreadFactory threadFactory,
                                  RejectedExecutionHandler handler)
    
    • corePoolSize 核心的线程池数量
    • maximumPoolSize 线程池内最大的线程数量
    • keepAliveTime 线程存活时间
    • unit 时间单位 比如 秒 分钟
    • workQueue 存储任务的阻塞队列
    ArrayBlockingQueue 有界阻塞队列,此队列会满。
    SynchronousQueue 没有容量的阻塞队列,一次添加必须等待一次获取。反之亦然。
    LinkedBlockingQueue 无界阻塞队列 这个阻塞队列用于不会满
    
    • threadFactory 自定义线程工厂
    
    1.通过自定义线程池 可以给线程池内,线程都赋予更有意义的线程名。
    也可以根据喜好做些别的事情,比如记录一下何时创建线程之类的。
    2.如果没有则会采用默认的线程池。Executors.defaultThreadFactory()
    
    • handler 拒绝策略
    拒绝策略java 默认提供了四种实现。都是ThreadPoolExecutor的内部类。
    1 CallerRunsPolicy :如果当前线程池 处于运行状态 ,直接使用当前线程执行任务,如果是终止状态,则悄悄的直接抛弃。
    
    2. AbortPolicy :对拒绝任务抛弃处理,并且抛出异常。默认采用。
    rejectedExecution 内直接抛异常。
    3. DiscardPolicy :对拒绝任务直接无声抛弃,没有异常信息。查看源码发现他的rejectedExecution 函数就是一个空实现。
    
    4. DiscardOldestPolicy :对拒绝任务不抛弃,而是抛弃队列里面等待最久的一个任务,然后把拒绝任务加到队列。查看源码
    if (!e.isShutdown()) {
       e.getQueue().poll();//获取并移除队列head 
       e.execute(r);//再次execute任务
    }
    

    3.Executors 提供的几种线程池

    3.1 newFixedThreadPool

    • 固定线程数的线程池
    public static ExecutorService newFixedThreadPool(int nThreads) {
       return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,
       new LinkedBlockingQueue<Runnable>());
    }
    1.核心 线程数 和 最大线程数一致。
    2.采用的阻塞队列是 无界阻塞队列,LinkedBlockingQueue。也就是在极端情况下,阻塞队列 会一直增长,直到堆内存溢出,需要谨慎使用该线程池。
    

    3.2 newCachedThreadPool

    • 缓冲线程池
     public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
        new SynchronousQueue<Runnable>());
    }
    0.在newCachedThreadPool中如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
    1.初看该构造函数时我有这样的疑惑:核心线程池为0,那按照前面所讲的线程池策略新任务来临时无法进入核心线程池,只能进入 SynchronousQueue中进行等待
    ,而SynchronousQueue的大小为1,那岂不是第一个任务到达时只能等待在队列中,直到第二个任务到达发现无法进入队列才能创建第一个线程? 
    2.这个问题的答案在上面讲SynchronousQueue时其实已经给出了,要将一个元素放入SynchronousQueue中,必须有另一个线程正在等待接收这个元素。因此即便SynchronousQueue一开始为空且大小为1,第一个任务也无法放入其中,因为没有线程在等待从SynchronousQueue中取走元素。因此第一个任务到达时便会创建一个新线程执行该任务。
    3.这个最大线程数是 Integer.MAX 使用中需要注意。
    

    3.3 newWorkStealingPool

    jdk 1.8 新加入的。创建一个带并行级别的线程池,并行级别决定了同一时刻最多有多少个线程在执行,如不穿如并行级别参数,将默认为当前系统的CPU个数

    3.4 newScheduledThreadPool

    创建一个定长线程池,支持定时及周期性任务执行。

    4. 源码

    下面介绍下线程池的源码,由于对写作没有天赋,我就尽我最大努力把源码分析的浅显易懂一些。我介绍下线程池的一些状态属性,然后再从提交一个任务开始跟着源码进行描述一下我对源码的理解。

    4.1 线程池状态 和 线程统计

    image.png
    • ctl
      先看看线程池中最重要的一个属性 ctl,我觉得是Control的简写,ctl控制着整个线程池的运转。ctl是AtomicInteger类型,线程池利用ctl 的高3位作为记录当前线程池的状态。利用低29位记录线程池中线程数,所以线程池中线程的最大容量为 2^29。
      默认值是 1110 0000 0000 0000 0000 0000 0000 0000 = -536 870 912 ;

    • COUNT_BITS
      COUNT_BITS = Integer.SIZE - 3;
      COUNT_BITS 的意思是 一个整型数 有29位用于统计线程池内线程数;

    • RUNNING 运行状态
      RUNNING 是线程池的初始状态,是一个int 类型的常量,值为 -1 左移 29 位即为 -536 870 912,线程池 的状态
      只有RUNNING 是负数的。

    • SHUTDOWN
      RUNNING --shutDowm()--> SHUTDOWN
      RUNNING状态调用shutDowm()函数进入SHUTDOWN状态。是一个int类型的常量 值为0 ;
      此时线程池不接收新任务,但能处理已添加的任务。

    • STOP
      (RUNNING or SHUTDOWN) ---shutdownNow()--> STOP
      RUNNING状态或者SHUTSOWN状态调用shutDowmNow()函数进入SROP状态。是一个int类型的常量 值为536 870 912 ;
      不接收新任务,不处理已添加的任务,并且会中断正在处理的任务。

    • TIDYING
      是一个int类型的常量 值为1 073 741 824 ;
      SHUTDOWN -> TIDYING:当线程池内线程数为0并且队列内任务数量为0时
      STOP -> TIDYING:当线程池内线程数量为0时

    • TERMINATED
      是一个int类型的常量 值为1 610 612 736 ;
      线程池处在TIDYING状态时,执行完terminated()之后,就会由 TIDYING -> TERMINATED。

    • CAPACITY (线程的最大容量)
      1 转移 29位 ,就是 0010 0000 0000 0000 0000 0000 0000 0000,
      然后 减1 ,就变成了0001 1111 1111 1111 1111 1111 1111 1111,刚好就是 低29位 为1 ,就是线程内线程的最大容量 。

    4.2 execute 执行任务

    线程池添加任务流程图.png

    在未来的某个时刻执行给定的任务,这个任务会被一个新线程或者一个已经存在的线程执行。
    如果任务不能提交执行,那是因为线程池不处于运行状态或者已经到达容量上限。
    那么这个任务就会被RejectedExecutionHandler处理。
    execute()代码逻辑:

    1.当通过excute(Runable) 提交一个任务时,如果当前线程池内线程数量小于corePoolSize 数量,即使线程池内线程处于闲置状态,线程池也会创建一个新的线程。
    2.如果线程内线程数大于corePoolSize 数量小于 maximumPoolSize 数量,则会将任务 提交到 workQueue(阻塞队列)。
    3.如果workQueued队列已经满了,而当前线程池内线程数量小于maximumPoolSize 则创建一个新的线程执行。
    4.如果此时线程池内线程数量 不小于maximumPoolSize 数量,则执行拒绝策略。
    
        /**
         *
         * @param command the task to execute
         * @throws RejectedExecutionException at discretion of
         *         {@code RejectedExecutionHandler}, if the task
         *         cannot be accepted for execution
         * @throws NullPointerException if {@code command} is null
         */
        public void execute(Runnable command) {
            if (command == null)
                throw new NullPointerException();
           
            
            int c = ctl.get();
            //1.如果正在运行的线程数 小于 corePoolSize,创建一个线程执行command.
            
            if (workerCountOf(c) < corePoolSize) {
            //创建一个线程执行command
                if (addWorker(command, true))
                    return;
                    //如果失败 获取ctl recheck runstate
                c = ctl.get();
            }
            //2.如果是运行状态 并且追加command 到队列成功
            if (isRunning(c) && workQueue.offer(command)) {
                int recheck = ctl.get();
                //重新检查运行状态 不是运行中状态 则 从队列移除 command,让RejectedExecutionHandler 处理command。
                if (! isRunning(recheck) && remove(command))
                
                    reject(command);
                else if (workerCountOf(recheck) == 0)
                //如果当前运行中的线程数是0 ,则创建一个新线程。
                    addWorker(null, false);
            }
            //3.如果添加任务到队列失败,则新建一个线程执行任务。
            else if (!addWorker(command, false))
            //如果新建线程失败,让RejectedExecutionHandler 处理command。
                reject(command);
        }
    

    4.3 addWorker

    检查是否可以根据当前线程池状态和绑定条件(核心线程或者最大线程)添加一个新的工作线程。
    如果可以的话那么调整运行的线程数量。
    并且 如果线程成功创建和运行的话,那么firstTask将是新线程的第一个任务被执行。
    如果线程池已经关闭或者正在关闭,函数将返回false.
    如果线程创建失败,可能的原因是因为线程工厂返回null,或者异常(最可能的异常就是在执行Thread,start()时产生OutOfMemoryErro )。
    接着回滚以上操作。
    参数:core:如果为true 增加核心线程,false 增加最大线程。

    private boolean addWorker(Runnable firstTask, boolean core) {
           retry://外层循环 循环检查运行状态
           for (;;) {//
               int c = ctl.get();//获取 ctl 值
               int rs = runStateOf(c);//获取当前线程池状态
    
               // Check if queue empty only if necessary.
               //如果有必要的话 检查任务阻塞队列是否为空
               
               //这里第一个条件检查 运行状态 是否为RUNNING 状态。因为只有RUNNING状态不满足 rs>= SHUTDOWN 条件。
               //第二个条件 咋一看 不好懂,有点晕。
               //仔细分析下,当运行状态为SHUTDOWN 状态时,线程池会执行完阻塞队列内的任务。但是不再接收新任务。
               //也就是说 如果 是shutdown 状态,firstTask is null ,阻塞队列里面还有任务时,是允许创建新的新工作线程的。
               if (rs >= SHUTDOWN &&
                   ! (rs == SHUTDOWN &&
                      firstTask == null &&
                      ! workQueue.isEmpty()))
                   return false;
    
            //内循环 调整工作线程数
               for (;;) {
               //获取当前工作线程数
                   int wc = workerCountOf(c);
                   //如果 大于 最大容量 capacity, 或者 根据绑定条件 大于 核心线程 或者 最大线程数。return false。
                   
                   
                   if (wc >= CAPACITY ||
                       wc >= (core ? corePoolSize : maximumPoolSize))
                       return false;
                   //cas 调整 工作线程数
                   if (compareAndIncrementWorkerCount(c))
                       break retry;//如果成功 跳出外层循环
                   c = ctl.get();  // Re-read ctl
                   //cas 调整 工作线程数失败 
                   //检查 线程状态 是否改变
                   //如果没有改变 继续执行 内循环
                   //如果状态 改变 执行外循环。
                   if (runStateOf(c) != rs)
                       continue retry;
                   // else CAS failed due to workerCount change; retry inner loop
               }
           }
        //开始添加 worker
           boolean workerStarted = false;
           boolean workerAdded = false;
           Worker w = null;
           try {
           //创建一个 工作线程
           //worker 构造器内部 调用 线程工厂 创建一个线程。
           
               w = new Worker(firstTask);
               final Thread t = w.thread;
               if (t != null) {
                   final ReentrantLock mainLock = this.mainLock;
                   //同步锁 阻塞的获取锁
                   mainLock.lock();
                   try {
                       // Recheck while holding lock.
                       // Back out on ThreadFactory failure or if
                       // shut down before lock acquired.
                       //获取池运行状态
                       int rs = runStateOf(ctl.get());
                 //rs 即为当前线程池运行状态,如果状态是RUNNING 或者 
                 //第一个条件 是RUNNING 状态 
                 //第二个条件 是SHUTDOWN状态时 firstTask 必须是 null ,因为 shutdown 状态 不能提交新的任务。但可以创建新工作线程。
                 
                       if (rs < SHUTDOWN ||
                           (rs == SHUTDOWN && firstTask == null)) {
                           //t.isAlive() 如果线程已经start 则返回true ,所以这里是判断 线程t 是不是还可以执行start()。
                           
                           if (t.isAlive()) // precheck that t is startable
                           // 
                               throw new IllegalThreadStateException();
                           //工作线程 添加到 集合workers 中
                           workers.add(w);
                           //修改 poolSize
                           int s = workers.size();
                           if (s > largestPoolSize)
                               largestPoolSize = s;
                           workerAdded = true;
                       }
                   } finally {
                   //释放 同步锁
                       mainLock.unlock();
                   }
                   
                   if (workerAdded) {
                   //如果添加成功 启动线程
                       t.start();
                       workerStarted = true;
                   }
               }
           } finally {
           //如果线程启动失败 回滚操作
               if (! workerStarted)
                   addWorkerFailed(w);w
                   //线程安全的 从工作线程集合中移除 如果w存在的话,就CAS 调整 工作线程数。并且尝试停止线程池
           }
           return workerStarted;
           //返回启动成功 与否
       }
    

    4.3 addWorkerFailed

    执行addWorker添加任务失败之后 调用该方法执行回滚操作。
    回滚操作主要做了以下三件事情:

      1. 从workers 工作线程集合中移除 worker 
      2. worker Count  减 1
      3. 尝试终止线程池,如果满足终止条件的话 就会终止线程池。
    
    /**
         * Rolls back the worker thread creation.
         * - removes worker from workers, if present
         * - decrements worker count
         * - rechecks for termination, in case the existence of this
         *   worker was holding up termination
         */
        private void addWorkerFailed(Worker w) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();//获取主锁
            try {
                if (w != null)
                //从workers 移除指定worker
                    workers.remove(w);
                    //workerCount 减 1
                decrementWorkerCount();
                //尝试终止线程池 终止的条件比较苛刻
                tryTerminate();
            } finally {
                mainLock.unlock();//释放锁
            }
        }
    

    4.4 runWorker 执行任务

    添加worker 成功之后,worker就开始执行runWorker()了。
    这个方法是比较重要的方法,是线程池的核心。是worker 线程 运行循环,重复地从队列中获取任务并执行它们,同时处理以下一些问题:

      1.从worker 的firstTask 开始执行,随着线程的运行,worker不停的执行getTask() 获取任务,如果拿不到任务,就会根据线程的池配置信息 来决定是否让 worker退出。如果worker执行过程中抛出异常,会导致worker退出,并试图创建一个新的线程替代当前worker。
        2.执行任务之前,获取锁确保任务执行不被其他线程干扰。如果不是STOP状态,worker线程需要清除中断标记。
        3.处理每个任务之前都要执行钩子函数 beforeExecute(),如果钩子函数抛出异常那么也会出发worker 过早的退出,并尝试创建一个新的worker 替代它。
        4.假如执行beforeExecute()顺利,接下来就是 执行任务了。如果执行中出现异常,那么就会捕获这些异常,进行包装成 runtimeException 和 error,Throwable也被包装成 Error的因为Ruunabl.run 不能抛出Throwabl 异常。最后把这些异常 和 worker 发给 afterExecute()处理。
        5.任务执行完成后,执行afterExecute(),也可能会产生异常,任何的异常都会导致worker 销毁。
    
    final void runWorker(Worker w) {
            Thread wt = Thread.currentThread();
            Runnable task = w.firstTask;//获取firstTask
            w.firstTask = null;//清除 firstTask
            w.unlock(); 
            // 允许worker 中断,为什么unlock()就可以允许中断?因为在worker构建实列时。worker的state 属性被设置为-1,在调用池shutDownNow()函数分别对worker进行终止时,会判断worker 的 state >= 0 ,如果不符合 就不会终止。
            boolean completedAbruptly = true;//是否突然完成,就是如果执行任务时,意外退出,该标记就不会被修改为false.
            try {
            1.先执行woker的firstTask 如果firstTask 已经执行,从任务阻塞队列获取任务。
            2.直到获取不到任务跳出while循环。
                while (task != null || (task = getTask()) != null) {
                    w.lock();//获取锁
                    // If pool is stopping, ensure thread is interrupted;
                    // if not, ensure thread is not interrupted.  This
                    // requires a recheck in second case to deal with
                    // shutdownNow race while clearing interrupt
                    1.如果 小于 STOP 状态 ,就进行 中断标记的清除,
                    2.Thread.interrupted() 会清除当前线程的 中断标记。
                    3.清除中断标记是为了消除 外部代码 对worker 的中断操作。
                    4.如果大于等于 STOP状态 就会中断线程。
                    if ((runStateAtLeast(ctl.get(), STOP) ||
                         (Thread.interrupted() &&
                          runStateAtLeast(ctl.get(), STOP))) &&
                        !wt.isInterrupted())
                        wt.interrupt();
                    try {
                    //执行任务前 处理
                        beforeExecute(wt, task);
                        Throwable thrown = null;
                        try {
                        //执行任务
                            task.run();
                        } catch (RuntimeException x) {
                            thrown = x; throw x;
                        } catch (Error x) {
                            thrown = x; throw x;
                        } catch (Throwable x) {
                            thrown = x; throw new Error(x);
                        } finally {
                        //任务执行后处理操作
                            afterExecute(task, thrown);
                        }
                    } finally {
                        task = null;
                        w.completedTasks++;//执行任务数++;
                        w.unlock();//施放锁。
                    }
                }
                completedAbruptly = false;//不是意外退出。
            } finally {
            //处理worker 退出。
                processWorkerExit(w, completedAbruptly);
            }
        }
    

    4.5 getTask() 获取任务

    阻塞或者限时的通过getTask() 函数获取任务。不仅仅是获取任务,同时通过是不是返回null控制者worker 线程的退出与否。以下情况会导致函数返回null,导致worker 退出:

        1.当前池执行的线程数超过了maximumPoolSize设置。
        2.线程池stop状态
        3.线程池时shutdown 状态 并且 任务阻塞队列 是空的。
        4.当前worker 获取任务时等待超时。具体的需要根据是否允许核心线程退出,是否大于核心线程,设置判断
    
         private Runnable getTask() {
            boolean timedOut = false; // Did the last poll() time out?
              //timedOut  是否已经超时的标记,如果超时获取任务 ,timedOut 设置为 true,就会影响到后面的逻辑判断。
            for (;;) {//循环的判断线程池状态 
                int c = ctl.get();//获取ctl 值
                int rs = runStateOf(c);//获取线程池运行状态
    
                // Check if queue empty only if necessary.
              1.如果是大于等于 shutdown 状态 ,并且 是大于等于stop状态 那么获取任务失败,线程退出
              2.如果是shutdown 状态,并且 任务阻塞队列 workQuenu is empty 那么获取任务失败,线程退出。
                if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                    decrementWorkerCount();//调整工作线程数减1 ;
                    return null;//返回 task is null
                }
    
              //获取池正在运行工作线程数
                int wc = workerCountOf(c);
    
                // Are workers subject to culling?
               //是否 执行定时的获取任务标记
              // 如果 运行核心线程退出 或者 正在运行线程数大于核心线程 ,执行定时的获取任务。
                boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
    
              //判断当前worker 是否满足 退出条件,如果满足就执行cas 调整线程数 减 1 。并返回null
              
                if ((wc > maximumPoolSize || (timed && timedOut))
                    && (wc > 1 || workQueue.isEmpty())) {
                    if (compareAndDecrementWorkerCount(c))
                        return null;
                    continue;
                }
    
                try {
                    //三元表达式
                    //wordQuenu.poll 是等待keepAliveTime 时间 获取任务,如果超时返回null
                    //workQuenu.take 阻塞的获取任务,直到获取到一个任务为止。
                    Runnable r = timed ?
                        workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                        workQueue.take();
                    if (r != null)
                        return r;
                    timedOut = true;//如果 获取到的 r is null,那么说明 执行了 poll 获取任务超时。
                } catch (InterruptedException retry) {
                    timedOut = false;
                }
            }
        }
    

    4.6 processWorkerExit 处理worker退出

    1. 对即将死亡worker 进行清理 和 记账(统计worker执行的任务数)
    2. 如果worker突然死亡(执行中有异常) 就需要调整运行的线程数
    3. 将worker 从worker线程集合中移除。(除名)
    4. 可能会终止线程池
    5. 在小于STOP状态前提下,如果由于执行 用户任务异常导致线程退出,创建新线程替代
    6. 在小于STOP状态前提下,不允许核心线程退出,如果运行中的线程小于核心线程,创建新线程替代。
     private void processWorkerExit(Worker w, boolean completedAbruptly) {
            if (completedAbruptly) // 如果runWorker中出现异常,那么调整woker 线程数量
                decrementWorkerCount();//调整数量,循环cas 保证调整成功
    
            final ReentrantLock mainLock = this.mainLock;//
            mainLock.lock();//获取锁
            try {
                completedTaskCount += w.completedTasks;//给即将死亡的工人记录
                workers.remove(w);//花名册除名
            } finally {
                mainLock.unlock();//释放锁
            }
    
            tryTerminate();//尝试终止池
    
            int c = ctl.get();
         //寻找新的worker 替代死亡的worker
            if (runStateLessThan(c, STOP)) {//如果 池STOP 那么就没必要找新的worker啦
                if (!completedAbruptly) {//如果 正常执行 终止
                    int min = allowCoreThreadTimeOut ? 0 : corePoolSize;//判断需要保留的最小worker数量
                    if (min == 0 && ! workQueue.isEmpty())//如果是 0 并且 任务 队列 不为 0 
                        min = 1;//至少保留一个 把活干完
                    if (workerCountOf(c) >= min) //如果 当前worker 数量 大于 至少 保留数量 
                        return; // 不需要 寻找替代者
                }
                addWorker(null, false);// 找新的worker 替代 firstTask is null
            }
        }
    

    4.7 tryTerminate 尝试终止池

    1.如果池是shutdown 并且 阻塞队列 为空 并且 工作线程数 为空 转换为终止状态

    2.如果 池是 stop 并且 工作线程为空 转换为 终止状态

    3.满足终止条件,但是工作线程不为 0 ,终止一个空闲线程

     final void tryTerminate() {
            for (;;) {
                int c = ctl.get();
                    1.如果池处于 允许状态,不终止池。
                    2.如果池状态 >= TIDYING,说明池正在终止,不需要再次终止
                    3.如果 池状态 == shutdown 并且 workQuenu 不为空,需要执行完剩下的任务,不终止。
                if (isRunning(c) ||
                    runStateAtLeast(c, TIDYING) ||
                    (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
                    return;
                //如果池可以被终止  但是 工作线程 不为 0 ,即使 都池状态 和 队列 都满足终止条件 
                //但是 工作线程数 还存在 那么就要终止一个空闲线程
                //为什么终止一个空闲线程?
                //因为此时任务队列为空, 有worker 在getTask 时 从阻塞队列中 take  任务 ,
                //take 方法会一直阻塞worker 直到有任务,但是现在池不再接受新的任务,所以worker 会被一直阻塞
                //由于take通过LockSupport.park()阻塞线程,而LockSupport.park()能响应中断信号
                //所以通过终止 被阻塞的 worker 使其抛出 InterruptedException
                //抛出异常之后 worker最终会被处理退出 ,就会调用该函数,又会终止下一个被take阻塞的worker
                //这样所有被take 阻塞的线程 会 一个一个的 被终止。
                //这一点线程池的设计者 真是很厉害啊 ,想了好久 才发现这一点。
                if (workerCountOf(c) != 0) { // Eligible to terminate
                    interruptIdleWorkers(ONLY_ONE);
                    return;
                }
    
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();//获取锁
                try {
                    if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {//设置pool 为 TIDYING ,线程数为0 
                        try {
                            terminated();//转换为终止状态
                        } finally {
                            ctl.set(ctlOf(TERMINATED, 0));//设置pool 为shutdown ,线程数为0
                            termination.signalAll();//唤醒所有等待在终止condition上的线程
                        }
                        return;
                    }
                } finally {
                    mainLock.unlock();//施放锁
                }
                // else retry on failed CAS cas 修改 ctl 失败 ,重试。
            }
        }
    
    

    4.8 interruptIdleWorkers

    中断可能等待任务的线程。如果队列任务是空的,有的线程会被一直阻塞,调用该方法会中断那些被一直阻塞的线程:

     private void interruptIdleWorkers(boolean onlyOne) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                for (Worker w : workers) {
                    Thread t = w.thread;
                    if (!t.isInterrupted() && w.tryLock()) {
                        try {
                            t.interrupt();
                        } catch (SecurityException ignore) {
                        } finally {
                            w.unlock();
                        }
                    }
                    if (onlyOne)
                        break;
                }
            } finally {
                mainLock.unlock();
            }
        }
    

    相关文章

      网友评论

        本文标题:java8线程池源码解析

        本文链接:https://www.haomeiwen.com/subject/jyvqhftx.html