美文网首页
Java线程池管理类ThreadPollExecutor梳理

Java线程池管理类ThreadPollExecutor梳理

作者: Android程序员老鸦 | 来源:发表于2021-06-07 16:37 被阅读0次
这几天梳理了一下java线程池的设计思想,又看了一遍源码,最大的感触就是其实官方的源码设计上确实都很牛逼,但是开发者看起来多少有点费劲,主要原因是他们会严格遵守代码规范和设计原则,而导致出现很多封装的类或者接口,这就给我们这种平时写代码不规范的开发者(我就是- -)造成很多阅读障碍,从而看的过程中很容易忽略一些东西,看不到重点,每次看完了觉得好像懂了但又感觉没抓到要点,但是还好,多看一些优秀的源码和三方库就能慢慢跟上他们的设计思路,同时我们自己要多想多自我提问哪些是要点哪些还是迷糊的东西,然后重复看反复研究,我觉得这是一个提高代码能力和思维架构很好的途径。
废话不多说,在一般开发中,开启线程我们一般是继承Thread类(或直接new)并重写run()方法,然后调用start()方法,这样就开启了一个会执行run方法里面代码的异步线程:
//方法一
public class MyThread extends Thread {
  public void run() {
    LogUtils.d("hello thread");
  }
}
new MyThread().start();
//方法二
new Thread(new Runnable() {
      @Override
      public void run() {
        LogUtils.d("hello thread");
      }
    }).start();

从中我们看出来,代码的关键是要用到Thread类,这个类就是java的描述线程的类,创建一个Thread对象代码创建了一个线程,调用了start()方法就是让这个线程跑起来。所以这里就引发了一个思考,正常如果只是开一个线程做个简单的事情这样写是没任何问题,如果涉及到多线程大并发,就会频繁的创建线程开启线程,从而加大程序的性能内存损耗,由此就提出了线程池的概念。

而java线程池号称可以解决以上的问题,他的好处概括起来有以下三点:
第一:降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
第二:提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。
第三:提高线程的可管理性。

ok,那我们就带着这些疑问去看看java线程池是怎么运行的,他是怎么做到线程复用降低资源消耗的,首先看看线程池的使用步骤,假设要用到异步线程执行多个任务,用线程池是这么做的:

 ExecutorService pool = Executors.newFixedThreadPool(10);//创建一个核心线程数为10的线程池
    for (int i =0;i<100;i++){
      //执行100个线程任务
      pool.execute(new Runnable() {
        @Override
        public void run() {
           LogUtils.d("test");
        }
      });
    }

如果不用线程池,就直接for循环100次新建100个Thrad去执行了,这样就做了100次线程的创建和执行,我们在上面的代码看不到线程的创建和执行,仅仅看到把100个Runnable放到pool里面,说明肯定是线程池维护了一个或几个固定线程去做这件事。他是怎么设计的呢,看看源码吧。

Executors类是线程池的一个创建工厂类,newFixedThreadPool()是其中一种线程池的创建方式:

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

调用的还是ThreadPoolExecutor线程池的构造方法:

 public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
}

  public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
       //核心线程数,核心线程的概念是这个线程池里重点使用的线程,一般情况下(有个全局变量allowCoreThreadTimeOut控制,默认false)就算空闲的时候,核心线程也不会被回收
        this.corePoolSize = corePoolSize;
        //线程池允许的最大线程数
        this.maximumPoolSize = maximumPoolSize;
        //工作队列,暂时没有空闲线程的时候任务会放在这里
        this.workQueue = workQueue;
        //空闲线程超时时间,过了这个时间会回收线程
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        //线程创建工厂,用这个类来创建开发者想要的线程
        this.threadFactory = threadFactory;
        //拒绝任务时的处理策略
        this.handler = handler;
    }

再看看他的成员变量:

public class ThreadPoolExecutor extends AbstractExecutorService {
    /**
     * 这个ctl就是用来保存 线程池的状态(runState) 和 线程数(workerCount) 的
     * 这里使用AtomicInteger 来保证原子操作
     * 这里的ctl的初始值其实就是-1左移29位,即3个1和29个0, 
     * 111 00000000000000000000000000000
     */
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
 
    // COUNT_BITS值为29,代表着低29位用于存储线程数,高3位用于存储线程池的状态
    private static final int COUNT_BITS = Integer.SIZE - 3;
    // 线程池最大的容量,值为3个 0和29个1。也就是536870911
    // 000 11111111111111111111111111111
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
 
    // 下面这5个值代表线程池的状态,存储在高3位中
    // 3个1,29个0   111 00000000000000000000000000000
    private static final int RUNNING    = -1 << COUNT_BITS;
    // 全是0  000 00000000000000000000000000000
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    // 001 00000000000000000000000000000
    private static final int STOP       =  1 << COUNT_BITS;
    // 010 00000000000000000000000000000
    private static final int TIDYING    =  2 << COUNT_BITS;
    // 011 00000000000000000000000000000
    private static final int TERMINATED =  3 << COUNT_BITS;
 
    // ~就是按位取反,CAPACITY按位取反得到111 00000000000000000000000000000,
    // 再和c按位与,其实就是得到高3位,代表线程池的状态
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    // CAPACITY就是000 11111111111111111111111111111,
    // 直接按位与,其实就是得到低29位,代表线程池中的线程数
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    // 按位或
    private static int ctlOf(int rs, int wc) { return rs | wc; }
 
 
    private static boolean runStateLessThan(int c, int s) {
        return c < s;
    }
 
    private static boolean runStateAtLeast(int c, int s) {
        return c >= s;
    }
 
    private static boolean isRunning(int c) {
        return c < SHUTDOWN;
    }
 
    private boolean compareAndIncrementWorkerCount(int expect) {
        return ctl.compareAndSet(expect, expect + 1);
    }
 
    private boolean compareAndDecrementWorkerCount(int expect) {
        return ctl.compareAndSet(expect, expect - 1);
    }
 
    private void decrementWorkerCount() {
        do {} while (! compareAndDecrementWorkerCount(ctl.get()));
    }
 
    /**
     * 任务队列
     */
    private final BlockingQueue<Runnable> workQueue;
 
    /**
     * JAVA线程池的全局锁
     */
    private final ReentrantLock mainLock = new ReentrantLock();
 
    /**
     * 这个HashSet用于存放线程池中所有的工作线程,
     * 只有在持有全局锁(mainLock)的前提下,才能对这个集合进行存取操作
     */
    private final HashSet<Worker> workers = new HashSet<Worker>();
 
    /**
     * 这个condition是用于支持awaitTermination的
     */
    private final Condition termination = mainLock.newCondition();
 
    /**
     * largestPoolSize记录了线程池中线程数曾经达到的最大值
     */
    private int largestPoolSize;
 
    /**
     * 已完成任务的数量
     */
    private long completedTaskCount;
 
    /**
     * 线程工厂
     */
    private volatile ThreadFactory threadFactory;
 
    /**
     * 拒绝策略
     */
    private volatile RejectedExecutionHandler handler;
 
    /**
     * 空闲线程存活时间
     */
    private volatile long keepAliveTime;
 
    /**
     * 如果这个参数为true,
     * 那么核心线程数内的空闲线程 空闲时间超过keepAliveTime后,也可以被回收。
     */
    private volatile boolean allowCoreThreadTimeOut;
 
    /**
     * 核心线程数
     */
    private volatile int corePoolSize;
 
    /**
     * 最大线程数
     */
    private volatile int maximumPoolSize;
 
    /**
     * 默认的拒绝策略为AbortPolicy
     */
    private static final RejectedExecutionHandler defaultHandler =
        new AbortPolicy();
    ......
}

重点关注一下线程池的5个工作状态:

1.RUNNING

能够接收新任务,以及对已添加的任务进行处理,代表是一个可用的正常的状态,线程池的初始化状态。

2.SHUTDOWN

不接收新任务,但能处理已添加的任务,即已经放在队列workQueue的任务还是会被执行。调用线程池的shutdown()接口时,线程池由RUNNING >> SHUTDOWN

3.STOP

不接收新任务,不处理已添加的任务,并且会中断正在处理的任务。调用线程池的shutdownNow()接口时,线程池由(RUNNING or SHUTDOWN ) >> STOP。

4.TIDYING

当所有的任务已终止,ctl变量记录的"任务数量"为0,线程池会变为TIDYING状态。当线程池在SHUTDOWN状态下,阻塞队列为空并且线程池中执行的任务也为空时,就会由 SHUTDOWN >> TIDYING。当线程池在STOP状态下,线程池中执行的任务为空时,就会由STOP >> TIDYING。

5. TERMINATED

线程池彻底终止,就变成TERMINATED状态。线程池处在TIDYING状态时,执行完terminated()之后,就会由 TIDYING >> TERMINATED。

以上状态需要明确其正确的含义,光看状态名可能会误解,从而达不到自己想要的效果。

大致了解了这些变量名的含义后开始看重点方法,execute(Runnable command),正是这个方法开启了线程任务的分发和执行之旅:

 public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * 这个ctl就是用来存储线程池状态和线程数的
         */
        int c = ctl.get();
        // workerCountOf(),获取线程池中的线程数。
        // 如果当前线程数小于核心线程数,那么添加一个Worker来执行任务。
        // 所以这里会开启一个新的线程,并给线程分配这个方法传入的runnable任务(command)
        if (workerCountOf(c) < corePoolSize) {
            // 如果提交任务成功,代表线程池已经接到了任务,这个时候直接return
            if (addWorker(command, true))
                return;
            // 如果提交任务失败,再次获取ctl的值
            c = ctl.get();
        }
        // 走到这里有两种情况:
        // 1.当前线程数>=corePoolSize,核心线程数满了,还可以尝试添加非核心线程
        // 2.上面addWorker提交任务失败了
 
        // 如果线程池处于RUNNING状态,将runnable任务加入到任务队列workQueue中, workQueue.offer(command)添加成功返回true
        if (isRunning(c) && workQueue.offer(command)) {
            // 再次获取ctl的值
            int recheck = ctl.get();
            // 如果线程池不处于RUNNING状态,那么移除这个已入队的任务,执行对应的拒绝策略
            if (! isRunning(recheck) && remove(command))
                reject(command);
            // 如果线程是处于RUNNING状态,并且当前线程池中的线程数为0,开启一个新的线程,这时候创建的是非核心线程
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        // 如果队列已满,执行addWorker尝试创建新的线程,
        // 如果成功,说明当前线程数<maximumPoolSize。
        // 如果失败,说明当前线程数已达到maximumPoolSize,需要执行拒绝策略
        else if (!addWorker(command, false))
            reject(command);
    }

 // 这个方法会创建线程并且执行任务
    // 以下几种情况这个方法会返回false:
    // 1.传入的core这个参数为true,代表此时要创建的是核心线程,线程数的上限为corePoolSize, 如果当前线程数已达到corePoolSize,返回false
    // 2.传入的core这个参数为false,代表此时要创建的是非核心线程,线程数的上限为maximumPoolSize,如果当前线程数已达到maximumPoolSize,返回false
    // 3.线程池stopped或者shutdown
    // 4.使用ThreadFactory创建线程失败,或者ThreadFactory返回的线程为null
    // 5.或者线程启动出现异常
    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            // 这个rs就是线程池的状态
            int rs = runStateOf(c);
 
            // 这里的if说的是以下3种情况,直接返回false,不会创建新的线程:
            // 1.rs大于SHUTDOWN,说明线程状态是STOP,TIDYING, 或者TERMINATED,
            //   这几种状态下,不接受新的任务,并且会中断正在执行的任务。所以直接返回false
            // 2.线程池状态处于SHUTDOWN,并且firstTask!=null。
            //   因为SHUTDOWN状态下,是不接收新的任务的。所以返回false。
            // 3.线程池处于SHUTDOWN并且firstTask为null,但是workQueue是空的。
            //   因为SHUTDOWN虽然不接收新的任务,但是已经进入workQueue的任务还是要执行的,
            //   恰巧workQueue中没有任务。所以也是返回false,不需要创建线程
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;
 
            for (;;) { // 注意:这里是个for循环
                // 获取线程池中线程的数量
                int wc = workerCountOf(c);
                // 这里传入的core为true代表线程数上限为corePoolSize,
                // false代表线程数上限为maximumPoolSize,如果线程数超出上限,直接返回false
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                // 使用compareAndIncrementWorkerCount()对线程计数+1,如果成功,说明已经满足创建线程的条件了,跳出循环
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                // 如果上面的compareAndIncrementWorkerCount()失败,说明有并发,再次获取ctl的值
                c = ctl.get();  // Re-read ctl
                // 如果线程池的状态发生了变化,例如线程池已经关闭了,
                // 导致的compareAndIncrementWorkerCount()失败,那么回到外层的for循环(retry)
                // 否则,说明是正常的compareAndIncrementWorkerCount()失败,这个时候进入里面的循环
                if (runStateOf(c) != rs)
                    continue retry;
                
            }
        }
 
        // 能到这里,说明已经做好创建线程的准备了
 
        // worker是否已经启动的标志位
        boolean workerStarted = false;
        // 我们前面说了workers这个HashSet用于存储线程池中的所有线程,
        // 所以这个变量是代表当前worker是否已经存放到workers这个HashSet中
        boolean workerAdded = false;
        Worker w = null;
        try {
            // 传入firstTask这个任务构造一个Worker
            w = new Worker(firstTask);
            // Worker的构造方法中会使用ThreadFactory创建新的线程,
            // 所以这里可以直接获取到对应的线程
            final Thread t = w.thread;
            // 如果创建线程成功
            if (t != null) {
                
                final ReentrantLock mainLock = this.mainLock;
                // 获取线程池的全局锁,下面涉及线程池的操作都需要在持有全局锁的前提下进行
                mainLock.lock();
                try {
                    // 获取线程池的状态
                    int rs = runStateOf(ctl.get());
                    // 如果rs<SHUTDOWN,说明线程池处于RUNNING状态
                    // 或者 线程池处于SHUTDOWN状态并且没有新的任务
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        // 如果线程已经启动,抛出异常
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        // 将包装线程的worker加入到workers这个HashSet中
                        workers.add(w);
                        int s = workers.size();
                        // 我们前面说了,largestPoolSize记录的是线程池中线程数曾经到达的最大值
                        // 线程池中worker的数量是会变化的,所以记录下worker数的最大值
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        // 修改标志,代表当前worker已经加入到workers这个HashSet中
                        workerAdded = true;
                    }
                } finally {
                    // 释放全局锁
                    mainLock.unlock();
                }
                // 如果worker添加成功,启动线程执行任务
                if (workerAdded) {
                    // 启动线程,这里才是真正的启动线程动作!!!!!
                    t.start();
                    // 代表worker已经启动
                    workerStarted = true;
                }
            }
        } finally {
            // 如果线程没有启动,这里还需要进行一些清理工作
            if (! workerStarted)
                addWorkerFailed(w);
        }
        // 返回线程是否成功启动
        return workerStarted;
    }
 
    // 这个方法做下面几件事:
    // 1.将worker从workers中移除
    // 2.worker的数量-1
    // 3.检查termination
    private void addWorkerFailed(Worker w) {
        final ReentrantLock mainLock = this.mainLock;
        // 要操作workers这个HashSet,先获取java线程池全局锁
        mainLock.lock();
        try {
            if (w != null)
                // 从worker中移除
                workers.remove(w);
            // WorkerCount -1
            decrementWorkerCount();
            // 处理TERMINATED状态
            tryTerminate();
        } finally {
            mainLock.unlock();
        }
    }

上面的代码引入了一个重要的类,Worker类,这个类是线程池维护线程实例的,线程就封装在这个类里面,上面代码启动了线程,就会执行thread的:

  /**
   * 继承了AbstractQueuedSynchronizer(是一个用于构建锁和同步器的框架,许多同步器都可以通过AQS很容易并且高效地构造出来。这里不详谈)同时实现了Runnable,看到这里我是有点懵圈的,因为他还有个Thread类型的成员变量,那才是真正执行任务的线程,那它搞出这么多Runnable来干嘛呢?而且线程池接受的任务也是Runnable,说实话我当时有点被绕晕,越是这时候越要挺住,真相往往就在眼前
   */
    private final class Worker
            extends AbstractQueuedSynchronizer
            implements Runnable
    {
        private static final long serialVersionUID = 6138294804551838833L;
 
        /** 这个才是真正执行任务的线程 ,我自己一直被Runnable这个接口类蛊惑了,一看到这个就联想到线程,但其实不是,这只是个带run()方法的接口,这里的Worker和execute(Runnable runnable)方法传进来的任务,都只是实现了Runnable的封装而已*/
        final Thread thread;
        /** 每个线程要执行的第一个任务,这个值可以为null,线程会在BlokingQueue队列里中获取任务来执行 */
        Runnable firstTask;
        /** 记录每个线程已完成的任务数 */
        volatile long completedTasks;
 
        /**
         * 这个构造方法传入这个线程第一个要执行的任务,当然也可以传入null
         */
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            // firstTask赋值
            this.firstTask = firstTask;
            // 通过ThreadFactory工厂创建线程,注意:这里传入的Runnable是this,代表当前的Worker对象,所以上面addWorker()方法里最后执行t.start()启动线程的时候,会执行worker里的run()方法。
            // Worker实现了Runnable所以执行任务的时候最终会调用Worker.run()
            this.thread = getThreadFactory().newThread(this);
        }
 
        /** 实现了Run方法,里面会调用runWorker */
        public void run() {
            runWorker(this);
        }
 
        // 下面这些方法都是Worker对AQS同步控制的实现了,要获取线程的执行权,需要先获取独占锁
 
        protected boolean isHeldExclusively() {
            return getState() != 0;
        }
 
        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }
 
        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }
 
        public void lock()        { acquire(1); }
        public boolean tryLock()  { return tryAcquire(1); }
        public void unlock()      { release(1); }
        public boolean isLocked() { return isHeldExclusively(); }
 
        void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    }
通过上面分析知道,任务最终会在Worker里的run()方法里执行,而run()里调用的是 runWorker(this),接下来继续看看这个方法具体怎么做的:
 public void run() {
        // 这里调用的runWorker方法
        runWorker(this);
    }
    // 这里就是执行任务的代码了,有一个while循环不断从队列中取出任务并执行,正是这个循环,保证了线程的复用性,只要外面添加进来任务,都会在Worker里维护的线程里执行。
    // 退出循环的条件是获取不到要执行的任务
    final void runWorker(Worker w) {
        // 当前线程
        Thread wt = Thread.currentThread();
        // 前面说了new Worker的时候可以指定firstTask,代表Worker的第一个任务
        Runnable task = w.firstTask;
        // 这一步就已经将firstTask置为null了
        w.firstTask = null;
        // 释放Worker的独占锁,这里它释放锁的操作一定会成功,也就是将AQS中state设置为0
        w.unlock(); // allow interrupts
        // completedAbruptly这个标志位代表当前Worker是否因为执行任务出现异常而停止的
        boolean completedAbruptly = true;
 
        try {
            // while循环;如果firstTask不为null那就直接执行firstTask,
            // 否则就要调用getTask()从队列中获取队列,这是个很重要的方法。
            // 也就是说Worker的第一个任务是不需要从队列中获取的
            while (task != null || (task = getTask()) != null) {
                // 给这个worker上独占锁
                // Worker加锁的意义在于,在线程池的其他方法中可能会中断Worker,
                // 为了保证Worker安全的完成任务,必须要在获取到锁的情况下才能中断Worker,
                // 如tryTerminate(),shutdown()等都会关闭worker。
                w.lock();
                // 如果ctl的值大于等于STOP,说明线程池的状态是STOP,TIDYING或TERMINATED。
                // 这个时候需要确保该线程已中断,否则就应该确保线程没有中断
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    // 其实这里只是设置线程的中断状态
                    wt.interrupt();
                try {
                    // 这个方法留给子类去实现的
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        // 真正执行任务的地方
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        // 后置处理,留给子类去实现的
                        afterExecute(task, thrown);
                    }
                } finally {
                    // 最后将task置为null,准备接受下一个任务
                    task = null;
                    // 这个worker已完成任务数+1
                    w.completedTasks++;
                    // 释放独占锁
                    w.unlock();
                }
            }
            // 到这一步说明没抛出异常
            completedAbruptly = false;
        } finally {
 
            // 执行到这里说明:要么队列中已经没有任务了,要么执行任务出现了异常。
            // 这个时候需要调用processWorkerExit关闭闲置线程
            processWorkerExit(w, completedAbruptly);
        }
    }

看看获取任务的方法getTask():

  private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?
        //又是个循环,在关于线程调度的很多源码里都能看到这种死循环,要特别注意它的使用
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // 不是running状态并且(大于或等于stop的状态或者工作队列为空了)返回null, 跳出循环
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();//计数减一
                return null;
            }

            int wc = workerCountOf(c);
            // timed 的意思可以理解为 具备超时的条件
            //核心线程允许超时或者当前线程数大于核心线程数(说明多出来的是非核心线程)就为true
           //其实就是在这区分出了核心线程和非核心线程,本身worker并没有标记核心线程的字段,线程数
          //大于核心线程的时候,超时机制筛选下留下来的就是核心线程
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
            //(当前线程数大于线程池线程最大值||超时了)&&(当前线程数大于1||任务队列为空)则返回null
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))//防止并发,计数减一
                    return null;
                continue;
            }
            //下面的代码就是超时机制了,workQueue的poll()方法有个入参就是超时时间,如果在时间内没有
            //拿到值,则会返回空,take()方法则没有超时机制,直接拿
            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                //为空则超时了,然后继续循环,如果满足闲置线程回收条件上面的代码就会返回null,否则就一直阻塞循环
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

再来看看他是怎么销毁线程的:

  private void processWorkerExit(Worker w, boolean completedAbruptly) {
        if (completedAbruptly) // 异常导致的销毁,要在这里做计数减一
            decrementWorkerCount();

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            completedTaskCount += w.completedTasks;
          //从线程列表里移除,之后会被垃圾回收机制回收,这就是清理超时空闲线程的操作
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }
        //尝试终止线程池,因为可能没有任务了
        tryTerminate();
        //再次获取线程池的状态
        int c = ctl.get();
        //如果此时线程池是RUNNING和SHUTDOWM
        if (runStateLessThan(c, STOP)) {
            //并且不是异常进来这个方法的
            if (!completedAbruptly) {
                //如果核心线程也会超时回收那么min为0,否则为corePoolSize 
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                //如果为0且任务列表不为空,则置为1
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                //如果此时的线程数不小于min  ,则不需要新增线程
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            //否则会新增一个线程来继续执行任务
            addWorker(null, false);
        }
    }

继续看tryTerminate()方法,这个方法也很重要,在切换线程池状态的很多地方都有用到:

    //方法名很有意思,尝试终止
    final void tryTerminate() {
        //for循环!!!
        for (;;) {
            int c = ctl.get();
            //线程池状态是RUNNING || 状线程池态是TIDYING或SHUTDOWN ||状线程池态是SHUTDOWN并且
            //任务列表不为空,这几种情况就不继续走下去了,返回
            if (isRunning(c) ||
                runStateAtLeast(c, TIDYING) ||
                (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
                return;
            //线程池线程数量不为0
            if (workerCountOf(c) != 0) { // Eligible to terminate
                //中断一个线程就退出循环,这个方法也要重点看
                interruptIdleWorkers(ONLY_ONE);
                return;
            }
            //拿到全局锁
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                //尝试设置线程池状态为TIDYING
                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                    try {  
                        //成功则调用terminated(),是个空方法,给子类实现
                        terminated();
                    } finally {  
                        //设置线程池状态为TERMINATED
                        ctl.set(ctlOf(TERMINATED, 0));
                        termination.signalAll();
                    }
                    return;
                }
            } finally {
                mainLock.unlock();
            }
            // else retry on failed CAS
        }
    }


     // 这个方法其实就是用来中断正在等待任务的线程,方法名直译:中断空闲线程
    // 注意这里说的中断其实也只是将线程的状态置为“中断”,并不是说线程在这里就真的停止了
    // 如果onlyOne为true,这里最多会关闭一个worker,因为shutdown()方法需要中断所有的worker,
    // 这里中断一个worker能够帮助shutdown迅速的完成,而不用等待一些还在等待任务的worker结束
    private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock mainLock = this.mainLock;
        // 一样要获取全局锁
        mainLock.lock();
        try {
            // 遍历所有的Worker,如果传入的onlyOne为true,那最多会终止一个Worker。
            // 如果传入的onlyOne为false,终止所有的Worker
            for (Worker w : workers) {
                Thread t = w.thread;
                // 这里要获取到worker的独占锁后才能够中断线程
                //这个锁在线程拿到task执行的时候会获取,所以他无法中断正在执行任务的线程
                if (!t.isInterrupted() && w.tryLock()) {
                    try {
              //Thread的中断方法interrupt()只是做了个中断标记,真正相应中断的地方是在BlockingQueue
            //的poll()和take()方法,他们会抛出中断异常来达到中断的目的。
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    } finally {
                        w.unlock();
                    }
                }
                if (onlyOne)
                    break;
            }
        } finally {
            mainLock.unlock();
        }
    }

至此线程池的正常使用方法就走完了,当然还有其他的一些辅助性的知识点这里没有详细讲,有兴趣的小伙伴可以自行再深究一下。这时候回过头来看看线程池的优点:

第一:降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
第二:提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。
第三:提高线程的可管理性。

是不是就更能理解其背后意义了呢。
部分参考CSDN博主「Epoch-Elysian」的文章,原文链接:https://blog.csdn.net/Epoch_Elysian/article/details/107282186

相关文章

网友评论

      本文标题:Java线程池管理类ThreadPollExecutor梳理

      本文链接:https://www.haomeiwen.com/subject/lgzzsltx.html