Thread pool structure diagram:

1. About Executor:
1.1 The top-level thread pool interface, Executor:
public interface Executor {
void execute(Runnable command);
}
1.2 The second-level interface, ExecutorService:
public interface ExecutorService extends Executor {
void shutdown();
List<Runnable> shutdownNow();
boolean isShutdown();
boolean isTerminated();
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
}
1. The thread pools we commonly use are all implementations of the ExecutorService interface; ExecutorService extends Executor and supports both Callable and Runnable (a short usage sketch follows this list);
2. For now, don't worry about what each ExecutorService method does; their roles will become clear once we analyze how the pool manages its threads;
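As a quick illustration of the difference (a minimal sketch; the pool size, task bodies, and class name are made up for the demo), execute takes a Runnable and returns nothing, while submit wraps a Runnable or Callable and hands back a Future:

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ExecuteVsSubmitDemo {
    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2); // hypothetical pool for the demo

        // Executor.execute: fire-and-forget, no result
        pool.execute(() -> System.out.println("runnable via execute"));

        // ExecutorService.submit(Callable): returns a Future carrying the result
        Future<Integer> future = pool.submit((Callable<Integer>) () -> 1 + 1);
        System.out.println("callable result = " + future.get());

        pool.shutdown();
    }
}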
1.3 The thread pool factory, Executors:
public class Executors {
public static ExecutorService newFixedThreadPool(int nThreads) {...}
public static ExecutorService newWorkStealingPool() {...}
public static ExecutorService newSingleThreadExecutor() {...}
public static ExecutorService newCachedThreadPool() {...}
public static ScheduledExecutorService newSingleThreadScheduledExecutor() {...}
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {...}
private Executors() {}
}
1. Ignore the internals of these methods and their parameters for now; analyzing them at this point would be premature and amount to rote memorization;
2. These methods return two broad families of pools, ExecutorService and ScheduledExecutorService; since ScheduledExecutorService itself extends ExecutorService, these two types are the focus of what follows (a short scheduling sketch follows this list);
3. Next we analyze how ScheduledExecutorService and ExecutorService manage the thread pools they maintain internally;
4. Expanding the Executors methods shows that the returned ExecutorService and ScheduledExecutorService are backed by ThreadPoolExecutor and ScheduledThreadPoolExecutor respectively; both classes (indirectly) implement the ExecutorService interface, so both support execute(RunnableImpl) and submit(CallableImpl);
We analyze the execute(RunnableImpl) path first;
5. The four common factory pools are analyzed in detail in section 6;
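As a concrete taste of the scheduled family mentioned in item 2 (a minimal sketch; the delays, period, and class name are illustrative only), ScheduledExecutorService adds delayed and periodic execution on top of the plain ExecutorService contract:

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ScheduledDemo {
    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

        // Run once after a 100 ms delay
        scheduler.schedule(() -> System.out.println("delayed task"), 100, TimeUnit.MILLISECONDS);

        // Run repeatedly with a fixed 200 ms period
        scheduler.scheduleAtFixedRate(() -> System.out.println("periodic task"),
                0, 200, TimeUnit.MILLISECONDS);

        Thread.sleep(1000);   // let a few periodic runs happen
        scheduler.shutdown();
    }
}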
Before starting the analysis, we first list the thread pool's state values and the parameters required when constructing a pool; we will come back to them as they appear in the source:
The thread pool state values (a short sketch of how these bits are packed follows the constants):
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
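These constants only make sense together with COUNT_BITS. Below is a minimal sketch of the bit packing, mirroring the JDK's own helpers (COUNT_BITS = Integer.SIZE - 3 = 29; the high 3 bits of ctl hold the run state, the low 29 bits hold the worker count); the demo class and the printed checks are illustrative only:

public class CtlSketch {
    private static final int COUNT_BITS = Integer.SIZE - 3;           // 29
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;      // max representable worker count

    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;

    // Same helpers as ThreadPoolExecutor: high 3 bits = run state, low 29 bits = worker count
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    private static int ctlOf(int rs, int wc) { return rs | wc; }

    public static void main(String[] args) {
        int c = ctlOf(RUNNING, 5);                        // a RUNNING pool with 5 workers
        System.out.println(runStateOf(c) == RUNNING);     // true
        System.out.println(workerCountOf(c));             // 5
        System.out.println(RUNNING < SHUTDOWN);           // true: states are ordered by value
    }
}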
Parameters required to construct a thread pool:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
...
}
- corePoolSize: the pool's core thread count. When execute(RunnableImpl) is called, the pool first checks whether the current number of worker threads is below corePoolSize; if so, it tries to create a new worker; if not, it first tries to enqueue the task and only creates an additional worker when the queue is full;
- workQueue: the task queue. When execute(RunnableImpl) is called and the current thread count is ≥ corePoolSize, the newly arrived task is first added to workQueue, and workers then take tasks out of workQueue to execute them; how workers dequeue tasks is determined by the BlockingQueue implementation passed in here (a construction sketch using these parameters follows this list);
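A minimal construction sketch using the parameters above (the specific sizes, queue capacity, thread name, and rejection policy are arbitrary choices for illustration, not recommendations):

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ManualPoolDemo {
    public static void main(String[] args) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2,                                   // corePoolSize
                4,                                   // maximumPoolSize
                60L, TimeUnit.SECONDS,               // keepAliveTime for threads above the core size
                new ArrayBlockingQueue<>(10),        // workQueue: bounded, capacity 10
                r -> new Thread(r, "demo-worker"),   // threadFactory
                new ThreadPoolExecutor.AbortPolicy() // handler: reject by throwing RejectedExecutionException
        );

        pool.execute(() -> System.out.println("task running on " + Thread.currentThread().getName()));
        pool.shutdown();
    }
}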
2. Task submission:
2.1 ThreadPoolExecutor.execute:
- Note the relationship between Thread and Worker;
public class ThreadPoolExecutor extends AbstractExecutorService {
public void execute(Runnable command) {
int c = ctl.get();
<<1>>
/**
* 1. First get the number of workers in the pool and compare it with corePoolSize; if it is below corePoolSize, go to <<2>>;
* 2. If the current thread count in the pool is ≥ corePoolSize, go to <<3>>;
*/
if (workerCountOf(c) < corePoolSize) {
<<2>>
/**
* 1. addWorker will be called again later, under different conditions and with different arguments; pay attention to the arguments used here;
* 2. Here command and true are passed in;
* 3. If addWorker(...) returns true then, as module <<3.1>> below shows, one of the following must hold:
* (1) the pool is in the RUNNING state;
* (2) or firstTask == null, which cannot be the case here since firstTask is definitely != null;
* So if the pool is in the RUNNING state, addWorker normally returns true and execute simply returns;
*/
if (addWorker(command, true)) module <<3.1>>
return;
c = ctl.get();
}
<<3>>
/**
* 1. Combined with <<1>>, execution reaches this point in one of the following cases:
* (1) workerCountOf(c) < corePoolSize but addWorker(command, true) returned false;
* (2) workerCountOf(c) ≥ corePoolSize;
* 2. Ignoring the pool's run state, whether the if evaluates to true or false depends on workQueue.offer(command):
* (1) true: the workQueue is not full and command was enqueued; go to <<4>>;
* (2) false: the workQueue is full and command failed to enqueue; go to <<7>>;
* 3. So when a task is submitted to the pool, the pool first checks whether workerCountOf(c) < corePoolSize; if it is, a thread is created directly to run the command; if it is ≥ corePoolSize, the pool first tries to add the command to workQueue;
* 4. If workQueue.offer returns false, go to module <<7>>;
* 5. How the command is added to workQueue depends on the concrete BlockingQueue implementation passed when constructing the ThreadPoolExecutor;
*/
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
<<4>>
/**
* 1. The task was added to workQueue successfully; if the pool is no longer running at this point, remove command from workQueue and reject it;
* 2. If the pool is still running, go to <<6>>;
*/
if (! isRunning(recheck) && remove(command))
<<5>>
reject(command);
else if (workerCountOf(recheck) == 0)
<<6>>
/**
* 1. If the pool is in the RUNNING state and the worker count has dropped to 0, execution falls through to addWorker(null, false) so that at least one worker exists to drain the queue;
* 2. Note the difference from the arguments passed at <<2>>: null and false. As module 3 will show, null is passed here because the command has already been added to workQueue, and the newly created thread will simply fetch it from workQueue;
*/
addWorker(null, false); module <<3.1>>
}
<<7>>
/**
* 1. Execution reaches here when:
* (1) workerCountOf(c) ≥ corePoolSize and the workQueue is full (or the pool is no longer running);
* 2. From module <<3.1>>, addWorker can return false in the following cases:
* (1) the pool is in the SHUTDOWN state (or beyond);
* (2) wc ≥ CAPACITY (the worker count exceeds the maximum count the ctl field can represent);
* (3) wc ≥ maximumPoolSize (the worker count has reached the pool's maximum thread count);
* 3. If addWorker returns true, then a Worker w = new Worker(...) was definitely created;
* 4. If addWorker(...) returns false, this is the failure case; go to <<8>> to apply the rejection policy for command;
*/
else if (!addWorker(command, false))
<<8>>
/**
* 1. Execution reaches here when:
* (1) the pool has been shut down; or
* (2) workerCountOf(c) ≥ maximumPoolSize and the workQueue is full;
* 2. The rejection policy applied to command depends on the RejectedExecutionHandler passed when constructing the ThreadPoolExecutor;
*/
reject(command); module <<4.1>>
}
}
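To make the decision order concrete (core threads first, then the queue, then extra threads up to maximumPoolSize, then rejection), here is a small demo; the pool sizes, queue capacity, and sleeps are chosen only to force each branch and are not part of the original source:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ExecuteOrderDemo {
    public static void main(String[] args) {
        // core = 1, max = 2, queue capacity = 1  =>  the 4th long-running task must be rejected
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 0L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));

        Runnable slow = () -> {
            try { Thread.sleep(2_000); } catch (InterruptedException ignored) { }
        };

        pool.execute(slow); // 1st: worker count < corePoolSize -> new core worker via addWorker(command, true) at <<2>>
        pool.execute(slow); // 2nd: core worker busy             -> enqueued by workQueue.offer at <<3>>
        pool.execute(slow); // 3rd: queue full                   -> extra worker via addWorker(command, false) at <<7>>
        try {
            pool.execute(slow); // 4th: queue full and max reached -> rejected at <<8>>
        } catch (RejectedExecutionException e) {
            System.out.println("4th task rejected: " + e);
        }
        pool.shutdown();
    }
}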
- Flowchart of the execute flow:

3. Task processing:
3.1 ThreadPoolExecutor.addWorker:
public class ThreadPoolExecutor extends AbstractExecutorService {
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
<<1>>
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
<<2>>
/**
* 1. For the case curThread < corePoolSize, the only way into the if body is rs ≥ SHUTDOWN, i.e. the pool is shutting down; otherwise the if is skipped;
* 2. For the case curThread ≥ corePoolSize, see module <<2.1>>;
*/
if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))
return false;
<<3>>
for (;;) {
/**
* Get the current number of worker threads in the pool;
*/
int wc = workerCountOf(c);
<<4>>
/**
* For now consider only the case wc < CAPACITY;
* 1. From module <<2.1>>, core is passed as true only when curThread < corePoolSize; in every other case it is false. In the core = true case wc < corePoolSize, so the if is skipped;
* 2. From module <<2.1>>, core = false covers two situations:
* (1) workerCountOf(c) ≥ corePoolSize and workQueue was not full, so the command was enqueued successfully and firstTask = null;
* (2) workerCountOf(c) ≥ corePoolSize and workQueue was full, so enqueuing failed and firstTask = command;
*/
if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
return false;
<<5>>
/**
* Combined with <<4>>, reaching this point means one of two things:
* (1) core = true and wc < corePoolSize;
* (2) core = false and wc < maximumPoolSize;
*/
if (compareAndIncrementWorkerCount(c))
break retry;
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
<<6>>
/**
* 1. From module <<2.1>>, firstTask takes one of two values:
* (1) command:
* (1.1) workerCountOf(c) < corePoolSize, in which case core = true;
* (1.2) workerCountOf(c) ≥ corePoolSize and workQueue is full, so the command failed to enqueue; core = false;
* (2) null:
* (2.1) workerCountOf(c) ≥ corePoolSize and workQueue is not full, so the command was enqueued; core = false;
*/
w = new Worker(firstTask); module <<3.2>>
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int rs = runStateOf(ctl.get());
<<7>>
/**
* If the pool is in the RUNNING state, enter the if body; the key step here is setting workerAdded to true;
*/
if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
<<8>>
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
<<9>>
/**
* t holds a reference to the Worker, so t.start() actually triggers the execution of Worker.run;
*/
t.start(); module <<5.1>>
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
}
3.2 The Worker structure:
/**
* 1. Worker extends AbstractQueuedSynchronizer, so it naturally has the characteristics of an AQS lock;
*/
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
/**
* 1. From modules 2.1 and 3.1, the firstTask passed when constructing a Worker may be null:
* (1) firstTask = null: workerCount ≥ corePoolSize (and below maximumPoolSize), and the command is already sitting in workQueue;
* (2) firstTask != null: workerCount < corePoolSize, or workerCount ≥ corePoolSize and workQueue is full;
* 2. The Worker holds a reference to its Thread and the Thread holds a reference to the Worker; in module 3.1, worker.thread.start() triggers the execution of worker.run();
*/
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
}
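Since the Worker constructor obtains its thread from getThreadFactory().newThread(this), the ThreadFactory passed at construction time decides the thread's name, daemon flag, and so on. A minimal sketch with an illustrative naming factory (the pool sizes and name prefix are made up):

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class NamedFactoryDemo {
    public static void main(String[] args) {
        ThreadFactory factory = new ThreadFactory() {
            private final AtomicInteger seq = new AtomicInteger(1);
            @Override
            public Thread newThread(Runnable r) {   // r is the Worker itself
                Thread t = new Thread(r, "biz-pool-" + seq.getAndIncrement());
                t.setDaemon(false);
                return t;
            }
        };

        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 2, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(), factory,
                new ThreadPoolExecutor.AbortPolicy());

        pool.execute(() -> System.out.println("running on " + Thread.currentThread().getName()));
        pool.shutdown();
    }
}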
- Flowchart for modules 3.1 ~ 3.2:

4. Task rejection:
4.1 ThreadPoolExecutor.reject:
public class ThreadPoolExecutor {
final void reject(Runnable command) {
/**
* 1. How a task is rejected is entirely determined by the RejectedExecutionHandler passed when constructing the ThreadPoolExecutor;
*/
handler.rejectedExecution(command, this); module <<4.2>>
}
}
4.2 The concrete implementations of RejectedExecutionHandler:
public static class DiscardOldestPolicy implements RejectedExecutionHandler {...}
public static class AbortPolicy implements RejectedExecutionHandler {...}
public static class CallerRunsPolicy implements RejectedExecutionHandler {...}
public static class DiscardPolicy implements RejectedExecutionHandler {...}
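A minimal sketch of plugging in a rejection policy; the custom handler below is hypothetical (it just logs and runs the task in the submitting thread, roughly what CallerRunsPolicy does), and rejectedExecution(Runnable, ThreadPoolExecutor) is the only method a handler implements:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class RejectionDemo {
    public static void main(String[] args) {
        // A hypothetical custom handler: log, then run the task in the submitting thread
        RejectedExecutionHandler logAndRun = (r, executor) -> {
            System.out.println("rejected, running in caller thread: " + r);
            if (!executor.isShutdown()) {
                r.run();
            }
        };

        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1),
                logAndRun);   // or new ThreadPoolExecutor.CallerRunsPolicy(), etc.

        for (int i = 0; i < 5; i++) {
            pool.execute(() -> {
                try { Thread.sleep(200); } catch (InterruptedException ignored) { }
            });
        }
        pool.shutdown();
    }
}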
5. Task execution:
5.1 Worker.run:
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
public void run() {
runWorker(this);
}
}
public class ThreadPoolExecutor extends AbstractExecutorService {
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
/**
* A small caching/reuse idiom: copy Worker.firstTask into a local variable, then set Worker.firstTask to null;
*/
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
<<1>>
/**
* 1. From modules <<2.1>> and <<3.1>>, task takes one of two values:
* (1) task = null:
* (1.1) workerCountOf(c) ≥ corePoolSize and workQueue was not full;
* (2) task = command:
* (2.1) workerCountOf(c) < corePoolSize;
* (2.2) workerCountOf(c) ≥ corePoolSize and workQueue was full;
* 2. If task != null, run it directly;
* 3. If task == null, the command was added to workQueue; go to module <<5.2>>. The getTask here and the workQueue.offer in module <<2.1>> form a producer-consumer pair;
*/
while (task != null || (task = getTask()) != null) {
w.lock();
<<2>>
task.run();
task = null;
w.completedTasks++;
w.unlock();
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
}
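The producer-consumer relationship mentioned above (workQueue.offer in execute produces, take/poll in getTask consumes) can be sketched in isolation with a plain BlockingQueue; the queue type, task count, and sleep below are illustrative only:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class ProducerConsumerSketch {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>();

        // Consumer: plays the role of a Worker looping over getTask()
        Thread worker = new Thread(() -> {
            try {
                while (true) {
                    Runnable task = workQueue.take();   // blocks while the queue is empty
                    task.run();
                }
            } catch (InterruptedException e) {
                // interrupted: exit the loop, like a worker being shut down
            }
        });
        worker.start();

        // Producer: plays the role of execute() calling workQueue.offer(command)
        for (int i = 0; i < 3; i++) {
            final int id = i;
            workQueue.offer(() -> System.out.println("task " + id + " executed"));
        }

        Thread.sleep(200);
        worker.interrupt();
    }
}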
5.2 ThreadPoolExecutor.getTask:
public class ThreadPoolExecutor extends AbstractExecutorService {
<<1>>
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
<<2>>
/**
* 1. allowCoreThreadTimeOut controls whether core threads may time out; it defaults to false (core threads never time out) and can be set explicitly via allowCoreThreadTimeOut(boolean); the code below shows what it does;
* 2. timed takes one of two values:
* (1) timed = true:
* (1.1) allowCoreThreadTimeOut = true, i.e. core threads may time out;
* (1.2) allowCoreThreadTimeOut = false (core threads may not time out) and wc > corePoolSize;
* (2) timed = false:
* (2.1) allowCoreThreadTimeOut = false (core threads may not time out) and wc ≤ corePoolSize;
* 3. Once timed is known, continue to <<5>>;
* 4. Combining this with the analysis of <<5>> and <<6>>:
* (1) when allowCoreThreadTimeOut = true and the task queue is empty, every thread (core and non-core) is destroyed after waiting keepAliveTime;
* (2) when allowCoreThreadTimeOut = false and wc > corePoolSize, a thread is destroyed after being parked for keepAliveTime;
* (3) when allowCoreThreadTimeOut = false and wc ≤ corePoolSize, the thread stays parked until it is woken up externally;
*/
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
<<5>>
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
<<6>>
/**
* 1. If timed = true, r = workQueue.poll(keepAliveTime, ...) is executed;
* 2. If timed = false, r = workQueue.take() is executed;
* 3. From the BlockingQueue source, poll(...) parks the current thread when the queue is empty; if no element arrives within keepAliveTime, it returns null and the worker is subsequently destroyed;
* 4. take() parks the current thread on the queue's wait queue when the queue is empty until it is woken up; in the thread pool that wakeup happens at the workQueue.offer call in module <<2.1>>;
* 5. How elements are actually enqueued and dequeued by poll and take depends on the concrete BlockingQueue implementation passed when constructing the ThreadPoolExecutor;
*/
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
}
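A minimal sketch of the allowCoreThreadTimeOut behavior described above; the 1-second keep-alive and the sleeps are chosen only to make the shrink observable, and the printed sizes are what one would typically expect rather than guaranteed values:

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CoreTimeoutDemo {
    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 2, 1L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());

        pool.allowCoreThreadTimeOut(true);   // core threads may now time out via poll(keepAliveTime)

        pool.execute(() -> { });
        pool.execute(() -> { });
        System.out.println("pool size right after submit: " + pool.getPoolSize()); // typically 2

        Thread.sleep(3_000);                 // stay idle longer than keepAliveTime
        System.out.println("pool size after idling: " + pool.getPoolSize());       // typically 0

        pool.shutdown();
    }
}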
6. The four common Executors thread pools:
6.1 Executors.newFixedThreadPool:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
6.2 Executors.newSingleThreadExecutor:
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
6.3 Executors.newCachedThreadPool:
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
6.4 Executors.newScheduledThreadPool:
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor implements ScheduledExecutorService {
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue());
}
}
public class ThreadPoolExecutor extends AbstractExecutorService {
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
}
- Comparing the parameters used by the four factory methods above:
Method | corePoolSize | maximumPoolSize | keepAliveTime | BlockingQueue |
---|---|---|---|---|
newFixedThreadPool | nThreads | nThreads | 0 ms | LinkedBlockingQueue |
newSingleThreadExecutor | 1 | 1 | 0 ms | LinkedBlockingQueue |
newCachedThreadPool | 0 | Integer.MAX_VALUE | 60 s | SynchronousQueue |
newScheduledThreadPool | corePoolSize | Integer.MAX_VALUE | 0 ns | DelayedWorkQueue |
- 1. newCachedThreadPool() sets maximumPoolSize to Integer.MAX_VALUE: each incoming command is handed to a Worker (a new one is created whenever no idle Worker is available to take the hand-off), and once a Worker has been idle for keepAliveTime (60 s) after finishing its command it is destroyed (see the sketch below);
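A small demo of that cached-pool behavior (the sleeps are only there to make the growth visible, and the exact pool size can vary with timing):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

public class CachedPoolDemo {
    public static void main(String[] args) {
        ExecutorService pool = Executors.newCachedThreadPool();
        ThreadPoolExecutor tpe = (ThreadPoolExecutor) pool;   // newCachedThreadPool returns a ThreadPoolExecutor

        // Submit several blocking tasks at once: with a SynchronousQueue and no idle
        // workers available, each hand-off creates a new Worker.
        for (int i = 0; i < 5; i++) {
            pool.execute(() -> {
                try { Thread.sleep(500); } catch (InterruptedException ignored) { }
            });
        }
        System.out.println("pool size while busy: " + tpe.getPoolSize());   // typically 5

        // After the tasks finish, idle Workers are destroyed once keepAliveTime (60 s) elapses;
        // waiting that long here is impractical, so we just shut the pool down.
        pool.shutdown();
    }
}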