提前准备:
Thread 类源码阅读
FutureTask 源码阅读
类结构如上图所示
先看接口 Executor
public interface Executor {
/**
* Executes the given command at some time in the future. The command
* may execute in a new thread, in a pooled thread, or in the calling
* thread, at the discretion of the {@code Executor} implementation.
* 在将来的某个时间执行给定的命令。
* 根据 Executor 实现的判断,命令可以在新线程,池线程或调用线程中执行。
*
* @param command the runnable task
* @throws RejectedExecutionException if this task cannot be
* accepted for execution
* @throws NullPointerException if command is null
*/
void execute(Runnable command);
}
然后看接口 ExecutorService
/**
* An {@link Executor} that provides methods to manage termination and
* methods that can produce a {@link Future} for tracking progress of
* one or more asynchronous tasks.
* 一种 Executor,提供管理终止的方法以及可以产生 Future 来跟踪一个或多个异步任务的进度的方法。
* <p>An {@code ExecutorService} can be shut down, which will cause
* it to reject new tasks. Two different methods are provided for
* shutting down an {@code ExecutorService}. The {@link #shutdown}
* method will allow previously submitted tasks to execute before
* terminating, while the {@link #shutdownNow} method prevents waiting
* tasks from starting and attempts to stop currently executing tasks.
* Upon termination, an executor has no tasks actively executing, no
* tasks awaiting execution, and no new tasks can be submitted. An
* unused {@code ExecutorService} should be shut down to allow
* reclamation of its resources.
* 可以关闭 ExecutorService,这将导致它拒绝新任务。
* 提供了两种不同的方法来关闭 ExecutorService。
* shutdown 方法将允许先前提交的任务在终止之前执行,
* 而 shutdownNow 方法可防止等待的任务启动并尝试停止当前正在执行的任务。
* 终止后,执行者将没有正在执行的任务,没有正在等待执行的任务,并且无法提交新任务。
* 应该关闭未使用的 ExecutorService 以便回收其资源。
*
* <p>Method {@code submit} extends base method {@link
* Executor#execute(Runnable)} by creating and returning a {@link Future}
* that can be used to cancel execution and/or wait for completion.
* Methods {@code invokeAny} and {@code invokeAll} perform the most
* commonly useful forms of bulk execution, executing a collection of
* tasks and then waiting for at least one, or all, to
* complete. (Class {@link ExecutorCompletionService} can be used to
* write customized variants of these methods.)
* 方法 submit 通过创建并返回一个 Future 扩展了基本方法 Executor#execute (Runnable),
* 可用于取消执行和等待完成。方法 invokeAny 和 invokeAll 执行最常用的批量执行形式,
* 执行一组任务,然后等待至少一个或全部完成。
*
*
* @since 1.5
* @author Doug Lea
*/
public interface ExecutorService extends Executor {
/**
* Initiates an orderly shutdown in which previously submitted
* tasks are executed, but no new tasks will be accepted.
* Invocation has no additional effect if already shut down.
* 启动有序关闭,在该关闭中执行先前提交的任务,但不接受任何新任务。
* 如果已关闭,则调用不会产生任何其他影响。
* <p>This method does not wait for previously submitted tasks to
* complete execution. Use {@link #awaitTermination awaitTermination}
* to do that.
*
* @throws SecurityException if a security manager exists and
* shutting down this ExecutorService may manipulate
* threads that the caller is not permitted to modify
* because it does not hold {@link
* java.lang.RuntimePermission}{@code ("modifyThread")},
* or the security manager's {@code checkAccess} method
* denies access.
*/
void shutdown();
/**
* Attempts to stop all actively executing tasks, halts the
* processing of waiting tasks, and returns a list of the tasks
* that were awaiting execution.
* 尝试停止所有正在执行的任务,暂停正在等待的任务的处理,
* 并返回正在等待执行的任务的列表。
*
* <p>This method does not wait for actively executing tasks to
* terminate. Use {@link #awaitTermination awaitTermination} to
* do that.
*
* <p>There are no guarantees beyond best-effort attempts to stop
* processing actively executing tasks. For example, typical
* implementations will cancel via {@link Thread#interrupt}, so any
* task that fails to respond to interrupts may never terminate.
*
* @return list of tasks that never commenced execution
* @throws SecurityException if a security manager exists and
* shutting down this ExecutorService may manipulate
* threads that the caller is not permitted to modify
* because it does not hold {@link
* java.lang.RuntimePermission}{@code ("modifyThread")},
* or the security manager's {@code checkAccess} method
* denies access.
*/
List<Runnable> shutdownNow();
/**
* Returns {@code true} if this executor has been shut down.
* executor 是否已经被关闭
*
* @return {@code true} if this executor has been shut down
*/
boolean isShutdown();
/**
* Returns {@code true} if all tasks have completed following shut down.
* Note that {@code isTerminated} is never {@code true} unless
* either {@code shutdown} or {@code shutdownNow} was called first.
* 如果所有任务在关闭后都已完成,则返回 true。
* 注意: isTerminated 永远不是true,除非 shutdown 或者 shutdownNow 被调用
*
* @return {@code true} if all tasks have completed following shut down
*/
boolean isTerminated();
/**
* Blocks until all tasks have completed execution after a shutdown
* request, or the timeout occurs, or the current thread is
* interrupted, whichever happens first.
* 如果等待指定时间所有任务在关闭后都已完成,则返回 true。
*
* @param timeout the maximum time to wait
* @param unit the time unit of the timeout argument
* @return {@code true} if this executor terminated and
* {@code false} if the timeout elapsed before termination
* @throws InterruptedException if interrupted while waiting
*/
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
/**
* Submits a value-returning task for execution and returns a
* Future representing the pending results of the task. The
* Future's {@code get} method will return the task's result upon
* successful completion.
* 提交要执行 Callable 任务,并返回表示任务的未决结果的Future。
* Future的 get 方法将在成功完成后返回任务的结果。
*
* @param task the task to submit
* @param <T> the type of the task's result
* @return a Future representing pending completion of the task
* @throws RejectedExecutionException if the task cannot be
* scheduled for execution
* @throws NullPointerException if the task is null
*/
<T> Future<T> submit(Callable<T> task);
/**
* Submits a Runnable task for execution and returns a Future
* representing that task. The Future's {@code get} method will
* return the given result upon successful completion.
* 提交要执行的 Runnable 任务,并返回表示任务的未决结果的Future。
* Future的 get 方法将在成功完成后返回任务的结果。
* @param task the task to submit
* @param result the result to return
* @param <T> the type of the result
* @return a Future representing pending completion of the task
* @throws RejectedExecutionException if the task cannot be
* scheduled for execution
* @throws NullPointerException if the task is null
*/
<T> Future<T> submit(Runnable task, T result);
/**
* Submits a Runnable task for execution and returns a Future
* representing that task. The Future's {@code get} method will
* return {@code null} upon <em>successful</em> completion.
* 提交要执行的 Runnable 任务,并返回表示任务的未决结果的Future。
* Future的 get 方法将在成功完成后返回任务的结果。
* @param task the task to submit
* @return a Future representing pending completion of the task
* @throws RejectedExecutionException if the task cannot be
* scheduled for execution
* @throws NullPointerException if the task is null
*/
Future<?> submit(Runnable task);
/**
* Executes the given tasks, returning a list of Futures holding
* their status and results when all complete.
* {@link Future#isDone} is {@code true} for each
* element of the returned list.
* 执行给定的任务,返回Future集合。返回集合的每个元素isDone返回这为true
* Note that a <em>completed</em> task could have
* terminated either normally or by throwing an exception.
* The results of this method are undefined if the given
* collection is modified while this operation is in progress.
*
* @param tasks the collection of tasks
* @param <T> the type of the values returned from the tasks
* @return a list of Futures representing the tasks, in the same
* sequential order as produced by the iterator for the
* given task list, each of which has completed
* @throws InterruptedException if interrupted while waiting, in
* which case unfinished tasks are cancelled
* @throws NullPointerException if tasks or any of its elements are {@code null}
* @throws RejectedExecutionException if any task cannot be
* scheduled for execution
*/
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
/**
* Executes the given tasks, returning a list of Futures holding
* their status and results
* 执行给定的任务,返回Future集合
* when all complete or the timeout expires, whichever happens first.
* {@link Future#isDone} is {@code true} for each
* element of the returned list.
* 当全部完成或超时到期时,以先发生的为准。对于返回列表的每个元素,Future#isDone}为true。
* Upon return, tasks that have not completed are cancelled.
* Note that a <em>completed</em> task could have
* terminated either normally or by throwing an exception.
* The results of this method are undefined if the given
* collection is modified while this operation is in progress.
*
* @param tasks the collection of tasks
* @param timeout the maximum time to wait
* @param unit the time unit of the timeout argument
* @param <T> the type of the values returned from the tasks
* @return a list of Futures representing the tasks, in the same
* sequential order as produced by the iterator for the
* given task list. If the operation did not time out,
* each task will have completed. If it did time out, some
* of these tasks will not have completed.
* @throws InterruptedException if interrupted while waiting, in
* which case unfinished tasks are cancelled
* @throws NullPointerException if tasks, any of its elements, or
* unit are {@code null}
* @throws RejectedExecutionException if any task cannot be scheduled
* for execution
*/
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;
/**
* Executes the given tasks, returning the result
* of one that has completed successfully (i.e., without throwing
* an exception), if any do. Upon normal or exceptional return,
* tasks that have not completed are cancelled.
* The results of this method are undefined if the given
* collection is modified while this operation is in progress.
* 执行给定的任务,如果成功,则返回成功完成的一个任务(即不引发异常)的结果。
* 在正常或异常返回时,尚未完成的任务将被取消。
* 如果在进行此操作时修改了给定的集合,则此方法的结果不确定。
* @param tasks the collection of tasks
* @param <T> the type of the values returned from the tasks
* @return the result returned by one of the tasks
* @throws InterruptedException if interrupted while waiting
* @throws NullPointerException if tasks or any element task
* subject to execution is {@code null}
* @throws IllegalArgumentException if tasks is empty
* @throws ExecutionException if no task successfully completes
* @throws RejectedExecutionException if tasks cannot be scheduled
* for execution
*/
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
/**
* Executes the given tasks, returning the result
* of one that has completed successfully (i.e., without throwing
* an exception), if any do before the given timeout elapses.
* Upon normal or exceptional return, tasks that have not
* completed are cancelled.
* The results of this method are undefined if the given
* collection is modified while this operation is in progress.
* 执行给定的任务,如果成功,则返回在超时时间之前成功完成的一个任务(即不引发异常)的结果。
* 在正常或异常返回时,尚未完成的任务将被取消。
* 如果在进行此操作时修改了给定的集合,则此方法的结果不确定。
* @param tasks the collection of tasks
* @param timeout the maximum time to wait
* @param unit the time unit of the timeout argument
* @param <T> the type of the values returned from the tasks
* @return the result returned by one of the tasks
* @throws InterruptedException if interrupted while waiting
* @throws NullPointerException if tasks, or unit, or any element
* task subject to execution is {@code null}
* @throws TimeoutException if the given timeout elapses before
* any task successfully completes
* @throws ExecutionException if no task successfully completes
* @throws RejectedExecutionException if tasks cannot be scheduled
* for execution
*/
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
再看 AbstractExecutorService
public abstract class AbstractExecutorService implements ExecutorService {
/**
* Returns a {@code RunnableFuture} for the given runnable and default
* value.
* 为提供的 runable 返回 一个RunnableFuture 结果和默认值
*
* @param runnable the runnable task being wrapped
* @param value the default value for the returned future
* @param <T> the type of the given value
* @return a {@code RunnableFuture} which, when run, will run the
* underlying runnable and which, as a {@code Future}, will yield
* the given value as its result and provide for cancellation of
* the underlying task
* @since 1.6
*/
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
/**
* Returns a {@code RunnableFuture} for the given callable task.
* 为callable任务返回一个RunnableFuture结果
*
* @param callable the callable task being wrapped
* @param <T> the type of the callable's result
* @return a {@code RunnableFuture} which, when run, will call the
* underlying callable and which, as a {@code Future}, will yield
* the callable's result as its result and provide for
* cancellation of the underlying task
* @since 1.6
*/
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
/**
* 将 Runnable 接口包装成RunnableFuture,执行任务并返回RunnableFuture结果
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
/**将 Runnable 接口包装成RunnableFuture,执行任务并返回RunnableFuture结果, 带一个默认返回值
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
/**将 Callable接口包装成RunnableFuture,执行任务并返回RunnableFuture结果
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
/**
* the main mechanics of invokeAny.
* invokeAny 调用的主要机制
*/
private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
boolean timed, long nanos)
throws InterruptedException, ExecutionException, TimeoutException {
if (tasks == null)
throw new NullPointerException();
int ntasks = tasks.size();
if (ntasks == 0)
throw new IllegalArgumentException();
ArrayList<Future<T>> futures = new ArrayList<Future<T>>(ntasks);
// ExecutorCompletionService 可以用于管理已完成的 task
ExecutorCompletionService<T> ecs =
new ExecutorCompletionService<T>(this);
// For efficiency, especially in executors with limited
// parallelism, check to see if previously submitted tasks are
// done before submitting more of them. This interleaving
// plus the exception mechanics account for messiness of main
// loop.
// 为了提高效率(尤其是在并行性有限的执行器中),在提交更多任务之前检查是否完成了先前提交的任务。这种交织加上异常机制解决了主循环的混乱问题
try {
// Record exceptions so that if we fail to obtain any
// result, we can throw the last exception we got.
// 用于记录异常
ExecutionException ee = null;
// 超时 deadline
final long deadline = timed ? System.nanoTime() + nanos : 0L;
Iterator<? extends Callable<T>> it = tasks.iterator();
// Start one task for sure; the rest incrementally
// 确定开始一个task, 剩余的减一
//
futures.add(ecs.submit(it.next()));
--ntasks;
int active = 1;
for (;;) {
// 完成队列 poll 一个元素
Future<T> f = ecs.poll();
// 没有已完成的任务
if (f == null) {
// 还有剩余任务,剩余任务减一,异步结果放入 futures, 运行中的任务加一
if (ntasks > 0) {
--ntasks;
futures.add(ecs.submit(it.next()));
++active;
}
// 运行中任务为0, 结束循环
else if (active == 0)
break;
// 设置了超时,超时方式获取一个完成的任务,还没有完成,抛出异常
else if (timed) {
f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
if (f == null)
throw new TimeoutException();
nanos = deadline - System.nanoTime();
}
// 否则,阻塞方式获取完成的任务
else
f = ecs.take();
}
// 重新判断获取到的已完成的任务是否为 null, 不为null 则活跃任务减一, 返回当前完成任务的结果
// 如果执行发生异常,需要记录异常
if (f != null) {
--active;
try {
return f.get();
} catch (ExecutionException eex) {
ee = eex;
} catch (RuntimeException rex) {
ee = new ExecutionException(rex);
}
}
}
// 抛出异常
if (ee == null)
ee = new ExecutionException();
throw ee;
} finally {
// 取消所有任务
for (int i = 0, size = futures.size(); i < size; i++)
futures.get(i).cancel(true);
}
}
public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException {
try {
return doInvokeAny(tasks, false, 0);
} catch (TimeoutException cannotHappen) {
assert false;
return null;
}
}
public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
return doInvokeAny(tasks, true, unit.toNanos(timeout));
}
// 执行所有任务
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException {
if (tasks == null)
throw new NullPointerException();
ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
boolean done = false;
try {
// 所有任务 包装成RunnableFuture,放入futures集合,并调用 execute 方法
for (Callable<T> t : tasks) {
RunnableFuture<T> f = newTaskFor(t);
futures.add(f);
execute(f);
}
// 遍历 RunnableFuture,如果已完成,执行get方法
for (int i = 0, size = futures.size(); i < size; i++) {
Future<T> f = futures.get(i);
if (!f.isDone()) {
try {
f.get();
} catch (CancellationException ignore) {
} catch (ExecutionException ignore) {
}
}
}
// 标记完成,返回结果
done = true;
return futures;
} finally {
// 如果没有正常完成,将所有task cancel
if (!done)
for (int i = 0, size = futures.size(); i < size; i++)
futures.get(i).cancel(true);
}
}
// 执行所有任务, 设置超时时间
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException {
if (tasks == null)
throw new NullPointerException();
long nanos = unit.toNanos(timeout);
ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
boolean done = false;
try {
// 遍历所有任务 包装成RunnableFuture,放入futures集合
for (Callable<T> t : tasks)
futures.add(newTaskFor(t));
// 执行任务的deadline
final long deadline = System.nanoTime() + nanos;
final int size = futures.size();
// Interleave time checks and calls to execute in case
// executor doesn't have any/much parallelism.
// 遍历 futures 集合,调用 execute 方法,并判断是否超时,如果超时,直接返回
for (int i = 0; i < size; i++) {
execute((Runnable)futures.get(i));
nanos = deadline - System.nanoTime();
if (nanos <= 0L)
return futures;
}
// 遍历 futures 集合,如果 RunnableFuture 已结束
// 如果超时,返回futures, 如果未超时,执行 RunnableFuture的get方法, 如果get异常 直接返回
for (int i = 0; i < size; i++) {
Future<T> f = futures.get(i);
if (!f.isDone()) {
if (nanos <= 0L)
return futures;
try {
f.get(nanos, TimeUnit.NANOSECONDS);
} catch (CancellationException ignore) {
} catch (ExecutionException ignore) {
} catch (TimeoutException toe) {
return futures;
}
nanos = deadline - System.nanoTime();
}
}
done = true;
return futures;
} finally {
// // 如果没有正常完成,将所有task cancel
if (!done)
for (int i = 0, size = futures.size(); i < size; i++)
futures.get(i).cancel(true);
}
}
}
最后看 ThreadPoolExecutor。
ThreadPoolExecutor 代码较多,先看下 idea 截图,有四个内部类和很多函数,一步一步来,先看四个内部类。
image.png
Worker
Worker 继承了 AbstractQueuedSynchronizer,实现简单独占锁。
Worker 实现了 Runable 接口,是一个线程,用来执行主循环。
/**
* Class Worker mainly maintains interrupt control state for
* threads running tasks, along with other minor bookkeeping.
* This class opportunistically extends AbstractQueuedSynchronizer
* to simplify acquiring and releasing a lock surrounding each
* task execution. This protects against interrupts that are
* intended to wake up a worker thread waiting for a task from
* instead interrupting a task being run. We implement a simple
* non-reentrant mutual exclusion lock rather than use
* ReentrantLock because we do not want worker tasks to be able to
* reacquire the lock when they invoke pool control methods like
* setCorePoolSize. Additionally, to suppress interrupts until
* the thread actually starts running tasks, we initialize lock
* state to a negative value, and clear it upon start (in
* runWorker).
* Class Worker主要维护线程运行任务的中断控制状态,以及其他次要簿记。
* 此类适时地扩展了AbstractQueuedSynchronizer,以简化获取和释放围绕每个任务执行的锁定。
* 这可以防止旨在唤醒工作线程等待任务的中断,而不是中断正在运行的任务。
* 我们实现了一个简单的非可重入互斥锁,而不是使用 ReentrantLock,因为我们不希望辅助任务在调用诸如setCorePoolSize之类的池控制方法时能够重新获取该锁。
* 此外,为了抑制直到线程真正开始运行任务之前的中断,我们将锁定状态初始化为负值,并在启动时将其清除(在runWorker中)。
*/
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
* 此类永远不会序列化,但是我们提供了serialVersionUID来禁止Javac警告。
*/
private static final long serialVersionUID = 6138294804551838833L;
/** Thread this worker is running in. Null if factory fails. */
// worker程序正在其中运行的线程。如果工厂失败,则为null。
final Thread thread;
/** Initial task to run. Possibly null. */
// 要运行的初始任务。可能为空
Runnable firstTask;
/** Per-thread task counter */
// 运行完成的任务计数
volatile long completedTasks;
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
* 构造,参数为第一个执行的任务
* 占有锁,禁止中断,直到runWorker方法执行,
* 分别赋值 firstTask 和 thread
*/
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker */
// 发起主循环,runWorker 在外部类 ThreadPoolExecutor 中,后文再介绍
public void run() {
runWorker(this);
}
// Lock methods
//
// The value 0 represents the unlocked state.
// The value 1 represents the locked state.
// 资源状态state==0: 锁可用
// 资源状态state==0: 锁已可用
protected boolean isHeldExclusively() {
return getState() != 0;
}
// 独占模式获取锁
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
// 独占模式释放锁
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
// 如果已运行,中断
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
CallerRunsPolicy
CallerRunsPolicy 实现接口 RejectedExecutionHandler,是ThreadPoolExecutor 无法执行的任务的处理程序。CallerRunsPolicy 的策略是,如果 executor 拒绝了任务,则有调用线程执行被拒绝的任务,即任务不丢失,除非 executor 被关闭。
/**
* A handler for rejected tasks that runs the rejected task
* directly in the calling thread of the {@code execute} method,
* unless the executor has been shut down, in which case the task
* is discarded.
* CallerRunsPolicy 在任务被拒绝添加后,会在调用 execute 方法的的线程来执行被拒绝的任务。
* 除非 executor 被关闭,否则任务不会被丢弃。
*/
public static class CallerRunsPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code CallerRunsPolicy}.
*/
public CallerRunsPolicy() { }
/**
* Executes task r in the caller's thread, unless the executor
* has been shut down, in which case the task is discarded.
* 调用线程执行任务,除非 executor 被关闭,否则任务不会被丢弃。
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}
/**
* A handler for tasks that cannot be executed by a {@link ThreadPoolExecutor}.
* ThreadPoolExecutor 无法执行的任务的处理程序。
* @since 1.5
* @author Doug Lea
*/
public interface RejectedExecutionHandler {
/**
* Method that may be invoked by a {@link ThreadPoolExecutor} when
* {@link ThreadPoolExecutor#execute execute} cannot accept a
* task. This may occur when no more threads or queue slots are
* available because their bounds would be exceeded, or upon
* shutdown of the Executor.
* ThreadPoolExecutor#execute无法接受任务时,ThreadPoolExecutor 可以调用此方法。
* 当没有更多的线程或队列(因为超出其范围)可用时,或者在执行器关闭时,可能会发生这种情况。
*
* <p>In the absence of other alternatives, the method may throw
* an unchecked {@link RejectedExecutionException}, which will be
* propagated to the caller of {@code execute}.
* 在没有其他选择的情况下,该方法可能会引发未经检查的 RejectedExecutionException,该异常将传播到{@code execute}的调用方。
*
* @param r the runnable task requested to be executed
* @param executor the executor attempting to execute this task
* @throws RejectedExecutionException if there is no remedy
*/
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
AbortPolicy
同 CallerRunsPolicy 一样,也实现了接口 RejectedExecutionHandler 是ThreadPoolExecutor 无法执行的任务的处理程序。AbortPolicy 的策略是,如果 executor 拒绝了任务,则抛出异常。
/**
* A handler for rejected tasks that throws a
* {@code RejectedExecutionException}.
* 被拒任务处理器,会抛出 RejectedExecutionException 异常
*/
public static class AbortPolicy implements RejectedExecutionHandler {
/**
* Creates an {@code AbortPolicy}.
*/
public AbortPolicy() { }
/**
* Always throws RejectedExecutionException.
* 总是抛出 RejectedExecutionException 异常
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
* @throws RejectedExecutionException always
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
DiscardPolicy
同 CallerRunsPolicy 一样,也实现了接口 RejectedExecutionHandler 是ThreadPoolExecutor 无法执行的任务的处理程序。DiscardPolicy 的策略是,如果 executor 拒绝了任务,则直接抛弃。
/**
* A handler for rejected tasks that silently discards the
* rejected task.
* 被拒任务处理器,丢弃任务
*/
public static class DiscardPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardPolicy}.
*/
public DiscardPolicy() { }
/**
* Does nothing, which has the effect of discarding task r.
* 什么都不做,结果是丢弃任务
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}
DiscardOldestPolicy
同 CallerRunsPolicy 一样,也实现了接口 RejectedExecutionHandler 是ThreadPoolExecutor 无法执行的任务的处理程序。DiscardOldestPolicy 的策略是,如果 executor 拒绝了任务,则直接抛弃等待队列中最久的任务,并重新调用 executor 方法。
/**
* A handler for rejected tasks that discards the oldest unhandled
* request and then retries {@code execute}, unless the executor
* is shut down, in which case the task is discarded.
* 被拒任务处理器,丢弃最旧任务,并重试execute方法
* 除非 executor 已关闭,否则不丢失
*/
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardOldestPolicy} for the given executor.
*/
public DiscardOldestPolicy() { }
/**
* Obtains and ignores the next task that the executor
* would otherwise execute, if one is immediately available,
* and then retries execution of task r, unless the executor
* is shut down, in which case task r is instead discarded.
* 获取并忽略执行者将立即执行的下一个任务(如果有一个任务立即可用),
* 然后重试任务r的执行,除非 executor 被关闭,否则不会放弃任务r。
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}
ThreadPoolExecutor 成员变量介绍
// 成员变量 ctl 是原子变量AtomicInteger,用来记录线程池状态 和 线程池中线程个数,使用一个变量存放两种信息。
// 用来标记线程池状态(高3位),线程个数(低29位), 假设 Integer 类型是32位
// 默认是 RUNNING 状态,线程个数为 0
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//线程个数掩码位数,并不是所有平台int类型是32位,所以准确说是具体平台下Integer的二进制位数-3后的剩余位数才是线程的个数,
private static final int COUNT_BITS = Integer.SIZE - 3;
// 线程最大个数,2的29次方 - 1
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// 线程池的运行状态
private static final int RUNNING = -1 << COUNT_BITS; // -536870912
private static final int SHUTDOWN = 0 << COUNT_BITS; // 0
private static final int STOP = 1 << COUNT_BITS; // 536870912
private static final int TIDYING = 2 << COUNT_BITS; // 1073741824
private static final int TERMINATED = 3 << COUNT_BITS; // 1610612736
/**
* The queue used for holding tasks and handing off to worker
* threads. We do not require that workQueue.poll() returning
* null necessarily means that workQueue.isEmpty(), so rely
* solely on isEmpty to see if the queue is empty (which we must
* do for example when deciding whether to transition from
* SHUTDOWN to TIDYING). This accommodates special-purpose
* queues such as DelayQueues for which poll() is allowed to
* return null even if it may later return non-null when delays
* expire.
* 队列用于放置任务并投递给worker线程。
* 我们不要求workQueue.poll()返回null必然意味着workQueue.isEmpty(),
* 因此仅依靠isEmpty来查看队列是否为空(例如,在决定是否从SHUTDOWN过渡到TIDYING时必须执行此操作) 。
* 这可容纳特殊用途的队列,例如DelayQueues,允许poll()返回null,即使该延迟稍后在延迟到期时也可以返回non-null。
*/
private final BlockingQueue<Runnable> workQueue;
/**
* Lock held on access to workers set and related bookkeeping.
* While we could use a concurrent set of some sort, it turns out
* to be generally preferable to use a lock. Among the reasons is
* that this serializes interruptIdleWorkers, which avoids
* unnecessary interrupt storms, especially during shutdown.
* Otherwise exiting threads would concurrently interrupt those
* that have not yet interrupted. It also simplifies some of the
* associated statistics bookkeeping of largestPoolSize etc. We
* also hold mainLock on shutdown and shutdownNow, for the sake of
* ensuring workers set is stable while separately checking
* permission to interrupt and actually interrupting.
* 持有访问 worker 集合的权限并且和笔记有关。
* 虽然我们可以使用某种并发集,但事实证明,使用锁通常是更可取的。
* 其中一个原因是,这会序列化interruptIdleWorkers,从而避免了不必要的中断风暴,尤其是在shutdown期间。
* 否则,退出线程将同时中断那些尚未中断的线程。它还简化了一些相关的统计数据,如largePoolSize等。我们还在shutdown和shutdownNow上保留mainLock,
* 以确保在单独检查中断和实际中断的权限时,worker 集合是稳定的。
*
*/
private final ReentrantLock mainLock = new ReentrantLock();
/**
* Set containing all worker threads in pool. Accessed only when
* holding mainLock.
* 包含所有池中的所有 worker 线程,只要获得 mainLock 才可访问
*/
private final HashSet<Worker> workers = new HashSet<Worker>();
/**
* Wait condition to support awaitTermination
* 等待condition以支持awaitTermination
*/
private final Condition termination = mainLock.newCondition();
/**
* Tracks largest attained pool size. Accessed only under
* mainLock.
* 跟踪达到的最大池大小。仅在mainLock下访问
*/
private int largestPoolSize;
/**
* Counter for completed tasks. Updated only on termination of
* worker threads. Accessed only under mainLock.
* 已完成任务计数器,
*/
private long completedTaskCount;
/**
* Factory for new threads. All threads are created using this
* factory (via method addWorker). All callers must be prepared
* for addWorker to fail, which may reflect a system or user's
* policy limiting the number of threads. Even though it is not
* treated as an error, failure to create threads may result in
* new tasks being rejected or existing ones remaining stuck in
* the queue.
*
* 线程工厂。所有 的线程都是用此工厂常见(通过方法 addWorker)。
* 所有的caller 必须为 addWorker 失败做好准备(这可以反应系统或用户的线程线程数目的策略)。
* 即使不被当作error对待,创建线程失败也可能导致新任务被拒绝或者旧任务阻塞在队列
*
*
*/
private volatile ThreadFactory threadFactory;
/**
* Handler called when saturated or shutdown in execute.
* 拒绝任务处理器,在 execute 饱和或者关闭的情况下调用
*/
private volatile RejectedExecutionHandler handler;
/**
* Timeout in nanoseconds for idle threads waiting for work.
* Threads use this timeout when there are more than corePoolSize
* present or if allowCoreThreadTimeOut. Otherwise they wait
* forever for new work.
* 空闲线程等待工作的超时时间,单位纳秒。
* 如果当前线程数目小于 corePoolSize或者设置了allowCoreThreadTimeOut=true,使用此超时时间
*
*/
private volatile long keepAliveTime;
/**
* If false (default), core threads stay alive even when idle.
* If true, core threads use keepAliveTime to time out waiting
* for work.
* 默认是false, 即核心线程就是处于空闲状态也会保持alive
* 如果设置为true, 核心线程使用 keepAliveTime 作为超时时间等待工作,超时则销毁
*/
private volatile boolean allowCoreThreadTimeOut;
/**
* Core pool size is the minimum number of workers to keep alive
* (and not allow to time out etc) unless allowCoreThreadTimeOut
* is set, in which case the minimum is zero.
* 核心线程池大小是最小保留存活的线程数目(不允许超时),除非allowCoreThreadTimeOut设置为true,这种情况下,最小存活线程数是0
*/
private volatile int corePoolSize;
/**
* Maximum pool size. Note that the actual maximum is internally
* bounded by CAPACITY.
* 最大线程池大小。注意实际线程池最大值受 CAPACITY 限制,即取 CAPACITY 和 maximumPoolSize的较小值
*/
private volatile int maximumPoolSize;
/**
* The default rejected execution handler
* 默认 拒绝任务处理器,抛出异常
*/
private static final RejectedExecutionHandler defaultHandler =
new AbortPolicy();
/**
* Permission required for callers of shutdown and shutdownNow.
* We additionally require (see checkShutdownAccess) that callers
* have permission to actually interrupt threads in the worker set
* (as governed by Thread.interrupt, which relies on
* ThreadGroup.checkAccess, which in turn relies on
* SecurityManager.checkAccess). Shutdowns are attempted only if
* these checks pass.
*
* All actual invocations of Thread.interrupt (see
* interruptIdleWorkers and interruptWorkers) ignore
* SecurityExceptions, meaning that the attempted interrupts
* silently fail. In the case of shutdown, they should not fail
* unless the SecurityManager has inconsistent policies, sometimes
* allowing access to a thread and sometimes not. In such cases,
* failure to actually interrupt threads may disable or delay full
* termination. Other uses of interruptIdleWorkers are advisory,
* and failure to actually interrupt will merely delay response to
* configuration changes so is not handled exceptionally.
*/
private static final RuntimePermission shutdownPerm =
new RuntimePermission("modifyThread");
/* The context to be used when executing the finalizer, or null. */
// 执行 finalize 方法使用的上下文
private final AccessControlContext acc;
ThreadPoolExecutor 的方法阅读
压缩和解压成员变量 ctl 的方法
不需要解压缩 ctl 的位字段访问方法。这些取决于 bit 设计和 workerCount 永远不会为负。
// 获取线程池状态,通过按位与操作,低29位将全部变成0
private static int runStateOf(int c) { return c & ~CAPACITY; }
// 获取线程池worker数量,通过按位与操作,高3位将全部变成0
private static int workerCountOf(int c) { return c & CAPACITY; }
// 根据线程池状态和线程池worker数量,生成ctl值
private static int ctlOf(int rs, int wc) { return rs | wc; }
// 线程池状态小于 s
private static boolean runStateLessThan(int c, int s) {
return c < s;
}
// 线程池状态大于等于 s
private static boolean runStateAtLeast(int c, int s) {
return c >= s;
}
// 线程池状态是 RUNNING
// RUNNING = -1, SHUTDOWN=0,STOP=1,TIDYING=2,TERMINATED=3
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}
/**
* Attempts to CAS-increment the workerCount field of ctl.
* CAS 自增 ctl 中的 workerCount
*/
private boolean compareAndIncrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect + 1);
}
/**
* Attempts to CAS-decrement the workerCount field of ctl.
* CAS 自减 ctl 中的 workerCount
*/
private boolean compareAndDecrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect - 1);
}
/**
* Decrements the workerCount field of ctl. This is called only on
* abrupt termination of a thread (see processWorkerExit). Other
* decrements are performed within getTask.
* 减少 ctl 的workerCount字段。仅在线程突然终止时调用此方法(请参阅processWorkerExit)。其他在getTask中执行。
* 带自旋
*/
private void decrementWorkerCount() {
do {} while (! compareAndDecrementWorkerCount(ctl.get()));
}
设置控制状态的方法
/**
* Transitions runState to given target, or leaves it alone if
* already at least the given target.
* 将runState转换为给定的目标,或者如果已经大于等于给定的目标,则不用变更。
*
* @param targetState the desired state, either SHUTDOWN or STOP
* (but not TIDYING or TERMINATED -- use tryTerminate for that)
*/
private void advanceRunState(int targetState) {
for (;;) {
int c = ctl.get();
if (runStateAtLeast(c, targetState) ||
ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
break;
}
}
/**
* Transitions to TERMINATED state if either (SHUTDOWN and pool
* and queue empty) or (STOP and pool empty). If otherwise
* eligible to terminate but workerCount is nonzero, interrupts an
* idle worker to ensure that shutdown signals propagate. This
* method must be called following any action that might make
* termination possible -- reducing worker count or removing tasks
* from the queue during shutdown. The method is non-private to
* allow access from ScheduledThreadPoolExecutor.
* 如果(关闭和池和队列为空)或(停止和池为空),则转换为TERMINATED状态。
* 如果可以终止,但workerCount不为零,则中断一个空闲的worker,以确保关闭信号传播。
* 必须在可能终止操作的任何操作之后调用此方法-worker count 或在关闭过程中从队列中删除任务。
* 该方法是非私有的,以允许从ScheduledThreadPoolExecutor进行访问。
*/
final void tryTerminate() {
// 死循环
for (;;) {
// 如果线程池状态是 RUNNING 或 线程池状态最少是 TIDYING 或 (线程池状态是 SHUTDOWN 且 workQueue 为空), 则结束循环
int c = ctl.get();
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
// 如果 线程池线程数目不为0,即有资格 terminate,则 中断一个空闲线程,结束循环
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE);
return;
}
// 获取锁
// cas 更新 ctl 为状态 TIDYING, 线程数 0,然后执行 terminated 方法并更新 ctl 为 TERMINATED,线程数目为0,condition termination 唤起所有等待线程
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
terminated();
} finally {
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
控制工作线程中断的方法
/**
* If there is a security manager, makes sure caller has
* permission to shut down threads in general (see shutdownPerm).
* If this passes, additionally makes sure the caller is allowed
* to interrupt each worker thread. This might not be true even if
* first check passed, if the SecurityManager treats some threads
* specially.
* 如果有安全管理器,确保调用者通常具有关闭线程的权限。
* 如果通过,则另外确保允许调用方中断每个工作线程。
* 如果Security Manager特别对待某些线程,即使通过了第一次检查也可能不正确。
*/
private void checkShutdownAccess() {
SecurityManager security = System.getSecurityManager();
if (security != null) {
security.checkPermission(shutdownPerm);
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers)
security.checkAccess(w.thread);
} finally {
mainLock.unlock();
}
}
}
/**
* Interrupts all threads, even if active. Ignores SecurityExceptions
* (in which case some threads may remain uninterrupted).
* 中断所有线程,即使线程是 active 状态。忽略异常 SecurityExceptions
*/
private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers)
w.interruptIfStarted();
} finally {
mainLock.unlock();
}
}
/**
* Interrupts threads that might be waiting for tasks (as
* indicated by not being locked) so they can check for
* termination or configuration changes. Ignores
* SecurityExceptions (in which case some threads may remain
* uninterrupted).
* 中断可能正在等待任务的线程(如未锁定所示),以便它们可以检查终止或配置更改。
* 忽略SecurityExceptions(在这种情况下,某些线程可能保持不中断)。
*
* @param onlyOne If true, interrupt at most one worker. This is
* called only from tryTerminate when termination is otherwise
* enabled but there are still other workers. In this case, at
* most one waiting worker is interrupted to propagate shutdown
* signals in case all threads are currently waiting.
* Interrupting any arbitrary thread ensures that newly arriving
* workers since shutdown began will also eventually exit.
* To guarantee eventual termination, it suffices to always
* interrupt only one idle worker, but shutdown() interrupts all
* idle workers so that redundant workers exit promptly, not
* waiting for a straggler task to finish.
*/
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 遍历 Worker 线程集合,如果线程没有中断,且worker获取锁成功,则中断线程
for (Worker w : workers) {
Thread t = w.thread;
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
// 如果传参 onlyOne == true, 即最多中断一个线程,结束循环
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
/**
* Common form of interruptIdleWorkers, to avoid having to
* remember what the boolean argument means.
* 中断所有空闲线程
*/
private void interruptIdleWorkers() {
interruptIdleWorkers(false);
}
private static final boolean ONLY_ONE = true;
其他实用工具方法
/**
* Invokes the rejected execution handler for the given command.
* Package-protected for use by ScheduledThreadPoolExecutor.
* 拒绝任务处理程序
*/
final void reject(Runnable command) {
handler.rejectedExecution(command, this);
}
/**
* Performs any further cleanup following run state transition on
* invocation of shutdown. A no-op here, but used by
* ScheduledThreadPoolExecutor to cancel delayed tasks.
* 执行更多清理动作,shutdown执行后
*/
void onShutdown() {
}
/**
* State check needed by ScheduledThreadPoolExecutor to
* enable running tasks during shutdown.
* 判断状态是 RUNNING 或者 SHUTDOWN
* shutdownOK: 当状态为 SHUTDOWN 时,是否需要返回true
*
* @param shutdownOK true if should return true if SHUTDOWN
*/
final boolean isRunningOrShutdown(boolean shutdownOK) {
int rs = runStateOf(ctl.get());
return rs == RUNNING || (rs == SHUTDOWN && shutdownOK);
}
/**
* Drains the task queue into a new list, normally using
* drainTo. But if the queue is a DelayQueue or any other kind of
* queue for which poll or drainTo may fail to remove some
* elements, it deletes them one by one.
* 通常使用drainTo将任务队列放到新列表中.
* 但是,如果队列是DelayQueue或其他类型的队列,但poll或drainTo可能无法删除某些元素,则将其逐个删除。
*/
private List<Runnable> drainQueue() {
BlockingQueue<Runnable> q = workQueue;
ArrayList<Runnable> taskList = new ArrayList<Runnable>();
q.drainTo(taskList);
if (!q.isEmpty()) {
for (Runnable r : q.toArray(new Runnable[0])) {
if (q.remove(r))
taskList.add(r);
}
}
return taskList;
}
创建 worker
/**
* Checks if a new worker can be added with respect to current
* pool state and the given bound (either core or maximum). If so,
* the worker count is adjusted accordingly, and, if possible, a
* new worker is created and started, running firstTask as its
* first task. This method returns false if the pool is stopped or
* eligible to shut down. It also returns false if the thread
* factory fails to create a thread when asked. If the thread
* creation fails, either due to the thread factory returning
* null, or due to an exception (typically OutOfMemoryError in
* Thread.start()), we roll back cleanly.
* 检查是否可以针对当前池状态和给定的边界(核心线程或最大值线程)添加新的worker。
* 如果可以,将相应地调整worker count,并且如果可能,将创建并启动一个新的worker,并运行firstTask作为其第一个任务。
*
* @param firstTask the task the new thread should run first (or
* null if none). Workers are created with an initial first task
* (in method execute()) to bypass queuing when there are fewer
* than corePoolSize threads (in which case we always start one),
* or when the queue is full (in which case we must bypass queue).
* Initially idle threads are usually created via
* prestartCoreThread or to replace other dying workers.
* 参数 firstTask 表示新线程(woker)第一个执行的任务(如果 firstTask 为 null,啥也不干)。
* 当线程数少于corePoolSize线程(在这种情况下,我们总是启动一个线程),
* 或者队列已满(在这种情况下,我们必须绕过队列),使用初始的第一个任务(在execute()方法中)创建工作程序以绕过队列。
* 最初,空闲线程通常是通过prestartCoreThread创建的,或者用于替换其他垂死的工作线程
*
* @param core if true use corePoolSize as bound, else
* maximumPoolSize. (A boolean indicator is used here rather than a
* value to ensure reads of fresh values after checking other pool
* state).
* 参数 core: 如果 core 为true,表示使用 corePoolSize 作为边界约束,否则使用 maximumPoolSize 作为边界约束。
* @return true if successful
*/
private boolean addWorker(Runnable firstTask, boolean core) {
// goto 标签, 配合 break,continue 使用
retry:
// 第一层循环
for (;;) {
// 获取运行状态
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
// 如果运行状态大于等于 SHUTDOWN 且 非(运行状态为 SHUTDOWN 且 firstTask 为 null 且 workQueue 不为空),返回false,结束
if (rs >= SHUTDOWN
&&
!(rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))
return false;
// 第二层循环
for (;;) {
// 获取线程池线程数目
int wc = workerCountOf(c);
// 如果线程池线程数目超过边界约束,
// 即大于等于CAPACITY 或者 大于等于 corePoolSize(core==true) 或 大于等于maximumPoolSize(core == false),返回false, 结束
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 线程数量加一,并跳出第一层循环,结束循环
if (compareAndIncrementWorkerCount(c))
break retry;
// 重新获取 ctl
c = ctl.get(); // Re-read ctl
// 如果线程池状态变化了,跳到第一层循环,继续第一层循环
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 创建新的 worker
w = new Worker(firstTask);
// 线程 t 是新worker 的线程
final Thread t = w.thread;
if (t != null) {
// 获取mainLock
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
// 如果ThreadFactory失败或在获得锁之前关闭,回退。
int rs = runStateOf(ctl.get());
// 如果 rs 小于 SHUTDOWN(RUNNING) 或 (rs是SHUTDOWN 且 firstTask == null)
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
// 重新检查线程状态,确认可开始
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// 新 worker 加入队列,并更新最大线程池大小,标记新 worker 添加成功
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
// 如果新worker已添加,开始线程,并标记新worker以启动
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
// 新worker启动失败,进入 addWorkerFailed
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
/**
* Rolls back the worker thread creation.
*
* - removes worker from workers, if present
* - decrements worker count
* - rechecks for termination, in case the existence of this
* worker was holding up termination
* 新worker创建失败回滚
* 删除新worker
* worker count 减一
* 重新检查终止动作,防止这个worker被终止动作持有
*
*/
private void addWorkerFailed(Worker w) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (w != null)
workers.remove(w);
decrementWorkerCount();
tryTerminate();
} finally {
mainLock.unlock();
}
}
清理worker 线程
/**
* Performs cleanup and bookkeeping for a dying worker. Called
* only from worker threads. Unless completedAbruptly is set,
* assumes that workerCount has already been adjusted to account
* for exit. This method removes thread from worker set, and
* possibly terminates the pool or replaces the worker if either
* it exited due to user task exception or if fewer than
* corePoolSize workers are running or queue is non-empty but
* there are no workers.
* 为垂死的worker进行清理和簿记。
* 仅由 worker 线程调用。除非设置了completedAbruptly,否则假定已对workerCount进行调整以解决退出。
* 如果由于用户任务异常而退出线程,或者正在运行的线程少于corePoolSize或队列为非空但没有工作者,
* 则此方法从工作者集中删除线程,并可能终止该池或替换该工作者。
*
* @param w the worker
* @param completedAbruptly if the worker died due to user exception
*/
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
workers.remove(w);
} finally {
mainLock.unlock();
}
tryTerminate();
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false);
}
}
从阻塞队列获取任务 getTask
/**
* Performs blocking or timed wait for a task, depending on
* current configuration settings, or returns null if this worker
* must exit because of any of:
* 1. There are more than maximumPoolSize workers (due to
* a call to setMaximumPoolSize).
* 2. The pool is stopped.
* 3. The pool is shutdown and the queue is empty.
* 4. This worker timed out waiting for a task, and timed-out
* workers are subject to termination (that is,
* {@code allowCoreThreadTimeOut || workerCount > corePoolSize})
* both before and after the timed wait, and if the queue is
* non-empty, this worker is not the last thread in the pool.
* 根据当前配置设置执行阻塞或定时等待任务,或者如果此worker由于以下任何原因而必须退出,则返回null
* 1.超过了maximumPoolSize的worker
* 2.线程池已停止
* 3.线程池已经shutdown且队列为空
* 4.worker 等待任务超时,超时的worker已经结束
* @return task, or null if the worker must exit, in which case
* workerCount is decremented
*/
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
// 循环
for (;;) {
// 获取 ctl 和 线程池状态
int c = ctl.get();
int rs = runStateOf(c);
//
// Check if queue empty only if necessary.
// 如果运行状态 大于等于 SHUTDOWN && (运行状态大于 STOP 或 队列为空),则调用decrementWorkerCount,直接返回null
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
// 获取 worker count
int wc = workerCountOf(c);
// Are workers subject to culling?
// 是否允许 worker超时判断条件: allowCoreThreadTimeOut 或则 当前线程数目大于corePoolSize
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// (线程池大小 大于 maximumPoolSize 或者 (允许超时且已超时) )
// 且 (线程池大小 大于 1 或者 workQueue 为空)
// 满足以上条件,即无法获取到任务,如果cas 更新线程数量成功,则返回null,否则继续循环
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
// 如果允许超时,使用poll(long, TimeUnit)获取任务
// 否则,使用take方法获取
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
// 如果获取到的任务非空,直接返回任务
// 否则设置timedOut=true,继续循环
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
worker 主循环runWorker
/**
* Main worker run loop. Repeatedly gets tasks from queue and
* executes them, while coping with a number of issues:
*
* 1. We may start out with an initial task, in which case we
* don't need to get the first one. Otherwise, as long as pool is
* running, we get tasks from getTask. If it returns null then the
* worker exits due to changed pool state or configuration
* parameters. Other exits result from exception throws in
* external code, in which case completedAbruptly holds, which
* usually leads processWorkerExit to replace this thread.
*
* 2. Before running any task, the lock is acquired to prevent
* other pool interrupts while the task is executing, and then we
* ensure that unless pool is stopping, this thread does not have
* its interrupt set.
*
* 3. Each task run is preceded by a call to beforeExecute, which
* might throw an exception, in which case we cause thread to die
* (breaking loop with completedAbruptly true) without processing
* the task.
*
* 4. Assuming beforeExecute completes normally, we run the task,
* gathering any of its thrown exceptions to send to afterExecute.
* We separately handle RuntimeException, Error (both of which the
* specs guarantee that we trap) and arbitrary Throwables.
* Because we cannot rethrow Throwables within Runnable.run, we
* wrap them within Errors on the way out (to the thread's
* UncaughtExceptionHandler). Any thrown exception also
* conservatively causes thread to die.
*
* 5. After task.run completes, we call afterExecute, which may
* also throw an exception, which will also cause thread to
* die. According to JLS Sec 14.20, this exception is the one that
* will be in effect even if task.run throws.
*
* The net effect of the exception mechanics is that afterExecute
* and the thread's UncaughtExceptionHandler have as accurate
* information as we can provide about any problems encountered by
* user code.
* 主worker 执行循环,从阻塞队列获取task并执行,期间应付一下几种问题:
* 1. 我们可能从一个初始任务开始,这种情况下,不需要获取第一个。另外,只要线程池的状态是RUNNING,就一直使用getTask获取任务。
* 如果返回null,则worker将由于更改的线程池状态或配置参数而退出。
* 2. 在运行任何任务之前,先获取锁,以防止任务执行时其他线程池中断,然后确保(除非池正在停止)此线程不会设置其中断。
* 3. 每个任务运行之前,都会调用beforeExecute,可能会抛出异常,这种情况下,我们让线程不执行任务就死亡(当completedAbruptly为true时中断循环)
* 4. 假设beforeExecute正常完成,我们将运行任务,并收集所有引发的异常以发送给afterExecute。
* 我们区别处理RuntimeException,Error(规范保证我们可以捕获它们)和任意Throwables。
* 因为我们无法在Throwable.run中抛出Throwable,所以我们将它们包装在Errors中(输出到线程的UncaughtExceptionHandler)。 任何抛出的异常也会保守地导致线程死亡。
* 5. task.run完成后,我们调用afterExecute,这也可能引发异常,这也会导致线程死亡。 根据JLS Sec 14.20,此异常是即使task.run抛出也会生效的异常。
* @param w the worker
*/
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
公共的构造函数
// 默认线程工厂和默认添加任务失败异常处理程序
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
// 默认添加任务失败异常处理程序
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, defaultHandler);
}
// 默认线程工厂
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}
/**
* Creates a new {@code ThreadPoolExecutor} with the given initial
* parameters.
* 给定参数构造方法
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* @param unit the time unit for the {@code keepAliveTime} argument
* @param workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
* @param threadFactory the factory to use when the executor
* creates a new thread
* @param handler the handler to use when execution is blocked
* because the thread bounds and queue capacities are reached
* @throws IllegalArgumentException if one of the following holds:<br>
* {@code corePoolSize < 0}<br>
* {@code keepAliveTime < 0}<br>
* {@code maximumPoolSize <= 0}<br>
* {@code maximumPoolSize < corePoolSize}
* @throws NullPointerException if {@code workQueue}
* or {@code threadFactory} or {@code handler} is null
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
实现Executor接口方法execute
/**
* Executes the given task sometime in the future. The task
* may execute in a new thread or in an existing pooled thread.
* 在一段时间后,执行给定任务。任务可能由新线程执行或者线程池中已存在线程执行。
*
* If the task cannot be submitted for execution, either because this
* executor has been shutdown or because its capacity has been reached,
* the task is handled by the current {@code RejectedExecutionHandler}.
* 如果线程提交失败,不管因为 executor 已关闭还是因为executor容量达上限,这个任务都要立刻被RejectedExecutionHandler处理
*
* @param command the task to execute
* @throws RejectedExecutionException at discretion of
* {@code RejectedExecutionHandler}, if the task
* cannot be accepted for execution
* @throws NullPointerException if {@code command} is null
*/
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
* 三步执行:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
* 1. 如果线程池中正在运行的线程数小于corePoolSize,创建新的线程(给定任务作为firstTask)。
* 调用addWorker方法原子的检查runState和workerCount,从而通过返回false来防止在不应该添加线程的情况下发出虚假警报。
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
* 2. 如果一个任务可以成功进入队列,那么我们仍然需要仔细检查是否应该添加一个线程(因为现有线程自上次检查后就死掉了)或该池自进入此方法后就关闭了。
* 因此,我们重新检查状态,并在必要时回滚排队(如果已停止),或者在没有线程的情况下启动新线程。
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
* 3. 如果我们无法入队task, 就添加一个新的线程,如果失败,线程池就被关闭或者已饱和,只能拒绝任务
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
继承自ExecutorService的方法
/**
* Initiates an orderly shutdown in which previously submitted
* tasks are executed, but no new tasks will be accepted.
* Invocation has no additional effect if already shut down.
* 发起有序的关闭,之前提交的任务继续执行,新任务不再接受。
* 如果已经关闭了,调用此方法没有影响
*
* <p>This method does not wait for previously submitted tasks to
* complete execution. Use {@link #awaitTermination awaitTermination}
* to do that.
*
* @throws SecurityException {@inheritDoc}
*/
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(SHUTDOWN);
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}
/**
* Attempts to stop all actively executing tasks, halts the
* processing of waiting tasks, and returns a list of the tasks
* that were awaiting execution. These tasks are drained (removed)
* from the task queue upon return from this method.
* 尝试停止所有正在执行的任务,暂停正在等待的任务的处理,并返回正在等待执行的任务的列表。 从此方法返回后,这些任务将从任务队列中耗尽(删除)。
*
* <p>This method does not wait for actively executing tasks to
* terminate. Use {@link #awaitTermination awaitTermination} to
* do that.
*
* <p>There are no guarantees beyond best-effort attempts to stop
* processing actively executing tasks. This implementation
* cancels tasks via {@link Thread#interrupt}, so any task that
* fails to respond to interrupts may never terminate.
*
* @throws SecurityException {@inheritDoc}
*/
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP);
interruptWorkers();
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
public boolean isShutdown() {
return ! isRunning(ctl.get());
}
public boolean isTerminated() {
return runStateAtLeast(ctl.get(), TERMINATED);
}
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (;;) {
if (runStateAtLeast(ctl.get(), TERMINATED))
return true;
if (nanos <= 0)
return false;
nanos = termination.awaitNanos(nanos);
}
} finally {
mainLock.unlock();
}
}
剩下都是getter setter 方法类似的,自处不在赘述
网友评论