美文网首页Android技术知识Android进阶之路Android开发
并发整理(三)— 并发集合类与线程池

并发整理(三)— 并发集合类与线程池

作者: kachidokima | 来源:发表于2017-05-21 14:28 被阅读798次

    并发整理最后一篇,之前两篇
    并发整理(一)— Java并发底层原理
    并发整理(二)— Java线程与锁

    这篇讲的主要是JDK中运用之前说的并发基础来包装的一些类给开发者来并发调用,仔细研究这些有利于我们加深对并发处理的理解

    ConcurrentHashMap

    HashMap是非线程安全的,如果在多线程下使用很容易形成环状链表

    HashMap死循环问题

    关于HashMap,java8也做了很多改进可以看下面的文章

    Java8重新认识HashMap

    HashTable虽然是线程安全的但是其会把每个操作都会加锁,会严重影响性能所以java推荐使用ConcurrentHashMap来在并发环境下代替HashMap。但是不同的版本有不同的实现方法,主要看以下

    注:源码内容太多,所以我没有贴代码,可以自行结合先说的逻辑看源码

    Java7

    以前的版本主要采用的锁分段技术,主要以下:

    • 其内部由Segment数组与HashEntry数组组成,其中Segment继承自ReentrantLockSegment结构与HashMap一致,所以这样拆分就解决了HashTable整个加锁的效率问题
    • 每次操作元素,都需要先hash一次到segment然后再hash一次到每个桶,并且两次用的hash算法也是不一样的
    • get操作:ConcurrentHashMap中Entry是volatile所以可以不用加锁也可以保证原子性
    • put操作:ConcurrentHashMap先判断是否要扩容,然后才添加逻辑。这样的逻辑比HashMap更加合理,因为原来的方式很可能造成扩容后没有元素还需要添加了,这样就造成了浪费
    • 扩容:扩容不会把ConcurrentHashMap整个容器扩容,只是对某个segment进行扩容
    • size操作:ConcurrentHashMap的计数非常巧妙,一般HashMap里面都有一个ModCount来标志所以的操作量,在计数时ConcurrentHashMap会先尝试2次统计单个segment不加锁直接统计,如果这个时候发现modcount变了说明并发存在,于是之后每次统计都会锁住segment

    探索 ConcurrentHashMap 高并发性的实现机制

    Java8

    Node的不同

    • Node节点中加了volatile关键字

      volatile V val
      volatile Node<K,V> next
      
    • 新增find方法,便于get操作,把查找的工作全部交给节点,其实也是因为加入了红黑树所以查找单独封装在相应节点更好(并发)

    • 不能直接通过Node中的setValue直接设置,为了解决并发问题只能用ConcurrentHashMap封装的CAS操作

    • 已经知道了HashMap中大于8节点的桶会变成红黑树桶,但是与HashMap不同的是,ConcurrentHashMap的桶是TreeBin,其封装了红黑树增删以及旋转节点操作,并且存了树的根节点,其里面存的才是TreeNode

      并且:TreeBin中通过一个简单的volatile型变量通过CAS操作来实现了一个简易读写锁来支持并发

    扩容

    前面说了在HashMap的时候很容易由于扩容问题在并发中造成环节点,但是8中的ConcurrentHashMap也摒弃了之前的分段锁,使用了更优美的方法:

    • ConcurrentHashMap中加入了标志量为Moved的节点ForwardingNode,其储存了新扩容的table,这个节点只有在发送扩容时才存在
    • 如果一个元素put完之后需要扩容了,就会进入流程,每个原来Table的桶扩容到新的table完了后都会在原来的桶中加入ForwardingNode节点代表已经扩容,桶里没有东西的,也直接加入ForwardingNode。扩容的时候也会对每个桶用synchronized加锁,避免并发问题
    • 只要不超过最大并发数,ConcurrentHashMap允许并发的扩容加快速度。所以,如果在并发时如果put操作发现了ForwardingNode节点,有线程在进行并发,所以加入扩容,并且CAS更新sizeCtl全局变量代表当前并发数
    • 在扩容的时候和新版的HashMap不一样,每个桶其不管是普通节点还是树节点,扩容后构建的都是原来倒序的
    • 遍历完所有节点,就完成了复制工作,让扩容的table作为新的table

    Put操作

    由于摒弃了分段锁,所以不再是以前那样的两次hash,这次通过不允许全部的KEYValue为null,用CAS操作和每个桶加锁来保证并发的安全:

    • hash计算出在哪个桶之后,进入死循环,知道插入成功为止
    • 如果这个桶空的,那么就直接CAS放入,不用加锁
    • 如果这个桶是ForwardingNode(Moved)节点,则加入并发扩容
    • 不是以上就给桶上锁(synchronized),之后用是普通节点遍历节点插入,不然就用树的插入方法插入

    get操作

    这个比较简单,而且ConcurrentHashMap直接把查找节点封装到了桶,直接调用find就好

    ConcurrentHashMap源码分析(JDK8版本)

    ConcurrentLinkedQueue

    与HashMap的同步处理相比,这个简单的多

    ConcurrentLinkedQueue采用非阻塞,不加锁的CAS算法来保证并发的问题

    类中有volatile修饰的head和tail变量,下面以入队列offer为例,出队列是一样的逻辑

    • 最简单的可以想到每次添加过后CAS去更新tail就可以了,但是这样效率太低了,所以Doug Lea用了一种非常巧妙的方法
    • 所以添加之后尽量减少tail的CAS操作,在以前的版本有一个HOPS常量,默认是1,当前tail节点和真正的尾节点差值大于HOPS才会CAS更新tail,而如果小于则不会更新这样就减少了CAS操作,并发的效率变高,所以才有很多地方都有的结论,tail节点不总是尾节点
    • java8源码里面把HOPS这个变量去了,直接去判断如果tail节点的next节点不为空,则将入队节点CAS设置成tail节点(相当于大于HOPS=1),如果tail节点next为null则直接把新节点放到tail后面,不更新tail

    因为ConcurrentLinkedQueue是无界的所以offer始终返回true,不要通过返回值去判断入队成功

    ConcurrentLinkedQueue源码分析

    非阻塞算法在并发容器中的实现

    BlockingQueue

    阻塞队列完全是由生产者与消费者场景产生的,其在队列基础上支持两个阻塞的插入与删除操作

    • ArrayBlockingQueue:是一个基于数组实现的有界阻塞队列,可以实现公平与非公平阻塞
    • LinkedBlockingQueue:基于链表实现的有界阻塞队列,最大为Integer.MAX_VALUE
    • PriorityBlockingQueue:支持优先级(即可用comparator定义排序规则,但是不能保证同优先级顺序)的无界阻塞队列
    • DelayQueue:支持延时获取元素无界阻塞队列,内部由一个PriorityQueue实现,其入队的元素需要实现Delay接口
    • SynchronousQueue:不储存元素的阻塞队列,每个put都要等一个take,否则不能继续添加,支持公平与非公平
    • LinkedTransferQueue:对于其他阻塞队列,多了tryTransfertransfer方法,意思就是先尝试立即给消费者,如果没有,才会加入队列
    • LinkedBlockingDeque:双端阻塞队列

    原理

    ArrayBlockingQueue为例,使用并发中典型的通知模式,就比如,如果队列满了,则添加就会阻塞,直到消费了一个之后再通知生产者当前队列可以用

    • 其内部有一个ReentrantLock,用来生成阻塞生产与消费的两个方法的Condition变量notEmptynotFull

      /** Main lock guarding all access */
      final ReentrantLock lock;
      /** Condition for waiting takes */
      private final Condition notEmpty;
      /** Condition for waiting puts */
      private final Condition notFull;
      
    • 由于主要基于生产与消费,所以不会考虑一般队列那样的并发处理,在每次入队出队都会加锁

    • 每次消费如果容器count==0则会notEmpty.await(),而生产的时候都会调用notEmpty.signal(),同理生产阻塞

    Executor框架

    结构

    主要由以下组成

    • 任务:Runnable接口或者Callable接口的实现类
    • 执行者(线程调度者):Executor接口的实现类,其主要两个实现类:ThreadPoolExecutorScheduledThreadPoolExecutor
    • 计算结果:如果任务是Callable接口,那么会返回结果接口Future的实现类
    Executor结构图

    ThreadPoolExecutor

    流程

    其是Executor的一个主要实现了,下面三个线程池都继承它,其内部有一个核心线程池corePool,一个最大线程池maximumPool,和一个执行队列BlockingQueue,其执行的逻辑如下:

    ThreadPoolExecutor执行图
    1. 如果当前线程少于corePoolSize则获取全局锁,创建线程来执行任务
    2. 如果允运行的线程大于corePoolSize则把任务加入执行队列BlockingQueue
    3. 如果队列都满了,那就再获取全局锁,创建新线程执行任务
    4. 如果maximumPool都满了,那就使用RejectExecutionHandler拒绝任务

    其中线程池创建线程时会将线程封装在一个Worker里面,每次worker执行完任务后会循环获取任务队列中的任务来执行

    一般直接用ThreadPoolExecutor还是太复杂,所以用下面四个:

    FixedThreadPool

    可重用固定线程数的线程池,将corePoolSizemaximumPool设置相同,所以没有了第3步,并且其内使用LinkedBlockingQueue来作为工作队列,所以只要小于Integer.MAX_VALUE,都会入队,如果池满了就一直等待,线程Worker做完任务如果没有新的则立即结束

    SingleThreadExecutor

    单个线程的ExecutorcorePoolSizemaximumPoolSize都是1,维护一个工作队列

    CachedThreadPool

    有缓存线程的线程池,corePoolSize为0,maximumPoolSizeInteger.MAX_VALUE,并且每个线程Worker执行完后会等待60s没有任务才关闭,其内部工作队列为SynchronousQueue是没有容量的阻塞队列,所以每个任务只要没有空闲线程,都会直接开一个新的线程Worker,极端情况下CachedThreadPool会因为创建过多的线程而耗尽CPU和内存资源

    ScheduledThreadPoolExecutor

    支持延时以及周期的执行任务,其将maximumPool置为无效,并且内部使用DelayQueue来周期执行任务

    • 池中的任务全部封装成一个ScheduledFutureTask,包含了任务要被执行的时间time,任务执行的间隔时间period
    • 获取任务:因为DelayQueue内部是一个PriorityQueue,根据每个Task的time来优先级排序,每次获取任务都用阻塞队列take操作直到有time大于当前时间
    • 周期任务:周期任务执行完后会修改task中的time为下次执行时间,再将其放回队列

    相关文章

      网友评论

        本文标题:并发整理(三)— 并发集合类与线程池

        本文链接:https://www.haomeiwen.com/subject/etamxxtx.html