美文网首页
ThreadPoolExecutor 源码阅读

ThreadPoolExecutor 源码阅读

作者: 六位的象牙塔 | 来源:发表于2021-03-02 22:01 被阅读0次

    提前准备:
    Thread 类源码阅读
    FutureTask 源码阅读

    image.png

    类结构如上图所示
    先看接口 Executor

    public interface Executor {
    
        /**
         * Executes the given command at some time in the future.  The command
         * may execute in a new thread, in a pooled thread, or in the calling
         * thread, at the discretion of the {@code Executor} implementation.
         * 在将来的某个时间执行给定的命令。
         * 根据 Executor 实现的判断,命令可以在新线程,池线程或调用线程中执行。
         *
         * @param command the runnable task
         * @throws RejectedExecutionException if this task cannot be
         * accepted for execution
         * @throws NullPointerException if command is null
         */
        void execute(Runnable command);
    }
    

    然后看接口 ExecutorService

    /**
     * An {@link Executor} that provides methods to manage termination and
     * methods that can produce a {@link Future} for tracking progress of
     * one or more asynchronous tasks.
     * 一种 Executor,提供管理终止的方法以及可以产生 Future 来跟踪一个或多个异步任务的进度的方法。
     * <p>An {@code ExecutorService} can be shut down, which will cause
     * it to reject new tasks.  Two different methods are provided for
     * shutting down an {@code ExecutorService}. The {@link #shutdown}
     * method will allow previously submitted tasks to execute before
     * terminating, while the {@link #shutdownNow} method prevents waiting
     * tasks from starting and attempts to stop currently executing tasks.
     * Upon termination, an executor has no tasks actively executing, no
     * tasks awaiting execution, and no new tasks can be submitted.  An
     * unused {@code ExecutorService} should be shut down to allow
     * reclamation of its resources.
     * 可以关闭  ExecutorService,这将导致它拒绝新任务。
     * 提供了两种不同的方法来关闭 ExecutorService。
     * shutdown 方法将允许先前提交的任务在终止之前执行,
     * 而 shutdownNow 方法可防止等待的任务启动并尝试停止当前正在执行的任务。
     * 终止后,执行者将没有正在执行的任务,没有正在等待执行的任务,并且无法提交新任务。
     * 应该关闭未使用的 ExecutorService 以便回收其资源。
     *
     * <p>Method {@code submit} extends base method {@link
     * Executor#execute(Runnable)} by creating and returning a {@link Future}
     * that can be used to cancel execution and/or wait for completion.
     * Methods {@code invokeAny} and {@code invokeAll} perform the most
     * commonly useful forms of bulk execution, executing a collection of
     * tasks and then waiting for at least one, or all, to
     * complete. (Class {@link ExecutorCompletionService} can be used to
     * write customized variants of these methods.)
     * 方法 submit 通过创建并返回一个 Future 扩展了基本方法 Executor#execute (Runnable),
     * 可用于取消执行和等待完成。方法 invokeAny 和 invokeAll 执行最常用的批量执行形式,
     * 执行一组任务,然后等待至少一个或全部完成。 
     *
     *
     * @since 1.5
     * @author Doug Lea
     */
    public interface ExecutorService extends Executor {
    
        /**
         * Initiates an orderly shutdown in which previously submitted
         * tasks are executed, but no new tasks will be accepted.
         * Invocation has no additional effect if already shut down.
         * 启动有序关闭,在该关闭中执行先前提交的任务,但不接受任何新任务。
         * 如果已关闭,则调用不会产生任何其他影响。
         * <p>This method does not wait for previously submitted tasks to
         * complete execution.  Use {@link #awaitTermination awaitTermination}
         * to do that.
         *
         * @throws SecurityException if a security manager exists and
         *         shutting down this ExecutorService may manipulate
         *         threads that the caller is not permitted to modify
         *         because it does not hold {@link
         *         java.lang.RuntimePermission}{@code ("modifyThread")},
         *         or the security manager's {@code checkAccess} method
         *         denies access.
         */
        void shutdown();
    
        /**
         * Attempts to stop all actively executing tasks, halts the
         * processing of waiting tasks, and returns a list of the tasks
         * that were awaiting execution.
         * 尝试停止所有正在执行的任务,暂停正在等待的任务的处理,
         * 并返回正在等待执行的任务的列表。
         * 
         * <p>This method does not wait for actively executing tasks to
         * terminate.  Use {@link #awaitTermination awaitTermination} to
         * do that.
         *
         * <p>There are no guarantees beyond best-effort attempts to stop
         * processing actively executing tasks.  For example, typical
         * implementations will cancel via {@link Thread#interrupt}, so any
         * task that fails to respond to interrupts may never terminate.
         *
         * @return list of tasks that never commenced execution
         * @throws SecurityException if a security manager exists and
         *         shutting down this ExecutorService may manipulate
         *         threads that the caller is not permitted to modify
         *         because it does not hold {@link
         *         java.lang.RuntimePermission}{@code ("modifyThread")},
         *         or the security manager's {@code checkAccess} method
         *         denies access.
         */
        List<Runnable> shutdownNow();
    
        /**
         * Returns {@code true} if this executor has been shut down.
         * executor 是否已经被关闭
         *
         * @return {@code true} if this executor has been shut down
         */
        boolean isShutdown();
    
        /**
         * Returns {@code true} if all tasks have completed following shut down.
         * Note that {@code isTerminated} is never {@code true} unless
         * either {@code shutdown} or {@code shutdownNow} was called first.
         * 如果所有任务在关闭后都已完成,则返回 true。
         * 注意: isTerminated 永远不是true,除非 shutdown 或者 shutdownNow 被调用
         *
         * @return {@code true} if all tasks have completed following shut down
         */
        boolean isTerminated();
    
        /**
         * Blocks until all tasks have completed execution after a shutdown
         * request, or the timeout occurs, or the current thread is
         * interrupted, whichever happens first.
         * 如果等待指定时间所有任务在关闭后都已完成,则返回 true。
         *
         * @param timeout the maximum time to wait
         * @param unit the time unit of the timeout argument
         * @return {@code true} if this executor terminated and
         *         {@code false} if the timeout elapsed before termination
         * @throws InterruptedException if interrupted while waiting
         */
        boolean awaitTermination(long timeout, TimeUnit unit)
            throws InterruptedException;
    
        /**
         * Submits a value-returning task for execution and returns a
         * Future representing the pending results of the task. The
         * Future's {@code get} method will return the task's result upon
         * successful completion.
         * 提交要执行 Callable 任务,并返回表示任务的未决结果的Future。
         * Future的 get 方法将在成功完成后返回任务的结果。
         *
         * @param task the task to submit
         * @param <T> the type of the task's result
         * @return a Future representing pending completion of the task
         * @throws RejectedExecutionException if the task cannot be
         *         scheduled for execution
         * @throws NullPointerException if the task is null
         */
        <T> Future<T> submit(Callable<T> task);
    
        /**
         * Submits a Runnable task for execution and returns a Future
         * representing that task. The Future's {@code get} method will
         * return the given result upon successful completion.
         * 提交要执行的 Runnable 任务,并返回表示任务的未决结果的Future。
         * Future的 get 方法将在成功完成后返回任务的结果。
         * @param task the task to submit
         * @param result the result to return
         * @param <T> the type of the result
         * @return a Future representing pending completion of the task
         * @throws RejectedExecutionException if the task cannot be
         *         scheduled for execution
         * @throws NullPointerException if the task is null
         */
        <T> Future<T> submit(Runnable task, T result);
    
        /**
         * Submits a Runnable task for execution and returns a Future
         * representing that task. The Future's {@code get} method will
         * return {@code null} upon <em>successful</em> completion.
         * 提交要执行的 Runnable 任务,并返回表示任务的未决结果的Future。
         * Future的 get 方法将在成功完成后返回任务的结果。
         * @param task the task to submit
         * @return a Future representing pending completion of the task
         * @throws RejectedExecutionException if the task cannot be
         *         scheduled for execution
         * @throws NullPointerException if the task is null
         */
        Future<?> submit(Runnable task);
    
        /**
         * Executes the given tasks, returning a list of Futures holding
         * their status and results when all complete.
         * {@link Future#isDone} is {@code true} for each
         * element of the returned list.
         * 执行给定的任务,返回Future集合。返回集合的每个元素isDone返回这为true
         * Note that a <em>completed</em> task could have
         * terminated either normally or by throwing an exception.
         * The results of this method are undefined if the given
         * collection is modified while this operation is in progress.
         *
         * @param tasks the collection of tasks
         * @param <T> the type of the values returned from the tasks
         * @return a list of Futures representing the tasks, in the same
         *         sequential order as produced by the iterator for the
         *         given task list, each of which has completed
         * @throws InterruptedException if interrupted while waiting, in
         *         which case unfinished tasks are cancelled
         * @throws NullPointerException if tasks or any of its elements are {@code null}
         * @throws RejectedExecutionException if any task cannot be
         *         scheduled for execution
         */
        <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
            throws InterruptedException;
    
        /**
         * Executes the given tasks, returning a list of Futures holding
         * their status and results
         * 执行给定的任务,返回Future集合
         * when all complete or the timeout expires, whichever happens first.
         * {@link Future#isDone} is {@code true} for each
         * element of the returned list.
         * 当全部完成或超时到期时,以先发生的为准。对于返回列表的每个元素,Future#isDone}为true。
         * Upon return, tasks that have not completed are cancelled.
         * Note that a <em>completed</em> task could have
         * terminated either normally or by throwing an exception.
         * The results of this method are undefined if the given
         * collection is modified while this operation is in progress.
         *
         * @param tasks the collection of tasks
         * @param timeout the maximum time to wait
         * @param unit the time unit of the timeout argument
         * @param <T> the type of the values returned from the tasks
         * @return a list of Futures representing the tasks, in the same
         *         sequential order as produced by the iterator for the
         *         given task list. If the operation did not time out,
         *         each task will have completed. If it did time out, some
         *         of these tasks will not have completed.
         * @throws InterruptedException if interrupted while waiting, in
         *         which case unfinished tasks are cancelled
         * @throws NullPointerException if tasks, any of its elements, or
         *         unit are {@code null}
         * @throws RejectedExecutionException if any task cannot be scheduled
         *         for execution
         */
        <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                      long timeout, TimeUnit unit)
            throws InterruptedException;
    
        /**
         * Executes the given tasks, returning the result
         * of one that has completed successfully (i.e., without throwing
         * an exception), if any do. Upon normal or exceptional return,
         * tasks that have not completed are cancelled.
         * The results of this method are undefined if the given
         * collection is modified while this operation is in progress.
         * 执行给定的任务,如果成功,则返回成功完成的一个任务(即不引发异常)的结果。
         * 在正常或异常返回时,尚未完成的任务将被取消。
         * 如果在进行此操作时修改了给定的集合,则此方法的结果不确定。
         * @param tasks the collection of tasks
         * @param <T> the type of the values returned from the tasks
         * @return the result returned by one of the tasks
         * @throws InterruptedException if interrupted while waiting
         * @throws NullPointerException if tasks or any element task
         *         subject to execution is {@code null}
         * @throws IllegalArgumentException if tasks is empty
         * @throws ExecutionException if no task successfully completes
         * @throws RejectedExecutionException if tasks cannot be scheduled
         *         for execution
         */
        <T> T invokeAny(Collection<? extends Callable<T>> tasks)
            throws InterruptedException, ExecutionException;
    
        /**
         * Executes the given tasks, returning the result
         * of one that has completed successfully (i.e., without throwing
         * an exception), if any do before the given timeout elapses.
         * Upon normal or exceptional return, tasks that have not
         * completed are cancelled.
         * The results of this method are undefined if the given
         * collection is modified while this operation is in progress.
         * 执行给定的任务,如果成功,则返回在超时时间之前成功完成的一个任务(即不引发异常)的结果。
         * 在正常或异常返回时,尚未完成的任务将被取消。
         * 如果在进行此操作时修改了给定的集合,则此方法的结果不确定。
         * @param tasks the collection of tasks
         * @param timeout the maximum time to wait
         * @param unit the time unit of the timeout argument
         * @param <T> the type of the values returned from the tasks
         * @return the result returned by one of the tasks
         * @throws InterruptedException if interrupted while waiting
         * @throws NullPointerException if tasks, or unit, or any element
         *         task subject to execution is {@code null}
         * @throws TimeoutException if the given timeout elapses before
         *         any task successfully completes
         * @throws ExecutionException if no task successfully completes
         * @throws RejectedExecutionException if tasks cannot be scheduled
         *         for execution
         */
        <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                        long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException;
    }
    

    再看 AbstractExecutorService

    public abstract class AbstractExecutorService implements ExecutorService {
    
        /**
         * Returns a {@code RunnableFuture} for the given runnable and default
         * value.
         * 为提供的 runable 返回 一个RunnableFuture 结果和默认值
         *
         * @param runnable the runnable task being wrapped
         * @param value the default value for the returned future
         * @param <T> the type of the given value
         * @return a {@code RunnableFuture} which, when run, will run the
         * underlying runnable and which, as a {@code Future}, will yield
         * the given value as its result and provide for cancellation of
         * the underlying task
         * @since 1.6
         */
        protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
            return new FutureTask<T>(runnable, value);
        }
    
        /**
         * Returns a {@code RunnableFuture} for the given callable task.
         * 为callable任务返回一个RunnableFuture结果
         *
         * @param callable the callable task being wrapped
         * @param <T> the type of the callable's result
         * @return a {@code RunnableFuture} which, when run, will call the
         * underlying callable and which, as a {@code Future}, will yield
         * the callable's result as its result and provide for
         * cancellation of the underlying task
         * @since 1.6
         */
        protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
            return new FutureTask<T>(callable);
        }
    
        /**
         * 将 Runnable 接口包装成RunnableFuture,执行任务并返回RunnableFuture结果
         * @throws RejectedExecutionException {@inheritDoc}
         * @throws NullPointerException       {@inheritDoc}
         */
        public Future<?> submit(Runnable task) {
            if (task == null) throw new NullPointerException();
            RunnableFuture<Void> ftask = newTaskFor(task, null);
            execute(ftask);
            return ftask;
        }
    
        /**将 Runnable 接口包装成RunnableFuture,执行任务并返回RunnableFuture结果, 带一个默认返回值
         * @throws RejectedExecutionException {@inheritDoc}
         * @throws NullPointerException       {@inheritDoc}
         */
        public <T> Future<T> submit(Runnable task, T result) {
            if (task == null) throw new NullPointerException();
            RunnableFuture<T> ftask = newTaskFor(task, result);
            execute(ftask);
            return ftask;
        }
    
        /**将 Callable接口包装成RunnableFuture,执行任务并返回RunnableFuture结果
         * @throws RejectedExecutionException {@inheritDoc}
         * @throws NullPointerException       {@inheritDoc}
         */
        public <T> Future<T> submit(Callable<T> task) {
            if (task == null) throw new NullPointerException();
            RunnableFuture<T> ftask = newTaskFor(task);
            execute(ftask);
            return ftask;
        }
    
        /** 
         * the main mechanics of invokeAny.
         * invokeAny 调用的主要机制
         */
        private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
                                  boolean timed, long nanos)
            throws InterruptedException, ExecutionException, TimeoutException {
            if (tasks == null)
                throw new NullPointerException();
            int ntasks = tasks.size();
            if (ntasks == 0)
                throw new IllegalArgumentException();
            ArrayList<Future<T>> futures = new ArrayList<Future<T>>(ntasks);
            // ExecutorCompletionService 可以用于管理已完成的 task
            ExecutorCompletionService<T> ecs =
                new ExecutorCompletionService<T>(this);
    
            // For efficiency, especially in executors with limited
            // parallelism, check to see if previously submitted tasks are
            // done before submitting more of them. This interleaving
            // plus the exception mechanics account for messiness of main
            // loop.
            // 为了提高效率(尤其是在并行性有限的执行器中),在提交更多任务之前检查是否完成了先前提交的任务。这种交织加上异常机制解决了主循环的混乱问题
            try {
                // Record exceptions so that if we fail to obtain any
                // result, we can throw the last exception we got.
                // 用于记录异常
                ExecutionException ee = null;
                // 超时 deadline
                final long deadline = timed ? System.nanoTime() + nanos : 0L;
                Iterator<? extends Callable<T>> it = tasks.iterator();
    
                // Start one task for sure; the rest incrementally
                // 确定开始一个task, 剩余的减一
                // 
                futures.add(ecs.submit(it.next()));
                --ntasks;
                int active = 1;
    
                for (;;) {
                    // 完成队列 poll 一个元素
                    Future<T> f = ecs.poll();
                    // 没有已完成的任务
                    if (f == null) {
                        // 还有剩余任务,剩余任务减一,异步结果放入 futures, 运行中的任务加一
                        if (ntasks > 0) {
                            --ntasks;
                            futures.add(ecs.submit(it.next()));
                            ++active;
                        }
                        // 运行中任务为0, 结束循环
                        else if (active == 0)
                            break;
                        // 设置了超时,超时方式获取一个完成的任务,还没有完成,抛出异常
                        else if (timed) {
                            f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
                            if (f == null)
                                throw new TimeoutException();
                            nanos = deadline - System.nanoTime();
                        }
                        // 否则,阻塞方式获取完成的任务
                        else
                            f = ecs.take();
                    }
                    // 重新判断获取到的已完成的任务是否为 null,  不为null 则活跃任务减一, 返回当前完成任务的结果
                    // 如果执行发生异常,需要记录异常
                    if (f != null) {
                        --active;
                        try {
                            return f.get();
                        } catch (ExecutionException eex) {
                            ee = eex;
                        } catch (RuntimeException rex) {
                            ee = new ExecutionException(rex);
                        }
                    }
                }
                // 抛出异常
                if (ee == null)
                    ee = new ExecutionException();
                throw ee;
    
            } finally {
                // 取消所有任务
                for (int i = 0, size = futures.size(); i < size; i++)
                    futures.get(i).cancel(true);
            }
        }
    
        public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
            throws InterruptedException, ExecutionException {
            try {
                return doInvokeAny(tasks, false, 0);
            } catch (TimeoutException cannotHappen) {
                assert false;
                return null;
            }
        }
    
        public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                               long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException {
            return doInvokeAny(tasks, true, unit.toNanos(timeout));
        }
    
        // 执行所有任务
        public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
            throws InterruptedException {
            if (tasks == null)
                throw new NullPointerException();
            ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
            boolean done = false;
            try {
                // 所有任务 包装成RunnableFuture,放入futures集合,并调用 execute 方法
                for (Callable<T> t : tasks) {
                    RunnableFuture<T> f = newTaskFor(t);
                    futures.add(f);
                    execute(f);
                }
                // 遍历 RunnableFuture,如果已完成,执行get方法
                for (int i = 0, size = futures.size(); i < size; i++) {
                    Future<T> f = futures.get(i);
                    if (!f.isDone()) {
                        try {
                            f.get();
                        } catch (CancellationException ignore) {
                        } catch (ExecutionException ignore) {
                        }
                    }
                }
                // 标记完成,返回结果
                done = true;
                return futures;
            } finally {
                // 如果没有正常完成,将所有task cancel
                if (!done)
                    for (int i = 0, size = futures.size(); i < size; i++)
                        futures.get(i).cancel(true);
            }
        }
    
    
        // 执行所有任务, 设置超时时间
        public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                             long timeout, TimeUnit unit)
            throws InterruptedException {
            if (tasks == null)
                throw new NullPointerException();
            long nanos = unit.toNanos(timeout);
            ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
            boolean done = false;
            try {
                // 遍历所有任务 包装成RunnableFuture,放入futures集合
                for (Callable<T> t : tasks)
                    futures.add(newTaskFor(t));
                // 执行任务的deadline
                final long deadline = System.nanoTime() + nanos;
                final int size = futures.size();
    
                // Interleave time checks and calls to execute in case
                // executor doesn't have any/much parallelism.
                // 遍历 futures 集合,调用 execute 方法,并判断是否超时,如果超时,直接返回
                for (int i = 0; i < size; i++) {
                    execute((Runnable)futures.get(i));
                    nanos = deadline - System.nanoTime();
                    if (nanos <= 0L)
                        return futures;
                }
                
                // 遍历 futures 集合,如果 RunnableFuture  已结束
                // 如果超时,返回futures, 如果未超时,执行 RunnableFuture的get方法, 如果get异常 直接返回
                for (int i = 0; i < size; i++) {
                    Future<T> f = futures.get(i);
                    if (!f.isDone()) {
                        if (nanos <= 0L)
                            return futures;
                        try {
                            f.get(nanos, TimeUnit.NANOSECONDS);
                        } catch (CancellationException ignore) {
                        } catch (ExecutionException ignore) {
                        } catch (TimeoutException toe) {
                            return futures;
                        }
                        nanos = deadline - System.nanoTime();
                    }
                }
                done = true;
                return futures;
            } finally {
                // // 如果没有正常完成,将所有task cancel
                if (!done)
                    for (int i = 0, size = futures.size(); i < size; i++)
                        futures.get(i).cancel(true);
            }
        }
    
    }
    

    最后看 ThreadPoolExecutor。
    ThreadPoolExecutor 代码较多,先看下 idea 截图,有四个内部类和很多函数,一步一步来,先看四个内部类。


    image.png

    Worker

    Worker 继承了 AbstractQueuedSynchronizer,实现简单独占锁。
    Worker 实现了 Runable 接口,是一个线程,用来执行主循环。

    /**
     * Class Worker mainly maintains interrupt control state for
     * threads running tasks, along with other minor bookkeeping.
     * This class opportunistically extends AbstractQueuedSynchronizer
     * to simplify acquiring and releasing a lock surrounding each
     * task execution.  This protects against interrupts that are
     * intended to wake up a worker thread waiting for a task from
     * instead interrupting a task being run.  We implement a simple
     * non-reentrant mutual exclusion lock rather than use
     * ReentrantLock because we do not want worker tasks to be able to
     * reacquire the lock when they invoke pool control methods like
     * setCorePoolSize.  Additionally, to suppress interrupts until
     * the thread actually starts running tasks, we initialize lock
     * state to a negative value, and clear it upon start (in
     * runWorker).
     * Class Worker主要维护线程运行任务的中断控制状态,以及其他次要簿记。
     * 此类适时地扩展了AbstractQueuedSynchronizer,以简化获取和释放围绕每个任务执行的锁定。
     * 这可以防止旨在唤醒工作线程等待任务的中断,而不是中断正在运行的任务。
     * 我们实现了一个简单的非可重入互斥锁,而不是使用 ReentrantLock,因为我们不希望辅助任务在调用诸如setCorePoolSize之类的池控制方法时能够重新获取该锁。
     * 此外,为了抑制直到线程真正开始运行任务之前的中断,我们将锁定状态初始化为负值,并在启动时将其清除(在runWorker中)。
     */
    private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        /**
         * This class will never be serialized, but we provide a
         * serialVersionUID to suppress a javac warning.
         * 此类永远不会序列化,但是我们提供了serialVersionUID来禁止Javac警告。
         */
        private static final long serialVersionUID = 6138294804551838833L;
    
        /** Thread this worker is running in.  Null if factory fails. */
        // worker程序正在其中运行的线程。如果工厂失败,则为null。
        final Thread thread;
        /** Initial task to run.  Possibly null. */
        // 要运行的初始任务。可能为空
        Runnable firstTask;
        /** Per-thread task counter */
        // 运行完成的任务计数
        volatile long completedTasks;
    
        /**
         * Creates with given first task and thread from ThreadFactory.
         * @param firstTask the first task (null if none)
         * 构造,参数为第一个执行的任务
         * 占有锁,禁止中断,直到runWorker方法执行,
         * 分别赋值 firstTask 和 thread
         */
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }
    
        /** Delegates main run loop to outer runWorker  */
        // 发起主循环,runWorker 在外部类 ThreadPoolExecutor 中,后文再介绍
        public void run() {
            runWorker(this);
        }
    
        // Lock methods
        //
        // The value 0 represents the unlocked state.
        // The value 1 represents the locked state.
        // 资源状态state==0: 锁可用
        // 资源状态state==0: 锁已可用
    
        protected boolean isHeldExclusively() {
            return getState() != 0;
        }
    
        // 独占模式获取锁
        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }
    
        // 独占模式释放锁
        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }
    
        public void lock()        { acquire(1); }
        public boolean tryLock()  { return tryAcquire(1); }
        public void unlock()      { release(1); }
        public boolean isLocked() { return isHeldExclusively(); }
    
        // 如果已运行,中断
        void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    }
    

    CallerRunsPolicy

    CallerRunsPolicy 实现接口 RejectedExecutionHandler,是ThreadPoolExecutor 无法执行的任务的处理程序。CallerRunsPolicy 的策略是,如果 executor 拒绝了任务,则有调用线程执行被拒绝的任务,即任务不丢失,除非 executor 被关闭。

    /**
     * A handler for rejected tasks that runs the rejected task
     * directly in the calling thread of the {@code execute} method,
     * unless the executor has been shut down, in which case the task
     * is discarded.
     * CallerRunsPolicy 在任务被拒绝添加后,会在调用 execute 方法的的线程来执行被拒绝的任务。
     * 除非 executor 被关闭,否则任务不会被丢弃。
     */
    public static class CallerRunsPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code CallerRunsPolicy}.
         */
        public CallerRunsPolicy() { }
    
        /**
         * Executes task r in the caller's thread, unless the executor
         * has been shut down, in which case the task is discarded.
         *  调用线程执行任务,除非 executor 被关闭,否则任务不会被丢弃。
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                r.run();
            }
        }
    }
    
    /**
     * A handler for tasks that cannot be executed by a {@link ThreadPoolExecutor}.
     * ThreadPoolExecutor 无法执行的任务的处理程序。
     * @since 1.5
     * @author Doug Lea
     */
    public interface RejectedExecutionHandler {
    
        /**
         * Method that may be invoked by a {@link ThreadPoolExecutor} when
         * {@link ThreadPoolExecutor#execute execute} cannot accept a
         * task.  This may occur when no more threads or queue slots are
         * available because their bounds would be exceeded, or upon
         * shutdown of the Executor.
         * ThreadPoolExecutor#execute无法接受任务时,ThreadPoolExecutor 可以调用此方法。
         * 当没有更多的线程或队列(因为超出其范围)可用时,或者在执行器关闭时,可能会发生这种情况。
         *
         * <p>In the absence of other alternatives, the method may throw
         * an unchecked {@link RejectedExecutionException}, which will be
         * propagated to the caller of {@code execute}.
         * 在没有其他选择的情况下,该方法可能会引发未经检查的 RejectedExecutionException,该异常将传播到{@code execute}的调用方。
         *
         * @param r the runnable task requested to be executed
         * @param executor the executor attempting to execute this task
         * @throws RejectedExecutionException if there is no remedy
         */
        void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
    }
    

    AbortPolicy

    同 CallerRunsPolicy 一样,也实现了接口 RejectedExecutionHandler 是ThreadPoolExecutor 无法执行的任务的处理程序。AbortPolicy 的策略是,如果 executor 拒绝了任务,则抛出异常。

    /**
     * A handler for rejected tasks that throws a
     * {@code RejectedExecutionException}.
     * 被拒任务处理器,会抛出 RejectedExecutionException 异常
     */
    public static class AbortPolicy implements RejectedExecutionHandler {
        /**
         * Creates an {@code AbortPolicy}.
         */
        public AbortPolicy() { }
    
        /**
         * Always throws RejectedExecutionException.
         * 总是抛出 RejectedExecutionException 异常
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         * @throws RejectedExecutionException always
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
        }
    }
    

    DiscardPolicy

    同 CallerRunsPolicy 一样,也实现了接口 RejectedExecutionHandler 是ThreadPoolExecutor 无法执行的任务的处理程序。DiscardPolicy 的策略是,如果 executor 拒绝了任务,则直接抛弃。

    /**
     * A handler for rejected tasks that silently discards the
     * rejected task.
     *  被拒任务处理器,丢弃任务
     */
    public static class DiscardPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code DiscardPolicy}.
         */
        public DiscardPolicy() { }
    
        /**
         * Does nothing, which has the effect of discarding task r.
         * 什么都不做,结果是丢弃任务
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        }
    }
    

    DiscardOldestPolicy

    同 CallerRunsPolicy 一样,也实现了接口 RejectedExecutionHandler 是ThreadPoolExecutor 无法执行的任务的处理程序。DiscardOldestPolicy 的策略是,如果 executor 拒绝了任务,则直接抛弃等待队列中最久的任务,并重新调用 executor 方法。

    /**
     * A handler for rejected tasks that discards the oldest unhandled
     * request and then retries {@code execute}, unless the executor
     * is shut down, in which case the task is discarded.
     * 被拒任务处理器,丢弃最旧任务,并重试execute方法
     * 除非 executor 已关闭,否则不丢失
     */
    public static class DiscardOldestPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code DiscardOldestPolicy} for the given executor.
         */
        public DiscardOldestPolicy() { }
    
        /**
         * Obtains and ignores the next task that the executor
         * would otherwise execute, if one is immediately available,
         * and then retries execution of task r, unless the executor
         * is shut down, in which case task r is instead discarded.
         * 获取并忽略执行者将立即执行的下一个任务(如果有一个任务立即可用),
         * 然后重试任务r的执行,除非 executor 被关闭,否则不会放弃任务r。
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                e.getQueue().poll();
                e.execute(r);
            }
        }
    }
    

    ThreadPoolExecutor 成员变量介绍

    // 成员变量 ctl 是原子变量AtomicInteger,用来记录线程池状态 和 线程池中线程个数,使用一个变量存放两种信息。
    // 用来标记线程池状态(高3位),线程个数(低29位), 假设 Integer 类型是32位
    // 默认是 RUNNING 状态,线程个数为 0
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    //线程个数掩码位数,并不是所有平台int类型是32位,所以准确说是具体平台下Integer的二进制位数-3后的剩余位数才是线程的个数,
    private static final int COUNT_BITS = Integer.SIZE - 3;
    // 线程最大个数,2的29次方 - 1 
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
    
    // 线程池的运行状态
    private static final int RUNNING    = -1 << COUNT_BITS;  // -536870912
    private static final int SHUTDOWN   =  0 << COUNT_BITS;  // 0
    private static final int STOP       =  1 << COUNT_BITS;  // 536870912
    private static final int TIDYING    =  2 << COUNT_BITS;  // 1073741824
    private static final int TERMINATED =  3 << COUNT_BITS;  // 1610612736
    
    /**
     * The queue used for holding tasks and handing off to worker
     * threads.  We do not require that workQueue.poll() returning
     * null necessarily means that workQueue.isEmpty(), so rely
     * solely on isEmpty to see if the queue is empty (which we must
     * do for example when deciding whether to transition from
     * SHUTDOWN to TIDYING).  This accommodates special-purpose
     * queues such as DelayQueues for which poll() is allowed to
     * return null even if it may later return non-null when delays
     * expire.
     * 队列用于放置任务并投递给worker线程。
     * 我们不要求workQueue.poll()返回null必然意味着workQueue.isEmpty(),
     * 因此仅依靠isEmpty来查看队列是否为空(例如,在决定是否从SHUTDOWN过渡到TIDYING时必须执行此操作) 。
     * 这可容纳特殊用途的队列,例如DelayQueues,允许poll()返回null,即使该延迟稍后在延迟到期时也可以返回non-null。
     */
    private final BlockingQueue<Runnable> workQueue;
    
    /**
     * Lock held on access to workers set and related bookkeeping.
     * While we could use a concurrent set of some sort, it turns out
     * to be generally preferable to use a lock. Among the reasons is
     * that this serializes interruptIdleWorkers, which avoids
     * unnecessary interrupt storms, especially during shutdown.
     * Otherwise exiting threads would concurrently interrupt those
     * that have not yet interrupted. It also simplifies some of the
     * associated statistics bookkeeping of largestPoolSize etc. We
     * also hold mainLock on shutdown and shutdownNow, for the sake of
     * ensuring workers set is stable while separately checking
     * permission to interrupt and actually interrupting.
     * 持有访问 worker 集合的权限并且和笔记有关。
     * 虽然我们可以使用某种并发集,但事实证明,使用锁通常是更可取的。
     * 其中一个原因是,这会序列化interruptIdleWorkers,从而避免了不必要的中断风暴,尤其是在shutdown期间。
     * 否则,退出线程将同时中断那些尚未中断的线程。它还简化了一些相关的统计数据,如largePoolSize等。我们还在shutdown和shutdownNow上保留mainLock,
     * 以确保在单独检查中断和实际中断的权限时,worker 集合是稳定的。
     * 
     */
    private final ReentrantLock mainLock = new ReentrantLock();
    
    /**
     * Set containing all worker threads in pool. Accessed only when
     * holding mainLock.
     * 包含所有池中的所有 worker 线程,只要获得 mainLock 才可访问
     */
    private final HashSet<Worker> workers = new HashSet<Worker>();
    
    /**
     * Wait condition to support awaitTermination
     * 等待condition以支持awaitTermination
     */
    private final Condition termination = mainLock.newCondition();
    
    /**
     * Tracks largest attained pool size. Accessed only under
     * mainLock.
     * 跟踪达到的最大池大小。仅在mainLock下访问
     */
    private int largestPoolSize;
    
    /**
     * Counter for completed tasks. Updated only on termination of
     * worker threads. Accessed only under mainLock.
     * 已完成任务计数器,
     */
    private long completedTaskCount;
    
    /**
     * Factory for new threads. All threads are created using this
     * factory (via method addWorker).  All callers must be prepared
     * for addWorker to fail, which may reflect a system or user's
     * policy limiting the number of threads.  Even though it is not
     * treated as an error, failure to create threads may result in
     * new tasks being rejected or existing ones remaining stuck in
     * the queue.
     *
     * 线程工厂。所有 的线程都是用此工厂常见(通过方法 addWorker)。
     * 所有的caller 必须为 addWorker 失败做好准备(这可以反应系统或用户的线程线程数目的策略)。
     * 即使不被当作error对待,创建线程失败也可能导致新任务被拒绝或者旧任务阻塞在队列
     *
     * 
     */
    private volatile ThreadFactory threadFactory;
    
    /**
     * Handler called when saturated or shutdown in execute.
     * 拒绝任务处理器,在 execute 饱和或者关闭的情况下调用
     */
    private volatile RejectedExecutionHandler handler;
    
    /**
     * Timeout in nanoseconds for idle threads waiting for work.
     * Threads use this timeout when there are more than corePoolSize
     * present or if allowCoreThreadTimeOut. Otherwise they wait
     * forever for new work.
     * 空闲线程等待工作的超时时间,单位纳秒。
     * 如果当前线程数目小于 corePoolSize或者设置了allowCoreThreadTimeOut=true,使用此超时时间
     * 
     */
    private volatile long keepAliveTime;
    
    /**
     * If false (default), core threads stay alive even when idle.
     * If true, core threads use keepAliveTime to time out waiting
     * for work.
     * 默认是false, 即核心线程就是处于空闲状态也会保持alive
     * 如果设置为true, 核心线程使用 keepAliveTime 作为超时时间等待工作,超时则销毁
     */
     
    private volatile boolean allowCoreThreadTimeOut;
    
    /**
     * Core pool size is the minimum number of workers to keep alive
     * (and not allow to time out etc) unless allowCoreThreadTimeOut
     * is set, in which case the minimum is zero.
     * 核心线程池大小是最小保留存活的线程数目(不允许超时),除非allowCoreThreadTimeOut设置为true,这种情况下,最小存活线程数是0
     */
    private volatile int corePoolSize;
    
    /**
     * Maximum pool size. Note that the actual maximum is internally
     * bounded by CAPACITY.
     * 最大线程池大小。注意实际线程池最大值受 CAPACITY 限制,即取 CAPACITY 和 maximumPoolSize的较小值
     */
    private volatile int maximumPoolSize;
    
    /**
     * The default rejected execution handler
     * 默认 拒绝任务处理器,抛出异常
     */
    private static final RejectedExecutionHandler defaultHandler =
        new AbortPolicy();
    
    /**
     * Permission required for callers of shutdown and shutdownNow.
     * We additionally require (see checkShutdownAccess) that callers
     * have permission to actually interrupt threads in the worker set
     * (as governed by Thread.interrupt, which relies on
     * ThreadGroup.checkAccess, which in turn relies on
     * SecurityManager.checkAccess). Shutdowns are attempted only if
     * these checks pass.
     *
     * All actual invocations of Thread.interrupt (see
     * interruptIdleWorkers and interruptWorkers) ignore
     * SecurityExceptions, meaning that the attempted interrupts
     * silently fail. In the case of shutdown, they should not fail
     * unless the SecurityManager has inconsistent policies, sometimes
     * allowing access to a thread and sometimes not. In such cases,
     * failure to actually interrupt threads may disable or delay full
     * termination. Other uses of interruptIdleWorkers are advisory,
     * and failure to actually interrupt will merely delay response to
     * configuration changes so is not handled exceptionally.
     */
    private static final RuntimePermission shutdownPerm =
        new RuntimePermission("modifyThread");
    
    /* The context to be used when executing the finalizer, or null. */
    // 执行 finalize 方法使用的上下文
    private final AccessControlContext acc;
    

    ThreadPoolExecutor 的方法阅读

    压缩和解压成员变量 ctl 的方法
    不需要解压缩 ctl 的位字段访问方法。这些取决于 bit 设计和 workerCount 永远不会为负。

    // 获取线程池状态,通过按位与操作,低29位将全部变成0
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    // 获取线程池worker数量,通过按位与操作,高3位将全部变成0
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    // 根据线程池状态和线程池worker数量,生成ctl值
    private static int ctlOf(int rs, int wc) { return rs | wc; }
    
    // 线程池状态小于 s
    private static boolean runStateLessThan(int c, int s) {
        return c < s;
    }
    // 线程池状态大于等于 s
    private static boolean runStateAtLeast(int c, int s) {
        return c >= s;
    }
    // 线程池状态是 RUNNING 
    // RUNNING = -1, SHUTDOWN=0,STOP=1,TIDYING=2,TERMINATED=3
    private static boolean isRunning(int c) {
        return c < SHUTDOWN;
    }
    
    /**
     * Attempts to CAS-increment the workerCount field of ctl.
     * CAS 自增 ctl 中的 workerCount
     */
    private boolean compareAndIncrementWorkerCount(int expect) {
        return ctl.compareAndSet(expect, expect + 1);
    }
    
    /**
     * Attempts to CAS-decrement the workerCount field of ctl.
     * CAS 自减 ctl 中的 workerCount 
     */
    private boolean compareAndDecrementWorkerCount(int expect) {
        return ctl.compareAndSet(expect, expect - 1);
    }
    
    /**
     * Decrements the workerCount field of ctl. This is called only on
     * abrupt termination of a thread (see processWorkerExit). Other
     * decrements are performed within getTask.
     * 减少 ctl 的workerCount字段。仅在线程突然终止时调用此方法(请参阅processWorkerExit)。其他在getTask中执行。
     * 带自旋
     */
    private void decrementWorkerCount() {
        do {} while (! compareAndDecrementWorkerCount(ctl.get()));
    }
    

    设置控制状态的方法

    /**
     * Transitions runState to given target, or leaves it alone if
     * already at least the given target.
     * 将runState转换为给定的目标,或者如果已经大于等于给定的目标,则不用变更。
     *
     * @param targetState the desired state, either SHUTDOWN or STOP
     *        (but not TIDYING or TERMINATED -- use tryTerminate for that)
     */
    private void advanceRunState(int targetState) {
        for (;;) {
            int c = ctl.get();
            if (runStateAtLeast(c, targetState) ||
                ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
                break;
        }
    }
    
    /**
     * Transitions to TERMINATED state if either (SHUTDOWN and pool
     * and queue empty) or (STOP and pool empty).  If otherwise
     * eligible to terminate but workerCount is nonzero, interrupts an
     * idle worker to ensure that shutdown signals propagate. This
     * method must be called following any action that might make
     * termination possible -- reducing worker count or removing tasks
     * from the queue during shutdown. The method is non-private to
     * allow access from ScheduledThreadPoolExecutor.
     * 如果(关闭和池和队列为空)或(停止和池为空),则转换为TERMINATED状态。
     * 如果可以终止,但workerCount不为零,则中断一个空闲的worker,以确保关闭信号传播。
     * 必须在可能终止操作的任何操作之后调用此方法-worker count 或在关闭过程中从队列中删除任务。
     * 该方法是非私有的,以允许从ScheduledThreadPoolExecutor进行访问。
     */
    final void tryTerminate() {
        // 死循环
        for (;;) {
            // 如果线程池状态是 RUNNING 或 线程池状态最少是 TIDYING 或 (线程池状态是 SHUTDOWN 且 workQueue 为空), 则结束循环
            int c = ctl.get();
            if (isRunning(c) ||
                runStateAtLeast(c, TIDYING) ||
                (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
                return;
            
            // 如果 线程池线程数目不为0,即有资格 terminate,则 中断一个空闲线程,结束循环
            if (workerCountOf(c) != 0) { // Eligible to terminate
                interruptIdleWorkers(ONLY_ONE);
                return;
            }
    
            // 获取锁
            // cas 更新 ctl 为状态 TIDYING, 线程数 0,然后执行 terminated 方法并更新 ctl 为 TERMINATED,线程数目为0,condition termination 唤起所有等待线程
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                    try {
                        terminated();
                    } finally {
                        ctl.set(ctlOf(TERMINATED, 0));
                        termination.signalAll();
                    }
                    return;
                }
            } finally {
                mainLock.unlock();
            }
            // else retry on failed CAS
        }
    }
    

    控制工作线程中断的方法

    /**
     * If there is a security manager, makes sure caller has
     * permission to shut down threads in general (see shutdownPerm).
     * If this passes, additionally makes sure the caller is allowed
     * to interrupt each worker thread. This might not be true even if
     * first check passed, if the SecurityManager treats some threads
     * specially.
     * 如果有安全管理器,确保调用者通常具有关闭线程的权限。
     * 如果通过,则另外确保允许调用方中断每个工作线程。
     * 如果Security Manager特别对待某些线程,即使通过了第一次检查也可能不正确。
     */
    private void checkShutdownAccess() {
        SecurityManager security = System.getSecurityManager();
        if (security != null) {
            security.checkPermission(shutdownPerm);
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                for (Worker w : workers)
                    security.checkAccess(w.thread);
            } finally {
                mainLock.unlock();
            }
        }
    }
    
    /**
     * Interrupts all threads, even if active. Ignores SecurityExceptions
     * (in which case some threads may remain uninterrupted).
     * 中断所有线程,即使线程是 active 状态。忽略异常 SecurityExceptions 
     */
    private void interruptWorkers() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers)
                w.interruptIfStarted();
        } finally {
            mainLock.unlock();
        }
    }
    
    /**
     * Interrupts threads that might be waiting for tasks (as
     * indicated by not being locked) so they can check for
     * termination or configuration changes. Ignores
     * SecurityExceptions (in which case some threads may remain
     * uninterrupted).
     * 中断可能正在等待任务的线程(如未锁定所示),以便它们可以检查终止或配置更改。
     * 忽略SecurityExceptions(在这种情况下,某些线程可能保持不中断)。
     *
     * @param onlyOne If true, interrupt at most one worker. This is
     * called only from tryTerminate when termination is otherwise
     * enabled but there are still other workers.  In this case, at
     * most one waiting worker is interrupted to propagate shutdown
     * signals in case all threads are currently waiting.
     * Interrupting any arbitrary thread ensures that newly arriving
     * workers since shutdown began will also eventually exit.
     * To guarantee eventual termination, it suffices to always
     * interrupt only one idle worker, but shutdown() interrupts all
     * idle workers so that redundant workers exit promptly, not
     * waiting for a straggler task to finish.
     */
    private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // 遍历 Worker 线程集合,如果线程没有中断,且worker获取锁成功,则中断线程
            for (Worker w : workers) {
                Thread t = w.thread;
                if (!t.isInterrupted() && w.tryLock()) {
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    } finally {
                        w.unlock();
                    }
                }
                // 如果传参 onlyOne == true, 即最多中断一个线程,结束循环
                if (onlyOne)
                    break;
            }
        } finally {
            mainLock.unlock();
        }
    }
    
    /**
     * Common form of interruptIdleWorkers, to avoid having to
     * remember what the boolean argument means.
     * 中断所有空闲线程
     */
    private void interruptIdleWorkers() {
        interruptIdleWorkers(false);
    }
    
    private static final boolean ONLY_ONE = true;
    

    其他实用工具方法

    /**
     * Invokes the rejected execution handler for the given command.
     * Package-protected for use by ScheduledThreadPoolExecutor.
     * 拒绝任务处理程序
     */
    final void reject(Runnable command) {
        handler.rejectedExecution(command, this);
    }
    
    /**
     * Performs any further cleanup following run state transition on
     * invocation of shutdown.  A no-op here, but used by
     * ScheduledThreadPoolExecutor to cancel delayed tasks.
     * 执行更多清理动作,shutdown执行后
     */
    void onShutdown() {
    }
    
    /**
     * State check needed by ScheduledThreadPoolExecutor to
     * enable running tasks during shutdown.
     * 判断状态是 RUNNING 或者 SHUTDOWN
     * shutdownOK: 当状态为 SHUTDOWN 时,是否需要返回true
     *
     * @param shutdownOK true if should return true if SHUTDOWN
     */
    final boolean isRunningOrShutdown(boolean shutdownOK) {
        int rs = runStateOf(ctl.get());
        return rs == RUNNING || (rs == SHUTDOWN && shutdownOK);
    }
    
    /**
     * Drains the task queue into a new list, normally using
     * drainTo. But if the queue is a DelayQueue or any other kind of
     * queue for which poll or drainTo may fail to remove some
     * elements, it deletes them one by one. 
     * 通常使用drainTo将任务队列放到新列表中. 
     * 但是,如果队列是DelayQueue或其他类型的队列,但poll或drainTo可能无法删除某些元素,则将其逐个删除。
     */
    private List<Runnable> drainQueue() {
        BlockingQueue<Runnable> q = workQueue;
        ArrayList<Runnable> taskList = new ArrayList<Runnable>();
        q.drainTo(taskList);
        if (!q.isEmpty()) {
            for (Runnable r : q.toArray(new Runnable[0])) {
                if (q.remove(r))
                    taskList.add(r);
            }
        }
        return taskList;
    }
    

    创建 worker

    /**
     * Checks if a new worker can be added with respect to current
     * pool state and the given bound (either core or maximum). If so,
     * the worker count is adjusted accordingly, and, if possible, a
     * new worker is created and started, running firstTask as its
     * first task. This method returns false if the pool is stopped or
     * eligible to shut down. It also returns false if the thread
     * factory fails to create a thread when asked.  If the thread
     * creation fails, either due to the thread factory returning
     * null, or due to an exception (typically OutOfMemoryError in
     * Thread.start()), we roll back cleanly.
     * 检查是否可以针对当前池状态和给定的边界(核心线程或最大值线程)添加新的worker。
     * 如果可以,将相应地调整worker count,并且如果可能,将创建并启动一个新的worker,并运行firstTask作为其第一个任务。
     *
     * @param firstTask the task the new thread should run first (or
     * null if none). Workers are created with an initial first task
     * (in method execute()) to bypass queuing when there are fewer
     * than corePoolSize threads (in which case we always start one),
     * or when the queue is full (in which case we must bypass queue).
     * Initially idle threads are usually created via
     * prestartCoreThread or to replace other dying workers.
     * 参数 firstTask 表示新线程(woker)第一个执行的任务(如果 firstTask 为 null,啥也不干)。
     * 当线程数少于corePoolSize线程(在这种情况下,我们总是启动一个线程),
     * 或者队列已满(在这种情况下,我们必须绕过队列),使用初始的第一个任务(在execute()方法中)创建工作程序以绕过队列。 
     * 最初,空闲线程通常是通过prestartCoreThread创建的,或者用于替换其他垂死的工作线程
     *
     * @param core if true use corePoolSize as bound, else
     * maximumPoolSize. (A boolean indicator is used here rather than a
     * value to ensure reads of fresh values after checking other pool
     * state).
     * 参数 core: 如果 core 为true,表示使用 corePoolSize 作为边界约束,否则使用 maximumPoolSize 作为边界约束。
     * @return true if successful
     */
    private boolean addWorker(Runnable firstTask, boolean core) {
        // goto 标签, 配合 break,continue 使用
        retry:
        
        // 第一层循环
        for (;;) {
            // 获取运行状态
            int c = ctl.get();
            int rs = runStateOf(c);
    
            // Check if queue empty only if necessary.
            // 如果运行状态大于等于 SHUTDOWN 且 非(运行状态为 SHUTDOWN 且 firstTask 为 null 且 workQueue 不为空),返回false,结束
            if (rs >= SHUTDOWN 
                &&
                !(rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))
                return false;
    
            // 第二层循环
            for (;;) {
                // 获取线程池线程数目
                int wc = workerCountOf(c);
                // 如果线程池线程数目超过边界约束,
                // 即大于等于CAPACITY 或者 大于等于 corePoolSize(core==true) 或 大于等于maximumPoolSize(core == false),返回false, 结束
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                // 线程数量加一,并跳出第一层循环,结束循环
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                // 重新获取 ctl
                c = ctl.get();  // Re-read ctl
                // 如果线程池状态变化了,跳到第一层循环,继续第一层循环
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }
    
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            // 创建新的 worker
            w = new Worker(firstTask);
            // 线程 t 是新worker 的线程
            final Thread t = w.thread;
            if (t != null) {
                // 获取mainLock
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    // 如果ThreadFactory失败或在获得锁之前关闭,回退。
                    int rs = runStateOf(ctl.get());
    
                    // 如果 rs 小于 SHUTDOWN(RUNNING) 或 (rs是SHUTDOWN 且 firstTask == null)
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        // 重新检查线程状态,确认可开始
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        // 新 worker 加入队列,并更新最大线程池大小,标记新 worker 添加成功
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                // 如果新worker已添加,开始线程,并标记新worker以启动
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            // 新worker启动失败,进入 addWorkerFailed
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }
    
    /**
     * Rolls back the worker thread creation.
    *  
     * - removes worker from workers, if present
     * - decrements worker count
     * - rechecks for termination, in case the existence of this
     *   worker was holding up termination
     * 新worker创建失败回滚
     * 删除新worker
     * worker count 减一
     * 重新检查终止动作,防止这个worker被终止动作持有
     * 
     */
    private void addWorkerFailed(Worker w) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (w != null)
                workers.remove(w);
            decrementWorkerCount();
            tryTerminate();
        } finally {
            mainLock.unlock();
        }
    }
    

    清理worker 线程

    /**
     * Performs cleanup and bookkeeping for a dying worker. Called
     * only from worker threads. Unless completedAbruptly is set,
     * assumes that workerCount has already been adjusted to account
     * for exit.  This method removes thread from worker set, and
     * possibly terminates the pool or replaces the worker if either
     * it exited due to user task exception or if fewer than
     * corePoolSize workers are running or queue is non-empty but
     * there are no workers.
     * 为垂死的worker进行清理和簿记。
     * 仅由 worker 线程调用。除非设置了completedAbruptly,否则假定已对workerCount进行调整以解决退出。
     * 如果由于用户任务异常而退出线程,或者正在运行的线程少于corePoolSize或队列为非空但没有工作者,
     * 则此方法从工作者集中删除线程,并可能终止该池或替换该工作者。
     *
     * @param w the worker
     * @param completedAbruptly if the worker died due to user exception
     */
    private void processWorkerExit(Worker w, boolean completedAbruptly) {
        if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
            decrementWorkerCount();
    
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            completedTaskCount += w.completedTasks;
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }
    
        tryTerminate();
    
        int c = ctl.get();
        if (runStateLessThan(c, STOP)) {
            if (!completedAbruptly) {
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            addWorker(null, false);
        }
    }
    

    从阻塞队列获取任务 getTask

    /**
     * Performs blocking or timed wait for a task, depending on
     * current configuration settings, or returns null if this worker
     * must exit because of any of:
     * 1. There are more than maximumPoolSize workers (due to
     *    a call to setMaximumPoolSize).
     * 2. The pool is stopped.
     * 3. The pool is shutdown and the queue is empty.
     * 4. This worker timed out waiting for a task, and timed-out
     *    workers are subject to termination (that is,
     *    {@code allowCoreThreadTimeOut || workerCount > corePoolSize})
     *    both before and after the timed wait, and if the queue is
     *    non-empty, this worker is not the last thread in the pool.
     * 根据当前配置设置执行阻塞或定时等待任务,或者如果此worker由于以下任何原因而必须退出,则返回null
     *  1.超过了maximumPoolSize的worker
     *  2.线程池已停止
     *  3.线程池已经shutdown且队列为空
     *  4.worker 等待任务超时,超时的worker已经结束
     * @return task, or null if the worker must exit, in which case
     *         workerCount is decremented
     */
    private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?
        // 循环
        for (;;) {
            // 获取 ctl 和 线程池状态
            int c = ctl.get();
            int rs = runStateOf(c);
    
            // 
            // Check if queue empty only if necessary.
            // 如果运行状态 大于等于 SHUTDOWN && (运行状态大于 STOP 或 队列为空),则调用decrementWorkerCount,直接返回null
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }
    
            // 获取 worker count
            int wc = workerCountOf(c);
    
            // Are workers subject to culling?
            // 是否允许 worker超时判断条件: allowCoreThreadTimeOut 或则 当前线程数目大于corePoolSize
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
    
            //  (线程池大小 大于 maximumPoolSize 或者 (允许超时且已超时) )
            // 且 (线程池大小 大于 1 或者 workQueue 为空)
            // 满足以上条件,即无法获取到任务,如果cas 更新线程数量成功,则返回null,否则继续循环
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }
    
            // 如果允许超时,使用poll(long, TimeUnit)获取任务
            // 否则,使用take方法获取
            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                // 如果获取到的任务非空,直接返回任务
                // 否则设置timedOut=true,继续循环
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }
    

    worker 主循环runWorker

    /**
     * Main worker run loop.  Repeatedly gets tasks from queue and
     * executes them, while coping with a number of issues:
     *
     * 1. We may start out with an initial task, in which case we
     * don't need to get the first one. Otherwise, as long as pool is
     * running, we get tasks from getTask. If it returns null then the
     * worker exits due to changed pool state or configuration
     * parameters.  Other exits result from exception throws in
     * external code, in which case completedAbruptly holds, which
     * usually leads processWorkerExit to replace this thread.
     *
     * 2. Before running any task, the lock is acquired to prevent
     * other pool interrupts while the task is executing, and then we
     * ensure that unless pool is stopping, this thread does not have
     * its interrupt set.
     *
     * 3. Each task run is preceded by a call to beforeExecute, which
     * might throw an exception, in which case we cause thread to die
     * (breaking loop with completedAbruptly true) without processing
     * the task.
     *
     * 4. Assuming beforeExecute completes normally, we run the task,
     * gathering any of its thrown exceptions to send to afterExecute.
     * We separately handle RuntimeException, Error (both of which the
     * specs guarantee that we trap) and arbitrary Throwables.
     * Because we cannot rethrow Throwables within Runnable.run, we
     * wrap them within Errors on the way out (to the thread's
     * UncaughtExceptionHandler).  Any thrown exception also
     * conservatively causes thread to die.
     *
     * 5. After task.run completes, we call afterExecute, which may
     * also throw an exception, which will also cause thread to
     * die. According to JLS Sec 14.20, this exception is the one that
     * will be in effect even if task.run throws.
     *
     * The net effect of the exception mechanics is that afterExecute
     * and the thread's UncaughtExceptionHandler have as accurate
     * information as we can provide about any problems encountered by
     * user code.
     * 主worker 执行循环,从阻塞队列获取task并执行,期间应付一下几种问题:
     * 1. 我们可能从一个初始任务开始,这种情况下,不需要获取第一个。另外,只要线程池的状态是RUNNING,就一直使用getTask获取任务。
     *     如果返回null,则worker将由于更改的线程池状态或配置参数而退出。
     * 2. 在运行任何任务之前,先获取锁,以防止任务执行时其他线程池中断,然后确保(除非池正在停止)此线程不会设置其中断。
     * 3. 每个任务运行之前,都会调用beforeExecute,可能会抛出异常,这种情况下,我们让线程不执行任务就死亡(当completedAbruptly为true时中断循环)
     * 4. 假设beforeExecute正常完成,我们将运行任务,并收集所有引发的异常以发送给afterExecute。
     *     我们区别处理RuntimeException,Error(规范保证我们可以捕获它们)和任意Throwables。 
     *     因为我们无法在Throwable.run中抛出Throwable,所以我们将它们包装在Errors中(输出到线程的UncaughtExceptionHandler)。 任何抛出的异常也会保守地导致线程死亡。
     * 5. task.run完成后,我们调用afterExecute,这也可能引发异常,这也会导致线程死亡。 根据JLS Sec 14.20,此异常是即使task.run抛出也会生效的异常。
     * @param w the worker
     */
    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }
    

    公共的构造函数

    // 默认线程工厂和默认添加任务失败异常处理程序
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }
    
    // 默认添加任务失败异常处理程序
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             threadFactory, defaultHandler);
    }
    // 默认线程工厂
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              RejectedExecutionHandler handler) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), handler);
    }
    
    /**
     * Creates a new {@code ThreadPoolExecutor} with the given initial
     * parameters.
     * 给定参数构造方法
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @param maximumPoolSize the maximum number of threads to allow in the
     *        pool
     * @param keepAliveTime when the number of threads is greater than
     *        the core, this is the maximum time that excess idle threads
     *        will wait for new tasks before terminating.
     * @param unit the time unit for the {@code keepAliveTime} argument
     * @param workQueue the queue to use for holding tasks before they are
     *        executed.  This queue will hold only the {@code Runnable}
     *        tasks submitted by the {@code execute} method.
     * @param threadFactory the factory to use when the executor
     *        creates a new thread
     * @param handler the handler to use when execution is blocked
     *        because the thread bounds and queue capacities are reached
     * @throws IllegalArgumentException if one of the following holds:<br>
     *         {@code corePoolSize < 0}<br>
     *         {@code keepAliveTime < 0}<br>
     *         {@code maximumPoolSize <= 0}<br>
     *         {@code maximumPoolSize < corePoolSize}
     * @throws NullPointerException if {@code workQueue}
     *         or {@code threadFactory} or {@code handler} is null
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
    

    实现Executor接口方法execute

    /**
     * Executes the given task sometime in the future.  The task
     * may execute in a new thread or in an existing pooled thread.
     * 在一段时间后,执行给定任务。任务可能由新线程执行或者线程池中已存在线程执行。
     *
     * If the task cannot be submitted for execution, either because this
     * executor has been shutdown or because its capacity has been reached,
     * the task is handled by the current {@code RejectedExecutionHandler}.
     * 如果线程提交失败,不管因为 executor 已关闭还是因为executor容量达上限,这个任务都要立刻被RejectedExecutionHandler处理
     *
     * @param command the task to execute
     * @throws RejectedExecutionException at discretion of
     *         {@code RejectedExecutionHandler}, if the task
     *         cannot be accepted for execution
     * @throws NullPointerException if {@code command} is null
     */
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         * 三步执行:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         * 1. 如果线程池中正在运行的线程数小于corePoolSize,创建新的线程(给定任务作为firstTask)。
         *    调用addWorker方法原子的检查runState和workerCount,从而通过返回false来防止在不应该添加线程的情况下发出虚假警报。
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         * 2. 如果一个任务可以成功进入队列,那么我们仍然需要仔细检查是否应该添加一个线程(因为现有线程自上次检查后就死掉了)或该池自进入此方法后就关闭了。
         *   因此,我们重新检查状态,并在必要时回滚排队(如果已停止),或者在没有线程的情况下启动新线程。
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         * 3. 如果我们无法入队task, 就添加一个新的线程,如果失败,线程池就被关闭或者已饱和,只能拒绝任务
         */
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }
    

    继承自ExecutorService的方法

    /**
     * Initiates an orderly shutdown in which previously submitted
     * tasks are executed, but no new tasks will be accepted.
     * Invocation has no additional effect if already shut down.
     *  发起有序的关闭,之前提交的任务继续执行,新任务不再接受。
     * 如果已经关闭了,调用此方法没有影响
     *
     * <p>This method does not wait for previously submitted tasks to
     * complete execution.  Use {@link #awaitTermination awaitTermination}
     * to do that.
     *
     * @throws SecurityException {@inheritDoc}
     */
    public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            advanceRunState(SHUTDOWN);
            interruptIdleWorkers();
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
    }
    
    /**
     * Attempts to stop all actively executing tasks, halts the
     * processing of waiting tasks, and returns a list of the tasks
     * that were awaiting execution. These tasks are drained (removed)
     * from the task queue upon return from this method.
     * 尝试停止所有正在执行的任务,暂停正在等待的任务的处理,并返回正在等待执行的任务的列表。 从此方法返回后,这些任务将从任务队列中耗尽(删除)。
     *
     * <p>This method does not wait for actively executing tasks to
     * terminate.  Use {@link #awaitTermination awaitTermination} to
     * do that.
     *
     * <p>There are no guarantees beyond best-effort attempts to stop
     * processing actively executing tasks.  This implementation
     * cancels tasks via {@link Thread#interrupt}, so any task that
     * fails to respond to interrupts may never terminate.
     *
     * @throws SecurityException {@inheritDoc}
     */
    public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            advanceRunState(STOP);
            interruptWorkers();
            tasks = drainQueue();
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
        return tasks;
    }
    
    public boolean isShutdown() {
        return ! isRunning(ctl.get());
    }
    
    public boolean isTerminated() {
        return runStateAtLeast(ctl.get(), TERMINATED);
    }
    
    public boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (;;) {
                if (runStateAtLeast(ctl.get(), TERMINATED))
                    return true;
                if (nanos <= 0)
                    return false;
                nanos = termination.awaitNanos(nanos);
            }
        } finally {
            mainLock.unlock();
        }
    }   
    

    剩下都是getter setter 方法类似的,自处不在赘述

    相关文章

      网友评论

          本文标题:ThreadPoolExecutor 源码阅读

          本文链接:https://www.haomeiwen.com/subject/xafvrqtx.html