美文网首页
[并发] 6 线程池之ThreadPoolExecutor

[并发] 6 线程池之ThreadPoolExecutor

作者: LZhan | 来源:发表于2019-11-06 17:26 被阅读0次
    1.Thread使用

    new Thread的弊端:

    • 每次new Thread新建对象,性能差;
    • 每次新建Thread实例,线程缺乏统一管理,可能无限制的新建线程,相互竞争,有可能占用过多的系统资源导致死机或者OOM
    • 缺少更多功能,如更多执行、定期执行、线程中断

    所以,在实践中使用多线程时,并不会使用Thread,而是使用线程池。

    2.线程池使用

    线程池的好处:

    • 重用存在的线程,减少对象的创建、消亡的开销,性能佳
    • 可以有效控制最大并发线程数,提高系统资源利用率,同时可以避免过多资源竞争,避免阻塞
    • 提供定时执行、定期执行、单线程、并发数控制等功能
    3.ThreadPoolExecutor类的注释说明
    关系图.png

    ExecutorService(ThreadPoolExecutor的顶层接口)使用线程池中的线程执行每个提交的任务,通常我们使用Executors的工厂方法来创建ExecutorService。
    线程池解决了两个不同的问题:
    <1> 提升性能:它们通常在执行大量异步任务时,由于减少了每个任务的调用开销,并且它们提供了一种限制和管理资源(包括线程)的方法,使得性能提升明显;
    <2> 统计信息:每个ThreadPoolExecutor保持一些基本的统计信息,例如完成的任务数量。

    为了在广泛的上下文中有用,此类提供了许多可调参数和可扩展性钩子。 但是,在常见场景中,我们预配置了几种线程池,我们敦促程序员使用更方便的Executors的工厂方法直接使用。

    • Executors.newCachedThreadPool(无界线程池,自动线程回收)
    • Executors.newFixedThreadPool(固定大小的线程池);
    • Executors.newSingleThreadExecutor(单一后台线程);
    4.ThreadPoolExecutor类参数

    重要初始化参数:
    <1> corePoolSize:核心线程数量
    <2> maximumPoolSize:线程池最大线程数
    <3> workQueue:阻塞队列,存储等待执行的任务,很重要,会对线程池运行过程产生重大影响
    <4> keepAliveTime:线程没有任务执行时最多保持多久时间终止
    <5> unit:keepAliveTime的时间单位
    <6> threadFactory:线程工厂,用来创建线程
    <7> rejectHandler:当拒绝处理任务时的策略

    • 当线程池小于corePoolSize时,新提交任务将创建一个新线程执行任务,即使此时线程池中存在空闲线程。
    • 当线程池达到corePoolSize时,新提交任务将被放入workQueue中,等待线程池中任务调度执行 。
    • 当workQueue已满,且maximumPoolSize>corePoolSize时,新提交任务会创建新线程执行任务。
    • 当提交任务数超过maximumPoolSize时,新提交任务由RejectedExecutionHandler处理。
    • 当线程池中超过corePoolSize线程,空闲时间达到keepAliveTime时,关闭空闲线程。
    • 当设置allowCoreThreadTimeOut(true)时,线程池中corePoolSize线程空闲时间达到keepAliveTime也将关闭。


      image.png

    4.1 workQueue
    workQueue是一个BlockingQueue,用于存放提交的任务,队列的实际容量与线程池大小相关联。
    主要有三种队列策略:
    <1> Direct handoffs 直接握手队列
    Direct handoffs 的一个很好的默认选择是 SynchronousQueue,它将任务交给线程而不需要保留。这里,如果没有线程立即可用来运行它,那么排队任务的尝试将失败,因此将构建新的线程。
    此策略在处理可能具有内部依赖关系的请求集时避免锁定。Direct handoffs 通常需要无限制的maximumPoolSizes来避免拒绝新提交的任务。 但得注意,当任务持续以平均提交速度大于平均处理速度时,会导致线程数量会无限增长问题。

    <2> Unbounded queues 无界队列
    当所有corePoolSize线程繁忙时,使用无界队列(例如,没有预定义容量的LinkedBlockingQueue)将导致新任务在队列中等待,从而导致maximumPoolSize的值没有任何作用。当每个任务互不影响,完全独立于其他任务时,这可能是合适的; 例如,在网页服务器中, 这种队列方式可以用于平滑瞬时大量请求。但得注意,当任务持续以平均提交速度大余平均处理速度时,会导致队列无限增长问题。

    <3> Bounded queues 有界队列
    一个有界的队列(例如,一个ArrayBlockingQueue)和有限的maximumPoolSizes配置有助于防止资源耗尽,但是难以控制。队列大小和maximumPoolSizes需要 相互权衡:

    • 使用大队列和较小的maximumPoolSizes可以最大限度地减少CPU使用率,操作系统资源和上下文切换开销,但会导致人为的低吞吐量。如果任务经常被阻塞(比如I/O限制),那么系统可以调度比我们允许的更多的线程。
    • 使用小队列通常需要较大的maximumPoolSizes,这会使CPU更繁忙,但可能会遇到不可接受的调度开销,这也会降低吞吐量。

    4.2 拒绝策略(RejectedExecutionHandler的实现类)
    拒绝任务有两种情况:1.线程池被关闭 2.任务队列已满且maximumPoolSize已满;
    无论哪种情况,都会调用RejectedExecutionHandler的rejectedExecution方法。预定义了四种处理策略:
    <1> AbortPolicy
    默认的策略,直接抛出RejectedExecutionException运行时异常
    <2> CallerRunsPolicy
    提供一个简单的反馈控制机制,可以减慢提交新任务的速度
    <3> DiscardPolicy
    直接丢弃新提交的任务
    <4> DiscardOldestPolicy
    如果执行器没有关闭,队列头的任务将会被丢弃,然后执行器重新尝试执行任务(如果失败,则重复这一过程)

    4.ThreadPoolExecutor的几种状态
    状态.png

    RUNNING:运行态,可以处理新任务并执行队列中的任务
    SHUTDOWN:关闭态,不接受新任务,但是处理队列中已存在的任务
    STOP:停止态,不接受新任务,不处理队列中任务,并且打断运行中任务
    TIDYING:整理态,所有任务已经结束,workerCount=0,将执行terminated()方法
    TERMINATED:结束态,terminated()方法已完成

    5.ThreadPoolExecutor的几种方法

    <1> execute():提交任务,交给线程池执行
    <2> submit():提交任务,能够返回执行结果,相当于execute+Future
    <3> shutdown():关闭线程池,等待任务都执行完(进入shutdown状态)
    <4> shutdownNow():关闭线程池,不等待任务执行完(进入stop状态)
    适用于监控
    <5> getTaskCount():线程池已执行和未执行的任务总数
    <6> getCompletedTaskCount():已完成的任务数量
    <7> getPoolSize():线程池当前的线程数量
    <8> getActiveCount():当前线程池中正在执行任务的线程数量

    线程池类图.png
    6.Executor框架接口
    • Executors.newCachedThreadPool
      创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
        public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
            return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                          60L, TimeUnit.SECONDS,
                                          new SynchronousQueue<Runnable>(),
                                          threadFactory);
        }
    
    
    • Executors.newFixedThreadPool
      创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
        public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
            return new ThreadPoolExecutor(nThreads, nThreads,
                                          0L, TimeUnit.MILLISECONDS,
                                          new LinkedBlockingQueue<Runnable>(),
                                          threadFactory);
        }
    
    
    • Executors.newScheduledThreadPool
      创建一个周期线程池,支持定时及周期性任务执行。
        public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
            return new DelegatedScheduledExecutorService
                (new ScheduledThreadPoolExecutor(1));
        }
        
        public ScheduledThreadPoolExecutor(int corePoolSize) {
            super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
                  new DelayedWorkQueue());
        }
    
        public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue) {
            this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
                 Executors.defaultThreadFactory(), defaultHandler);
        }
    
    • Executors.newSingleThreadExecutor
      创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。
        public static ExecutorService newSingleThreadExecutor() {
            return new FinalizableDelegatedExecutorService
                (new ThreadPoolExecutor(1, 1,
                                        0L, TimeUnit.MILLISECONDS,
                                        new LinkedBlockingQueue<Runnable>()));
        }
    

    相关文章

      网友评论

          本文标题:[并发] 6 线程池之ThreadPoolExecutor

          本文链接:https://www.haomeiwen.com/subject/vbprbctx.html