目录
一、线程池的优点
二、线程池创建中各个参数的含义
三、ThreadPoolExecutor的任务添加执行流程
四、线程池的状态
五、提交任务
六、关闭线程池
七、线程池内任务的取消
八、合理配置线程池
九、线程池的监控。
十、任务队列长度的动态设置。
一、线程池的优点
- 重用线程池里的线程,避免创建和销毁线程所带来的性能开销。
- 有效控制最大并发数,避免造成线程间抢占系统资源而造成阻塞。
- 提高线程可管理性,可以统一进行分配,并提供定时执行以及指定间隔循环执行等功能。
二、线程池创建中各个参数的含义
ThreadPoolExecutor是线程池的实现,它的构造方法提供了一系列参数来配置线程池,如下:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
}
必须的五个参数:
int corePoolSize:核心线程:线程池中有两类线程,核心线程和非核心线程。核心线程默认情况下会一直存在于线程池中,即使这个核心线程什么都不干(铁饭碗),而非核心线程如果长时间的闲置,就会被销毁(临时工)。如果调用了线程池的prestartAllCoreThreads()方法,线程池会提前创建并启动所有核心线程。
int maximumPoolSize:线程池所能容纳的最大线程数,当活动线程数达到这个数值后,后续的新任务将会被阻塞。该值等于核心线程数量 + 非核心线程数量。如果任务队列使用无界的阻塞队列,该参数没有什么效果。
int keepAliveTime:非核心线程闲置时的超时时长,超过这个时长,非核心线程就会被回收。如果设置allowCoreThreadTimeOut(true),则会也作用于核心线程。
TimeUnit unit:用于指定keepAliveTime参数的时间单位。
BlockingQueue workQueue:线程池中的任务队列,通过线程池的execute方法提交的Runnable对象存储在这个参数中。常用阻塞队列有:
1.ArrayBlockingQueue:一个由数组结构组成的有界阻塞队列。此队列按照先进先出(FIFO)的原则对元素进行排序。默认情况下不保证线程公平的访问队列,所谓公平访问队列是指阻塞的线程,可以按照阻塞的先后顺序访问队列,即先阻塞线程先访问队列。非公平性是对先等待的线程是非公平的,当队列可用时,阻塞的线程都可以争夺访问队列的资格,有可能先阻塞的线程最后才访问队列。初始化时有参数可以设置
2.LinkedBlockingQueue:一个由链表结构组成的有界阻塞队列(常用)。此队列的默认和最大长度为Integer.MAX_VALUE,它按照先进先出的原则对元素进行排序。
3.PriorityBlockingQueue:一个支持优先级排序的无界阻塞队列。
4.DelayQueue:一个使用优先级队列实现的无界阻塞队列,支持延时获取元素,队列使用PriorityQueue来实现。队列中的元素必须实现Delayed接口,在创建元素时可以指定多久才能从队列中获取当前元素。只有在延迟期满时才能从队列中提取元素。DelayQueue非常有用,可以将DelayQueue运用在缓存系统的设计:可以用DelayQueue保存缓存元素的有效期,使用一个线程循环查询DelayQueue,一旦能从DelayQueue中获取元素时,表示缓存有效期到了。
5.SynchronousQueue:一个不存储元素的阻塞队列(常用)。每一个put操作必须等待一个take操作,否则不能继续添加元素。SynchronousQueue可以看成是一个传球手,负责把生产者线程处理的数据直接传递给消费者线程。队列本身并不存储任何元素,非常适合传递性场景。
非必须参数:
ThreadFactory threadFactory:创建线程的工厂 ,用于批量创建线程,统一在创建线程时设置一些参数,如是否守护线程、线程的优先级等。如果不指定,会新建一个默认的线程工厂。
private static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
RejectedExecutionHandler handler:拒绝处理策略,线程数量大于最大线程数就会采用拒绝处理策略,四种拒绝处理的策略为 :
1.ThreadPoolExecutor.AbortPolicy:默认拒绝处理策略,丢弃任务并抛出RejectedExecutionException异常。
2.ThreadPoolExecutor.DiscardPolicy:丢弃新来的任务,但是不抛出异常。
3.ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列头部(最旧的)的任务,然后重新尝试执行程序(如果再次失败,重复此过程)。
4.ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务。
三、ThreadPoolExecutor的任务添加执行流程
1.线程总数量 < corePoolSize,无论线程是否空闲,都会新建一个核心线程执行任务(让核心线程数量快速达到corePoolSize,在核心线程数量 < corePoolSize时)。注意,这一步需要获得全局锁。
2.线程总数量 >= corePoolSize时,新来的线程任务会进入任务队列中等待,然后空闲的核心线程会依次去缓存队列中取任务来执行(体现了线程复用)。
3.当缓存队列满了,说明这个时候任务已经多到爆棚,需要一些“临时工”来执行这些任务了。于是会创建非核心线程去执行这个任务。注意,这一步需要获得全局锁。
4.缓存队列满了, 且总线程数达到了maximumPoolSize,则会采取上面提到的拒绝策略进行处理。
四、线程池的状态
ThreadPoolExecutor类中使用了一些final int常量变量来表示线程池的状态 ,分别为RUNNING、SHUTDOWN、STOP、TIDYING 、TERMINATED。
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
-
线程池创建后处于RUNNING状态。
-
调用shutdown()方法后处于SHUTDOWN状态,线程池不能接受新的任务,清除一些空闲worker,不会等待阻塞队列的任务完成。
-
调用shutdownNow()方法后处于STOP状态,线程池不能接受新的任务,中断所有线程,阻塞队列中没有被执行的任务全部丢弃。此时,poolsize=0,阻塞队列的size也为0。
-
当所有的任务已终止,ctl记录的”任务数量”为0,线程池会变为TIDYING状态。接着会执行terminated()函数。
五、提交任务
- execute()方法用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功。
- submit()方法用于提交需要返回值的任务。线程池会返回一个future类型的对象,通过这个future对象可以判断任务是否执行成功,并且可以通过future的get()方法来获取返回值,get()方法会阻塞当前线程直到任务完成,而使用get(long timeout,TimeUnit unit)方法则会阻塞当前线程一段时间后立即返回,这时候有可能任务没有执行完。
六、关闭线程池
可以通过调用线程池的shutdown或shutdownNow方法来关闭线程池。它们的原理是遍历线程池中的工作线程,然后逐个调用线程的interrupt方法来中断线程,所以无法响应中断的任务可能永远无法终止。但是它们存在一定的区别,shutdownNow首先将线程池的状态设置成STOP,然后尝试停止所有的正在执行或暂停任务的线程,并返回等待执行任务的列表,而shutdown只是将线程池的状态设置成SHUTDOWN状态,然后中断所有没有正在执行任务的线程。
只要调用了这两个关闭方法中的任意一个,isShutdown方法就会返回true。当所有的任务都已关闭后,才表示线程池关闭成功,这时调用isTerminaed方法会返回true。至于应该调用哪一种方法来关闭线程池,应该由提交到线程池的任务特性决定,通常调用shutdown方法来关闭线程池,如果任务不一定要执行完,则可以调用shutdownNow方法。
七、线程池内任务的取消
方案1:直接传入Runnable 的 子类Futuratask ,如下代码所示
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 2, 0, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(10));
Callable callable = new Callable() {
@Override
public Object call() throws Exception {
System.out.println("hehe");
return null;
}
};
FutureTask<String> futureTask = new FutureTask<>(callable);
threadPoolExecutor.execute(futureTask);
if (!futureTask.isDone()) {
futureTask.cancel(true);
}
方案2:通过submit方法的返回值来实现。如下代码所示
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 2, 0, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(10));
Future future = threadPoolExecutor.submit(new Runnable() {
@Override
public void run() {
System.out.println("hehe");
}
});
if(!future.isDone()){
future.cancel(true);
}
八、合理配置线程池
我们先来看这么个问题,假设1秒内任务数量范围是500-1000,每个任务处理时间是0.1秒内,用户最大忍耐时间是1秒,在这种情况下,我们需要多少个线程来处理任务呢?
按照纯数学方法来计算:
tasks : 每秒的任务数
taskcost:每个任务花费时间
responsetime:系统允许容忍的最大响应时间
线程数 = 每秒需要多少个线程来处理?
threadCount = tasks / (1 / taskcost) = taskstaskcost= (500~1000)0.1 = 50~100 个线程 。
按照这样的数学计算的话,可以这么认为,最少需要设置 50~100个线程。
上述的计算也是理论层面的,是基于CPU性能足够的情况下来考虑的这样的计算不能够完全应用于我们实际的线程池参数设置去,但是我们也应该在实际的压测中去设置这样的理论值去看看压测结果。
有了上面的基础,我们来看看,实际情况下(考虑cpu性能、io性能等)我们如何来设置线程池的值来进行系统压测。
1.经验值
配置线程数量之前,首先要看任务的类型是 IO密集型,还是CPU密集型?
什么是IO密集型?
比如:频繁读取磁盘上的数据,或者需要通过网络远程调用接口。
涉及到网络、磁盘IO的任务都是IO密集型任务,这类任务的特点是CPU消耗很少,任务的大部分时间都在等待IO操作完成(因为IO【博主:按照这个我理解的IO就是指把内容从硬盘上读到内存的过程,或者是从网络上接收信息到本机内存的过程】的速度远远低于CPU和内存的速度)。对于IO密集型任务,任务越多,CPU效率越高,但也有一个限度。常见的大部分任务都是IO密集型任务,比如Web应用。
什么是CPU密集型?
比如:非常复杂的调用,循环次数很多,或者递归调用层次很深等。
计算密集型任务的特点是要进行大量的计算,消耗CPU资源,比如计算圆周率、对视频进行高清解码等等,全靠CPU的运算能力。这种计算密集型任务虽然也可以用多任务完成,但是任务越多,花在任务切换的时间就越多,CPU执行任务的效率就越低,所以,要最高效地利用CPU,计算密集型任务同时进行的数量应当等于CPU的核心数。
IO密集型配置线程数经验值是:2N,其中N代表CPU核数。
CPU密集型配置线程数经验值是:N + 1,其中N代表CPU核数。
如果获取N的值?
int availableProcessors = Runtime.getRuntime().availableProcessors();
那么问题来了,混合型(既包含IO密集型,又包含CPU密集型)的如何配置线程数?
混合型如果IO密集型,和CPU密集型的执行时间相差不太大,可以拆分开,以便于更好配置。如果执行时间相差太大,优化的意义不大,比如IO密集型耗时60s,CPU密集型耗时1s。
2.最佳线程数目算法
除了上面介绍是经验值之外,其实还提供了计算公式:
最佳线程数目 = ((线程等待时间+线程CPU时间)/线程CPU时间 )* CPU数目
因为很显然,线程等待时间所占比例越高,需要越多线程。线程CPU时间所占比例越高,需要越少线程。
虽说最佳线程数目算法更准确,但是线程等待时间和线程CPU时间不好测量,实际情况使用得比较少,一般用经验值就差不多了。再配合系统压测,基本可以确定最适合的线程数。
有了上面的之后,我们再次看看一开始提到的问题。假设1秒内任务数量范围是500-1000,每个任务处理时间是0.1秒内,用户最大忍耐时间是1秒,在这种情况下,我们应该怎么配置线程池的各个参数呢?
先设置核心线程数和最大线程数为 2N(CPU密集型任务为N+1) ,任务队列长度为 1000 来测试, 再设置 核心线程数 和最大线程数为数学计算理论值 100(先测按最大线程来测试) ,队列长度为 1000来测试。往线程池中添加1000个任务,不断在经验值和理论之间这个范围调整核心线程数量(最大线程数量和核心线程数量保持一致),来压测调整出最后的最佳线程数量。
以下是个人建议:
对于非常驻型应用(不会在后台运行,退出界面则关闭应用):
直接设置核心线程数量和最大线程数量为压测出最佳线程数量。任务队列长度根据实际需求考虑,如果加入线程池的任务认定为是无法拒绝的,则设置任务队列为无界队列,然后根据响应时间需求看看规定时间范围内能完成多少个任务,监听任务队列长度,超出能完成的任务长度则给用户友好提示信息。如果任务是可以拒绝的,则考虑在拒绝策行策略的回调上给与用户一定的友好提示,如提示系统繁忙之类的。
对于后台型应用(即使退出界面也会在后台常驻,这种类型的应用占用过多资源不合适):
可以根据大部分情况下任务的并发程度来设置核心线程数量。如果加入线程池的任务认定为是无法拒绝,则设置最大线程数和核心线程数量一致,设置任务队列为无界队列,并监听任务队列内任务的个数情况来实时调整线程池中的最大线程数量和核心线程数量(先设置最大线程数量,再设置核心线程数量,设置的值一致)。如果任务是可以拒绝的,一般会这样设置,任务队列长度为 (核心线程数量/最佳线程数量)* 最大任务个数 ,最大线程数量设置为 最佳线程数量 ,这样的设计一定程度有可能会造成任务被拒绝,但是既然我们关注大部分情况下任务的并发程度,就需要有这样的牺牲。我们也可以根据实际任务是否存在任务拒绝的情况来调整任务队列的长度。
监听任务队列长度可以通过自定义队列类,在offer 和 take方法内实现。
这些参数的设置需要以下的地方:
1.如果设置任务队列为无界队列,最大线程数基本会没有效果,这是线程池添加任务的原理导致的,任务会一直累加在任务队列中,线程数量只会达到核心线程数量的上限。
2.如果核心线程数量和最大线程数量不一致,需要注意任务队列的长度,如果过长会导致一直线程池一直达不到最大线程数量,如果太短,也会导致任务添加失败。
3.利用好拒绝执行策略,可以给与用户友好提示以及收集线程池信息。
九、线程池的监控。
线程池实际情况的监控会影响我们对线程池参数的配置,直接影响到我们的应用性能。
1、可以通过线程池提供的参数读线程池进行监控,有以下属性可以使用:
-
taskCount:线程池需要执行的任务数量,包括已经执行完的、未执行的和正在执行的。
-
completedTaskCount:线程池在运行过程中已完成的任务数量,completedTaskCount <= taskCount。
-
largestPoolSize:线程池曾经创建过的最大线程数量,通过这个数据可以知道线程池是否满过。如等于线程池的最大大小,则表示线程池曾经满了。
-
getPoolSize: 线程池的线程数量。如果线程池不销毁的话,池里的线程不会自动销毁,所以线程池的线程数量只增不减。
-
getActiveCount:获取活动的线程数。
2、通过继承线程池并重写线程池的 beforeExecute,afterExecute 和 terminated 方法,我们可以在任务执行前,执行后和线程池关闭前干一些事情。如监控任务的平均执行时间,最大执行时间和最小执行时间等。
十、线程池参数的动态设置。
有时候我们不希望设置无界的任务队列,但是也不希望加入线程池的任务被拒绝时,我们可以尝试动态设置任务队列的长度,根据任务队列内的任务添加的情况,来动态调整任务队列的长度,就如同 List 的动态扩容一样 。
方案:
我们可以通过自己实现任务队列类来实现,可以简单地copy LinkedBlockingQueue 的源码 ,在 添加任务的方法内 offer()内 判断任务内存在的任务长度 来动态修改任务队列可容纳的任务长度 。
注意点:
如果是 corePoolSize 和 maximumPoolSize 设置不一致的线程,需要注意队列的长度 影响到最大线程数量的创建。
参考文章链接:
Java线程池面试必备:核心参数、工作流、监控、调优手段
CPU密集型操作与IO密集型操作区分
线程池原理
网友评论