一、为何要使用线程池
在Java中,要使用多线程,除了使用new Thread()
之外,还可以使用线程池ExecutorService
。
// 使用Thread
Thread t = new Thread(new Runnable() {
@Override
public void run() {
// ...
}
});
t.start();
// 使用线程池
ExecutorService es = Executors.newSingleThreadExecutor();
es.execute(new Runnable() {
@Override
public void run() {
// ...
}
});
线程池主要解决了两个问题:
- 频繁创建销毁线程的开销
- 任务的管理
在异步任务比较多时,创建、销毁线程会占用很多系统资源;这时候,使用线程池,就可以实现线程的复用,让人专注于任务的实现,而不是管理线程。
二、线程池简介
1. 什么是线程池
线程池(本文特指ThreadPoolExecutor
类)顾名思义,就是一个装了线程的池子。线程池创建和管理若干线程,在需要使用的时候可以直接从线程池中取出来使用,在任务结束之后闲置等待复用,或者销毁。
线程池中的线程分为两种:核心线程和普通线程。核心线程即线程池中长期存活的线程,即使闲置下来也不会被销毁,需要使用的时候可以直接拿来用。而普通线程则有一定的寿命,如果闲置时间超过寿命,则这个线程就会被销毁。
查看ThreadPoolExecutor
类的其中一个典型的构造方法:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
线程池的具体行为和几个参数有关:
-
核心数 corePoolSize
线程池中核心线程的数量。 -
最大容量 maximumPoolSize
线程池最大允许保留多少线程。 -
超时时间 keepAliveTime
线程池中普通线程的存活时间。
2. 线程池的使用
线程池的一般使用步骤如下:
- 使用
Executors
中的工厂方法来获取ExecutorService
实例; - 使用
ExecutorService
的execute(runnable)
或者submit(runnable)
方法来添加任务。
ExecutorService es = Executors.newSingleThreadExecutor();
es.execute(new Runnable() {
@Override
public void run() {
String response = new HttpUtil().get("http://littlefogcat.top");
System.out.println(response);
}
});
3. 线程池的分类
在Executors
工厂类中提供了多种线程池,典型的有以下四种:
1. SingleThreadExecutor 单线程线程池
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
核心线程数为1,最大线程数为1,也就是说SingleThreadExecutor
这个线程池中的线程数固定为1。使用场景:当多个任务都需要访问同一个资源的时候。
2. FixedThreadPool 固定容量线程池
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
核心线程数为n,最大线程数为n。使用场景:明确同时执行任务数量时。
3. CachedThreadPool 缓存线程池
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
核心线程数为0,最大线程数无上限,线程超时时间60秒。使用场景:处理大量耗时较短的任务。
4. ScheduledThreadPool 定时线程池
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
/*
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
*/
核心线程数自定,最大线程数无上限。使用场景:处理延时任务。
可以看到,这四个方法都返回了一个ThreadPoolExecutor
对象(ScheduledThreadPoolExecutor
是其子类),仅仅是其中的参数略有不同。所以接下来就对ThreadPoolExecutor
类进行解析。
我将这四种常见的线程池总结了一个表格:
四种线程池
三、线程池的工作流程
1. 典型的线程池使用方式
一个典型的线程池使用方式如下:
ExecutorService exec = Executors.newCachedThreadPool();
exec.execute(new Runnable() {
@Override
public void run() {
// do sth
}
});
这里就以ThreadPoolExecutor.execute(runnable)
方法切入,分析线程池的工作流程。
在ThreadPoolExecutor.execute(runnable)
方法的注释中写道:
Executes the given task sometime in the future. The task
may execute in a new thread or in an existing pooled thread.
If the task cannot be submitted for execution, either because this
executor has been shutdown or because its capacity has been reached,
the task is handled by the current {@code RejectedExecutionHandler}.
简单来说就是将这个传入的runnable对象提交到线程池中,等待执行;如果线程池关闭,或者容量到上限不可以执行了,那么就无法提交,会交给线程池的RejectedExecutionHandler
进行处理(这个RejectedExecutionHandler
在构造方法中传入,或者通过setRejectedExecutionHandler(handler)
方法指定)。
2. 线程池工作流程
线程池的工作流程还是比较清晰的,具体的源码分析在第四节中,本节只做简要说明。
2.1 添加任务
当调用ThreadPoolExecutor.execute(runnable)
的时候,会进行以下判断(这里不考虑延时任务):
- 如果线程池中,运行的线程数少于核心线程数(corePoolSize),那么就新建一个线程,并执行该任务。
- 如果线程池中,运行的线程数大于等于corePoolSize,将线程添加到待执行队列中,等待执行;
- 如果2中添加到队列失败,那么就新建一个非核心线程,并在该线程执行该任务;
- 如果当前线程数已经达到最大线程数(maximumPoolSize),那么拒绝这个任务。
这里有个问题,什么情况下,任务会添加失败呢?这个问题会在下面第四节源码分析中workQueue部分说明。
2.2 执行任务
在2.1添加任务中,添加失败自然不必执行,会直接拒绝任务;任务添加成功有两种情况:
- 将任务添加到任务队列;
- 新建线程执行任务。
新建线程自不必说,主要看看添加到任务队列中的任务是如何被执行的。
从2.1中我们知道,每一个工作线程必然是被一个任务唤醒的,这个任务被称作初始任务(firstTask)。当一个工作线程完了它的初始任务之后,会从待执行的任务队列(workQueue)中取新的任务。workQueue是一个阻塞队列,线程会一直等待直到有新的任务到来为止。对于一个设置了超时时间的线程,如果在指定的时间之后仍然没有新任务到达,那么这个线程就会停止等待任务并且销毁。
四、线程池中的一些重要概念
Worker / workers
Worker
类是ThreadPoolExecutor
类的一个内部类,也是线程池管理操作线程的核心所在。每一个worker都对应着一个thread,所以在不混淆的情况下,可以把worker理解为工作线程。
ThreadPoolExecutor
有一个名为workers
的成员变量,它保存了这个线程池所有存活的worker对象。
workQueue
workQueue
是线程池内部用来保存待执行任务的队列。它是一个BlockingQueue<Runnable>
类型的变量,在没有任务的时候,它的poll()
方法会阻塞。
在一个允许超时的worker执行完任务之后,会调用workQueue.poll()
取出下一个任务执行。如果没有任务,则会在这里阻塞;当阻塞时间达到超时时间后,这个工作线程会退出并销毁。
五、通过源码详细分析线程池
1. ctl
ThreadPoolExecutor
通过一个原子整型ctl
来保存线程池的两个重要字段,workerCount和runState。workerCount即线程池工作线程的数量,而runState代表了线程池当前的状态(如:运行中、关闭、终止)。通过位运算,可以从ctl
得到workerCount和runState的值,反之也可以通过workerCount和runState组合得到ctl
。
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
显然,这跟Android中的MesureSpec通过一个整数来保存两个属性原理是相同的。
2. execute(runnable)方法
本节所有流程都以ThreadPoolExecutor.execute(runnable)
方法切入,分析线程池的源码:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
可以看到,这个方法很简单,正如三-2.1小节所说的一样,在添加任务时做一些判断。在ThreadPoolExecutor
中,有一个队列workQueue
保存了待执行的任务。而当需要新建线程的时候,则执行addWorker(runnable, core)
方法来创建一个worker/线程。
因为这个方法是线程池执行的核心,所以下面重点理解这个方法里面的语句。
3. workQueue / Worker
workQueue是ThreadPoolExecutor
类的一个非常重要的成员变量。在2中,我们知道了,当正在执行的线程数量大于核心线程数,那么会优先将任务添加到任务队列,即workQueue中。
通过execute(runnable)
方法可以知道,对于一个处在运行中的线程池,只有在当前工作线程数量大于等于核心数时,才会将任务往队列中添加。并且,如果往任务队列添加失败的话,就会开启新的工作线程。
那么回到第三节中的问题,什么情况下会添加失败呢?注意这一句:
if (isRunning(c) && workQueue.offer(command)) {
// ...
}
很简单,当workQueue.offer(command)
返回false的时候,则说明添加失败。一般来说,当队列的容量满了,offer方法就会返回false。即,在线程数超过了核心数(workerCount>corePoolSize)的情况下,只有在任务队列被填满之后,线程池才会考虑创建新线程,否则只会将任务添加到任务队列中等待执行。在线程池的构造方法中传入不同的队列类型,就会有不同的效果。回到Executors
工厂类中,看看四种基本的线程池分别都是使用的什么队列?
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
// ScheduledThreadPoolExecutor的构造方法
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
-
SingleThreadExecutor
核心数和最大线程数均为1,使用LinkedBlockingQueue,容量为Interger.MAX_VALUE
。也就是说,SingleThreadExecutor中永远只有一个线程,所有任务单线执行,并且容量无上限。 -
CachedThreadPool
核心数为0,最大线程数Interger.MAX_VALUE,使用SynchronousQueue
。这个队列的特点是,没有内部容量。也就是说,对于一个新任务,但凡是没有空闲的线程,那么就创建一个新的线程。而由于核心数是0,当超过一定时间没有新任务之后,线程池中所有线程都将被销毁。 -
FixedThreadPool
和SingleThreadExecutor类似,使用LinkedBlockingQueue
;不同的是核心数和最大线程数为n。 -
ScheduledThreadPoolExecutor
使用DelayedWorkQueue
,可以实现延时/定时获取任务。
看完这里,就能很好的理解Executors
中的这些线程池为何能够呈现出各自的特性了。
在第四节中我们知道,对于线程的操作等,不是直接通过Thread来进行的,而一般是通过Worker类进行。每一个Worker对应了一个线程,任务的添加、执行等,都是通过Worker来实现的。ThreadPoolExecutor
中有一个HashSet<Worker>
类型的变量workers
,用来保存可用的Worker。也就是说,我们所谓的“线程池”实际本质上就是“Worker池”。由于Worker
和Thread
是一对一的关系,所以为了图方便,有时候可以简单的把Worker
理解成一个工作线程,但需要知道其本质上与真正的线程Thread
是不同的。
Worker
类是ThreadPoolExecutor
的一个内部类,继承自AbstractQueuedSynchronizer
,实现了Runnable
接口:
// ...略去一部分
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
final Thread thread;
Runnable firstTask;
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
public void run() {
runWorker(this);
}
}
真实运行的线程,是worker.thread
,在Worker
构造方法中,worker.thread
通过工厂方法创建。而线程肯定是要调用start()
方法运行的,搜索一下worker.thread
的start()
方法,发现是在ThreadPoolExecutor.addWorker()
这个方法里调用的。
在下面的第4小节中,会专门分析这个addWorker(runnable, core)
方法。
另一方面,Worker本质上又是一个Runnable对象,是一个可运行任务,在真实线程worker.thread
启动后,会调用其run()
方法:
// Worker中
public void run() {
runWorker(this);
}
4. addWorker(runnable, boolean)方法
线程池创建工作线程是通过addWorker
方法来进行的。
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
虽然这段代码有点长,但是所做的事情其实只有两件:
- 检查是否应该添加这个worker:只有在线程池处于正在运行的状态(runState==RUNNING),并且当前worker数小于最大容量时,才能添加;
- 新建worker对象并添加到workers中。
5. runWorker(Worker)方法
在3中我们得知,当Worker的线程开始运行之后,会调用其run()方法:
// Worker中
public void run() {
runWorker(this);
}
而run()又会调用ThreadPoolExecutor.runWorker(Worker)
方法。在这里看一下这个方法。
// 省略一大部分
final void runWorker(Worker w) {
Runnable task = w.firstTask;
while (task != null || (task = getTask()) != null) {
try {
task.run();
} catch (Exception x) {
}
}
}
// 省略一大部分
private Runnable getTask() {
for (;;) {
if (/*无法获取任务*/) {
return null;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
} catch (InterruptedException retry) {
}
}
}
为了便于观看,我删去了大部分代码,只留了核心的几行。可以看到,在Worker的任务执行完毕之后,会再从workQueue队列中获取新的任务,按此无限循环。什么时候Worker会结束并销毁呢?从这一句while (task != null || (task = getTask()) != null)
中,即worker中没有任务,并且getTast()
返回null,worker就会结束执行。什么时候返回null,不让worker继续存活了呢?
- 线程池被shutdown,并且任务队列空了;
- 线程池超容量;
- 超时;
也就是说,如果线程池在运行状态,容量也没有到最大,并且任务队列还有任务,这个worker就会永远运行下去。
六、总结
就用图片来总结一下。
下图阐述了线程池调用execute(runnable)
之后的流程。
这张图表示了execute
之后的调用链,相当于Worker的生命周期了(不包括销毁)。
网友评论