线程池
线程池做的工作主要是控制运行的线程的数量 ,处理过程中将任务加入队列 ,然后在线程创建后启动这些任务, 如果先生超过了最大数量,超出的数量的线程排队等候, 等其他线程执行完毕,再从队列中取出任务来执行.
他的主要特点为: <mark>线程复用:控制最大并发数:管理线程.</mark>
-
第一:降低资源消耗.通过重复利用自己创建的线程降低线程创建和销毁造成的消耗.
-
第二: 提高响应速度.当任务到达时,任务可以不需要等到线程和粗昂就爱你就能立即执行.
-
第三: 提高线程的可管理性.线程是稀缺资源,如果无限的创阿金,不仅会消耗资源,还会较低系统的稳定性,使用线程池可以进行统一分配,调优和监控.
线程池的使用
Java中的线程池是通过Executor框架实现的,该框架中用到了Executor,Executors,ExecutorService,ThreadPoolExecutor这几个类.
image-20200927204213636 image-20200927204431109 image-20200927204443158Executor
和ExecutorService
就相当于Collection和List。我们使用ExecutorService,通过Executors
工具类获取常用的三种线程池。
Executors.newFixedThreadPool(int)
创建一个定长线程池 ,可控制线程的最大并发数,超出的线程会在队列中等待.
image-20200927205413344 image-20200927205436770 image-20200927205557347 image-20200927205237881可以看见这10个用户都被5个线程(最多5个)给执行了。
public class MyThreadPoolDemo {
public static void main(String[] args) {
ExecutorService threadPool = Executors.newFixedThreadPool(5);//一池5个处理线程
try {
//模拟10个用户来办理业务,每个业务就是一个来自外部的请求线程
for (int i = 1; i <= 10; i++) {
//线程执行
threadPool.execute(() -> {
System.out.println(Thread.currentThread().getName()+"\t 办理业务");
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
//线程关闭
threadPool.shutdown();
}
}
}
查看底层源码
image-20200927210418015底层是返回个ThreadPoolExecutor
对象。
newFixedThreadPool创建的线程池corePoolSize和MaxmumPoolSize是相等的都为给定的初始值,它使用的的 LinkedBlockingQueue 也就是该队列能放Integer.MAX_VALUE
个数。最大承载辆为初始值+Integer.MAX_VALUE,相当于无穷大。
执行一个长期的任务,性能好很多
Executors.newSingleThreadExecutor()
<mark>创建一个单线程化的线程池 ,它只会用唯一的工作线程来执行任务,保证所有任务都按照指定顺序执行</mark>
image-20200927205925427public class MyThreadPoolDemo {
public static void main(String[] args) {
//ExecutorService threadPool = Executors.newFixedThreadPool(5);//一池5个处理线程
ExecutorService threadPool = Executors.newSingleThreadExecutor();//一池1个处理线程
try {
//模拟10个用户来办理业务,每个业务就是一个来自外部的请求线程
for (int i = 1; i <= 10; i++) {
//线程执行
threadPool.execute(() -> {
System.out.println(Thread.currentThread().getName()+"\t 办理业务");
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
//线程关闭
threadPool.shutdown();
}
}
}
image-20200927210625484
newSingleThreadExecutor 将 corePoolSize和MaxmumPoolSize都设置为1,它使用的的 LinkedBlockingQueue
一个任务一个线程执行的任务场景
Executors.newCachedThreadPool()
创建一个可缓存线程池 ,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则创建新线程.
image-20200927210250061public class MyThreadPoolDemo {
public static void main(String[] args) {
//ExecutorService threadPool = Executors.newFixedThreadPool(5);//一池5个处理线程
//ExecutorService threadPool = Executors.newSingleThreadExecutor();//一池1个处理线程
ExecutorService threadPool = Executors.newCachedThreadPool();//一池N个处理
try {
//模拟10个用户来办理业务,每个业务就是一个来自外部的请求线程
for (int i = 1; i <= 10; i++) {
//线程执行
threadPool.execute(() -> {
System.out.println(Thread.currentThread().getName()+"\t 办理业务");
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
//线程关闭
threadPool.shutdown();
}
}
}
image-20200927210725975
newCachedThreadPool 将 corePoolSize设置为0,MaxmumPoolSize设置为 Integer. MAX_VALUE ,它使用的 是SynchronousQUeue ,也就是说来了任务就创建线程运行,如果线程空闲超过60秒,就销毁线程。
适用:执行很多短期异步的小程序或者负载较轻的服务器
虽然Java中提供了现成的线程池可以使用,但是实际开发中我们却一个都不用。而是使用ThreadPoolExecutor
自定义线程池。从阿里巴巴开发手册得知
-
【强制】线程资源必须通过线程池提供,不允许在应用中自行显式创建线程。 说明:线程池的好处是减少在创建和销毁线程上所消耗的时间以及系统资源的开销,解决资源不足的问题。 如果不使用线程池,有可能造成系统创建大量同类线程而导致消耗完内存或者“过度切换”的问题。
-
【强制】线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这 样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。
说明:Executors 返回的线程池对象的弊端如下:- FixedThreadPool 和 SingleThreadPool:
允许的请求队列长度为 Integer.MAX_VALUE,可能会堆积大量的请求,从而导致 OOM。 2) CachedThreadPool:
允许的创建线程数量为 Integer.MAX_VALUE,可能会创建大量的线程,从而导致 OOM。
要使用ThreadPoolExecutor自定义线程池前,我们要先了解线程池的七大参数。
线程池七大参数
image-20200927215907478public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
corePoolSize
线程池中的常驻核心线程数
1.在创建了线程池后,当有请求任务来之后,就会安排池中的线程去执行请求任务,近视理解为今日当值线程
2.当线程池中的线程数目达到corePoolSize后,就会把到达的任务放入到缓存队列当中.
maximumPoolSize
线程池能够容纳同时执行的最大线程数,此值大于等于1
keepAliveTime
多余的空闲线程存活时间,当空间时间达到keepAliveTime值时,多余的线程会被销毁直到只剩下corePoolSize个线程为止。
默认情况下:只有当线程池中的线程数大于corePoolSize时keepAliveTime才会起作用,直到线程中的线程数不大于corepoolSIze,
unit
keepAliveTime的计量单位
workQueue
工作队列,被提交但尚未被执行的任务.
①ArrayBlockingQueue
基于数组的有界阻塞队列,按FIFO排序。新任务进来后,会放到该队列的队尾,有界的数组可以防止资源耗尽问题。当线程池中线程数量达到corePoolSize后,再有新任务进来,则会将任务放入该队列的队尾,等待被调度。如果队列已经是满的,则创建一个新线程,如果线程数量已经达到maxPoolSize,则会执行拒绝策略。
②LinkedBlockingQuene
基于链表的无界阻塞队列(其实最大容量为Interger.MAX),按照FIFO排序。由于该队列的近似无界性,当线程池中线程数量达到corePoolSize后,再有新任务进来,会一直存入该队列,而不会去创建新线程直到maxPoolSize,因此使用该工作队列时,参数maxPoolSize其实是不起作用的。
③SynchronousQuene
一个不缓存任务的阻塞队列,生产者放入一个任务必须等到消费者取出这个任务。也就是说新任务进来时,不会缓存,而是直接被调度执行该任务,如果没有可用线程,则创建新线程,如果线程数量达到maxPoolSize,则执行拒绝策略。
④PriorityBlockingQueue
具有优先级的无界阻塞队列,优先级通过参数Comparator实现。
threadFactory
表示生成线程池中工作线程的线程工厂,用户创建新线程,一般用默认即可
handler
拒绝策略,表示当线程队列满了并且工作线程大于等于线程池的最大显示 数(maxnumPoolSize)时如何来拒绝.
详解
image-20200928091830633就比如一家银行, maximumPoolSize
代表着这家银行有多少个处理业务的窗口,假如有5个。一般来说银行的窗口并不是全部开放处理业务的,这是开放一部分,corePoolSize
就是开放的处理业务的窗口假设有2个。那么剩余的3个是不开放的也就是没有工作人员在的。当人需要来银行处理业务时,人们就去开放的窗口处理业务。此时银行最多处理两个。剩余的人只能继续在候客区workQueue
等待。候客区有3个位置,也就是最多只能坐3个人。当后续还有人来办理业务的话,银行就没有地方坐了因为候客区也满了,业务窗口也满了,此时经理只能将剩余的3个窗口全部开放,让下班的银行员工回来加班。此时5个窗口已经全部开放。那么此时银行能接待的最大人数就是maximumPoolSize+workQueue
为5+3等于8。此时如果还有人来的话,银行已经没有地方坐了(假设人不愿意站)。那么经理就用有相应的处理策略handler
来处理这种情况。此时银行处理的高峰已经过去了,人也慢慢走了,加班的员工已经没有业务来处理了,当被打电话过来加班的员工,等待了keepAliveTime
单位unit
时间后,经理看高峰已经过了,就让他们下班了。threadFactory
则代表这家银行的服饰,logo采用什么样式。
1、在创建了线程池后,开始等待请求。
2、当调用 execute() 方法添加一个请求任务时,线程池会做出如下判断:
2.1如果正在运行的线程数量小于 corePoolSize ,那么马上创建线程运行这个任务;
2.2如果正在运行的线程数量大于或等于 corePoolSize ,那么将这个任务 放入队列 ;
2.3如果这个时候队列满了且正在运行的线程数量还小于 maximumPoolSize ,那么还是要创建非核心线程立刻运行这个任务;
2.4如果队列满了且正在运行的线程数量大于或等于 maximumPoolSize ,那么线程池会 启动饱和拒绝策略来执行 。
3、当一个线程完成任务时,它会从队列中取下一个任务来执行。
4、当一个线程无事可做超过一定的时间( keepAliveTime )时,线程会判断:
如果当前运行的线程数大于 corePoolSize ,那么这个线程就被停掉。
所以线程池的所有任务完成后, 它最终会收缩到 corePoolSize 的大小 。
四大拒绝策略
等待队列已经排满了 ,再也塞不下新任务了 同时, 线程池中的max线程也达到了 ,无法继续为新任务服务。 这个是时候我们就需要拒绝策略机制合理的处理这个问题。
-
AbortPolicy(默认):直接抛出RejectedExecutionException异常阻止系统正常运行
-
CallerRunsPolicy:“调用者运行”一种调节机制,该策略既不会抛弃任务,也不
会抛出异常,而是将某些任务回退到调用者,从而降低新任务的流量。
-
DiscardOldestPolicy:抛弃队列中等待最久的任务,然后尝试把这次拒绝的任务放入队列。
-
DiscardPolicy:该策略默默地丢弃无法处理的任务,不予任何处理也不抛出异常。
如果允许任务丢失,这是最好的一种策略。
以上内置拒绝策略均实现了RejectedExecutionHandle接口
手动创建线程池
通过new ThreadPoolExecutor
配置参数,手动创建线程池。
核心数为corePoolSize为2,maximumPoolSize为5,阻塞队列大小为3。运行查看结果~
image-20200928094156526此时这个线程的最大处理数为maximumPoolSize+workQueue
为5+3等于8。此时我么难道拒绝策略AbortPolicy
,当我们的线程数超过8为9时,就会可能引发异常。
抛出java.util.concurrent.RejectedExecutionException
异常。
我们修改拒绝策略为 CallerRunsPolicy
当超出处理后线程池无法处理后,就会将其返回给调用者处理,这里我们使用的是main线程调用,所以就是main线程处理。
修改拒绝策略为DiscardPolicy
。运行查看结果。
修改策略为DiscardOldestPolicy
抛弃队列中等待最久的任务,然后尝试把这次拒绝的任务放入队列。
合理配置线程池
任务一般可分为:CPU密集型、IO密集型、混合型,对于不同类型的任务需要分配不同大小的线程池。
CPU密集的意思是该任务需要大量的运算,而没有阻塞,CPU一直全速运行。
IO密集型即该任务需要大量的IO,即大量的阻塞。
通过Runtime.getRuntime().availableProcessors()
获取CPU核心数
- CPU密集型任务
尽量使用较小的线程池,一般为CPU核心数+1
。
因为CPU密集型任务使得CPU使用率很高,若开过多的线程数,只能增加上下文切换的次数,因此会带来额外的开销。 - IO密集型任务
可以使用稍大的线程池,一般为2*CPU核心数
。
IO密集型任务CPU使用率并不高,因此可以让CPU在等待IO的时候去处理别的任务,充分利用CPU时间。 - 混合型任务
可以将任务分成IO密集型和CPU密集型任务,然后分别用不同的线程池去处理。
只要分完之后两个任务的执行时间相差不大,那么就会比串行执行来的高效。
因为如果划分之后两个任务执行时间相差甚远,那么先执行完的任务就要等后执行完的任务,最终的时间仍然取决于后执行完的任务,而且还要加上任务拆分与合并的开销,得不偿失。
网友评论