线程池源码分析

作者: whoami2019 | 来源:发表于2018-08-22 10:20 被阅读2次

    最近在阅读线程池源码时,看了一些博客,也没能很快的解决我的疑问,所以以这篇文章来总结一下学习的成果,内容围绕讲清楚线程池的内部运行逻辑。

    成员变量

    // AtomicInteger对象, 提供原子操作进行Integer的使用, 适用于高并发场景,, 高3位用于保存运行状态, 低29位用于保存线程数量
        private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
        // 线程的位数,27位, 表示ctl的后27位用来表示线程数量
        private static final int COUNT_BITS = Integer.SIZE - 3;
        // 系统默认的线程容量就是(2^29)-1 , 大约5亿条线程
        private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
    
        // 运行状态保存在ctl的高3位 (所有数值左移29位)
        private static final int RUNNING    = -1 << COUNT_BITS;
        private static final int SHUTDOWN   =  0 << COUNT_BITS;
        private static final int STOP       =  1 << COUNT_BITS;
        private static final int TIDYING    =  2 << COUNT_BITS;
        private static final int TERMINATED =  3 << COUNT_BITS;
        
        // 一个阻塞队列,用来存储等待执行的任务
        private final BlockingQueue<Runnable> workQueue;
        // 任务集合
        private final HashSet<Worker> workers = new HashSet<Worker>();
        // 线程池的最大数量
        private int largestPoolSize;
        // 记录已完成的任务数量
        private long completedTaskCount;
        // 线程工厂,主要用来创建线程
        private volatile ThreadFactory threadFactory;
        // 表示当拒绝处理任务时的策略
        private volatile RejectedExecutionHandler handler;
        /**
         * 表示线程没有任务执行时最多保持多久时间会终止。默认情况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用,直到线程池中的线程数不大于corePoolSize:
         * 即当线程池中的线程数大于corePoolSize时,如果一个线程空闲的时间达到keepAliveTime,则会终止,直到线程池中的线程数不超过corePoolSize;但是如果调用了
         * allowCoreThreadTimeOut(boolean)方法,在线程池中的线程数不大于corePoolSize时,keepAliveTime参数也会起作用,直到线程池中的线程数为0
         */
        private volatile long keepAliveTime;
        // 在线程池中的线程数不大于corePoolSize时,keepAliveTime参数也会起作用,直到线程池中的线程数为0;
        private volatile boolean allowCoreThreadTimeOut;
        /**
         * 核心池的大小,这个参数与后面讲述的线程池的实现原理有非常大的关系。在创建了线程池后,默认情况下,线程池中并没有任何线程,而是等待有任务到来才创建线程去执行任务,除非调用了
         * prestartAllCoreThreads()或者prestartCoreThread()方法,从这2个方法的名字就可以看出,是预创建线程的意思,即在没有任务到来之前就创建corePoolSize个线程或者一个线程
        */
        private volatile int corePoolSize;
        // 线程池最大线程数,它表示在线程池中最多能创建多少个线程
        private volatile int maximumPoolSize;
    

    [线程池状态]:

    RUNNING: 创建线程池后,初始时状态, 接收新任务,并执行队列中的任务;

    SHUTDOWN: shutdown()方法,不接收新任务,但是执行队列中的任务;

    STOP: shutdownNow()方法, 不接收新任务,不执行队列中的任务,中断正在执行中的任务;

    TIDYING: 当线程池处于SHUTDOWN或STOP状态,并且所有的任务都已结束,线程数量为0,处于该状态的线程池即将调用terminated()方法;

    TERMINATED: terminated()方法执行完成;

    线程数量控制策略

    ThreadPoolExecutor是线程池的实现类,无论是自定义线程池,还是使用系统提供的线程池,都会使用到这个类.通过类的execute(Runnable command)方法来执行Runnable任务。

    /**
     * 将该Runnable任务加入线程池并在未来某个时刻执行
     * 该任务可能执行在一个新的线程 或 一个已存在的线程池中的线程
     * 如果该任务提交失败,可能是因为线程池已关闭,或者已达到线程池队列和线程数已满.
     * 该Runnable将交给RejectedExecutionHandler处理,抛出RejectedExecutionException
     */
    public void execute(Runnable command) {
        if (command == null){
            //如果没传入Runnable任务,则抛出空指针异常
            throw new NullPointerException();
        }
        
        int c = ctl.get();
        //当前线程数 小于 核心线程数
        if (workerCountOf(c) < corePoolSize) {
            //直接开启新的线程,并将Runnable传入作为第一个要执行的任务,成功返回true,否则返回false
            if (addWorker(command, true)){
                return;
            }
            c = ctl.get();
        }
    
        //c < SHUTDOWN代表线程池处于RUNNING状态 + 将Runnable添加到任务队列,如果添加成功返回true失败返回false
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            //成功加入队列后,再次检查是否需要添加新线程(因为已存在的线程可能在上次检查后销毁了,或者线程池在进入本方法后关闭了)
            if (! isRunning(recheck) && remove(command)){
                //如果线程池处于非RUNNING状态 并且 将该Runnable从任务队列中移除成功,则拒绝执行此任务
                //交给RejectedExecutionHandler调用rejectedExecution方法,拒绝执行此任务
                reject(command);
            }else if (workerCountOf(recheck) == 0){
                //如果线程池线程数量为0,则创建一条新线程,去执行
                addWorker(null, false);
            }   
        }else if (!addWorker(command, false))
            //如果线程池处于非RUNNING状态 或 将Runnable添加到队列失败(队列已满导致),则执行默认的拒绝策略
            reject(command);
    }
    

    整理流程如下:

    1. 如果线程池中的线程数量少于corePoolSize(核心线程数量),那么会直接
      开启一个新的核心线程来执行任务,即使此时有空闲线程存在.
    2. 如果线程池中线程数量大于等于corePoolSize(核心线程数量),那么任务会被插入到任务队列中排队,等待被执行.此时并不添加新的线程.
    3. 如果在步骤2中由于任务队列已满导致无法将新任务进行排队,这个时候有两种情况:
    • 线程数量 [未] 达到maximumPoolSize(线程池最大线程数) , 立刻启动一个非核心线程来执行任务.
    • 线程数量 [已] 达到maximumPoolSize(线程池最大线程数) , 拒绝执行此任务.ThreadPoolExecutor会通过RejectedExecutionHandler,抛出RejectExecutionException异常.

    以上就是一旦将一个Runnable任务execute()以后,执行的一系列逻辑,理解起来并不难,下面再对其中调用的一些方法做一些追查,就更方便理解其中的运行逻辑.

    新线程的创建

    /**
     * 往线程池中添加Worker对象
     * @param  firstTask 线程中第一个要执行的任务 
     * @param  core      是否为核心线程
     * @return           添加是否成功
     */
     private boolean addWorker(Runnable firstTask, boolean core) {
        //这里有两层[死循环],外循环:不停的判断线程池的状态
        retry: for (;;) {
                int c = ctl.get();
                int rs = runStateOf(c);
    
                //一系列判断条件:线程池关闭,Runnable为空,队列为空,则直接return false,代表Runnable添加失败
                if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty())){
                    return false;
                }
    
                //内循环:不停的检查线程容量        
                for (;;) {
                    int wc = workerCountOf(c);
                    //超过线程数限制,则return false
                    if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)){
                        return false;
                    }
                    //★ 添加线程成功,则直接跳出两层循环,继续往下执行.
                    //注意:这里只是把线程数成功添加到了AtomicInteger记录的线程池数量中,真正的Runnable添加,在下面的代码中进行
                    if (compareAndIncrementWorkerCount(c)){
                        break retry;
                    }
                    //再次判断线程池最新状态,如果状态改变了(内循环和外循环记录的状态不符),则重新开始外层死循环
                    c = ctl.get();  // Re-read ctl
                    if (runStateOf(c) != rs){
                        continue retry;
                    }
                }
            }
        
        //结束循环之后,开始真正的创建线程.
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            //创建一个Worker对象,并将Runnable当做参数传入
            w = new Worker(firstTask);
            //从worker对象中取出线程
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                //拿到锁
                mainLock.lock();
                try {
                    //再次检查线程池最新状态
                    int rs = runStateOf(ctl.get());
                    if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {
                        //检查准备执行Runnable的Thread的状态,如果该Thread已处于启动状态,则抛出状态异常(因为目前还没启动呢)
                        if (t.isAlive()){
                            throw new IllegalThreadStateException();
                        } 
                        //将新创建的worker,添加到worker集合
                        workers.add(w);
                        ...
                        workerAdded = true;
                    }
                } finally {
                    //释放锁
                    mainLock.unlock();
                }
                if (workerAdded) {
                    //★Thread开始启动
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            //添加worker失败
            if (! workerStarted){
                addWorkerFailed(w);
            }
        }
        return workerStarted;
    }
    

    总结:

    1. 先判断线程池状态和线程池中线程的容量,如果满足线程添加的条件,则先把AtomicInteger中记录的线程数量+1.然后再进行线程添加的工作.
    2. 创建worker对象,并将Runnable作为参数传递进去,并从worker中取出Thread对象,进行一系列条件判断后.开启Thread的start()方法,线程开始运行.所以worker对象中必然包含了一个Thread和一个要被执行的Runnable.

    那么接下来继续追源码,印证下第二点的推断,看看Worker到底干了什么.

    Worker类

    //ThreadPoolExecutor的内部finial类
    private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
    
        //当前worker要执行任务所用的线程(如果创建失败,则可能是null)
        final Thread thread;
        //第一个要执行的任务(可能是null)
        Runnable firstTask;
        //当前线程执行完的任务总数
        volatile long completedTasks;
    
        //通过构造传入Runnable任务
        Worker(Runnable firstTask) {
            ...
            this.firstTask = firstTask;
            //通过ThreadFactory()创建新线程
            this.thread = getThreadFactory().newThread(this);
        }
    
        //调用外部类runWorker()方法
        public void run() {
            runWorker(this);
        }
        ...
    }
    

    worker类中的内部实现也印证了我们的推断:

    1. 每个worker,都是一条线程,同时里面包含了一个firstTask,即初始化时要被首先执行的任务.
    2. 最终执行任务的,是runWorker()方法

    线程的复用

    继续追runWorker()方法的源码

    //ThreadPoolExecutor的final类,该方法由内部类Worker的run()方法调用
    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        //取出Worker对象中的Runnable任务
        Runnable task = w.firstTask;
        boolean completedAbruptly = true;
        ...
        try {
            //★注意这个while循环,在这里实现了 [线程复用]
            while (task != null || (task = getTask()) != null) {
                //上锁
                w.lock();
                //检查Thread状态的代码
                ...
                try {
                    ...
                    try {
                        //执行Worker中的Runnable任务
                        task.run();
                    } catch (...) {
                       ...catch各种异常
                    } 
                } finally {
                    //置空任务(这样下次循环开始时,task依然为null,需要再通过getTask()取) + 记录该Worker完成任务数量 + 解锁
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            //该线程已经从队列中取不到任务了,改变标记
             completedAbruptly = false;
        } finally {
            //线程移除
            processWorkerExit(w, completedAbruptly);
        }
    }
    

    通过上面的源码,发现通过一个while循环,不断的getTask()取任务出来执行,以这种方式实现了线程的复用.

    线程复用逻辑整理如下:

    如果task不为空,则开始执行task
    如果task为空,则通过getTask()再去取任务,并赋值给task,如果取到的Runnable不为空,则执行该任务
    执行完毕后,通过while循环继续getTask()取任务
    如果getTask()取到的任务依然是空,那么整个runWorker()方法执行完毕
    上面只是从getTask()方法名和其返回值来猜测此方法的作用,下面就继续追源码,来证实和研究getTask()到底是怎么取任务的,从哪取,怎么取.

    从任务阻塞队列获取任务

    private Runnable getTask() {
        ...
        for (;;) {
            ...
            // 如果线程池已关闭 或 任务队列为空,则AtomicInteger中记录的线程数量-1,并return null,结束本方法
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }
            //获取当前线程池中的总线程数
            int wc = workerCountOf(c);
            //allowCoreThreadTimeOut参数是使用者自行设置的(默认false),用来设置:是否允许核心线程有超时策略
            //条件1:核心线程超时 条件2:当前线程数 > 核心线程数,满足任何一个条件则timed标记为true 
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
    
            //超过最大线程数 或 超时 或 任务队列为空...  线程数量-1 + return null
            ...
            try {
                //根据timed标记,使用不同的方式(限时等待 or 阻塞)从BlockingQueue<Runnable> workQueue 队列中取任务
                Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
                if (r != null){
                    //如果取到了,就将Runnable返回
                    return r;
                }
                //如果没取到,则重新for循环
                ...
            }
        }
    }
    

    将以上源码中的信息整理如下:

    1. 线程池使用BlockingQueue来管理整个线程池中的Runnable任务,变量workQueue存放的都是待执行的任务
    2. BlockingQueue是个阻塞队列,BlockingQueue.take()方法如果得到的是空,则进入等待状态,直到BlockingQueue有新的对象被加入时,才可以正常将Runnable取出并返回,线程开始正常运转,正常执行Runnable任务。
    3. 若配置超时或者线程数大于corePoolSize,并且任务队列为空,则线程进入限时阻塞,当超过限时时间还没有任务进来,则线程的生命周期结束,若有任务队列不为空,则取出任务处理
    4. 通过源码可知,线程阻塞机制是通过await(), signal()方法实现的

    总结:

    1. 一旦一个线程开启之后,会一直执行下去,直至任务队列中的任务执行完毕, 并且达到超时时间或者是线程数大于核心线程数时,这达成了线程的复用
    2. 以Runnable队列为目标的worker虽然是串行操作,但是由于可以通过addWorker()添加多个worker,并且多个worker取的是同一个BlockingQueue中的Runnable,所以就实现了并行处理, 还有重要的是,当任务阻塞对列为空时,线程可以阻塞等待新任务。所以说同一时间可以有多个线程去处理任务,这就构成了一个线程池。

    线程的移除

    在runWorker()方法中有如下代码:

    final void runWorker(Worker w) {
        boolean completedAbruptly = true;
        ...
        try {
            while (getTask()...) {
                ...
                处理任务
            }
            //该线程已经从队列中取不到任务了,改变标记,该标记表示:该线程是否因用户因素导致的异常而终止
             completedAbruptly = false;
        } finally {
            //线程移除
            processWorkerExit(w, completedAbruptly);
        }
    }
    

    processWorkerExit这里用来将worker从worker集合中移除,步骤如下:

    1. 先移除传入的Worker(线程)
    2. 判断线程池里的最少线程数,如果最少线程数为0条,但是队列里依然有任 务未执行完毕.那么必须确保线程池中至少有1条线程.(将最小线程数置为1)
    3. 如果当前线程数 > 最小线程数,本方法结束,不再往下执行
    4. 否则添加一条新线程,来替代当前线程,继续去执行队列中的任务.
    /**
     * @param w the worker 线程
     * @param completedAbruptly 该线程是否因用户因素导致的异常而终止
     */
    private void processWorkerExit(Worker w, boolean completedAbruptly) {
        ...
        try {
            //记录该线程完成任务的总数
            completedTaskCount += w.completedTasks;
            //从worker集合中移除本worker(线程)
            workers.remove(w);
        }
        ...
        //如果在runWoker()中正常执行任务完毕,这里completedAbruptly传入的就是false
        if (!completedAbruptly) {
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            //如果线程池里最少线程数为0,但是此时任务队列里依然还有任务
            if (min == 0 && ! workQueue.isEmpty()){
                //那么必须保留一条线程,所以将最小值设置为1
                min = 1;
            }
            //如果当前线程数>= 最小线程数,则直接return
            if (workerCountOf(c) >= min){
                return; 
            }
        }
        //否则添加一条新线程,来替代当前线程,继续去执行队列中的任务.
        addWorker(null, false);
    }
    

    相关文章

      网友评论

        本文标题:线程池源码分析

        本文链接:https://www.haomeiwen.com/subject/gfszbftx.html