美文网首页
什么是线程池

什么是线程池

作者: yanghx | 来源:发表于2021-02-02 17:15 被阅读0次

    什么是线程池

    本篇文章是类似笔记的形式。文笔写不好,而且会大量摘抄别人的文章。

    类图

    类图

    重点是 ThreadPoolExecutor。 是我们常用的线程池实现类

    初始化参数

    1. int corePoolSize 核心线程数
    2. int maximunPoolSize 最大线程数
    3. long keepAliveTime 线程空闲最大存活时间
      • 在线程池空闲时线程继续存活的时间。注意:keepAliveTime 只有线程数大于 corePoolSize 才有效;
    4. BlockingQueue workQueue 阻塞队列
    5. ThreadFactory threadFactory 线程工厂
    6. RejectedExecutionHandler handler 拒绝策略
      • AbortPolicy 抛出 RejectedExecutionException 异常(默认)
      • CallerRunsPolicy 直接在调用线程执行
      • DiscardOldestPolicy 丢弃阻塞队列中队首的任务
      • DiscardPolicy 直接丢弃当前任务

    线程池工作机制

    1、如果当前运行的线程少于 corePoolSize,则「创建新的线程」来执行任务(注意,执行这一步骤需要获取全局锁);
    2、如果运行的线程等于或多于 corePoolSize,则将任务「加入 BlockingQueue」;
    3、如果 BlockingQueue 已满,则「创建新的线程」来处理任务;
    4、如果创建新线程将使当前运行的线程超出 maximumPoolSize,将「触发拒绝策略」并调用RejectedExecutionHandler#rejectedExecution()方法。

    image.png

    线程池代码解析

    属性

    静态属性

      //高3位:表示当前线程池运行状态   除去高3位之后的低位:表示当前线程池中所拥有的线程数量
        private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
        //表示在ctl中,低COUNT_BITS位 是用于存放当前线程数量的位。
        private static final int COUNT_BITS = Integer.SIZE - 3;
        //低COUNT_BITS位 所能表达的最大数值。 000 11111111111111111111 => 5亿多。
        private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
    
        // runState is stored in the high-order bits
        //111 000000000000000000  转换成整数,其实是一个负数
        private static final int RUNNING    = -1 << COUNT_BITS;
        //000 000000000000000000
        private static final int SHUTDOWN   =  0 << COUNT_BITS;
        //001 000000000000000000
        private static final int STOP       =  1 << COUNT_BITS;
        //010 000000000000000000
        private static final int TIDYING    =  2 << COUNT_BITS;
        //011 000000000000000000
        private static final int TERMINATED =  3 << COUNT_BITS;
    
    
        /**
         * The default rejected execution handler
         */
        //缺省拒绝策略,采用的是AbortPolicy 抛出异常的方式。
        private static final RejectedExecutionHandler defaultHandler =
                new AbortPolicy();
    
       
    

    成员属性

      //任务队列,当线程池中的线程达到核心线程数量时,再提交任务 就会直接提交到 workQueue
        //workQueue  instanceOf ArrayBrokingQueue   LinkedBrokingQueue  同步队列
        private final BlockingQueue<Runnable> workQueue;
    
       //线程池全局锁,增加worker 减少 worker 时需要持有mainLock , 修改线程池运行状态时,也需要。
        private final ReentrantLock mainLock = new ReentrantLock();
    
       //线程池中真正存放 worker->thread 的地方。
        private final HashSet<Worker> workers = new HashSet<Worker>();
    
     /**
         * Wait condition to support awaitTermination
         */
        //当外部线程调用  awaitTermination() 方法时,外部线程会等待当前线程池状态为 Termination 为止。
        //等待是如何实现的? 就是将外部线程 封装成 waitNode 放入到 Condition 队列中了, waitNode.Thread 就是外部线程,会被park掉(处于WAITING状态)。
        //当线程池 状态 变为 Termination时,会去唤醒这些线程。通过 termination.signalAll() ,唤醒之后这些线程会进入到 阻塞队列,然后头结点会去抢占mainLock。
        //抢占到的线程,会继续执行awaitTermination() 后面程序。这些线程最后,都会正常执行。
        //简单理解:termination.await() 会将线程阻塞在这。
        //         termination.signalAll() 会将阻塞在这的线程依次唤醒
        private final Condition termination = mainLock.newCondition();
    
    
       /**
         * Tracks largest attained pool size. Accessed only under
         * mainLock.
         */
        //记录线程池生命周期内 线程数最大值
        private int largestPoolSize;
    
        /**
         * Counter for completed tasks. Updated only on termination of
         * worker threads. Accessed only under mainLock.
         */
        //记录线程池所完成任务总数 ,当worker退出时会将 worker完成的任务累积到completedTaskCount
        private long completedTaskCount;
    
        /*
         * All user control parameters are declared as volatiles so that
         * ongoing actions are based on freshest values, but without need
         * for locking, since no internal invariants depend on them
         * changing synchronously with respect to other actions.
         */
    
        /**
         * Factory for new threads. All threads are created using this
         * factory (via method addWorker).  All callers must be prepared
         * for addWorker to fail, which may reflect a system or user's
         * policy limiting the number of threads.  Even though it is not
         * treated as an error, failure to create threads may result in
         * new tasks being rejected or existing ones remaining stuck in
         * the queue.
         *
         * We go further and preserve pool invariants even in the face of
         * errors such as OutOfMemoryError, that might be thrown while
         * trying to create threads.  Such errors are rather common due to
         * the need to allocate a native stack in Thread.start, and users
         * will want to perform clean pool shutdown to clean up.  There
         * will likely be enough memory available for the cleanup code to
         * complete without encountering yet another OutOfMemoryError.
         */
        //创建线程时会使用 线程工厂,当我们使用 Executors.newFix...  newCache... 创建线程池时,使用的是 DefaultThreadFactory
        //一般不建议使用Default线程池,推荐自己实现ThreadFactory
        private volatile ThreadFactory threadFactory;
    
        /**
         * Handler called when saturated or shutdown in execute.
         */
        //拒绝策略,juc包提供了4中方式,默认采用 Abort..抛出异常的方式。
        private volatile RejectedExecutionHandler handler;
    
        /**
         * Timeout in nanoseconds for idle threads waiting for work.
         * Threads use this timeout when there are more than corePoolSize
         * present or if allowCoreThreadTimeOut. Otherwise they wait
         * forever for new work.
         */
        //空闲线程存活时间,当allowCoreThreadTimeOut == false 时,会维护核心线程数量内的线程存活,超出部分会被超时。
        //allowCoreThreadTimeOut == true 核心数量内的线程 空闲时 也会被回收。
        private volatile long keepAliveTime;
    
        /**
         * If false (default), core threads stay alive even when idle.
         * If true, core threads use keepAliveTime to time out waiting
         * for work.
         */
        //控制核心线程数量内的线程 是否可以被回收。true 可以,false不可以。
        private volatile boolean allowCoreThreadTimeOut;
    
        /**
         * Core pool size is the minimum number of workers to keep alive
         * (and not allow to time out etc) unless allowCoreThreadTimeOut
         * is set, in which case the minimum is zero.
         */
        //核心线程数量限制。
        private volatile int corePoolSize;
    
        /**
         * Maximum pool size. Note that the actual maximum is internally
         * bounded by CAPACITY.
         */
        //线程池最大线程数量限制。
        private volatile int maximumPoolSize;
    
    

    构造方法

      public ThreadPoolExecutor(int corePoolSize,//核心线程数限制
                                  int maximumPoolSize,//最大线程限制
                                  long keepAliveTime,//空闲线程存活时间
                                  TimeUnit unit,//时间单位 seconds nano..
                                  BlockingQueue<Runnable> workQueue,//任务队列
                                  ThreadFactory threadFactory,//线程工厂
                                  RejectedExecutionHandler handler/*拒绝策略*/) {
            //判断参数是否越界
            if (corePoolSize < 0 ||
                    maximumPoolSize <= 0 ||
                    maximumPoolSize < corePoolSize ||
                    keepAliveTime < 0)
                throw new IllegalArgumentException();
    
            //工作队列 和 线程工厂 和 拒绝策略 都不能为空。
            if (workQueue == null || threadFactory == null || handler == null)
                throw new NullPointerException();
    
    
            this.acc = System.getSecurityManager() == null ?
                    null :
                    AccessController.getContext();
    
            this.corePoolSize = corePoolSize;
            this.maximumPoolSize = maximumPoolSize;
            this.workQueue = workQueue;
            this.keepAliveTime = unit.toNanos(keepAliveTime);
            this.threadFactory = threadFactory;
            this.handler = handler;
        }
    
    

    内部类 Worker

    
    private final class Worker
                extends AbstractQueuedSynchronizer
                implements Runnable
        {
            //Worker采用了AQS的独占模式
            //独占模式:两个重要属性  state  和  ExclusiveOwnerThread
            //state:0时表示未被占用 > 0时表示被占用   < 0 时 表示初始状态,这种情况下不能被抢锁。
            //ExclusiveOwnerThread:表示独占锁的线程。
            /**
             * This class will never be serialized, but we provide a
             * serialVersionUID to suppress a javac warning.
             */
            private static final long serialVersionUID = 6138294804551838833L;
    
            /** Thread this worker is running in.  Null if factory fails. */
            //worker内部封装的工作线程
            final Thread thread;
            /** Initial task to run.  Possibly null. */
            //假设firstTask不为空,那么当worker启动后(内部的线程启动)会优先执行firstTask,当执行完firstTask后,会到queue中去获取下一个任务。
            Runnable firstTask;
    
            /** Per-thread task counter */
            //记录当前worker所完成任务数量。
            volatile long completedTasks;
    
            /**
             * Creates with given first task and thread from ThreadFactory.
             * @param firstTask the first task (null if none)
             */
            //firstTask可以为null。为null 启动后会到queue中获取。
            Worker(Runnable firstTask) {
                //设置AQS独占模式为初始化中状态,这个时候 不能被抢占锁。
                setState(-1); // inhibit interrupts until runWorker
                this.firstTask = firstTask;
                //使用线程工厂创建了一个线程,并且将当前worker 指定为 Runnable,也就是说当thread启动的时候,会以worker.run()为入口。
                this.thread = getThreadFactory().newThread(this);
            }
    
            /** Delegates main run loop to outer runWorker  */
            //当worker启动时,会执行run()
            public void run() {
                //ThreadPoolExecutor->runWorker() 这个是核心方法,等后面分析worker启动后逻辑时会以这里切入。
                runWorker(this);
            }
    
            // Lock methods
            //
            // The value 0 represents the unlocked state.
            // The value 1 represents the locked state.
            //判断当前worker的独占锁是否被独占。
            //0 表示未被占用
            //1 表示已占用
            protected boolean isHeldExclusively() {
                return getState() != 0;
            }
    
    
            //尝试去占用worker的独占锁
            //返回值 表示是否抢占成功
            protected boolean tryAcquire(int unused) {
                //使用CAS修改 AQS中的 state ,期望值为0(0时表示未被占用),修改成功表示当前线程抢占成功
                //那么则设置 ExclusiveOwnerThread 为当前线程。
                if (compareAndSetState(0, 1)) {
                    setExclusiveOwnerThread(Thread.currentThread());
                    return true;
                }
    
                return false;
            }
    
            //外部不会直接调用这个方法 这个方法是AQS 内调用的,外部调用unlock时 ,unlock->AQS.release() ->tryRelease()
            protected boolean tryRelease(int unused) {
                setExclusiveOwnerThread(null);
                setState(0);
                return true;
            }
    
            //加锁,加锁失败时,会阻塞当前线程,直到获取到锁位置。
            public void lock()        { acquire(1); }
    
            //尝试去加锁,如果当前锁是未被持有状态,那么加锁成功后 会返回true,否则不会阻塞当前线程,直接返回false.
            public boolean tryLock()  { return tryAcquire(1); }
    
            //一般情况下,咱们调用unlock 要保证 当前线程是持有锁的。
            //特殊情况,当worker的state == -1 时,调用unlock 表示初始化state 设置state == 0
            //启动worker之前会先调用unlock()这个方法。会强制刷新ExclusiveOwnerThread == null State==0
            public void unlock()      { release(1); }
    
            //就是返回当前worker的lock是否被占用。
            public boolean isLocked() { return isHeldExclusively(); }
    
            void interruptIfStarted() {
                Thread t;
                if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    }
                }
            }
        }
    

    工具方法

    
    
        // Packing and unpacking ctl
        //获取当前线程池运行状态
        //~000 11111111111111111111 => 111 000000000000000000000
        //c == ctl = 111 000000000000000000111
        //111 000000000000000000111
        //111 000000000000000000000
        //111 000000000000000000000
        private static int runStateOf(int c)     { return c & ~CAPACITY; }
    
        //获取当前线程池线程数量
        //c == ctl = 111 000000000000000000111
        //111 000000000000000000111
        //000 111111111111111111111
        //000 000000000000000000111 => 7
        private static int workerCountOf(int c)  { return c & CAPACITY; }
    
        //用在重置当前线程池ctl值时  会用到
        //rs 表示线程池状态   wc 表示当前线程池中worker(线程)数量
        //111 000000000000000000
        //000 000000000000000111
        //111 000000000000000111
        private static int ctlOf(int rs, int wc) { return rs | wc; }
    
        /*
         * Bit field accessors that don't require unpacking ctl.
         * These depend on the bit layout and on workerCount being never negative.
         */
        //比较当前线程池ctl所表示的状态,是否小于某个状态s
        //c = 111 000000000000000111 <  000 000000000000000000 == true
        //所有情况下,RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED
        private static boolean runStateLessThan(int c, int s) {
            return c < s;
        }
    
        //比较当前线程池ctl所表示的状态,是否大于等于某个状态s
        private static boolean runStateAtLeast(int c, int s) {
            return c >= s;
        }
    
        //小于SHUTDOWN 的一定是RUNNING。 SHUTDOWN == 0
        private static boolean isRunning(int c) {
            return c < SHUTDOWN;
        }
    
        /**
         * Attempts to CAS-increment the workerCount field of ctl.
         */
        //使用CAS方式 让ctl值+1 ,成功返回true, 失败返回false
        private boolean compareAndIncrementWorkerCount(int expect) {
            return ctl.compareAndSet(expect, expect + 1);
        }
    
        /**
         * Attempts to CAS-decrement the workerCount field of ctl.
         */
        //使用CAS方式 让ctl值-1 ,成功返回true, 失败返回false
        private boolean compareAndDecrementWorkerCount(int expect) {
            return ctl.compareAndSet(expect, expect - 1);
        }
    
        /**
         * Decrements the workerCount field of ctl. This is called only on
         * abrupt termination of a thread (see processWorkerExit). Other
         * decrements are performed within getTask.
         */
        //将ctl值减一,这个方法一定成功
        private void decrementWorkerCount() {
            //这里会一直重试,直到成功为止。
            do {} while (! compareAndDecrementWorkerCount(ctl.get()));
        }
    
    

    execute 方法

    线程池有 submit、execute 方法提交任务。 而 submit 是对 execute方法的封装 。多了一个返回值为 Future。这个可以单独讲

    先放一张图

    image.png
     /**
         * Executes the given task sometime in the future.  The task
         * may execute in a new thread or in an existing pooled thread.
         *
         * If the task cannot be submitted for execution, either because this
         * executor has been shutdown or because its capacity has been reached,
         * the task is handled by the current {@code RejectedExecutionHandler}.
         *
         * @param command the task to execute
         * @throws RejectedExecutionException at discretion of
         *         {@code RejectedExecutionHandler}, if the task
         *         cannot be accepted for execution
         * @throws NullPointerException if {@code command} is null
         */
        //command 可以是普通的Runnable 实现类,也可以是 FutureTask
        public void execute(Runnable command) {
            //非空判断..
            if (command == null)
                throw new NullPointerException();
            /*
             * Proceed in 3 steps:
             *
             * 1. If fewer than corePoolSize threads are running, try to
             * start a new thread with the given command as its first
             * task.  The call to addWorker atomically checks runState and
             * workerCount, and so prevents false alarms that would add
             * threads when it shouldn't, by returning false.
             *
             * 2. If a task can be successfully queued, then we still need
             * to double-check whether we should have added a thread
             * (because existing ones died since last checking) or that
             * the pool shut down since entry into this method. So we
             * recheck state and if necessary roll back the enqueuing if
             * stopped, or start a new thread if there are none.
             *
             * 3. If we cannot queue task, then we try to add a new
             * thread.  If it fails, we know we are shut down or saturated
             * and so reject the task.
             */
            //获取ctl最新值赋值给c,ctl :高3位 表示线程池状态,低位表示当前线程池线程数量。
            int c = ctl.get();
            //workerCountOf(c) 获取出当前线程数量
            //条件成立:表示当前线程数量小于核心线程数,此次提交任务,直接创建一个新的worker,对应线程池中多了一个新的线程。
            if (workerCountOf(c) < corePoolSize) {
                //addWorker 即为创建线程的过程,会创建worker对象,并且将command作为firstTask
                //core == true 表示采用核心线程数量限制  false表示采用 maximumPoolSize
                if (addWorker(command, true))
                    //创建成功后,直接返回。addWorker方法里面会启动新创建的worker,将firstTask执行。
                    return;
    
                //执行到这条语句,说明addWorker一定是失败了...
                //有几种可能呢??
                //1.存在并发现象,execute方法是可能有多个线程同时调用的,当workerCountOf(c) < corePoolSize成立后,
                //其它线程可能也成立了,并且向线程池中创建了worker。这个时候线程池中的核心线程数已经达到,所以...
                //2.当前线程池状态发生改变了。 RUNNING SHUTDOWN STOP TIDYING TERMINATION
                //当线程池状态是非RUNNING状态时,addWorker(firstTask!=null, true|false) 一定会失败。
                //SHUTDOWN 状态下,也有可能创建成功。前提 firstTask == null 而且当前 queue  不为空。特殊情况。
                c = ctl.get();
            }
    
    
            //执行到这里有几种情况?
            //1.当前线程数量已经达到corePoolSize
            //2.addWorker失败..
    
            //条件成立:说明当前线程池处于running状态,则尝试将 task 放入到workQueue中。
            if (isRunning(c) && workQueue.offer(command)) {
                //执行到这里,说明offer提交任务成功了..
    
                //再次获取ctl保存到recheck。
                int recheck = ctl.get();
    
                //条件一:! isRunning(recheck) 成立:说明你提交到队列之后,线程池状态被外部线程给修改 比如:shutdown() shutdownNow()
                //这种情况 需要把刚刚提交的任务删除掉。
                //条件二:remove(command) 有可能成功,也有可能失败
                //成功:提交之后,线程池中的线程还未消费(处理)
                //失败:提交之后,在shutdown() shutdownNow()之前,就被线程池中的线程 给处理。
                if (! isRunning(recheck) && remove(command))
                    //提交之后线程池状态为 非running 且 任务出队成功,走个拒绝策略。
                    reject(command);
    
                //有几种情况会到这里?
                //1.当前线程池是running状态(这个概率最大)
                //2.线程池状态是非running状态 但是remove提交的任务失败.
    
                //担心 当前线程池是running状态,但是线程池中的存活线程数量是0,这个时候,如果是0的话,会很尴尬,任务没线程去跑了,
                //这里其实是一个担保机制,保证线程池在running状态下,最起码得有一个线程在工作。
                else if (workerCountOf(recheck) == 0)
                    addWorker(null, false);
            }
    
            //执行到这里,有几种情况?
            //1.offer失败
            //2.当前线程池是非running状态
    
            //1.offer失败,需要做什么? 说明当前queue 满了!这个时候 如果当前线程数量尚未达到maximumPoolSize的话,会创建新的worker直接执行command
            //假设当前线程数量达到maximumPoolSize的话,这里也会失败,也走拒绝策略。
    
            //2.线程池状态为非running状态,这个时候因为 command != null addWorker 一定是返回false。
            else if (!addWorker(command, false))
                reject(command);
    
        }
    

    addWorker方法

    看完execute方法过程,发现内部是通过 addWordk去添加任务的。

      /**
         * Checks if a new worker can be added with respect to current
         * pool state and the given bound (either core or maximum). If so,
         * the worker count is adjusted accordingly, and, if possible, a
         * new worker is created and started, running firstTask as its
         * first task. This method returns false if the pool is stopped or
         * eligible to shut down. It also returns false if the thread
         * factory fails to create a thread when asked.  If the thread
         * creation fails, either due to the thread factory returning
         * null, or due to an exception (typically OutOfMemoryError in
         * Thread.start()), we roll back cleanly.
         *
         * @param firstTask the task the new thread should run first (or
         * null if none). Workers are created with an initial first task
         * (in method execute()) to bypass queuing when there are fewer
         * than corePoolSize threads (in which case we always start one),
         * or when the queue is full (in which case we must bypass queue).
         * Initially idle threads are usually created via
         * prestartCoreThread or to replace other dying workers.
         *
         * @param core if true use corePoolSize as bound, else
         * maximumPoolSize. (A boolean indicator is used here rather than a
         * value to ensure reads of fresh values after checking other pool
         * state).
         * @return true if successful
         */
        //firstTask 可以为null,表示启动worker之后,worker自动到queue中获取任务.. 如果不是null,则worker优先执行firstTask
        //core 采用的线程数限制 如果为true 采用 核心线程数限制  false采用 maximumPoolSize线程数限制.
    
        //返回值总结:
        //true 表示创建worker成功,且线程启动
    
        //false 表示创建失败。
        //1.线程池状态rs > SHUTDOWN (STOP/TIDYING/TERMINATION)
        //2.rs == SHUTDOWN 但是队列中已经没有任务了 或者 当前状态是SHUTDOWN且队列未空,但是firstTask不为null
        //3.当前线程池已经达到指定指标(coprePoolSize 或者 maximumPoolSIze)
        //4.threadFactory 创建的线程是null
        private boolean addWorker(Runnable firstTask, boolean core) {
            //自旋 判断当前线程池状态是否允许创建线程的事情。
            retry:
            for (;;) {
                //获取当前ctl值保存到c
                int c = ctl.get();
                //获取当前线程池运行状态 保存到rs长
                int rs = runStateOf(c);
    
                // Check if queue empty only if necessary.
    
                //条件一:rs >= SHUTDOWN 成立:说明当前线程池状态不是running状态
                //条件二:前置条件,当前的线程池状态不是running状态  ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())
                //rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()
                //表示:当前线程池状态是SHUTDOWN状态 & 提交的任务是空,addWorker这个方法可能不是execute调用的。 & 当前任务队列不是空
                //排除掉这种情况,当前线程池是SHUTDOWN状态,但是队列里面还有任务尚未处理完,这个时候是允许添加worker,但是不允许再次提交task。
                if (rs >= SHUTDOWN &&
                        ! (rs == SHUTDOWN &&
                                firstTask == null &&
                                ! workQueue.isEmpty()))
                    //什么情况下回返回false?
                    //线程池状态 rs > SHUTDOWN
                    //rs == SHUTDOWN 但是队列中已经没有任务了 或者 rs == SHUTDOWN 且 firstTask != null
                    return false;
    
                //上面这些代码,就是判断 当前线程池状态 是否允许添加线程。
    
    
                //内部自旋 获取创建线程令牌的过程。
                for (;;) {
                    //获取当前线程池中线程数量 保存到wc中
                    int wc = workerCountOf(c);
    
                    //条件一:wc >= CAPACITY 永远不成立,因为CAPACITY是一个5亿多大的数字
                    //条件二:wc >= (core ? corePoolSize : maximumPoolSize)
                    //core == true ,判断当前线程数量是否>=corePoolSize,会拿核心线程数量做限制。
                    //core == false,判断当前线程数量是否>=maximumPoolSize,会拿最大线程数量做限制。
                    if (wc >= CAPACITY ||
                            wc >= (core ? corePoolSize : maximumPoolSize))
                        //执行到这里,说明当前无法添加线程了,已经达到指定限制了
                        return false;
    
                    //条件成立:说明记录线程数量已经加1成功,相当于申请到了一块令牌。
                    //条件失败:说明可能有其它线程,修改过ctl这个值了。
                    //可能发生过什么事?
                    //1.其它线程execute() 申请过令牌了,在这之前。导致CAS失败
                    //2.外部线程可能调用过 shutdown() 或者 shutdownNow() 导致线程池状态发生变化了,咱们知道 ctl 高3位表示状态
                    //状态改变后,cas也会失败。
                    if (compareAndIncrementWorkerCount(c))
                        //进入到这里面,一定是cas成功啦!申请到令牌了
                        //直接跳出了 retry 外部这个for自旋。
                        break retry;
    
                    //CAS失败,没有成功的申请到令牌
                    //获取最新的ctl值
                    c = ctl.get();  // Re-read ctl
                    //判断当前线程池状态是否发生过变化,如果外部在这之前调用过shutdown. shutdownNow 会导致状态变化。
                    if (runStateOf(c) != rs)
                        //状态发生变化后,直接返回到外层循环,外层循环负责判断当前线程池状态,是否允许创建线程。
                        continue retry;
                    // else CAS failed due to workerCount change; retry inner loop
                }
            }
    
    
    
    
    
            //表示创建的worker是否已经启动,false未启动  true启动
            boolean workerStarted = false;
            //表示创建的worker是否添加到池子中了 默认false 未添加 true是添加。
            boolean workerAdded = false;
    
            //w表示后面创建worker的一个引用。
            Worker w = null;
            try {
                //创建Worker,执行完后,线程应该是已经创建好了。
                w = new Worker(firstTask);
    
                //将新创建的worker节点的线程 赋值给 t
                final Thread t = w.thread;
    
                //为什么要做 t != null 这个判断?
                //为了防止ThreadFactory 实现类有bug,因为ThreadFactory 是一个接口,谁都可以实现。
                //万一哪个 小哥哥 脑子一热,有bug,创建出来的线程 是null、、
                //Doug lea考虑的比较全面。肯定会防止他自己的程序报空指针,所以这里一定要做!
                if (t != null) {
                    //将全局锁的引用保存到mainLock
                    final ReentrantLock mainLock = this.mainLock;
                    //持有全局锁,可能会阻塞,直到获取成功为止,同一时刻 操纵 线程池内部相关的操作,都必须持锁。
                    mainLock.lock();
                    //从这里加锁之后,其它线程 是无法修改当前线程池状态的。
                    try {
                        // Recheck while holding lock.
                        // Back out on ThreadFactory failure or if
                        // shut down before lock acquired.
                        //获取最新线程池运行状态保存到rs中
                        int rs = runStateOf(ctl.get());
    
                        //条件一:rs < SHUTDOWN 成立:最正常状态,当前线程池为RUNNING状态.
                        //条件二:前置条件:当前线程池状态不是RUNNING状态。
                        //(rs == SHUTDOWN && firstTask == null)  当前状态为SHUTDOWN状态且firstTask为空。其实判断的就是SHUTDOWN状态下的特殊情况,
                        //只不过这里不再判断队列是否为空了
                        if (rs < SHUTDOWN ||
                                (rs == SHUTDOWN && firstTask == null)) {
                            //t.isAlive() 当线程start后,线程isAlive会返回true。
                            //防止脑子发热的程序员,ThreadFactory创建线程返回给外部之前,将线程start了。。
                            if (t.isAlive()) // precheck that t is startable
                                throw new IllegalThreadStateException();
    
                            //将咱们创建的worker添加到线程池中。
                            workers.add(w);
                            //获取最新当前线程池线程数量
                            int s = workers.size();
                            //条件成立:说明当前线程数量是一个新高。更新largestPoolSize
                            if (s > largestPoolSize)
                                largestPoolSize = s;
                            //表示线程已经追加进线程池中了。
                            workerAdded = true;
                        }
                    } finally {
                        //释放线程池全局锁。
                        mainLock.unlock();
                    }
                    //条件成立:说明 添加worker成功
                    //条件失败:说明线程池在lock之前,线程池状态发生了变化,导致添加失败。
                    if (workerAdded) {
                        //成功后,则将创建的worker启动,线程启动。
                        t.start();
                        //启动标记设置为true
                        workerStarted = true;
                    }
                }
    
            } finally {
                //条件成立:! workerStarted 说明启动失败,需要做清理工作。
                if (! workerStarted)
                    //失败时做什么清理工作?
                    //1.释放令牌
                    //2.将当前worker清理出workers集合
                    addWorkerFailed(w);
            }
    
            //返回新创建的线程是否启动。
            return workerStarted;
        }
    

    runWorker 方法 重点

    看 addWork中这个行 t.start();
    这个t指的是 Worker 示例中的thread属性
    而Work类实例化时,将自身赋值给thread。所以start的方法,就是调用的Work类的run方法。run方法中只有一个 runWorker方法

        //Worker方法的构造方法
    
       //firstTask可以为null。为null 启动后会到queue中获取。
            Worker(Runnable firstTask) {
                //设置AQS独占模式为初始化中状态,这个时候 不能被抢占锁。
                setState(-1); // inhibit interrupts until runWorker
                this.firstTask = firstTask;
                //使用线程工厂创建了一个线程,并且将当前worker 指定为 Runnable,也就是说当thread启动的时候,会以worker.run()为入口。
                this.thread = getThreadFactory().newThread(this);
            }
    
      // Worker类中的run方法。
    
         /** Delegates main run loop to outer runWorker  */
            //当worker启动时,会执行run()
            public void run() {
                //ThreadPoolExecutor->runWorker() 这个是核心方法,等后面分析worker启动后逻辑时会以这里切入。
                runWorker(this);
            }
    

    下面看看RunWorker方法。

     //w 就是启动worker
        final void runWorker(Worker w) {
            //wt == w.thread
            Thread wt = Thread.currentThread();
            //将初始执行task赋值给task
            Runnable task = w.firstTask;
            //清空当前w.firstTask引用
            w.firstTask = null;
            //这里为什么先调用unlock? 就是为了初始化worker state == 0 和 exclusiveOwnerThread ==null
            w.unlock(); // allow interrupts
    
            //是否是突然退出,true->发生异常了,当前线程是突然退出,回头需要做一些处理
            //false->正常退出。
            boolean completedAbruptly = true;
    
            try {
                //条件一:task != null 指的就是firstTask是不是null,如果不是null,直接执行循环体里面。
                //条件二:(task = getTask()) != null   条件成立:说明当前线程在queue中获取任务成功,getTask这个方法是一个会阻塞线程的方法
                //getTask如果返回null,当前线程需要执行结束逻辑。
                while (task != null || (task = getTask()) != null) {
                    //worker设置独占锁 为当前线程
                    //为什么要设置独占锁呢?shutdown时会判断当前worker状态,根据独占锁是否空闲来判断当前worker是否正在工作。
                    w.lock();
                    // If pool is stopping, ensure thread is interrupted;
                    // if not, ensure thread is not interrupted.  This
                    // requires a recheck in second case to deal with
                    // shutdownNow race while clearing interrupt
    
                    //条件一:runStateAtLeast(ctl.get(), STOP)  说明线程池目前处于STOP/TIDYING/TERMINATION 此时线程一定要给它一个中断信号
                    //条件一成立:runStateAtLeast(ctl.get(), STOP)&& !wt.isInterrupted()
                    //上面如果成立:说明当前线程池状态是>=STOP 且 当前线程是未设置中断状态的,此时需要进入到if里面,给当前线程一个中断。
    
                    //假设:runStateAtLeast(ctl.get(), STOP) == false
                    // (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP)) 在干吗呢?
                    // Thread.interrupted() 获取当前中断状态,且设置中断位为false。连续调用两次,这个interrupted()方法 第二次一定是返回false.
                    // runStateAtLeast(ctl.get(), STOP) 大概率这里还是false.
                    // 其实它在强制刷新当前线程的中断标记位 false,因为有可能上一次执行task时,业务代码里面将当前线程的中断标记位 设置为了 true,且没有处理
                    // 这里一定要强制刷新一下。不会再影响到后面的task了。
                    //假设:Thread.interrupted() == true  且 runStateAtLeast(ctl.get(), STOP)) == true
                    //这种情况有发生几率么?
                    //有可能,因为外部线程在 第一次 (runStateAtLeast(ctl.get(), STOP) == false 后,有机会调用shutdown 、shutdownNow方法,将线程池状态修改
                    //这个时候,也会将当前线程的中断标记位 再次设置回 中断状态。
                    if ((runStateAtLeast(ctl.get(), STOP) ||
                            (Thread.interrupted() &&
                                    runStateAtLeast(ctl.get(), STOP))) &&
                            !wt.isInterrupted())
                        wt.interrupt();
    
    
                    try {
                        //钩子方法,留给子类实现的
                        beforeExecute(wt, task);
                        //表示异常情况,如果thrown不为空,表示 task运行过程中 向上层抛出异常了。
                        Throwable thrown = null;
                        try {
                            //task 可能是FutureTask 也可能是 普通的Runnable接口实现类。
                            //如果前面是通过submit()提交的 runnable/callable 会被封装成 FutureTask。这个不清楚,请看上一期,在b站。
                            task.run();
                        } catch (RuntimeException x) {
                            thrown = x; throw x;
                        } catch (Error x) {
                            thrown = x; throw x;
                        } catch (Throwable x) {
                            thrown = x; throw new Error(x);
                        } finally {
                            //钩子方法,留给子类实现的
                            afterExecute(task, thrown);
                        }
                    } finally {
                        //将局部变量task置为Null
                        task = null;
                        //更新worker完成任务数量
                        w.completedTasks++;
    
                        //worker处理完一个任务后,会释放掉独占锁
                        //1.正常情况下,会再次回到getTask()那里获取任务  while(getTask...)
                        //2.task.run()时内部抛出异常了..
                        w.unlock();
                    }
                }
    
                //什么情况下,会来到这里?
                //getTask()方法返回null时,说明当前线程应该执行退出逻辑了。
                completedAbruptly = false;
            } finally {
    
                //task.run()内部抛出异常时,直接从 w.unlock() 那里 跳到这一行。
                //正常退出 completedAbruptly == false
                //异常退出 completedAbruptly == true
                processWorkerExit(w, completedAbruptly);
            }
        }
    
    
    

    getTask 方法

     //什么情况下会返回null?
        //1.rs >= STOP 成立说明:当前的状态最低也是STOP状态,一定要返回null了
        //2.前置条件 状态是 SHUTDOWN ,workQueue.isEmpty()
        //3.线程池中的线程数量 超过 最大限制时,会有一部分线程返回Null
        //4.线程池中的线程数超过corePoolSize时,会有一部分线程 超时后,返回null。
        private Runnable getTask() {
            //表示当前线程获取任务是否超时 默认false true表示已超时
            boolean timedOut = false; // Did the last poll() time out?
    
            //自旋
            for (;;) {
                //获取最新ctl值保存到c中。
                int c = ctl.get();
                //获取线程池当前运行状态
                int rs = runStateOf(c);
    
                // Check if queue empty only if necessary.
                //条件一:rs >= SHUTDOWN 条件成立:说明当前线程池是非RUNNING状态,可能是 SHUTDOWN/STOP....
                //条件二:(rs >= STOP || workQueue.isEmpty())
                //2.1:rs >= STOP 成立说明:当前的状态最低也是STOP状态,一定要返回null了
                //2.2:前置条件 状态是 SHUTDOWN ,workQueue.isEmpty()条件成立:说明当前线程池状态为SHUTDOWN状态 且 任务队列已空,此时一定返回null。
                //返回null,runWorker方法就会将返回Null的线程执行线程退出线程池的逻辑。
                if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                    //使用CAS+死循环的方式让 ctl值 -1
                    decrementWorkerCount();
                    return null;
                }
    
                //执行到这里,有几种情况?
                //1.线程池是RUNNING状态
                //2.线程池是SHUTDOWN状态 但是队列还未空,此时可以创建线程。
    
                //获取线程池中的线程数量
                int wc = workerCountOf(c);
    
                // Are workers subject to culling?
                //timed == true 表示当前这个线程 获取 task 时 是支持超时机制的,使用queue.poll(xxx,xxx); 当获取task超时的情况下,下一次自旋就可能返回null了。
                //timed == false 表示当前这个线程 获取 task 时 是不支持超时机制的,当前线程会使用 queue.take();
    
                //情况1:allowCoreThreadTimeOut == true 表示核心线程数量内的线程 也可以被回收。
                //所有线程 都是使用queue.poll(xxx,xxx) 超时机制这种方式获取task.
                //情况2:allowCoreThreadTimeOut == false 表示当前线程池会维护核心数量内的线程。
                //wc > corePoolSize
                //条件成立:当前线程池中的线程数量是大于核心线程数的,此时让所有路过这里的线程,都是用poll 支持超时的方式去获取任务,
                //这样,就会可能有一部分线程获取不到任务,获取不到任务 返回Null,然后..runWorker会执行线程退出逻辑。
                boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
    
    
                //条件一:(wc > maximumPoolSize || (timed && timedOut))
                //1.1:wc > maximumPoolSize  为什么会成立?setMaximumPoolSize()方法,可能外部线程将线程池最大线程数设置为比初始化时的要小
                //1.2: (timed && timedOut) 条件成立:前置条件,当前线程使用 poll方式获取task。上一次循环时  使用poll方式获取任务时,超时了
                //条件一 为true 表示 线程可以被回收,达到回收标准,当确实需要回收时再回收。
    
                //条件二:(wc > 1 || workQueue.isEmpty())
                //2.1: wc > 1  条件成立,说明当前线程池中还有其他线程,当前线程可以直接回收,返回null
                //2.2: workQueue.isEmpty() 前置条件 wc == 1, 条件成立:说明当前任务队列 已经空了,最后一个线程,也可以放心的退出。
                if ((wc > maximumPoolSize || (timed && timedOut))
                        && (wc > 1 || workQueue.isEmpty())) {
                    //使用CAS机制 将 ctl值 -1 ,减1成功的线程,返回null
                    //CAS成功的,返回Null
                    //CAS失败? 为什么会CAS失败?
                    //1.其它线程先你一步退出了
                    //2.线程池状态发生变化了。
                    if (compareAndDecrementWorkerCount(c))
                        return null;
                    //再次自旋时,timed有可能就是false了,因为当前线程cas失败,很有可能是因为其它线程成功退出导致的,再次咨询时
                    //检查发现,当前线程 就可能属于 不需要回收范围内了。
                    continue;
                }
    
    
    
    
                try {
                    //获取任务的逻辑
    
    
                    Runnable r = timed ?
                            workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                            workQueue.take();
    
                    //条件成立:返回任务
                    if (r != null)
                        return r;
    
                    //说明当前线程超时了...
                    timedOut = true;
                } catch (InterruptedException retry) {
                    timedOut = false;
                }
            }
        }
    
    

    shutdown 方法

    优雅的关闭

    public void shutdown() {
            final ReentrantLock mainLock = this.mainLock;
            //获取线程池全局锁
            mainLock.lock();
            try {
                checkShutdownAccess();
                //设置线程池状态为SHUTDOWN
                advanceRunState(SHUTDOWN);
                //中断空闲线程
                interruptIdleWorkers();
                //空方法,子类可以扩展
                onShutdown(); // hook for ScheduledThreadPoolExecutor
            } finally {
                //释放线程池全局锁
                mainLock.unlock();
            }
            //回头说..
            tryTerminate();
        }
    
    

    shutdownNow 方法

    立即关闭。不管有没有任务,或者正在执行的任务

    public List<Runnable> shutdownNow() {
            //返回值引用
            List<Runnable> tasks;
            final ReentrantLock mainLock = this.mainLock;
            //获取线程池全局锁
            mainLock.lock();
            try {
                checkShutdownAccess();
                //设置线程池状态为STOP
                advanceRunState(STOP);
                //中断线程池中所有线程
                interruptWorkers();
                //导出未处理的task
                tasks = drainQueue();
            } finally {
                mainLock.unlock();
            }
    
            tryTerminate();
            //返回当前任务队列中 未处理的任务。
            return tasks;
        }
    

    tryTerminate 方法

    final void tryTerminate() {
            //自旋
            for (;;) {
                //获取最新ctl值
                int c = ctl.get();
                //条件一:isRunning(c)  成立,直接返回就行,线程池很正常!
                //条件二:runStateAtLeast(c, TIDYING) 说明 已经有其它线程 在执行 TIDYING -> TERMINATED状态了,当前线程直接回去。
                //条件三:(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())
                //SHUTDOWN特殊情况,如果是这种情况,直接回去。得等队列中的任务处理完毕后,再转化状态。
                if (isRunning(c) ||
                        runStateAtLeast(c, TIDYING) ||
                        (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
                    return;
    
                //什么情况会执行到这里?
                //1.线程池状态 >= STOP
                //2.线程池状态为 SHUTDOWN 且 队列已经空了
    
                //条件成立:当前线程池中的线程数量 > 0
                if (workerCountOf(c) != 0) { // Eligible to terminate
                    //中断一个空闲线程。
                    //空闲线程,在哪空闲呢? queue.take() | queue.poll()
                    //1.唤醒后的线程 会在getTask()方法返回null
                    //2.执行退出逻辑的时候会再次调用tryTerminate() 唤醒下一个空闲线程
                    //3.因为线程池状态是 (线程池状态 >= STOP || 线程池状态为 SHUTDOWN 且 队列已经空了) 最终调用addWorker时,会失败。
                    //最终空闲线程都会在这里退出,非空闲线程 当执行完当前task时,也会调用tryTerminate方法,有可能会走到这里。
                    interruptIdleWorkers(ONLY_ONE);
                    return;
                }
    
                //执行到这里的线程是谁?
                //workerCountOf(c) == 0 时,会来到这里。
                //最后一个退出的线程。 咱们知道,在 (线程池状态 >= STOP || 线程池状态为 SHUTDOWN 且 队列已经空了)
                //线程唤醒后,都会执行退出逻辑,退出过程中 会 先将 workerCount计数 -1 => ctl -1。
                //调用tryTerminate 方法之前,已经减过了,所以0时,表示这是最后一个退出的线程了。
    
                final ReentrantLock mainLock = this.mainLock;
                //获取线程池全局锁
                mainLock.lock();
                try {
                    //设置线程池状态为TIDYING状态。
                    if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                        try {
                            //调用钩子方法
                            terminated();
                        } finally {
                            //设置线程池状态为TERMINATED状态。
                            ctl.set(ctlOf(TERMINATED, 0));
                            //唤醒调用 awaitTermination() 方法的线程。
                            termination.signalAll();
                        }
                        return;
                    }
                } finally {
                    //释放线程池全局锁。
                    mainLock.unlock();
                }
                // else retry on failed CAS
            }
        }
    

    FutureTask

    CompletableFuture

    大佬的文章 https://javadoop.com/post/completable-future

    相关文章

      网友评论

          本文标题:什么是线程池

          本文链接:https://www.haomeiwen.com/subject/oshttltx.html