传统的多线程是通过继承Thread类及实现Runnable接口来实现的,每次创建及销毁线程都会消耗资源、响应速度慢,且线程缺乏统一管理,容易出现阻塞的情况,针对以上缺点,线程池就出现了。
一.简介
a.定义
线程池是一个创建使用线程并能保存使用过的线程以达到复用的对象,简单的说就是一块缓存了一定数量线程的区域。
b.作用
1.复用线程:线程执行完不会立刻退出,继续执行其他线程;
2.管理线程:统一分配、管理、控制最大并发数;
c.优点
1.降低因频繁创建&销毁线程带来的性能开销,复用缓存在线程池中的线程;
2.提高线程执行效率&响应速度,复用线程:响应速度;管理线程:优化线程执行顺序,避免大量线程抢占资源导致阻塞现象;
3.提高对线程的管理度;
二.使用流程
线程池的使用也比较简单,流程如下:
a.创建线程池,通过配置线程池的参数,从而实现自己所需的线程池
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(CORE_POOL_SIZE,
MAXIMUM_POOL_SIZE,
KEEP_ALIVE_SECONDS,
TimeUnit.SECONDS,
sPoolWorkQueue,
sThreadFactory);
b.向线程池提交任务:execute()
threadPoolExecutor.execute(new Runnable() {
@Override
public void run() {
...
//执行任务
}
});
c.关闭线程池shutdown()
threadPoolExecutor.shutdown();
三.工作原理
接下来通过源码来介绍一下ThreadPoolExecutor内部实现及工作原理。
a.参数介绍
参数 | 含义 | 备注 |
---|---|---|
corePoolSize | 核心线程数 | 默认情况下,核心线程会一直存活 |
maximumPoolSize | 线程池容纳的最大线程数 | 当活动线程数大于该值时,后续的新任务会阻塞 |
keepAliveTime | 非核心线程闲置超时时间 | 超过该时间后,非核心线程会被回收,当设置allowCoreThreadTimeOut(true),核心线程也会被回收 |
unit | 时间单位 | 常用TimeUnit.SECONDS |
workQueue | 任务队列 | 执行execute()后,如果核心线程满了,会将Runnable加入到该参数内 |
threadFactory | 线程工厂 | 为线程池创建新线程 |
b.源码分析
线程池的最终实现类是ThreadPoolExecutor,通过实现可以一步一步的看到,父接口为Executor:
public interface Executor {
void execute(Runnable command);
}
其他的继承及实现关系就不一一列举了,直接通过以下图来看一下:
image.png
c.ThreadPoolExecutor内部实现
从构造方法开始看:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
通过以上可以看到,在创建ThreadPoolExecutor时,对传入的参数是有要求的:corePoolSize不能小于0;maximumPoolSize需要大于0,且需要大于等于corePoolSize;keepAliveTime大于0;workQueue、threadFactory都不能为null。
在创建完后就需要执行Runnable了,看以下execute()方法:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
.....
int c = ctl.get();
//--------------分析1-----------------
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
//--------------分析2-----------------
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//--------------分析3-----------------
else if (!addWorker(command, false))
reject(command);
}
在execute()内部主要执行的逻辑如下:
分析点1:如果当前线程数未超过核心线程数,则将runnable作为参数执行addWorker(),true表示核心线程,false表示非核心线程;
分析点2:核心线程满了,如果线程池处于运行状态则往workQueue队列中添加任务,接下来判断是否需要拒绝或者执行addWorker();
分析点3:以上都不满足时[corePoolSize=0且没有运行的线程,或workQueue已经满了],执行addWorker()添加runnable,失败则执行拒绝策略;
总结一下:线程池对线程创建的管理,流程图如下:
以上可以看到,核心线程数量或非核心线程队列不满时,就执行addWorker(),否则执行reject(),接下来看一下addWorker()执行逻辑:
private boolean addWorker(Runnable firstTask, boolean core) {
......
//对添加时一些状态进行判断,提前判断是否成功
......
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
//--------分析1----------
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
//----分析2-----
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
在执行addWorker时,主要做了以下两件事:
分析点1:将runnable作为参数创建Worker对象w,然后获取w内部的变量thread;
分析点2:调用start()来启动thread;
在addWorker()内部会将runnable作为参数传给Worker,然后从Worker内部读取变量thread,看一下Worker类的实现:
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
......
final Thread thread;
Runnable firstTask;
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
public void run() {
runWorker(this);
}
.......
.......
}
Worker实现了Runnable接口,在Worker内部,进行了赋值及创建操作,先将execute()时传入的runnable赋值给内部变量firstTask,然后通过ThreadFactory.newThread(this)创建Thread,上面讲到在addWorker内部执行t.start()后,会执行到Worker内部的run()方法,接着会执行runWorker(this),一起看一下:
final void runWorker(Worker w) {
//-------------分析1--------------
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
//-------------分析2-------------------------
while (task != null || (task = getTask()) != null) {
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
//----分析3-----
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
前面可以看到,runWorker是执行在子线程内部,主要执行了三件事:
分析1:获取当前线程,当执行shutdown()时需要将线程interrupt(),接下来从Worker内部取到firstTask,即execute传入的runnable,接下来会执行;
分析2:while循环,task不空直接执行;否则执行getTask()去获取,不为空直接执行;
分析3:对有效的task执行run(),由于是在子线程中执行,因此直接run()即可,不需要start();
前面看到,在while内部有执行getTask(),一起看一下:
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
//-------------------------分析1---------------------------
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
//-------------------------分析2---------------------------
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
getTask()是从workQueue内部获取接下来需要执行的runnable,内部主要做了两件事:
分析1:先获取到当前正在执行工作的线程数量wc,通过判断allowCoreThreadTimeOut[在创建ThreadPoolExecutor时可以进行设置]及wc > corePoolSize来确定timed值;
分析2:通过timed值来决定执行poll()或者take(),如果WorkQueue中有未执行的线程时,两者作用是相同的,立刻返回线程;如果WorkQueue中没有线程时,poll()有超时返回,take()会一直阻塞;如果allowCoreThreadTimeOut为true,则核心线程在超时时间没有使用的话,是需要退出的;wc > corePoolSize时,非核心线程在超时时间没有使用的话,是需要退出的;
allowCoreThreadTimeOut是可以通过以下方式进行设置的:
threadPoolExecutor.allowCoreThreadTimeOut(true);
public void allowCoreThreadTimeOut(boolean value) {
if (value && keepAliveTime <= 0)
throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
if (value != allowCoreThreadTimeOut) {
allowCoreThreadTimeOut = value;
if (value)
interruptIdleWorkers();
}
}
如果没有进行设置,那么corePoolSize数量的核心线程会一直存在。
总结一下:ThreadPoolExecutor内部的核心线程如何确保一直存在,不退出?
上面分析已经回答了这个问题,每个线程在执行时会执行runWorker(),而在runWorker()内部有while()循环会判断getTask(),在getTask()内部会对当前执行的线程数量及allowCoreThreadTimeOut进行实时判断,如果工作数量大于corePoolSize且workQueue中没有未执行的线程时,会执行poll()超时退出;如果工作数量不大于corePoolSize且workQueue中没有未执行的线程时,会执行take()进行阻塞,确保有corePoolSize数量的线程阻塞在runWorker()内部的while()循环不退出。
如果需要关闭线程池,需要如何操作呢,看一下shutdown()方法:
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(SHUTDOWN);
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
以上可以看到,关闭线程池的原理:a. 遍历线程池中的所有工作线程;b. 逐个调用线程的interrupt()中断线程(注:无法响应中断的任务可能永远无法终止)
也可调用shutdownNow()来关闭线程池,二者区别:
shutdown():设置线程池的状态为SHUTDOWN,然后中断所有没有正在执行任务的线程;
shutdownNow():设置线程池的状态为STOP,然后尝试停止所有的正在执行或暂停任务的线程,并返回等待执行任务的列表;
使用建议:一般调用shutdown()关闭线程池;若任务不一定要执行完,则调用shutdownNow();
总结一下:ThreadPoolExecutor在执行execute()及shutdown()时的调用关系,流程图如下:
1.客户端在创建完线程池后,调用execute()来执行一个runnable任务;
2.在execute()内部会执行addWorker()来创建一个Worker对象,然后调用线程的start()方法,即执行Worker内部的run()方法;
3.Worker内部的run()方法中会调用到runWorker()方法;
4.runWorker()方法内部会在while()循环内执行getTask()来不断的从workQueue中获取未执行的runnable然后执行;
5.getTask()内部会实时判断当前正在执行的Worker数量与corePoolSize进行比较,如果数量不大于corePoolSize且workQueue为空,会执行task()进行阻塞,确保corePoolSize的线程不退出,即核心线程不退出;
6.执行shutdown()来中断那些没有在执行的线程;
四.线程池类型
线程池可以通过Executors来进行不同类型的创建,具体分为四种不同的类型,如下:
a.newCachedThreadPool
可缓存线程池:不固定线程数量,且支持最大为Integer.MAX_VALUE的线程数量:
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
1、线程数无限制
2、有空闲线程则复用空闲线程,若无空闲线程则新建线程
3、一定程度上减少频繁创建/销毁线程,减少系统开销
b.newFixedThreadPool
固定线程数量的线程池:定长线程池
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
1、可控制线程最大并发数(同时执行的线程数)
2、超出的线程会在队列中等待。
c.newSingleThreadExecutor
单线程化的线程池:可以理解为线程数量为1的FixedThreadPool
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
1、有且仅有一个工作线程执行任务
2、所有任务按照指定顺序执行,即遵循队列的入队出队规则
d.newScheduledThreadPool
定时以指定周期循环执行任务
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
五.队列类型
一般来说,等待队列 BlockingQueue 有: ArrayBlockingQueue 、 LinkedBlockingQueue 与 SynchronousQueue 。
假设向线程池提交任务时,核心线程都被占用的情况下:
ArrayBlockingQueue :基于数组的阻塞队列,初始化需要指定固定大小。
当使用此队列时,向线程池提交任务,会首先加入到等待队列中,当等待队列满了之后,再次提交任务,尝试加入队列就会失败,这时就会检查如果当前线程池中的线程数未达到最大线程,则会新建线程执行新提交的任务。所以最终可能出现后提交的任务先执行,而先提交的任务一直在等待。
LinkedBlockingQueue:基于链表实现的阻塞队列,初始化可以指定大小,也可以不指定。
当指定大小后,行为就和 ArrayBlockingQueue一致。而如果未指定大小,则会使用默认的 Integer.MAX_VALUE 作为队列大小。这时候就会出现线程池的最大线程数参数无用,因为无论如何,向线程池提交任务加入等待队列都会成功。最终意味着所有任务都是在核心线程执行。如果核心线程一直被占,那就一直等待。
SynchronousQueue :无容量的队列。
使用此队列意味着希望获得最大并发量。因为无论如何,向线程池提交任务,往队列提交任务都会失败。而失败后如果没有空闲的非核心线程,就会检查如果当前线程池中的线程数未达到最大线程,则会新建线程执行新提交的任务。完全没有任何等待,唯一制约它的就是最大线程数的个数。因此一般配合Integer.MAX_VALUE就实现了真正的无等待。
但是需要注意的是,进程的内存是存在限制的,而每一个线程都需要分配一定的内存。所以线程并不能无限个。
网友评论