ThreadPoolExecutor
刚开始用的 时候 一直使用 Executors
但是 由于 idea 装了 阿里代码检查插件 说明 这样是不合规范的 那么 我们就来彻底了解一下 ThreadPoolExecutor 的奥秘。
Executors各个方法的弊端:
1)newFixedThreadPool和newSingleThreadExecutor:
主要问题是堆积的请求处理队列可能会耗费非常大的内存,甚至OOM。
2)newCachedThreadPool和newScheduledThreadPool:
主要问题是线程数最大数是Integer.MAX_VALUE,可能会创建数量非常多的线程,甚至OOM。
构造函数
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
-
corePoolSize:线程池的核心线程数(线程池的基本大小),线程池中运行的线程数也永远不会超过 corePoolSize 个,默认情况下可以一直存活。可以通过设置allowCoreThreadTimeOut为True,此时 核心线程数就是0,此时keepAliveTime控制所有线程的超时时间。如果调用了prestartCoreThread() 会提前创建 核心线程
-
maximumPoolSize:线程池中允许的最大线程数。线程池的阻塞队列满了之后,如果还有任务提交,如果当前的线程数小于maximumPoolSize,则会新建线程来执行任务。注意,如果使用的是无界队列,该参数也就没有什么效果了;
-
keepAliveTime: 线程空闲的时间。线程的创建和销毁是需要代价的。线程执行完任务后不会立即销毁,而是继续存活一段时间:keepAliveTime。默认情况下,该参数只有在线程数大于corePoolSize时才会生效;
-
unit :是一个枚举,表示 keepAliveTime 的单位;
-
workQueue:表示存放任务的BlockingQueue<Runnable队列。
- ArrayBlockingQueue:基于数组结构的有界阻塞队列,FIFO。
- LinkedBlockingQueue:基于链表结构的有界阻塞队列,FIFO。
- SynchronousQueue:不存储元素的阻塞队列,每个插入操作都必须等待一个移出操作,反之亦然。
- PriorityBlockingQueue:具有优先界别的阻塞队列。
阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。具体的实现类有LinkedBlockingQueue,ArrayBlockingQueued等。一般其内部的都是通过Lock和Condition(显示锁(Lock)及Condition的学习与使用)来实现阻塞和唤醒。
- threadFactory
用于设置创建线程的工厂。该对象可以通过Executors.defaultThreadFactory(),也可以 通过 guava提供的ThreadFactoryBuilder 快速的的给 线程池设置有意义的名字 如下:
JDK 自带的 DefaultFactory如下
Executors.defaultThreadFactory()
public static ThreadFactory defaultThreadFactory() {
return new DefaultThreadFactory();
}
static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
使用默认线程工厂 创建就是 默认优先级为Thread.NORM_PRIORITY ,非守护线程
线程命名
这点得反复强调。对正在运行的JVM进行线程转储(thread dump)或者调试时,线程池默认的命名机制是pool-N-thread-M,这里N是线程池的序号(每新创建一个线程池,这个N都会加一),而M是池 里线程的序号。比方说,pool-2-thread-3指的是JVM生命周期中第二个线程池里的第三个线程。
import com.google.common.util.concurrent.ThreadFactoryBuilder;
final ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setNameFormat("Orders-%d")
.setDaemon(true)
.build();
final ExecutorService executorService = Executors.newFixedThreadPool(10, threadFactory);
- handler
RejectedExecutionHandler,线程池的拒绝策略。所谓拒绝策略,是指将任务添加到线程池中时,线程池拒绝该任务所采取的相应策略。当向线程池中提交任务时,如果此时线程池中的线程已经饱和了,而且阻塞队列也已经满了,则线程池会选择一种拒绝策略来处理该任务。
线程池提供了四种拒绝策略:
- AbortPolicy:直接抛出异常,默认策略;
- CallerRunsPolicy:用调用者所在的线程来执行任务;
- DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;
- DiscardPolicy:直接丢弃任务;
当然我们也可以实现自己的拒绝策略,例如记录日志等等,实现RejectedExecutionHandler接口即可。
继续向下,再看execute() 方法之前 我们先看下 线程池的 字段
//ctl是一个原子整数包装
//两个概念领域
// workerCount,表示有效的线程数
// runState,指示是否运行,关闭等
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
/* runState提供了主要的生命周期控制,取值为:
*
*RUNNING:接受新的任务和处理排队的任务
*SHUTDOWN:不接受新的任务,但处理排队的任务
*STOP:不接受新的任务,不处理排队的任务,并中断正在进行的任务
*TIDYING:所有任务已经终止,workerCount为零,线程转换到状态TIDYING,将运行terminate()钩子方法
* TERMINATED:已终止()已完成
*/
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
变量 ctl 定义为AtomicInteger,其功能非常强大,记录了“线程池中的任务数量”和“线程池的状态”两个信息。共32位,其中高3位表示"线程池状态",低29位表示"线程池中的任务数量"。
RUNNING -- 对应的高3位值是111。
SHUTDOWN -- 对应的高3位值是000。
STOP -- 对应的高3位值是001。
TIDYING -- 对应的高3位值是010。
TERMINATED -- 对应的高3位值是011。
再看最为核心的方法吧:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
//1.start
int c = ctl.get();
// 当前线程数 < corePoolSize
if (workerCountOf(c) < corePoolSize) {
// 直接启动新的线程。
if (addWorker(command, true))
return;
c = ctl.get();
}//1.end
//2 .start
if (isRunning(c) && workQueue.offer(command)) {//2.end
int recheck = ctl.get();
//2.1 start
if (! isRunning(recheck) && remove(command))//2.1 end
reject(command);
//2.2start
else if (workerCountOf(recheck) == 0)
addWorker(null, false);//2,2 end
}
//3.start
else if (!addWorker(command, false))
reject(command);
//3.end
}
- 1.如果线程池当前线程数小于corePoolSize,则调用addWorker创建新线程执行任务,成功返回true,失败执行步骤2。
- 2.如果线程池处于RUNNING状态,则尝试加入阻塞队列,如果加入阻塞队列成功,当前正在执行,且往工作队列中添加成功,就再次获取当前工作线程数
- , 我们还需要再次检查一下此时线程池是否还是 RUNNING 状态, 如果不是的话就会将原来插入队列中的那个任务删除, 然后调用 reject 方法拒绝此任务的提交;//****执行了shutdown 之类的代码***
- 接着考虑到在我们插入任务到 workQueue 中的同时, 如果此时线程池中的线程都执行完毕并终止了,(没有线程了) 在这样的情况下刚刚插入到 workQueue 中的任务就永远不会得到执行了. 为了避免这样的情况, 因此我们由再次检查一下线程池中的线程数, 如果为零, 则调用 addWorker(null, false) 来添加一个线程.
- 3.当workQueue.offer失败时,也就是说现在队列已满,不能再向队列里放,此时工作线程大于等于corePoolSize,创建新的线程执行该task; 加入失败表示 最大线程也满了 ,实施拒绝策略
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
//
int c = ctl.get();
//获取当前线程状态
int rs = runStateOf(c);
// Check if queue empty only if necessary.
// 1.start
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))//1.end
return false;
for (;;) {
//线程数量
int wc = workerCountOf(c);
//2. start
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
//2.end
return false;
//3.
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
//4.
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
//新建线程
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
//获取主锁:mainLock
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
//5 .strat
// 线程状态
int rs = runStateOf(ctl.get());
// rs < SHUTDOWN ==> 线程处于RUNNING状态
// 或者线程处于SHUTDOWN状态,且firstTask == null(可能是workQueue中仍有未执行完成的任务,创建没有初始任务的worker线程执行)
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
// 当前线程已经启动,抛出异常
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// workers是一个HashSet<Worker>
workers.add(w);
// 设置最大的池大小largestPoolSize,workerAdded设置为true
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
// 释放锁
mainLock.unlock();
//5.end
}
// 启动线程
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
// 线程启动失败
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
- 1.判断是否可以执行任务
- rs >= SHUTDOWN ,表示当前线程处于SHUTDOWN ,STOP、TIDYING、TERMINATED状态
- rs == SHUTDOWN , firstTask != null时不允许添加线程,因为线程处于SHUTDOWN 状态,不允许添加任务
- rs == SHUTDOWN , firstTask == null,但workQueue.isEmpty() == true,不允许添加线程,因为firstTask == null是为了添加一个没有任务的线程然后再从workQueue中获取任务的,如果workQueue == null,则说明添加的任务没有任何意义。
- 2.wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)
- 如果当前线程数大于线程最大上限CAPACITY return false
- 若core == true,则与corePoolSize 比较,否则与maximumPoolSize ,大于 return false
- 3 成功CAS workCount 数量 跳出retry循环
- 4重新读一遍工作状态,如果状态不等于之前获取的state,跳出内层循环,继续去外层循环判断
- 5 获取线程状态 ,如果线程状态是 Runing 的或者SHUTDOWN时 但是firstTask 为空 workQueue 可能不为空
addWorker的4种调用方式(Runnable firstTask, boolean core)
firstTask 是否是新的任务 core 大小限制是用corecount 还是 maxcount
- addWorker(command, true)
- execute(Runnable ) 时调用
- 线程数 < coreSize时,将task放入workers,如果线程数 >=
coreSize(并发),返回false;
- addWorker(command, false)
- execute(Runnable ) 时调用
- 当阻塞对列已满,尝试将新的task放入workers,如果线程数 >= maximumPoolSize,返回false;
- addWorker(null, false)
- execute(Runnable ) 时调用 ----- (ScheduledThreadPoolExecutor)ensurePrestart()调用 先不理 ---------(Worker run 方法最后 finally() 时候调用 )processWorkerExit()
- 放入一个空的task到workers,此时线程数的限制是maximumPoolSize,相当于创建一个新的线程,没立马分配任务;
- addWorker(null, true)
放入一个空的task到workers,线程数 < coreSize。实际的使用是在prestartCoreThread()等方法,(不是ThreadPoolExecutor框架在QuantumRenderer) 有兴趣的读者可以自行阅读,在此不做详细赘述。
在新建线程执行任务时,将讲Runnable包装成一个Worker,Woker为ThreadPoolExecutor的内部类
Worker具体实现
-
在addWorker中,t.start()使线程就绪,我们来看看Worker类的具体设计。
-
Worker继承AbstractQueuedSynchronizer,方便实现工作线程的中止等操作;
-
Worker实现Runnable接口,将自身作为一个task在工作线程中执行;
addWoker中的t.start()实质上是执行Worker的run()方法:
public void run() {
runWorker(this);
}
final void runWorker(Worker w) {
//1.start
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {//1.end
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
//2.start
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();//2.end
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
解析
- 1.线程启动后,释放锁,设AQS状态为0;
获取firstTask任务并执行,执行任务前后可定制beforeExecute和afterExecute; 如果worker自己的task为null,调用getTask从阻塞队列获取等待任务执行,否则,阻塞该方法。 getTask() 方法
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
//必要情况下需要检查workQueue是否为空 和 线程池状态
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
//尝试CAS-递减workerCount
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
//如果线程池允许线程超时或者当前线程数大于核心线程数,则会进行超时处理
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// 从阻塞队列中获取task(阻塞)
// 如果需要超时控制,则调用poll(),否则调用take()
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
getTask循环实现:
workQueue.poll:如果在keepAliveTime时间内阻塞队列有任务,返回该任务并执行;
workQueue.take:如果阻塞队列为空,当前线程阻塞,当队列有任务时,线程被唤醒,执行take返回的任务。
- 2
- 如果 当线程池 是stoping时,才会被设置为中断
- 否者清除中断 ,如果线程池状态 >= STOP ,且当前线程没有设置中断状态,则wt.interrupt()。
运行流程
- 根据worker获取要执行的任务task,然后调用unlock()方法释放锁,这里释放锁的主要目的在于中断,因为在new Worker时,设置的state为-1,调用unlock()方法可以将state设置为0,这里主要原因就在于interruptWorkers()方法只有在state >= 0时才会执行;
- 通过getTask()获取执行的任务,调用task.run()执行,当然在执行之前会调用worker.lock()上锁,执行之后调用worker.unlock()放锁;
- 在任务执行前后,可以根据业务场景自定义beforeExecute() 和 afterExecute()方法,则两个方法在ThreadPoolExecutor中是空实现;
- 如果线程执行完成,则会调用getTask()方法从阻塞队列中获取新任务,如果阻塞队列为空,则根据是否超时来判断是否需要阻塞;
- task == null或者抛出异常(beforeExecute()、task.run()、afterExecute()均有可能)导致worker线程终止,则调用processWorkerExit()方法处理worker退出流程。
在runWorker()方法中,无论最终结果如何,都会执行processWorkerExit()方法对worker进行退出处理。
private void processWorkerExit(Worker w, boolean completedAbruptly) {
//执行失败 线程数 -1
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
//从workers 中移除 worker
workers.remove(w);
} finally {
mainLock.unlock();
}
// 有worker线程移除,可能是最后一个线程退出需要尝试终止线程池
tryTerminate();
int c = ctl.get();
// 如果线程为running或shutdown状态,即tryTerminate()没有成功终止线程池
if (runStateLessThan(c, STOP)) {
// 正常退出,计算min:需要维护的最小线程数量
if (!completedAbruptly) {
// allowCoreThreadTimeOut 默认 false
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
// 如果线程数量大于最少数量min,直接返回,不需要新增线程
if (workerCountOf(c) >= min)
return; // replacement not needed
}
//增加线程
addWorker(null, false);
}
}
总结:
首先completedAbruptly的值来判断是否需要对线程数-1处理,如果completedAbruptly == true,说明在任务运行过程中出现了异常,那么需要进行减1处理,否则不需要,因为减1处理在getTask()方法中处理了。然后从HashSet中移出该worker,过程需要获取mainlock。然后调用tryTerminate()方法处理,该方法是对最后一个线程退出做终止线程池动作。如果线程池没有终止,那么线程池需要保持一定数量的线程,则通过addWorker(null,false)新增一个空的线程。
在addWorker()方法中,如果线程t==null,或者在add过程出现异常,会导致workerStarted == false,那么在最后会调用addWorkerFailed()方法:
private void addWorkerFailed(Worker w) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (w != null)
// 从HashSet中移除该worker
workers.remove(w);
// 线程数 - 1
decrementWorkerCount();
tryTerminate();
} finally {
mainLock.unlock();
}
}
tryTerminate()
当线程池涉及到要移除worker时候都会调用tryTerminate(),该方法主要用于判断线程池中的线程是否已经全部移除了,如果是的话则关闭线程池。
final void tryTerminate() {
for (;;) {
int c = ctl.get();
//1.start
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
//1.end
// 2.start
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE);
return;
}
//2.end
final ReentrantLock mainLock = this.mainLock;
//3.start
/**
* 如果状态是SHUTDOWN,workQueue也为空了,正在运行的worker也没有了,开始terminated
*/
mainLock.lock();
try {
//CAS:将线程池的ctl变成TIDYING(所有的任务被终止,workCount为0,为此状态时将会调用terminated()方法)
//期间ctl有变化就会失败,会再次for循环
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
terminated();
} finally {
ctl.set(ctlOf(TERMINATED, 0));//将线程池的ctl变成TERMINATED
termination.signalAll();//唤醒调用了 等待线程池终止的线程 awaitTermination()
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
- 如果线程处于run
- 或 如果 线程池已经终止了
- 或 线程池处于ShutDown状态,但是阻塞队列不为空
-
- 如果 线程处于STOP状态,要么处于SHUTDOWN且阻塞队列为空 如果 工作线程 不为空 中断操作.
- 2.在interruptIdleWorkers()中断之前需要先tryLock()获取worker锁,意味着正在运行的worker不能中断,因为worker.tryLock()失败,且锁是不可重入的故shutdown()只有对能获取到worker锁的空闲线程(正在从workQueue中getTask(),此时worker没有加锁)发送中断信号
由此可以将worker划分为: 1、空闲worker:正在从workQueue阻塞队列中获取任务的worker 2、运行中worker:正在task.run()执行任务的worker
3.如果状态是SHUTDOWN,workQueue也为空了,正在运行的worker也没有了,开始terminated会先上锁,将线程池置为tidying状态,之后调用需子类实现的 terminated(),最后线程池置为terminated状态,并唤醒所有等待线程池终止这个Condition的线程
线程终止
shutdown()
/**
*发起一个有序的关闭,在此之前提交的任务被执行,
*但是没有新的任务被接受。如果已经关闭,调用没有额外的效果。
* Initiates an orderly shutdown in which previously submitted
* tasks are executed, but no new tasks will be accepted.
* Invocation has no additional effect if already shut down.
*
*当前方法不会等待之前提交的任务执行结束,可以使用awaitTermination()
* <p>This method does not wait for previously submitted tasks to
* complete execution. Use {@link #awaitTermination awaitTermination}
* to do that.
*
* @throws SecurityException {@inheritDoc}
*/
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//是否有权限
checkShutdownAccess();
//CAS+循环设置线程池状态为SHUTDOWN
advanceRunState(SHUTDOWN);
//中断所有空闲线程 (正在运行的不会被中断 同时 getTask()时 可以跳出阻塞)
interruptIdleWorkers();
//预留 子类实现
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
//清理 线程池
tryTerminate();
}
-
1、上锁,mainLock是线程池的主锁,是可重入锁,当要操作workers set这个保持线程的HashSet时,需要先获取mainLock,还有当要处理largestPoolSize、completedTaskCount这类统计数据时需要先获取mainLock
-
2、判断调用者是否有权限shutdown线程池
-
3、使用CAS操作将线程池状态设置为shutdown,shutdown之后将不再接收新任务
-
4、中断所有空闲线程 interruptIdleWorkers()
-
5、onShutdown(),ScheduledThreadPoolExecutor中实现了这个方法,可以在shutdown()时做一些处理
-
6、解锁
-
7、尝试终止线程池 tryTerminate()
--
shutdownNow()
/**
*尝试停止所有活动的正在执行的任务,停止等待任务的处理,并返回正在等待被执行的任务列表
*这个任务列表是从任务队列中排出(删除)的
* Attempts to stop all actively executing tasks, halts the
* processing of waiting tasks, and returns a list of the tasks
* that were awaiting execution. These tasks are drained (removed)
* from the task queue upon return from this method.
*
*这个方法不用等到正在执行的任务结束,要等待线程池终止可使用awaitTermination()
* <p>This method does not wait for actively executing tasks to
* terminate. Use {@link #awaitTermination awaitTermination} to
* do that.
*
*
*除了尽力尝试停止运行中的任务,没有任何保证
* 取消任务是通过Thread.interrupt()实现的,
* 所以任何响应中断失败的 任务可能永远不会结束
* <p>There are no guarantees beyond best-effort attempts to stop
* processing actively executing tasks. This implementation
* cancels tasks via {@link Thread#interrupt}, so any task that
* fails to respond to interrupts may never terminate.
*
* @throws SecurityException {@inheritDoc}
*/
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
//CAS+循环设置线程池状态为stop
advanceRunState(STOP);
//中断所有线程,包括正在运行任务的
interruptWorkers();
//将workQueue中的元素放入一个List并返回
tasks = drainQueue();
} finally {
mainLock.unlock();
}
//尝试终止线程池
tryTerminate();
return tasks; //返回workQueue中未执行的任务
}
interruptWorkers() 很简单,循环对所有worker调用 interruptIfStarted(),其中会判断worker的AQS state是否大于0,即worker是否已经开始运作,再调用Thread.interrupt()
需要注意的是,对于运行中的线程调用Thread.interrupt()并不能保证线程被终止,task.run()内部可能捕获了InterruptException,没有上抛,导致线程一直无法结束
shutdownNow() 和 shutdown()的大体流程相似,差别是:
-
1、将线程池更新为stop状态
-
2、调用 interruptWorkers() 中断所有线程,包括正在运行的线程
-
3、将workQueue中待处理的任务移到一个List中,并在方法最后返回,说明shutdownNow()后不会再处理workQueue中的任务
awaitTermination() -- 等待线程池终止
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (;;) {
//如果结束 返回
if (runStateAtLeast(ctl.get(), TERMINATED))
return true;
//超时
if (nanos <= 0)
return false;
nanos = termination.awaitNanos(nanos);
}
} finally {
mainLock.unlock();
}
}
在发出一个shutdown请求后,在以下3种情况发生之前,awaitTermination()都会被阻塞
- 1、所有任务完成执行
- 2、到达超时时间
- 3、当前线程被中断
/**
* Wait condition to support awaitTermination
*/
private final Condition termination = mainLock.newCondition();
awaitTermination() 循环的判断线程池是否terminated终止 或 是否已经超过超时时间,然后通过termination这个Condition阻塞等待一段时间
termination.awaitNanos() 是通过 LockSupport.parkNanos(this, nanosTimeout)实现的阻塞等待
阻塞等待过程中发生以下具体情况会解除阻塞(对上面3种情况的解释):
1、如果发生了 termination.signalAll()(内部实现是 LockSupport.unpark())会唤醒阻塞等待,且由于ThreadPoolExecutor只有在 tryTerminated()尝试终止线程池成功,将线程池更新为terminated状态后才会signalAll(),故awaitTermination()再次判断状态会return true退出
2、如果达到了超时时间 termination.awaitNanos() 也会返回,此时nano==0,再次循环判断return false,等待线程池终止失败
3、如果当前线程被 Thread.interrupt(),termination.awaitNanos()会上抛InterruptException,awaitTermination()继续上抛给调用线程,会以异常的形式解除阻塞
故终止线程池并需要知道其是否终止可以用如下方式:
executorService.shutdown();
try{
while(!executorService.awaitTermination(500, TimeUnit.MILLISECONDS)) {
LOGGER.debug("Waiting for terminate");
}
}
catch (InterruptedException e) {
//中断处理
}
参考文献
【死磕Java并发】—–J.U.C之线程池:ThreadPoolExecutor
Java线程池ThreadPoolExecutor使用和分析(三) - 终止线程池原理
《Java 并发编程的艺术》
网友评论