JDK多任务执行框架

作者: 城市里永远的学习者 | 来源:发表于2018-09-27 17:08 被阅读0次

    1、为什么要使用线程池?
    2、线程池有什么作用?
    3、说说几种常见的线程池及使用场景。
    4、线程池都有哪几种工作队列?
    5、怎么理解无界队列和有界队列?
    6、线程池中的几种重要的参数及流程说明。

    1.无限制线程的缺陷
    一、线程池
    1.定义:进行线程的复用,减少线程创建和销毁的开销,提高多核CPU的优势
    2.原理:任务进来先监测是否有池中是否有空闲线程,若有直接用;若没有则创建线程。当使用后不是直接销毁,而是放回池的空闲队列。
    二、Executor框架
    1.在concurrent并发包中核心类,其中ThreadPool-Executor表示一个线程池,Executors类则是线程池工厂的角色,通过它可以获得一种特定功能的线程池。


    image.png

    使用Executor框架调度MyThread,其代码如下:

    ExecutorService executorService= Executors.newFixedThreadPool(10);
            int threadCount=0;
            for (int i=0;i<10;i++) {
                executorService.submit(new MyThread(i));
            }
    

    Executors工厂类提供了生成不同用途的线程池,具体:
    类型1:

     public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
            return new ThreadPoolExecutor(nThreads, nThreads,
                                          0L, TimeUnit.MILLISECONDS,
                                          new LinkedBlockingQueue<Runnable>(),
                                          threadFactory);
        }
    

    该方法返回一个固定数量的线程池,当任务提交后,若有空闲线程则处理,否则进入任务队列,直到有空闲线程时在处理任务队列中的任务
    类型2:

    public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
            return new FinalizableDelegatedExecutorService
                (new ThreadPoolExecutor(1, 1,
                                        0L, TimeUnit.MILLISECONDS,
                                        new LinkedBlockingQueue<Runnable>(),
                                        threadFactory));
        }
    

    该方法返回一个数量为1的线程池,多余任务进队列,直到线程空闲,才处理多余的任务
    类型3:

    public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
            return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                          60L, TimeUnit.SECONDS,
                                          new SynchronousQueue<Runnable>(),
                                          threadFactory);
        }
    

    该方法返回一个可复用的线程池,当提交新的任务后,若有空闲线程则直接使用线程,如无则直接创建,直到Interger最大值2147....,线程使用完后放回线程复用
    类型4:

    public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) {
            return new DelegatedScheduledExecutorService
                (new ScheduledThreadPoolExecutor(1, threadFactory));
        }
    

    该方法返回一个线程池大小为1 的ScheduledExecutorService,在给定时间执行某个任务或周期性执行
    类型5:

    public static ScheduledExecutorService newScheduledThreadPool(
                int corePoolSize, ThreadFactory threadFactory) {
            return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
        }
    

    该方法返回一个可以指定线程池大小的ScheduledExecutorService对象

    2.自定义线程池
    Executors工厂创建的线程池内部均创建了new ThreadPoolExecutor

     public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  ThreadFactory threadFactory,
                                  RejectedExecutionHandler handler) {
    

    参数解析:
    corePoolSize:指定了线程池的线程数量(常驻线程大小)
    maximumPoolSize:指定了最大线程池大小
    keepAliveTime:当线程池线程数量超过了corePoolSize 时,多余的空闲线程 存活时间,即超过corePoolSize的空闲线程,多久会被销毁
    unit:时间单位
    workQueue:任务队列,提交但未执行的任务
    threadFactory:线程工厂,用于创建线程,一般采用默认
    handler:拒绝策略
    其中重点是workQueue和threadfactory,workQueue是接口BlockingQueue的对象,用于存放Runnable对象。根据队列功能分类,在ThreadPoolExecutor的构造函数中可以使用2.1 以下几种BlockingQueue:

    image.png
    a. SynchronousQueue:同步阻塞队列,没有容量,每个插入操作都要等待一个删除操作,同时一个删除操作也要等待一个插入操作。不保存任务,总是将任务提交给线程执行,没有空闲线程,则创建新的线程,直到线程池达到最大值,执行拒绝策略,通常要设置很多的maximumPoolSize

    b. ArrayBlockingQueue:有界的任务队列,构造函数需要指定一个线程池容量参数,表示该池最大线程数。当使用有界队列时,若有新任务提交时,实际线程小于corePoolSize,则创建新的线程;当大于corePoolSize时,加入任务队列等待执行;若任务等待队列已满,在不大于maximumPoolSize的前提下,创建新的线程执行任务;若大于maximumPoolSize了,则执行拒绝策略。可见,有界队列仅当满了之后,才能将线程提升到mamximumPoolSize,否则会一直维持在corePoolSize的水平。

    c. LinkedBlockingQueue:无界的任务队列,若有新任务进来,实际线程小于corePoolSize,则会创建新的线程处理任务;当线程达到corePoolSize又没有空闲线程,则会加入到无界的任务队列,若任务创建和处理的速度相差很多的时候,会导致队列无限的增长,直至系统内存耗尽。

    d. PriorityBlockingQueue:带优先级的任务队列,可以执行带优先级的任务队列,是一种特殊的无界队列,而ArrayBlockingQueue和LinkedBlockingQueue都是先进先出的队列,而这个属于按优先级顺序执行的队列。

    几种线程池使用队列的对比:
    a. newFixedThreadPool返回了一个corePoolSize和maximumPoolSize大小相等的、且使用了LinkedBlockingQueue的线程池,由于两个值相等,所以对于线程池数量是固定的,但是由于使用了无界任务队列,当任务频繁提交时,会使队列快速膨胀,导致内存溢出
    b. newSingleThreadPool返回的是单线程的线程池,是newFixedThreadPool的一种
    c. newCachedThreadPool返回的是corePoolSize为0 ,maximumPoolSize为Integer.MAX_VALUE(2147....) 。这就意味着无任务时无线程,当有任务提交时,若有空闲线程,则用空闲线程;无空闲线程,则将任务加入到SynchronousQueue,而其又是同步阻塞队列,当有多任务提交时就会迫使线程池创建新的线程执行任务,当任务执行完后,由于corePoolSize为0 ,就会回收。如果有大量的任务被提交,而任务的执行速率慢的话,系统就会开启等量的线程,从而耗尽系统资源。

    2.2 JDK内置的拒绝策略
    a. AbortPolicy:直接抛出异常,阻止系统正常工作
    b. CallerRunsPolicy:只要线程池未关闭,该策略直接在调用者线程中,运行当前被丢
    弃的任务
    c. DiscardOledestPolicy:该策略丢弃最老的一个请求,并尝试将当前任务加入到任务
    队列
    d. DiscardPolicy :直接丢弃无法处理的任务,不做任何处理
    可以使用自定义拒绝策略:

    public interface RejectedExecutionHandler {
       void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
    }
    

    其中r为当前执行的任务,executor为当前线程池,

    三、优化线程池大小
    公式:Nthreads=Ncpu* Ucpu*(1+w/c)
    其中:Ncpu是CPU的数量,可以通过Runtime.getRunTime().availableProcessor()
    Ucpu是目标cpu的使用率,0<Ucpu<1
    W/C是等待时间与计算时间的比率

    四、扩展ThreadPoolExecutor
    ThreadPoolExecutor是一个可以扩展的线程池,提供了beforeExecute、afterExecute、terminated方法对线程池进行控制,比如:在ThreadPoolExecutor.Worker.runTask方法中:


    image.png

    可以监控线程状态,对于排查系统故障有帮助

    相关文章

      网友评论

        本文标题:JDK多任务执行框架

        本文链接:https://www.haomeiwen.com/subject/kahjoftx.html