一、概述
-
ThreadPoolExecutor作为java.util.concurrent包对外提供基础实现,以内部线程池的形式对外提供管理任务执行,线程调度,线程池管理等等服务;
-
Executors方法提供的线程服务,都是通过参数设置来实现不同的线程池机制。
-
先来了解其线程池管理的机制,有助于正确使用,避免错误使用导致严重故障。同时可以根据自己的需求实现自己的线程池
Executor、ExecutorService 两个接口
- Executor只有一个方法 void execute(Runnable command)
- ExecutorService继承Executor,自己的方法多一些,例如 <T> Future<T> submit(Callable<T> task)
接下来要说的ThreadPoolExecutor(线程池)其实就是实现于它
二、核心构造方法讲解
下面是ThreadPoolExecutor最核心的构造方法
构造方法参数讲解 参数名作用
重点讲解:
其中比较容易让人误解的是:corePoolSize,maximumPoolSize,workQueue之间关系。
1.当线程池小于corePoolSize时,新提交任务将创建一个新线程执行任务,即使此时线程池中存在空闲线程。
PS:决定最多开启多少线程执行任务
2.当线程池达到corePoolSize时,新提交任务将被放入workQueue中,等待线程池中任务调度执行
PS:使用中的线程已达上限,存入列表等待调度
3.当workQueue已满,且maximumPoolSize>corePoolSize时,新提交任务会创建新线程执行任务
PS:这种情况下可以叫做临时线程 根据maximumPoolSize来决定 线程数量的上限
4.当提交任务数超过maximumPoolSize+workQueue之和,新提交任务由RejectedExecutionHandler处理
PS:这个没什么好说的
5.当线程池中任务超过corePoolSize,空闲时间达到keepAliveTime时,关闭空闲线程
PS:在 3 情况下创建的线程,临时线程 空闲达到指定时间则关闭
6.当设置allowCoreThreadTimeOut(true)时,线程池中corePoolSize线程空闲时间达到keepAliveTime也将关闭
PS:在 1和3 情况下创建的线程,空闲达到指定时间则关闭
概要四点
-
当有任务提交的时候,会创建核心线程去执行任务(即使有核心线程空闲仍会创建);
-
当核心线程数达到corePoolSize时,后续提交的都会进BlockingQueue中排队;
-
当BlockingQueue满了(offer失败),就会创建临时线程(临时线程空闲超过一定时间后,会被销毁);
-
当线程总数达到maximumPoolSize时,后续提交的任务都会被RejectedExecutionHandler拒绝。
三、Executors提供的线程池配置方案
1、构造一个固定线程数目的线程池,配置的corePoolSize与maximumPoolSize大小相同,同时使用了一个无界LinkedBlockingQueue存放阻塞任务,因此多余的任务将存在再阻塞队列,不会由RejectedExecutionHandler处理
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
2、构造一个缓冲功能的线程池,配置corePoolSize=0,maximumPoolSize=Integer.MAX_VALUE,keepAliveTime=60s,以及一个无容量的阻塞队列 SynchronousQueue,因此任务提交之后,将会创建新的线程执行;线程空闲超过60s将会销毁
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
3、构造一个只支持一个线程的线程池,配置corePoolSize=maximumPoolSize=1,无界阻塞队列LinkedBlockingQueue;保证任务由一个线程串行执行
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
4、构造有定时功能的线程池,配置corePoolSize,无界延迟阻塞队列DelayedWorkQueue;有意思的是:maximumPoolSize=Integer.MAX_VALUE,由于DelayedWorkQueue是无界队列,所以这个值是没有意义的
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public static ScheduledExecutorService newScheduledThreadPool(
int corePoolSize, ThreadFactory threadFactory) {
return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory) {
super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
new DelayedWorkQueue(), threadFactory);
}
四、向线程池提交任务:
可以使用两个方法向线程池提交任务,分别是execute()和submit()方法。
- execute
- submit(submit需要和Callable一起使用)详情参考Java并发之线程基础知识
线程池的(提交任务、关闭、监控)等详细内容参考Java并发之线程池
总结:
-
用ThreadPoolExecutor自定义线程池,看线程是的用途,如果任务量不大,可以用无界队列,如果任务量非常大,要用有界队列,防止OOM
-
如果任务量很大,还要求每个任务都处理成功,要对提交的任务进行阻塞提交,重写拒绝机制,改为阻塞提交。保证不抛弃一个任务
-
最大线程数一般设为2N+1最好,N是CPU核数
-
核心线程数,看应用,如果是任务,一天跑一次,设置为0,合适,因为跑完就停掉了,如果是常用线程池,看任务量,是保留一个核心还是几个核心线程数
-
如果要获取任务执行结果,用CompletionService,但是注意,获取任务的结果的要重新开一个线程获取,如果在主线程获取,就要等任务都提交后才获取,就会阻塞大量任务结果,队列过大OOM,所以最好异步开个线程获取结果
网友评论