美文网首页Java开发那些事
Java线程池工作原理浅析

Java线程池工作原理浅析

作者: wanderingGuy | 来源:发表于2019-05-09 23:33 被阅读80次

    随着项目业务的快速扩张,你是否已经注意到很多单独的线程游离在各个模块中,一旦想做线程方面的监控与优化,代码将需要大动干戈。

    相信你一定用过rxjava、okHttp这些流行的框架,它们内部都涉及线程的调度,且封装好一系列的API供你使用,你甚至完全不必关心这些线程是如何工作的。如果单独使用它们都没问题,可是如果你从项目架构的角度考虑是否应该重新考量如何使用它们。

    为什么要用线程池?

    1. 线程属于稀缺资源,它的创建会消耗大量系统资源。
    2. 线程频繁地销毁,会频繁地触发GC机制,使系统性能降低。
    3. 多线程并发执行缺乏统一的管理与监控。

    线程池的使用

    线程池的创建使用可通过java并发包中的Executors类完成,它提供了创建线程池的常用方法。

    • newFixedThreadPool
    • newSingleThreadExecutor
    • newCachedThreadPool
    • newScheduledThreadPool

    后面会陆续介绍它们,我们先来看一个例子。

    public void main() {
        ExecutorService executorService = Executors.newFixedThreadPool(3);
        for(int i = 0; i < 20; i++) {
            executorService.execute(new MyRunnable(i));
        }
    }
    
    static class MyRunnable implements Runnable {
        int id;
         MyRunnable(int id) {
            this.id = id;
        }
    
        @Override
        public void run() {
            try {
                Thread.sleep(3000);
                Log.i("threadpool", "task id:"+id+" is running threadInfo:"+Thread.currentThread().toString());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    

    示例中创建了一个固定线程数的线程池,并向其中添加20个任务。

    fix.gif

    通过log打印可以看到,日志一次打印三条,每3秒打印一次,所有任务都在名为pool-1-thread-1,pool-1-thread-2,pool-1-thread-3的线程中运行,这与我们为线程池设置的大小相吻合。导致这种现象的原因是线程池中只有三个线程,当一次性将20个任务加入到线程池中时,前三个任务优先执行,后面的任务都在等待。

    而如果我们把ExecutorService executorService = Executors.newFixedThreadPool(3);换为ExecutorService executorService = Executors.newCachedThreadPool();再来看一下效果。

    cache.gif

    一瞬间任务都执行完了,可以预见使用newCachedThreadPool方式创建的线程池,执行任务时会创建足够多的线程。

    接下来,我们正式来看线程池的内部的工作原理,我们分为三个部分来讲解。

    线程池.png

    常见的线程池种类上面已经提及,我们来看一下它们是如何创建出来的,我们举两个栗子。

    # -> Executor.newFixedThreadPool
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
    
    # -> Executor.newSingleThreadExecutor
    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
    

    可见线程池的创建都是通过ThreadPoolExecutor完成的,来看一下它的构造方法。

    # -> ThreadPoolExecutor构造方法
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }
    
    public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  ThreadFactory threadFactory,
                                  RejectedExecutionHandler handler) {
            ...
            this.corePoolSize = corePoolSize;
            this.maximumPoolSize = maximumPoolSize;
            this.workQueue = workQueue;
            this.keepAliveTime = unit.toNanos(keepAliveTime);
            this.threadFactory = threadFactory;
            this.handler = handler;
        }
    

    构造方法声明的一系列参数非常重要,理解了它们线程池的基本原理你就掌握了,我们来看看他们的具体含义:

    • corePoolSize 核心线程数,除非设置核心线程超时(allowCoreThreadTimeOut),线程一直存活在线程池中,即使线程处于空闲状态。
    • maximumPoolSize 线程池中允许存在的最大线程数。
    • workQueue 工作队列,当核心线程都处于繁忙状态时,将任务提交到工作队列中。如果工作队列也超过了容量,会去尝试创建一个非核心线程执行任务。
    • keepAliveTime 非核心线程处理空闲状态的最长时间,超过该值线程则会被回收。
    • threadFactory 线程工厂类,用于创建线程。
    • RejectedExecutionHandler 工作队列饱和策略,比如丢弃、抛出异常等。

    线程池创建完成后,可通过execute方法提交任务,线程池根据当前运行状态和特定参数对任务进处理,整体模型如下图:

    线程池模型图.png

    下图清晰的展示了线程池提交任务后的流程,不再赘述。


    线程池工作流程图.jpg

    接下来我们看看Executors工具类中常见的几种线程池使用的构造参数是怎样的。

    线程池类型 核心线程数 最大线程数 非核心线程空闲时间 工作队列
    newFixedThreadPool specific specific           0           LinkedBlockingQueue
    newSingleThreadExecutor 1 1 0 LinkedBlockingQueue
    newCachedThreadPool 0 Integer.MAX_VALUE 60s SynchronousQueue
    newScheduledThreadPool specific Integer.MAX_VALUE 0 DelayedWorkQueue

    其中specific指的是需要使用者传入固定值。

    这里需要先对阻塞队列进行额外的分析。

    阻塞队列

    你有没有想过为什么要用阻塞队列,非阻塞的不行吗?

    实际上阻塞队列常用于生产者-消费者模型,任务的添加是生产者,任务的调度执行是消费者,他们通常在不同的线程中,如果使用非阻塞队列,那势必需要额外的处理同步策略和线程间唤醒策略。比如当任务队列为空时,消费者线程取元素时会被阻塞,当有新的任务添加到队列中时需唤醒消费者线程处理任务。

    阻塞队列的实现就是在添加元素和获取元素时设置了各种锁操作(Lock+Condition)。

    另一个需要关注的是阻塞队列的容量问题,因为根据线程池处理流程图,阻塞队列容量的大小直接影响非核心线程的创建。具体来说,当阻塞队列未满时并不会创建非核心线程,而是将任务继续添加到阻塞队列后面等待核心线程(如果有)执行。

    • LinkedBlockingQueue 内部用链表实现的阻塞队列,默认的构造函数使用Integer.MAX_VALUE作为容量,即常说的“无界”,另可以通过带capacity参数的构造函数限制容量。使用Executors工具类创建的线程池容量均为无界的。
    • SynchronousQueue 容量为0,每当有任务添加进来时会立即触发消费,即每次插入操作一定伴随一个移除操作,反之亦然。
    • DelayedWorkQueue 用数组实现,默认容量为16,支持动态扩容,可对延迟任务进行排序,类似优先级队列,搭配ScheduledThreadPoolExecutor可完成定时或延迟任务。
    • ArrayBlockingQueue 它不在上述线程池的体系当中,它基于数组实现,容量固定且不可扩容。

    应根据实际需求选择合适的阻塞队列,现在我们再来看这些线程池的使用场景。

    • newFixedThreadPool 它的特点是没有非核心线程,这意味着即使任务过多也不会创建新的线程,即使任务闲置也仍保留一定数量的核心线程。等待队列无限,性能相对稳定,适用于长期有任务要执行,同时任务量也不大的场景。
    • newSingleThreadExecutor 相当于线程数量为1的newFixedThreadPool,因为线程数量为1,所以用于任务需顺序执行的场景。
    • newCachedThreadPool 它的特点是没有核心线程,非核心线程无限,可短时间内处理无限多的任务,但实际上创建线程十分消耗资源,过多的创建线程极可能导致OOM,同时设置了线程超时时间,还涉及到线程资源的释放,大量任务并行时性能不稳定,少量任务并行且后续不再需要执行其他任务的场景可使用。
    • newScheduledThreadPool 通常用于定时或延迟任务。

    在实际开发过程中不建议直接使用Executors提供的方法,如果任务规模、响应时间大致确定,应根据实际需求通过ThreadPoolExecutor各种构造函数手动创建,还自由可控制线程数、超时时间、阻塞队列、饱和策略(默认的饱和策略都是AbortPolicy即抛出异常)。

    饱和策略

    内置的饱和策略有如下四种

    • DiscardPolicy 将丢弃被拒绝的任务。
    • DiscardOldestPolicy 将丢弃队列头部的任务,即先入队的任务会出队以腾出空间。
    • AbortPolicy 抛出RejectedExecutionException异常。
    • CallerRunsPolicy 在execute方法的调用线程中运行被拒绝的任务。

    用户也可通过实现RejectedExecutionHandler接口自定义饱和策略,并通过ThreadPoolExecutor多参的构造函数传入。

    接下来我们有必要了解一下线程池的继承结构。

    线程池类图

    线程池类图.png
    • Executor 基类接口,仅定义一个execute方法,这也就是我们提交任务时使用的方法,使用execute添加的任务不具备返回值。
    • ExecutorService 仍然是一个接口,开始产生池化概念,定义了带返回结果的提交任务方法submit,以及关闭线程池的shutDown方法。
    • AbstractExecutorService 实现了大部分接口方法,剩余shutDown相关和execute抽象方法未实现。
    • ThreadPoolExecutor 最常用的线程池
    • ScheduledThreadPoolExecutor 定义了一系列支持延迟任务的线程池。
    • ForkJoinPool 与ThreadPoolExecutor解决的问题不同,它采用分治思想,将一个任务细分为多个子任务在多线程中执行。比如要计算1到100万的整数和,按ThreadPoolExecutor的方案是将这一个任务拆分为多个提交至线程池,而按ForkJoinPool的方案是提交一个任务到线程池,任务的拆分细化工作交由自定义任务执行。感兴趣的小伙伴参考这篇文章多线程 ForkJoinPool 详解

    线程池大小选定

    了解了线程池的内部结构,在实战中我们应该如何选取线程池的大小呢?

    这需要大致了解任务是CPU密集型还是IO密集型。

    • CPU密集型 比如大量的计算任务,CPU占用率较高,那么此时如果多开线程反而会因为CPU频繁的做线程调度导致性能降低。一般建议线程数为cpu核心数+1,加1是为了防止某个核心线程阻塞或意外中断时作为候补。
    • IO密集型 通常指文件I/O、网络I/O等。线程数的选取与IO耗时和CPU耗时的比例有关,最佳线程数 = CPU 核数 * [ 1 +(I/O 耗时 / CPU 耗时)],之所以设置比例是为了使I/O设备和CPU的利用率都达到最大。

    以单核、CPU计算和I/O操作的耗时是1:2为例,可以看到三个线程可使CPU利用率达到100%(本例来自极客时间 --Java并发编程实战)。

    时间分片.png

    线程池状态

    线程池的状态在整个任务处理过程中至关重要,比如添加任务时会先判断线程池是否处于运行状态,任务添加到队列后再判断运行状态,如果此时线程池已经关闭则移除任务并执行饱和策略。

    我们接下来看看线程池的几种状态:

    • RUNNING :能接受新提交的任务,并且也能处理阻塞队列中的任务;
    • SHUTDOWN:关闭状态,不再接受新提交的任务,但却可以继续处理阻塞队列中已保存的任务。在线程池处于 RUNNING 状态时,调用 shutdown()方法会使线程池进入到该状态。(finalize() 方法在执行过程中也会调用shutdown()方法进入该状态);
    • STOP:不能接受新任务,也不处理队列中的任务,会中断正在处理任务的线程。在线程池处于 RUNNING 或 SHUTDOWN 状态时,调用 shutdownNow() 方法会使线程池进入到该状态;
    • TIDYING:如果所有的任务都已终止了,workerCount (有效线程数) 为0,线程池进入该状态后会调用 terminated() 方法进入TERMINATED 状态。
    • TERMINATED:在terminated() 方法执行完后进入该状态,默认terminated()方法中什么也没有做。

    一图说明状态流转


    线程池状态变化图.png

    总结

    回到文章开始的问题:RxJava和OkHttp采用哪种线程池进行调度的呢?

    1. 在rxjava内部定义了几个线程调度器常用的Schedulers.io()Schedulers.computation()分别对应了IO密集型和CPU密集型调度器,内部均使用的是ScheduledThreadPoolExecutor线程池,这是出于链式调用中可能存在delay等延时操作的考量而设计的。二者不同点在于最大线程数不同,对computation来说最大线程数为CPU核心数,而对io来说最大线程数为无限。
    2. 在OkHttp中默认使用线程池就是newCachedThreadPool,上面介绍过此线程池的弊端,这可能是出于高并发量的考量而做的选择,在实际使用中可根据实际情况灵活配置。
    # -> Dispatcher
    public Dispatcher() {
    }
    
    public Dispatcher(ExecutorService executorService) {
        this.executorService = executorService;
    }
    
    public synchronized ExecutorService executorService() {
        if (executorService == null) {
          executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
              new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
        }
        return executorService;
    }
    

    参考文章

    相关文章

      网友评论

        本文标题:Java线程池工作原理浅析

        本文链接:https://www.haomeiwen.com/subject/nqpnoqtx.html