接着前面的继续做笔记,现在到了线程池。线程池平时项目里,使用的非常多,但一直停留在一知半解的状态,也就是知道一个execute(Runnable)的水平,也基本不使用自定义线程池,这次好好的学一下。
先看看书上线程池的作用:
- 利用线程池管理并复用线程、控制最大并发数。
- 实现任务线程队列缓存策略和拒绝机制。
- 实现某些与时间相关的功能,如定时执行、周期执行等。
- 隔离线程环境。
为了更好的使用这些功能,需要在创建线程池时,选择合适的参数,先看下他的构造方法:
image.png
- corePoolSize,线程池保留的最小线程数。如果线程池中的线程少于此数目,则在执行execute()时创建。
- maximumPoolSize,线程池中允许拥有的最大线程数。
- keepAliveTime、unit,当线程闲置时,保持线程存活的时间。
- workQueue,工作队列,存放提交的等待任务,该类队列是生产者消费者模型队列。
- threadFactory,线程工厂,它用来产生一组相同任务的线程。
- handler,执行拒绝策略的对象,如果不实现,出现拒绝服务时,默认则是抛出异常。
再看下线程池是如何工作的,从源码的注释来说:
线程池基本策略
- 当接收到一个任务时,如果线程池中运行的线程数小于corePoolSize核心线程,则新建一个线程。
- 如果所有运行的核心线程都都在忙,超出核心线程处理的任务,执行器更多地选择把任务放进队列,而不是新建一个线程。
- 如果一个任务提交不了到队列,在不超出最大线程数量情况下,会新建线程,否则就出现拒绝服务。
显然,队列workQueue的选择就显得很有必要,再来看下注释里的3类队列使用:
- Direct handoffs:直接提交,代表类型SynchronousQueue。特点是不保持,任务直接提交给线程,如果没有空闲线程,则自行开启线程。
- Unbounded queues:无限提交,代表类型LinkedBlockingQueue。由于队列本身没有上限,所以当核心线程都在工作时,会直接把任务保存在该队列中,所以同时执行的线程数最多只会是corePoolSize,自然maximumPoolSize参数也没有了意义。
- Bounded queues:有限提交,代表类型ArrayBlockingQueue。和无限提交类似,不过就是队列有了上限,超出队列的任务,会尝试去开启线程,如果运行线程总数超过maximumPoolSize,则会出现拒绝服务。
知道了这些策略,我们来看下JDK提供的几个常用的线程池,看一下使用范例:
- newFixedThreadPool,采用Bounded queues,可以长期保持固定的线程数工作,会积压任务,适合长期稳定的任务,但由于没有拒绝策略,可能会造成OOM
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
- newCachedThreadPool,采用Direct handoffs,没有核心线程,会复用已有线程,适合短时间的密集任务,同样有OOM风险
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
- ScheduledThreadPoolExecutor,支持定时以及周期性任务执行,比Timer更安全,功能更强大,同样有OOM风险
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE,
DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
new DelayedWorkQueue());
}
前面的以前多少有接触一些, 书上更让我感兴趣的是后面的一部分:
Executors中默认的线程工厂和拒绝策略过于简单,通常对用户不友好,线程工厂需要对创建的线程做好表示,便于后续问题分析;而拒绝策略应考虑到实际业务,返回相应的提示或者友好地跳转。
看下我根据书上说明写的:
public ExecutorService service = null;
public TestExecutorsPool() {
service = new ThreadPoolExecutor(1,
1,
60L,
TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(1),
new TestThreadFactory("TEST"),
new TestRejectHandler());
}
public static class TestThreadFactory implements ThreadFactory{
private final String namePrefix;
private final AtomicInteger nextId = new AtomicInteger(1);
public TestThreadFactory(String whatFeatureOfGroup) {
this.namePrefix = "TestThreadFactory: " + whatFeatureOfGroup + " - ";
}
@Override
public Thread newThread(Runnable r) {
String name = namePrefix + nextId.getAndIncrement();
return new Thread(r,name);
}
}
public static class TestRejectHandler implements RejectedExecutionHandler{
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
// 根据业务需求,采取不同的拒绝策略,通常就是打打LOG
}
}
再简单说下JDK提供的几个拒绝策略:
- AbortPolicy(默认):丢弃任务并抛出异常
- DiscardPolicy:仅丢弃任务,啥都不做
- DiscardOldestPolicy:抛弃队列里等待最久的任务,然后把当前任务入队
- CallerRunsPolicy:调用任务的run()方法绕过线程池直接执行,这个会在线程池被使用的线程里执行,如果是UI线程,那就要小心了。
网友评论