美文网首页web开发资源Java知识
深入分析java线程池的实现原理

深入分析java线程池的实现原理

作者: 美团Java | 来源:发表于2016-07-17 14:50 被阅读60150次

    简书 占小狼 转载请注明原创出处,谢谢!

    2017/04/23 于复兴中路裸心社
    回头看看之前写的这篇文章,印象中读源码的兴趣源头似乎来自于Java线程池,当山头被一座一座攻克时,你会发现掉到一个大坑中,因为不懂的领域的实在太多。

    快关注我的公众号!


    前言

    线程是稀缺资源,如果被无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,合理的使用线程池对线程进行统一分配、调优和监控,有以下好处:
    1、降低资源消耗;
    2、提高响应速度;
    3、提高线程的可管理性。

    Java1.5中引入的Executor框架把任务的提交和执行进行解耦,只需要定义好任务,然后提交给线程池,而不用关心该任务是如何执行、被哪个线程执行,以及什么时候执行。

    demo

    1、Executors.newFixedThreadPool(10)初始化一个包含10个线程的线程池executor;
    2、通过executor.execute方法提交20个任务,每个任务打印当前的线程名;
    3、负责执行任务的线程的生命周期都由Executor框架进行管理;

    ThreadPoolExecutor

    Executors是java线程池的工厂类,通过它可以快速初始化一个符合业务需求的线程池,如Executors.newFixedThreadPool方法可以生成一个拥有固定线程数的线程池。

    其本质是通过不同的参数初始化一个ThreadPoolExecutor对象,具体参数描述如下:

    corePoolSize

    线程池中的核心线程数,当提交一个任务时,线程池创建一个新线程执行任务,直到当前线程数等于corePoolSize;如果当前线程数为corePoolSize,继续提交的任务被保存到阻塞队列中,等待被执行;如果执行了线程池的prestartAllCoreThreads()方法,线程池会提前创建并启动所有核心线程。

    maximumPoolSize

    线程池中允许的最大线程数。如果当前阻塞队列满了,且继续提交任务,则创建新的线程执行任务,前提是当前线程数小于maximumPoolSize;

    keepAliveTime

    线程空闲时的存活时间,即当线程没有任务执行时,继续存活的时间;默认情况下,该参数只在线程数大于corePoolSize时才有用;

    unit

    keepAliveTime的单位;

    workQueue

    用来保存等待被执行的任务的阻塞队列,且任务必须实现Runable接口,在JDK中提供了如下阻塞队列:
    1、ArrayBlockingQueue:基于数组结构的有界阻塞队列,按FIFO排序任务;
    2、LinkedBlockingQuene:基于链表结构的阻塞队列,按FIFO排序任务,吞吐量通常要高于ArrayBlockingQuene;
    3、SynchronousQuene:一个不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQuene;
    4、priorityBlockingQuene:具有优先级的无界阻塞队列;

    threadFactory

    创建线程的工厂,通过自定义的线程工厂可以给每个新建的线程设置一个具有识别度的线程名。

    handler

    线程池的饱和策略,当阻塞队列满了,且没有空闲的工作线程,如果继续提交任务,必须采取一种策略处理该任务,线程池提供了4种策略:
    1、AbortPolicy:直接抛出异常,默认策略;
    2、CallerRunsPolicy:用调用者所在的线程来执行任务;
    3、DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;
    4、DiscardPolicy:直接丢弃任务;
    当然也可以根据应用场景实现RejectedExecutionHandler接口,自定义饱和策略,如记录日志或持久化存储不能处理的任务。

    Exectors

    Exectors工厂类提供了线程池的初始化接口,主要有如下几种:

    newFixedThreadPool

    初始化一个指定线程数的线程池,其中corePoolSize == maximumPoolSize,使用LinkedBlockingQuene作为阻塞队列,不过当线程池没有可执行任务时,也不会释放线程。

    newCachedThreadPool

    1、初始化一个可以缓存线程的线程池,默认缓存60s,线程池的线程数可达到Integer.MAX_VALUE,即2147483647,内部使用SynchronousQueue作为阻塞队列;
    2、和newFixedThreadPool创建的线程池不同,newCachedThreadPool在没有任务执行时,当线程的空闲时间超过keepAliveTime,会自动释放线程资源,当提交新任务时,如果没有空闲线程,则创建新线程执行任务,会导致一定的系统开销;

    所以,使用该线程池时,一定要注意控制并发的任务数,否则创建大量的线程可能导致严重的性能问题。

    newSingleThreadExecutor

    初始化的线程池中只有一个线程,如果该线程异常结束,会重新创建一个新的线程继续执行任务,唯一的线程可以保证所提交任务的顺序执行,内部使用LinkedBlockingQueue作为阻塞队列。

    newScheduledThreadPool

    初始化的线程池可以在指定的时间内周期性的执行所提交的任务,在实际的业务场景中可以使用该线程池定期的同步数据。

    实现原理

    除了newScheduledThreadPool的内部实现特殊一点之外,其它几个线程池都是基于ThreadPoolExecutor类实现的。

    线程池内部状态

    其中AtomicInteger变量ctl的功能非常强大:利用低29位表示线程池中线程数,通过高3位表示线程池的运行状态:
    1、RUNNING:-1 << COUNT_BITS,即高3位为111,该状态的线程池会接收新任务,并处理阻塞队列中的任务;
    2、SHUTDOWN: 0 << COUNT_BITS,即高3位为000,该状态的线程池不会接收新任务,但会处理阻塞队列中的任务;
    3、STOP : 1 << COUNT_BITS,即高3位为001,该状态的线程不会接收新任务,也不会处理阻塞队列中的任务,而且会中断正在运行的任务;
    4、TIDYING : 2 << COUNT_BITS,即高3位为010;
    5、TERMINATED: 3 << COUNT_BITS,即高3位为011;

    任务提交

    线程池框架提供了两种方式提交任务,根据不同的业务需求选择不同的方式。

    Executor.execute()

    通过Executor.execute()方法提交的任务,必须实现Runnable接口,该方式提交的任务不能获取返回值,因此无法判断任务是否执行成功。

    ExecutorService.submit()

    通过ExecutorService.submit()方法提交的任务,可以获取任务执行完的返回值。

    任务执行

    当向线程池中提交一个任务,线程池会如何处理该任务?

    execute实现

    具体的执行流程如下:
    1、workerCountOf方法根据ctl的低29位,得到线程池的当前线程数,如果线程数小于corePoolSize,则执行addWorker方法创建新的线程执行任务;否则执行步骤(2);
    2、如果线程池处于RUNNING状态,且把提交的任务成功放入阻塞队列中,则执行步骤(3),否则执行步骤(4);
    3、再次检查线程池的状态,如果线程池没有RUNNING,且成功从阻塞队列中删除任务,则执行reject方法处理任务;
    4、执行addWorker方法创建新的线程执行任务,如果addWoker执行失败,则执行reject方法处理任务;

    addWorker实现

    从方法execute的实现可以看出:addWorker主要负责创建新的线程并执行任务,代码实现如下:

    这只是addWoker方法实现的前半部分:
    1、判断线程池的状态,如果线程池的状态值大于或等SHUTDOWN,则不处理提交的任务,直接返回;
    2、通过参数core判断当前需要创建的线程是否为核心线程,如果core为true,且当前线程数小于corePoolSize,则跳出循环,开始创建新的线程,具体实现如下:

    线程池的工作线程通过Woker类实现,在ReentrantLock锁的保证下,把Woker实例插入到HashSet后,并启动Woker中的线程,其中Worker类设计如下:
    1、继承了AQS类,可以方便的实现工作线程的中止操作;
    2、实现了Runnable接口,可以将自身作为一个任务在工作线程中执行;
    3、当前提交的任务firstTask作为参数传入Worker的构造方法;


    从Woker类的构造方法实现可以发现:线程工厂在创建线程thread时,将Woker实例本身this作为参数传入,当执行start方法启动线程thread时,本质是执行了Worker的runWorker方法。

    runWorker实现

    runWorker方法是线程池的核心:
    1、线程启动之后,通过unlock方法释放锁,设置AQS的state为0,表示运行中断;
    2、获取第一个任务firstTask,执行任务的run方法,不过在执行任务之前,会进行加锁操作,任务执行完会释放锁;
    3、在执行任务的前后,可以根据业务场景自定义beforeExecute和afterExecute方法;
    4、firstTask执行完成之后,通过getTask方法从阻塞队列中获取等待的任务,如果队列中没有任务,getTask方法会被阻塞并挂起,不会占用cpu资源;

    getTask实现

    整个getTask操作在自旋下完成:
    1、workQueue.take:如果阻塞队列为空,当前线程会被挂起等待;当队列中有任务加入时,线程被唤醒,take方法返回任务,并执行;
    2、workQueue.poll:如果在keepAliveTime时间内,阻塞队列还是没有任务,则返回null;

    所以,线程池中实现的线程可以一直执行由用户提交的任务。

    Future和Callable实现

    通过ExecutorService.submit()方法提交的任务,可以获取任务执行完的返回值。

    在实际业务场景中,Future和Callable基本是成对出现的,Callable负责产生结果,Future负责获取结果。
    1、Callable接口类似于Runnable,只是Runnable没有返回值。
    2、Callable任务除了返回正常结果之外,如果发生异常,该异常也会被返回,即Future可以拿到异步执行任务各种结果;
    3、Future.get方法会导致主线程阻塞,直到Callable任务执行完成;

    submit实现

    通过submit方法提交的Callable任务会被封装成了一个FutureTask对象。

    FutureTask

    1、FutureTask在不同阶段拥有不同的状态state,初始化为NEW;
    2、FutureTask类实现了Runnable接口,这样就可以通过Executor.execute方法提交FutureTask到线程池中等待被执行,最终执行的是FutureTask的run方法;

    FutureTask.get实现

    内部通过awaitDone方法对主线程进行阻塞,具体实现如下:

    1、如果主线程被中断,则抛出中断异常;
    2、判断FutureTask当前的state,如果大于COMPLETING,说明任务已经执行完成,则直接返回;
    3、如果当前state等于COMPLETING,说明任务已经执行完,这时主线程只需通过yield方法让出cpu资源,等待state变成NORMAL;
    4、通过WaitNode类封装当前线程,并通过UNSAFE添加到waiters链表;
    5、最终通过LockSupport的park或parkNanos挂起线程;

    FutureTask.run实现

    FutureTask.run方法是在线程池中被执行的,而非主线程
    1、通过执行Callable任务的call方法;
    2、如果call执行成功,则通过set方法保存结果;
    3、如果call执行有异常,则通过setException保存异常;

    set
    setException

    set和setException方法中,都会通过UnSAFE修改FutureTask的状态,并执行finishCompletion方法通知主线程任务已经执行完成;

    finishCompletion

    1、执行FutureTask类的get方法时,会把主线程封装成WaitNode节点并保存在waiters链表中;
    2、FutureTask任务执行完成后,通过UNSAFE设置waiters的值,并通过LockSupport类unpark方法唤醒主线程;


    我是占小狼
    坐标魔都,白天上班族,晚上是知识的分享者
    如果读完觉得有收获的话,欢迎点赞加关注


    相关文章

      网友评论

      • 2_shou:你好,请问execute实现那里,“1、workerCountOf方法根据ctl的低29位,得到线程池的当前线程数,如果线程数小于corePoolSize,则执行addWorker方法创建新的线程执行任务;否则执行步骤(2)”,步骤(2)指的是图片中的步骤2还是后面的下一个2
      • 三两五花肉:深入浅出,不错,狼哥
      • papa_er:狼哥,能说一下线程池在实际工作中的场景吗
      • a072fa038320:对照源码看了一遍 花了1小时。。 看完 整体理解了,还需要整理成自己的理解。
      • 月伴飞鱼:很不错
      • Tellmelies:每个Woker 实例类只有一个线程,为什么执行task还要加锁呢? 望大神解答下
      • fdd3b0857d3e:大神,看了你好几篇文章 写的都是通俗易懂 理解很到位 佩服佩服!
        美团Java:@时间海yy 多谢支持
      • 疯狂的哈丘:你好,博主,Worker为什么要继承AQS我不太理解。如果只是为了加锁,那直接继承ReentrantLock不就好了?看Doug Lea写的javadoc,好像是为了防止中断。但是不是很理解,可以解答一下吗?具体什么场景用到?
        疯狂的哈丘:https://stackoverflow.com/questions/42189195/why-threadpoolexecutorworker-extends-abstractqueuedsynchronizer
        找到答案了。:smile: 主要为了防止worker运行线程的时候被其他的操作中断。
        在interruptIdleWorkers方法中,会将一些空闲的线程中断。如果用了ReentrantLock,由于这是可重入得锁,所以执行tryLock()会返回true。所以Worker自己继承了AQS实现了一个不可重入得锁。我觉得这个可以在文章里面说的清楚些。
        最后还是谢谢博主写了一篇好文章:+1:
      • 574d1aec84b0:很不错的文章
        美团Java:@乌托邦的污妖王 欢迎关注公众号啊
      • 7bc304b3a6f1:兄弟你的文章写真心不错,但是我对照的源码和你的文章有点看不懂。。。。咋整?
      • lopman87:狼哥,线程池是如何回收线程的啊?当最大线程数大于核心线程数的时候
      • 望京屯子coding沐:Java程序媛表示好佩服小狼锅!最近正好想把Java基础再走一遍,谢谢好文~~ :grin:
        6672cce5b861:有问题,可以直接去你工位请教吗:smile:
        望京屯子coding沐:@占小狼 常来~~
        美团Java:@望京屯子coding沐 望京啊,我今天正好出差望京
      • 玖柒叁:newSingleThreadExecutor创建的线程,如果异常是怎么能够让新的线程 继续 执行的?一直不太明白
        美团Java:@wwwyn666 需要看下源码
      • e0e671e24011:补充一些东西。关于一个Worker如果从队列中取不到task的情况(已经没有任务可以执行了)。
        这个时候会执行一个processWorkerExit的方法,这个方法先看worker退出的原因是不是由于用户提交的任务中有异常,如果是,把worker的数量减一。
        再从工作队列中移除worker。
        然后看看线程池是否是运行的。如果是运行的并且worker退出的原因是由于用户任务中的异常,则新增一个worker(官方文档上说是“替换”这个worker)。
        如果不是,则根据allowCoreThreadTimeOut字段取和workQueue是否为空取一个最小值,最小值可能为0或者corePoolSize。如果工作的worker数量小于这个最小值,则同样新增一个worker
        e0e671e24011:最小值可能为0、1或者corePoolSize
      • 93c25b7d5260:如果低29位表示线程个数的话,那么线程池应该是不能设置成integer.max的?
      • faf35009f9b4:您好,我想请教一个问题:如果当前线程数小于核心线程池的数量,会创建一个线程执行提交的任务,为何即便有空闲的线程能够执行新任务还要创建线程呢,这不是增加系统开销吗
        美团Java:@Kevin先生 尝试下未必不可以
        faf35009f9b4:@占小狼 大佬高看我了,最近在看并发编程的艺术,看到这块有些疑问:smile:
        美团Java:@Kevin先生 这是jdk线程池的设计,如果你不满足这种设计,可以自己尝试写一个
      • 沉默得夜:有demo吗?
      • fb919fa2aa73:看得好吃力,但总算啃完。。有时间得再看一遍并整理成自己的日志
        美团Java:@wwwfumeckcom :+1:
      • 极简架构:基于1.7写的啊
        极简架构:@占小狼 漂亮,无招胜有招啊,高手:+1:
        美团Java:@kimze1107 是吗,我都不记得基于什么版本写的了,不过只要了解其中原理就可以啦
        极简架构:狼哥为何我看的1.7源码的getTask()跟你贴出来的稍有差异呢?什么鬼
      • c100644fee4a:正常(running状态)流程还能看的懂,特殊(考虑各种非running状态、关闭非核心线程等)流程就看的稀里糊涂了。
        美团Java:@wendyqun 哈哈,知道实现原理就好了,细节记不住
      • 白敏鸢:博主你好。线程内部状态ctlof 是哪里的函数😂,找了很久。
        美团Java:@catetc 确定你没找错么?
      • c27d7adf8c82:有个很基本的问题,希望楼主回答我。实例在创建的时候,就已经初始化了一个线程。每放入一个task对象到queue里面去,就会初始化一个线程,我的理解一个thread对象就对应一个线程,那么线程池怎么做到线程复用的?
        c27d7adf8c82:我百度了一下线程的生命周期,在new的时候就初始化一个线程,run会就绪等待执行
        c27d7adf8c82:@占小狼 源码我都看了,你说线程的复用是通过queue自旋取下一个,没有就挂起。那我的理解这个挂起也只是针对这个poll或者take操作的线程,而执行任务task.run()应该启动的是task的线程,下一个task仍然要启动一个新线程,不知道这个理解对不对
        美团Java:@范协鼎 不是一个线程对应一个task,线程一直在跑,而且是执行task的run方法,我建议你看一下源码
      • 曹真:自己写的一篇文章http://www.jianshu.com/p/6c32868876f1 请各位指点
      • 45c172d52b84:对于低29位这些运算符有点不明白为啥要这样做
        美团Java:@问号_fallen 多看看
        45c172d52b84:@占小狼 不是很懂其中的原理……看源码好多时候都会看不懂这些
        美团Java:@问号_fallen 涉及到运算效率吧
      • landy8530:你好,我是从importNew微信公众号中看到你文章关注你的,写的文章不错!请问一下,你事例中的IDE是idea吗?还是?
        美团Java:@张飞_007 pocesson
        Britney_z:楼主是用什么工具画的图
        美团Java:@LYX_XM 对的
      • 9caaf0a36da0:有条不紊的将java线程池知识点的来龙去脉都介绍了一遍,技术上点到即止,不会太深,又不至于不明白某些底层代码所做的事情,感谢作者的贡献!
      • 58304365bd15:LinkedBlockingQueue Queue 单词写错了、、:smirk:
        58304365bd15:可以问个问题吗?
        CallerRunsPolicy();策略会保证所有线程都会执行吗?
        写了一个demo测试了,100个线程,5个core,10个max,LinkedBlockingQueue10个,发现100线程都会执行结束。
        会不会有情况下和DiscardPolicy();一样,抛弃部分线程?
        58304365bd15:@占小狼 主要是文章写的好,反复阅读、思考才会发现错误。
        美团Java:@武金亮 :joy::joy::joy:感谢
      • hbbbbb:不错
      • 8a8805cf3c81:写的不错,Java并发实战更详细。
      • jianshuhua:您好,有个疑问,FutureTask.run实现里,如果工作线程已经进入到了call()里,那么,通过FutureTask.cancel() 方法其实应该是无法中断线程执行任务的,例如,call()函数就是个简单的一千万次计数循环。也就是说FutureTask.cancel(boolean mayInterruptIfRunning) 是做不到 Interrupt If Running 的,call()函数一旦进入,就是可以执行完的。不知我的理解是否存在什么问题?
        美团Java:@jianshuhua 这个我下午再看看哈:smile:
        jianshuhua: @占小狼但是调用t.interrupt(),只是给t线程发送了一个中断信号,而假设此时t线程已经执行到了了FutureTask的run方法里的call方法了,t线程应该是不会处理这个中断信号的,而是继续执行call方法。假设call方法里是个死循环,那就永远暂停不了这个线程了。不知道我哪里理解出问题了
        美团Java:可以中断的,因为FutureTask的run方法中,UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()会把当前的执行线程赋值给runner变量,当执行cancel方法时,会调用当前线程的t.interrupt()方法,中断当前线程
      • 文曜繁星:描述的很清晰,从代码的角度阐述,通俗易懂。实际作用中,要万分小心线程池中的异常任务处理。
      • 989ae418fc7b:写的不错 :+1:
        美团Java:@989ae418fc7b 嘿嘿,多谢支持
      • pseudo_niaonao:不错,已分享 :smile:
        美团Java:@_niaonao 挺久了:sunglasses:
        pseudo_niaonao:@占小狼 作者是java大神啊,研究使用java多久了
        美团Java:@_niaonao :smile:
      • b8002c176069:赞一个
      • 48892085f47c:受益匪浅 :+1:
        美团Java:@程序猿小屌丝 :smile: 我是知识的服务者
      • ab0d6913a23d:小伙,技术杠杠滴啊,做技术多久了啊?膜拜下
        美团Java:@bugken 多谢肯定,身体才是本钱:smile:
        ab0d6913a23d:@占小狼 厉害啊,看你文章看到现在这个点。加油哈,写出更多更好的文章。谢谢
        美团Java:@bugken 3年了
      • 5079b8d519f3:图画得很漂亮 :smiley:
        ab0d6913a23d:@占小狼 总什么工具画的啊?
        美团Java:@timesongjie 惭愧,这些图不是我画的,一开始不知道怎么画图,不过现在都是自己画图了
      • 愚公300代:加油!!!
      • 4100698877a9:看了一早上你的文章 受益匪浅 把原来模糊不清的东西又捋顺了一遍 感谢:heart:
        美团Java:@Symars 多谢肯定
      • 辣鸡简书_随意拉黑:最近总工作中用过一次,效率提高的太多了
        美团Java:@愚行者 :smile:

      本文标题:深入分析java线程池的实现原理

      本文链接:https://www.haomeiwen.com/subject/ecnejttx.html