美文网首页
ThreadPoolExecutor源码分析

ThreadPoolExecutor源码分析

作者: 落落的博客 | 来源:发表于2019-11-10 15:49 被阅读0次

    一、引言

    线程是程序执行的最小单元,合理的使用线程可以充分利用系统资源、提高吞吐率以及加快响应时间。然而在实际应用中,很多线程都是朝生夕死的。而创建和销毁线程又极大的耗费系统资源,因此从jdk1.5开始引入了线程池的概念,用户可以使用Executors静态工厂类来创建各种各样的满足自己的需求的线程池。一般来说最常用的线程池主要有以下三种:

    • ThreadPoolExecutor:基础多任务线程池框架。
    • ScheduledThreadPoolExecutor:继承自ThreadPoolExecutor,通过实现延时队列实现定时任务
    • ForkJoinPool(>=jdk1.7):并行任务框架,利用多核处理器并行执行任务

    二、ThreadPoolExecutor

    ThreadPoolExecutor在初始化之前可以根据用户的配置来维护一个线程池来执行用户提交的任务。其核心方法是execute(),用来提交并执行任务(实现了Runnable接口),同时通过shutDown和shutDownNow来实现对线程池的生命周期的管理。在介绍核心方法之前,我们先介绍一些ThreadPoolExecutor的重要组成变量或类

    1.ctl
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
      ctl是ThreadPoolExecutor类的核心,其包含了线程池运行状态以及线程池中的线程数,利用AtomicInteger来保证对这个变量的修改是原子性的。如下图所示为ctl结构图

    ThreadPoolExecutor1.png

    ctl的低29位用来表示线程池中线程的数量,高3位用来表示线程池的运行状态。

    线程池的运行状态:

    private static final int COUNT_BITS = Integer.SIZE - 3; //29
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;    //00011111111111111111111111111111 = 2^29 - 1
    // runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS; //11100000000000000000000000000000
    private static final int SHUTDOWN   =  0 << COUNT_BITS; //00000000000000000000000000000000
    private static final int STOP       =  1 << COUNT_BITS; //00100000000000000000000000000000
    private static final int TIDYING    =  2 << COUNT_BITS; //01000000000000000000000000000000
    private static final int TERMINATED =  3 << COUNT_BITS; //01100000000000000000000000000000
    
    private static int runStateOf(int c)     { return c & ~CAPACITY; }  //获取线程池状态
    private static int workerCountOf(int c)  { return c & CAPACITY; }   //获取线程池中线程的个数
    private static int ctlOf(int rs, int wc) { return rs | wc; }        //根据线程池状态和线程个数组合称ctrl
    
    • RUNNING:线程池正常运行,可接受新的任务或者消费队列中的任务
    • SHUTDOWN:线程池关闭(主线程调用shutdown方法),不再接受新的任务,但会继续执行队列中的任务
    • STOP:线程池终止(主线程调用shudownNow方法),既不接受新的任务也不再执行队列中的任务
    • TIDYING:所有的任务都被终止,线程数为0,所有线程都被终止。当线程池转换到TIDYING状态时会执行terminated(默认什么都不执行,由子类复写)方法
    • TERMINATED:TIDYING状态下执行完terminated方法,线程池转化为此状态

    线程池状态转换图:


    ThreadPoolExecutor2.png

    后两种状态相对来说比较复杂,我们这里着重关注前三种状态即可。

    2.构造函数

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
    

    先来看一下构造函数的入参:

    1. corePoolSize:线程池核心线程数
    2. maximumPoolSize:线程池最大线程数
    3. keepAliveTime:线程空闲时存活时长,当allowCoreThreadTimeOut=true时,核心线程也会被回收
    4. unit:keepAliveTime的单位
    5. workQueue:任务存储队列,主要有以下几种队列
      • LinkedBlockingQueue:无界的FIFO队列(maximumPoolSize失效)
      • ArrayBlockingQueue:有界FIFO队列
      • SynchronousQueue:不存储元素的阻塞队列,插入操作必须等待移除操作完成后才能执行
      • ProorityBlockingQueue:具有优先级的无界阻塞队列
    6. threadFactory:创建线程的工厂类(默认为DefaultThreadFactory)
    7. handler:线程池不能接受新线程时拒绝策略,默认有以下几种策略
      • AbortPolicy:直接抛异常(默认)
      • CallerRunsPolicy:由创建线程池的线程执行当前提交的任务
      • DiscardOldestPolicy:抛弃队列头的任务
      • DiscardPolicy:直接抛弃

    线程池的上面的几个参数决定了线程创建策略以及任务执行过程,如下图所示为一个任务提交执行的流程:


    ThreadPoolExecutor3.png
    ThreadPoolExecutor4.png

    3.Worker
      Worker是线程池中线程的基本单位,其对线程进行了一个简单的包装,本身实现了Runnable接口。同时Worker继承自AbstractQueuedSynchronizer(AQS),实现了一份简单的非重入互斥锁。

        private final class Worker
            extends AbstractQueuedSynchronizer
            implements Runnable{
              /** worker运行的线程 */
                final Thread thread;
                /** 初始化时提交的任务,可能为null*/
                Runnable firstTask;
                /** workder完成的任务数*/
                volatile long completedTasks;
                ...
                
                //加锁
                protected boolean tryAcquire(int unused) {
                    if (compareAndSetState(0, 1)) {
                        setExclusiveOwnerThread(Thread.currentThread());
                        return true;
                    }
                    return false;
                }
        
                //释放锁
                protected boolean tryRelease(int unused) {
                    setExclusiveOwnerThread(null);
                    setState(0);
                    return true;
                }
            }
    
    

    三、ThreadPoolExecutor重要方法

    1. execute(Runnable command)提交任务
      execute方法用于用户提交任务,线程池对任务有三种处理方式:新建一个线程执行任务、放入queue队列或者拒绝任务执行。由于execute方法执行的时候并未加锁,因此会在多个地方进行double check线程池的状态。

    • 尝试增加核心线程(step1)
    • 尝试将任务放入任务队列(step2)
    • 尝试增加非核心线程(step3)
        public void execute(Runnable command) {    
            if (command == null)
                throw new NullPointerException();
            /*
             * Proceed in 3 steps:
             *
             * 1. If fewer than corePoolSize threads are running, try to
             * start a new thread with the given command as its first
             * task.  The call to addWorker atomically checks runState and
             * workerCount, and so prevents false alarms that would add
             * threads when it shouldn't, by returning false.
             *
             * 2. If a task can be successfully queued, then we still need
             * to double-check whether we should have added a thread
             * (because existing ones died since last checking) or that
             * the pool shut down since entry into this method. So we
             * recheck state and if necessary roll back the enqueuing if
             * stopped, or start a new thread if there are none.
             *
             * 3. If we cannot queue task, then we try to add a new
             * thread.  If it fails, we know we are shut down or saturated
             * and so reject the task.
             */
            int c = ctl.get();
            // 线程池中线程数小于核心线程数(step1)
            if (workerCountOf(c) < corePoolSize) {
                if (addWorker(command, true))
                    return;
                c = ctl.get();  //如果增加线程失败,说明线程池状态发生变化,需要重新获取线程池状态
            }
        
            //线程池正常,将任务放进队列中(step2)
            if (isRunning(c) && workQueue.offer(command)) {
                //需要进行double check,防止在执行该方法时线程状态变化
                int recheck = ctl.get();
                //如果线程处于非运行状态则需要移除该任务,并调用拒绝策略拒绝任务
                if (! isRunning(recheck) && remove(command))
                    reject(command);
                //线程池中线程数为0说明在执行该方法时主线程执行了shutdown操作,需要重新启动一个线程执行队列中的任务(由于workQueue.offer(command)执行成功,因此队列中至少有一个任务)
                else if (workerCountOf(recheck) == 0)
                    addWorker(null, false);
            }else if (!addWorker(command, false))   //增加非核心线程(step3)
                reject(command);    
        }
    

    2.addWorker(Runnable firstTask, boolean core) 新增线程,并处理当前任务

    • firstTask:创建线程时当前需要执行的任务
    • core:创建的线程是否是核心线程(true:核心线程,false:非核心线程)

    新增Worker主要有以下三个步骤:

    • 线程池状态检查:非RUNNING和SHUTDOWN状态下线程池拒绝创建新线程并拒绝提交任务,SHUTDOWN状态下不允许提交新任务,但在线程池中线程数为0并且任务队列不为空时才允许创建一个线程来执行任务队列中剩余的任务。
    • 线程池中线程数量检查:线程池线程数量 >= CAPACITY || 核心线程数 >= corePoolSize || 总线程数 >= maximumPoolSize 不允许创建线程;
    • 创建新线程并将其加到线程池中,同时调用start()启动线程。
        private boolean addWorker(Runnable firstTask, boolean core) {
            retry:
            for (;;) {
                int c = ctl.get();
                int rs = runStateOf(c);
        
                /*******************  1. 状态检查begin   *****************************/
        
                //等同于 rs >= SHUTDOWN && (rs != SHUTDOWN || firstTask != null || workQueue.isEmpty())
                //等同于 
                // 1. rs > SHUTDOWN:不允许新增线程,不接受新任务
                // 2. rs = SHUTDOWN && firstTask != null:不允许新增线程,并拒绝新任务
                // 3. rs = SHUTDOWN && firstTask == null && workQueue.isEmpty(): 线程池关闭,任务队列中的任务为空,不再允许提交新的任务
                if (rs >= SHUTDOWN &&
                    ! (rs == SHUTDOWN &&
                       firstTask == null &&
                       ! workQueue.isEmpty()))
                    return false;
        
                /*******************  状态检查end     *****************************/
                
                /*******************  2. 线程池数量检查begin     *****************************/
                for (;;) {
                    int wc = workerCountOf(c);
                    //1. 线程数 >= CAPACITY
                    //2. 线程数 >= corePoolSize时不允许创建核心线程
                    //3. 线程数  >= maximumPoolSize 不允许创建非核心线程
                    if (wc >= CAPACITY ||
                        wc >= (core ? corePoolSize : maximumPoolSize))
                        return false;
        
                    //增加线程数成功,跳出retry循环
                    if (compareAndIncrementWorkerCount(c))
                        break retry;
                    c = ctl.get();  // Re-read ctl
                    //re-check线程池状态,不一致说明触发了线程池状态变更,需要重新验证是否需要创建新的线程
                    if (runStateOf(c) != rs)
                        continue retry;
                    // else CAS failed due to workerCount change; retry inner loop
                }
            }
             /*******************  线程池数量检查end     *****************************/
        
            /*******************  3.创建并运行线程begin     *****************************/
            boolean workerStarted = false;
            boolean workerAdded = false;
            Worker w = null;
            try {
                w = new Worker(firstTask);
                final Thread t = w.thread;
                if (t != null) {
                    final ReentrantLock mainLock = this.mainLock;
                    mainLock.lock();
                    try {
                        // Recheck while holding lock.
                        // Back out on ThreadFactory failure or if
                        // shut down before lock acquired.
                        int rs = runStateOf(ctl.get());
                        // 1. 线程池处于正常运行时,新增线程ok
                        // 2. rs = SHUTDOWN状态下,只有firstTask为null时才允许新增线程,见execute(),线程池关闭并且线程数为0
                        //但任务队列中还有未完成的任务.
                        if (rs < SHUTDOWN ||
                            (rs == SHUTDOWN && firstTask == null)) {
                            if (t.isAlive()) // precheck that t is startable
                                throw new IllegalThreadStateException();
                            workers.add(w);
                            int s = workers.size();
                            if (s > largestPoolSize)
                                largestPoolSize = s;
                            workerAdded = true;
                        }
                    } finally {
                        mainLock.unlock();
                    }
                    if (workerAdded) {
                        t.start();  //启动线程
                        workerStarted = true;
                    }
                }
            } finally {
                if (! workerStarted)
                    addWorkerFailed(w);
            }
            return workerStarted;
            /*******************  3.创建并运行线程end     *****************************/
        }
    
    Worker.run()
    

    由于Worker本身实现了Runable接口,因此主线程在addWorker中调用t.start()方法后就启动了线程。而Worker的run中又调用了ThreadPoolExecutor的runWorker方法。

    public void run() {
        runWorker(this);
    }
    

    3.ThreadPoolExecutor.runWorker(Worker worker)

    runWorker是线程池中线程运行的核心,通过一个while的loop循环来保证线程运行状态,首先会处理初始化Worker的任务,如果初始化任务为空则会从任务队列中获取任务进行执行。

    • 获取需要执行的任务,初始化任务或者从任务队列中获取的任务
    • 检查线程池状态,保证线程池能够及时中断
    • beforeExecute—>task.run()—>afterExecute
    • 退出while循环后,执行processWorkerExit方法,中断一个空闲线程
        final void runWorker(Worker w) {
            Thread wt = Thread.currentThread();
            Runnable task = w.firstTask;    //先执行初始化时的任务
            w.firstTask = null;
            w.unlock(); // allow interrupts
            boolean completedAbruptly = true;   //主要用来判断线程是正常结束还是异常结束,true为异常结束,在processWorkerExit中用来标志是否将线程池中的线程数减一
            try {
                while (task != null || (task = getTask()) != null) {
                    w.lock();
                    // If pool is stopping, ensure thread is interrupted;
                    // if not, ensure thread is not interrupted.  This
                    // requires a recheck in second case to deal with
                    // shutdownNow race while clearing interrupt
                    //判断线程池状态,中断线程
                    // 线程池状态>=STOP并且线程未被中断,需要中断线程
                    if ((runStateAtLeast(ctl.get(), STOP) ||
                         (Thread.interrupted() &&
                          runStateAtLeast(ctl.get(), STOP))) &&
                        !wt.isInterrupted())
                        wt.interrupt();
                    try {
                        //执行自定义的任务执行前操作,默认什么也不执行
                        beforeExecute(wt, task);
                        Throwable thrown = null;
                        try {
                            //执行任务
                            task.run();
                        } catch (RuntimeException x) {
                            thrown = x; throw x;
                        } catch (Error x) {
                            thrown = x; throw x;
                        } catch (Throwable x) {
                            thrown = x; throw new Error(x);
                        } finally {
                            //任务执行后执行的操作,默认什么也不执行
                            afterExecute(task, thrown);
                        }
                    } finally {
                        task = null;
                        w.completedTasks++;
                        w.unlock();
                    }
                }
                completedAbruptly = false;
            } finally {
                //线程退出后执行清理工作
                processWorkerExit(w, completedAbruptly);
            }
        }
    

    4.getTask()

    getTask主要有两个个功能,获取任务队列中的任务并判断是否需要回收线程(返回null则说明需要回收线程)。

    • 例行检查线程池状态
    • 根据timed(是否需要回收线程)和timeOut(线程空闲时间是否超过keepAlive)等来判断是否回收线程
    • 如果需要回收线程,则利用任务队列的poll机制来设定线程空闲时间
        // 返回null则退出线程
        private Runnable getTask() {
            boolean timedOut = false; // Did the last poll() time out?
            for (;;) {
                int c = ctl.get();
                int rs = runStateOf(c);
        
                // Check if queue empty only if necessary.
                // 1. rs > SHUTDOWN:线程池关闭,回收该线程
                // 2. rs = SHUTDOWN && workQueue.isEmpty():任务队列中任务执行完毕,回收该线程
                if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                    decrementWorkerCount();
                    return null;
                }
        
                int wc = workerCountOf(c);
        
                // timed表示是否需要回收该线程,如果allowCoreThreadTimeOut设置为true则无论当前线程数
                // 设置多少都需要回收,否则只有当线程池中线程数大于corePoolSize时才需要回收
                boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        
                //需要回收线程时,则将线程数减一,并返回null,在runWorker中回收线程
                if ((wc > maximumPoolSize || (timed && timedOut))
                    && (wc > 1 || workQueue.isEmpty())) {
                    if (compareAndDecrementWorkerCount(c))
                        return null;
                    continue;
                }
        
                try {
        
                    //1:如果timed为true,该线程需要回收,通过将workQueue.poll的超时时间设置为
                    // keepAliveTime来保证返回的task是否为空,从而来判断该线程是否需要回收
                    // 2:timed为false,则阻塞获取workQueue,直到线程中断或者获取到任务
                    Runnable r = timed ?
                        workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                        workQueue.take();
                    if (r != null)
                        return r;
                    timedOut = true;
                } catch (InterruptedException retry) {
                    timedOut = false;
                }
            }
        }
    

    5.processWorkerExit
      processWorkerExit主要是在线程结束后做一些处理工作

    • 如果线程异常结束则原子递减线程池中的线程数,同时移除线程池中的线程
    • 尝试中断线程池,主要是进行例行检查
    • 检查线程池状态,并根据线程池配置或者当前线程是否是正常结束来判断是否回补线程
        private void processWorkerExit(Worker w, boolean completedAbruptly) {
            if (completedAbruptly) // 如果线程时异常退出,需要将线程数减一,正常线程数递减是在getTask()中进行的
                decrementWorkerCount();
        
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                completedTaskCount += w.completedTasks;
                workers.remove(w);  //移除线程
            } finally {
                mainLock.unlock();
            }
        
            tryTerminate();     //尝试中断线程池,在执行过程中shutDown()或者shutDownNow()可能被调用
        
            int c = ctl.get();
        
            //线程处于RUNNINg和SHUTDOWN状态时,需要进一步处理线程
            if (runStateLessThan(c, STOP)) {
        
                //线程正常结束
                if (!completedAbruptly) {
                    //如果允许核心线程回收,那么min就位0,否则为核心线程数
                    int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                    if (min == 0 && ! workQueue.isEmpty())
                        min = 1;
                    //当前线程池线程数大于min则没有什么问题
                    if (workerCountOf(c) >= min)
                        return; // replacement not needed
                }
                //线程异常结束,需要回补该异常结束的线程
                //线程数为0但线程池处于SHUTDOWN状态或者非核心线程也被回收了,则需要创建一个线程执行任务队列中的任务
                addWorker(null, false);
            }
        }
    

    相关文章

      网友评论

          本文标题:ThreadPoolExecutor源码分析

          本文链接:https://www.haomeiwen.com/subject/gnrabctx.html