美文网首页
Java线程池管理类ThreadPollExecutor梳理

Java线程池管理类ThreadPollExecutor梳理

作者: Android程序员老鸦 | 来源:发表于2021-06-07 16:37 被阅读0次
    这几天梳理了一下java线程池的设计思想,又看了一遍源码,最大的感触就是其实官方的源码设计上确实都很牛逼,但是开发者看起来多少有点费劲,主要原因是他们会严格遵守代码规范和设计原则,而导致出现很多封装的类或者接口,这就给我们这种平时写代码不规范的开发者(我就是- -)造成很多阅读障碍,从而看的过程中很容易忽略一些东西,看不到重点,每次看完了觉得好像懂了但又感觉没抓到要点,但是还好,多看一些优秀的源码和三方库就能慢慢跟上他们的设计思路,同时我们自己要多想多自我提问哪些是要点哪些还是迷糊的东西,然后重复看反复研究,我觉得这是一个提高代码能力和思维架构很好的途径。
    废话不多说,在一般开发中,开启线程我们一般是继承Thread类(或直接new)并重写run()方法,然后调用start()方法,这样就开启了一个会执行run方法里面代码的异步线程:
    //方法一
    public class MyThread extends Thread {
      public void run() {
        LogUtils.d("hello thread");
      }
    }
    new MyThread().start();
    //方法二
    new Thread(new Runnable() {
          @Override
          public void run() {
            LogUtils.d("hello thread");
          }
        }).start();
    

    从中我们看出来,代码的关键是要用到Thread类,这个类就是java的描述线程的类,创建一个Thread对象代码创建了一个线程,调用了start()方法就是让这个线程跑起来。所以这里就引发了一个思考,正常如果只是开一个线程做个简单的事情这样写是没任何问题,如果涉及到多线程大并发,就会频繁的创建线程开启线程,从而加大程序的性能内存损耗,由此就提出了线程池的概念。

    而java线程池号称可以解决以上的问题,他的好处概括起来有以下三点:
    第一:降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
    第二:提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。
    第三:提高线程的可管理性。

    ok,那我们就带着这些疑问去看看java线程池是怎么运行的,他是怎么做到线程复用降低资源消耗的,首先看看线程池的使用步骤,假设要用到异步线程执行多个任务,用线程池是这么做的:

     ExecutorService pool = Executors.newFixedThreadPool(10);//创建一个核心线程数为10的线程池
        for (int i =0;i<100;i++){
          //执行100个线程任务
          pool.execute(new Runnable() {
            @Override
            public void run() {
               LogUtils.d("test");
            }
          });
        }
    

    如果不用线程池,就直接for循环100次新建100个Thrad去执行了,这样就做了100次线程的创建和执行,我们在上面的代码看不到线程的创建和执行,仅仅看到把100个Runnable放到pool里面,说明肯定是线程池维护了一个或几个固定线程去做这件事。他是怎么设计的呢,看看源码吧。

    Executors类是线程池的一个创建工厂类,newFixedThreadPool()是其中一种线程池的创建方式:

    public static ExecutorService newFixedThreadPool(int nThreads) {
            return new ThreadPoolExecutor(nThreads, nThreads,
                                          0L, TimeUnit.MILLISECONDS,
                                          new LinkedBlockingQueue<Runnable>());
        }
    

    调用的还是ThreadPoolExecutor线程池的构造方法:

     public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue) {
            this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
                 Executors.defaultThreadFactory(), defaultHandler);
    }
    
      public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  ThreadFactory threadFactory,
                                  RejectedExecutionHandler handler) {
            if (corePoolSize < 0 ||
                maximumPoolSize <= 0 ||
                maximumPoolSize < corePoolSize ||
                keepAliveTime < 0)
                throw new IllegalArgumentException();
            if (workQueue == null || threadFactory == null || handler == null)
                throw new NullPointerException();
           //核心线程数,核心线程的概念是这个线程池里重点使用的线程,一般情况下(有个全局变量allowCoreThreadTimeOut控制,默认false)就算空闲的时候,核心线程也不会被回收
            this.corePoolSize = corePoolSize;
            //线程池允许的最大线程数
            this.maximumPoolSize = maximumPoolSize;
            //工作队列,暂时没有空闲线程的时候任务会放在这里
            this.workQueue = workQueue;
            //空闲线程超时时间,过了这个时间会回收线程
            this.keepAliveTime = unit.toNanos(keepAliveTime);
            //线程创建工厂,用这个类来创建开发者想要的线程
            this.threadFactory = threadFactory;
            //拒绝任务时的处理策略
            this.handler = handler;
        }
    

    再看看他的成员变量:

    public class ThreadPoolExecutor extends AbstractExecutorService {
        /**
         * 这个ctl就是用来保存 线程池的状态(runState) 和 线程数(workerCount) 的
         * 这里使用AtomicInteger 来保证原子操作
         * 这里的ctl的初始值其实就是-1左移29位,即3个1和29个0, 
         * 111 00000000000000000000000000000
         */
        private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
     
        // COUNT_BITS值为29,代表着低29位用于存储线程数,高3位用于存储线程池的状态
        private static final int COUNT_BITS = Integer.SIZE - 3;
        // 线程池最大的容量,值为3个 0和29个1。也就是536870911
        // 000 11111111111111111111111111111
        private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
     
        // 下面这5个值代表线程池的状态,存储在高3位中
        // 3个1,29个0   111 00000000000000000000000000000
        private static final int RUNNING    = -1 << COUNT_BITS;
        // 全是0  000 00000000000000000000000000000
        private static final int SHUTDOWN   =  0 << COUNT_BITS;
        // 001 00000000000000000000000000000
        private static final int STOP       =  1 << COUNT_BITS;
        // 010 00000000000000000000000000000
        private static final int TIDYING    =  2 << COUNT_BITS;
        // 011 00000000000000000000000000000
        private static final int TERMINATED =  3 << COUNT_BITS;
     
        // ~就是按位取反,CAPACITY按位取反得到111 00000000000000000000000000000,
        // 再和c按位与,其实就是得到高3位,代表线程池的状态
        private static int runStateOf(int c)     { return c & ~CAPACITY; }
        // CAPACITY就是000 11111111111111111111111111111,
        // 直接按位与,其实就是得到低29位,代表线程池中的线程数
        private static int workerCountOf(int c)  { return c & CAPACITY; }
        // 按位或
        private static int ctlOf(int rs, int wc) { return rs | wc; }
     
     
        private static boolean runStateLessThan(int c, int s) {
            return c < s;
        }
     
        private static boolean runStateAtLeast(int c, int s) {
            return c >= s;
        }
     
        private static boolean isRunning(int c) {
            return c < SHUTDOWN;
        }
     
        private boolean compareAndIncrementWorkerCount(int expect) {
            return ctl.compareAndSet(expect, expect + 1);
        }
     
        private boolean compareAndDecrementWorkerCount(int expect) {
            return ctl.compareAndSet(expect, expect - 1);
        }
     
        private void decrementWorkerCount() {
            do {} while (! compareAndDecrementWorkerCount(ctl.get()));
        }
     
        /**
         * 任务队列
         */
        private final BlockingQueue<Runnable> workQueue;
     
        /**
         * JAVA线程池的全局锁
         */
        private final ReentrantLock mainLock = new ReentrantLock();
     
        /**
         * 这个HashSet用于存放线程池中所有的工作线程,
         * 只有在持有全局锁(mainLock)的前提下,才能对这个集合进行存取操作
         */
        private final HashSet<Worker> workers = new HashSet<Worker>();
     
        /**
         * 这个condition是用于支持awaitTermination的
         */
        private final Condition termination = mainLock.newCondition();
     
        /**
         * largestPoolSize记录了线程池中线程数曾经达到的最大值
         */
        private int largestPoolSize;
     
        /**
         * 已完成任务的数量
         */
        private long completedTaskCount;
     
        /**
         * 线程工厂
         */
        private volatile ThreadFactory threadFactory;
     
        /**
         * 拒绝策略
         */
        private volatile RejectedExecutionHandler handler;
     
        /**
         * 空闲线程存活时间
         */
        private volatile long keepAliveTime;
     
        /**
         * 如果这个参数为true,
         * 那么核心线程数内的空闲线程 空闲时间超过keepAliveTime后,也可以被回收。
         */
        private volatile boolean allowCoreThreadTimeOut;
     
        /**
         * 核心线程数
         */
        private volatile int corePoolSize;
     
        /**
         * 最大线程数
         */
        private volatile int maximumPoolSize;
     
        /**
         * 默认的拒绝策略为AbortPolicy
         */
        private static final RejectedExecutionHandler defaultHandler =
            new AbortPolicy();
        ......
    }
    

    重点关注一下线程池的5个工作状态:

    1.RUNNING

    能够接收新任务,以及对已添加的任务进行处理,代表是一个可用的正常的状态,线程池的初始化状态。

    2.SHUTDOWN

    不接收新任务,但能处理已添加的任务,即已经放在队列workQueue的任务还是会被执行。调用线程池的shutdown()接口时,线程池由RUNNING >> SHUTDOWN

    3.STOP

    不接收新任务,不处理已添加的任务,并且会中断正在处理的任务。调用线程池的shutdownNow()接口时,线程池由(RUNNING or SHUTDOWN ) >> STOP。

    4.TIDYING

    当所有的任务已终止,ctl变量记录的"任务数量"为0,线程池会变为TIDYING状态。当线程池在SHUTDOWN状态下,阻塞队列为空并且线程池中执行的任务也为空时,就会由 SHUTDOWN >> TIDYING。当线程池在STOP状态下,线程池中执行的任务为空时,就会由STOP >> TIDYING。

    5. TERMINATED

    线程池彻底终止,就变成TERMINATED状态。线程池处在TIDYING状态时,执行完terminated()之后,就会由 TIDYING >> TERMINATED。

    以上状态需要明确其正确的含义,光看状态名可能会误解,从而达不到自己想要的效果。

    大致了解了这些变量名的含义后开始看重点方法,execute(Runnable command),正是这个方法开启了线程任务的分发和执行之旅:

     public void execute(Runnable command) {
            if (command == null)
                throw new NullPointerException();
            /*
             * 这个ctl就是用来存储线程池状态和线程数的
             */
            int c = ctl.get();
            // workerCountOf(),获取线程池中的线程数。
            // 如果当前线程数小于核心线程数,那么添加一个Worker来执行任务。
            // 所以这里会开启一个新的线程,并给线程分配这个方法传入的runnable任务(command)
            if (workerCountOf(c) < corePoolSize) {
                // 如果提交任务成功,代表线程池已经接到了任务,这个时候直接return
                if (addWorker(command, true))
                    return;
                // 如果提交任务失败,再次获取ctl的值
                c = ctl.get();
            }
            // 走到这里有两种情况:
            // 1.当前线程数>=corePoolSize,核心线程数满了,还可以尝试添加非核心线程
            // 2.上面addWorker提交任务失败了
     
            // 如果线程池处于RUNNING状态,将runnable任务加入到任务队列workQueue中, workQueue.offer(command)添加成功返回true
            if (isRunning(c) && workQueue.offer(command)) {
                // 再次获取ctl的值
                int recheck = ctl.get();
                // 如果线程池不处于RUNNING状态,那么移除这个已入队的任务,执行对应的拒绝策略
                if (! isRunning(recheck) && remove(command))
                    reject(command);
                // 如果线程是处于RUNNING状态,并且当前线程池中的线程数为0,开启一个新的线程,这时候创建的是非核心线程
                else if (workerCountOf(recheck) == 0)
                    addWorker(null, false);
            }
            // 如果队列已满,执行addWorker尝试创建新的线程,
            // 如果成功,说明当前线程数<maximumPoolSize。
            // 如果失败,说明当前线程数已达到maximumPoolSize,需要执行拒绝策略
            else if (!addWorker(command, false))
                reject(command);
        }
    
     // 这个方法会创建线程并且执行任务
        // 以下几种情况这个方法会返回false:
        // 1.传入的core这个参数为true,代表此时要创建的是核心线程,线程数的上限为corePoolSize, 如果当前线程数已达到corePoolSize,返回false
        // 2.传入的core这个参数为false,代表此时要创建的是非核心线程,线程数的上限为maximumPoolSize,如果当前线程数已达到maximumPoolSize,返回false
        // 3.线程池stopped或者shutdown
        // 4.使用ThreadFactory创建线程失败,或者ThreadFactory返回的线程为null
        // 5.或者线程启动出现异常
        private boolean addWorker(Runnable firstTask, boolean core) {
            retry:
            for (;;) {
                int c = ctl.get();
                // 这个rs就是线程池的状态
                int rs = runStateOf(c);
     
                // 这里的if说的是以下3种情况,直接返回false,不会创建新的线程:
                // 1.rs大于SHUTDOWN,说明线程状态是STOP,TIDYING, 或者TERMINATED,
                //   这几种状态下,不接受新的任务,并且会中断正在执行的任务。所以直接返回false
                // 2.线程池状态处于SHUTDOWN,并且firstTask!=null。
                //   因为SHUTDOWN状态下,是不接收新的任务的。所以返回false。
                // 3.线程池处于SHUTDOWN并且firstTask为null,但是workQueue是空的。
                //   因为SHUTDOWN虽然不接收新的任务,但是已经进入workQueue的任务还是要执行的,
                //   恰巧workQueue中没有任务。所以也是返回false,不需要创建线程
                if (rs >= SHUTDOWN &&
                    ! (rs == SHUTDOWN &&
                       firstTask == null &&
                       ! workQueue.isEmpty()))
                    return false;
     
                for (;;) { // 注意:这里是个for循环
                    // 获取线程池中线程的数量
                    int wc = workerCountOf(c);
                    // 这里传入的core为true代表线程数上限为corePoolSize,
                    // false代表线程数上限为maximumPoolSize,如果线程数超出上限,直接返回false
                    if (wc >= CAPACITY ||
                        wc >= (core ? corePoolSize : maximumPoolSize))
                        return false;
                    // 使用compareAndIncrementWorkerCount()对线程计数+1,如果成功,说明已经满足创建线程的条件了,跳出循环
                    if (compareAndIncrementWorkerCount(c))
                        break retry;
                    // 如果上面的compareAndIncrementWorkerCount()失败,说明有并发,再次获取ctl的值
                    c = ctl.get();  // Re-read ctl
                    // 如果线程池的状态发生了变化,例如线程池已经关闭了,
                    // 导致的compareAndIncrementWorkerCount()失败,那么回到外层的for循环(retry)
                    // 否则,说明是正常的compareAndIncrementWorkerCount()失败,这个时候进入里面的循环
                    if (runStateOf(c) != rs)
                        continue retry;
                    
                }
            }
     
            // 能到这里,说明已经做好创建线程的准备了
     
            // worker是否已经启动的标志位
            boolean workerStarted = false;
            // 我们前面说了workers这个HashSet用于存储线程池中的所有线程,
            // 所以这个变量是代表当前worker是否已经存放到workers这个HashSet中
            boolean workerAdded = false;
            Worker w = null;
            try {
                // 传入firstTask这个任务构造一个Worker
                w = new Worker(firstTask);
                // Worker的构造方法中会使用ThreadFactory创建新的线程,
                // 所以这里可以直接获取到对应的线程
                final Thread t = w.thread;
                // 如果创建线程成功
                if (t != null) {
                    
                    final ReentrantLock mainLock = this.mainLock;
                    // 获取线程池的全局锁,下面涉及线程池的操作都需要在持有全局锁的前提下进行
                    mainLock.lock();
                    try {
                        // 获取线程池的状态
                        int rs = runStateOf(ctl.get());
                        // 如果rs<SHUTDOWN,说明线程池处于RUNNING状态
                        // 或者 线程池处于SHUTDOWN状态并且没有新的任务
                        if (rs < SHUTDOWN ||
                            (rs == SHUTDOWN && firstTask == null)) {
                            // 如果线程已经启动,抛出异常
                            if (t.isAlive()) // precheck that t is startable
                                throw new IllegalThreadStateException();
                            // 将包装线程的worker加入到workers这个HashSet中
                            workers.add(w);
                            int s = workers.size();
                            // 我们前面说了,largestPoolSize记录的是线程池中线程数曾经到达的最大值
                            // 线程池中worker的数量是会变化的,所以记录下worker数的最大值
                            if (s > largestPoolSize)
                                largestPoolSize = s;
                            // 修改标志,代表当前worker已经加入到workers这个HashSet中
                            workerAdded = true;
                        }
                    } finally {
                        // 释放全局锁
                        mainLock.unlock();
                    }
                    // 如果worker添加成功,启动线程执行任务
                    if (workerAdded) {
                        // 启动线程,这里才是真正的启动线程动作!!!!!
                        t.start();
                        // 代表worker已经启动
                        workerStarted = true;
                    }
                }
            } finally {
                // 如果线程没有启动,这里还需要进行一些清理工作
                if (! workerStarted)
                    addWorkerFailed(w);
            }
            // 返回线程是否成功启动
            return workerStarted;
        }
     
        // 这个方法做下面几件事:
        // 1.将worker从workers中移除
        // 2.worker的数量-1
        // 3.检查termination
        private void addWorkerFailed(Worker w) {
            final ReentrantLock mainLock = this.mainLock;
            // 要操作workers这个HashSet,先获取java线程池全局锁
            mainLock.lock();
            try {
                if (w != null)
                    // 从worker中移除
                    workers.remove(w);
                // WorkerCount -1
                decrementWorkerCount();
                // 处理TERMINATED状态
                tryTerminate();
            } finally {
                mainLock.unlock();
            }
        }
    
    
    上面的代码引入了一个重要的类,Worker类,这个类是线程池维护线程实例的,线程就封装在这个类里面,上面代码启动了线程,就会执行thread的:
    
      /**
       * 继承了AbstractQueuedSynchronizer(是一个用于构建锁和同步器的框架,许多同步器都可以通过AQS很容易并且高效地构造出来。这里不详谈)同时实现了Runnable,看到这里我是有点懵圈的,因为他还有个Thread类型的成员变量,那才是真正执行任务的线程,那它搞出这么多Runnable来干嘛呢?而且线程池接受的任务也是Runnable,说实话我当时有点被绕晕,越是这时候越要挺住,真相往往就在眼前
       */
        private final class Worker
                extends AbstractQueuedSynchronizer
                implements Runnable
        {
            private static final long serialVersionUID = 6138294804551838833L;
     
            /** 这个才是真正执行任务的线程 ,我自己一直被Runnable这个接口类蛊惑了,一看到这个就联想到线程,但其实不是,这只是个带run()方法的接口,这里的Worker和execute(Runnable runnable)方法传进来的任务,都只是实现了Runnable的封装而已*/
            final Thread thread;
            /** 每个线程要执行的第一个任务,这个值可以为null,线程会在BlokingQueue队列里中获取任务来执行 */
            Runnable firstTask;
            /** 记录每个线程已完成的任务数 */
            volatile long completedTasks;
     
            /**
             * 这个构造方法传入这个线程第一个要执行的任务,当然也可以传入null
             */
            Worker(Runnable firstTask) {
                setState(-1); // inhibit interrupts until runWorker
                // firstTask赋值
                this.firstTask = firstTask;
                // 通过ThreadFactory工厂创建线程,注意:这里传入的Runnable是this,代表当前的Worker对象,所以上面addWorker()方法里最后执行t.start()启动线程的时候,会执行worker里的run()方法。
                // Worker实现了Runnable所以执行任务的时候最终会调用Worker.run()
                this.thread = getThreadFactory().newThread(this);
            }
     
            /** 实现了Run方法,里面会调用runWorker */
            public void run() {
                runWorker(this);
            }
     
            // 下面这些方法都是Worker对AQS同步控制的实现了,要获取线程的执行权,需要先获取独占锁
     
            protected boolean isHeldExclusively() {
                return getState() != 0;
            }
     
            protected boolean tryAcquire(int unused) {
                if (compareAndSetState(0, 1)) {
                    setExclusiveOwnerThread(Thread.currentThread());
                    return true;
                }
                return false;
            }
     
            protected boolean tryRelease(int unused) {
                setExclusiveOwnerThread(null);
                setState(0);
                return true;
            }
     
            public void lock()        { acquire(1); }
            public boolean tryLock()  { return tryAcquire(1); }
            public void unlock()      { release(1); }
            public boolean isLocked() { return isHeldExclusively(); }
     
            void interruptIfStarted() {
                Thread t;
                if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    }
                }
            }
        }
    
    通过上面分析知道,任务最终会在Worker里的run()方法里执行,而run()里调用的是 runWorker(this),接下来继续看看这个方法具体怎么做的:
     public void run() {
            // 这里调用的runWorker方法
            runWorker(this);
        }
        // 这里就是执行任务的代码了,有一个while循环不断从队列中取出任务并执行,正是这个循环,保证了线程的复用性,只要外面添加进来任务,都会在Worker里维护的线程里执行。
        // 退出循环的条件是获取不到要执行的任务
        final void runWorker(Worker w) {
            // 当前线程
            Thread wt = Thread.currentThread();
            // 前面说了new Worker的时候可以指定firstTask,代表Worker的第一个任务
            Runnable task = w.firstTask;
            // 这一步就已经将firstTask置为null了
            w.firstTask = null;
            // 释放Worker的独占锁,这里它释放锁的操作一定会成功,也就是将AQS中state设置为0
            w.unlock(); // allow interrupts
            // completedAbruptly这个标志位代表当前Worker是否因为执行任务出现异常而停止的
            boolean completedAbruptly = true;
     
            try {
                // while循环;如果firstTask不为null那就直接执行firstTask,
                // 否则就要调用getTask()从队列中获取队列,这是个很重要的方法。
                // 也就是说Worker的第一个任务是不需要从队列中获取的
                while (task != null || (task = getTask()) != null) {
                    // 给这个worker上独占锁
                    // Worker加锁的意义在于,在线程池的其他方法中可能会中断Worker,
                    // 为了保证Worker安全的完成任务,必须要在获取到锁的情况下才能中断Worker,
                    // 如tryTerminate(),shutdown()等都会关闭worker。
                    w.lock();
                    // 如果ctl的值大于等于STOP,说明线程池的状态是STOP,TIDYING或TERMINATED。
                    // 这个时候需要确保该线程已中断,否则就应该确保线程没有中断
                    if ((runStateAtLeast(ctl.get(), STOP) ||
                         (Thread.interrupted() &&
                          runStateAtLeast(ctl.get(), STOP))) &&
                        !wt.isInterrupted())
                        // 其实这里只是设置线程的中断状态
                        wt.interrupt();
                    try {
                        // 这个方法留给子类去实现的
                        beforeExecute(wt, task);
                        Throwable thrown = null;
                        try {
                            // 真正执行任务的地方
                            task.run();
                        } catch (RuntimeException x) {
                            thrown = x; throw x;
                        } catch (Error x) {
                            thrown = x; throw x;
                        } catch (Throwable x) {
                            thrown = x; throw new Error(x);
                        } finally {
                            // 后置处理,留给子类去实现的
                            afterExecute(task, thrown);
                        }
                    } finally {
                        // 最后将task置为null,准备接受下一个任务
                        task = null;
                        // 这个worker已完成任务数+1
                        w.completedTasks++;
                        // 释放独占锁
                        w.unlock();
                    }
                }
                // 到这一步说明没抛出异常
                completedAbruptly = false;
            } finally {
     
                // 执行到这里说明:要么队列中已经没有任务了,要么执行任务出现了异常。
                // 这个时候需要调用processWorkerExit关闭闲置线程
                processWorkerExit(w, completedAbruptly);
            }
        }
    

    看看获取任务的方法getTask():

      private Runnable getTask() {
            boolean timedOut = false; // Did the last poll() time out?
            //又是个循环,在关于线程调度的很多源码里都能看到这种死循环,要特别注意它的使用
            for (;;) {
                int c = ctl.get();
                int rs = runStateOf(c);
    
                // 不是running状态并且(大于或等于stop的状态或者工作队列为空了)返回null, 跳出循环
                if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                    decrementWorkerCount();//计数减一
                    return null;
                }
    
                int wc = workerCountOf(c);
                // timed 的意思可以理解为 具备超时的条件
                //核心线程允许超时或者当前线程数大于核心线程数(说明多出来的是非核心线程)就为true
               //其实就是在这区分出了核心线程和非核心线程,本身worker并没有标记核心线程的字段,线程数
              //大于核心线程的时候,超时机制筛选下留下来的就是核心线程
                boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
                //(当前线程数大于线程池线程最大值||超时了)&&(当前线程数大于1||任务队列为空)则返回null
                if ((wc > maximumPoolSize || (timed && timedOut))
                    && (wc > 1 || workQueue.isEmpty())) {
                    if (compareAndDecrementWorkerCount(c))//防止并发,计数减一
                        return null;
                    continue;
                }
                //下面的代码就是超时机制了,workQueue的poll()方法有个入参就是超时时间,如果在时间内没有
                //拿到值,则会返回空,take()方法则没有超时机制,直接拿
                try {
                    Runnable r = timed ?
                        workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                        workQueue.take();
                    if (r != null)
                        return r;
                    //为空则超时了,然后继续循环,如果满足闲置线程回收条件上面的代码就会返回null,否则就一直阻塞循环
                    timedOut = true;
                } catch (InterruptedException retry) {
                    timedOut = false;
                }
            }
        }
    

    再来看看他是怎么销毁线程的:

      private void processWorkerExit(Worker w, boolean completedAbruptly) {
            if (completedAbruptly) // 异常导致的销毁,要在这里做计数减一
                decrementWorkerCount();
    
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                completedTaskCount += w.completedTasks;
              //从线程列表里移除,之后会被垃圾回收机制回收,这就是清理超时空闲线程的操作
                workers.remove(w);
            } finally {
                mainLock.unlock();
            }
            //尝试终止线程池,因为可能没有任务了
            tryTerminate();
            //再次获取线程池的状态
            int c = ctl.get();
            //如果此时线程池是RUNNING和SHUTDOWM
            if (runStateLessThan(c, STOP)) {
                //并且不是异常进来这个方法的
                if (!completedAbruptly) {
                    //如果核心线程也会超时回收那么min为0,否则为corePoolSize 
                    int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                    //如果为0且任务列表不为空,则置为1
                    if (min == 0 && ! workQueue.isEmpty())
                        min = 1;
                    //如果此时的线程数不小于min  ,则不需要新增线程
                    if (workerCountOf(c) >= min)
                        return; // replacement not needed
                }
                //否则会新增一个线程来继续执行任务
                addWorker(null, false);
            }
        }
    

    继续看tryTerminate()方法,这个方法也很重要,在切换线程池状态的很多地方都有用到:

        //方法名很有意思,尝试终止
        final void tryTerminate() {
            //for循环!!!
            for (;;) {
                int c = ctl.get();
                //线程池状态是RUNNING || 状线程池态是TIDYING或SHUTDOWN ||状线程池态是SHUTDOWN并且
                //任务列表不为空,这几种情况就不继续走下去了,返回
                if (isRunning(c) ||
                    runStateAtLeast(c, TIDYING) ||
                    (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
                    return;
                //线程池线程数量不为0
                if (workerCountOf(c) != 0) { // Eligible to terminate
                    //中断一个线程就退出循环,这个方法也要重点看
                    interruptIdleWorkers(ONLY_ONE);
                    return;
                }
                //拿到全局锁
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    //尝试设置线程池状态为TIDYING
                    if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                        try {  
                            //成功则调用terminated(),是个空方法,给子类实现
                            terminated();
                        } finally {  
                            //设置线程池状态为TERMINATED
                            ctl.set(ctlOf(TERMINATED, 0));
                            termination.signalAll();
                        }
                        return;
                    }
                } finally {
                    mainLock.unlock();
                }
                // else retry on failed CAS
            }
        }
    
    
         // 这个方法其实就是用来中断正在等待任务的线程,方法名直译:中断空闲线程
        // 注意这里说的中断其实也只是将线程的状态置为“中断”,并不是说线程在这里就真的停止了
        // 如果onlyOne为true,这里最多会关闭一个worker,因为shutdown()方法需要中断所有的worker,
        // 这里中断一个worker能够帮助shutdown迅速的完成,而不用等待一些还在等待任务的worker结束
        private void interruptIdleWorkers(boolean onlyOne) {
            final ReentrantLock mainLock = this.mainLock;
            // 一样要获取全局锁
            mainLock.lock();
            try {
                // 遍历所有的Worker,如果传入的onlyOne为true,那最多会终止一个Worker。
                // 如果传入的onlyOne为false,终止所有的Worker
                for (Worker w : workers) {
                    Thread t = w.thread;
                    // 这里要获取到worker的独占锁后才能够中断线程
                    //这个锁在线程拿到task执行的时候会获取,所以他无法中断正在执行任务的线程
                    if (!t.isInterrupted() && w.tryLock()) {
                        try {
                  //Thread的中断方法interrupt()只是做了个中断标记,真正相应中断的地方是在BlockingQueue
                //的poll()和take()方法,他们会抛出中断异常来达到中断的目的。
                            t.interrupt();
                        } catch (SecurityException ignore) {
                        } finally {
                            w.unlock();
                        }
                    }
                    if (onlyOne)
                        break;
                }
            } finally {
                mainLock.unlock();
            }
        }
    

    至此线程池的正常使用方法就走完了,当然还有其他的一些辅助性的知识点这里没有详细讲,有兴趣的小伙伴可以自行再深究一下。这时候回过头来看看线程池的优点:

    第一:降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
    第二:提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。
    第三:提高线程的可管理性。

    是不是就更能理解其背后意义了呢。
    部分参考CSDN博主「Epoch-Elysian」的文章,原文链接:https://blog.csdn.net/Epoch_Elysian/article/details/107282186

    相关文章

      网友评论

          本文标题:Java线程池管理类ThreadPollExecutor梳理

          本文链接:https://www.haomeiwen.com/subject/lgzzsltx.html