美文网首页
线程池核心类ThreadPoolExecutor源码解析

线程池核心类ThreadPoolExecutor源码解析

作者: hcy0411 | 来源:发表于2018-08-02 16:54 被阅读0次

    成员变量

    BlockingQueue 任务阻塞队列
    corePoolSize 核心线程数
    maximumPoolSize 最大线程数
    allowCoreThreadTimeOut 是否允许核心线程超时退出
    keepAliveTime 非核心线程超时时间
    ThreadFactory 线程池工厂,线程从该工厂中new
    RejectedExecutionHandler 拒绝策略
    AbortPolicy 直接抛出异常
    DiscardPolicy 直接抛弃
    DiscardOldestPolicy 抛弃最老的任务
    CallerRunsPolicy 调用线程池线程去执行。
    自定义 实现RejectedExecutionHandler,并自己定义策略模式

    其中很重要的一个变量ctl
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    是原子整型变量,主要存线程池状态和工作线程数
    高三位存储线程池状态,低29位存储工作线程数
    private static final int RUNNING = -1 << COUNT_BITS; 运行态
    private static final int SHUTDOWN = 0 << COUNT_BITS; 关闭
    private static final int STOP = 1 << COUNT_BITS; 已停止
    private static final int TIDYING = 2 << COUNT_BITS; 整理
    private static final int TERMINATED = 3 << COUNT_BITS; 结束

    线程池状态转换

    未命名文件.png

    初始是running状态,能够接受新提交的任务,同时也能够处理阻塞队列中的任务
    调用shutdown方法,转换成shutdown状态,不接收新任务,只能够继续处理阻塞队列中的任务
    调用shutdownnow方法,转换成stop状态,不接收新任务,同时也不能够继续处理新任务,中断线程。
    调用shutdown 或者shutdownnow方法,线程池从running状态转换成shutdown状态或者stop状态后,会调用tryTerminate方法,该方法中,等所有线程执行完之后,线程池状态会转换成TIDYING 状态,随后会调用terminated方法,完成后
    最后finally方法中线程池状态转换成TERMINATED 状态。
    附上 tryTerminate 代码

    final void tryTerminate() {
        for (;;) {
            int c = ctl.get();
            if (isRunning(c) ||
                runStateAtLeast(c, TIDYING) ||
                (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
                return;
            if (workerCountOf(c) != 0) { // Eligible to terminate
                interruptIdleWorkers(ONLY_ONE);
                return;
            }
    
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
    // 等到线程池线程个数为0时转换成TIDYING状态
                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                    try {
                        terminated();
                    } finally {
    //最后转换成TERMINATED状态
                        ctl.set(ctlOf(TERMINATED, 0));
                        termination.signalAll();
                    }
                    return;
                }
            } finally {
                mainLock.unlock();
            }
            // else retry on failed CAS
        }
    }
    

    方法解析

    execute

    线程池入口在execute函数 主要流程如下:
    1、新进来一个runnable 检查工作线程是否已经超过核心线程,如果没有,启动核心线程去执行addwokder方法
    2、如果已经超过核心线程个数,runnable放到阻塞队列中去
    3、如果阻塞队里已满,会启动非核心线程执行addwoker
    4、如果启动非核心线程去执行失败,执行拒绝策略

    核心代码如下

    int c = ctl.get();
    // 步骤1
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // 步骤二
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 步骤三
    else if (!addWorker(command, false))
    // 步骤四
        reject(command);
    

    addWorker

    该函数主要功能是封装runnable为Woker,然后去执行。
    先了解下woker,是ThreadPoolExecutor的内部类
    private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable
    继承了aqs和Runnable
    主要成员变量
    /** Thread this worker is running in. Null if factory fails. /
    final Thread thread; 工作线程
    /
    * Initial task to run. Possibly null. */
    Runnable firstTask; 封装任务

    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            //这里主要是一些线程池状态校验,和线程数校验
              int c = ctl.get();
              int rs = runStateOf(c);
    
                // Check if queue empty only if necessary.
                if (rs >= SHUTDOWN &&
                    ! (rs == SHUTDOWN &&
                       firstTask == null &&
                       ! workQueue.isEmpty()))
                    return false;
    
                for (;;) {
                    // 获取工作线程个数,校验是否超过核心线程或者最大线程
                    int wc = workerCountOf(c); 
                    if (wc >= CAPACITY ||
                        wc >= (core ? corePoolSize : maximumPoolSize))
                        return false;
                    if (compareAndIncrementWorkerCount(c))
                        break retry;
                    c = ctl.get();  // Re-read ctl
                    if (runStateOf(c) != rs)
                        continue retry;
                }
            }
    
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            //封装woker
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());
                    // 这里查看线程池状态,如果是running状态可以添加worker,或者是SHUTDOWN 状态,不是新增加的任务(shuntdown状态不接收新任务,只接受阻塞队列任务)
                     //这种情况会在processWorkerExit方法中可见,具体看该方法解析
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    // 启动woker线程
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                // 如果失败从workers中移除
                addWorkerFailed(w);
        }
        return workerStarted;
    }
    

    runWorker

    woker继承runnable接口,run方法的实现是该函数,即线程执行执行该方法

    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        // 获取第一个任务
        Runnable task = w.firstTask;
        w.firstTask = null;
        // 允许中断
        w.unlock(); // allow interrupts
        // 是否因为异常退出循环
        boolean completedAbruptly = true;
        try {
            // 如果task为空,则通过getTask从阻塞队列中来获取任务
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                // 如果是STOP状态,中断线程,不在继续执行。
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    //空函数,可被重写
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        //任务执行
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        //空函数,可被重写
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    

    总结一下runWorker方法的执行过程:
    while循环不断地通过getTask()方法获取任务;
    getTask()方法从阻塞队列中取任务;
    如果线程池正在停止,那么要保证当前线程是中断状态,否则要保证当前线程不是中断状态;
    调用task.run()执行任务;
    如果task为null则跳出循环,执行processWorkerExit()方法;
    runWorker方法执行完毕,也代表着Worker中的run方法执行完毕,销毁线程。

    getTask

    该方法主要是从阻塞队列中来获取任务

    private Runnable getTask() {
        // timeOut变量的值表示上次从阻塞队列中取任务时是否超时
        boolean timedOut = false; // Did the last poll() time out?
    
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
    
            // Check if queue empty only if necessary.
            /*
             * 如果线程池状态rs >= SHUTDOWN,也就是非RUNNING状态,再进行以下判断:
             * 1. rs >= STOP,线程池是否正在stop;
             * 2. 阻塞队列是否为空。
             * 如果以上条件满足,则将workerCount减1并返回null。
             * 因为如果当前线程池状态的值是SHUTDOWN或以上时,不允许再向阻塞队列中添加任务。
             */
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }
    
            int wc = workerCountOf(c);
    
            // timed变量用于判断是否需要进行超时控制。
            // allowCoreThreadTimeOut默认是false,也就是核心线程不允许进行超时;
            // wc > corePoolSize,表示当前线程池中的线程数量大于核心线程数量;
            // 对于超过核心线程数量的这些线程,需要进行超时控制 ,即非核心线程有超时限制
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
    
            /*
             * wc > maximumPoolSize的情况是因为可能在此方法执行阶段同时执行了setMaximumPoolSize方法;
             * timed && timedOut 如果为true,表示当前操作需要进行超时控制,并且上次从阻塞队列中获取任务发生了超时
             * 接下来判断,如果有效线程数量大于1,或者阻塞队列是空的,那么尝试将workerCount减1;
             * 如果减1失败,则返回重试。
             * 如果wc == 1时,也就说明当前线程是线程池中唯一的一个线程了。
             */
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }
    
            try {
                /*
                 * 根据timed来判断,如果为true,则通过阻塞队列的poll方法进行超时控制,如果在keepAliveTime时间内没有获取到任务,则返回null,针对非核心线程。
                 * 否则通过take方法,如果这时队列为空,则take方法会阻塞直到队列不为空。
                 * 
                 */
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                // 如果 r == null,说明已经超时,timedOut设置为true
                timedOut = true;
            } catch (InterruptedException retry) {
                // 如果获取任务时当前线程发生了中断,则设置timedOut为false并返回循环重试
                timedOut = false;
            }
        }
    }
    

    如果返回null runwoker方法会跳出循环 执行processWorkerExit方法

    processWorkerExit

    该方法是线程退出,修改线程池监控信息,包括增加完成任务数,减少工作线程数

    private void processWorkerExit(Worker w, boolean completedAbruptly) {
        // completedAbruptly线程执行时是否发生异常。
        // 如果completedAbruptly值为true,则说明线程执行时出现了异常,需要将workerCount减1;
        // 如果线程执行时没有出现异常,说明在getTask()方法中已经已经对workerCount进行了减1操作,这里就不必再减了。  
        if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
            decrementWorkerCount();
    
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            //统计完成的任务数
            completedTaskCount += w.completedTasks;
            // 从workers中移除,也就表示着从线程池中移除了一个工作线程
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }
    
        // 根据线程池状态进行判断是否结束线程池
        tryTerminate();
    
        int c = ctl.get();
        /*
         * 当线程池是RUNNING或SHUTDOWN状态时,如果worker是异常结束,那么会直接addWorker;
         * 如果allowCoreThreadTimeOut=true,并且等待队列有任务,至少保留一个worker;
         * 如果allowCoreThreadTimeOut=false,workerCount不少于corePoolSize。
         */
        if (runStateLessThan(c, STOP)) {
            if (!completedAbruptly) {
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            addWorker(null, false);
        }
    }
    

    tryTerminate

    该函数是尝试去终止线程池

    final void tryTerminate() {
            for (;;) {
                int c = ctl.get();
                /**
                * 如果是运行状态,退出返回
                * 如果是TIDYING状态,说明已经正在终止,不必继续尝试终止,返回
                * 如果是SHUTDOWN状态,但是阻塞队列不为空,说明还有些任务没执行完,返回。
                **/
                if (isRunning(c) ||
                    runStateAtLeast(c, TIDYING) ||
                    (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
                    return;
                if (workerCountOf(c) != 0) { // Eligible to terminate
                    interruptIdleWorkers(ONLY_ONE);
                    return;
                }
    
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // 加锁设置状态TIDYING,
                    if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                        try {
                            //结束动作,可重写
                            terminated();
                        } finally {
                            //设置最终状态TERMINATED
                            ctl.set(ctlOf(TERMINATED, 0));
                            //唤醒termination Condition等待队列,相应的等待操作在awaitTermination函数
                            termination.signalAll();
                        }
                        return;
                    }
                } finally {
                    mainLock.unlock();
                }
                // else retry on failed CAS
            }
        }
    

    shutdown

    该方法主要就是把线程池状态设置成SHUTDOWN,然后尝试终止线程池

        public void shutdown() {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                checkShutdownAccess();
                advanceRunState(SHUTDOWN);
                interruptIdleWorkers();
                onShutdown(); // hook for ScheduledThreadPoolExecutor
            } finally {
                mainLock.unlock();
            }
            tryTerminate();
        }
    

    shutdownNow

    该方法主要就是把线程池状态设置成STOP,然后终止运行中线程,返回阻塞队列剩余任务

        public List<Runnable> shutdownNow() {
            List<Runnable> tasks;
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                checkShutdownAccess();
                advanceRunState(STOP);
                interruptWorkers();
                tasks = drainQueue();
            } finally {
                mainLock.unlock();
            }
            tryTerminate();
            return tasks;
        }
    

    finalize

    ThreadPoolExecutor重写了finalize方法,即在垃圾回收的时候会执行,调用shutdown方法,保证线程池垃圾回收时,线程池没有线程了

        protected void finalize() {
            SecurityManager sm = System.getSecurityManager();
            if (sm == null || acc == null) {
                shutdown();
            } else {
                PrivilegedAction<Void> pa = () -> { shutdown(); return null; };
                AccessController.doPrivileged(pa, acc);
            }
        }
    

    相关文章

      网友评论

          本文标题:线程池核心类ThreadPoolExecutor源码解析

          本文链接:https://www.haomeiwen.com/subject/ufjjvftx.html