美文网首页
对于ConcurrentHashMap的理解

对于ConcurrentHashMap的理解

作者: 倚仗听江 | 来源:发表于2020-09-08 15:24 被阅读0次

    简单来说, ConcurrentHashMap 是一个 Segment 数组, Segment 通过继承ReentrantLock 来进行加锁,所以每次需要加锁的操作锁住的是一个 segment,这样只要保证每个 Segment 是线程安全的,也就实现了全局的线程安全。

    JDK1.7

    jdk1.7中采用Segment + HashEntry的方式进行实现,结构如下:


    concurrentHashMap1.7.png

    ConcurrentHashMap初始化时,计算出Segment数组的大小ssize和每个Segment中HashEntry数组的大小cap,并初始化Segment数组的第一个元素;其中ssize大小为2的幂次方,默认为16,cap大小也是2的幂次方,最小值为2,最终结果根据根据初始化容量initialCapacity进行计算,计算过程如下:

    int c = initialCapacity / ssize;
    if (c * ssize < initialCapacity)
        ++c;
        int cap = MIN_SEGMENT_TABLE_CAPACITY;
        while (cap < c)
            cap <<= 1;
    

    ssize是Segment数组的大小,c是当前初始容量和Segment数组长度下的最小容量,再经过位运算,得到是2的次方幂的cap的大小,cap即为HashEntry数组的大小。

    其中Segment在实现上继承了ReentrantLock,这样就自带了锁的功能。

    static final class Segment<K,V> extends ReentrantLock implements Serializable {
    ...
    }
    

    put方法:
    当执行put方法插入数据时,根据key的hash值,在Segment数组中找到相应的位置,如果相应位置的Segment还未初始化,则通过CAS进行赋值,接着执行Segment对象的put方法通过加锁机制插入数据,实现如下:

    public V put(K key, V value) {
            Segment<K,V> s;
            if (value == null)
                throw new NullPointerException();
            int hash = hash(key);
            int j = (hash >>> segmentShift) & segmentMask;
            if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
                 (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
                s = ensureSegment(j);
            return s.put(key, hash, value, false);
        }
    

    计算当前元素在Segment数组中的下标

    int j = (hash >>> segmentShift) & segmentMask;
    

    当执行put操作时,会进行第一次key的hash来定位Segment的位置,如果该Segment还没有初始化,即通过CAS操作进行赋值,然后进行第二次hash操作,找到相应的HashEntry的位置,这里会利用继承过来的锁的特性,在将数据插入指定的HashEntry位置时(链表的尾端),会通过继承ReentrantLock的tryLock()方法尝试去获取锁,如果获取成功就直接插入相应的位置,如果已经有线程获取该Segment的锁,那当前线程会以自旋的方式去继续的调用tryLock()方法去获取锁,超过指定次数就挂起,等待唤醒。

    场景:线程A和线程B同时执行相同Segment对象的put方法
    1、线程A执行tryLock()方法成功获取锁,则把HashEntry对象插入到相应的位置;
    2、线程B获取锁失败,则执行scanAndLockForPut()方法,在scanAndLockForPut方法中,会通过重复执行tryLock()方法尝试获取锁,在多处理器环境下,重复次数为64,单处理器重复次数为1,当执行tryLock()方法的次数超过上限时,则执行lock()方法挂起线程B;
    3、当线程A执行完插入操作时,会通过unlock()方法释放锁,接着唤醒线程B继续执行;

    size实现
    因为ConcurrentHashMap是可以并发插入数据的,所以在准确计算元素时存在一定的难度,一般的思路是统计每个Segment对象中的元素个数,然后进行累加,但是这种方式计算出来的结果并不一样的准确的,因为在计算后面几个Segment的元素个数时,已经计算过的Segment同时可能有数据的插入或则删除,在1.7的实现中,采用了如下方式

    try {
        for (;;) {
            if (retries++ == RETRIES_BEFORE_LOCK) {
                for (int j = 0; j < segments.length; ++j)
                    ensureSegment(j).lock(); // force creation
            }
            sum = 0L;
            size = 0;
            overflow = false;
            for (int j = 0; j < segments.length; ++j) {
                Segment<K,V> seg = segmentAt(segments, j);
                if (seg != null) {
                    sum += seg.modCount;
                    int c = seg.count;
                    if (c < 0 || (size += c) < 0)
                        overflow = true;
                }
            }
            if (sum == last)
                break;
            last = sum;
        }
    } finally {
        if (retries > RETRIES_BEFORE_LOCK) {
            for (int j = 0; j < segments.length; ++j)
                segmentAt(segments, j).unlock();
        }
    }
    

    先采用不加锁的方式,连续计算元素的个数,最多计算3次:
    1、如果前后两次计算结果相同,则说明计算出来的元素个数是准确的;
    2、如果前后两次计算结果都不同,则给每个Segment进行加锁,再计算一次元素的个数;

    ConcurrentHashMap1.7的时候底层用的是一个Segment数组,每个Segment对象下面又有一个被volatile关键字修饰的HashEntry数组。在调用构造方法进行初始化的时候,会先根据Segment数组的大小和初始化容量计算出每个HashEntry数组的大小,默认情况下Segment数组的长度为16,HashEntry数组的长度为2。在调用put方法的时候,首先会根据key值计算出hash值,再根据hash值计算出对应Segment数组的索引,然后去判断Segment数组这个位置上是否存在对象。如果不存在对象,那么就需要去生成一个Segment对象。这里ConcurrentHashMap没有加锁但却保证了线程安全。它采用的是CAS的思想,不断的去判断这个位置上的对象是否为空,如果为空则生成一个Segment对象,如果不为空,那么就返回这个Segment对象就可以了。然后就可以去调用Segment对象的put方法了。Segment对象它是继承自ReentrantLock的,所以它是有一些锁的特性的,可以去tryloock、lock、unlock。在put方法里面,它是先去trylock了一下,trylock是一种非阻塞的获取锁,就是如果获取到锁了那就获取到了,如果获取不到的话我不阻塞,而是直接返回一个false。ConcurrentHashMap就是这样,如果第一次没有获取到锁的话,那么就会用自旋锁的思想,不断的尝试去trylock,直到超过最大重试次数。这个最大重试次数是根据你的处理器核数来确定的,单核CPU的最大重试次数只有1,而多核CPU的重试次数有64。超过了最大重试次数之后,它就不会不断去trylock了,而是直接lock,用这种阻塞式的获取锁,等待其他线程释放锁之后,我再去获取锁,去进行元素的赋值操作。还有一点就是,ConcurrentHashMap对于元素的操作其实和HashMap很像,它也是数组加链表的形式,它同样也允许key值重复进行覆盖。

    JDK 1.8

    改进一:取消segments字段,直接采用transient volatile HashEntry<K,V>[] table保存数据,采用table数组元素作为锁,从而实现了对每一行数据进行加锁,进一步减少并发冲突的概率。

    改进二:将原先table数组+单向链表的数据结构,变更为table数组+单向链表+红黑树的结构。对于hash表来说,最核心的能力在于将key hash之后能均匀的分布在数组中。如果hash之后散列的很均匀,那么table数组中的每个队列长度主要为0或者1。但实际情况并非总是如此理想,虽然ConcurrentHashMap类默认的加载因子为0.75,但是在数据量过大或者运气不佳的情况下,还是会存在一些队列长度过长的情况,如果还是采用单向列表方式,那么查询某个节点的时间复杂度为O(n);因此,对于个数超过8(默认值)的列表,jdk1.8中采用了红黑树的结构,那么查询的时间复杂度可以降低到O(logN),可以改进性能。

    去除 Segment + HashEntry + Unsafe 的实现,
    改为 Synchronized + CAS + Node + Unsafe 的实现
    其实 Node 和 HashEntry 的内容一样,但是HashEntry是一个内部类。
    用 Synchronized + CAS 代替 Segment ,这样锁的粒度更小了,并且不是每次都要加锁了,CAS尝试失败了在加锁。

    1.8中放弃了Segment臃肿的设计,取而代之的是采用Node + CAS + Synchronized来保证并发安全进行实现,结构如下:


    concurrentHashMap1.8.png

    Node是ConcurrentHashMap存储结构的基本单元,就是一个链表,但是只允许对数据进行查找,不允许进行修改。

    static class Node<K,V> implements Map.Entry<K,V> {
            final int hash;
            final K key;
            volatile V val;
            volatile Node<K,V> next;
    ...
    }
    

    TreeNode继承与Node,但是数据结构换成了二叉树结构,它是红黑树的数据的存储结构,用于红黑树中存储数据,当链表的节点数大于8时会转换成红黑树的结构。

    static final class TreeNode<K,V> extends Node<K,V> {
            TreeNode<K,V> parent;  // red-black tree links
            TreeNode<K,V> left;
            TreeNode<K,V> right;
            TreeNode<K,V> prev;    // needed to unlink next upon deletion
            boolean red;
    ...
    }
    

    put方法:

    public V put(K key, V value) {
        return putVal(key, value, false);
    }
    /** Implementation for put and putIfAbsent */
    final V putVal(K key, V value, boolean onlyIfAbsent) {
        if (key == null || value == null) throw new NullPointerException();
        int hash = spread(key.hashCode()); //两次hash,减少hash冲突,可以均匀分布
        int binCount = 0;
        for (Node<K,V>[] tab = table;;) { //对这个table进行迭代
            Node<K,V> f; int n, i, fh;
            //这里就是上面构造方法没有进行初始化,在这里进行判断,为null就调用initTable进行初始化,属于懒汉模式初始化
            if (tab == null || (n = tab.length) == 0)
                tab = initTable();
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {//如果i位置没有数据,就直接无锁插入
                if (casTabAt(tab, i, null,
                             new Node<K,V>(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin
            }
            else if ((fh = f.hash) == MOVED)//如果在进行扩容,则先进行扩容操作
                tab = helpTransfer(tab, f);
            else {
                V oldVal = null;
                //如果以上条件都不满足,那就要进行加锁操作,也就是存在hash冲突,锁住链表或者红黑树的头结点
                synchronized (f) {
                    if (tabAt(tab, i) == f) {
                        if (fh >= 0) { //表示该节点是链表结构
                            binCount = 1;
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                                //这里涉及到相同的key进行put就会覆盖原先的value
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node<K,V> pred = e;
                                if ((e = e.next) == null) {  //插入链表尾部
                                    pred.next = new Node<K,V>(hash, key,
                                                              value, null);
                                    break;
                                }
                            }
                        }
                        else if (f instanceof TreeBin) {//红黑树结构
                            Node<K,V> p;
                            binCount = 2;
                            //红黑树结构旋转插入
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                           value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                if (binCount != 0) { //如果链表的长度大于8时就会进行红黑树的转换
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        addCount(1L, binCount);//统计size,并且检查是否需要扩容
        return null;
    }
    
    1. 如果没有初始化就先调用initTable()方法来进行初始化过程。
    2. 如果没有hash冲突就直接CAS插入。
    3. 如果还在进行扩容操作就先进行扩容。
    4. 如果存在hash冲突,就加锁来保证线程安全,这里有两种情况,一种是链表形式就直接遍历到尾端插入,一种是红黑树就按照红黑树结构插入,。
    5. 最后一个如果该链表的数量大于阈值8,就要先转换成红黑树的结构,break再一次进入循环。
    6. 如果添加成功就调用addCount()方法统计size,并且检查是否需要扩容。

    resize方法
    1.8中使用一个volatile类型的变量baseCount记录元素的个数,当插入新数据或则删除数据时,会通过addCount()方法更新baseCount,实现如下:

    if ((as = counterCells) != null ||
        !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
        CounterCell a; long v; int m;
        boolean uncontended = true;
        if (as == null || (m = as.length - 1) < 0 ||
            (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
            !(uncontended =
              U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
            fullAddCount(x, uncontended);
            return;
        }
        if (check <= 1)
            return;
        s = sumCount();
    }
    

    支持并发扩容,HashMap扩容在1.8中由头插改为尾插(为了避免死循环问题),ConcurrentHashmap也是,迁移也是从尾部开始,扩容前在桶的头部放置一个hash值为-1的节点,这样别的线程访问时就能判断是否该桶已经被其他线程处理过了。

    size方法

    public int size() {
        long n = sumCount();
        return ((n < 0L) ? 0 :
                (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
                (int)n);
    }
    final long sumCount() {
        CounterCell[] as = counterCells; CounterCell a; //变化的数量
        long sum = baseCount;
        if (as != null) {
            for (int i = 0; i < as.length; ++i) {
                if ((a = as[i]) != null)
                    sum += a.value;
            }
        }
        return sum;
    }
    

    在JDK1.8版本中,对于size的计算,在扩容和addCount()方法就已经有处理了,可以注意一下Put函数,里面就有addCount()函数,早就计算好的,然后你size的时候直接给你,JDK1.7是在调用size()方法才去计算。

    参考:
    https://blog.csdn.net/xingxiupaioxue/article/details/88062163
    https://blog.csdn.net/u013374645/article/details/88700927
    https://www.jianshu.com/p/e694f1e868ec

    相关文章

      网友评论

          本文标题:对于ConcurrentHashMap的理解

          本文链接:https://www.haomeiwen.com/subject/kegvsktx.html