Executor
框架最核心的类是ThreadPoolExecutor,它是线程池的实现类,详情请看ThreadPoolExecutor 源码解析。Executors
还为我们提供了4种常见的线程池,他们都是基于ThreadPoolExecutor而来,还有一个是WorkStealingPool
是基于ForkJoinPool的。
FixedThreadPool
使用方式
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(10);
创建一个工作线程数为10的线程池。
实现
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
我们可以看到:
- 它其实就是创建了一个核心线程数(corePoolSize)和最大线程数(maximumPoolSize)都是
nThreads
的ThreadPoolExecutor
-
keepAliveTime
为0表示线程一旦空闲就会被回收 - 它使用了
LinkedBlockingQueue
队列来存储任务,但是该队列的长度默认是Integer.MAX_VALUE
长度,当任务过多时有可能会发生内存溢出,所以呢不建议使用。
运行示意图
运行示意图.jpgSingleThreadExecutor
使用方式
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
创建一个工作线程数为1的线程池。
实现
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
我们可以看到:
- 它其实就是创建了一个核心线程数(corePoolSize)和最大线程数(maximumPoolSize)都是
1
的ThreadPoolExecutor
-
keepAliveTime
为0表示线程一旦空闲就会被回收 - 它也使用了
LinkedBlockingQueue
队列来存储任务,但是该队列的长度默认是Integer.MAX_VALUE
长度,当任务过多时有可能会发生内存溢出,所以呢不建议使用。
SingleThreadExecutor会将提交的任务按照提交顺序串行执行
运行示意图
SingleThreadExecutor的运行示意图.jpgCachedThreadPool
使用方式
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
创建一个最大工作线程数是Integer.MAX_VALUE
的线程池。
实现
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
我们可以看到:
- 创建了一个核心线程数(corePoolSize)是0,最大线程数(maximumPoolSize)是
Integer.MAX_VALUE
的ThreadPoolExecutor
,该线程池的线程数不可控,极端情况下, 可能会因为创建过多线程而耗尽CPU和内存资源,不建议使用 - 空闲线程的存活时间是1分钟
- 它也使用了
SynchronousQueue
队列来存储任务,该队列默认不存储元素,每一个put操作必须等待一个take操作,否则不能添加元素。当一直有任务提交的时候,改线程池会不断的新增工作线程来执行任务
运行示意图
CachedThreadPool的运行示意图.jpg前面提到过,SynchronousQueue是一个没有容量的阻塞队列。每个插入操作必须等待另一个线程的对应移除操作,反之亦然。CachedThreadPool使用SynchronousQueue,把主线程提交的任务传递给空闲线程执行。CachedThreadPool中任务传递的示意图如图:
CachedThreadPool的任务传递示意图.jpg
ScheduledThreadPoolExecutor
ScheduledThreadPoolExecutor继承自ThreadPoolExecutor。它主要用来在给定的延迟之后运行任务,或者定期执行任务。ScheduledThreadPoolExecutor的功能与Timer类似,但 ScheduledThreadPoolExecutor功能更强大、更灵活。
使用方式
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(10);
scheduledThreadPool.scheduleAtFixedRate(() -> {
System.out.println(Thread.currentThread().getName() + " " + LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME));
//0表示首次执行任务的延迟时间,1表示每次执行任务的间隔时间,TimeUnit.SECONDS执行的时间间隔数值单位
}, 0, 1, TimeUnit.SECONDS);
创建一个核心线程时10,最大工作线程数是Integer.MAX_VALUE
的线程池。
运行结果:
Connected to the target VM, address: '127.0.0.1:50560', transport: 'socket'
pool-4-thread-1 2019-08-06T15:57:08.942
pool-4-thread-1 2019-08-06T15:57:09.802
pool-4-thread-2 2019-08-06T15:57:10.803
pool-4-thread-1 2019-08-06T15:57:11.803
pool-4-thread-3 2019-08-06T15:57:12.803
pool-4-thread-2 2019-08-06T15:57:13.803
实现
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
我们可以看到:
- 创建了一个核心线程数(corePoolSize)是10,最大线程数(maximumPoolSize)是
Integer.MAX_VALUE
的ThreadPoolExecutor
,该线程池的线程数不可控,极端情况下, 可能会因为创建过多线程而耗尽CPU和内存资源,不建议使用 - 空闲线程的存活时间是0,一但空闲就会被回收掉
- 它也使用了
DelayedWorkQueue
队列来存储任务,它是一个只能存储RunnableScheduledFuture
任务的延迟队列,一般我们使用他的子类ScheduledFutureTask
。
ScheduledFutureTask类图
ScheduledFutureTask.pngScheduledFutureTask核心属性
/** 表示这个任务被添加到ScheduledThreadPoolExecutor中的序号 */
private final long sequenceNumber;
/** 表示这个任务将要被执行的具体时间 */
private long time;
/** 表示任务执行的间隔周期 */
private final long period;
运行示意图
ScheduledThreadPoolExecutor的任务传递示意图.jpgWorkStealingPool
使用方式
ExecutorService workStealingPool = Executors.newWorkStealingPool();
创建一个ForkJoinPool线程池,线程池默认大小是CPU核心数。
实现
public static ExecutorService newWorkStealingPool() {
return new ForkJoinPool
(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
我们可以看到:
- 它其实就是创建了一个核心线程数是CPU核心数的
ForkJoinPool
总结
从上面我们可以看到,大部分的线程池是不建议直接使用的,我们在使用线程池的时候尽量使用ThreadPoolExecutor
来创建,并且根据业务合理的设置corePoolSize
、maximumPoolSize
、keepAliveTime
、BlockingQueue
、RejectedExecutionHandler
的值,否则在线上环境很容易耗尽CPU和内存资源,导致服务不可用。
参考
《java并发编程的艺术》
源码
https://github.com/wyh-spring-ecosystem-student/spring-boot-student/tree/releases
spring-boot-student-concurrent 工程
layering-cache
为监控而生的多级缓存框架 layering-cache这是我开源的一个多级缓存框架的实现,如果有兴趣可以看一下
网友评论