美文网首页java并发编程
ConcurrentHashMap解析

ConcurrentHashMap解析

作者: wangpeng123 | 来源:发表于2020-12-12 23:06 被阅读0次

    关注微信公众号:程序猿的日常分享,定期更新分享。

    上一篇我们从HashMap的数据结构及线程安全方面进行了一个了解,我们都知道HashMap是线程不安全的,所以在多线程环境下,为了线程安全我们有3种选择:ConcurrentHashMap、HashTable、Collections.SynchronizedMap(new HashMap()),后两种的实现原理类似,就是几乎每个方法都被 synchronized 关键字所修饰了,这也就保证了线程安全。由于每个方法都加上同步锁,导致性能下降,所以从性能考虑我们的最优选择是ConcurrentHashMap,那么接下来我们详细分析ConcurrentHashMap的实现原理。

    Java 7 版本中的 ConcurrentHashMap

    image.png

    从图中我们可以看出,在 ConcurrentHashMap 内部进行了 Segment 分段,Segment 继承了 ReentrantLock,可以理解为一把锁,各个 Segment 之间都是相互独立上锁的,互不影响。相比于之前的 Hashtable 每次操作都需要把整个对象锁住而言,大大提高了并发效率。因为它的锁与锁之间是独立的,而不是整个对象只有一把锁。

    每个 Segment 的底层数据结构与 HashMap 类似,仍然是数组和链表组成的拉链法结构。默认有 0~15 共 16 个 Segment,所以最多可以同时支持 16 个线程并发操作(操作分别分布在不同的 Segment 上)。16 这个默认值可以在初始化的时候设置为其他值,但是一旦确认初始化以后,是不可以扩容的。

    Java 8 版本的 ConcurrentHashMap

    image.png

    在jdk1.8中,将元素改为了Node,每个node下存在两种数据结构,分别是链表和红黑树,通HashMap的数据结构类似,并且逻辑也是一样的,当链表的长度大于等于8的时候,并且容量满足要求,链表会转化为红黑树。
    我们先来看看最基础的内部存储结构 Node,如这段代码所示:

    static class Node<K,V> implements Map.Entry<K,V> {
        final int hash;
        final K key;
        volatile V val;
        volatile Node<K,V> next;
    }
    

    可以看出,每个 Node 里面是 key-value 的形式,并且把 value 和 内部的next 用 volatile 修饰,以便保证可见性,next 指针的作用是方便产生链表结构。

    ConcurrentHashMap在put方法中采用了cas+synchronized的方式保证了线程安全,主要过程如下:

    final V putVal(K key, V value, boolean onlyIfAbsent) {
        if (key == null || value == null) {
            throw new NullPointerException();
        }
        //计算 hash 值
        int hash = spread(key.hashCode());
        int binCount = 0;
        for (Node<K, V>[] tab = table; ; ) {
            Node<K, V> f;
            int n, i, fh;
            //如果数组是空的,就进行初始化
            if (tab == null || (n = tab.length) == 0) {
                tab = initTable();
            }
            // 找该 hash 值对应的数组下标
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                //如果该位置是空的,就用 CAS 的方式放入新值
                if (casTabAt(tab, i, null,
                        new Node<K, V>(hash, key, value, null))) {
                    break;
                }
            }
            //hash值等于 MOVED 代表在扩容
            else if ((fh = f.hash) == MOVED) {
                tab = helpTransfer(tab, f);
            }
            //槽点上是有值的情况
            else {
                V oldVal = null;
                //用 synchronized 锁住当前槽点,保证并发安全
                synchronized (f) {
                    if (tabAt(tab, i) == f) {
                        //如果是链表的形式
                        if (fh >= 0) {
                            binCount = 1;
                            //遍历链表
                            for (Node<K, V> e = f; ; ++binCount) {
                                K ek;
                                //如果发现该 key 已存在,就判断是否需要进行覆盖,然后返回
                                if (e.hash == hash &&
                                        ((ek = e.key) == key ||
                                                (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent) {
                                        e.val = value;
                                    }
                                    break;
                                }
                                Node<K, V> pred = e;
                                //到了链表的尾部也没有发现该 key,说明之前不存在,就把新值添加到链表的最后
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K, V>(hash, key,
                                            value, null);
                                    break;
                                }
                            }
                        }
                        //如果是红黑树的形式
                        else if (f instanceof TreeBin) {
                            Node<K, V> p;
                            binCount = 2;
                            //调用 putTreeVal 方法往红黑树里增加数据
                            if ((p = ((TreeBin<K, V>) f).putTreeVal(hash, key,
                                    value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent) {
                                    p.val = value;
                                }
                            }
                        }
                    }
                }
                if (binCount != 0) {
                    //检查是否满足条件并把链表转换为红黑树的形式,默认的 TREEIFY_THRESHOLD 阈值是 8
                    if (binCount >= TREEIFY_THRESHOLD) {
                        treeifyBin(tab, i);
                    }
                    //putVal 的返回是添加前的旧值,所以返回 oldVal
                    if (oldVal != null) {
                        return oldVal;
                    }
                    break;
                }
            }
        }
        addCount(1L, binCount);
        return null;
    }
    

    总结ConcurrentHashMap在java7和8的异同点和优缺点

    数据结构

    Java 7 采用 Segment 分段锁+哈希表实现
    Java 8 中的 ConcurrentHashMap 使用Node + 链表 + 红黑树

    并发度

    Java 7 中,每个 Segment 独立加锁,最大并发个数就是 Segment 的个数,默认是 16。
    Java 8 中,锁粒度更细,理想情况下table 数组元素的个数(也就是数组长度)就是其支持并发的最大个数,并发度比之前有提高。

    线程安全

    Java 7 采用 Segment 分段锁来保证安全,而 Segment 是继承自ReentrantLock。

    Java 8 中放弃了 Segment 的设计,采用 Node + CAS + synchronized 保证线程安全。

    Hash 碰撞

    Java 7 在 Hash 冲突时,会使用拉链法,也就是链表的形式。

    Java 8 先使用拉链法,在链表长度超过一定阈值时,将链表转换为红黑树,来提高查找效率。

    查询时间复杂度

    Java 7 遍历链表的时间复杂度是 O(n),n 为链表长度。

    Java 8 如果变成遍历红黑树,那么时间复杂度降低为 O(log(n)),n 为树的节点个数。

    关注微信公众号:程序猿的日常分享,定期更新分享。

    相关文章

      网友评论

        本文标题:ConcurrentHashMap解析

        本文链接:https://www.haomeiwen.com/subject/gvnygktx.html