线程池
1. 简介
当一个程序中需要多个task
需要被并发执行时,最直接的方式是为每一个task
创建一个线程去执行,但这样会带来以下问题:
-
大量线程创建与运行会导致系统不断的启动和关闭新线程,会过渡消耗系统资源.
-
过度切换线程的危险,从而可能导致系统的崩溃.
-
同时创建过多的线程意味着要创建过多的
Thread
对象,这样也会额外增大jvm
的垃圾回收压力
在这种情况下,引入"池化技术"是必要的,在实际开发中这种技术得到很多的应用,例如数据库连接池等
池化技术可以简单理解为是一个池子,在这个池子存放着固定的资源,这些资源可以是线程,也可以数据库连接,具体取决于是怎么类型的池
这些资源不会"消失",而是可以被多次复用,从而达到节省资源开销等目的
image-20211102172615693
线程池就是"池化技术"的一种体现,使用该技术可以避免上文中提到的问题,并且好处众多,如下:
- 不需要创建大量线程,只需要创建一个线程池即可,让线程池去管理线程
- 加快程序响应速度,合理利用CPU资源
- ......
2. 参数
创建线程池会需要传入几个参数才能创建成功,如下:
参数名 | 说明 |
---|---|
corePoolSize |
核心线程数 |
maxPoolSize |
最大线程数 |
keepAliveTime |
保持存活时间 |
workQueue |
任务存储队列 |
threadFactory |
线程工厂 |
rejectHandler |
拒绝处理器 |
关于线程池中线程创建过程大体流程如下(含参数解释):
-
当线程池创建并初始化完成,此时线程池里面并没有任何资源,当有任务过来需要被执行时才会去创建核心线程执行任务
-
线程不会无限制创建,当创建线程数超过
corePoolSize
时,就会把任务存储在workQueue
中如果创建的线程数没有超过
corePoolSize
时,即使线程池有线程时空闲的,也还是会创建线程 -
当
workQueue
中存储任务已满,则再会去创建线程从队列中拉取任务执行,线程也不会无限制创建,当创建的线程达到maxPoolSize
时,则不会创建了 -
当线程达到
maxPoolSize
,且workQueue
存满时,则会根据rejectHandler
执行拒绝策略 -
最后多余非核心线程的空闲时间超过配置的
keepAliveTime
,那么线程进行停止销毁核心线程会一直存在,不会销毁
具体流程图如下:
image-20211102180822666ThreadFactory
线程工厂,用来创建线程的,在创建线程池时,可以使用开发人员定义的线程工厂,也可以使用默认提供的ThreadFactory
默认的ThreadFactory
使用Executors.defaultThreadFactory()
创建,该线程工程创建出来的线程都是在同一个线程组,且都不是守护线程
如果开发人员想要自己指定创建出来的线程名,线程组,优先级,是否为守护线程则可以使用自己的线程工厂
WorkQueue
工作队列,或者叫任务队列,用来存储任务,当核心线程已满,且核心线程都忙碌时,则将任务存储到工作队列中,直接工作队列也存储满,才会去创建非核心线程,如上文图
工作队列为阻塞队列,常用一般分为三个,可以由开发人员自由选择
-
SynchronousQueue
该队列实现了
image-20211103102056762BlockingQueue
,类图如下:该队列可以理解为是一个直接交接队列,或者为中转队列,当有任务过来,该队列就会立马交给线程池的线程执行
该队列是没有容量的,因此如果线程池采用这种可以将此案成maxPoolSize设置大一些
这样只要有任务过来,该队列就会交给线程池执行
-
LinkedBlockingQueue
该队列可以理解为"无边界队列",同样的也实现了
image-20211103103420548BlockingQueue
,如下:当创建该队列时,如果没有指定大小,那么该队列则无上限
如果采用该队列,意味着maxPoolSize参数无作用,因为队列有可能永远存储不满
注意:其实也不是存储不满,存储的上限时
Integer.MAX_VALUE
-
ArrayBlockingQueue
该队列可以理解为"有边界队列",同样的也实现了
image-20211103103549737BlockingQueue
,如下:这种队列创建就需要指定大小,这样也就意味着
maxPoolSize
是有意义的了具体选择何种队列,则需要根据具体场景来进行选择
3. 创建
线程池创建分为两大类:
-
手动创建
手动创建,即自己创建线程池对象
ThreadPoolExecutor
,这种方式可以更加的去理解线程池规则,规避风险在阿里巴巴开发手册中,也提到过,系统开发过程中,不允许使用
image-20211103104631291jdk
默认提供的线程池,如图: -
自动创建
自动创建即使用
jdk
默认提供的线程池,虽然阿里巴巴开发手册上不允许使用,但是还是需要了解一下
3.1 自动
jdk
默认封装的线程池主要有以下:
FixedThreadPool
SingleThreadPool
CachedThreadPool
ScheduledThreadPool
接下来就从其说明,使用,缺点问题方面探讨这些线程池之间的优缺点
3.1.1 FixedThreadPool
-
说明
这种线程池创建时,只需要传入一个核心线程数即可,如下:
ExecutorService executor = Executors.newFixedThreadPool(2);
从源码其源码可知,该线程池核心线程数与最大线程数一样,采用
image-20211103112142577LinkedBlockingQueue
,如下:从源码可知,其最大线程数并没有任何意义,其工作队列无上限,意味着任务的存储无上限(其实也不是无上限,最大上限为
Integer.maxValue
) -
使用
使用该类型线程池处理线程,如下:
package com.tomato.thread.pool; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class FixThreadPool { public static void main(String[] args) { // 创建线程池,并指定核心线程数为2 ExecutorService executorService = Executors.newFixedThreadPool(2); for (int i = 0; i < 10; i++) { // 循环往线程池中提交任务,循环十次,即意味着提交了十个任务 // 由于核心线程只有2个,意味着最多只有2个线程在执行任务 // 没有执行的任务就在等待 // 任务都是Runable类型对象,所以这里为了简便用了lamda executorService.execute(() -> { System.out.println( Thread.currentThread().getName()); }); } } }
运行结果如下:
image-20211103113109106从图中可以看出,最多两个线程在执行任务
-
缺点
由于任务存储队列没有上限,假如执行的任务耗时较久,在任务较多的情况下,就意味着存储队列中会不停的存储任务,这样会导致最后
image-20211109102829059oom
,如图:代码演示如下:
package com.tomato.thread.pool; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class FixThreadPoolOOM { public static void main(String[] args) { // 为了更好的测试,这里将核心线程数设置为1 ExecutorService executorService = Executors.newFixedThreadPool(1); for(int i = 1; i < 100_000; i++) { // 往线程池中添加10W个任务 executorService.execute(() -> { try { // 这里休眠50s是为了模拟耗时操作 // 当核心在执行该任务时,其他任务没有线程去被执行就只能先存储在队列中 // 如果队列中元素过多肯定会报oom Thread.sleep(50_000); System.out.println( Thread.currentThread().getName()); } catch (InterruptedException e) { e.printStackTrace(); } }); } } }
为了更加明显突出,直接修改
image-20211103115319924-Xmx 6m -Xms 6m
,调小jvm运行内存,如下图:结果如下:
image-20211103121610336
3.1.2 SingleThreadPool
该线程池从名字可以看出是单个线程,因此在创建线程池的时候也不需要传入核心线程数,如下:
ExecutorService executor = Executors.newSingleThreadExecutor();
从其创建的源码可知,该线程池使用的也是"无边界阻塞队列",如下:
image-20211109094219578因此如果在当个任务处理的情况下,也会发生OOM
,原理同FixThreadPool
原理一样,这里就不再演示
3.1.3 CachedThreadPool
-
说明
该线程池与之前线程池创建一样,不需要传入核心线程数,如下:
ExecutorService executor = Executors.newCachedThreadPool();
但是通过源码可以得知,该线程池的核心线程为0,而非核心线程数为
image-20211109101404414Integer.MAX_VALUE
,使用的也是中转队列,如下: -
使用
使用该线程池处理任务代码如下:
package com.tomato.thread.pool; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import static java.text.MessageFormat.format; public class CacheThreadPoool { public static void main(String[] args) { ExecutorService executorService = Executors.newCachedThreadPool(); for(int i = 0; i < 1_000; i++) { executorService.execute(() -> { System.out.println(format("{0}执行任务",Thread.currentThread().getName())); }); } } }
运行结果如下:
image-20211109101959313 -
缺点
从其源码可知,当有任务过来会立马交给线程池的线程执行,而其核心线程为0,最大线程为
Integer.MAX_VALUE
如果当执行的任务是一个耗时任务,在任务较多的情况下,就会频繁的创建线程对象,从而有可能发生OOM,如下:
image-20211109103029903代码演示如下:
package com.tomato.thread.pool; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import static java.text.MessageFormat.format; public class CacheThreadPoolOOM { public static void main(String[] args) { ExecutorService executor = Executors.newCachedThreadPool(); for (int i = 0; i < Integer.MAX_VALUE; i++) { executor.execute(() -> { try { Thread.sleep(50_000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(format("{0}执行任务完成", Thread.currentThread().getName())); }); } } }
同时为了更快看到效果,这里设置
image-20211109103845255JVM
参数-Xms2m -Xmx2m
,如下:因此在阿里开发规范中,该类型的线程池与上述类型的两种线程池是不允许使用的
3.1.4 ScheduledThreadPool
该类型的线程池,是一个具备周期性执行的一个线程池,在这里不再解释,之前的章节有过详细描述
3.2 手动
-
说明
在上述中主要是利用jdk提供的线程池去进行创建,但是也提到了每个线程池的局限性,因此在实际开发中手动创建线程池机会反而多点
-
代码
因此在具体创建时,代码如下:
package com.tomato.thread.pool; import java.text.MessageFormat; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import static java.text.MessageFormat.*; /** * 自定义线程池 */ public class CustomThreadPool { public static void main(String[] args) { ThreadPoolExecutor executor = new ThreadPoolExecutor( 2, 10, 0, TimeUnit.SECONDS, new LinkedBlockingDeque<>(20)); for(int i = 0; i < 30; i++) { executor.execute(() -> { System.out.println(format("{0}执行任务",Thread.currentThread().getName())); }); } } }
执行结果如下:
image-20211109112033050这样做的好处可以自己控制核心线程数,最大线程数,以及存储队列等
4. 核心数
当手动创建线程池时,如何去确定核心线程数为多少,目前在一些实践中主要分为以下两类:
-
CPU密集型(经常加密,计算hash等)
线程核心数此时应该为 CPU可用核数的1~2倍
-
IO密集型(数据库读写,文件读写,网络读写)
一般该线程数为核心线程数的很多倍,根据
Brain Goetz
的公式为:CPU 核心数 * (1 + 平均等待时间/平均工作时间)
5. Factory
在上述中,不管是手动创建线程池还是自动创建线程池,使用的ThreadFactory
都是默认的线程工厂
有时候在创建线程池时,想要修改线程池一些参数,例如线程名字等,这样就可以使用自定义线程工厂,如下:
package com.tomato.thread.pool;
import java.text.MessageFormat;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import static java.text.MessageFormat.*;
public class CustomThreadFactory implements ThreadFactory {
private ThreadGroup threadGroup;
private AtomicLong threadNumber = new AtomicLong(1L);
/**
* 线程池中线程组名字前缀
*/
private String prefix;
public CustomThreadFactory() {
this("");
}
public CustomThreadFactory(String prefixName) {
SecurityManager securityManager = System.getSecurityManager();
threadGroup = securityManager != null ? securityManager.getThreadGroup() : Thread.currentThread().getThreadGroup();
prefix = (prefixName != null && !"".equals(prefixName)) ? prefixName : "tomato-pool-thread";
}
@Override
public Thread newThread(Runnable r) {
/**
* 当设置stackSize属于<=0 时,以-Xss为准
* 当设置stackSize属于(0, 4k]区间时,设置的-Xss会失效,栈空间取默认值1M
* 当设置stackSize属于(4k, 64k]区间时,设置的-Xss会失效,栈空间取4k。
* 当设置stackSize属于(64k, 128k]区间时,设置的-Xss会失效,栈空间取64k。
* 当设置stackSize属于 >128k 时,设置的-Xss会失效,栈空间取stackSize本身
*/
Thread thread = new Thread(threadGroup,r, prefix + "-" + threadNumber.getAndIncrement(), 0);
if (thread.isDaemon()) {
thread.setDaemon(false);
}
if (thread.getPriority() != Thread.NORM_PRIORITY) {
thread.setPriority(Thread.NORM_PRIORITY);
}
return thread;
}
}
class Test {
public static final Integer ACPU = Runtime.getRuntime().availableProcessors();
public static void main(String[] args) {
ThreadPoolExecutor executor =
new ThreadPoolExecutor(
ACPU * 2 + 1,
ACPU * 2 + 1,
0,
TimeUnit.SECONDS,
new LinkedBlockingDeque<>(600),
new CustomThreadFactory());
for(int i = 1; i < 1_00; i++) {
executor.execute(() -> {
System.out.println(format("{0}执行任务完成",Thread.currentThread().getName()));
});
}
}
}
执行结果,发现线程的名字都改变,如下:
image-20211109134421034这样就完成了自定义ThreadFactory的实现
6. Reject
从上文的描述中可知,线程池的流程是当任务存储队列满了的时候,则会创建非核心线程去执行队列中的任务
那如果所有的线程都处于忙碌中,且队列中存储满了,那么当任务再过来就会执行配置的
拒绝策略去拒绝任务,在线程池中拒绝策略主要有三个,如下:
如果线程池异常关闭,有任务过来也会异常拒绝
-
AbortPolicy
终止策略,当任务被拒绝时,则抛出
RejectExecutionException
异常这种策略也是默认的策略
-
CallerRunsPolicy
调用者策略,如果线程池使用该策略拒绝了该任务,那么该任务由哪个线程提交的就由哪个线程执行
-
DiscardOldestPolicy
丢弃最早未处理请求策略,当使用该策略时,线程池会丢弃最先进入阻塞队列中的任务,给最新的任务腾出空间
-
DiscardPolicy
丢弃策略,使用该策略,就会丢弃最新任务
-
自定义
当然也可以根据其策略去自定义拒绝策略
关于四种策略,其详细解释如下文
6.1 AbortPolicy
-
说明
该策略为终止策略,也是线程默认的策略,其原理就是当队列存储已满,且无任何空闲线程时,就会抛出
image-20211109141419872RejectExcutionException
,原理图如下:同时从源码可以看出该策略,其方式就是抛出异常,如下:
image-20211109141752480 -
使用
创建线程池使用该策略,如下:
package com.tomato.thread.pool; import java.text.MessageFormat; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; /** * 策略方式为抛异常 */ public class AbortPolicyTest { public static void main(String[] args) { final Integer ACPU = Runtime.getRuntime().availableProcessors(); ThreadPoolExecutor executor = new ThreadPoolExecutor( 2 * ACPU + 1, 2 * ACPU + 1, 0, TimeUnit.SECONDS, new LinkedBlockingDeque<>(6), new ThreadPoolExecutor.AbortPolicy()); for (int i = 0; i <= 100; i++) { executor.execute(() -> { try { // 模拟耗时 Thread.sleep(100); System.out.println(MessageFormat.format("{0}执行完成", Thread.currentThread().getName())); } catch (InterruptedException e) { e.printStackTrace(); } }); } } }
从代码可以看出,阻塞队列最大容量是6,而添加的任务是100,因此当超过容量,且线程都处于忙碌时,则会抛出异常,如下:
image-20211109143945153
6.2 CallerRunsPolicy
-
说明
如果创建线程池使用该策略拒绝了该任务,那么该任务由哪个线程提交的就由哪个线程执行,如图:
image-20211109144806090从源码也可以看出,该方法直接被执行也,也就是哪个线程提交哪个执行,而不是让线程池的线程去执行,如图:
image-20211109145120665 -
使用
创建线程池使用该策略,如下:
package com.tomato.thread.pool; import java.text.MessageFormat; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class CallerRunPolicyTest { public static void main(String[] args) { final Integer ACPU = Runtime.getRuntime().availableProcessors(); ThreadPoolExecutor executor = new ThreadPoolExecutor( 2 * ACPU + 1, 2 * ACPU + 1, 0, TimeUnit.SECONDS, new LinkedBlockingDeque<>(6), new ThreadPoolExecutor.CallerRunsPolicy()); for (int i = 0; i <= 100; i++) { executor.execute(() -> { try { // 模拟耗时 Thread.sleep(100); System.out.println(MessageFormat.format("{0}执行完成", Thread.currentThread().getName())); } catch (InterruptedException e) { e.printStackTrace(); } }); } } }
最后从结果也可以得知,当线程池塞满时,也会有一些任务由主线程执行,如下:
image-20211109150326337
6.3 DiscardOldestPolicy
-
说明
当使用该策略时,线程池会丢弃最先进入阻塞队列中的任务,给最新的任务腾出空间,如图:
image-20211109151124441当然从其源码也可以知道,先从队列弹出一个task,再把最新的任务添加到线程池,如下
image-20211109151448261 -
使用
创建线程池,使用该策略如下:
package com.tomato.thread.pool; import java.text.MessageFormat; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; public class DiscardOldestPolicyTest { private final static AtomicInteger atomic = new AtomicInteger(1); public static void main(String[] args) { final Integer ACPU = Runtime.getRuntime().availableProcessors(); ThreadPoolExecutor executor = new ThreadPoolExecutor( 2 * ACPU + 1, 2 * ACPU + 1, 0, TimeUnit.SECONDS, new LinkedBlockingDeque<>(6), new ThreadPoolExecutor.DiscardOldestPolicy()); for (int i = 0; i <= 100; i++) { executor.execute(() -> { try { // 模拟耗时 Thread.sleep(100); System.out.println(MessageFormat.format("{0}:执行{1}个任务完成", Thread.currentThread().getName(), atomic.getAndIncrement())); } catch (InterruptedException e) { e.printStackTrace(); } }); } } }
从结果也可以看出,有些任务并没有执行而是直接被舍弃了,如下:
pool-1-thread-18:执行3个任务完成 pool-1-thread-19:执行4个任务完成 pool-1-thread-21:执行1个任务完成 pool-1-thread-1:执行15个任务完成 pool-1-thread-17:执行6个任务完成 pool-1-thread-16:执行2个任务完成 pool-1-thread-24:执行25个任务完成 pool-1-thread-22:执行23个任务完成 pool-1-thread-13:执行8个任务完成 pool-1-thread-14:执行7个任务完成 pool-1-thread-7:执行18个任务完成 pool-1-thread-25:执行19个任务完成 pool-1-thread-23:执行9个任务完成 pool-1-thread-5:执行22个任务完成 pool-1-thread-8:执行12个任务完成 pool-1-thread-20:执行5个任务完成 pool-1-thread-6:执行24个任务完成 pool-1-thread-12:执行21个任务完成 pool-1-thread-3:执行20个任务完成 pool-1-thread-9:执行10个任务完成 pool-1-thread-10:执行13个任务完成 pool-1-thread-11:执行17个任务完成 pool-1-thread-2:执行14个任务完成 pool-1-thread-4:执行16个任务完成 pool-1-thread-15:执行11个任务完成 pool-1-thread-17:执行31个任务完成 pool-1-thread-16:执行26个任务完成 pool-1-thread-18:执行30个任务完成 pool-1-thread-1:执行28个任务完成 pool-1-thread-21:执行29个任务完成 pool-1-thread-19:执行27个任务完成
6.4 DiscardPolicy
-
说明
使用该策略,如果线程池想要拒绝任务,则会将最新的任务丢弃,原理如下:
image-20211109152300372当然从代码也看出,丢弃了最新任务:
image-20211109152436923 -
使用
package com.tomato.thread.pool; import java.text.MessageFormat; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; public class DiscardPolicy { private final static AtomicInteger atomic = new AtomicInteger(1); public static void main(String[] args) { final Integer ACPU = Runtime.getRuntime().availableProcessors(); ThreadPoolExecutor executor = new ThreadPoolExecutor( 2 * ACPU + 1, 2 * ACPU + 1, 0, TimeUnit.SECONDS, new LinkedBlockingDeque<>(6), new ThreadPoolExecutor.DiscardPolicy()); for (int i = 0; i <= 100; i++) { executor.execute(() -> { try { // 模拟耗时 Thread.sleep(100); System.out.println(MessageFormat.format("{0}:执行{1}个任务完成", Thread.currentThread().getName(), atomic.getAndIncrement())); } catch (InterruptedException e) { e.printStackTrace(); } }); } } }
从结果看出,有些任务并没有执行,而是直接被舍弃了,如下:
pool-1-thread-21:执行25个任务完成 pool-1-thread-11:执行21个任务完成 pool-1-thread-10:执行20个任务完成 pool-1-thread-7:执行4个任务完成 pool-1-thread-16:执行8个任务完成 pool-1-thread-4:执行13个任务完成 pool-1-thread-18:执行22个任务完成 pool-1-thread-25:执行7个任务完成 pool-1-thread-12:执行14个任务完成 pool-1-thread-6:执行17个任务完成 pool-1-thread-2:执行24个任务完成 pool-1-thread-19:执行18个任务完成 pool-1-thread-13:执行11个任务完成 pool-1-thread-5:执行16个任务完成 pool-1-thread-1:执行3个任务完成 pool-1-thread-20:执行10个任务完成 pool-1-thread-8:执行5个任务完成 pool-1-thread-23:执行12个任务完成 pool-1-thread-24:执行23个任务完成 pool-1-thread-3:执行15个任务完成 pool-1-thread-17:执行9个任务完成 pool-1-thread-22:执行6个任务完成 pool-1-thread-15:执行1个任务完成 pool-1-thread-9:执行19个任务完成 pool-1-thread-14:执行2个任务完成 pool-1-thread-10:执行29个任务完成 pool-1-thread-7:执行30个任务完成 pool-1-thread-4:执行31个任务完成 pool-1-thread-21:执行26个任务完成 pool-1-thread-16:执行28个任务完成 pool-1-thread-11:执行27个任务完成
6.5 sumary
四种策略,具体采用哪种策略还是需要根据具体的业务场景来实现
网友评论