线程池概述
线程池的好处:
- 线程可重用,线程是稀缺资源,使用线程池可减少创建和销毁线程的次数,每个工作线程都可以被重复使用。
- 线程并发数控制,可以根据系统承受能力,调整线程池中工作线程数量,提高系统响应能力,降低资源消耗。
ThreadPoolExecutor

ThreadPoolExecutor是线程池的核心类,要透彻的了解线程池,必须先了解这个类。
构造函数参数
- int corePoolSize:线程池当中所一直维护的线程数量,如果线程池处于任务空闲期间,那么该线程并不会被回收(allowCoreThreadTimeOut默认false,保证核心线程不会timeOut被清除)。prestartAllCoreThreads()方法可以初始化corePoolSize的核心线程。
- int maximumPoolSize:线程池所维护的最大线程数量。
- long keepAliveTime:超过CorePoolSize的线程(非核心线程)在keepAliveTime时间后一直处于空闲状态,那么超过的这部分线程将会被回收掉。
- TimeUnit unit:回收时间单位。
- BlockingQueue<Runnable> workQueue:向线程池提交的任务位于阻塞队列,它的实现有多种方式。
- ThreadFactory threadFactory:线程工厂类,提供创建新线程的方法。
-
RejectedExecutionHandler:当线程池中的线程都忙于执行任务且阻塞队列已经满了的情况下,新提交的任务该如何被对待和处理:
7.1 AbortPolicy:直接抛出异常RejectedExecutionException
7.2 DiscardPolicy:空实现,什么都不做,直接丢弃当前任务(可能会导致future.get()一直阻塞)
7.3 DiscardOldestPolicy:丢弃阻塞队列中存放最久的任务(poll对头元素),并且为当前所提交的任务留出一个队列中的空闲空间,以便将其放到队列中
7.4 CallerRunsPolicy:直接由提交任务的线程来运行这个提交的任务
线程池状态
线程池控制状态的方式类似于ReentrantReadWriteLock通过state高低位表示读写状态,线程池通过一个AtomicInteger ctl来控制。
- 线程池本身的状态:ctl的左边高3位
- 线程池中所运行的线程的数量:ctl的其余29位;这里补充一下int类型左移<<知识(规则则是带符号位移,高位移出,低位补0,移动位数超过该类型的最大位数,则进行取模,如对Integer型左移34位,实际上只移动了两位。左移一位相当于乘以2的一次方,左移n位相当于乘以2的n次方。)
1<<COUNT_BITS = 001带29个0
CAPACITY = 1<<COUNT_BITS - 1 = 000带29个1
~CAPACITY = 111带29个0
所以workerCountOf的判断就是当前count和CAPACITY相与&;
runStateOf的判断就是当前状态值和~CAPACITY想与&;
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3; =>29
private static final int CAPACITY = (1 << COUNT_BITS) - 1; =>(2^29) - 1
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS; =>0
private static final int STOP = 1 << COUNT_BITS; =>00100000 (000...00)
private static final int TIDYING = 2 << COUNT_BITS; =>01000000 (000...00)
private static final int TERMINATED = 3 << COUNT_BITS; =>01100000 (000...00)
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
线程池一共存在5种状态:
- RUNNING:线程池可以接收新的任务提交,并且还可以正常处理阻塞队列中的任务。
- SHUTDOWN:不再接受新的任务提交,不过线程池还可以继续处理阻塞队列中的任务(已经入队列的任务还会继续处理)。
- STOP:不再接收新的任务,同时还会丢弃阻塞队列中既有的任务;此外,它还会中断正在处理中的任务。
- TIDYING:所有的任务都终止(也涵盖了阻塞队列中的任务),当前线程池中的活动线程数workCount降为0,即将会调用terminated()钩子方法(模板方法,由自定义线程池可实现做一些扩展操作)。
- TERMINATED:线程池的终止状态,当terminated()执行完毕之后,线程池将会处于该状态之下。
状态切换:
- RUNNING -> SHUTDOWN:当调用了线程池的shutdown()方法,或者当finalize()方法被隐式调用。
- RUNNING, SHUTDOWN -> STOP:当调用了线程池的shutdownNow()方法。
- SHUTDOWN -> TIDYING:在线程池与阻塞队列均变为空时。
- STOP -> TIDYING:在线程池变为空的时候(STOP的时候已经将queue的任务清空了)
- TIDYING -> TERMINATED:在terminated()被执行完毕时
源码分析
线程池提交任务的方式有两种:submit和execute
线程池提交之submit
submit方法有父类AbstractExecutorService实现。
submit有三种方式,最终都是将传递进来的任务转换为一个FutureTask。(RunnableFuture)对象进行处理。
之前我们做了FutureTask的源码解析,FutureTask可以接受Callable和Runnable类型,且最终都是封装成Callable的(RunnableAdapter适配器模式)。
且可以看到任务被转换成FutureTask之后都是调用execute()方法。
//Runnable在FutureTask构造函数内通过RunnableAdapter转换成Callable
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
//Runnable在FutureTask构造函数内通过RunnableAdapter转换成Callable,并且result为future.get()的返回值
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
//callable直接有FutureTask包装
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
线程池提交之execute
execute()方法有ThreadPoolExecutor自己实现,且是submit()内部会调用的方法。
该方法里主要的三个步骤:
- 如果工作的线程数小于corePoolSize,就创建一个新的worker线程,并将提交的任务作为其firstTask。
- 如果工作线程数大于corePoolSize,并且成功的加入workQueue,此时还需要重新确认一下运行状态,如果不再运行了(线程挂了或者线程池shutdown了),需要reject当前任务;
- 如果不能成功加入workQueue,并且工作线程数没有大于maxPoolSize,则添加非核心线程;如果大于maxPoolSize,则拒绝当前任务。
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
//注意这里加入workQueue是通过offer()方法,所以不会阻塞。
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
//这里重新检查线程运行状态以及工作线程数
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//添加非核心线程,如果添加失败,则拒绝策略拒绝当前任务。
else if (!addWorker(command, false))
reject(command);
}
Worker
在上面看到execute()方法,主要还是操作Worker类,该类继承AQS,那么它就提供了一些锁的功能。
state=0表示unlock
state=1表示locked
所以Worker继承了AQS之后,提供的tryAcquire()和tryRelease()就是将state分别置为1和0,来表示获得锁和释放锁。
那么在构造函数中还有一句state=-1,这里主要对应interruptIfStarted()方法,而该方法被interruptWorkers()调用,InterruptIfStarted()意味着中断只对已经started的worker线程有效,即getState() >= 0的线程。
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
//Worker类所运行的线程
final Thread thread;
//初始化的运行任务
Runnable firstTask;
//完成的任务数
volatile long completedTasks;
//构造函数,接受一个Runnable firstTask,且注意Worker也是一个Runnable
//这里thread会接受worker这个Runnable,而不是线程池提交的那个Runnable
Worker(Runnable firstTask) {
//这里将state置为-1,为了防止shutdown方法中断未启动的Worker线程,
// 因为中断只会关注state>=0的线层任务,Worker启动的时候先调用unlock()方法,是为了响应中断。
setState(-1);
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
//Worker是一个Runnable,run()方法调用runWorker()方法
public void run() {
runWorker(this);
}
// 0表示 unlocked state.
// 1表示 locked state.
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
//中断已启动的worker线程
void interruptIfStarted() {
Thread t;
//主要针对getState() >= 0的worker线程
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
runWorker()
这个是Worker的核心方法,该方法在开始的时候会先调用w.unlock(),这与上面构造函数state=-1有一定的关系,unlock()方法主要将state置为0,那么意味着该worker线程已经被启动,那么调用interruptWorkers()方法时,该worker就会能响应中断。
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
//将state从-1置为0,从而能够响应中断
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// 检查中断
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
//hook方法,由ThreadPoolExecutor子实现类提供
beforeExecute(wt, task);
Throwable thrown = null;
try {
//拿到firstTask,在这里调用其run()方法
//如果是submit方式,任务都会被包装成FutureTask,如果任务里没有捕获异常,
// 不会终止worker线程,而是future.get()的时候返回executionException
//如果是execute()方式,那么Runnable里面没有捕获异常,会导致Worker抛出异常提前终止。
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
//hook方法
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
getTask()
该方法在runWorker()方法里调用,在worker的firstTask完成了之后,就会从workQueue里面去获取task来执行,如果queue是空了, 则该方法就会阻塞(一直或者一段时间)。
private Runnable getTask() {
boolean timedOut = false; // 上一次轮询是不是timeout
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 如果线程池shutdown了,并且workQueue已经空了,或者STOP了,即workQueue被终止。
//那么就需要开始降workerCount了,并且return,不在阻塞。
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// 看是不是需要定时,如果allowCoreThreadTimeOut为true,则所有的线程在poll的时候都有等待时间;
//如果allowCoreThreadTimeOut为false,那么只有非核心线程在poll的时候有等待时间。
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
//当wc超过maximumPoolSize或者是非核心线程且上一次poll超时了
//并且wc个数大于1,或者workQueue为空,则降低workerCount,并return null
//这里主要就是非核心线程在从workQueue里面poll任务的时候,等待超时了,就会被回收,这会return null
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
//再次检查workerCount的数量,如果超过了maxPoolSize,则不操作了。
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
//开始从workQueue里面poll或者take任务
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
processWorkerExit()
该方法在runWorker()方法最后finally里调用,completedAbruptly为true表示firstTask.run()方法抛出异常;completedAbruptly为false,表示正常结束。
一般来说getTask()在workQueue不为空的时候,会一直轮询workQueue;那么workQueue为空的时候,就需要阻塞(非核心线程会有等待时间限制)
所以该方法主要是在抛出异常后剔除该Worker。
当然getTask()也有一些时候会return null(比如非核心线程在poll任务的时候等待超时,会被回收了),那么这个方法同样会检查如果线程池还没stop的(包括shutdown,但是workQueue还没清空的时候),这时候还需要补充新的worker来完成workQueue的清理工作。
private void processWorkerExit(Worker w, boolean completedAbruptly) {
//如果run方法出了异常,那么要将worker的count降下来
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
//将该worker从workers集合中剔除
workers.remove(w);
} finally {
mainLock.unlock();
}
//尝试终止线程池,将其状态更新为TERMINATED,
// 如果当前(SHUTDOWN and pool and queue empty) or (STOP and pool empty)
tryTerminate();
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
//这里主要看如果运行的worker数目大于最小的数量,就不需要补充
if (workerCountOf(c) >= min)
return; // replacement not needed
}
//当一个Worker出现异常而退出或者WorkerCout小于min,那么就需要重新创建一个Worker来补充进来
addWorker(null, false);
}
}
addWorker
上面看过Worker之后,execute()方法会通过addWorker()来维护与Worker的关系。
addWorker(Runnable, boolean)接收Runnable作为参数,以及一个boolean表示是否是核心线程。
刚开始的两个for循环,外层for循环检查线程池状态;内层for循环检查worker线程数。
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (; ; ) {
//拿到ctl的值,通过runStateOf拿到状态值进行判断
int c = ctl.get();
int rs = runStateOf(c);
//shutdown,stop,tidying,terminated状态都不会接受新的任务提交
//或者running状态的时候,firstTask传进来是null,任务也不会被接收
if (rs >= SHUTDOWN &&
!(rs == SHUTDOWN &&
firstTask == null &&
!workQueue.isEmpty())) {
return false;
}
//第二个for循环检查workerCount
for (; ; ) {
int wc = workerCountOf(c);
//workerCount工作线程大于CAPACITY,肯定不行
//如果是新建core核心线程,就看corePoolSize有没有超出
//如果是新建core核心线程,就看maximumPoolSize有没有超出
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize)) {
return false;
}
//如过检查都好,就CAS增加一个workCount
if (compareAndIncrementWorkerCount(c)) {
break retry;
}
//如果CAS更新失败就重新检查一下状态,如果状态变了就重试最外层for循环
//否则就重试内部for循环
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs) {
continue retry;
}
// else CAS failed due to workerCount change; retry inner loop
}
}
//在状态和workerCount检查完毕并且workerCount加一成功之后,开始增加新的Worker
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
//创建一个新的Worker,内部启动一个线程调用Worker的runWorker()方法
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
{
throw new IllegalThreadStateException();
}
//这个锁的内部主要就是添加到workers集合里
workers.add(w);
int s = workers.size();
if (s > largestPoolSize) {
largestPoolSize = s;
}
workerAdded = true;
}
} finally {
mainLock.unlock();
}
//上面worker创建并添加集合成功,就调用start()方法,启动worker的runWorker()方法
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (!workerStarted) {
addWorkerFailed(w);
}
}
return workerStarted;
}
shutdown
shutdown方法主要就是将状态设为SHUTDOWN,不再接收新的任务,然后中断没有工作的worker线程(主要通过tryLock()来看是不是在工作)。
shutdown和shutdownNow最后都会调用tryTerminate()方法,就是检查线程池状态,最终更新为TERIMATED,更新成功则termination.signalAll()唤醒调用awaitTermination()而被阻塞的线程去优雅的关闭。
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
//更新state到SHUTDOWN
advanceRunState(SHUTDOWN);
//中断idle的worker线程
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
//尝试去更新到TERMINATED状态
tryTerminate();
}
//中断idle,即没有工作的worker线程
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
//主要通过tryLock()来看是不是在工作,因为runWorker()里面getTask()之后开始执行,是lock()了
if (!t.isInterrupted() && w.tryLock()) {
try {
//如果tryLock()成功,则表示worker线程是空闲的,会调用interrupt()方法来中断
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
shutdownNow
shutdownNow与shutdown的区别就是状态更新到STOP,不再接收新的任务,并且中断所有启动的(state>=0)worker线程,并且返回workQueue里面没有处理的任务列表。
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
//更新状态到STOP
advanceRunState(STOP);
//中断所有的启动的worker线程
interruptWorkers();
//从workQueue里面导出未处理的Runnable列表
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
//中断所有的启动的worker线程
private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers)
//interruptIfStarted在Worker类里面分析过了,只对state>=0的Worker有效
//因为初始化Worker的state是-1,只有runWorker()方法开始的时候调用unlock()将state更新为0
w.interruptIfStarted();
} finally {
mainLock.unlock();
}
}
awaitTermination
从上面shutdown和shutdownNow看出一些区别,shutdown不会中断正在处理任务的线程,意味着一直会处理到workQueue被清空;shutdownNow则会中断正在处理中的任务,返回未处理的Runnable列表。
那么如果想要优雅的关闭线程池,就调用shutdown方法,但是又不想无止境的等下去,所以就可以通过awaitTermination(long, timeUnit)来设置一个等待终止的时间。
awaitTermination方法在await()一段时间之后,如果状态到了TERMINATED,表示线程池已经优雅的关闭了,返回true;如果线程池还没到TERMINATED状态,就返回false。
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//一直循环直到等待时间到达,如果没有到达,则当前线程会被挂起
for (;;) {
if (runStateAtLeast(ctl.get(), TERMINATED))
return true;
if (nanos <= 0)
return false;
nanos = termination.awaitNanos(nanos);
}
} finally {
mainLock.unlock();
}
}
优雅关闭线程池
ExecutorService es = Executors.newFixedThreadPool(10);
try {
es.shutdown();
//超过等待时间,如果还有线程没执行完,不再等待,shutdownNow直接关闭线程池
if (!es.awaitTermination(20, TimeUnit.SECONDS)) {
es.shutdownNow();
}
} catch (Throwable e) {
es.shutdownNow();
}
网友评论