线程池
为什么要用线程池?
线程池提供了一种限制和管理资源(包括执行一个任务)。 每个线程池还维护一些基本统计信息,例如已完成任务的数量。
这里借用《Java并发编程的艺术》提到的来说一下使用线程池的好处:
- 降低资源消耗。 通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
- 提高响应速度。 当任务到达时,任务可以不需要的等到线程创建就能立即执行。
- 提高线程的可管理性。 线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。
创建的线程池的方式
(1) 使用 Executors 创建
我们上面刚刚提到了 Java 提供的几种线程池,通过 Executors 工具类我们可以很轻松的创建我们上面说的几种线程池。但是实际上我们一般都不是直接使用Java提供好的线程池,另外在《阿里巴巴Java开发手册》中强制线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 构造函数 的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。
Executors 返回线程池对象的弊端如下:
FixedThreadPool 和 SingleThreadExecutor : 允许请求的队列长度为 Integer.MAX_VALUE,可能堆积大量的请求,从而导致OOM。
CachedThreadPool 和 ScheduledThreadPool : 允许创建的线程数量为 Integer.MAX_VALUE ,可能会创建大量线程,从而导致OOM。
(2) ThreadPoolExecutor的构造函数创建
我们可以自己直接调用 ThreadPoolExecutor 的构造函数来自己创建线程池。在创建的同时,给 BlockQueue 指定容量就可以了。示例如下:
private static ExecutorService executor = new ThreadPoolExecutor(13, 13,
60L, TimeUnit.SECONDS,
new ArrayBlockingQueue(13));
这种情况下,一旦提交的线程数超过当前可用线程数时,就会抛出java.util.concurrent.RejectedExecutionException,这是因为当前线程池使用的队列是有边界队列,队列已经满了便无法继续处理新的请求。但是异常(Exception)总比发生错误(Error)要好。
线程池常用API以及ThreadPoolExecutor构造参数:
ThreadPool参数
ExecutorService executorService = new ThreadPoolExecutor(1, 2, 30, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1), Thread::new, new ThreadPoolExecutor.AbortPolicy());
//创建一个核心数为1,最大线程数为2,超时时间为30s,等待队列最长为1,拒绝策略为直接拒绝(抛出异常)的线程池
-
corePoolSize:
线程池核心线程数量,核心线程不会被回收,即使没有任务执行,也会保持空闲状态。如果线程池中的线程少于此数目,则在执行任务时创建。
-
maxmumPoolSize
池允许最大的线程数,当线程数量达到corePoolSize,且workQueue队列塞满任务了之后,继续创建线程。
-
KeepAliveTime
超过corePoolSize之后的“临时线程”的存活时间。
-
unit
keepAliveTime的单位。 (TimeUnit.SECONDS)
-
workQueue
当前线程数超过corePoolSize时,新的任务会处在等待状态,并存在workQueue中,BlockingQueue是一个先进先出的阻塞式队列实现,底层实现会涉及Java并发的AQS机制.
-
ThreadFactory
创建线程的工厂类,通常我们会自顶一个threadFactory设置线程的名称,这样我们就可以知道线程是由哪个工厂类创建的,可以快速定位。
-
handler
线程池执行拒绝策略,当线数量达到maximumPoolSize大小,并且workQueue也已经塞满了任务的情况下,线程池会调用handler拒绝策略来处理请求。
系统默认的拒绝策略有以下几种:
- AbortPolicy:为线程池默认的拒绝策略,该策略直接抛异常处理。
- DiscardPolicy:直接抛弃不处理。
- DiscardOldestPolicy:丢弃队列中最老的任务。
- CallerRunsPolicy:将任务分配给当前执行execute方法线程来处理。
我们还可以自定义拒绝策略,只需要实现RejectedExecutionHandler接口即可,友好的拒绝策略实现有如下:
- 将数据保存到数据,待系统空闲时再进行处理
- 将数据用日志进行记录,后由人工处理
各种线程池的适用场景介绍
- FixedThreadPool: 适用于为了满足资源管理需求,而需要限制当前线程数量的应用场景。它适用于负载比较重的服务器;
- SingleThreadExecutor: 适用于需要保证顺序地执行各个任务并且在任意时间点,不会有多个线程是活动的应用场景。
- CachedThreadPool: 适用于执行很多的短期异步任务的小程序,或者是负载较轻的服务器;
- ScheduledThreadPoolExecutor: 适用于需要多个后台执行周期任务,同时为了满足资源管理需求而需要限制后台线程的数量的应用场景,
- SingleThreadScheduledExecutor: 适用于需要单个后台线程执行周期任务,同时保证顺序地执行各个任务的应用场景。
优雅地关闭线程池
-
shutdown():
- 调用之后不允许继续往线程池内继续添加线程;
- 线程池的状态变为
SHUTDOWN
状态; - 所有在调用
shutdown()
方法之前提交到ExecutorSrvice
的任务都会执行; - 一旦所有线程结束执行当前任务,
ExecutorService
才会真正关闭。
-
shutdownNow():
- 该方法返回尚未执行的 task 的 List;
- 线程池的状态变为
STOP
状态; - 阻止所有正在等待启动的任务,并且停止当前正在执行的任务;
- 线程池不再接受新的任务,但是仍然会将任务队列中已有的任务执行完毕。
-
awaitTermination:
- 设置定时任务,代码内的意思为 2s 后检测线程池内的线程是否均执行完毕(就像老师告诉学生,“最后给你 2s 钟时间把作业写完”),若没有执行完毕,则调用
shutdownNow()
方法。
- 设置定时任务,代码内的意思为 2s 后检测线程池内的线程是否均执行完毕(就像老师告诉学生,“最后给你 2s 钟时间把作业写完”),若没有执行完毕,则调用
-
常用的关闭的线程池的方法:
-
shutdown方法
-
awaitTermination方法
-
shutdownNow方法(发生异常或者是Timeout的时候)
-
代码示例:
private static final Random random = new Random(System.currentTimeMillis()); public static void main(String[] args) throws InterruptedException { ExecutorService executorService = new ThreadPoolExecutor(10, 20, 30, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1), Thread::new, new ThreadPoolExecutor.AbortPolicy()); IntStream.range(0, 20).forEach(i -> executorService.execute(() -> { try { TimeUnit.SECONDS.sleep(random.nextInt(10)); System.out.println(Thread.currentThread().getName() + " [" + i + "] finish done"); } catch (InterruptedException e) { e.printStackTrace(); } })); executorService.shutdown(); //标记线程池中的线程,当所有的的线程都执行完毕后,线程池关闭 if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) { //如果超时还有线程没有结束则强制关闭正在工作的线程,并且关闭线程池 executorService.shutdownNow(); } System.out.println("===============has Shutdowned========== "); }
-
线程池拒绝策略--RejectedExecutionHandler
-
AbortPolicy :
- 当线程池满了的时候,直接拒接当前提交任务当任务添加到线程池中被拒绝时,它将抛出 RejectedExecutionException异常 。
-
**CallerRunsPolicy **:
- 当任务添加到线程池中被拒绝时,会在调用该execute方法中进行该任务的执行,如main线程。
-
DiscardOldestPolicy:
- 当任务添加到线程池中被拒绝时,线程池会放弃等待队列中最久的未处理任务,然后将被拒绝的任务添加到等待队列中。
-
DiscardPolicy (不建议使用):
-
当任务添加到线程池中被拒绝时,线程池将丢弃被拒绝的任务。
-
Demo:
public class RejectedExecutionExample { public static void main(String[] args) throws InterruptedException { // ExecutorService threadPool = RejectedExecutionHandlerTest(new ThreadPoolExecutor.AbortPolicy());//直接拒绝,抛出 RejectedExecutionException异常 // ExecutorService threadPool = RejectedExecutionHandlerTest(new ThreadPoolExecutor.CallerRunsPolicy());// 当任务添加到线程池中被拒绝时,会在线程池当前正在运行的Thread线程池中处理被拒绝的任务。 ExecutorService threadPool = RejectedExecutionHandlerTest(new ThreadPoolExecutor.DiscardOldestPolicy());// 当任务添加到线程池中被拒绝时,线程池会放弃等待队列中最旧的未处理任务,然后将被拒绝的任务添加到等待队列中。 // ExecutorService threadPool = RejectedExecutionHandlerTest(new ThreadPoolExecutor.DiscardPolicy());//直接拒绝 IntStream.range(0, 4).forEach(i -> threadPool.execute(() -> { System.out.println("Thread--> " + i + " is working"); try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("Thread--> " + i + " work done"); })); TimeUnit.SECONDS.sleep(2); threadPool.execute(() -> System.out.println("Power By: " + Thread.currentThread().getName())); threadPool.shutdown(); if (!threadPool.awaitTermination(10, TimeUnit.SECONDS)) { threadPool.shutdownNow(); } } public static ExecutorService RejectedExecutionHandlerTest(RejectedExecutionHandler handler) { return new ThreadPoolExecutor(1, 2, 30, TimeUnit.SECONDS, new LinkedBlockingDeque<>(2), Executors.defaultThreadFactory(), handler); } }
网友评论