美文网首页
BiBi - 并发编程 -11- 并发容器

BiBi - 并发编程 -11- 并发容器

作者: 奋飞的蜗牛ing | 来源:发表于2019-12-10 00:45 被阅读0次

    From:Java并发编程的艺术

    1. ConcurrentHashMap

    JDK1.7中的ConcurrentHashMap
    ConcurrentHashMap使用锁分段技术

    HashTable效率低下,使用synchronized来保证线程安全,线程1使用put进行元素添加,线程2不但不能使用put进行添加也不能使用get来获取元素。

    ConcurrentHashMap把数据分成一段一段的存储,然后给每一段数据配一把锁,当一个线程占用锁访问其中一个段数据的时候,其它段的数据也能被其它线程访问。

    ConcurrentHashMap由Segment数组和HashEntry数组组成,其中Segment继承ReentrantLock,static class Segment<K,V> extends ReentrantLock,扮演锁的角色;HashEntry用于存储键值对数据。Segment和HashEntry的数据结构类似为:数组和链表结构。一个ConcurrentHashMap中包含一个Segment数组,每个Segment中包含一个HashEntry数组。当对HashEntry数组进行修改时,需要获得与他对应的Segment锁。

    ConcurrentHashMap实现细节

    1)为了使用【按位与的散列算法】来定位segments数组的索引,segments数组的长度必须是2的N次方。

    2)定位Segment。使用Wang/Jenkins hash的变种算法对元素的hashCode进行一次再散列,来减少散列冲突。

    3)get操作不需要加锁。原因是他的get方法里将要使用的共享变量都定义成了volatile类型,如:用于统计当前Segment大小的count和用于存储值的HashEntry的value。

    4)put需要加锁。在插入元素前会先判断Segment里的HashEntry数组是否超过容量,在扩容的时候,首先会创建一个容量是原来容量两倍的数组,然后将原数组里的元素进行再散列后插入到新的数组里。为了高效,ConcurrentHashMap不会对整个容器进行扩容,而只对某个segment进行扩容。

    put操作具体细节:Segment实现了ReentrantLock,也就带有锁的功能,当执行put操作时,会进行第一次key的hash来定位Segment的位置,如果该Segment还没有初始化,即通过CAS操作进行赋值,然后进行第二次hash操作,找到相应的HashEntry的位置,这里会利用继承过来的锁的特性,在将数据插入指定的HashEntry位置时(链表的尾端),会通过继承ReentrantLock的tryLock()方法尝试去获取锁,如果获取成功就直接插入相应的位置,如果已经有线程获取该Segment的锁,那当前线程会以自旋的方式去继续的调用tryLock()方法去获取锁,超过指定次数就挂起,等待唤醒。

    5)size操作。第一种方案他会使用不加锁的模式去尝试多次计算ConcurrentHashMap的size,最多三次,比较前后两次计算的结果,结果一致就认为当前没有元素加入,计算的结果是准确的;第二种方案是如果第一种方案不符合,他就会给每个Segment加上锁,然后计算ConcurrentHashMap的size返回。

    JDK1.8中的ConcurrentHashMap

    JDK1.8中的ConcurrentHashMap取消了segment分段锁,而采用CAS和synchronized来保证并发安全。数据结构跟HashMap1.8的结构一样,Node数组+链表/红黑二叉树。synchronized只锁定当前链表或红黑二叉树的首节点,这样只要hash不冲突,就不会产生并发,效率又提升N倍。

    Node结点
    static class Node<K,V> implements Map.Entry<K,V> {
      final int hash;
      final K key;
      volatile V val;
      volatile Node<K, V> next;
    }
    

    注意:val和next为volatile类型。key和hash为final,即只允许对数据进行查找,不允许进行修改。【HashMap的key和hash也是final】

    TreeNode

    TreeNode继承与Node,但是数据结构换成了二叉树结构,它是红黑树的数据的存储结构,用于红黑树中存储数据,当链表的节点数大于8时会转换成红黑树的结构,他就是通过TreeNode作为存储结构代替Node来转换成黑红树。

    static final class TreeNode<K,V> extends Node<K,V> {
      TreeNode<K, V> parent;  // red-black tree links
      TreeNode<K, V> left;
      TreeNode<K, V> right;
      TreeNode<K, V> prev;    // needed to unlink next upon deletion
      boolean red;
    }
    
    TreeBin

    TreeBin从字面含义中可以理解为存储树形结构的容器,而树形结构就是指TreeNode,所以TreeBin就是封装TreeNode的容器,它提供转换黑红树的一些条件和锁的控制。

    static final class TreeBin<K,V> extends Node<K,V> {
      TreeNode<K, V> root;
      volatile TreeNode<K, V> first;
      volatile Thread waiter;
      volatile int lockState;
      // values for lockState
      static final int WRITER = 1; // set while holding write lock
      static final int WAITER = 2; // set when waiting for write lock
      static final int READER = 4; // increment value for setting read lock
    }
    
    put

    对当前的table进行无条件自循环直到put成功,可以分成以下六步流程来概述。
    1)如果没有初始化就先调用initTable()方法来进行初始化过程。
    2)如果没有hash冲突就直接CAS插入。
    3)如果还在进行扩容操作就先进行扩容。
    4)如果存在hash冲突,就加锁来保证线程安全,这里有两种情况,一种是链表形式就直接遍历到尾端插入,一种是红黑树就按照红黑树结构插入。
    5)最后一个如果该链表的数量大于阈值8,就要先转换成黑红树的结构,break再一次进入循环。
    6)如果添加成功就调用addCount()方法统计size,并且检查是否需要扩容。

    • 在某个桶的迁移过程中,别的线程想要对该桶进行put操作怎么办?
      一旦某个桶在迁移过程中了,必然要获取该桶的锁,所以其他线程的put操作要被阻塞,一旦迁移完毕,该桶中第一个元素就会被设置成ForwardingNode节点,所以其他线程put时需要重新判断下桶中第一个元素是否被更改了,如果被改了重新获取重新执行逻辑,如下代码

    • 某个桶已经迁移完成(其他桶还未完成),别的线程想要对该桶进行put操作怎么办?
      该线程会首先检查是否还有未分配的迁移任务,如果有则先去执行迁移任务,如果没有即全部任务已经分发出去了,那么此时该线程可以直接对新的桶进行插入操作(映射到的新桶必然已经完成了迁移,所以可以放心执行操作)

    get

    1)计算hash值,定位到该table索引位置,如果是首节点符合就返回。
    2)如果遇到扩容的时候,会调用标志正在扩容节点ForwardingNode的find()方法,查找该节点,匹配就返回。
    3)以上都不符合的话,就往下遍历节点,匹配就返回,否则最后就返回null。
    虽然ConcurrentHashMap的读不需要锁,但是需要保证能读到最新数据,所以必须加volatile。即数组的引用需要加volatile,同时一个Node节点中的val和next属性也必须要加volatile。

    size

    在JDK1.8版本中,对于size的计算,在扩容和addCount()方法就已经有处理了。

    扩容

    https://juejin.im/post/5b00160151882565bd2582e0
    其实helpTransfer()方法的目的就是调用多个工作线程一起帮助进行扩容,这样的效率就会更高,而不是只有检查到要扩容的那个线程进行扩容操作,其他线程就要等待扩容操作完成才能工作。
    ForwardingNode的作用就是支持扩容操作,用于占位,将已处理的节点和空节点置为ForwardingNode,并发处理时当别的线程发现这个槽位中是 fwd 类型的节点,则跳过这个节点往后遍历。

    当线程2访问到了ForwardingNode节点,如果线程2执行的put或remove等写操作,那么就会先帮其扩容。如果线程2执行的是get等读方法,则会调用ForwardingNode的find方法,去nextTable里面查找相关元素。

    多线程无锁扩容的关键就是通过CAS设置sizeCtl与transferIndex变量,协调多个线程对table数组中的node进行迁移。

    2. 队列

    非阻塞队列ConcurrentLinkedQueue

    使用循环CAS方式实现。
    入队列:循环获取尾结点,只有当next节点为null时,才能添加元素。【使用CAS算法将入队节点设置成尾结点的next节点,如不成功则重试】尾结点可能是tail节点,也可能是tail节点的next节点。

    阻塞队列的种类

    1)ArrayBlockingQueue 数组结构的有界阻塞队列
    2)LinkedBlockingQueue 链表结构的无界阻塞队列
    3)PriorityBlockingQueue 支持优先级排序的无界阻塞队列
    4)DelayQueue 使用优先级队列实现的无界阻塞队列
      支持延时获取元素的无界阻塞队列,适用于:缓存系统的设计和定时任务的调度
    5)SynchronousQueue 一个不存储元素的阻塞队列
      每一个put操作必须等待一个take操作,否则不能继续添加元素。
    6)LinkedTransferQueue 链表结构的无界阻塞队列
      transfer方法:可以把生产者传入的元素立刻transfer给消费者,等消费者消费了才能返
      回,期间CPU进行自旋。
      tryTransfer方法:该方法无论 消费者是否接收,方法都会立刻返回。
    7)LinkedBlockingDeque 链表结构的双向阻塞队列
      运用在【工作窃取】模式中。

    相关文章

      网友评论

          本文标题:BiBi - 并发编程 -11- 并发容器

          本文链接:https://www.haomeiwen.com/subject/lteplqtx.html