线程池

作者: 卡路fly | 来源:发表于2020-05-23 20:31 被阅读0次

    功能

    1. 当执行大量异步任务时线程池能够提供很好的性能。
    2. 线程池提供了一种资源限制和管理的手段,比如可以限制线程的个数,动态新增线程等。
    线程池体系
    • Executor 是线程池最顶层的接口,在 Executor 中只有一个 execute 方法,用于执行任务。至于线程的创建、调度等细节由子类实现。
    • ExecutorService 继承并拓展了 Executor,在 ExecutorService 内部提供了更全面的任务提交机制以及线程池关闭方法。
    • ThreadPoolExecutor 是 ExecutorService 的默认实现,所谓的线程池机制也大多封装在此类当中。
    • ScheduledExecutorService 继承自 ExecutorService,增加了定时任务相关方法。
    • ScheduledThreadPoolExecutor 继承自 ThreadPoolExecutor,并实现了 ScheduledExecutorService 接口。
    • ForkJoinPool 是一种支持任务分解的线程池,一般要配合可分解任务接口 ForkJoinTask 来使用。

    ForkJoinPool介绍:https://www.jianshu.com/p/85a928076a3c

    线程池分类

    newSingleThreadExecutor

    单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按先进先出的顺序执行。

    public static ExecutorService newSingleThreadExecutor() {
            return new FinalizableDelegatedExecutorService
                (new ThreadPoolExecutor(1, 1,
                                        0L, TimeUnit.MILLISECONDS,
                                        new LinkedBlockingQueue<Runnable>()));
        }
    

    newCachedThreadPool

    可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。

    public static ExecutorService newCachedThreadPool() {
            return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                          60L, TimeUnit.SECONDS,
                                          new SynchronousQueue<Runnable>());
        }
    

    newFixedThreadPool

    固定数目的、可重用的线程池。

    public static ExecutorService newFixedThreadPool(int nThreads) {
            return new ThreadPoolExecutor(nThreads, nThreads,
                                          0L, TimeUnit.MILLISECONDS,
                                          new LinkedBlockingQueue<Runnable>());
        }
    

    newScheduledThreadPool

    定时线程池,支持定时及周期性任务执行。

    public ScheduledThreadPoolExecutor(int corePoolSize) {
            super(corePoolSize, Integer.MAX_VALUE,
                  DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
                  new DelayedWorkQueue());
        }
    

    构造参数

    corePoolSize

    表示核心线程数量。

    maximumPoolSize

    表示线程池最大能够容纳同时执行的线程数,必须大于或等于 1。如果和 corePoolSize 相等即是固定大小线程池。

    keepAliveTime

    表示线程池中的线程空闲时间,当空闲时间达到此值时,线程会被销毁直到剩下 corePoolSize 个线程。

    unit:

    用来指定 keepAliveTime 的时间单位,有 MILLISECONDS、SECONDS、MINUTES、HOURS 等。

    workQueue

    等待队列,BlockingQueue 类型。当请求任务数大于 corePoolSize 时,任务将被缓存在此 BlockingQueue 中。

    线程池三种队列区别

    1. SynchronousQueue
    没有容量,是无缓冲等待队列,是一个不存储元素的阻塞队列,会直接将任务交给消费者,必须等队列中的添加元素被消费后才能继续添加新的元素。

    拥有公平(FIFO)和非公平(LIFO)策略,非公平策略会导致一些数据永远无法被消费的情况

    使用SynchronousQueue阻塞队列一般要求maximumPoolSizes为无界(Integer.MAX_VALUE),避免线程拒绝执行操作。

    使用SynchronousQueue的目的就是保证“对于提交的任务,如果有空闲线程,则使用空闲线程来处理;否则新建一个线程来处理任务”。

    2.LinkedBlockingQueue
    LinkedBlockingQueue是一个无界缓存等待队列。当前执行的线程数量达到corePoolSize的数量时,剩余的元素会在阻塞队列里等待。(所以在使用此阻塞队列时maximumPoolSizes就相当于无效了),每个线程完全独立于其他线程。生产者和消费者使用独立的锁来控制数据的同步,即在高并发的情况下可以并行操作队列中的数据。

    注:这个队列需要注意的是,虽然通常称其为一个无界队列,但是可以人为指定队列大小,而且由于其用于记录队列大小的参数是int类型字段,所以通常意义上的无界其实就是队列长度为 Integer.MAX_VALUE,且在不指定队列大小的情况下也会默认队列大小为 Integer.MAX_VALUE

    3. ArrayBlockingQueue
    ArrayBlockingQueue是一个有界缓存等待队列,可以指定缓存队列的大小,当正在执行的线程数等于corePoolSize时,多余的元素缓存在ArrayBlockingQueue队列中等待有空闲的线程时继续执行,当ArrayBlockingQueue已满时,加入ArrayBlockingQueue失败,会开启新的线程去执行,当线程数已经达到最大的maximumPoolSizes时,再有新的元素尝试加入ArrayBlockingQueue时会报错。

    • threadFactory:线程工厂,线程池中使用它来创建线程,如果传入的是 null,则使用默认工厂类 DefaultThreadFactory。

    DefaultThreadFactory实现了ThreadFactory接口,newThread中生产了一个个线程并且设置为不是守护线程,线程优先级均为Thread.NORM_PRIORITY。
    在我们使用线程池的时候如果不自己创建线程工厂类,那么系统会给我们创建一个默认的线程工厂来生产线程(Executors中defaultThreadFactory())

    • handler:执行拒绝策略的对象。当 workQueue 满了之后并且活动线程数大于 maximumPoolSize 的时候,线程池通过该策略处理请求。(RejectedExecutionException)

    PS: 当 ThreadPoolExecutor 的 allowCoreThreadTimeOut 设置为 true 时,核心线程超时后也会被销毁。

    流程

    当我们调用 execute 或者 submit,将一个任务提交给线程池,线程池收到这个任务请求后,有以下几种处理情况:

    a 当前线程池中运行的线程数量还没有达到 corePoolSize 大小时,线程池会创建一个新线程执行提交的任务,无论之前创建的线程是否处于空闲状态。

    b 当前线程池中运行的线程数量已经达到 corePoolSize 大小时,线程池会把任务加入到等待队列中,直到某一个线程空闲了,线程池会根据我们设置的等待队列规则,从队列中取出一个新的任务执行。

    c 如果线程数大于 corePoolSize 数量但是还没有达到最大线程数 maximumPoolSize,并且等待队列已满,则线程池会创建新的线程来执行任务。

    d 最后如果提交的任务,无法被核心线程直接执行,又无法加入等待队列,又无法创建“非核心线程”直接执行,线程池将根据拒绝处理器定义的策略处理这个任务。比如在 ThreadPoolExecutor 中,如果你没有为线程池设置 RejectedExecutionHandler。这时线程池会抛出 RejectedExecutionException 异常,即线程池拒绝接受这个任务。

    程序会报异常 RejectedExecutionException,拒绝策略是线程池的一种保护机制,目的就是当这种无节制的线程资源申请发生时,拒绝新的任务保护线程池。默认拒绝策略会直接报异常,但是 JDK 中一共提供了 4 种保护策略,如下:


    实际上拒绝策略都是实现自接口 RejectedExecutionException,开发者也可以通过实现此接口,定制自己的拒绝策略。

    阿里拒绝禁止使用 Executors原因

    阿里 Java 开发手册中严禁使用 Executors 工具类来创建线程池。尤其是 newFixedThreadPool 和 newCachedThreadPool 这两个方法。

    • newFixedThreadPool
      理论上可以无限添加任务到线程池。当核心线程执行时间很长(比如 sleep10s),则新提交的任务还在不断地插入到阻塞队列中,最终造成
      OOM。

    • newCachedThreadPool
      缓存线程池的最大线程数为 Integer 最大值。当核心线程耗时很久,线程池会尝试创建新的线程来执行提交的任务,当内存不足时就会报无法创建线程的错误。


    整理自:
    线程池之刨根问底

    动图来源


    allowCoreThreadTimeOut(true) 能使核心线程销毁

    相关文章

      网友评论

          本文标题:线程池

          本文链接:https://www.haomeiwen.com/subject/vvulahtx.html