总览
下图是 java 线程池几个相关类的继承结构:
先简单说说这个继承结构,Executor 位于最顶层,也是最简单的,就一个 execute(Runnable runnable) 接口方法定义。
ExecutorService 也是接口,在 Executor 接口的基础上添加了很多的接口方法,所以一般来说我们会使用这个接口。
然后再下来一层是 AbstractExecutorService,从名字我们就知道,这是抽象类,这里实现了非常有用的一些方法供子类直接使用,之后我们再细说。
然后才到我们的重点部分 ThreadPoolExecutor 类,这个类提供了关于线程池所需的非常丰富的功能。
同在并发包中的 Executors 类,类名中带字母 s,我们猜到这个是工具类,里面的方法都是静态方法,如以下我们最常用的用于生成 ThreadPoolExecutor 的实例的一些方法,也就是四大线程池:
- newCachedThreadPool的cpu核心线程数为 0,最大线程数为 Integer.MAX_VALUE,keepAliveTime 为 60 秒,任务队列采用 SynchronousQueue
这种线程池对于任务可以比较快速地完成的情况有比较好的性能。如果线程空闲了 60 秒都没有任务,那么将关闭此线程并从线程池中移除。所以如果线程池空闲了很长时间也不会有问题,因为随着所有的线程都会被关闭,整个线程池不会占用任何的系统资源。
- newFixedThreadPool最大线程数设置为与核心线程数相等,此时 keepAliveTime 设置为 0(因为这里它是没用的,即使不为 0,线程池默认也不会回收 corePoolSize 内的线程),任务队列采用 LinkedBlockingQueue,无界队列。
- newSingleThreadExecutor生成只有一个线程的固定线程池,这个更简单,和上面的一样,只要设置线程数为 1 就可以了
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
SynchronousQueue 是一个比较特殊的 BlockingQueue,其本身不储存任何元素,
它有一个虚拟队列(或虚拟栈),不管读操作还是写操作,
如果当前队列中存储的是与当前操作相同模式的线程,那么当前操作也进入队列中等待;
如果是相反模式,则配对成功,从当前队列中取队头节点。具体的信息,
可以看我的另一篇关于 BlockingQueue 的文章。
另外,由于线程池支持获取线程执行的结果,所以,引入了 Future 接口,RunnableFuture 继承自此接口,然后我们最需要关心的就是它的实现类 FutureTask。到这里,记住这个概念,在线程池的使用过程中,我们是往线程池提交任务(task),使用过线程池的都知道,我们提交的每个任务是实现了 Runnable 接口的,其实就是先将 Runnable 的任务包装成 FutureTask,然后再提交到线程池。这样,读者才能比较容易记住 FutureTask 这个类名:它首先是一个任务(Task),然后具有 Future 接口的语义,即可以在将来(Future)得到执行的结果。
当然,线程池中的 BlockingQueue 也是非常重要的概念,如果线程数达到 corePoolSize,我们的每个任务会提交到等待队列中,等待线程池中的线程来取任务并执行。这里的 BlockingQueue 通常我们使用其实现类 LinkedBlockingQueue、ArrayBlockingQueue 和 SynchronousQueue,每个实现类都有不同的特征,使用场景之后会慢慢分析。想要详细了解各个 BlockingQueue 的读者,可以参考我的前面的一篇对 BlockingQueue 的各个实现类进行详细分析的文章。
Executor 接口
/*
* @since 1.5
* @author Doug Lea
*/
public interface Executor {
void execute(Runnable command);
}
我们可以看到 Executor 接口非常简单,就一个 void execute(Runnable command) 方法,代表提交一个任务。
比如我们想知道执行结果、我们想知道当前线程池有多少个线程活着、已经完成了多少任务等等,这些都是这个接口的不足的地方。接下来我们要介绍的是继承自 Executor 接口的 ExecutorService 接口,这个接口提供了比较丰富的功能,也是我们最常使用到的接口。
ExecutorService
一般我们定义一个线程池的时候,往往都是使用这个接口:
ExecutorService executor = Executors.newFixedThreadPool(args...);
ExecutorService executor = Executors.newCachedThreadPool(args...);
因为这个接口中定义的一系列方法大部分情况下已经可以满足我们的需要了。
那么我们简单初略地来看一下这个接口中都有哪些方法:
public interface ExecutorService extends Executor {
// 关闭线程池,已提交的任务继续执行,不接受继续提交新任务
void shutdown();
// 关闭线程池,尝试停止正在执行的所有任务,不接受继续提交新任务
// 它和前面的方法相比,加了一个单词“now”,区别在于它会去停止当前正在进行的任务
List<Runnable> shutdownNow();
// 线程池是否已关闭
boolean isShutdown();
// 如果调用了 shutdown() 或 shutdownNow() 方法后,所有任务结束了,那么返回true
// 这个方法必须在调用shutdown或shutdownNow方法之后调用才会返回true
boolean isTerminated();
// 等待所有任务完成,并设置超时时间
// 我们这么理解,实际应用中是,先调用 shutdown 或 shutdownNow,
// 然后再调这个方法等待所有的线程真正地完成,返回值意味着有没有超时
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
// 提交一个 Callable 任务
<T> Future<T> submit(Callable<T> task);
// 提交一个 Runnable 任务,第二个参数将会放到 Future 中,作为返回值,
// 因为 Runnable 的 run 方法本身并不返回任何东西
<T> Future<T> submit(Runnable task, T result);
// 提交一个 Runnable 任务
Future<?> submit(Runnable task);
// 执行所有任务,返回 Future 类型的一个 list
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
// 也是执行所有任务,但是这里设置了超时时间
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;
// 只有其中的一个任务结束了,就可以返回,返回执行完的那个任务的结果
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
// 同上一个方法,只有其中的一个任务结束了,就可以返回,返回执行完的那个任务的结果,
// 不过这个带超时,超过指定的时间,抛出 TimeoutException 异常
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
这些方法都很好理解,一个简单的线程池主要就是这些功能,能提交任务,能获取结果,能关闭线程池,这也是为什么我们经常用这个接口的原因。
FutureTask
在继续往下层介绍 ExecutorService 的实现类之前,我们先来说说相关的类 FutureTask。
Future Runnable
\ /
\ /
RunnableFuture
|
|
FutureTask
FutureTask 通过 RunnableFuture 间接实现了 Runnable 接口,
所以每个 Runnable 通常都先包装成 FutureTask,
然后调用 executor.execute(Runnable command) 将其提交给线程池
我们知道,Runnable 的 void run() 方法是没有返回值的,所以,通常,如果我们需要的话,会在 submit 中指定第二个参数作为返回值:
<T> Future<T> submit(Runnable task, T result);
其实到时候会通过这两个参数,将其包装成 Callable。它和 Runnable 的区别在于 run() 没有返回值,而 Callable 的 call() 方法有返回值,同时,如果运行出现异常,call() 方法会抛出异常。
public interface Callable<V> {
V call() throws Exception;
}
在这里,就不展开说 FutureTask 类了,因为本文篇幅本来就够大了,这里我们需要知道怎么用就行了。
下面,我们来看看 ExecutorService 的抽象实现 AbstractExecutorService 。
AbstractExecutorService
AbstractExecutorService 抽象类派生自 ExecutorService 接口,然后在其基础上实现了几个实用的方法,这些方法提供给子类进行调用。
这个抽象类实现了 invokeAny 方法和 invokeAll 方法,这里的两个 newTaskFor 方法也比较有用,用于将任务包装成 FutureTask。定义于最上层接口 Executor中的 void execute(Runnable command) 由于不需要获取结果,不会进行 FutureTask 的包装。
需要获取结果(FutureTask),用 submit 方法,不需要获取结果,可以用 execute 方法。
下面,我将一行一行源码地来分析这个类,跟着源码来看看其实现吧:
Tips: invokeAny 和 invokeAll 方法占了这整个类的绝大多数篇幅,这里选择适当跳过,
因为它们可能在你的实践中使用的频次比较低,而且它们不带有承前启后的作用,不用担心会漏掉什么导致看不懂后面的代码。
public abstract class AbstractExecutorService implements ExecutorService {
// RunnableFuture 是用于获取执行结果的,我们常用它的子类 FutureTask
// 下面两个 newTaskFor 方法用于将我们的任务包装成 FutureTask 提交到线程池中执行
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
// 提交任务
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
// 1. 将任务包装成 FutureTask
RunnableFuture<Void> ftask = newTaskFor(task, null);
// 2. 交给执行器执行,execute 方法由具体的子类来实现
// 前面也说了,FutureTask 间接实现了Runnable 接口。
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
// 1. 将任务包装成 FutureTask
RunnableFuture<T> ftask = newTaskFor(task, result);
// 2. 交给执行器执行
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
// 1. 将任务包装成 FutureTask
RunnableFuture<T> ftask = newTaskFor(task);
// 2. 交给执行器执行
execute(ftask);
return ftask;
}
}
到这里,我们发现,这个抽象类包装了一些基本的方法,它们都没有真正开启线程来执行任务,它们都只是在方法内部调用了 execute 方法,所以最重要的 execute(Runnable runnable) 方法还没出现,需要等具体执行器来实现这个最重要的部分,这里我们要说的就是 ThreadPoolExecutor 类了。
ThreadPoolExecutor
ThreadPoolExecutor 是 JDK 中的线程池实现,这个类实现了一个线程池需要的各个方法,它实现了任务提交、线程管理、监控等等方法。
我们可以基于它来进行业务上的扩展,以实现我们需要的其他功能,比如实现定时任务的类 ScheduledThreadPoolExecutor 就继承自 ThreadPoolExecutor。当然,这不是本文关注的重点,下面,还是赶紧进行源码分析吧。
首先看看类的结构:
public class ThreadPoolExecutor extends AbstractExecutorService {
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//Integer.SIZE=32,32-3意味着前三位用于存放线程状态,后29位用于存放线程数
//很多初学者很喜欢在自己的代码中写很多 29 这种数字,或者某个特殊的字符串,然后分布在各个地方,这是非常糟糕的
private static final int COUNT_BITS = Integer.SIZE - 3;
//1 << COUNT_BITS = 0010 0000 0000 0000 0000 0000 0000 0000
//(1 << COUNT_BITS) - 1 = 0001 1111 1111 1111 1111 1111 1111 1111
//得到29个1,也就是说线程池的最大线程数是 2^29-1=536870911
// 以我们现在计算机的实际情况,这个数量还是够用的
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// 1=0000 0000 0000 0000 0000 0000 0000 0001
//1反码=1111 1111 1111 1111 1111 1111 1111 1110
//-1=1补码=反码+1=1111 1111 1111 1111 1111 1111 1111 1111
//-1 << COUNT_BITS= -1左移29次=1110 0000 0000 0000 0000 0000 0000 0000
//也就是32的高3位为线程池状态
private static final int RUNNING = -1 << COUNT_BITS;
// 0000 0000 0000 0000 0000 0000 0000 0000
private static final int SHUTDOWN = 0 << COUNT_BITS;
//0010 0000 0000 0000 0000 0000 0000 0000
private static final int STOP = 1 << COUNT_BITS;
//0100 0000 0000 0000 0000 0000 0000 0000
private static final int TIDYING = 2 << COUNT_BITS;
//0110 0000 0000 0000 0000 0000 0000 0000
private static final int TERMINATED = 3 << COUNT_BITS;
// 将整数c的低29位改成0,得到线程池的状态
private static int runStateOf(int c) {
return c & ~CAPACITY;
}
//将整数c的高3位改为0,得到线程池中的线程数
private static int workerCountOf(int c) {
return c & CAPACITY;
}
private static int ctlOf(int rs, int wc) {
return rs | wc;
}
private static boolean runStateLessThan(int c, int s) {
return c < s;
}
private static boolean runStateAtLeast(int c, int s) {
return c >= s;
}
//c < SHUTDOWN也只有RUNNING,这个方法是判断线程池是否运行中
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}
//cas比较设置增加ctl值,ctl值钱3位为状态,后29位为线程数
private boolean compareAndIncrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect + 1);
}
//cas比较设置减少ctl值,ctl值钱3位为状态,后29位为线程数
private boolean compareAndDecrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect - 1);
}
//循环设置ctl减少,直到更新成功
private void decrementWorkerCount() {
do {
} while (!compareAndDecrementWorkerCount(ctl.get()));
}
//等待队列
private final BlockingQueue<Runnable> workQueue;
//全局锁
private final ReentrantLock mainLock = new ReentrantLock();
//工作线程所在集合
private final HashSet<Worker> workers = new HashSet<Worker>();
//这个是为了实现wait..notify类似效果,由condition.await和signal实现,这个可以针对单个唤醒
private final Condition termination = mainLock.newCondition();
//记录工作worker最大值,就是workers的set集合最大个数
private int largestPoolSize;
//执行完成的任务数量,抛异常也算
private long completedTaskCount;
//创建线程的工程
private volatile ThreadFactory threadFactory;
//在执行饱和或关闭时调用处理程序。拒绝策略
private volatile RejectedExecutionHandler handler;
//当线程数大于内核数时,这是多余的空闲线程将在终止之前等待新任务的最长时间
//当线程空闲时间达到keepAliveTime时,线程会退出,直到线程数量=corePoolSize
//如果allowCoreThreadTimeout=true,则会直到线程数量=0
private volatile long keepAliveTime;
//默认false,cpu核心线程闲置的话也会一直保持活着状态,如果true,核心线程会以keepAliveTime等待获取任务
//允许核心线程超时
private volatile boolean allowCoreThreadTimeOut;
//cpu核数
//核心线程会一直存活,即使没有任务需要执行
//当线程数小于核心线程数时,即使有线程空闲,线程池也会优先创建新线程处理
// 设置allowCoreThreadTimeout=true(默认false)时,核心线程会超时关闭
private volatile int corePoolSize;
//线程池最大线程数,实际受CAPACITY限制
private volatile int maximumPoolSize;
//默认拒绝策略,当队列满了,线程数据也达到了最大,则执行拒绝策略,默认直接抛出异常
private static final RejectedExecutionHandler defaultHandler =
new AbortPolicy();
private static final RuntimePermission shutdownPerm =
new RuntimePermission("modifyThread");
/* The context to be used when executing the finalizer, or null. */
private final AccessControlContext acc;
//worker是工作者,里面维护线程让线程执行任务
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable {
private static final long serialVersionUID = 6138294804551838833L;
//运行任务的线程
final Thread thread;
//初始化任务,可以是空
Runnable firstTask;
//完成的任务数量
volatile long completedTasks;
//初始化worker,初始任务,可以为空
Worker(Runnable firstTask) {
//设置状态
//把状态位设置成-1,这样任何线程都不能得到Worker的锁,除非调用了unlock方法。这个unlock方法会在runWorker方法中一开始就调用,
//这是为了确保Worker构造出来之后,没有任何线程能够得到它的锁,除非调用了runWorker之后,其他线程才能获得Worker的锁
setState(-1);
this.firstTask = firstTask;
//线程工厂创建线程
this.thread = getThreadFactory().newThread(this);
}
//worker执行任务,在addWorker里新增成功后启动线程
public void run() {
runWorker(this);
}
}
接着,我们来看看线程池实现中的几个概念和处理流程。
我们先回顾下提交任务的几个方法:
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
当然,上图没有考虑队列是否有界,提交任务时队列满了怎么办?什么情况下会创建新的线程?提交任务时线程池满了怎么办?空闲线程怎么关掉?这些问题下面我们会一一解决。
我们经常会使用 Executors 这个工具类来快速构造一个线程池,对于初学者而言,这种工具类是很有用的,开发者不需要关注太多的细节,只要知道自己需要一个线程池,仅仅提供必需的参数就可以了,其他参数都采用作者提供的默认值。
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
这里先不说有什么区别,它们最终都会导向这个构造方法:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
//corePoolSize:cpu核数不能小于0
//maximumPoolSize:线程池做大线程数不能小于等于0并不能小于cpu核心数
//keepAliveTime:当线程数大于内核数时,这是多余的空闲线程将在终止之前等待新任务的最长时间,也不能小于0
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
//工作线程所在的队列和线程制造工厂和拒绝策略都不能空
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
在这里,介绍下线程池中的各个状态和状态变化的转换过程:
- RUNNING:这个没什么好说的,这是最正常的状态:接受新的任务,处理等待队列中的任务
- SHUTDOWN:不接受新的任务提交,但是会继续处理等待队列中的任务
- STOP:不接受新的任务提交,不再处理等待队列中的任务,中断正在执行任务的线程
- TIDYING:所有的任务都销毁了,workCount 为 0。线程池的状态在转换为 TIDYING 状态时,会执行钩子方法 terminated()
- TERMINATED:terminated() 方法结束后,线程池的状态就会变成这个
RUNNING 定义为 -1,SHUTDOWN 定义为 0,其他的都比 0 大,所以等于 0 的时候不能提交任务,大于 0 的话,连正在执行的任务也需要中断。
看了这几种状态的介绍,读者大体也可以猜到十之八九的状态转换了,各个状态的转换过程有以下几种:
- RUNNING -> SHUTDOWN:当调用了 shutdown() 后,会发生这个状态转换,这也是最重要的
- (RUNNING or SHUTDOWN) -> STOP:当调用 shutdownNow() 后,会发生这个状态转换,这下要清楚shutDown() 和 shutDownNow() 的区别了
- SHUTDOWN -> TIDYING:当任务队列和线程池都清空后,会由 SHUTDOWN 转换为 TIDYING
- STOP -> TIDYING:当任务队列清空后,发生这个转换
- TIDYING -> TERMINATED:这个前面说了,当 terminated() 方法结束后
另外,我们还要看看一个内部类 Worker,因为 Doug Lea 把线程池中的线程包装成了一个个 Worker,翻译成工人,就是线程池中做任务的线程。所以到这里,我们知道任务是 Runnable(内部变量名叫 task 或 command),线程是 Worker。
Worker 这里又用到了抽象类 AbstractQueuedSynchronizer。题外话,AQS 在并发中真的是到处出现,而且非常容易使用,写少量的代码就能实现自己需要的同步方式(对 AQS 源码感兴趣的读者请参看我之前写的几篇文章https://www.jianshu.com/p/54d372425e54)。
//worker是工作者,里面维护线程让线程执行任务
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable {
private static final long serialVersionUID = 6138294804551838833L;
//运行任务的线程
final Thread thread;
//初始化任务,可以是空
Runnable firstTask;
//完成的任务数量
volatile long completedTasks;
//初始化worker,初始任务,可以为空
Worker(Runnable firstTask) {
//设置状态
//把状态位设置成-1,这样任何线程都不能得到Worker的锁,除非调用了unlock方法。这个unlock方法会在runWorker方法中一开始就调用,
//这是为了确保Worker构造出来之后,没有任何线程能够得到它的锁,除非调用了runWorker之后,其他线程才能获得Worker的锁
setState(-1);
this.firstTask = firstTask;
//线程工厂创建线程
this.thread = getThreadFactory().newThread(this);
}
//worker执行任务,在addWorker里新增成功后启动线程
public void run() {
runWorker(this);
}
...// 其他几个方法没什么好看的,就是用 AQS 操作,来获取这个线程的执行权,用了独占锁
worker的加锁解锁机制是基于AQS框架的,要完全弄明白它的加锁解锁机制请看AQS框架的实现,在这里只是简单介绍一下:
//尝试加锁方法,将状态从0设置为1;如果不是0则加锁失败,在worker线程没有启动前是-1状态,无法加锁
//该方法重写了父类AQS的同名方法
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
//尝试释放锁的方法,直接将state置为0
//该方法重写了父类AQS的同名方法
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
//注意:tryAcquire与tryRelease是重写了AQS父类的方法,且不可以直接调用,它们被以下方法调用实现加锁解锁操作
//加锁:acquire法是它父类AQS类的方法,会调用tryAcquire方法加锁
public void lock() { acquire(1); }
//尝试加锁
public boolean tryLock() { return tryAcquire(1); }
//解锁:release方法是它父类AQS类的方法,会调用tryRelease方法
public void unlock() { release(1); }
//返回锁状态
public boolean isLocked() { return isHeldExclusively(); }
默认工厂DefaultThreadFactory
线程的创建看下defaultThreadFactory,当然是在Executors类创建
public static ThreadFactory defaultThreadFactory() {
return new DefaultThreadFactory();
}
static class DefaultThreadFactory implements ThreadFactory {
//线程池编号
private static final AtomicInteger poolNumber = new AtomicInteger(1);
//线程池中线程所属线程组
private final ThreadGroup group;
//线程池中线程编号
private final AtomicInteger threadNumber = new AtomicInteger(1);
//线程名称前缀
private final String namePrefix;
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
//设置线程名称为"pool-线程池的编号-thread-线程的编号"
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
//创建新的线程
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
//设置为非守护线程
if (t.isDaemon())
t.setDaemon(false);
//设置优先级为NORMAL为5
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
一般我们最好不要用默认的线程池,可以继承该类,给线程指定一个识别度高的名字,出了问题好排查;
怎么去创建一个worker
worker是线程池执行任务的线程
- 如果线程池处于TERMINATED、STOP、TIDYING和(SHUTDOWN+任务队列为空)是没必要创建线程了
- core是true则线程数大于等于cpu核心线程数没必要创建线程或者core是false则大于等于线程最大数也没必要创建线程
- cas比较线程数+1成功则创建worker并且初始化线程
- 线程添加到线程集合并记录线程集合最大值,启动线程
- 线程启动失败则从线程集合移除线程
- 如果当前线程池状态处于RUNNING和(SHUTDOWN+任务队列不为空)则不能去停止线程池,如果处于TIDYING或TERMINATED就没必要再去关闭线程池了
- 如果线程池线程不为0,则遍历线程集合,判断是否被中断过是否是闲置,如果是发出中断,唤醒获取任务阻塞的线程,如果为0,设置TIDYING,成功则设置成TERMINATED,关闭线程池
先看看流程图:
这个方法非常重要 addWorker(Runnable firstTask, boolean core) 方法,我们看看它是怎么创建新的线程的:
private boolean addWorker(Runnable firstTask, boolean core) {
//标志,和break retry以及continue retry联合使用
//break retry结束内存循环直接往retry:下方代码,不再执行for
//continue retry结束内存循环直接往retry:下方代码,重新执行for
retry:
//自旋
for (; ; ) {
//或许线程池状态整数
int c = ctl.get();
//得到线程池状态
// 线程池状态只有高3位
int rs = runStateOf(c);
// 这个非常不好理解
// 如果线程池已关闭,并满足以下条件之一,那么不创建新的 worker:
// 1. 线程池状态大于 SHUTDOWN,其实也就是 STOP, TIDYING, 或 TERMINATED
// 简单分析下:
// 还是状态控制的问题,当线程池处于 SHUTDOWN 的时候,不允许提交任务,但是已有的任务继续执行
// 当状态大于 SHUTDOWN 时,不允许提交任务,且中断正在执行的任务
// 多说一句:如果线程池处于 SHUTDOWN,但是 firstTask 为 null,且 workQueue 非空,那么是允许创建 worker 的
// 这是因为 SHUTDOWN 的语义:不允许提交新的任务,但是要把已经进入到 workQueue 的任务执行完,所以在满足条件的基础上,是允许创建新的 Worker 的
if (rs >= SHUTDOWN &&
!(rs == SHUTDOWN &&
firstTask == null &&
!workQueue.isEmpty()))
return false;
for (; ; ) {
//获得线程池的线程数
int wc = workerCountOf(c);
//线程数大于等于CAPACITY(2^29-1),创建worker失败返回
//当core是true则线程数大于等于核心数就创建worker失败返回,
// 当是false则线程数大于等于线程最大数量就创建worker失败返回
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//cas比较设置当前线程数+1,成功(代表线程数量匹配上,并要创建新线程所以+1)退出自旋返回到retry:
//ctl的值就+1了
if (compareAndIncrementWorkerCount(c))
break retry;
//比较不成功,说明线程数可能已经被其它线程修改过了,重新读一次
c = ctl.get();
//得到的线程池状态跟初始进来的状态不一样,则continue retry到retry:重新执行for
if (runStateOf(c) != rs)
continue retry;
}
}
//到这里已经具备创建worker的条件,下面开始创建worker
//worker启动标志
boolean workerStarted = false;
//worker已经被添加标志
boolean workerAdded = false;
Worker w = null;
try {
//创建一新的Worker,在worker构造方法里对线程池的状态设置为-1
//并且线程工程创建线程,有第一个任务也会设置进去
w = new Worker(firstTask);
//在worker创建时初始化的线程
final Thread t = w.thread;
//线程初始化成功
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
//这是整个线程池的全局锁,持有这个锁才能让下面的操作顺理成章
//因为关闭一个线程池需要这个锁,至少我持有锁的期间,线程池不会被关闭
mainLock.lock();
try {
//ctl这个值在前面cas已经设置+1
//重新获取线程池的状态
int rs = runStateOf(ctl.get());
//rs < SHUTDOWN:处于RUNNING,正常情况
//(rs == SHUTDOWN && firstTask == null):处于SHUTDOWN不接受新的任务,但是会执行等待队列的任务
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
//新增的worker里的线程可不能是已经启动的
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
//到这里,worker已经初始化好,并添加到hashSet里
workers.add(w);
//查看worker集合的大小
int s = workers.size();
//largestPoolSize是用于记录worker中个数的最大值
//当worker集合数量已经大于它,则将其调整为当前worker集合数量
if (s > largestPoolSize)
largestPoolSize = s;
//表示worker添加成功
workerAdded = true;
}
} finally {
//worker一系列添加后释放锁
mainLock.unlock();
}
//如果worker已经添加成,启动线程会调用runWorker方法,并设置线程已经启动标志
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
//worker线程启动失败
if (!workerStarted)
addWorkerFailed(w);
}
//返回线程是否启动标志
return workerStarted;
}
简单看下 addWorkFailed 的处理:
tryTerminate();方法后面统一讲,因为很多地方调用它
//添加worker失败
private void addWorkerFailed(Worker w) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//线程启动失败,但是worker已经创建好了,则将创建好的worker从集合移除
if (w != null)
workers.remove(w);
//死循环,cas比较去将线程数整数减一,直到减成功,ctl的值就减一
decrementWorkerCount();
//重新检查是否终止,以防该worker的存在阻止了终止
tryTerminate();
} finally {
//释放锁
mainLock.unlock();
}
}
cas更新线程数,线程数减一
//循环设置ctl减少,直到更新成功
private void decrementWorkerCount() {
do {
} while (!compareAndDecrementWorkerCount(ctl.get()));
}
每个worker怎么获取任务呢?
- 自旋获取线程状态,线程状态不处于RUNNING或者不处于(SHUTDOWN+任务队列不为空),则线程数减一,任务返回空。
- 自旋获取线程状态,线程状态处于RUNNING或者处于(SHUTDOWN+任务队列不为空),则当前线程数如果大于最大线程数并且任务队列时刻,或者当前线程数大于核心线程数并且超时并且队列是空,那么cas设置线程减一,继续自旋,否则指定keepAliveTime超时时间去读取任务或者直接阻塞读取任务。
- 读取任务的时候会被阻塞,如果tryTerminal里的中断信号发出,这里就会识别,则抛出中断异常,线程醒过来,设置超时标识,继续进行自旋。
- 如果读取的任务不为空,则直接返回任务。
流程图如下:
// 如果发生了以下四件事中的任意一件,那么Worker需要被回收:
// 1. Worker个数比线程池最大大小要大
// 2. 线程池处于STOP状态
// 3. 线程池处于SHUTDOWN状态并且阻塞队列为空
// 4. 使用超时时间从阻塞队列里拿数据,并且超时之后没有拿到数据(allowCoreThreadTimeOut || workerCount > corePoolSize)
private Runnable getTask() {
// 如果使用超时时间并且也没有拿到任务的标识
boolean timedOut = false;
for (; ; ) {
int c = ctl.get();
int rs = runStateOf(c);
//worker减一是因为前面添加worker的时候启动线程已经添加成功,而这盘点不符合执行任务,所以把添加的worker数量减掉
//在processWorkerExit进行回收
// 如果线程池是SHUTDOWN状态并且阻塞队列为空的话,worker数量减一,
// 直接返回null(SHUTDOWN状态还会处理阻塞队列任务,但是阻塞队列为空的话就结束了),
// 如果线程池是STOP状态的话,worker数量减一,
// 直接返回null(STOP状态不处理阻塞队列任务)
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
//当前线程数
int wc = workerCountOf(c);
//如果设置了allowCoreThreadTimeOut则可能超时
//标记从队列中取任务时是否设置超时时间,
// 如果为true说明这个worker可能需要回收,
// 为false的话这个worker会一直存在,并且阻塞当前线程等待阻塞队列中有数据
// allowCoreThreadTimeOut属性默认为false,表示线程池中的核心线程在闲置状态下还保留在池中;
// 如果是true表示核心线程使用keepAliveTime这个参数来作为超时时间
// 如果worker数量比基本大小要大的话,timed就为true,需要进行回收worker
// timed变量用于判断是否需要进行超时控制。
// allowCoreThreadTimeOut默认是false,也就是核心线程不允许进行超时;
// wc > corePoolSize,表示当前线程池中的线程数量大于核心线程数量;
// 对于超过核心线程数量的这些线程,需要进行超时控制
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
//4中情况分析:
//第一:wc > maximumPoolSize && wc > 1 大于了maximumPoolSize必然大于1,所以不获取任务
//第二:wc > maximumPoolSize && workQueue.isEmpty() 阻塞队列的没有任务了,也就没必要获取
//第三:(timed && timedOut) && wc>1超时了,有线程在运行,就没必要获取任务
//第四:(timed && timedOut) && workQueue.isEmpty()超时了,任务队列没任务,则没必要获取
//(timed && timedOut)表示当前线程大于核心数并且前面获取任务poll超时没获取到任务,则timedOut=true,
//这个就体现了keepAliveTime的用处,超时没获取到任务,并且当前线程超高了cpu核心,因为没获取到任务worker就闲置
//所以这里线程数减一,在外面进行worker的回收
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
//线程数减一
if (compareAndDecrementWorkerCount(c))
//不用执行任务
return null;
continue;
}
try {
//poll:从BlockingQueue取出一个队首的对象,如果在指定时间内,队列一旦有数据可取,则立即返回队列中的数据。
// 否则直到时间超时还没有数据可取,返回失败。
//take:取走BlockingQueue里排在首位的对象,若BlockingQueue为空,
// 阻断进入等待状态直到BlockingQueue有新的数据被加入
//设置了allowCoreThreadTimeOut或者当前线程已经大于cpu核心数则以keepAliveTime超时时间获取任务
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
//任务取到,返回任务
if (r != null)
return r;
//到这里任务是空,设置超时标识
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
每个worker是怎么执行任务的呢
添加worker成功后会去启动线程,这时候调用ThreadPoolExecutor的run方法,然后执行runWorker方法。
那接下来看下runWorker方法
- 先释放当前当前worker的锁,这样支持获取任务的时候可以多个线程去抢占woker,在if (!t.isInterrupted() && w.tryLock())才能获取到锁,如果获取到锁说明线程是闲置状态
- 获取任务
- 获取到任务,进行全局加锁
- 如果线程池已经处于STOP状态以上线程却没有终端,则中断当前线程,也就是不去获取任务了,但是会继续执行当前任务,然后完成任务数量+1,解锁,异常完成任务标识设置为false。
- 回收worker。
先看下流程图:
// 此方法由 worker 线程启动后调用,这里用一个 while 循环来不断地从等待队列中获取任务并执行
// 前面说了,worker 在初始化的时候,可以指定 firstTask,那么第一个任务也就可以不需要从队列中获取
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
// 该线程的第一个任务(如果有的话)
Runnable task = w.firstTask;
w.firstTask = null;
//由于在创建Worker的时候设置状态是-1,其它线程不能得到worker的锁。这里释放后其它线程可以获得worker的锁
//在getask的时候可以支持中断,如果这边不unlock的话,在if (!t.isInterrupted() && w.tryLock())就一直认为它是忙的状态
//则无法打断因为获取任务而阻塞的线程,它只是在获取任务而不是执行任务,也是闲置的线程
w.unlock();
//是否正常结束任务标识
boolean completedAbruptly = true;
try {
// 如果worker中的任务不为空,否则使用getTask获得任务。一直死循环,除非得到的任务为空才退出
//当然getTask如果没拿到任务会一直阻塞直到拿到任务
while (task != null || (task = getTask()) != null) {
// 如果拿到了任务,给自己上锁,表示当前Worker已经要开始执行任务了,
// 已经不是闲置Worker(闲置Worker的解释请看下面的线程池关闭)
w.lock();
// 在执行任务之前先做一些处理。
// 1. 如果线程池已经大于等于STOP状态并且当前线程没有被中断,中断线程
// 2. 如果线程池还处于RUNNING或SHUTDOWN状态,并且当前线程已经被中断了,
// 重新检查一下线程池状态,如果处于STOP状态并且没有被中断,那么中断线程
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// 任务执行前需要做什么,ThreadPoolExecutor是个空实现
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 真正的开始执行任务,调用的是run方法,而不是start方法。这里run的时候可能会被中断,比如线程池调用了shutdownNow方法
task.run();
} catch (RuntimeException x) {
thrown = x;
throw x;
} catch (Error x) {
thrown = x;
throw x;
} catch (Throwable x) {
thrown = x;
throw new Error(x);
} finally {
// 任务执行结束需要做什么,ThreadPoolExecutor是个空实现
afterExecute(task, thrown);
}
} finally {
task = null;
// 记录执行任务的个数
w.completedTasks++;
// 执行完任务之后,解锁,Worker变成闲置Worker
w.unlock();
}
}
//正常执行完任务,改成false
completedAbruptly = false;
} finally {
// 回收Worker
processWorkerExit(w, completedAbruptly);
}
}
任务执行的worker应该怎么处理?
会对worker进行一个回收
- 判断在执行任务的时候有没有异常,如果有则会将线程数量通过cas比较减一,因为添加worker的时候已加入队列,失败了就要减掉
- 任务正常执行,统计线程完成任务数,移除当前worker,尝试关闭线程池
- 若线程池还处于RUNING和(SHUTDOWN+任务队列非空),则还不能关闭线程池,再判断运行任务的时候是否正常运行,是就开始设置线程数量最小值,设置了allowCoreThreadTimeOut最小值为0,没设置则cpu核心数,比较最小值,如果最小值为0并且还有任务为完成,则最小值改成1(执行任务)。当前线程数大于等于最小线程,则不用再创建worker,如果当前线程数量小于最小值添加新worker
只有以下几种情况才会在回收worker还会补偿创建worker:
- 线程池处于RUNING和(SHUTDOWN+任务队列非空)则任务还没执行完,前面执行任务又失败worker被回收,这里需要补偿创建
- 线程池处于RUNING和(SHUTDOWN+任务队列非空)则任务还没执行完,前面执行任务成功,但是当前线程数小于线程数量最小值,这里需要补偿创建
流程图如下:
//回收Worker
private void processWorkerExit(Worker w, boolean completedAbruptly) {
// 如果Worker没有正常结束流程调用processWorkerExit方法,worker数量减一。
// 如果是正常结束的话,在getTask方法里worker数量已经减一了
if (completedAbruptly)
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//退出前,将本线程已完成的任务数量,添加到已经完成任务的总数中
completedTaskCount += w.completedTasks;
// 线程池的worker集合删除掉需要回收的Worker
workers.remove(w);
} finally {
mainLock.unlock();
}
// 尝试结束线程池
tryTerminate();
int c = ctl.get();
// 如果线程池还处于RUNNING或者SHUTDOWN状态
if (runStateLessThan(c, STOP)) {
//如果非异常状况completedAbruptly=false,也就是没有获取到可执行的任务,则获取线程池允许的最小线程数,
// 如果allowCoreThreadTimeOut为true说明允许核心线程超时,则最小线程数为0,否则最小线程数为corePoolSize;
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
//如果allowCoreThreadTimeOut=true,且任务队列有任务要执行,则将最最小线程数设置为1
if (min == 0 && !workQueue.isEmpty())
min = 1;
//如果当前线程数大于等于最小线程数,则直接返回
if (workerCountOf(c) >= min)
// 不需要新开一个Worker
return;
}
// 新开一个Worker代替原先的Worker
// 新开一个Worker需要满足以下3个条件中的任意一个:
// 1. 获取执行的任务发生了异常
// 2. Worker数量比线程池基本大小要小
// 3. 阻塞队列不空但是没有任何Worker在工作
addWorker(null, false);
}
}
execute
先看下流程图:
- 如果当前线程数小于核心数量,直接创建worker
- 如果线程池状态是RUNNING并且任务加到等待队列成功,还要重新确认一次线程池的状态,若还是RUNNING,则当前线程数等于0的话直接创建worker,如果不是RUNNING并且队列移除任务成功,执行拒绝策略,默认抛出异常
- 如果 workQueue 队列满了,以 maximumPoolSize 为界创建新的 worker, 如果失败,说明当前线程数已经达到 maximumPoolSize,执行拒绝策略
有了上面的这些基础后,我们终于可以看看 ThreadPoolExecutor 的 execute 方法了,前面源码分析的时候也说了,各种方法都最终依赖于 execute 方法,也就是线程池执行任务的入口:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
// 前面说的那个表示 “线程池状态” 和 “线程数” 的整数
int c = ctl.get();
//得到线程池的线程数,如果线程数小于cpu核心数
//新增worker加入hashSet成功,就直接返回了
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
//新增worker加入hashSet失败,则重新确认ctl的值
c = ctl.get();
}
//到这里添加worker失败了,需要把任务放到等待队列
//线程池还在RUNNING状态,阻塞队列也没满的情况,加到阻塞队列里
if (isRunning(c) && workQueue.offer(command)) {
/* 这里面说的是,如果任务进入了 workQueue,我们是否需要开启新的线程
* 因为线程数在 [0, corePoolSize) 是无条件开启新的线程
* 如果线程数已经大于等于 corePoolSize,那么将任务添加到队列中,然后进到这里
*/
int recheck = ctl.get();
// 如果线程池已不处于 RUNNING 状态,那么移除已经入队的这个任务,并且执行拒绝策略
if (!isRunning(recheck) && remove(command))
//默认的拒绝策略直接抛异常
reject(command);
// 如果线程池还是 RUNNING 的,并且线程数为 0,那么开启新的线程
// 到这里,我们知道了,这块代码的真正意图是:担心任务提交到队列中了,但是线程都关闭了
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 如果 workQueue 队列满了,那么进入到这个分支
// 以 maximumPoolSize 为界创建新的 worker,
// 如果失败,说明当前线程数已经达到 maximumPoolSize,执行拒绝策略
else if (!addWorker(command, false))
reject(command);
}
如果等待队列满了后对任务怎么处理的
是有四种不同的拒绝策略
final void reject(Runnable command) {
handler.rejectedExecution(command, this);
}
- AbortPolicy(默认策略)直接抛出异常拒绝接收任务
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
- CallerRunsPolicy如果线程池在运行,以当前线程完成任务
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
- DiscardOldestPolicy丢弃等待最久的任务,继续执行其它任务
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
- DiscardPolicy不做任何处理,新的任务直接忽略
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
线程池的关闭
线程池的关闭有两个方法shutdown() 与shutdownNow() ;
shutdown会将线程池状态设置为SHUTDOWN状态,然后中断所有空闲线程,然后执行tryTerminate()方法(tryTerminate这个方法很重要,会在后面分析),来尝试终止线程池;
shutdownNow会将线程池状态设置为STOP状态,然后中断所有线程(不管有没有执行任务都设置为中断状态),然后执行tryTerminate()方法,来尝试终止线程池;
- shutdown
流程图如下:
这里的中断只是发出中断信号,而在获取任务阻塞的线程会识别到对应的中断信号,然后抛出异常,执行完方法退出
//尝试停止线程池,此时不接受新的任务,但是会处理等待队列的任务
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//检查关闭线程池的权限
checkShutdownAccess();
//把线程池状态更新到SHUTDOWN
advanceRunState(SHUTDOWN);
//中断闲置的Worker
interruptIdleWorkers();
// 钩子方法,默认不处理。ScheduledThreadPoolExecutor会做一些处理
onShutdown();
} finally {
mainLock.unlock();
}
//尝试结束线程池
tryTerminate();
}
//更新指定状态
private void advanceRunState(int targetState) {
for (; ; ) {
int c = ctl.get();
//假设targetState=SHUTDOWN以下两种情况会结束自旋
//1.当前线程池状态不是运行状态
//2.当前线程状态是运行状态,但是cas比较更新当前线程数和线程池状态为SHUTDOWN成功
if (runStateAtLeast(c, targetState) ||
ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
break;
}
}
中断闲置线程
//传true,中断一个worker,false则中断全部闲置的worker,
private void interruptIdleWorkers() {
interruptIdleWorkers(false);
}
//传入了参数false,表示要中断所有的正在运行的闲置Worker,如果为true表示只打断一个闲置Worker
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
// Worker中的线程没有被打断并且Worker可以获取锁,这里Worker能获取锁说明Worker是个闲置Worker,
// 在阻塞队列里拿数据一直被阻塞,没有数据进来。如果没有获取到Worker锁,说明Worker还在执行任务,
// 不进行中断(shutdown方法不会中断正在执行的任务)
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
- shutdownNow
流程图如下:
这里的中断只是发出中断信号,而在获取任务阻塞的线程会识别到对应的中断信号,然后抛出异常,执行完方法退出
//尝试停止线程池,此时不接受新的任务,也不会处理等待队列的任务
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//检查关闭线程池的权限
checkShutdownAccess();
//把线程池状态更新到STOP
advanceRunState(STOP);
//中断所有Worker,不管是否闲置
interruptWorkers();
//清空等待队列的任务,并且返回清除成功的这些任务的集合
tasks = drainQueue();
} finally {
mainLock.unlock();
}
//尝试结束线程池
tryTerminate();
//返回清除成功的这些任务的集合
return tasks;
}
更新指定状态
//更新指定状态
private void advanceRunState(int targetState) {
for (; ; ) {
int c = ctl.get();
//假设targetState=SHUTDOWN以下两种情况会结束自旋
//1.当前线程池状态不是运行状态
//2.当前线程状态是运行状态,但是cas比较更新当前线程数和线程池状态为SHUTDOWN成功
if (runStateAtLeast(c, targetState) ||
ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
break;
}
}
停止所有的启动线程,不管是否闲置
//停止所有的启动线程,不管是否闲置
private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers)
w.interruptIfStarted();
} finally {
mainLock.unlock();
}
}
传入了参数false,表示要中断所有的正在运行的闲置Worker,如果为true表示只打断一个闲置Worker
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
对阻塞队列的任务进行清空
//对阻塞队列的任务进行清空
private List<Runnable> drainQueue() {
BlockingQueue<Runnable> q = workQueue;
ArrayList<Runnable> taskList = new ArrayList<Runnable>();
// 该方法会将阻塞队列中的所有项添加到 taskList 中
// 然后清空任务队列,该方法是线程安全的
q.drainTo(taskList);
//队列还没清空?
if (!q.isEmpty()) {
// 将 List 转换为 数组,传入的 Runnable[0] 用来说明是转为 Runnable 数组
//也就是q转成了Runnable[],q有几个元素,Runnable[]有几个元素
for (Runnable r : q.toArray(new Runnable[0])) {
//从队列里移除指定元素
if (q.remove(r))
//移除成功,添加到任务集合
taskList.add(r);
}
}
//返回清空成功的任务集合
return taskList;
}
关于finalize方法说明:
垃圾回收时,如果判断对象不可达,且覆盖了finalize方法,则会将对象放入到F-Queue队列 ,有一个名为”Finalizer”的守护线程执行finalize方法,它的优先级为8,做最后的清理工作,执行finalize方法完毕后,GC会再次判断该对象是否可达,若不可达,则进行回收,否则,对象复活
注意:网上很多人说 ,Finalizer线程的优先级低,个人认为这是不对的,Finalizer线程在jdk1.8的优先级是8,比我们创建线程默认优先级5要高,之前其它版本的jdk我记得导出的线程栈信息里面优先级是5,忘记是哪个版本的jdk了,即使是5优先级也不比自建的线程默认优先级低,总之我没见过优先级低于5的Finalizer线程;
这个线程会不停的循环等待java.lang.ref.Finalizer.ReferenceQueue中的新增对象。一旦Finalizer线程发现队列中出现了新的对象,它会弹出该对象,调用它的finalize()方法,将该引用从Finalizer类中移除,因此下次GC再执行的时候,这个Finalizer实例以及它引用的那个对象就可以回垃圾回收掉了。
大多数时候,Finalizer线程能够赶在下次GC带来更多的Finalizer对象前清空这个队列,但是当它的处理速度没法赶上新对象创建的速度,对象创建的速度要比Finalizer线程调用finalize()结束它们的速度要快,这导致最后堆中所有可用的空间都被耗尽了;
当我们大量线程频繁创建重写了finalizer()方法的对象的情况下,高并发情况下,它可能会导致你内存的溢出;虽然Finalizer线程优先级高,但是毕竟它只有一个线程;最典型的例子就是数据库连接池,proxool,对要释放资源的操作加了锁,并在finalized方法中调用该加锁方法,在高并发情况下,锁竞争严重,finalized竞争到锁的几率减少,finalized无法立即释放资源,越来越多的对象finalized()方法无法被执行,资源无法被回收,最终导致导致oom;所以覆盖finalized方法,执行一定要快,不能有锁竞争的操作,否则在高并发下死的很惨;
尝试终止线程池tryTerminate
该方法会在很多地方调用,如添加worker线程失败的addWorkerFailed()方法,worker线程跳出执行任务的while 循环退出时的processWorkerExit()方法,关闭线程池的shutdown()和shutdownNow()方法,从任务队列移除任务的remove()方法;
该方法的作用是检测当前线程池的状态是否可以将线程池终止,如果可以终止则尝试着去终止线程,否则直接返回;
STOP->TIDYING 与SHUTDOWN->TIDYING状态的转换,就是在该方法中实现的,最终执行terminated()方法后会把线程状态设置为TERMINATED的状态;
尝试终止线程池执行过程;
- 重点内容先判断线程池的状态是否允许被终止
以下状态不可被终止:
1.如果线程池的状态是RUNNING(不可终止)
或者是TIDYING(该状态一定执行过了tryTerminate方法,正在执行或即将执行terminated()方法,所以不需要重复执行),
或者是TERMINATED(该状态已经执行完成terminated()钩子方法,已经是被终止状态了),
以上三种状态直接返回。
2.如果线程池状态是SHUTDOWN,而且任务队列不是空的(该状态需要继续处理任务队列中的任务,不可被终止),也直接返回。
以下两种状态线程池可以被终止:
1.如果线程池状态是SHUTDOWN,而且任务队列是空的(shutdown状态下,任务队列为空,可以被终止),向下进行。
2.如果线程池状态是STOP(该状态下,不接收新任务,不执行任务队列中的任务,并中断正在执行中的线程,可以被终止),向下进行。
- 线程池状态可以被终止,如果线程池中仍然有线程,则尝试中断线程池中的线程
则尝试中断一个线程然后返回,被中断的这个线程执行完成退出后,又会调用tryTerminate()方法,中断其它线程,直到线程池中的线程数为0,则继续往下执行; - 如果线程池中的线程为0,则将状态设置为TIDYING,设置成功后执行 terminated()方法,最后将线程状态设置为TERMINATED
源码如下:
//尝试关闭线程池
//尝试关闭线程池
final void tryTerminate() {
for (; ; ) {
int c = ctl.get();
// 满足3个条件中的任意一个,不终止线程池
// 1. 线程池还在运行,不能终止
// 2. 线程池处于TIDYING或TERMINATED状态,说明已经在关闭了,不允许继续处理
// 3. 线程池处于SHUTDOWN状态并且阻塞队列不为空,这时候还需要处理阻塞队列的任务,不能终止线程池
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && !workQueue.isEmpty()))
return;
// 走到这一步说明线程池已经不在运行,阻塞队列已经没有任务,但是还要回收正在工作的Worker
//还剩下 SHUTDOWN&&workQueue.isEmpty 、STOP这两种状态
if (workerCountOf(c) != 0) {
// 由于线程池不运行了,调用了线程池的关闭方法,在解释线程池的关闭原理的时候会说道这个方法
// 中断闲置Worker,直到回收全部的Worker。这里没有那么暴力,只中断一个闲置线程,
//发出中断信号,中断阻塞在获取任务的线程,然后还是会调用tryTerminate方法,如果还有闲置线程,那么继续中断
interruptIdleWorkers(ONLY_ONE);
return;
}
// 走到这里说明worker已经全部回收了,并且线程池已经不在运行,阻塞队列已经没有任务。可以准备结束线程池了
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// cas操作,将线程池状态改成TIDYING
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
//钩子方法,没有任何实现
terminated();
} finally {
//terminated方法调用完毕之后,状态变为TERMINATED
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
传入了参数false,表示要中断所有的正在运行的闲置Worker,如果为true表示只打断一个闲置Worker
//传入了参数false,表示要中断所有的正在运行的闲置Worker,如果为true表示只打断一个闲置Worker
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
// Worker中的线程没有被打断并且Worker可以获取锁,这里Worker能获取锁说明Worker是个闲置Worker,
// 在阻塞队列里拿数据一直被阻塞,没有数据进来。如果没有获取到Worker锁,说明Worker还在执行任务,
// 不进行中断(shutdown方法不会中断正在执行的任务)
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
总结几个问题:
- 说说线程池中的线程创建时机?
1. 线程池处于RUNNING和(SHUTDOWN+等待队列不为空)状态,并且当前线程数量小于核心数,
那么提交任务的时候创建一个新的线程,并由这个线程执行这个任务。
2. 如果当前线程数已经达到 corePoolSize,那么将提交的任务添加到队列中,
等待线程池中的线程去队列中取任务。
3. 如果队列已满,那么创建新的线程来执行任务,需要保证池中的线程数不会超过 maximumPoolSize,
如果此时线程数超过了 maximumPoolSize,那么执行拒绝策略。
- 注意:如果将队列设置为无界队列,那么线程数达到 corePoolSize 后,其实线程数就不会再增长了。因为后面的任务直接往队列塞就行了,此时 maximumPoolSize 参数就没有什么意义。
- 任务执行过程中发生异常怎么处理?
如果某个任务执行出现异常,那么执行任务的线程会被关闭,而不是继续接收其他任务。
然后经过一系列确认,比如线程池的状态是否启动、有没有未完成的任务不够线程执行等等,
然后会启动一个新的线程来代替它。
- 什么时候会执行拒绝策略?
1、workers 的数量达到了 corePoolSize(任务此时需要进入任务队列),任务入队成功,
与此同时线程池被关闭了并且刚刚加入队列的任务移除成功,那么执行拒绝策略。
2、workers 的数量大于等于 corePoolSize,将任务加入到任务队列,可是队列满了,任务入队失败,
那么准备开启新的线程,可是线程数已经达到 maximumPoolSize,那么执行拒绝策略。
线程池看了几天,一开始看得有点蒙蔽,慢慢才看懂,喜欢给个赞!!!!!
网友评论