ThreadPoolExecutor 我们在开发过程中经常用到,它的主要作用就是提前创建好若干个线程放在一个容器中。如果有任务需要处理,则将任务直接分配给线程池中的线程来执行就行,任务处理完以后这个线程不会被销毁,而是等待后续分配任务。
那它是如何实现线程复用的?今天我们就回答这个问题。
一、ThreadPoolExecutor的基本用法
1.1、TheadPooolExecutor的构造函数
public ThreadPoolExecutor(int corePoolSize, //核心线程数量
int maximumPoolSize, //最大线程数
long keepAliveTime, //超时时间,超出核心线程数量以外的线程空余存活时间
TimeUnit unit, //存活时间单位
BlockingQueue<Runnable> workQueue, //保存执行任务的队列
ThreadFactory threadFactory,//创建新线程使用的工厂
RejectedExecutionHandler handler //当任务无法执行的时候的处理方式)
- corePoolSize 代表缓存的核心线程个数
- maximumPoolSize 代表线程池最大的线程个数
- keepAliveTime 代表超时销毁时间,针对非核心线程,空闲若干时间就会被销毁。
- unit keepAliveTime的时间单位
- workQueue 尚未执行的任务(Runnable)队列。
- threadFactory 创建Thread类的工厂
- handler 任务无法执行时的处理操作
创建线程池
var threadPool = ThreadPoolExecutor(1,2,60,TimeUnit.SECONDS,LinkedBlockingDeque<Runnable>())
执行线程任务
threadPool.execute(Runnable {
})
ThreadPoolExecutor的主要逻辑是:
- 线程数少于核心线程数,新建线程执行任务
- 线程数等于核心线程数后,将任务加入阻塞队列
- 如果队列容量非常大,可以一直添加;如果队列容量有限,队列满了之后,则尝试创建一个非核心线程执行任务。
- 执行完成任务的线程 反复去任务队列任务来执行。
- 任务队列为空时,核心线程会阻塞(block),直到有新的任务。
1.2、常用线程池
newFixedThreadPool
newFixedThreadPool 的核心线程数和最大线程数都是指定值,当线程池中的线程数超过核心线程数后,任务都会被放到阻塞队列中。这里选用的阻塞队列是LinkedBlockingQueue,使用的是默认容量 Integer.MAX_VALUE,相当于没有上限。
public class Executors {
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
}
用途:FixedThreadPool 用于负载比较大的服务器,为了资源的合理利用,需要限制当前线程数量。
缺点:任务队列没有上限,一直追加可能造成OOM
newCachedThreadPool
newCachedThreadPool 创建一个可缓存线程池。核心线程数为0,最大线程数为Integer.MAX_VALUE。如果线程池长度超过处理需要,可灵活回收空闲线程。
接收到新任务将被立即执行:若有空闲线程,则放到空闲线程执行;若无空闲线程,则创建新的非核心线程来执行。
创建的线程空闲60s 则会被回收。
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
缺点:线程可以无限创建,当接收大量任务时 可能创建大量的线程,给JVM过大的负担。
newSingleThreadExecutor
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
newSingleThreadExecutor 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行
二、线程池实现原理
ThreadPoolExecutor的核心属性和方法
public class ThreadPoolExecutor extends AbstractExecutorService {
//线程池任务队列
private final BlockingQueue<Runnable> workQueue;
//线程池工作线程
private final HashSet<Worker> workers = new HashSet<>();
//线程池状态
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
public void execute(Runnable command) {
}
public void shutdown() {}
public List<Runnable> shutdownNow() {}
}
2.1、workQueue;
workQueue 是线程池的任务队列,里面缓存待执行的Runnable任务
2.2、workers
workers 是当前正在执行的工作线程集合。
2.3、ctl 线程池状态
ctl是一个原子类,主要作用是用来保存线程数量和线程池的状态
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
一个 int 数值是 32 个 bit 位,这里采用高 3 位来保存运行状态,低 29 位来保存线程数量。
private static int ctlOf(int rs, int wc) { return rs | wc; }
private static final int COUNT_BITS = Integer.SIZE - 3; //32-3
private static final int CAPACITY = (1 << COUNT_BITS) - 1; //将 1 的二进制向右位移 29 位,再减 1 表示最大线程容量
//运行状态保存在 int 值的高 3 位 (所有数值左移 29 位)
private static final int RUNNING = -1 << COUNT_BITS;// 接收新任务,并执行队列中的任务
private static final int SHUTDOWN = 0 << COUNT_BITS;// 不接收新任务,但是执行队列中的任务
private static final int STOP = 1 << COUNT_BITS;// 不接收新任务,不执行队列中的任务,中断正在执行中的任务
private static final int TIDYING = 2 << COUNT_BITS; //所有的任务都已结束,线程数量为 0,处于该状态的线程池即将调用 terminated()方法
private static final int TERMINATED = 3 << COUNT_BITS;// terminated()方法执行完成
- RUNNING 线程池处于运行状态:可以接收新的任务,可以执行任务队列中的任务。
- SHUTDOWN 线程池处于关闭状态:不接收新的任务,但是任务队列中的任务会继续执行。
- STOP 线程池处处于停止状态:不接收新的任务,不执行任务队列中的任务,中断正在运行的任务。
- TIDYING 所有的任务都已经结束,线程数为0。处于此状态的线程池,即将调用terminated()方法。
- TERMINATED terminate方法执行完成。
2.4、shundown() 和showdonwNow()
- 执行 shutdown() 线程池会进入SHUTDOWN状态:不接收新的任务,但是任务队列中的任务会继续执行。
- 执行 shutdownNow() 线程池会进入STOP状态:不接收新的任务,不执行任务队列中的任务,中断正在运行的任务。
2.5、execute() 执行Runnable任务
public void execute(Runnable command) {
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) { //1、工作线程数 小于核心线程个数,则尝试直接创建一个新的核心线程来执行任务。
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) { //2、核心线程数已满,任务队列未满,则将任务添加到任务队列
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command)) //线程状态二次检查,如果当前线程池关闭了,则移除该任务
reject(command);
else if (workerCountOf(recheck) == 0) //线程状态二次检查,如果当前工作线程数为0,则新建非核心线程来执行剩余任务
addWorker(null, false);
}
else if (!addWorker(command, false)) //3、任务队列已慢,则尝试创建一个新的非核心线程 处理该任务。
reject(command);
}
2.5.1、addWorker
如果工作线程数小于核心线程数的话,会调用 addWorker,创建一个工作线程。
大致做了两件事情:
1、状态检查,不允许创建线程的情况,直接返回false; 条件符合,则工作线程计数+1
- (1)如果线程处于非运行状态,不允许新创建线程,直接返回false
- (2)如果工作线程数大于默认容量大小或者大于核心线程数大小,则直接返回 false 表示不能再添加 worker,返回false
- (3)通过 cas工作线程数加1
2、创建Worker实例,添加到workers集合,并开启线程
- (4)将新创建的 Worker 添加到 workers 集合中
- (5)如果 worker 添加成功,启动线程
private boolean addWorker(Runnable firstTask, boolean core) {
retry: //goto 语句,避免死循环
//状态检查,不允许创建线程的情况,直接返回false; workerCount计数+1。
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
//(1)如果线程处于非运行状态,不允许新创建线程,直接返回false
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) { //自旋
int wc = workerCountOf(c);//获得 Worker 工作线程数
//(2)如果工作线程数大于默认容量大小或者大于核心线程数大小,则直接返回 false 表示不能再添加 worker,返回false
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))//(3)通过 cas工作线程数加1
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs) //这里如果不相等,说明线程的状态发生了变化,继续重试
continue retry;
}
}
//创建Worker实例,添加到workers集合,并开启线程
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask); //构建一个 Worker,传入了一个 Runnable 对象
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int rs = runStateOf(ctl.get());
//将新创建的Workder 添加到workers工作线程集合当中。
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
workers.add(w); //(4)将新创建的 Worker 添加到 workers 集合中
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();//(5)如果 worker 添加成功,启动线程
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w); //如果添加失败,就需要做一件事,就是递减实际工作线程数(还记得我们最开始的时候增加了工作线程数吗)
}
return workerStarted;//返回结果
}
Worker 是什么?
Worker代表一个工作线程,内部持有一个thread和firstTask。
- firstTask 是线程初始化时要被首先执行的任务
- thread 是在调用构造方法时通过 ThreadFactory 来创建的线程,是用来处理任务的线程。
- 线程具体的执行操作代理到了外部的runWorker()方法中。
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
final Thread thread;
Runnable firstTask;
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker. */
public void run() {
runWorker(this);
}
}
runWorker 方法
runWorker 是工作线程 执行任务的主要场所。
- 如果 task 不为空,则开始执行 task
- 如果 task 为空,则通过 getTask()再去取任务,并赋值给 task,如果取到的 Runnable 不为空,则执行该任务
- 执行完毕后,通过 while 循环继续 getTask()取任务
- 如果 getTask()取到的任务依然是空,那么整个 runWorker()方法执行完毕
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock();
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
...
try {
beforeExecute(wt, task);//任务执行前回调
Throwable thrown = null;
try {
task.run(); //执行具体的任务
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);//任务执行后回调
}
} finally {
task = null; //task设置成null,保证下次进入while循环执行getTask()
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);//线程执行完成,将当前worker 从wokers集合移除
}
}
整体看就是一个无限循环:
从队列提取任务->执行任务->队列提取任务->执行任务
当getTask() 返回空任务时,循环结束,该工作线程也就结束了。
getTask
线程池之所以可以复用线程,关键点就在getTask。
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {//自旋
int c = ctl.get();
int rs = runStateOf(c);
//(1)对线程池状态的判断:
//线程池处于SHUTDOWN状态 且任务队列为空, 或者 线程池处于STOP状态,应该销毁当前线程,此时返回null。
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;//返回 null,则当前 worker 线程会退出
}
int wc = workerCountOf(c);
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// timed 变量用于判断是否需要进行超时控制。
// allowCoreThreadTimeOut 默认是 false,也就是核心线程不允许进行超时;
// wc > corePoolSize,表示当前线程池中的线程数量大于核心线程数量;
// 对于超过核心线程数量的这些线程,需要进行超时控制
//(2) 工作队列中有未执行的任务 且工作线程数量超过了maximumPoolSize
//或者上一次取任务已经超时,则 返回null,当前线程退出。
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
/*根据 timed 来判断,如果为 true,则通过阻塞队列 poll 方法进行超时控制,如果在
keepaliveTime 时间内没有获取到任务,则返回 null.
否则通过 take 方法阻塞式获取队列中的任务*/
//(3) timed = true,表示需要进行超时控制,采用workQueue.poll() 从队列中取任务,会阻塞keepAliveTime时间,超时后 timedOut = true,下次循环会直接返回null。 - 线程被回收。
timed = false,表示不需要进行超时控制,采用workQueue.take(),该线程会一直阻塞,直到有新的任务。 - 线程不被回收
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)//如果拿到的任务不为空,则直接返回给 worker 进行处理
return r;
timedOut = true;//如果 r==null,说明已经超时了,设置 timedOut=true,在下次自旋的时候进行回收
} catch (InterruptedException retry) {
timedOut = false;// 如果获取任务时当前线程发生了中断,则设置 timedOut 为false 并返回循环重试
}
}
}
关键代码如下;
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
默认 allowCoreThreadTimeOut为false
- 对于核心线程timed =false,workQueue.take() 取下一个任务,workQueue是LinkedBlockQueue,没有新任务时会一直阻塞在这里(线程不会被释放),当用新任务添加进来,take()返回新任务,线程继续执行。
- 对于非核心线程(wc > corePoolSize),采用workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)方式提取新任务,当workQueue为空时,会等待keepAliveTime,时间超时后,返回null。当前线程就会被释放。
至此 ThreadPoolExecutor 可以服用线程的原理也就清除了。
processWorkerExit
runWorker 的 while 循环执行完毕以后,在 finally 中会调用 processWorkerExit,将当前线程从worker中移除,并执行tryTerminate()
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
workers.remove(w);
} finally {
mainLock.unlock();
}
tryTerminate();
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false);
}
}
2.6、拒绝策略
当工作线程数超过corePoolSize,且工作队列已满,且线程总数也达到了maximumPoolSize(非核心线程也无法创建),此时就会拒绝新任务:
final void reject(Runnable command) {
handler.rejectedExecution(command, this);
}
1、AbortPolicy:直接抛出异常,默认策略;
2、CallerRunsPolicy:用调用者所在的线程来执行任务;
3、DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;
4、DiscardPolicy:直接丢弃任务;
三、其他
3.1、如何取消任务
ThreadPoolExecutor.execute() 提交一个任务是无法取消该任务的。
那提交了一个任务之后,想取消执行该任务 有没有途径呢?
答案是submit()方法
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
submit方法返回一个Future对象
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
Future.cancel(true) 可以取消Runnable任务。
- 任务尚未执行,调用cancel()后,该任务就永远不会被执行
- 任务正在执行,mayInterruptIfRunning=true时,会interrupted Thread.
- Attempts to cancel execution of this task. This attempt will
- fail if the task has already completed, has already been cancelled,
- or could not be cancelled for some other reason. If successful,
- and this task has not started when {@code cancel} is called,
- this task should never run. If the task has already started,
- then the {@code mayInterruptIfRunning} parameter determines
- whether the thread executing this task should be interrupted in
- an attempt to stop the task.
测试代码
fun testThreadPool(){
var threadPool = ThreadPoolExecutor(1,5,60,
TimeUnit.SECONDS,
LinkedBlockingDeque<Runnable>()
);
var fu = threadPool.submit(MyRunaable())
Thread.sleep(10000)
fu.cancel(true)
Log.d("feifei","fu cancel")
}
class MyRunaable():Runnable{
var index = 0;
override fun run() {
while(true){
Thread.sleep(1000)
Log.d("feifei","MyRunaable run():${index++}")
}
}
}
输出结果:
2021-01-12 15:12:29.925 6847-6875/com.sogou.iot.myapplication D/feifei: MyRunaable run():0
2021-01-12 15:12:30.925 6847-6875/com.sogou.iot.myapplication D/feifei: MyRunaable run():1
2021-01-12 15:12:31.926 6847-6875/com.sogou.iot.myapplication D/feifei: MyRunaable run():2
2021-01-12 15:12:32.926 6847-6875/com.sogou.iot.myapplication D/feifei: MyRunaable run():3
2021-01-12 15:12:33.927 6847-6875/com.sogou.iot.myapplication D/feifei: MyRunaable run():4
2021-01-12 15:12:34.928 6847-6875/com.sogou.iot.myapplication D/feifei: MyRunaable run():5
2021-01-12 15:12:35.928 6847-6875/com.sogou.iot.myapplication D/feifei: MyRunaable run():6
2021-01-12 15:12:36.929 6847-6875/com.sogou.iot.myapplication D/feifei: MyRunaable run():7
2021-01-12 15:12:37.929 6847-6875/com.sogou.iot.myapplication D/feifei: MyRunaable run():8
2021-01-12 15:12:38.925 6847-6847/com.sogou.iot.myapplication D/feifei: fu cancel
3.2、线程池的监控
如果在项目中大规模的使用了线程池,那么必须要有一套监控体系,来指导当前线程池的状态,当出现问题的时候可以快速定位到问题。而线程池提供了相应的扩展方法,我们通过重写线程池的 beforeExecute、afterExecute 和 shutdown 等方式就可以实现对线程的监控,简单给大家演示一个案例
3.3、使用线程池注意事项
3.3.1、 使用ThreadPoolExecutor而非Executors创建线程池
用 Executors创建线程池 使得用户不需要关心线程池的参数配置,意味着大家对于线程池的运行规则也会慢慢的忽略。这会导致一个问题。
比如我们用 newFixdThreadPool 或者 singleThreadPool.允许的队列长度为Integer.MAX_VALUE,如果使用不当会导致大量请求堆积到队列中导致 OOM 的风险而 newCachedThreadPool,允许创建线程数量为 Integer.MAX_VALUE,也可能会导致大量线程的创建出现 CPU 使用过高或者 OOM 的问题。
3.3.2、合理设置线程池大小。
需要分析线程池执行的任务的特性: CPU 密集型还是 IO 密集型
- 如果是 CPU 密集型:主要是执行计算任务,响应时间很快,cpu 一直在运行,这种任务 cpu的利用率很高。CPU 核心数=最大同时执行线程数,加入 CPU 核心数为 4,那么服务器最多能同时执行 4 个线程。过多的线程会导致上下文切换反而使得效率降低。
- 如果是 IO 密集型:主要是进行 IO 操作,执行 IO 操作的时间较长,这是 cpu 出于空闲状态,导致 cpu 的利用率不高,这种情况下可以增加线程池的大小。一般可以配置 cpu 核心数的 2 倍。
网友评论