Java Concurrency: ThreadPoolExecutor Source Code Analysis

Author: Walkerc | Published 2018-10-15 15:12

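    This article summarizes what I learned from reading the thread pool source code, based on JDK 1.8.

    What is a thread pool

    As the name suggests, a thread pool is an object that provides a set of reusable threads. Internally it holds a blocking queue that stores tasks waiting to run. The pool's threads execute those tasks: each thread keeps fetching tasks from the blocking queue rather than being destroyed after finishing a single task.

    What a thread pool is for

    Under high concurrency, creating a thread for every task means a large number of threads are created and destroyed, which is expensive for the system and hurts the application's throughput.

    A thread pool also caps the number of threads running at the same time, which keeps the application from exhausting the CPU and stalling.

    Using a thread pool

    Defining a thread pool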

    ExecutorService executor = new ThreadPoolExecutor(1, 4, 20,
                    TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));
    

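    This pool has a core size of 1, a maximum size of 4, and an idle keep-alive time of 20 s for non-core threads. Its blocking queue is an ArrayBlockingQueue with capacity 10. The thread factory and the rejection policy are not specified, so the defaults are used.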
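
To make the defaults concrete, here is a minimal sketch that builds the same pool through the seven-argument constructor. As far as I know, in JDK 8 the shorter constructors fall back to `Executors.defaultThreadFactory()` and `ThreadPoolExecutor.AbortPolicy`. The class name `DefaultArgsDemo` and the method `explicitPool` are made up for this illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class DefaultArgsDemo {
    // Hypothetical helper: the same pool as the five-argument call,
    // with the two defaulted arguments spelled out.
    static ThreadPoolExecutor explicitPool() {
        return new ThreadPoolExecutor(
                1, 4, 20, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(10),
                Executors.defaultThreadFactory(),       // default thread factory
                new ThreadPoolExecutor.AbortPolicy());  // default rejection policy
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = explicitPool();
        System.out.println("core=" + pool.getCorePoolSize()
                + " max=" + pool.getMaximumPoolSize());
        System.out.println(pool.getRejectedExecutionHandler().getClass().getSimpleName());
        pool.shutdown();
    }
}
```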

    Adding tasks to the pool
    The Executor interface provides the execute method. Pass it a Runnable implementation (the task) and the pool schedules it for execution.

    for (int i = 0; i < 12; i++) {
        executor.execute(() -> 
            System.out.println(Thread.currentThread().getName()));
    }
    

    ThreadPoolExecutor source code analysis

    Defining the thread pool
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
            if (corePoolSize < 0 ||
                maximumPoolSize <= 0 ||
                maximumPoolSize < corePoolSize ||
                keepAliveTime < 0)
                throw new IllegalArgumentException();
            if (workQueue == null || threadFactory == null || handler == null)
                throw new NullPointerException();
            this.acc = System.getSecurityManager() == null ?
                    null :
                    AccessController.getContext();
            this.corePoolSize = corePoolSize;
            this.maximumPoolSize = maximumPoolSize;
            this.workQueue = workQueue;
            this.keepAliveTime = unit.toNanos(keepAliveTime);
            this.threadFactory = threadFactory;
            this.handler = handler;
        }
    

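    This is the constructor that every other constructor eventually delegates to. It takes 7 parameters:

    • core pool size
    • maximum pool size (core threads + non-core threads)
    • idle keep-alive time for non-core threads
    • time unit of the keep-alive time
    • blocking queue
    • thread factory
    • rejection policy used when the pool is saturated

    The first 5 parameters are required. If the last two are omitted, the defaults are used.
    Note the third parameter: by default it applies only to non-core threads. To apply it to core threads as well, call the following method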

    allowCoreThreadTimeOut(true);
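
The effect of this switch can be observed from outside the pool. The sketch below is an illustration under my own assumptions (a 50 ms keep-alive, a polling loop with a 5 s safety deadline, and the made-up names `CoreTimeoutDemo` and `poolSizeAfterIdle`): it runs one task and then reports how many threads the pool still holds once it has been idle.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class CoreTimeoutDemo {
    // Hypothetical helper: runs one task, lets the pool sit idle,
    // and returns the pool size observed afterwards.
    static int poolSizeAfterIdle(boolean allowCoreTimeout) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 50, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        // Passing true requires keepAliveTime > 0, otherwise IllegalArgumentException
        pool.allowCoreThreadTimeOut(allowCoreTimeout);
        pool.execute(() -> { });
        try {
            if (allowCoreTimeout) {
                // Wait until the idle core thread has timed out (bounded wait)
                long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
                while (pool.getPoolSize() > 0 && System.nanoTime() < deadline) {
                    Thread.sleep(20);
                }
            } else {
                Thread.sleep(300); // several keep-alive periods
            }
            return pool.getPoolSize();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("with core timeout:    " + poolSizeAfterIdle(true));
        System.out.println("without core timeout: " + poolSizeAfterIdle(false));
    }
}
```

With the timeout enabled the idle core thread should disappear, while without it the core thread stays parked in the queue's take().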
    
    Thread pool states

    Next, look at the pool's internal states and how the worker count is packed together with them

        /**
         * The main pool control state, ctl, is an atomic integer packing
         * two conceptual fields
         *   workerCount, indicating the effective number of threads
         *   runState,    indicating whether running, shutting down etc
         *
         * In order to pack them into one int, we limit workerCount to
         * (2^29)-1 (about 500 million) threads rather than (2^31)-1 (2
         * billion) otherwise representable. 
         */
        private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
        private static final int COUNT_BITS = Integer.SIZE - 3;
        private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
    
        // runState is stored in the high-order bits
        private static final int RUNNING    = -1 << COUNT_BITS;
        private static final int SHUTDOWN   =  0 << COUNT_BITS;
        private static final int STOP       =  1 << COUNT_BITS;
        private static final int TIDYING    =  2 << COUNT_BITS;
        private static final int TERMINATED =  3 << COUNT_BITS;
    
        // Packing and unpacking ctl
        private static int runStateOf(int c)     { return c & ~CAPACITY; }
        private static int workerCountOf(int c)  { return c & CAPACITY; }
        private static int ctlOf(int rs, int wc) { return rs | wc; }
    

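    Only the key part of the comment is quoted above.

    ctl is an AtomicInteger that holds both workerCount and runState. To fit the two values into one int, workerCount is limited to 2^29 - 1 (about 500 million) instead of 2^31 - 1.

    In other words, the author packs the worker count and the run state into a single int. The fields mean the following

    Field       Meaning                                                     Value
    COUNT_BITS  number of bits used for the worker count                    29
    CAPACITY    maximum worker count                                        (2^29)-1
    RUNNING     running                                                     -2^29
    SHUTDOWN    shut down (accept no new tasks, finish existing ones)       0
    STOP        stopped (accept no new tasks, interrupt running ones)       2^29
    TIDYING     all tasks have terminated, worker count is 0                2^30
    TERMINATED  terminated() has completed                                  2^29 + 2^30

    The table shows that the 5 run states increase numerically (RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED),
    so any state >= SHUTDOWN means the pool no longer accepts new tasks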

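    The three static methods

    • ctlOf(int rs, int wc)
      Packs the run state and the worker count into one int: the high 3 bits hold the state, the low 29 bits hold the worker count
    • runStateOf(int c)
      Extracts the run state
    • workerCountOf(int c)
      Extracts the worker count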
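
The packing is easy to try out in isolation. The sketch below copies the constants and the three helpers from the listing above into a standalone class (the class name `CtlPackingDemo` is mine; the real fields are private to ThreadPoolExecutor) and round-trips a state and a count through one int.

```java
class CtlPackingDemo {
    static final int COUNT_BITS = Integer.SIZE - 3;       // 29
    static final int CAPACITY   = (1 << COUNT_BITS) - 1;  // low 29 bits all set

    // The run state lives in the high 3 bits
    static final int RUNNING    = -1 << COUNT_BITS;       // 111...
    static final int SHUTDOWN   =  0 << COUNT_BITS;       // 000...
    static final int STOP       =  1 << COUNT_BITS;       // 001...
    static final int TIDYING    =  2 << COUNT_BITS;       // 010...
    static final int TERMINATED =  3 << COUNT_BITS;       // 011...

    static int runStateOf(int c)     { return c & ~CAPACITY; } // keep the high 3 bits
    static int workerCountOf(int c)  { return c & CAPACITY; }  // keep the low 29 bits
    static int ctlOf(int rs, int wc) { return rs | wc; }       // combine both

    public static void main(String[] args) {
        int c = ctlOf(RUNNING, 5);
        System.out.println(Integer.toBinaryString(c));
        System.out.println(runStateOf(c) == RUNNING);
        System.out.println(workerCountOf(c));
        System.out.println(CAPACITY);
    }
}
```

Because RUNNING is the only negative state, a running pool can be detected with a single comparison against SHUTDOWN.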

    Executing a task: execute
    public void execute(Runnable command) {
            if (command == null)
                throw new NullPointerException();
            /*
             * Proceed in 3 steps:
             *
             * 1. If fewer than corePoolSize threads are running, try to
             * start a new thread with the given command as its first
             * task.  The call to addWorker atomically checks runState and
             * workerCount, and so prevents false alarms that would add
             * threads when it shouldn't, by returning false.
             *
             * 2. If a task can be successfully queued, then we still need
             * to double-check whether we should have added a thread
             * (because existing ones died since last checking) or that
             * the pool shut down since entry into this method. So we
             * recheck state and if necessary roll back the enqueuing if
             * stopped, or start a new thread if there are none.
             *
             * 3. If we cannot queue task, then we try to add a new
             * thread.  If it fails, we know we are shut down or saturated
             * and so reject the task.
             */
            int c = ctl.get();
            if (workerCountOf(c) < corePoolSize) {
                if (addWorker(command, true))
                    return;
                c = ctl.get();
            }
            if (isRunning(c) && workQueue.offer(command)) {
                int recheck = ctl.get();
                if (! isRunning(recheck) && remove(command))
                    reject(command);
                else if (workerCountOf(recheck) == 0)
                    addWorker(null, false);
            }
            else if (!addWorker(command, false))
                reject(command);
        }
    

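    As you can see, Doug Lea has already documented the flow of this method very clearly in the comment. Here it is again in plain words:

    • If fewer threads are running than the core pool size, start a new thread and hand it the task as its firstTask
    • If the core threads are all in use, put the task on the blocking queue. A second check is then needed, because a thread may have died since the last check or the pool may have been shut down after entering the method. If the pool is no longer running, remove the task from the queue and reject it. If the second check finds no worker threads at all, start a new thread (with a null firstTask) so that the queued task gets picked up
    • If the blocking queue is full as well, start a new non-core thread. If that fails, the pool has either been shut down or both the queue capacity and the maximum pool size are exhausted, so the task is rejected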
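
These three steps can be watched from the outside with the pool defined earlier (core 1, max 4, queue capacity 10). The sketch below is my own illustration, not part of the JDK: every task blocks on a latch so no worker ever becomes free, and the helper `submitBlockingTasks` (a made-up name) counts how many submissions are accepted and how many threads exist afterwards.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class SaturationDemo {
    // Hypothetical helper: submits `total` blocking tasks and returns
    // {accepted, rejected, pool size after submission}.
    static int[] submitBlockingTasks(int total) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 4, 20, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));
        CountDownLatch release = new CountDownLatch(1);
        int accepted = 0;
        int rejected = 0;
        for (int i = 0; i < total; i++) {
            try {
                pool.execute(() -> {
                    try {
                        release.await();          // keep every worker busy
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
                accepted++;
            } catch (RejectedExecutionException e) {
                rejected++;                       // the default AbortPolicy throws
            }
        }
        int poolSize = pool.getPoolSize();
        release.countDown();                      // let the tasks finish
        pool.shutdown();
        return new int[] {accepted, rejected, poolSize};
    }

    public static void main(String[] args) {
        int[] r = submitBlockingTasks(15);
        System.out.println("accepted=" + r[0] + " rejected=" + r[1] + " poolSize=" + r[2]);
    }
}
```

With 15 submissions, the first task goes to the core thread, the next 10 fill the queue, 3 more start non-core threads, and the last one is rejected. With the 12 submissions used in the earlier loop, nothing is rejected and the pool grows to only 2 threads, because extra threads are created only after the queue is full.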

    Adding a worker thread: addWorker
    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
    
            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;
    
            for (;;) {
                // Read the current worker count
                int wc = workerCountOf(c);
                // Fail if the count has reached CAPACITY, or the applicable bound:
                // corePoolSize when core is true, maximumPoolSize otherwise
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                // The CAS increment of workerCount succeeded: leave the outer retry loop
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                // The CAS failed: if the run state changed, restart the outer loop and recheck the state
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }
    
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
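                    // Recheck the state after acquiring the lock
                    int rs = runStateOf(ctl.get());

                    // Proceed if RUNNING, or if SHUTDOWN with a null firstTask (the worker will drain the queue)
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        // The thread has already been started: throw
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        // Add the worker to the workers set
                        workers.add(w);
                        // Track the largest number of workers ever reached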
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                // Start the thread
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            // The worker was not started: roll back
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }
    

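    firstTask
    The first parameter of addWorker is firstTask. It is a field of the pool's Worker object and represents the first task the newly started thread will run.
    The execute source shows that firstTask is non-null only when a new thread is created for the task. If the task was put on the queue instead, addWorker is called with null and the thread fetches its tasks from the blocking queue.

    Before adding a worker, addWorker checks, only when necessary, whether the blocking queue is empty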

    if (rs >= SHUTDOWN &&
        ! (rs == SHUTDOWN &&
           firstTask == null &&
           ! workQueue.isEmpty()))
        return false;
    

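    1. If the state is greater than SHUTDOWN, no new worker is added: return false.
    2. If the state is SHUTDOWN and firstTask != null, return false: new tasks are not accepted.
    3. If the state is SHUTDOWN and firstTask == null, the new thread would only take tasks from the queue, so if workQueue.isEmpty() there is nothing for it to do: return false.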
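
The double negation in that condition is easy to misread, so here is a small sketch that lifts the predicate into a standalone method and evaluates it for each case. `AddWorkerCheckDemo` and `refused` are names I made up; the constants carry the same values as in the JDK 8 source.

```java
class AddWorkerCheckDemo {
    static final int COUNT_BITS = Integer.SIZE - 3;
    static final int RUNNING  = -1 << COUNT_BITS;
    static final int SHUTDOWN =  0 << COUNT_BITS;
    static final int STOP     =  1 << COUNT_BITS;

    // Hypothetical helper: true when the state check makes addWorker return false.
    static boolean refused(int rs, boolean hasFirstTask, boolean queueEmpty) {
        return rs >= SHUTDOWN &&
               !(rs == SHUTDOWN && !hasFirstTask && !queueEmpty);
    }

    public static void main(String[] args) {
        System.out.println("RUNNING, new task:               " + refused(RUNNING, true, true));
        System.out.println("STOP, no task, queue non-empty:  " + refused(STOP, false, false));
        System.out.println("SHUTDOWN, new task:              " + refused(SHUTDOWN, true, false));
        System.out.println("SHUTDOWN, no task, queue empty:  " + refused(SHUTDOWN, false, true));
        System.out.println("SHUTDOWN, no task, queue filled: " + refused(SHUTDOWN, false, false));
    }
}
```

The only combination at or above SHUTDOWN that still lets a worker through is the last one: a task-less worker that will help drain a non-empty queue.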

    addWorkerFailed
    When adding a thread fails, the worker object that was just created is removed

    private void addWorkerFailed(Worker w) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // Remove the worker from the HashSet
            if (w != null)
                workers.remove(w);
            // Decrement the worker count
            decrementWorkerCount();
            // Try to terminate the pool
            tryTerminate();
        } finally {
            mainLock.unlock();
        }
    }
    

    The internal thread wrapper: Worker

    Worker is an inner class of ThreadPoolExecutor that wraps the thread executing tasks (only some fields and methods are shown)

    private final class Worker extends AbstractQueuedSynchronizer
        implements Runnable {
        /** The thread that runs the tasks */
        final Thread thread;
        /** The initial task to run; may be null */
        Runnable firstTask;
        /** Per-worker counter of completed tasks */
        volatile long completedTasks;
    
        /**
         * Creates with given first task and thread from ThreadFactory.
         * @param firstTask the first task (null if none)
         */
        Worker(Runnable firstTask) {
            // With state -1 the lock cannot be acquired, which prevents interrupting this worker
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            // Create the thread through the thread factory
            this.thread = getThreadFactory().newThread(this);
        }
    
        /** Delegates main run loop to outer runWorker  */
        public void run() {
            runWorker(this);
        }
    }
    

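    Worker extends AbstractQueuedSynchronizer to manage its own lock state.
    A new worker calls setState(-1). With the state at -1, other threads cannot acquire the worker's lock (acquisition only succeeds from state 0), so the thread cannot be interrupted until runWorker calls w.unlock().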
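
The effect of starting at -1 can be reproduced with a few lines of AQS code. The class below is a simplified stand-in that I wrote for illustration, not the JDK's Worker: it keeps only the state transitions (0 = unlocked, 1 = locked, -1 = not yet started) and leaves out owner tracking.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

class WorkerLockDemo {
    // Simplified, hypothetical version of Worker's non-reentrant lock.
    static final class SimpleLock extends AbstractQueuedSynchronizer {
        SimpleLock() {
            setState(-1);                        // like Worker: tryLock fails until the first unlock
        }
        @Override
        protected boolean tryAcquire(int unused) {
            return compareAndSetState(0, 1);     // succeeds only from state 0
        }
        @Override
        protected boolean tryRelease(int unused) {
            setState(0);                         // unconditional, so it also leaves state -1
            return true;
        }
        boolean tryLock() { return tryAcquire(1); }
        void unlock()     { release(1); }
        int state()       { return getState(); }
    }

    public static void main(String[] args) {
        SimpleLock lock = new SimpleLock();
        System.out.println("before unlock: tryLock=" + lock.tryLock());
        lock.unlock();                           // what runWorker does first
        System.out.println("after unlock:  tryLock=" + lock.tryLock());
        System.out.println("second attempt: tryLock=" + lock.tryLock());
    }
}
```

The second attempt fails because the lock is not reentrant, which is what lets the pool treat "lock held" as "worker is busy running a task".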

    Running tasks: runWorker
    This is where the pool actually executes tasks

    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        // Stays true if the worker leaves the loop because of an exception
        boolean completedAbruptly = true;
        try {
            // Get the task to run; if firstTask is null, take one from the queue
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    // Hook: custom logic to run before the task
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        // Run the task
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        // Hook: called after the task has run
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            // The loop ended normally (getTask returned null), so clear the abrupt-exit flag
            completedAbruptly = false;
        } finally {
            // Run the worker's exit handling
            processWorkerExit(w, completedAbruptly);
        }
    }
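
The two hooks called in runWorker, beforeExecute and afterExecute, are protected methods intended for subclasses. A minimal sketch of how they might be used follows; `HookDemo` and `RecordingPool` are names invented for this example. It runs one normal task and one task that throws, and records what each hook saw.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class HookDemo {
    // Hypothetical subclass that records each hook invocation.
    static final class RecordingPool extends ThreadPoolExecutor {
        final List<String> events = new CopyOnWriteArrayList<>();

        RecordingPool() {
            super(1, 1, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        }
        @Override
        protected void beforeExecute(Thread t, Runnable r) {
            events.add("before");
        }
        @Override
        protected void afterExecute(Runnable r, Throwable thrown) {
            // thrown is the exception that escaped task.run(), or null
            events.add("after:" + (thrown == null ? "ok" : thrown.getMessage()));
        }
    }

    static List<String> run() {
        RecordingPool pool = new RecordingPool();
        pool.execute(() -> { });
        pool.execute(() -> { throw new IllegalStateException("boom"); });
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return pool.events;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The failing task also prints a stack trace to stderr, because the exception is rethrown out of runWorker after afterExecute has seen it.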
    

    Pool state check

    if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
    

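    1. If runState >= STOP (in the STOP state the pool must interrupt running tasks) and the thread's interrupt flag is not set, interrupt the thread.
    2. If runState < STOP, a second check is needed, because shutdownNow may have been called after the state was first read. Thread.interrupted() returns the thread's interrupt status and clears it, so any pending interrupt flag is cleared here. The state is then read again, and if runState >= STOP the thread is interrupted.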
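
The check relies on the difference between the static Thread.interrupted(), which clears the flag, and the instance method isInterrupted(), which only reads it. A tiny sketch (the class and method names are mine) shows the difference on the current thread:

```java
class InterruptFlagDemo {
    // Returns {value seen by Thread.interrupted(), value seen afterwards by isInterrupted()}.
    static boolean[] readAndClear() {
        Thread.currentThread().interrupt();                      // set the flag on ourselves
        boolean first = Thread.interrupted();                    // reads the flag AND clears it
        boolean second = Thread.currentThread().isInterrupted(); // reads only; flag is already cleared
        return new boolean[] {first, second};
    }

    public static void main(String[] args) {
        boolean[] r = readAndClear();
        System.out.println("Thread.interrupted() -> " + r[0]);
        System.out.println("isInterrupted() afterwards -> " + r[1]);
    }
}
```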

    Fetching a task: getTask

    private Runnable getTask() {
        // Whether the poll in the previous iteration timed out
        boolean timedOut = false; 
    
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
    
            // Check the pool state
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }
    
            int wc = workerCountOf(c);
    
            // Whether this worker should exit after being idle
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
    
            // Check the pool's workerCount
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }
    
            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }
    

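    1. Loop until a task is obtained or until the worker should stop fetching tasks.
    2. If the pool state is >= STOP, decrement workerCount and return null.
    If the state is SHUTDOWN and the queue is empty, decrement workerCount and return null.
    3. (wc > maximumPoolSize || (timed && timedOut))
    wc exceeds the maximum pool size, or the worker has been idle for keepAliveTime and idle workers are subject to removal
    (wc > 1 || workQueue.isEmpty())
    wc > 1, or the queue is empty
    When both conditions hold, this worker does not need to fetch another task: workerCount is decremented by CAS and null is returned (if the CAS fails, the loop retries).

    • timedOut records whether the poll in the previous iteration timed out, meaning the worker has been idle for keepAliveTime
    • timed records whether this worker should be removed when it is idle

    4. If timed == true, call poll and wait up to keepAliveTime;
    otherwise call take and block until a task is available (reaching this step means the pool is RUNNING, or SHUTDOWN with a non-empty workQueue)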
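
Step 4 comes down to the difference between BlockingQueue.poll with a timeout and BlockingQueue.take. The sketch below isolates just that choice; `TimedFetchDemo`, `fetch`, and the two convenience methods are hypothetical names, and unlike getTask this version gives up on interruption instead of looping.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class TimedFetchDemo {
    // A timed worker polls with a deadline and may get null;
    // an untimed worker blocks in take() until a task arrives.
    static Runnable fetch(BlockingQueue<Runnable> queue, boolean timed, long keepAliveNanos) {
        try {
            return timed
                    ? queue.poll(keepAliveNanos, TimeUnit.NANOSECONDS)
                    : queue.take();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;                          // simplification: getTask would retry
        }
    }

    // True if a timed fetch on an empty queue comes back empty-handed.
    static boolean timesOutOnEmptyQueue() {
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
        return fetch(queue, true, TimeUnit.MILLISECONDS.toNanos(50)) == null;
    }

    // True if an untimed fetch returns the task that was queued.
    static boolean returnsQueuedTask() {
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
        Runnable task = () -> { };
        queue.add(task);
        return fetch(queue, false, 0) == task;
    }

    public static void main(String[] args) {
        System.out.println("timed fetch on empty queue returned null: " + timesOutOnEmptyQueue());
        System.out.println("untimed fetch returned the queued task:   " + returnsQueuedTask());
    }
}
```

In getTask a null result from poll sets timedOut, and the next loop iteration decides whether the worker may exit.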

    Worker exit: processWorkerExit
    When the run loop ends, the finally block runs the worker's exit handling

    private void processWorkerExit(Worker w, boolean completedAbruptly) {
        // If the worker exited abruptly, decrement workerCount here
        if (completedAbruptly)
            decrementWorkerCount();
    
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            completedTaskCount += w.completedTasks;
            // Remove the finished worker from the workers set
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }
        // Try to terminate the pool
        tryTerminate();
    
        int c = ctl.get();
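        // The state is RUNNING or SHUTDOWN
        if (runStateLessThan(c, STOP)) {
            // After an abrupt exit, skip the checks below and add a replacement worker directly
            if (!completedAbruptly) {
                // Compute the minimum number of workers
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                // The current worker count is at least min, so no replacement is needed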
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            addWorker(null, false);
        }
    }
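
The replacement path for an abruptly exiting worker can be observed by comparing threads. In the sketch below (my own illustration, with the made-up names `WorkerReplacementDemo` and `failedWorkerIsReplaced`), the first task throws, so its worker leaves runWorker with completedAbruptly still true. The second task must therefore run on a different thread.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class WorkerReplacementDemo {
    // Returns true if the task submitted after a failing task ran on a different thread.
    static boolean failedWorkerIsReplaced() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        AtomicReference<Thread> first = new AtomicReference<>();
        AtomicReference<Thread> second = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);

        pool.execute(() -> {
            first.set(Thread.currentThread());
            throw new IllegalStateException("task failed"); // the worker exits abruptly
        });
        pool.execute(() -> {
            second.set(Thread.currentThread());
            done.countDown();
        });
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdown();
        }
        return first.get() != null && second.get() != null
                && first.get() != second.get();
    }

    public static void main(String[] args) {
        System.out.println("failed worker replaced: " + failedWorkerIsReplaced());
    }
}
```

Whether the new thread comes from processWorkerExit or from the second execute call depends on timing, but in both cases the original worker thread is gone.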
    

    An important step in this method is the call to tryTerminate, which attempts to terminate the pool.
    The pool's shutdown path is analyzed next.

    tryTerminate + awaitTermination

    final void tryTerminate() {
        for (;;) {
            int c = ctl.get();
            // 1. In the STOP state, continue below
            // 2. In the SHUTDOWN state with an empty queue, continue below; in every other case return
            if (isRunning(c) ||
                runStateAtLeast(c, TIDYING) ||
                (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
                return;
            // If the worker count is not zero, interrupt at most one idle worker
            if (workerCountOf(c) != 0) { // Eligible to terminate
                interruptIdleWorkers(ONLY_ONE);
                return;
            }
    
            // Reaching here means there are no workers and no tasks left to run
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Set the pool state to TIDYING
                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                    try {
                        // Hook; empty by default, override it in a subclass
                        terminated();
                    } finally {
                        // Set the pool state to TERMINATED
                        ctl.set(ctlOf(TERMINATED, 0));
                        // Signal termination to threads waiting in awaitTermination
                        termination.signalAll();
                    }
                    return;
                }
            } finally {
                mainLock.unlock();
            }
            // else retry on failed CAS
        }
    }
    
    public boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException {
        // The caller's timeout, converted to nanoseconds
        long nanos = unit.toNanos(timeout);
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (;;) {
                // If the pool has already terminated, return immediately
                if (runStateAtLeast(ctl.get(), TERMINATED))
                    return true;
                if (nanos <= 0)
                    return false;
                // Block; a successful tryTerminate signals this condition
                nanos = termination.awaitNanos(nanos);
            }
        } finally {
            mainLock.unlock();
        }
    }
    

    An important step in tryTerminate is interrupting an idle thread with interruptIdleWorkers(ONLY_ONE)

    /**
     * @param onlyOne If true, interrupt at most one worker. This is
     * called only from tryTerminate when termination is otherwise
     * enabled but there are still other workers.  In this case, at
     * most one waiting worker is interrupted to propagate shutdown
     * signals in case all threads are currently waiting.
     * Interrupting any arbitrary thread ensures that newly arriving
     * workers since shutdown began will also eventually exit.
     * To guarantee eventual termination, it suffices to always
     * interrupt only one idle worker, but shutdown() interrupts all
     * idle workers so that redundant workers exit promptly, not
     * waiting for a straggler task to finish.
     */
    private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers) {
                Thread t = w.thread;
                // If the thread is not interrupted and its lock can be acquired, interrupt it
                if (!t.isInterrupted() && w.tryLock()) {
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    } finally {
                        w.unlock();
                    }
                }
                // If onlyOne is true, stop after the first worker
                if (onlyOne)
                    break;
            }
        } finally {
            mainLock.unlock();
        }
    }
    

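    Roughly translating the Javadoc above: if onlyOne is true, the method interrupts at most one worker thread. To propagate the shutdown signal and guarantee that the pool eventually terminates, interrupting a single idle thread at a time is sufficient.
    A blocked worker is blocked inside getTask. Once one such thread is interrupted, the chain is getTask --> processWorkerExit --> tryTerminate --> interruptIdleWorkers --> getTask (of the next idle worker).
    I have not fully worked out why tryTerminate passes onlyOne as true, or what would happen if it passed false. I plan to read more and experiment.
    The sections above already touched on the shutdown flow. Two more important methods are analyzed below.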

    Shutting down the pool: shutdown

    public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            // Set the pool state to SHUTDOWN
            advanceRunState(SHUTDOWN);
            // Interrupt all idle threads
            interruptIdleWorkers();
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock();
        }
        // Try to terminate the pool
        tryTerminate();
    }
    

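    The neat part here is the final tryTerminate call. In the SHUTDOWN state the pool must still finish the remaining tasks, so if every thread happens to be running a task when shutdown is called, none of them can be interrupted at that moment. Each worker later exits through processWorkerExit, which calls tryTerminate again, so the pool still terminates once the queue has been drained.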
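
A small experiment illustrates this behavior. The sketch below is an illustration under my own assumptions (a single worker, five short tasks, and the made-up names `GracefulShutdownDemo` and `run`): it calls shutdown while work is still queued, then checks that a late submission is refused and that the queued tasks still complete.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class GracefulShutdownDemo {
    // Returns {tasks completed, 1 if a post-shutdown submit was rejected, 1 if the pool terminated}.
    static int[] run() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        AtomicInteger completed = new AtomicInteger();
        for (int i = 0; i < 5; i++) {
            pool.execute(() -> {
                try {
                    Thread.sleep(20);             // simulate work
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                completed.incrementAndGet();
            });
        }
        pool.shutdown();                          // the state becomes SHUTDOWN
        int rejected = 0;
        try {
            pool.execute(() -> { });
        } catch (RejectedExecutionException e) {
            rejected = 1;                         // new tasks are refused
        }
        boolean terminated = false;
        try {
            terminated = pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] {completed.get(), rejected, terminated ? 1 : 0};
    }

    public static void main(String[] args) {
        int[] r = run();
        System.out.println("completed=" + r[0] + " rejected=" + r[1] + " terminated=" + r[2]);
    }
}
```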

    Shutting down the pool: shutdownNow

    public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            // Set the pool state to STOP
            advanceRunState(STOP);
            // Interrupt all workers
            interruptWorkers();
            // Drain the queued tasks so they can be returned
            tasks = drainQueue();
        } finally {
            mainLock.unlock();
        }
        // Try to terminate the pool
        tryTerminate();
        return tasks;
    }
    

    In the STOP state every thread must be interrupted and the queued tasks abandoned, so shutdownNow interrupts all worker threads

    private void interruptWorkers() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers)
                w.interruptIfStarted();
        } finally {
            mainLock.unlock();
        }
    }
    
    // This method belongs to the inner class Worker; it is shown here for readability
        void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    

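    A worker's state is 1 while its lock is held and 0 when it is not. This is where the two methods differ: shutdown can only interrupt idle workers (those whose lock can be acquired), while shutdownNow interrupts every started worker.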
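
The contrast with shutdown shows up clearly in a small test. In this sketch (an illustration only; `ShutdownNowDemo` and `run` are made-up names) one task is already running and three more are queued when shutdownNow is called. The running task is interrupted even though its worker holds its lock, and the queued tasks are handed back instead of being run.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class ShutdownNowDemo {
    // Returns {number of queued tasks handed back, 1 if the running task was interrupted}.
    static int[] run() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        CountDownLatch started = new CountDownLatch(1);
        AtomicBoolean interrupted = new AtomicBoolean(false);

        pool.execute(() -> {
            started.countDown();
            try {
                Thread.sleep(60_000);             // busy worker: its Worker lock is held
            } catch (InterruptedException e) {
                interrupted.set(true);            // shutdownNow interrupts busy workers too
            }
        });
        for (int i = 0; i < 3; i++) {
            pool.execute(() -> { });              // these wait in the queue
        }
        try {
            started.await();                      // make sure the first task is running
            List<Runnable> pending = pool.shutdownNow();
            pool.awaitTermination(5, TimeUnit.SECONDS);
            return new int[] {pending.size(), interrupted.get() ? 1 : 0};
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        int[] r = run();
        System.out.println("returned tasks=" + r[0] + " running task interrupted=" + r[1]);
    }
}
```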

    That covers the basic flow of the thread pool. If anything here is wrong or incomplete, corrections and additions are very welcome ^-^ ~

          Article title: Java Concurrency: ThreadPoolExecutor Source Code Analysis

          Article link: https://www.haomeiwen.com/subject/ybiswftx.html