美文网首页
concurrency-threadpoolexecutor

concurrency-threadpoolexecutor

作者: 甜甜起司猫_ | 来源:发表于2021-08-19 15:05 被阅读0次

concurrency-threadpoolexecutor

Java 中的线程池是运用场景最多的并发框架,几乎所有需要异步或并发执行任务的程序都可以使用线程池。在开发过程中,合理地使用线程池能够带来 3 个好处。

  1. 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
  2. 提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。
  3. 提高线程的可管理性。线程是稀缺资源,如果无限制地创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一分配、调优和监控。

ThreadPoolExecutor 执行流程

ThreadPoolExecutor 执行 execute 方法分下面 4 种情况。

  1. 如果当前运行的线程少于 corePoolSize,则创建新线程来执行任务(注意,执行这一步骤需要获取全局锁)。
  2. 如果运行的线程等于或多于 corePoolSize,则将任务加入 BlockingQueue。
  3. 如果无法将任务加入 BlockingQueue(队列已满),则创建新的线程来处理任务(注意,执行这一步骤需要获取全局锁)。
  4. 如果创建新线程将使当前运行的线程超出 maximumPoolSize,任务将被拒绝,并调用 RejectedExecutionHandler.rejectedExecution() 方法。

ThreadPoolExecutor 采取上述步骤的总体设计思路,是为了在执行 execute() 方法时,尽可能地避免获取全局锁(那将会是一个严重的可伸缩瓶颈)。在 ThreadPoolExecutor 完成预热之后(当前运行的线程数大于等于 corePoolSize),几乎所有的 execute() 方法调用都是执行步骤2,而步骤2不需要获取全局锁。

ThreadPoolExecutor 源码分析

线程池生命周期和线程状态标识ctl

线程池用 ctl 的低 29 位表示线程池中的线程数,高 3 位表示当前线程状态。

高3位表示状态

  1. RUNNING:运行状态,高3位为111;
  2. SHUTDOWN:关闭状态,高3位为000,在此状态下,线程池不再接受新任务,但是仍然处理阻塞队列中的任务;
  3. STOP:停止状态,高3位为001,在此状态下,线程池不再接受新任务,也不会处理阻塞队列中的任务,正在运行的任务也会停止;
  4. TIDYING:高3位为010;
  5. TERMINATED:终止状态,高3位为011。

线程状态标识ctl

// ctl 高3位表示线程池状态,低29位表示当前工作线程数
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;         // 低29位表示工作线程数
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;    // 最大线程数 0x1fffffff

// 获取线程池状态、线程总数、构造 ctl
private static int runStateOf(int c)     { return c & ~CAPACITY; }
private static int workerCountOf(int c)  { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }

全局锁

// 全局锁,创建工作线程等操作时需要获取全局锁
private final ReentrantLock mainLock = new ReentrantLock();
private final Condition termination = mainLock.newCondition();

工作线程

// 工作线程
private final HashSet<Worker> workers = new HashSet<Worker>();
private int largestPoolSize;
private volatile int corePoolSize;

任务提交execute

// ThreadPoolExecutor 的任务提交过程
    // java.util.concurrent.ThreadPoolExecutor#execute
    /**
     * Executes the given task sometime in the future.  The task
     * may execute in a new thread or in an existing pooled thread.
     *
     * If the task cannot be submitted for execution, either because this
     * executor has been shutdown or because its capacity has been reached,
     * the task is handled by the current {@code RejectedExecutionHandler}.
     *
     * @param command the task to execute
     * @throws RejectedExecutionException at discretion of
     *         {@code RejectedExecutionHandler}, if the task
     *         cannot be accepted for execution
     * @throws NullPointerException if {@code command} is null
     */
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        // ctl 是一个重要的控制全局状态的数据结构,定义为一个线程安全的 AtomicInteger
        // ctl = new AtomicInteger(ctlOf(RUNNING, 0));
        int c = ctl.get();
        /**
        * workerCountOf方法取出低29位的值,表示当前活动的线程数;
        * 如果当前活动的线程数小于corePoolSize,则新建一个线程放入线程池中,并把该任务放到线程中
        */
        if (workerCountOf(c) < corePoolSize) {
            /**
            * addWorker中的第二个参数表示限制添加线程的数量 是根据据corePoolSize 来判断还是maximumPoolSize来判断;
            * 如果是ture,根据corePoolSize判断
            * 如果是false,根据maximumPoolSize判断
            */
            if (addWorker(command, true))
                return;
            /**
            * 如果添加失败,则重新获取ctl值
            */
            c = ctl.get();
        }
        /**
        * 如果线程池是Running状态,并且任务添加到队列中
        */
        if (isRunning(c) && workQueue.offer(command)) {
            //double-check,重新获取ctl的值
            int recheck = ctl.get();
            /**
            * 再次判断线程池的状态,如果不是运行状态,由于之前已经把command添加到阻塞队列中,这时候需要从队列中移除command;
            * 通过handler使用拒绝策略对该任务进行处理,整个方法返回
            */
            if (!isRunning(recheck) && remove(command))
                reject(command);
            /**
            * 获取线程池中的有效线程数,如果数量是0,则执行addWorker方法;
            * 第一个参数为null,表示在线程池中创建一个线程,但不去启动
            * 第二个参数为false,将线程池的线程数量的上限设置为maximumPoolSize,添加线程时根据maximumPoolSize来判断
            */
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
            /**
            * 执行到这里,有两种情况:
            * 1、线程池的状态不是RUNNING;
            * 2、线程池状态是RUNNING,但是workerCount >= corePoolSize, workerQueue已满
            * 这个时候,再次调用addWorker方法,第二个参数传false,将线程池的有限线程数量的上限设置为maximumPoolSize;
            * 如果失败则执行拒绝策略;
            */
        } else if (!addWorker(command, false))
            reject(command);
    }

execute总结

通过上面这一小段代码,我们就已经完整地看到了。通过一个 ctl 变量进行全局状态控制,从而保证了线程安全性。整个框架并没有使用锁,但是却是线程安全的。

整段代码刚好完整描述了线程池的执行流程:

  1. 如果workerCount < corePoolSize,则创建并启动一个线程来执行新提交的任务;
  2. 如果workerCount >= corePoolSize,且线程池内的阻塞队列未满,则将任务添加到该阻塞队列中;
  3. 如果 workerCount >= corePoolSize && workerCount < maximumPoolSize,且线程池内的阻塞队列已满,则创建并启动一个线程来执行新提交的任务;
  4. 如果workerCount >= maximumPoolSize,并且线程池内的阻塞队列已满, 则根据拒绝策略来处理该任务, 默认的处理方式是直接抛异常。

这里要注意一下addWorker(null, false);,也就是创建一个线程,但并没有传入任务,因为任务已经被添加到workQueue中了,所以worker在执行的时候,会直接从workQueue中获取任务。所以,在workerCountOf(recheck) == 0时执行addWorker(null, false);也是为了保证线程池在RUNNING状态下必须要有一个线程来执行任务。

工作线程worker

线程池中的每一个对象被封装成一个Worker对象,ThreadPool维护的就是一组Worker对象。Worker类继承了AQS,并实现了Runnable接口,其中包含了两个重要属性:firstTask用来保存传入的任务,thread是在调用构造方法是通过ThreadFactory来创建的线程,是用来处理任务的线程。

// Worker 是对线程 Thread 的包装,实现了 AbstractQueuedSynchronizer
private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable {
 
    final Thread thread;
    Runnable firstTask;
    volatile long completedTasks;

    Worker(Runnable firstTask) {
       /**
        *  把state设置为-1,,阻止中断直到调用runWorker方法;
        *  因为AQS默认state是0,如果刚创建一个Worker对象,还没有执行任务时,这时候不应该被中断
        */
        setState(-1);
        this.firstTask = firstTask;
        /**
         * 创建一个线程,newThread方法传入的参数是this,因为Worker本身继承了Runnable接口,也就是一个线程;
         * 所以一个Worker对象在启动的时候会调用Worker类中run方法
         */
        this.thread = getThreadFactory().newThread(this);
    }
} 

Worker 为什么要继承 AbstractQueuedSynchronizer 实现自己的锁,而不使用 ReentrantLock 呢?

  1. lock方法一旦获取独占锁,表示当前线程正在执行任务中;
  2. 如果正在执行任务,则不应该中断线程;
  3. 如果该线程现在不是独占锁的状态,也就是空闲状态,说明它没有处理任务,这时可以对该线程进行中断;
  4. 线程池中执行shutdown方法或tryTerminate方法时会调用interruptIdleWorkers方法来中断空闲线程,interruptIdleWorkers方法会使用tryLock方法来判断线程池中的线程是否是空闲状态;
  5. 之所以设置为不可重入的,是因为在任务调用setCorePoolSize这类线程池控制的方法时,不会中断正在运行的线程所以,Worker继承自AQS,用于判断线程是否空闲以及是否处于被中断。

创建工作线程 addWorker

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    /**
     * 由于线程执行过程中,各种情况都有可能处于,通过自旋的方式来保证worker的增加;
     */
    for (; ; ) {
        int c = ctl.get();
        //获取线程池运行状态
        int rs = runStateOf(c);

        /**
         *
         * 如果rs >= SHUTDOWN, 则表示此时不再接收新任务;
         * 接下来是三个条件 通过 && 连接,只要有一个任务不满足,就返回false;
         * 1.rs == SHUTDOWN,表示关闭状态,不再接收提交的任务,但却可以继续处理阻塞队列中已经保存的任务;
         * 2.fisrtTask为空
         * 3.Check if queue empty only if necessary.
         */
        if (rs >= SHUTDOWN &&
                !(rs == SHUTDOWN &&
                        firstTask == null &&
                        !workQueue.isEmpty()))
            return false;

        for (; ; ) {
            //获取线程池的线程数
            int wc = workerCountOf(c);
            /**
             * 如果线程数 >= CAPACITY, 也就是ctl的低29位的最大值,则返回false;
             * 这里的core用来判断 限制线程数量的上限是corePoolSize还是maximumPoolSize;
             * 如果core是ture表示根据corePoolSize来比较;
             * 如果core是false表示根据maximumPoolSize来比较;
             */
            if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            /**
             * 通过CAS原子的方式来增加线程数量;
             * 如果成功,则跳出第一个for循环;
             */
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            //如果当前运行的状态不等于rs,说明线程池的状态已经改变了,则返回第一个for循环继续执行
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        //根据firstTask来创建Worker对象
        w = new Worker(firstTask);
        //每一个Worker对象都会创建一个线程
        final Thread t = w.thread;
        if (t != null) {
            //创建可重入锁
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // 获取线程池的状态
                int rs = runStateOf(ctl.get());

                /**
                 * 线程池的状态小于SHUTDOWN,表示线程池处于RUNNING状态;
                 * 如果rs是RUNNING状态或rs是SHUTDOWN状态并且firstTask为null,向线程池中添加线程;
                 * 因为在SHUTDOWN状态时不会再添加新的任务,但还是处理workQueue中的任务;
                 */
                if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    //workers是一个hashSet
                    workers.add(w);
                    int s = workers.size();
                    //largestPoolSize记录线程池中出现的最大的线程数量
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                //启动线程,Worker实现了Running方法,此时会调用Worker的run方法
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (!workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}
  • addWorker 前半部分主要是判断能否新建工作线程,如果允许则执行 compareAndIncrementWorkerCount(c),利用 CAS 原则,将线程数量+1。
  • addWorker 后半部分则是真正创建工作线程并启动,这个过程需要获取全局锁。创建失败则需要回滚 addWorkerFailed。

addWorker 的 4 种调用方式:

  1. addWorker(command, true) 线程数 < coreSize 时,则创建新线程
  2. addWorker(command, false) 当①阻塞队列已满,②线程数 < maximumPoolSize 时,则创建新线程
  3. addWorker(null, true) 同 1。只是线程初始化任务为 null,相当于创建一个新的线程。实际的使用是在 prestartCoreThread() 等方法。
  4. addWorker(null, false) 同 2。只是线程初始化任务为 null,相当于创建一个新的线程,没立马分配任务;

线程执行 runWorker

worker类中的runworker方法

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    //获取第一个任务
    Runnable task = w.firstTask;
    w.firstTask = null;
    //允许中断
    w.unlock();
    //是否因异常退出循环
    boolean completedAbruptly = true;
    try {
        //如果task为空,则通过getTask来获取任务
        while (task != null || (task = getTask()) != null) {
            w.lock();
            /**
             * 如果线程池正在停止,那么要保证当前线程时中断状态;
             * 如果不是的话,则要保证当前线程不是中断状态
             */
            if ((runStateAtLeast(ctl.get(), STOP) ||
                    (Thread.interrupted() &&
                            runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                wt.interrupt();
            try {
                //beforeExecute和afterExecute是留给子类来实现的
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    //通过任务方式执行,不是线程方式
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x;
                    throw x;
                } catch (Error x) {
                    thrown = x;
                    throw x;
                } catch (Throwable x) {
                    thrown = x;
                    throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        //processWorkerExit会对completedAbruptly进行判断,表示在执行过程中是否出现异常
        processWorkerExit(w, completedAbruptly);
    }
}

总结 runworker

  1. while循环不断地通过getTask方法来获取任务;

  2. getTask方法从阻塞队列中获取任务;

  3. 如果线程池正在停止,那么要保证当前线程处于中断状态, 否则要保证当前线程不是中断状态;

  4. 调用task.run()执行任务;

  5. 如果task为null则会跳出循环,执行processWorkerExit方法;

  6. runWorker方法执行完毕,也代表着Worker中的run方法执行完毕,销毁线程。

  7. 线程启动后,释放锁,设 AQS 状态为 0,释放锁。此时其它线程才可以获取锁,中断线程 interrupt;

  8. 获取 firstTask 任务并执行,执行任务前后可定制 beforeExecute 和 afterExecute;

  9. 如果 getTask 从阻塞队列获取等待任务执行,如果获取的任务为 null,while 则退出循环,线程关闭。

  10. 如果线程已经STOP,则一定要将线程 interrupt。如果线程处于运行状态(包括SHUTDOWN),则一定不能 interrupt。但实际上 interrupt() 方法并不一定能中断正在运行的线程,它只能唤醒 wait 阻塞的线程或给线程设置一个标记位。业务线程必须对 interrupt 做出响应才能中断线程,否则会一直等线程执行结束才会销毁。

获取任务 getTask

getTask方法用于从阻塞队列中获取任务

private Runnable getTask() {
    //timeout变量的值表示上次从阻塞队列中获取任务是否超时
    boolean timedOut = false;
    for (; ; ) {
        int c = ctl.get();
        int rs = runStateOf(c);

        /**
         * 如果rs >= SHUTDOWN,表示线程池非RUNNING状态,需要再次判断:
         * 1、rs >= STOP ,线程池是否正在STOP
         * 2、阻塞队列是否为空
         * 满足上述条件之一,则将workCount减一,并返回null;
         * 因为如果当前线程池的状态处于STOP及以上或队列为空,不能从阻塞队列中获取任务;
         */
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        /**
         * timed变量用于判断是否需要进行超时控制;
         * allowCoreThreadTimeOut默认是false,也就是核心线程不允许进行超时;
         * wc > corePoolSize,表示当前线程数大于核心线程数量;
         * 对于超过核心线程数量的这些线程,需要进行超时控制;
         */
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        /**
         * wc > maximumPoolSize的情况是因为可能在此方法执行阶段同时执行了 setMaximumPoolSize方法;
         * timed && timedOut 如果为true,表示当前操作需要进行超时控制,并且上次从阻塞队列中获取任务发生了超时;
         * 接下来判断,如果有效咸亨数量大于1,或者workQueue为空,那么将尝试workCount减1;
         * 如果减1失败,则返回重试;
         * 如果wc==1时,也就说明当前线程是线程池中的唯一线程了;
         */
        if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }
        /**
         * timed为trure,则通过workQueue的poll方法进行超时控制,如果在keepAliveTime时间内没有获取任务,则返回null;
         * 否则通过take方法,如果队列为空,则take方法会阻塞直到队列中不为空;
         */
        try {
            Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
            if (r != null)
                return r;
            //如果r==null,说明已经超时了,timedOut = true;
            timedOut = true;
        } catch (InterruptedException retry) {
            //如果获取任务时当前线程发生了中断,则将timedOut = false;
            timedOut = false;
        }
    }
}

getTask 总结

第二个if判断,目的是为了控制线程池的有效线程数量。有上文分析得到,在execute方法时,如果当前线程池的线程数量超过coolPoolSize且小于maxmumPoolSize,并且阻塞队列已满时,则可以通过增加工作线程。但是如果工作线程在超时时间内没有获取到任务,timeOut=true,说明workQueue为空,也就说当前线程池不需要那么多线程来执行任务了,可以把多于的corePoolSize数量的线程销毁掉,保证线程数量在corePoolSize即可。

  1. getTask 时,worker 已经释放了锁,也就是说其它线程可以调用 wt.interrupt() 唤醒等待的线程。
  2. 如果当前线程数大于最大线程数,或允许核心线程销毁时,如果获取任务超时则返回 null,即销毁线程。

processWorkerExit方法

processWorkerExit执行完之后,工作线程被销毁。

private void processWorkerExit(Worker w, boolean completedAbruptly) {
    /**
     * 如果completedAbruptly为true,则说明线程执行时出现异常,需要将workerCount数量减一
     * 如果completedAbruptly为false,说明在getTask方法中已经对workerCount进行减一,这里不用再减
     */
    if (completedAbruptly) 
        decrementWorkerCount();

    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        //统计完成的任务数
        completedTaskCount += w.completedTasks;
        //从workers中移除,也就表示从线程池中移除一个工作线程
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }

    //钩子函数,根据线程池的状态来判断是否结束线程池
    tryTerminate();

    int c = ctl.get();
    /**
     * 当前线程是RUNNING或SHUTDOWN时,如果worker是异常结束,那么会直接addWorker;
     * 如果allowCoreThreadTimeOut=true,那么等待队列有任务,至少保留一个worker;
     * 如果allowCoreThreadTimeOut=false,workerCount少于coolPoolSize
     */
    if (runStateLessThan(c, STOP)) {
        if (!completedAbruptly) {
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && !workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        addWorker(null, false);
    }
}

线程关闭

shutdown

/**
     * Initiates an orderly shutdown in which previously submitted
     * tasks are executed, but no new tasks will be accepted.
     * Invocation has no additional effect if already shut down.
     *
     * <p>This method does not wait for previously submitted tasks to
     * complete execution.  Use {@link #awaitTermination awaitTermination}
     * to do that.
     *
     * @throws SecurityException {@inheritDoc}
     */
    public void shutdown() {
        // 为保证线程安全,使用 mainLock
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // SecurityManager 检查
            checkShutdownAccess();
            // 设置状态为 SHUTDOWN
            advanceRunState(SHUTDOWN);
            // 中断空闲的 Worker, 即相当于依次关闭每个空闲线程
            interruptIdleWorkers();
            // 关闭钩子,默认实现为空操作,为方便子类实现自定义清理功能
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock();
        }
        // 再
        tryTerminate();
    }
    /**
     * Transitions runState to given target, or leaves it alone if
     * already at least the given target.
     *
     * @param targetState the desired state, either SHUTDOWN or STOP
     *        (but not TIDYING or TERMINATED -- use tryTerminate for that)
     */
    private void advanceRunState(int targetState) {
        for (;;) {
            int c = ctl.get();
            // 自身CAS更新成功或者被其他线程更新成功
            if (runStateAtLeast(c, targetState) ||
                ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
                break;
        }
    }
    // 关闭空闲线程(非 running 状态)
    /**
     * Common form of interruptIdleWorkers, to avoid having to
     * remember what the boolean argument means.
     */
    private void interruptIdleWorkers() {
        // 上文已介绍, 此处 ONLY_ONE 为 false, 即是最大可能地中断所有 Worker
        interruptIdleWorkers(false);
    }

shutdownNow

与 shutdown 对应的,有一个 shutdownNow, 其语义是 立即停止所有任务。

    /**
     * Attempts to stop all actively executing tasks, halts the
     * processing of waiting tasks, and returns a list of the tasks
     * that were awaiting execution. These tasks are drained (removed)
     * from the task queue upon return from this method.
     *
     * <p>This method does not wait for actively executing tasks to
     * terminate.  Use {@link #awaitTermination awaitTermination} to
     * do that.
     *
     * <p>There are no guarantees beyond best-effort attempts to stop
     * processing actively executing tasks.  This implementation
     * cancels tasks via {@link Thread#interrupt}, so any task that
     * fails to respond to interrupts may never terminate.
     *
     * @throws SecurityException {@inheritDoc}
     */
    public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            // 与 shutdown 的差别,设置的状态不一样
            advanceRunState(STOP);
            // 强行中断线程
            interruptWorkers();
            // 将未完成的任务返回
            tasks = drainQueue();
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
        return tasks;
    }

    /**
     * Interrupts all threads, even if active. Ignores SecurityExceptions
     * (in which case some threads may remain uninterrupted).
     */
    private void interruptWorkers() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers)
                // 调用 worker 的提供的中断方法
                w.interruptIfStarted();
        } finally {
            mainLock.unlock();
        }
    }
        // ThreadPoolExecutor.Worker#interruptIfStarted
        void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    // 直接调用任务的 interrupt
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }

shutdown 和 shutdownNow 区别:

  • shutdown 会执行完成已提交的任务后关闭线程池,而 shutdownNow 则会踢除已提交的任务。
  • shutdown 调用 interruptIdleWorkers 关闭空闲的线程,而 shutdownNow 调用 interruptWorkers 强行中断所有的线程。

interruptIdleWorkers 和 interruptWorkers

  • interruptIdleWorkers 只会尝试获取锁,因此只会中断空闲线程。而 interruptWorkers 不需要获取锁,强行中断线程。实际上业务线程必须对 interrupt 做出响应才能中断线程,否则会一直等线程执行结束才会销毁。
  • 而 interruptIdleWorkers 和 interruptWorkers 都是 interrupt 所有线程, 因此大部分线程将立刻被中断。之所以是大部分,而不是全部,是因为 interrupt() 方法能力有限。 如果线程中没有 sleep 、wait、Condition、定时锁等应用, interrupt() 方法是无法中断当前的线程的。所以,ShutdownNow() 并不代表线程池就一定立即就能退出,它可能必须要等待所有正在执行的任务都执行完成了才能退出。 如下面这个线程永远不会中断,因为该线程没有响应 Thread.interrupted() 或者是直接将 InterruptedException 异常 catch 了。

参考

相关文章

网友评论

      本文标题:concurrency-threadpoolexecutor

      本文链接:https://www.haomeiwen.com/subject/fvpwbltx.html