美文网首页
JUC之线程池精讲

JUC之线程池精讲

作者: StrongManAlone | 来源:发表于2018-04-15 20:52 被阅读0次

前言:过去的半年里,我确实是以自己能看到的速度在成长,而这成长也来之不易。哎这一身肉嗖嗖的长啊,哈哈,开个玩笑。感谢这半年间对我给予过帮助和鼓励的人,还要感谢那些志同道合的人,是你们让我感觉到在阅读源码这条路上我并不孤单,虽然我们不曾见面,但是我感觉你们一定也是胖子,,哈哈,,。之所以要写这篇文章是因为想总结一下自己在研读JUC的时候总结的一点东西,当然这一篇文章不可能把整个JUC讲到详尽,只是想让这篇《线程池精讲》引出JUC中非常关键的几个技术点,在以后的文章中会详细的介绍,大概会包括以下的内容:自旋锁、适应性自旋锁、锁消除、锁粗化、偏向锁、轻量级锁、重量级锁、锁升级、锁降级、的原理讲解,还有像ReentrantLock这种重入锁的源码实现,还有ReentrantReadWriteLock读写锁的源码实现及读写锁之间的升级降级,当然还有原子类里面的内容,再有就是他们都会用到的两个最最重要的比较底层的两个东西,volatile和CAS,如果你还不知道这两个东西是啥,或者说是volatile你只停留在了N年前的 内存可见性,禁止重排序,实现原理 内存屏障的话,说实话你有必要关注一下我的简书了。CAS的话你只知道比较并替换却不知道ABA,也不知道如何解决ABA问题的话,说实话你真的有必要关注走一波,在以后的关于JUC的文章里面我都会有相应的讲解。废话说了这么多下面开始进入今天的主题,线程池。

本次源码分析基于JDK1.8

首先来说一下如何正确的使用线程池,以及使用不当带来的后果:

虽然使用Executors创建线程池非常的简单,种类也比较全面,可以适应各种场景下,但是还是有一些隐患的,下面我们根据源码逐一分析一下原因:

Executors支持创建的线程池种类 线程数固定的线程池

如上图所示是一种创建固定数目的线程池的简便方法,它的特点就是线程数得到很好地控制,但是拥有长度不受限制的基于链表的的阻塞队列,这就意味着当你的线程池的任务激增的时候,可能会因为处理能力不够造成任务积压,最终因为队列容量不断增大而导致OOM。

newWorkStealingPool在今天的文章中就不介绍了是基于ForkJoinPool的。

单线程的线程池

newSingleThreadExecutor和newFixedThreadPool的原理完全相同,就是最大线程数为1,它存在的意义可能就会作为一个任务队列吧。

超大线程数的线程池

你会看到很多文章中介绍它是一个线程数无限大,或者最大值是Integer.MAX_VALUE的,其实是不对的,为什么不对,下面的内容中我会介绍到的。newCachedThreadPool是个最大线程数超级大的线程池,线程拥有60秒的空闲保持时间,超过这个时间会被销毁,还有一个阻塞队列,它的作用不像传统的队列,只是阻塞的效果,这回带来什么问题呢?当然是线程数不断增大带来的线程切换频繁带来的开销,还有线程过多导致的OOM了。(什么为什么线程数过多也会OOM,因为创建线程的时候你要给它分配栈空间啊。) 

Executors快捷创建线程池的其他方法我就不逐一介绍了,(什么,这都不介绍了还叫精讲,听我细细道来)由于其他的创建线程池和前面提到的两种极其类似,存在的隐患也是一样的,只不过是多了自定义线程工厂,和延时队列等一些东西,不了解的话也没有关系,一看便知。

下面即将迎来精髓,也就是线程池的源码分析:

老样子还是从构造函数开始看起:

ThreadPoolExecutor的构造函数

ThreadPoolExecutor提供了四个构造函数,其实前三个都是调用第四个构造函数进行初始化的,只是提供了一些默认的参数而已,下面来说下第四种构造函数的每个参数的作用。

ThreadPoolExecutor的构造函数

从上图可以看出来ThreadPoolExecutor的初始化过程,需要设置一个核心线程数,最大线程数,(默认非核心线程保持时间,也可以设置核心线程也受此约束)空闲保持时间,时间单位,阻塞队列,线程工厂,以及一个用于处理线程池队列爆满,以及达到最大线程数之后的Handler。下面我用第一种构造函数初始化一个线程池。(其实我最推荐第四种,因为项目中常常拥有多个线程池,默认的线程工厂是以一个全局的原子变量拼接当前线程池中一个原子变量进行命名的,出现问题很难分辨是哪个线程池中的线程,所以推荐大家自定义线程工厂,以线程池相关功能名称拼接当前线程池中原子变量序号的方式进行线程命名,以减小排查故障的难度。这里我先用默认的,因为用默认参数的人比较多一点)代码如下:

测试代码

下面我们来详细的讲解一下上面的代码定义的线程池以及它的工作过程。从上面的代码中可以看出我写了个无限循环来提交任务。以便测试线程池发生的各种场景,这篇文章中我就不以debug每行代码的形式进行讲解了,因为线程池这块的代码太多了,而且也不太集中,很容易就会造成篇幅过长。我将会以标注的方式讲解其中需要注意的地方。

execute过程

从上图可以看出execute过程的第一步是一个非空校验,然后第一个需要讲解的点来了,没错就是变量-ctl-,别看它只是个小小的变量,但是它在线程池里面起到了相当重要的作用,下面我带大家认识一下它。

ctl变量

首先ctl是一个原子对象,它的value是一个int型整数,被volatile修饰。那ctl为什么要用原子类呢,当然是要保证并发安全啊,它是如何做到原子性的,这就要引出CAS了。前言中已经说到这篇文中只是引出这些知识点,想要知道详细的内容期待下篇文章吧。ctl变量在线程池中分别维护了线程池的运行状态,和线程数,它是怎么做到的呢,其实很简单,就是int类型用二进制表示为32位0,1码,把这32分割一下就可以代表两个变量了啊。接下来看下代表着线程池运行状态的几个变量吧,分别为RUNNING,SHUTDOWN,STOP,TIDYING,TERMINATED。按照上图的代码COUNT_BITS应该是29。

CAPACITY:0001 1111 1111 1111 1111 1111 1111 1111

~CAPACITY:1110 0000 0000 0000 0000 0000 0000 0000

RUNNING:1010 0000 0000 0000 0000 0000 0000 0000

SHUTDOWN:0000 0000 0000 0000 0000 0000 0000 0000

STOP:0010 0000 0000 0000 0000 0000 0000 0000

TIDYING:0100 0000 0000 0000 0000 0000 0000 0000

TERMINATED:0110 0000 0000 0000 0000 0000 0000 0000

runStateOf(int c)函数的作用明显是取前三位为有效数位,排除后29位的影响,返回运算后数字

workerCountOf(int c)函数恰恰相反,取后29位为有效数位,排除前三位的影响,并返回运算后的整数,明显它的最大值不会是Integer.MAX_VALUE,大概是536870911。(不知道我这个大概你们是否满意啊,哈哈)

ctlOf(int rs, int wc)函数明显是把经过上述两个步骤的整数结果合并成原来整数的作用。

每种状态它们的大小关系如下(对于理解源码这很重要):RUNNING<SHUTDOWN<STOP<TIDYING<TERMINATED

了解完ctl,我们回过头来再看一下execute()的整体流程,通过注释我们也可以猜出个大概,分为三种情况,分别如下:

1、根据ctl的值获取线程池中的有效线程数,如果该数值小于核心线程数,调用addWorker(command, true)方法,返回true则退出,返回false则再次获取ctl的值。并继续下面的逻辑。

2、判断线程池是否处于RUNNING状态,并且阻塞队列未满,满足条件的话再次获取ctl的值。   

    2.1、判断是否已经不处于RUNNING状态,并且从队列中移除任务成功,满足的话调用reject(command)。

    2.2、判断线程池中的有效线程数是否为0,满足条件的话调用addWorker(null, false)。

3、调用addWorker(command, false),判断返回是否为false,满足条件的话调用reject(command)。

了解完execute()的整体流程,我们会发现根据线程池的各种参数状态分别会调用三种参数的addWorker()方法和reject(command)方法。接下来让我们去探索下这两个方法里面究竟发生了什么吧。源码如下:

addWorker方法

下面我来按行解读下addWorker()的源码:

1、首先进入循环,获取ctl的值,通过ctl的值获取运行状态,判断状态是否不处于RUNNING状态,以及接下来的处于SHUTDOWN状态,Runnable对象为空,阻塞队列不为空,三个条件其中一个不满足,就结束了addWorker()方法,返回false。

2、不满足上面的条件会进入一个循环内部的循环,根据ctl的值获取有效线程数,判断是否大于等于CAPACITY(536870911)或者当core=true时大于等于核心线程数,当core=false时大于等于最大线程数。满足条件结束addWorker()方法,返回false。

    2.1、不满足上述条件会执行compareAndIncrementWorkerCount(c)操作,其实就是一个CAS原子操作,作用是C数字加一,操作成功的话跳出循环。

    2.2、上述操作不成功的话,会重新获取ctl的值,并判断状态是否已经改变,如果已经改变的话,跳到开头,重新执行外部循环,否则继续执行内部循环。

3、定义两个boolean变量,workerStarted,workerAdded,初始化赋值都为false。一个Worker对象,介绍完addWorker()方法我会详细的介绍Worker这个内部类。这里你就知道他是线程池中创建线程的包装类就好了。然后判断包装类里面的线程是否不为null,满足条件,下面进行加锁(ReentrantLock,我说过会通过线程池的讲解引出前言中的知识点,没骗你吧。),再次获取ctl的值,并通过该值获取线程池状态,当处于RUNNING状态或者处于SHUTDOWN并且Runnable类型参数不为null,满足条件进行判断,刚刚Worker对象中的线程对象是否处于激活状态(或者说是已经启动还未终止),满足条件直接抛出异常,否则将此Worker对象添加到workers中,workers是一个HashSet<Worker>,看到此处应该知道线程池中的线程是保存在一个HashSet中的。然后获取这个HashSet的元素数量,如果大于largestPoolSize(largestPoolSize是一个int变量,用于记录线程池生命周期中达到的最大线程数),会更新largestPoolSize的数值。并把workerAdded设置为true。释放锁。|||||      然后判断workerAdded是否为true,满足条件,启动刚刚Worker对象中的线程。并把workerStarted变量设置为true,最终判断workerStarted是否为false,满足条件说明创建Worker失败了,要进行回滚,addWorkerFailed()方法的作用就是回滚Worker创建,并把信息传播出去。后面会讲到addWorkerFailed()方法里面的具体内容。最后返回workerStarted变量。此方法就结束了。

接下来带大家看下Worker的源码:

Worker

首先Worker的构造函数会初始化Runnable和Thread两个对象,其次这里引出了我前面提到的用CAS操作尝试获取锁的操作,再然后就是把run()方法委托给了runWorker()方法,其他的就没有什么好讲的了,下面我带大家看下runWorker()方法的源码,源码如下:

runWorker

首先一个Worker初始化完调用runWorker()方法时,会进入一个循环。第一次进入这个循环时由于task是初始化时的Runnable参数,所以直接进入循环内部,接下来会判断线程池是否为STOP,TIDYING,TERMINATED,状态,满足条件的话,中断线程,并确定状态处于上述状态,并且当前线程处于中断状态,满足条件清除中断状态,并将Worker线程中断。beforeExecute()方法其实什么都没有做,是作者留下的扩展方法,需要我们自己去重写,再然后就是执行task了,执行完之后调用afterExecute()方法和beforeExecute()一样,也是一个用于扩展的空方法。最后task置为null,Worker的执行计数器+1,task置为null是因为下次循环直接以getTask()作为条件,getTask()是一个去阻塞队列里面取Runnable对象的操作,具体内容在下一个片段介绍。如果getTask()返回来的是null,那么就会跳出循环,completedAbruptly会被置为false,一个线程即将执行完所有的代码,那就意味这它的生命周期即将结束,processWorkerExit()方法是它在生命的最后片段需要做的事情,把Worker从workers中清理掉,还有其他的一些清理工作,走到这个方法的线程一般会有三种情况,线程运行时发生异常挂掉,非核心线程空闲状态下到达保持时间,寿寝正终,还有就是线程池关闭了,最终所有的线程都会GG。runWorker()方法讲到这里差不多就讲完了,下面我们来看下getTask()方法的源码,源码如下:

getTask

首先进入getTask()方法会进入一个循环,获取ctl的值,并根据该值取得运行状态,当线程池处于 STOP,TIDYING,TERMINATED状态时,或者线程池阻塞队列为空时,ctl值减一(也就是相当于有效线程数减一),并返回null。 接下来这个timed变量是决定这条线程是否受空闲保持时间限制的一个条件,当allowCoreThreadTimeOut为true,或者有效线程数大于核心线程数时此条线程就会受到空闲保持时间的限制。所谓的空闲保持时间就是用阻塞队列来实现的,当timed=true时,去队列取数据用的是workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)方法,规定时间内没有取得数据就会结束阻塞,并返回null,将timedOut 设置为true,并在下次循环中退出,并且此条线程生命周期结束。timed=false时,去队列里面取数据用的是take()方法,会一直阻塞,直到取得数据为止。所以不会受到空闲保持时间的限制。取得数据之后,返回该数据。  讲到这里大家也应该明白线程池是怎么复用线程的了吧,其实很简单,就是创建一个线程之后让他去无限循环的去队列里面去取任务,取到了就执行,取不到就阻塞,这样就可以达到线程复用的目的了啊。哈哈。。

上述方法和execute()方法密切相关,只有把这些方法放在一起分析才可以真正的理解线程池添加任务进来的整个流程,现在execute()和addWorker()方法已经讲完了,再来带大家看看前面提到的threadFactory和RejectedExecutionHandler,讲完两者我会对线程池添加任务的整个流程做一个总结,以便大家理解的更加的透彻。在Worker类中通过getThreadFactory()方法获取线程池中的线程工厂,并用此线程工厂去创建线程。当然除了使用默认的线程池工厂外你也可以用构造函数或者setThreadFactory()方法指定个性化的线程工厂,下面我带大家看看默认的线程工厂源码:

DefaultThreadFactory

前面我说过了推荐用第三个构造函数进行创建线程池,为什么呢,你可以看看默认的线程工厂里面有什么东西,DefaultThreadFactory实现了ThreadFactory接口,这个接口就定义了一个newThread()方法,用来创建新的线程,其中poolNumber变量代表是第几个池子,threadNumber变量代表是该池子里面的第几条线程,主要用于命名线程,还有就是用于创建线程的参数,ThreadGroup,Runnable,name,stackSize,以及设置线程的类型,以及优先级,我感觉这些参数修改的意义都不大,除非你有特殊需求,比如你有强迫症,就得创建见名知意的线程名字,什么数字序号看见就难受,或者是你想要创建用于很深的递归的线程,加大创建线程的栈空间(这里的栈空间长度参数为0,表示使用者不关心栈空间大小,走jvm的默认大小),等等。否则的话就别费事了,就用默认的吧,挺好。

下面再来看下RejectedExecutionHandler,这个东西是用来在线程池阻塞队列爆满之后,最大线程数也爆满的情况下的一个拒绝策略处理器,线程池中实现了好几种,下面我们从默认的开始看起,源码如下:

AbortPolicy()(默认的):

AbortPolicy

AbortPolicy是线程池默认的拒绝策略,策略很简单,就是直接抛出异常。

以下的几种策略大部分代码都一样,为了缩短篇幅只贴出rejectedExecution()方法代码

DiscardPolicy():

DiscardPolicy

DiscardPolicy拒绝策略也很简单,就是什么都不做。

DiscardOldestPolicy():

DiscardOldestPolicy

DiscardOldestPolicy拒绝策略路子就比较野了,它是把队列的第一个任务剔除掉,然后再让当前任务重新提交。

CallerRunsPolicy():

CallerRunsPolicy

CallerRunsPolicy拒绝策略路子也有点野,居然用当前提交任务的这条线程去执行了这个任务。

从上面的代码可以看出,线程池提供的几种拒绝策略都不太友好,所以推荐大家自己定制化自己的拒绝策略,所以啦,推荐大家用第三种构造函数去创建线程池。

讲到这里基本上线程池的大部分代码都已经讲到了,其中还有一些关于改变线程池状态,设置线程池各种参数,获取线程池运行时的各种参数的方法就不讲了,还有一些比如初始化完线程池就创建完所有的核心线程的方法(主要是减小第一批任务的延迟),那些基本都很简单,有了前面的基础,应该一看就懂。稀里哗啦的讲了一大堆,可能看着有点晕,下面我就开始说好的总结一下execute()方法的过程。

1、如果线程池的有效线程数少于核心线程数,创建并启动一个Worker来执行新提交的任务。

2、如果线程池的有效线程数达到了核心线程数,线程池的阻塞队列未满, 将新提交的任务加入到该阻塞队列中。

3、如果线程池的有效线程数达到了核心线程数,但却小于最大线程数 ,并且线程池的阻塞队列已满, 创建并启动一个Worker来执行新提交的任务。

4、如果线程池的有效线程数达到了最大线程数, 并且线程池的阻塞队列已满,  RejectedExecutionHandler 根据它的拒绝策略来处理该任务。

5、如果线程池不处于RUNNING状态,RejectedExecutionHandler 根据它的拒绝策略来处理该任务。

为了大家更好的理解execute()整个调度过程,在这里献上一张流程图:

ThreadPoolExecutor流程图

还有来自山哥亲情奉献的一张关于本文中一些知识点的大纲:

java多线程脑图

以及接下来的有买有赠环节:

到此线程池精讲这篇文章就结束了,希望你看到这里的时候会有些许的收获,或者是新的感悟,不枉当初点进来阅读这篇文章的冲动。最后送给你们一张神图,是关于juc全部知识点的脑图,非常全面,非常经典,老手查漏补缺,新手循环渐进挨个攻克,肯定受益匪浅。

郑重声明:此图来自简书用户“小程故事多”,若侵删

juc脑图

原图链接:juc脑图

相关文章

  • JUC之线程池精讲

    前言:过去的半年里,我确实是以自己能看到的速度在成长,而这成长也来之不易。哎这一身肉嗖嗖的长啊,哈哈,开个玩笑。感...

  • 多线程juc线程池

    java_basic juc线程池 创建线程池 handler是线程池拒绝策略 排队策略 线程池状态 RUNNIN...

  • JUC之线程池

    线程池 线程池做的工作主要是控制运行的线程的数量 ,处理过程中将任务加入队列 ,然后在线程创建后启动这些任务, 如...

  • JUC 线程池

    概述 线程池的作用:节省资源、提升响应、削峰限流、管理线程 ThreadPoolExecutor的核心参数:cor...

  • juc之四:线程 & 线程池 & ForkJoinPool

    1.基础知识 1.1阻塞队列(BlockingQueue) 方法抛出异常返回特殊值一直阻塞超时退出插入方法add(...

  • 线程池-核心参数-03

    当我们使用 callable 接口实现线程时,常会和 juc 下面的线程池一起使用。这里主要看看 线程池的核心参数...

  • Java 线程池创建、使用、停止

    JUC 已经提供了一些现成的线程池给开发者使用,但是这些线程池或多或少不能满足具体的业务开发需求,所以在使用线程池...

  • 博客系列-2019年时间轴

    2019年 JUC线程池服务ExecutorService接口实现源码分析 Github Page:http://...

  • JUC线程池(4):线程池状态

    我们都知道,线程有5种状态:新建状态,就绪状态,运行状态,阻塞状态,死亡状态。线程池也有5种状态;然而,线程池不同...

  • JUC线程池(1):线程池架构

    线程池架构图 1.Executor 它是一个接口,用来执行任务的。准确来说,Executor提供了execute(...

网友评论

      本文标题:JUC之线程池精讲

      本文链接:https://www.haomeiwen.com/subject/dgjlkftx.html