线程池
0、前言
线程池,顾名思义就是线程的池子,在每次需要取线程去执行任务的时候,没必要每次都创建新线程执行,线程池就是起着维护线程的作用,当有任务的时候就取出一个线程执行,如果任务执行完成则把线程放回到池子中,完成线程的复用,没必要每次都去创建和销毁现场,提高效率。
线程池为线程的生命周期的开销和资源不足提供的解决方案。通过对多个任务的重用线程
那么什么是时候使用多线程呢?
- 单个任务处理时间比较短
- 处理的任务数比较大,创建和销毁线程消耗资源过多
1、Exector 接口
image.png- Exector 接口:运行新任务的简单接口
- ExectorService 接口:扩展了Exector接口,添加了一些用来管理执行器生命周期和任务生命周期的方法
- ScheduledExecutorService 接口:扩展自ExectorService接口,支持Future和定期执行任务
- Exectors 类:包装了具体的几个常用的线程池的定义,便于使用
2、ThreadPoolExecutor 类
java的线程都是在JUC包中,其中java.uitl.concurrent.ThreadPoolExecutor
是最为核心的类
其包中包含的四种创建线程池的方法
public class ThreadPoolExecutor extends AbstractExecutorService {
.....
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue);
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory);
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler);
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);
...
}
具体的调用方法又可以是在Exectors
类中,只是提供更加便利的线程池调用方法,如果希望使用更加灵活自定义的线程池,还是建议使用ThreadPoolExecutor
public class Executors {
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}
2.1 参数说明
-
private volatile int corePoolSize;
:核心池大小,当当前线程数小于核心池数字,则新加一个任务就创建一个线程处理,当超过了核心池大小则加入到任务队列中,在创建了线程池后,默认情况下,线程池中并没有任何线程,而是等待有任务到来才创建线程去执行任务,除非调用了prestartAllCoreThreads()或者prestartCoreThread()方法,从这2个方法的名字就可以看出,是预创建线程的意思,即在没有任务到来之前就创建corePoolSize个线程或者一个线程。默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中 -
private volatile int maximumPoolSize;
:最大线程数,线程池最大能创建的线程数目 -
private volatile long keepAliveTime;
:线程没有需要处理的任务时最多保持多久会终止,默认情况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用,直到线程池中的线程数不大于corePoolSize,即当线程池中的线程数大于corePoolSize时,如果一个线程空闲的时间达到keepAliveTime,则会终止,直到线程池中的线程数不超过corePoolSize。但是如果调用了allowCoreThreadTimeOut(boolean)方法,在线程池中的线程数不大于corePoolSize时,keepAliveTime参数也会起作用,直到线程池中的线程数为0 -
private volatile ThreadFactory threadFactory;
:创建线程的工厂bean,其中Executors.defaultThreadFactory()提供默认的线程工厂bean -
private final BlockingQueue<Runnable> workQueue;
:任务队列 -
private volatile RejectedExecutionHandler handler
:拒绝处理任务时的策略
ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务
2.2 重要方法
2.2.1 execute()
Executor中声明的方法,在ThreadPoolExecutor进行了具体的实现,这个方法是ThreadPoolExecutor的核心方法,通过这个方法可以向线程池提交一个任务,交由线程池去执行。
2.2.2 submit()
ExecutorService中声明的方法,在AbstractExecutorService就已经有了具体的实现,在ThreadPoolExecutor中并没有对其进行重写,这个方法也是用来向线程池提交任务的,但是它和execute()方法不同,它能够返回任务执行的结果,去看submit()方法的实现,会发现它实际上还是调用的execute()方法
2.2.3 shutdown()
关闭线程池的方法,不会接受新的任务,会等到已经存在的任务执行完成
2.2.4 shutdownNow()
关闭线程池的方法,不会接受新的任务,已经运行的任务也会被中断执行
3、线程池实现原理
3.1 线程池状态
线程池的状态,表示当前线程池的工作状态,例如会不会接受新的任务,之前存在的任务是如何处理的
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
ctl字段是组合为线程池的运行状态和有效线程数异或生成的,高三位保存了runState,低29位保存了workerCount
- RUNNING : 能接受新提交的任务,并且能处理阻塞队列中的任务
- SHUTDOWN : 关闭线程池,不再接受新提交的任务,但是可以继续处理在阻塞线程中已经存在的任务,线程在 RUNNING 状态调用shutdown()方法进入到SHUTDOWN状态
- STOP : 不接受新任务,也不处理阻塞队列的任务,中断正在处理的线程,线程处于 RUNNING 或者 SHUTDOWN 状态 调用 shutdownNow()方法进入到STOP 状态
- TIDYING 所有的任务都终止了,workCount为0,线程池如果处于该情况下,调用terminated()方法,进入到 TERMINATED 状态
- TERMINATED TODO
3.2 任务添加执行
3.2.1 execute(Runnable command)
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
1、当前有效线程数小于核心线程数时,应该添加一个新线程addWorker(command, true)
,成功则退出,否则继续进程
2、再次获取ctl值(状态、线程数可能发生变化)
3、如果线程池处于运行状态,并且当前任务成功加入到workQueue中,再次进行double-check,理由同上,
- 如果发现线程池不再处于运行状态,而且remove成功,则需要拒绝该任务,退出
- 否则当当前有效线程数为0时,创建一个空的线程,只是为了保证线程池在RUNNING状态存在一个可用的线程去执行任务
4、运行最后一个else则有如下情况 - 线程不处于RUNNING状态
- 线程是RUNNING状态,但是工作线程数目已经超过了核心线程数,而且阻塞队列已经满了
这个时候通过调用addWorker(command, false)
,false表示线程上线设置为最大线程数去添加该任务
5、如果没有正常添加,则拒绝该任务
3.2.2 addWorker(Runnable firstTask, boolean core)
创建一个新线程去执行任务
firstTask表示需要执行的任务,如果为null,则是单独的创建一个新线程
core 则是判断线程数是否需要超过核心线程数目,true表示新增线程数前判断是否小于核心线程数,false表示新增线程数前判断是否小于最大线程数
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
/*
如果状态 > SHUTDOWN 直接返回false,此时不执行新任务,也不执行阻塞队列的任务
但是 在 = SHUTDOWN的状态,不会处理新任务,还是可以继续执行在阻塞队列的任务
如果状态 = SHUTDOWN 而且 firstTask 为 null 直接返回false
如果状态 = SHUTDOWN 而且 firstTask 不为 null 而且阻塞队列为空 直接返回false
综上所述就是如果状态>SHUTDOWN,则不会处理当前任务,返回false,当=SHUTDOWN时,如果任务为null也不会处理,返回false,或者是阻塞队列已经被执行完成了,也是不会处理当前任务的,同样返回false
*/
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
/*
很明显采用的是经典的**死循环加CAS模式**去判断当前是否可以新加线程
如果当前工作线程数目超过了最大值,就返回false,不再处理
采取CAS去添加线程数目,如果成功就跳出第一个for循环,这里需要注意下,这里**线程池状态可能会发生变化**,所以有如下代码
```java
private boolean compareAndIncrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect + 1);
}
```
在进行CAS操作的时候,是针对整个的ctl值进行操作,如果成功,**肯定能够确保线程状态没有发生变化**!
CAS失败,再次获取ctl值,如果当前线程池状态不等于rs,则说明线程池状态发生变化,需要从for循环重新开始,否则一直在这个for循环内运行
for循环break之后就是表示当前任务可以被创建成一个线程去执行操作了
*/
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
/*
```java
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
```
新建一个Worker对象,其中Thread线程是通过线程工厂bean调用生成的
*/
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
// 利用可重入的排他锁,确保线程安全
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
// 如果线程池在RUNNING 运行状态或者
// 在 SHUTDOWN 而且 firstTask为null,则需要创建一个新线程
// !!!注意,原本SHUTDOWN状态是不应该创建新线程的,但是如果提交任务为null,则需要创建一个线程,只是线程不会执行任何任务
if (t.isAlive()) // precheck that t is startable
// 如果该线程已经存活,则抛出异常,肯定不正常,线程还是刚创建,压根没启动
throw new IllegalThreadStateException();
workers.add(w);
// workers是一个HashSet<Worker>()
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
// largestPoolSize 表示线程池中存在的最大线程数的情况
workerAdded = true;
// 往工作线程中加入了新的worker成功
}
} finally {
mainLock.unlock();
// 锁被释放,其他的线程可以进入了
}
if (workerAdded) {
t.start();
// 这里才是真正的线程启动的地方,由于Worker本身是继承Runnable
// 所以这个start最终运行的也是Worker的run方法,也就是`runWorker`方法
workerStarted = true;
}
}
} finally {
if (! workerStarted)
// 没有正常添加线程到worker中或者线程启动失败
addWorkerFailed(w);
/*
private void addWorkerFailed(Worker w) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
// 依旧可能存在线程安全问题
// 不得不说,这代码对线程安全的粒度做的确实很细
try {
if (w != null)
workers.remove(w);
decrementWorkerCount();
tryTerminate();
// 尝试更新线程池状态到TERMINATED 状态
} finally {
mainLock.unlock();
}
}
*/
}
return workerStarted;
}
3.2.3 runWorker(Worker w)
新创建了一个worker,然后执行,如果worker中不包含对于的任务,可以从阻塞队列中获取
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
// 当前需要执行的线程
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
// 解绑workder和当前任务的关系,需要执行的是task任务
// 线程已经创建完成wt,wt需要执行task
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
// 这是一个死循环,直到无法获得到有效的task
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() && runStateAtLeast(ctl.get(), STOP)))
&& !wt.isInterrupted())
wt.interrupt();
// STOP 状态 是调用了 shutdownNow() 之后的线程池状态
try {
beforeExecute(wt, task);
// 这个可以自定义加一些线程监控mirror
Throwable thrown = null;
try {
task.run();
// 真正的运行线程
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
// 当前这个workder有效完成的任务数
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
3.2.4 getTask()
从阻塞队列中获取有效的任务
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
/*
1、如果当前线程并未STOP,但是却进入了SHUTDOWN状态,而且阻塞队列也没有任何,则减少workers数目,并且返回null
2、如果当前线程池>=STOP,则直接返回返回null
*/
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
/*
这个allowCoreThreadTimeOut 就是配套 keepAliveTime 使用
/**
* If false (default), core threads stay alive even when idle.
* If true, core threads use keepAliveTime to time out waiting
* for work.
*/
private volatile boolean allowCoreThreadTimeOut;
*/
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
/*
wc > maximumPoolSize 工作线程数大于最大线程数,可能是在此时之前设置了最大线程数
(timed && timedOut) 需要设置超时
如果这种情况下,线程数>1或者阻塞队列为空,则需要减少一个worker
成功了,则返回null,无有效task
线程池中无论是否存在有效任务,必须是存在一个线程在
*/
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
// 从阻塞队列中获取任务,可能是需要设置超时,如果一旦时间超时,则需要在规定的时间内获取到任务
if (r != null)
return r;
// 这一步也感觉很妙,如果没有有效的获取到任务,(可能是这个时候阻塞队列为空,也有可能是因为没有设置超时而没有获取到),自动的设置这个timeOut为true,加上超时设置
timedOut = true;
} catch (InterruptedException retry) {
// 如果发生了中断,则回复这个timeOut设置,进行重试操作
timedOut = false;
}
}
}
至此核心的函数都已经介绍完毕,提交任务,可以立刻处理任务就立马创建线程处理,如果当前无法马上处理则加入到阻塞队列中,如果加入到阻塞队列都失败,则调用相关的拒绝策略处理任务
Worker作为处理线程任务则一直在循环接受到新的任务或者从阻塞队列获取任务
3.3 其他方法
3.3.1 processWorkerExit(Worker w, boolean completedAbruptly)
此方法是在runWorker中被调用的,当runWorker无法获取到有效执行的task或者执行任务期间抛出各种异常导致的此worker无法在继续执行下去,该worker需要被消耗
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
// 如果这个值为true,则不是没有task导致的,而是在执行任务期间出现问题,故减1
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
workers.remove(w);
// 移除该worker,并汇总共完成的有效任务数,当然这需要确保线程安全
} finally {
mainLock.unlock();
}
tryTerminate();
// 尝试使得线程池状态变更为TERMINATED
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
// 线程池状态位RUNNING 或者 SHUTDOWN
if (!completedAbruptly) {
// 如果当前worker是由于没有有效task而退出的
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
// 如果设置了超时,而且有工作线程,则最少保存min=1个worker
min = 1;
// 这里min=1或者corePoolSize
if (workerCountOf(c) >= min)
return; // replacement not needed
// 如果当前没有工作线程,需要加1操作
}
// 直接创建一个新的空的worker
addWorker(null, false);
}
}
3.3.2 tryTerminate()
尝试让线程池状态变更为 TERMINATED,结束线程池
final void tryTerminate() {
for (;;) {
int c = ctl.get();
/*
1、线程池在RUNNING 阶段
2、线程池在TIDYING 或者 TERMINATED 阶段
3、线程池在SHUTDOWN 而且工作线程不为空
以上几种情况都直接返回
*/
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
// 工作线程数不为0,则中断一个空闲的工作线程,返回
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE);
return;
}
// 到这里来的可能就是SHUTDOWN 而且工作线程为空的情况
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
// 状态和工作线程数变更成功
try {
terminated(); // 这是一个空方法,可以自定义实现
} finally {
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
3.3.3 interruptIdleWorkers(boolean onlyOne)
中断空闲的线程,如果onlyOne为true,则中断其中的一个
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
if (!t.isInterrupted() && w.tryLock()) {
// 如果t线程不处于中断状态,尝试获取线程成功
try {
// 调用中断操作
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
3.4 线程池初始化
如上代码所示,起初创建线程池的时候,是不包含任何线程的,但是可以人为的创建线程
// 创建一个线程
public boolean prestartCoreThread() {
return workerCountOf(ctl.get()) < corePoolSize &&
addWorker(null, true);
// 添加一个没有任务的空现场
}
// 创建核心线程数目个线程,直到addWorker返回false
public int prestartAllCoreThreads() {
int n = 0;
while (addWorker(null, true))
++n;
return n;
}
3.5 任务队列
workQueue的类型为BlockingQueue<Runnable>,通常可以取下面三种类型:
- ArrayBlockingQueue:基于数组的先进先出队列,此队列创建时必须指定大小;
- LinkedBlockingQueue:基于链表的先进先出队列,如果创建时没有指定此队列大小,则默认为Integer.MAX_VALUE;
- synchronousQueue:这个队列比较特殊,它不会保存提交的任务,而是将直接新建一个线程来执行新来的任务。
3.6 线程池监控
- getTaskCount : 线程池已经执行和未执行的任务总数
- getCompletedTaskCount : 线程池已经完成的任务数目
- getLargestPoolSize : 线程池曾创建过线程数最多的线程数
- getPoolSize : 线程池当前线程数目
- getActiveCount : 当前线程池正在执行的任务数目
3.7 配置线程池大小
- 如果是CPU密集型任务,就需要尽量压榨CPU,参考值可以设为 NCPU+1
- 如果是IO密集型任务,参考值可以设置为2*NCPU
当然,这只是一个参考值,具体的设置还需要根据实际情况进行调整,比如可以先将线程池大小设置为参考值,再观察任务运行情况和系统负载、资源利用率来进行适当调整。
网友评论