Executor->ExecutorService->AbstractExecutorService->ThreadPoolExecutor
Callable 也是因为线程池的需要,所以才有了这个接口。它和 Runnable 的区别在于 run() 没有返回值,而 Callable 的 call() 方法有返回值,同时,如果运行出现异常,call() 方法会抛出异常。
Future -> RunnableFuture -> FutureTask
Runnable -> RunnableFuture
FutureTask 通过 RunnableFuture 间接实现了 Runnable 接口
ThreadPoolExecutor类的必要参数
- corePoolSize:核心线程数
- maximumPoolSize:最大线程数
- workQueue:任务队列
- keepAliveTime:空闲线程存活时间,线程数大于核心线程数时起作用,也可以设置allowCoreThreadTimeOut(true)对核心线程起作用
- threadFactory:生成线程的线程工厂
- handle:线程池满时的执行策略
- CallerRunsPolicy:只要线程池没有被关闭,那么由提交任务的线程自己来执行这个任务
- AbortPolicy:不管怎样,直接抛出 RejectedExecutionException 异常(默认)
- DiscardPolicy:不做任何处理,直接忽略掉这个任务
- DiscardOldestPolicy:如果线程池没有被关闭的话,把队列队头的任务(也就是等待了最长时间的)直接扔掉,然后提交这个任务到等待队列中
线程池的状态 大于0不能提交任务
- RUNNING(-1):接受新的任务,处理等待队列中的任务
- SHUTDOWN(0):不接受新的任务提交,但是会继续处理等待队列中的任务
- STOP(1):不接受新的任务提交,不再处理等待队列中的任务,中断正在执行任务的线程(shutDownNow())
- TIDYING(2):所有的任务都销毁了,workCount 为 0。线程池的状态在转换为 TIDYING 状态时,会执行钩子方法 terminated()
- TERMINATED(3):terminated() 方法结束后,线程池的状态就会变成这个
- 调用shutdown方法时,线程池从Running-Shutdown
- 调用shtudownNow方法时,从Running/Shutdown-Stop
- shutdown 阻塞队列为空,线程池状态从shutdown-tidying
- stop线程池为空,从stop-tidying
rejectedExecutionHandler 用于处理当线程池不能执行此任务时的情况,默认有抛出 RejectedExecutionException 异常、忽略任务、使用提交任务的线程来执行此任务和将队列中等待最久的任务删除,然后提交此任务这四种策略,默认为抛出异常。
线程池中的线程创建时机
- 如果当前线程数少于 corePoolSize,那么提交任务的时候创建一个新的线程,并由这个线程执行这个任务;
- 如果当前线程数已经达到 corePoolSize,那么将提交的任务添加到队列中,等待线程池中的线程去队列中取任务;
-
如果队列已满,那么创建新的线程来执行任务,需要保证池中的线程数不会超过 maximumPoolSize,如果此时线程数超过了 maximumPoolSize,那么执行拒绝策略。
image.png
注意:如果将队列设置为无界队列,那么线程数达到 corePoolSize 后,其实线程数就不会再增长了。
- 第1种是:newFixedThreadExecutor固定大小线程池,特点是线程数固定,使用无界队列,适用于任务数量不均匀的场景、对内存压力不敏感,但系统负载比较敏感的场景;
- 第2种是:newCacheThreadExecutor线程池,特点是不限制线程数,适用于要求低延迟的短期任务场景;
- 第3种是:newSingleThreadExecutor单线程线程池,也就是一个线程的固定线程池,适用于需要异步执行但需要保证任务顺序的场景;
- 第4种是:newScheduleThreadExecutor线程池,适用于定期执行任务场景,支持按固定频率定期执行和按固定延时定期执行两种方式;
- 第5种是:工作窃取线程池,使用的ForkJoinPool,是固定并行度的多任务队列,适合任务执行时长不均匀的场景。
newFixedThreadPool 最大线程数设置为与核心线程数相等,此时 keepAliveTime 设置为 0(因为这里它是没用的,即使不为 0,线程池默认也不会回收 corePoolSize 内的线程),任务队列采用 LinkedBlockingQueue,无界队列。刚开始,每提交一个任务都创建一个 worker,当 worker 的数量达到 nThreads 后,不再创建新的线程,而是把任务提交到 LinkedBlockingQueue 中,而且之后线程数始终为 nThreads(适用于任务数量不均,对内存压力不敏感但对系统负载敏感的场景)
newCachedThreadPool:核心线程数为 0,最大线程数为 Integer.MAX_VALUE,keepAliveTime 为 60 秒,任务队列采用 SynchronousQueue。这种线程池对于任务可以比较快速地完成的情况有比较好的性能。如果线程空闲了 60 秒都没有任务,那么将关闭此线程并从线程池中移除。所以如果线程池空闲了很长时间也不会有问题,因为随着所有的线程都会被关闭,整个线程池不会占用任何的系统资源。
任务执行过程中发生异常:如果某个任务执行出现异常,那么执行任务的线程会被关闭,而不是继续接收其他任务。然后会启动一个新的线程来代替它。
执行拒绝策略
- workers 的数量达到了 corePoolSize(任务此时需要进入任务队列),任务入队成功,与此同时线程池被关闭了,而且关闭线程池并没有将这个任务出队,那么执行拒绝策略。这里说的是非常边界的问题,入队和关闭线程池并发执行,读者仔细看看 execute 方法是怎么进到第一个 reject(command) 里面的。
- workers 的数量大于等于 corePoolSize,将任务加入到任务队列,可是队列满了,任务入队失败,那么准备开启新的线程,可是线程数已经达到 maximumPoolSize,那么执行拒绝策略。
ExecutorService.java
public interface ExecutorService extends Executor{
// 关闭线程池,已提交的任务继续执行,不接受继续提交新任务
void shutdown();
// 关闭线程池,尝试停止正在执行的所有任务,不接受继续提交新任务
// 它和前面的方法相比,加了一个单词“now”,区别在于它会去停止当前正在进行的任务
List<Runnable> shutdownNow();
boolean isShutdown();// 线程池是否已关闭
// 如果调用了 shutdown() 或 shutdownNow() 方法后,所有任务结束了,那么返回true
// 这个方法必须在调用shutdown或shutdownNow方法之后调用才会返回true
boolean isTerminated();
// 等待所有任务完成,并设置超时时间
// 我们这么理解,实际应用中是,先调用 shutdown 或 shutdownNow,
// 然后再调这个方法等待所有的线程真正地完成,返回值意味着有没有超时
boolean awaitTermination(long timeout, TimeUnit unit)throws InterruptedException;
// 提交一个 Callable 任务
<T> Future<T> submit(Callable<T> task);
// 提交一个 Runnable 任务,第二个参数将会放到 Future 中,作为返回值,
// 因为 Runnable 的 run 方法本身并不返回任何东西
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);// 提交一个 Runnable 任务
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
AbstractExecutorService.java
public abstract class AbstractExecutorService implements ExecutorService {
// RunnableFuture 是用于获取执行结果的,我们常用它的子类 FutureTask
// 下面两个 newTaskFor 方法用于将我们的任务包装成 FutureTask 提交到线程池中执行
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
// 提交任务
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
// 1. 将任务包装成 FutureTask
RunnableFuture<Void> ftask = newTaskFor(task, null);
// 2. 交给执行器执行,execute 方法由具体的子类来实现
// 前面也说了,FutureTask 间接实现了Runnable 接口。
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
// 1. 将任务包装成 FutureTask
RunnableFuture<T> ftask = newTaskFor(task, result);
// 2. 交给执行器执行
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
// 1. 将任务包装成 FutureTask
RunnableFuture<T> ftask = newTaskFor(task);
// 2. 交给执行器执行
execute(ftask);
return ftask;
}
// 此方法目的:将 tasks 集合中的任务提交到线程池执行,任意一个线程执行完后就可以结束了
// 第二个参数 timed 代表是否设置超时机制,超时时间为第三个参数,
// 如果 timed 为 true,同时超时了还没有一个线程返回结果,那么抛出 TimeoutException 异常
private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
boolean timed, long nanos)
throws InterruptedException, ExecutionException, TimeoutException {
if (tasks == null)
throw new NullPointerException();
int ntasks = tasks.size();//任务个数
if (ntasks == 0)
throw new IllegalArgumentException();
// ExecutorCompletionService 不是一个真正的执行器,参数 this 才是真正的执行器
// 它对执行器进行了包装,每个任务结束后,将结果保存到内部的一个 completionQueue 队列中
ArrayList<Future<T>> futures = new ArrayList<Future<T>>(ntasks);
ExecutorCompletionService<T> ecs =
new ExecutorCompletionService<T>(this);
try {
// 用于保存异常信息,此方法如果没有得到任何有效的结果,那么我们可以抛出最后得到的一个异常
ExecutionException ee = null;
final long deadline = timed ? System.nanoTime() + nanos : 0L;
Iterator<? extends Callable<T>> it = tasks.iterator();
// Start one task for sure; the rest incrementally
// 首先先提交一个任务,后面的任务到下面的 for 循环一个个提交
futures.add(ecs.submit(it.next()));
--ntasks;// 提交了一个任务,所以任务数量减 1
int active = 1;// 正在执行的任务数(提交的时候 +1,任务结束的时候 -1)
for (;;) {
Future<T> f = ecs.poll();
if (f == null) {// 为 null,说明刚刚提交的第一个线程还没有执行完成
if (ntasks > 0) {
--ntasks;
futures.add(ecs.submit(it.next()));
++active;
}
else if (active == 0)// 这里的 active == 0,说明所有的任务都执行失败,那么这里是 for 循环出口
break;
else if (timed) {
f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
if (f == null)
throw new TimeoutException();
nanos = deadline - System.nanoTime();
}
// 这里是 else。说明,没有任务需要提交,但是池中的任务没有完成,还没有超时(如果设置了超时)
// take() 方法会阻塞,直到有元素返回,说明有任务结束了
else
f = ecs.take();
}
if (f != null) {// 有任务结束了
--active;
try {
return f.get();
} catch (ExecutionException eex) {
ee = eex;
} catch (RuntimeException rex) {
ee = new ExecutionException(rex);
}
}
}
if (ee == null)
ee = new ExecutionException();
throw ee;
} finally {
for (int i = 0, size = futures.size(); i < size; i++)
futures.get(i).cancel(true);
}
}
public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException {
try {
return doInvokeAny(tasks, false, 0);
} catch (TimeoutException cannotHappen) {
assert false;
return null;
}
}
public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
return doInvokeAny(tasks, true, unit.toNanos(timeout));
}
// 执行所有的任务,返回任务结果。
// 先不要看这个方法,我们先想想,其实我们自己提交任务到线程池,也是想要线程池执行所有的任务
// 只不过,我们是每次 submit 一个任务,这里以一个集合作为参数提交
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException {
if (tasks == null)
throw new NullPointerException();
List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
boolean done = false;
try {
// 这个很简单
for (Callable<T> t : tasks) {
// 包装成 FutureTask
RunnableFuture<T> f = newTaskFor(t);
futures.add(f);
// 提交任务
execute(f);
}
for (Future<T> f : futures) {
if (!f.isDone()) {
try {
// 这是一个阻塞方法,直到获取到值,或抛出了异常
// 这里有个小细节,其实 get 方法签名上是会抛出 InterruptedException 的
// 可是这里没有进行处理,而是抛给外层去了。此异常发生于还没执行完的任务被取消了
f.get();
} catch (CancellationException ignore) {
} catch (ExecutionException ignore) {
}
}
}
done = true;
// 这个方法返回,不像其他的场景,返回 List<Future>,其实执行结果还没出来
// 这个方法返回是真正的返回,任务都结束了
return futures;
} finally {
// 为什么要这个?就是上面说的有异常的情况
if (!done)
for (Future<T> f : futures)
f.cancel(true);
}
}
// 带超时的 invokeAll,我们找不同吧
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException {
if (tasks == null || unit == null)
throw new NullPointerException();
long nanos = unit.toNanos(timeout);
List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
boolean done = false;
try {
for (Callable<T> t : tasks)
futures.add(newTaskFor(t));
long lastTime = System.nanoTime();
Iterator<Future<T>> it = futures.iterator();
// 提交一个任务,检测一次是否超时
while (it.hasNext()) {
execute((Runnable)(it.next()));
long now = System.nanoTime();
nanos -= now - lastTime;
lastTime = now;
// 超时
if (nanos <= 0)
return futures;
}
for (Future<T> f : futures) {
if (!f.isDone()) {
if (nanos <= 0)
return futures;
try {
// 调用带超时的 get 方法,这里的参数 nanos 是剩余的时间,
// 因为上面其实已经用掉了一些时间了
f.get(nanos, TimeUnit.NANOSECONDS);
} catch (CancellationException ignore) {
} catch (ExecutionException ignore) {
} catch (TimeoutException toe) {
return futures;
}
long now = System.nanoTime();
nanos -= now - lastTime;
lastTime = now;
}
}
done = true;
return futures;
} finally {
if (!done)
for (Future<T> f : futures)
f.cancel(true);
}
}
}
ThreadPoolExecutor.java
public class ThreadPoolExecutor extends AbstractExecutorService {
public ThreadPoolExecutor(int corePoolSize,//核心线程数
int maximumPoolSize,//最大线程数
long keepAliveTime,//空闲存活时间
TimeUnit unit,//
BlockingQueue<Runnable> workQueue,//任务队列
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
// 这里 COUNT_BITS 设置为 29(32-3),意味着前三位用于存放线程状态,后29位用于存放线程数
// 很多初学者很喜欢在自己的代码中写很多 29 这种数字,或者某个特殊的字符串,然后分布在各个地方,这是非常糟糕的
private static final int COUNT_BITS = Integer.SIZE - 3;
// 000 11111111111111111111111111111
// 这里得到的是 29 个 1,也就是说线程池的最大线程数是 2^29-1=536870911
// 以我们现在计算机的实际情况,这个数量还是够用的
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// 我们说了,线程池的状态存放在高 3 位中
// 运算结果为 111跟29个0:111 00000000000000000000000000000
private static final int RUNNING = -1 << COUNT_BITS;
// 000 00000000000000000000000000000
private static final int SHUTDOWN = 0 << COUNT_BITS;
// 001 00000000000000000000000000000
private static final int STOP = 1 << COUNT_BITS;
// 010 00000000000000000000000000000
private static final int TIDYING = 2 << COUNT_BITS;
// 011 00000000000000000000000000000
private static final int TERMINATED = 3 << COUNT_BITS;
// 将整数 c 的低 29 位修改为 0,就得到了线程池的状态
private static int runStateOf(int c) { return c & ~CAPACITY; }
// 将整数 c 的高 3 为修改为 0,就得到了线程池中的线程数
private static int workerCountOf(int c) { return c & CAPACITY; }
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
private static final long serialVersionUID = 6138294804551838833L;
final Thread thread;
Runnable firstTask;
volatile long completedTasks;// 用于存放此线程完全的任务数,注意了,这里用了 volatile,保证可见性
// Worker 只有这一个构造方法,传入 firstTask,也可以传 null
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);// 调用 ThreadFactory 来创建一个新的线程
}
public void run() {
runWorker(this);
}
}
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
// 如果当前线程数少于核心线程数,那么直接添加一个 worker 来执行任务,
// 创建一个新的线程,并把当前任务 command 作为这个线程的第一个任务(firstTask)
// 添加任务成功,那么就结束了。提交任务嘛,线程池已经接受了这个任务,这个方法也就可以返回了
// 至于执行的结果,到时候会包装到 FutureTask 中。
// 返回 false 代表线程池不允许提交任务
if (addWorker(command, true))
return;
c = ctl.get();
}
// 如果线程池处于 RUNNING 状态,把这个任务添加到任务队列 workQueue 中
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 如果线程池已不处于 RUNNING 状态,那么移除已经入队的这个任务,并且执行拒绝策略
if (! isRunning(recheck) && remove(command))
reject(command);
// 如果线程池还是 RUNNING 的,并且线程数为 0,那么开启新的线程
// 到这里,我们知道了,这块代码的真正意图是:担心任务提交到队列中了,但是线程都关闭了
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 如果 workQueue 队列满了,那么进入到这个分支
// 以 maximumPoolSize 为界创建新的 worker,
// 如果失败,说明当前线程数已经达到 maximumPoolSize,执行拒绝策略
else if (!addWorker(command, false))
reject(command);
}
// 第一个参数是准备提交给这个线程执行的任务,之前说了,可以为 null
// 第二个参数为 true 代表使用核心线程数 corePoolSize 作为创建线程的界线,也就说创建这个线程的时候,
// 如果线程池中的线程总数已经达到 corePoolSize,那么不能响应这次创建线程的请求
// 如果是 false,代表使用最大线程数 maximumPoolSize 作为界线
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&//如果线程池状态大于shutdown
! (rs == SHUTDOWN &&//
firstTask == null &&
! workQueue.isEmpty()))// 当状态大于 SHUTDOWN 时,不允许提交任务,且中断正在执行的任务
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 如果成功,那么就是所有创建线程前的条件校验都满足了,准备创建线程执行任务了
// 这里失败的话,说明有其他线程也在尝试往线程池中创建线程
if (compareAndIncrementWorkerCount(c))
break retry;
// 由于有并发,重新再读取一下 ctl
c = ctl.get(); // Re-read ctl
// 正常如果是 CAS 失败的话,进到下一个里层的for循环就可以了
// 可是如果是因为其他线程的操作,导致线程池的状态发生了变更,如有其他线程关闭了这个线程池
// 那么需要回到外层的for循环
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;// worker 是否已经启动
boolean workerAdded = false;// 是否已将这个 worker 添加到 workers 这个 HashSet 中
Worker w = null;
try {
w = new Worker(firstTask);// 把 firstTask 传给 worker 的构造方法
final Thread t = w.thread;// 取 worker 中的线程对象,之前说了,Worker的构造方法会调用 ThreadFactory 来创建一个新的线程
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int rs = runStateOf(ctl.get());
// 小于 SHUTTDOWN 那就是 RUNNING,这个自不必说,是最正常的情况
// 如果等于 SHUTDOWN,前面说了,不接受新的任务,但是会继续执行等待队列中的任务
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();// 添加成功的话,启动这个线程
workerStarted = true;
}
}
} finally {
// 如果线程没有启动,需要做一些清理工作,如前面 workCount 加了 1,将其减掉
if (! workerStarted)
addWorkerFailed(w);
}
// 返回线程是否启动成功
return workerStarted;
}
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// 循环调用 getTask 获取任务
while (task != null || (task = getTask()) != null) {
w.lock();
// 如果线程池状态大于等于 STOP,那么意味着该线程也要中断
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;// 置空 task,准备 getTask 获取下一个任务
w.completedTasks++;// 累加完成的任务数
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
// 此方法有三种可能:
// 1. 阻塞直到获取到任务返回。我们知道,默认 corePoolSize 之内的线程是不会被回收的,
// 它们会一直等待任务
// 2. 超时退出。keepAliveTime 起作用的时候,也就是如果这么多时间内都没有任务,那么应该执行关闭
// 3. 如果发生了以下条件,此方法必须返回 null:
// - 池中有大于 maximumPoolSize 个 workers 存在(通过调用 setMaximumPoolSize 进行设置)
// - 线程池处于 SHUTDOWN,而且 workQueue 是空的,前面说了,这种不再接受新的任务
// - 线程池处于 STOP,不仅不接受新的线程,连 workQueue 中的线程也不再执行
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
// 两种可能
// 1. rs == SHUTDOWN && workQueue.isEmpty()
// 2. rs >= STOP
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();// CAS 操作,减少工作线程数
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
// wc <= maximumPoolSize 同时没有超时
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
// 只要线程池没有被关闭,那么由提交任务的线程自己来执行这个任务。
public static class CallerRunsPolicy implements RejectedExecutionHandler {
public CallerRunsPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}
// 不管怎样,直接抛出 RejectedExecutionException 异常
// 这个是默认的策略,如果我们构造线程池的时候不传相应的 handler 的话,那就会指定使用这个
public static class AbortPolicy implements RejectedExecutionHandler {
public AbortPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
// 不做任何处理,直接忽略掉这个任务
public static class DiscardPolicy implements RejectedExecutionHandler {
public DiscardPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}
// 这个相对霸道一点,如果线程池没有被关闭的话,
// 把队列队头的任务(也就是等待了最长时间的)直接扔掉,然后提交这个任务到等待队列中
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
public DiscardOldestPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}
}
网友评论