为什么需要线程池
在生产环境中,为每个任务都分配一个线程,这种方法存在一些缺陷,尤其是当需要创建大量的线程时:
线程生命周期开销非常高:线程的创建与销毁都会需要JVM和操作系统参与提供一些辅助操作。在高并发的场景下,是非常消耗CPU计算资源的。
资源消耗:活跃的线程是会占用系统内存资源的,大量的线程对系统内存的消耗非常严重。
稳定性:线程数量在一定范围内是会提高系统资源利用率的,如果超出了这个限制,很可能会发生一些意向不到的问题,例如OOM。
线程池的出现很好的解决了这些问题,它将提交任务和执行任务分开(生产者-消费者模式),通过配置参数限制了线程池中最大可允许创建线程的数量。
线程池的基本概念
Integer corePoolSize: 线程池的最大空闲线程数,但变量allowCoreThreadTimeOut同样可以设置在一定时间内回收这些线程。
Integer maximumPoolSize:线程池最大的活跃线程数。
long keepAliveTime:当线程池内的线程数大于corePoolSize指定的线程数时,可以通过keepAliveTime来指定
TimeUnit unit:keepAliveTime的时间单位(时,分,秒,毫秒)
BlockingQueue<Runnable> workQueue:该队列用于暂存因为空闲线程导致没有执行的任务。
ThreadFactory threadFactory:线程池中的线程是通过ThreadFactory创建的。
RejectExecutionHandler handler:当BlockingQueue(阻塞队列)的暂存的任务数达到BlockingQueue容量的上限,并且此时暂无可用线程时,会执行该拒绝策略。
Worker类:提交到线程池中的任务都会被封装为一个Worker对象,并分配一个线程。在执行任务时会调用Worker类的run()方法。Worker类实现了Runnable的run方法,继承了AbstractQueueSynchronizer类,重写了tryAcquire,tryRelease方法,控制线程在运行过程中的interrupt操作,提供了同步机制。
线程池的执行流程
image.png常见线程池的使用方式
Executor框架
FixThreadPool
Executor框架提供了两个创建FixThreadPool线程池的方法,newFixThreadPool(int nThreads),newFixThreadPool(int nThreads,ThreadFactory threadFactory)。FixThreadPool限制了线程池中线程的数量。
SingleThreadPool
只有一个线程的线程池,提交到线程池中的任务会按照他们的提交顺序来执行。
CachedThreadPool
在程序执行过程中会创建与所需数量相同的线程,然后再它回收旧线程时停止创建新线程。
ScheduledThreadPoolExecutor
通过schedule(Runnable command, long delay, TimeUnit unit)方法,向ScheduledThreadPoolExecutor中提交任务时,可以设置延迟时间。设置延迟时间的任务会在一段时间后执行。
使用线程池时需要注意哪些事项
-
合理设置核心线程数大小
image.png
在设置线程池中线程的数量时,只要数量不是“过大”或者“过小”都可以,在设置完成后,需要充分的自测。在《Java并发编程实战》中给出了核心线程数的计算公式。
线程池的最优大小 = CPU核数 * CPU的期望利用率 * (1 + 线程等待时间/线程执行时间)
-
不同职责的线程池区分
项目中使用线程池时需要根据自己的应用场景来选择合适的线程池。不过在阿里编写的《阿里巴巴Java开发手册》中建议尽量使用ThreadPoolExecutor来构造适合自己的线程池。 -
选择适当的拒绝策略
我在另外一篇文章中有介绍 https://www.jianshu.com/p/5f70fbd70b84 -
避免使用ThreadLocal
线程池中线程的生命周期通常会超过任务的生命周期,所以ThreadLocal中以Thread ID作为Key来存储缓存可能会出现问题。
日常工作中的使用
需求:统计线程池中的任务一段时间内请求外部接口的次数
实现:可以利用ThreadPoolExecutor中的beforeExecute,afterExecute,terminated方法来实现这个要求。
具体代码如下。
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
public class StatisticThreadFactoryPool extends ThreadPoolExecutor {
private final static ThreadLocal<Long> threadLocal = new ThreadLocal<>();
private final static List<StatisticDO> statisticDOS = new ArrayList<>(16);
public StatisticThreadFactoryPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
public StatisticThreadFactoryPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
}
public StatisticThreadFactoryPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
}
public StatisticThreadFactoryPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
}
/**
* 在任务执行前,记录当前而任务执行的开始时间
*/
@Override
protected void beforeExecute(Thread t, Runnable r) {
// 执行父类的方法
super.beforeExecute(t, r);
// 存储当前的时间
threadLocal.set(System.nanoTime());
}
/**
* 在任务结束后,将统计信息保存到StatisticDO对象中。
* 记录任务的开始时间
* 记录任务的结束时间
* 计算执行时间
* 保存当前任务的标识
*/
@Override
protected void afterExecute(Runnable r, Throwable t) {
try {
StatisticDO statisticDO = new StatisticDO();
statisticDO.setStartTime(threadLocal.get());
statisticDO.setEndTime(System.nanoTime());
statisticDO.setRunTime(statisticDO.getEndTime() - statisticDO.getStartTime());
statisticDO.setTask("thread:" + Thread.currentThread() + r.toString());
statisticDOS.add(statisticDO);
} finally {
super.afterExecute(r, t);
}
}
/**
* 在线程池的任务结束后,将统计信息放入redis缓存中,方便后续调用查询。
*
*/
@Override
protected void terminated() {
try {
// 将statisticDOS中的数据写入缓存中:redis。方便后面查询统计使用
threadLocal.remove();
} finally {
super.terminated();
}
}
class StatisticDO {
// 任务运行时间
private long runTime;
// 任务开始时间
private long startTime;
// 任务结束时间
private long endTime;
// 任务标识
private String task;
public long getRunTime() {
return runTime;
}
public void setRunTime(long runTime) {
this.runTime = runTime;
}
public long getStartTime() {
return startTime;
}
public void setStartTime(long startTime) {
this.startTime = startTime;
}
public long getEndTime() {
return endTime;
}
public void setEndTime(long endTime) {
this.endTime = endTime;
}
public String getTask() {
return task;
}
public void setTask(String task) {
this.task = task;
}
}
}
网友评论