美文网首页Java 杂谈程序员
[Java源码][并发J.U.C]---解析线程池之Thread

[Java源码][并发J.U.C]---解析线程池之Thread

作者: nicktming | 来源:发表于2018-12-27 10:00 被阅读7次

前言

本文是承接上文[Java源码][并发J.U.C]---解析线程池之ThreadPoolExecutor(1)的思路继续进行分析的, 主要会涉及到线程池的关闭等方法.

关闭线程池主要有两个方法分别为shutdown()shutdownNow(),他们的区别在于shutdown()让线程池不接受新任务并且要处理队列中的任务, 然而shutdownNow()让线程池不接受新任务也不处理队列中的任务,并且中断正在执行任务的线程.(只是名义上的中断,关于中断的话题可以参考我的另一篇博客. [并发J.U.C] 用例子理解线程中断)

辅助方法

在讨论这两个方法之前,我们先看看几个辅助方法.

interruptIdleWorkers(boolean onlyOne)

该方法代码比较容易理解, 此时也可以看到为什么需要将Worker包装成一个互斥不可重入锁了. 因为需要先获得锁然后才去中断此线程.
onlyOne: true只中断一个空闲线程,false中断所有空闲线程.
作用: 中断空闲的线程(正在等待任务的线程也就是那些没有被锁住的线程).

   private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers) {
                Thread t = w.thread;
                if (!t.isInterrupted() && w.tryLock()) {
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    } finally {
                        w.unlock();
                    }
                }
                if (onlyOne)
                    break;
            }
        } finally {
            mainLock.unlock();
        }
    }

advanceRunState(int targetState)

作用:runState转换为给定目标,或者如果已经至少已经给定目标则将其保留.

    private void advanceRunState(int targetState) {
        for (;;) {
            int c = ctl.get();
            if (runStateAtLeast(c, targetState) ||
                    ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
                break;
        }
    }

interruptWorkers()

作用: 对所有的线程池中的线程发出中断.

private void interruptWorkers() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers)
                w.interruptIfStarted();
        } finally {
            mainLock.unlock();
        }
    }

tryTerminate()

作用: 尝试去terminate线程池. 主要是通过线程池的状态判断是否需要terminate线程池, 如果条件满足,但是线程池中含有线程,则尝试对其线程发出中断, 反之更新线程池状态从TIDYINGTERMINATED.

   final void tryTerminate() {
        for (;;) {
            int c = ctl.get();
            /**
             *  从线程池的状态来进行分析
             *  1. 如果线程池处于RUNNING状态 直接return 因为不需要terminate线程池
             *  2. 如果线程池大于等于TIDYING状态(也就是TIDYING或者TERMINATION) 此时已经terminate过了所以直接返回
             *  -> 不是1或者2 就表明线程池状态只能是SHUTDOWN或者是STOP
             *  如果是STOP 此时则需要terminate线程池 所以不能直接return
             *  如果是SHUTDOWN
             *   a. 阻塞队列不为空,则不能terminate线程池,因为要处理阻塞队列的任务 所以不能直接return
             *   b. 阻塞队列为空, 则需要terminate线程池
             */
            if (isRunning(c) ||
                    runStateAtLeast(c, TIDYING) ||
                    (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
                return;
            /**
             *  进入到这里有两种可能性
             *  1. 线程池状态为STOP
             *  2. 线程池状态为SHUTDOWN并且队列为空
             *
             *  如果workerCount大于0,则中断一个线程 进而引发的操作是
             *  1. 运行getTask()中的某个线程会返回null
             *  2. 进而该线程返回到runWorker中退出while循环
             *  3. 进而进入到processWorkerExit方法中
             *  4. processWorkerExit中会调用tryTerminate方法进而进入了一个循环效应
             *
             *  当线程池状态是SHUTDOWN或者STOP时会慢慢得到线程池的最终终止状态.
             *  因为getTask()方法中数量会减1.
             */
            if (workerCountOf(c) != 0) { // Eligible to terminate
                interruptIdleWorkers(ONLY_ONE);
                return;
            }

            /**
             *  进入到这里有两种可能性
             *  1. 线程池状态为STOP 并且workerCount为0
             *  2. 线程池状态为SHUTDOWN并且队列为空并且workerCount为0
             *
             *  在该两种情况下就可以把线程池的状态设置为TIDYING继而设置为TERMINATED
             */
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                /** 先设置成TIDYING状态 然后调用子类实现的terminated方法
                 * 最后设置成TERMINATED状态 并唤醒那些在等待线程池TERMINATED的线程 */
                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                    try {
                        terminated();
                    } finally {
                        ctl.set(ctlOf(TERMINATED, 0));
                        termination.signalAll();
                    }
                    return;
                }
            } finally {
                mainLock.unlock();
            }
            // else retry on failed CAS
        }
    }

processWorkerExit(Worker w, boolean completedAbruptly)

作用: 处理因为某种原因退出的Worker w并且尝试终止线程池进而判断是否需要给线程池增加线程.

/** 只被runWorker调用 */
    private void processWorkerExit(Worker w, boolean completedAbruptly) {
        /**
         * 1. 如果异常退出runWorker,那么正常工作的worker线程数量需要减少1
         * 2. 如果正常退出runWorker,表明getTask()==null,已经在getTask()返回null前减1了
         */
        if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
            decrementWorkerCount();

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            completedTaskCount += w.completedTasks;
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }
        /** 做一个尝试终止线程池的操作 只是有益于早发现线程池的终止指令进而真正去终止线程池 */
        tryTerminate();
        /**
         *  因为有一个线程因为异常或者正常退出,所以检查线程池是否需要增加worker线程
         *  1. 如果线程池状态>=STOP 则肯定不需要添加worker了
         *  2. 线程池只能是RUNNING或者SHUTDOWN状态的时候来检查
         *     2.1 如果是异常退出,则直接增加一个worker进入到线程池中
         *     2.2 如果是正常退出,则判断一下是否需要增加worker到线程池中
         */
        int c = ctl.get();
        if (runStateLessThan(c, STOP)) {
            if (!completedAbruptly) {
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            addWorker(null, false);
        }
    }

关闭线程池

shutdown()

作用: 将线程池状态调整为SHUTDOWN,进而中断所有空闲线程并尝试终止线程池.

  public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            /** 判断调用者是否有权限shutdown线程池 */
            checkShutdownAccess();
            /** 使用CAS将线程池状态设置为SHUTDOWN,此时线程池将不会接受新任务不处理队列任务 */
            advanceRunState(SHUTDOWN);
            /** 中断所有空闲线程 */
            interruptIdleWorkers();
            /** hook for ScheduledThreadPoolExecutor 暂时无实现 */
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock();
        }
        /** 尝试终止线程池 */
        tryTerminate();
    }

shutdownNow()

作用: 将线程池状态调整为STOP,进而中断线程池中所有线程并尝试终止线程池. 最终返回所有未处理的任务.

public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            /** 判断调用者是否有权限shutdown线程池 */
            checkShutdownAccess();
            /** 使用CAS将线程池状态设置为STOP,此时线程池将不会接受新任务不处理队列任务并且中断所有线程 */
            advanceRunState(STOP);
            /** 尝试中断线程池中所有线程 */
            interruptWorkers();
            /** 取出队列中所有的任务 */
            tasks = drainQueue();
        } finally {
            mainLock.unlock();
        }
        /** 尝试终止线程池 */
        tryTerminate();
        /** 返回所有未处理的任务 */
        return tasks;
    }

参考

1. java 1.8源码

相关文章

网友评论

    本文标题:[Java源码][并发J.U.C]---解析线程池之Thread

    本文链接:https://www.haomeiwen.com/subject/licclqtx.html