美文网首页回家
Java 并发容器 ConcurrentHashMap 实现原理

Java 并发容器 ConcurrentHashMap 实现原理

作者: tandeneck | 来源:发表于2020-05-23 22:51 被阅读0次

    前言

    之前我们了解了 HashMap(HashMap 基于 JDK 1.7 源码解析),知道其是非线程安全的,当我们只有一个线程在使用 HashMap 的时候,自然不会有问题,但是并不适用于多线程情况,可以通过 Collections.synchronizedMap() 方法或者替换为 Hashtable,但是这两种方式基本都是对整个表结构做锁定操作,性能不是很理想,这时我们可以使用 ConcurrentHashMap,需要注意的是,ConcurrentHashMap 也分 1.7,1.8版本,两者实现上略有不同。

    1.7 版本

    数据结构

    ConcurrentHashMap 数据结构为一个 Segement 数组,Segment 的数据结构为 HashEntry 数组,而 HashEntry 存储的是键值对,可以构成链表。如下图所示:

    Segment 和 HashEntry

    Segment 类继承 ReentrantLock 类,从而使 Segment 对象能充当锁的角色,不会像 HashTable 那样不管是 put 还是 get 操作都需要做同步处理,理论上每当一个线程占用锁访问一个 Segment 时,不会影响其他的 Segment,这就是 ConcurrentHashMap 的锁分段技术。Segment 源码如下:

    static final class Segment<K,V> extends ReentrantLock implements Serializable {  
     private static final long serialVersionUID = 2249069246763182397L;  
             /** 
              * 在本 segment 范围内,包含的 HashEntry 元素的个数
              * 该变量被声明为 volatile 型,保证可见性,即每次读到都是
              * 最新数据
              */  
             transient volatile int count;  
    
             /** 
              * table 被更新的次数
              */  
             transient int modCount;  
    
             /** 
              * 当 table 中包含的 HashEntry 元素的个数超过本变量值时,触发 table 的再散列
              */  
             transient int threshold;  
    
             /** 
              * table 是由 HashEntry 对象组成的数组
              * 如果散列时发生碰撞,碰撞的 HashEntry 对象就以链表的形式链接成一个链表
              * table 数组的数组成员代表散列映射表的一个桶
              * 每个 table 守护整个 ConcurrentHashMap 包含桶总数的一部分
              * 如果并发级别为 16,table 则守护 ConcurrentHashMap 包含的桶总数的 1/16 
              */  
             transient volatile HashEntry<K,V>[] table;  
    
             /** 
              * 负载因子
              */  
             final float loadFactor;  
     }
    

    其中的 HashEntry:

     static final class HashEntry<K,V> {  
         final K key;  
         final int hash;  
         volatile V value;  
         final HashEntry<K,V> next;  
     } 
    

    HashEntry 几乎是不可变的,代表的是每个 hash 链中的一个节点。可以看到除了 value 其余都是 fianl 的,这意味着不能从 hash 链的中间或尾部添加加或者删除节点,因为这需要修改 next 引用值,所有的节点的修改值只能从头部开始。

    put 方法

    代码如下:

    public V put(K key, V value) {
        Segment<K,V> s;
        if (value == null)
            throw new NullPointerException();
        int hash = hash(key);
        int j = (hash >>> segmentShift) & segmentMask;
        if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
             (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
            s = ensureSegment(j);
        return s.put(key, hash, value, false);
    }
    

    通过 key 找到 Segment,再在 Segment 中执行 put 操作。

    final V put(K key, int hash, V value, boolean onlyIfAbsent) {
        // 加锁
        HashEntry<K,V> node = tryLock() ? null :
            scanAndLockForPut(key, hash, value);
        V oldValue; // 旧值
        try {
            HashEntry<K,V>[] tab = table; // table 数组
            int index = (tab.length - 1) & hash; //找到在 table 中的索引
            HashEntry<K,V> first = entryAt(tab, index); // 根据索引获取到 HashEntry
            // 遍历 HashEntry 链表
            for (HashEntry<K,V> e = first;;) {
                if (e != null) {
                    K k;
                     // key 存在,更新值
                    if ((k = e.key) == key ||
                        (e.hash == hash && key.equals(k))) {
                        oldValue = e.value;
                        if (!onlyIfAbsent) {
                            e.value = value;
                            ++modCount;
                        }
                        break;
                    }
                    e = e.next;
                }
                else {
                    if (node != null)
                        node.setNext(first);
                    else
                        node = new HashEntry<K,V>(hash, key, value, first);
                    int c = count + 1;
                    if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                        rehash(node); //扩容
                    else
                        setEntryAt(tab, index, node);
                    ++modCount;
                    count = c;
                    oldValue = null;
                    break;
                }
            }
        } finally {
            unlock(); // 释放锁
        }
        return oldValue;
    }
    

    由于 HashEntry 不能保证原子性(volatile 只能保证可见性),因此要对 put 操作要做加锁处理。

    get 方法

    public V get(Object key) {
        Segment<K,V> s; 
        HashEntry<K,V>[] tab;
        int h = hash(key);
        long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
        if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
            (tab = s.table) != null) {
            for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
                     (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
                 e != null; e = e.next) {
                K k;
                if ((k = e.key) == key || (e.hash == h && key.equals(k)))
                    return e.value;
            }
        }
        return null;
    }
    

    get 逻辑比较简单,如下:

    • 根据 key 值计算 hash 定位到具体的 Segment。
    • 再通过一次 hash 定位到具体的位置上。
      由于 HashEntry 的 value 是用 volatile 修饰的,保证了可见性,所以每次获取到都是最新值,get() 方法是非常高效的,因为整个过程中都不用加锁。

    Remove 方法

        V remove(Object key, int hash, Object value) {
            lock(); //加锁
            try {
                int c = count - 1;
                HashEntry<K, V>[] tab = table;
                //根据 hash 找到 table 的索引
                int index = hash & (tab.length - 1);
                //找到索引对应的那个桶
                HashEntry<K, V> first = tab[index];
                HashEntry<K, V> e = first;
                while (e != null && (e.hash != hash || !key.equals(e.key)))
                    e = e.next;
    
                V oldValue = null;
                if (e != null) {
                    V v = e.value;
                    if (value == null || value.equals(v)) { //找到要删除的节点
                        oldValue = v;
                        ++modCount;
                        HashEntry<K, V> newFirst = e.next;// 待删节点的后继结点
                        for (HashEntry<K, V> p = first; p != e; p = p.next)
                            //头插法
                            newFirst = new HashEntry<K, V>(p.key, p.hash,
                                    newFirst, p.value);
                        //把桶链接到新的头结点
                        //新的头结点是原链表中,删除节点之前的那个节点
                        tab[index] = newFirst;
                        count = c;     
                    }
                }
                return oldValue;
            } finally {
                unlock(); //释放锁
            }
        }
    

    整个操作是在持有段锁的情况下执行的,需要注意的是删除结点后,该删除结点前面的结点会倒序,如下图:


    1.8 版本

    数据结构

    1. 8 的实现抛弃了 Segment 分段锁机制,利用 CAS + synchronized 来保证并发更新的安全,底层采用数组 + 链表 + 红黑树的数据结构,用 Node 替换 了 HashEntry(名称改变了而已),如下图所示:


    put 方法

        final V putVal(K key, V value, boolean onlyIfAbsent) {
            if (key == null || value == null) throw new NullPointerException();
            int hash = spread(key.hashCode()); //计算 hash
            int binCount = 0;
            for (Node<K,V>[] tab = table;;) {
                Node<K,V> f; int n, i, fh; K fk; V fv;
                if (tab == null || (n = tab.length) == 0)
                    tab = initTable(); // 初始化 table
                else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                    if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value)))
                        break;                   // 利用 CAS 尝试写入数据
                }
                else if ((fh = f.hash) == MOVED)
                    tab = helpTransfer(tab, f); // 扩容处理
                else if (onlyIfAbsent // check first node without acquiring lock
                         && fh == hash
                         && ((fk = f.key) == key || (fk != null && key.equals(fk)))
                         && (fv = f.val) != null)
                    return fv;
                else {
                    V oldVal = null;
                    synchronized (f) { // 利用 sychronized 获取锁
                        if (tabAt(tab, i) == f) {
                            if (fh >= 0) {
                                binCount = 1;
                                for (Node<K,V> e = f;; ++binCount) {
                                    K ek;
                                    if (e.hash == hash &&
                                        ((ek = e.key) == key ||
                                         (ek != null && key.equals(ek)))) {
                                        oldVal = e.val;
                                        if (!onlyIfAbsent)
                                            e.val = value;
                                        break;
                                    }
                                    Node<K,V> pred = e;
                                    if ((e = e.next) == null) {
                                        pred.next = new Node<K,V>(hash, key, value);
                                        break;
                                    }
                                }
                            }
                            else if (f instanceof TreeBin) {
                                Node<K,V> p;
                                binCount = 2;
                                if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                               value)) != null) {
                                    oldVal = p.val;
                                    if (!onlyIfAbsent)
                                        p.val = value;
                                }
                            }
                            else if (f instanceof ReservationNode)
                                throw new IllegalStateException("Recursive update");
                        }
                    }
                    if (binCount != 0) {
                        if (binCount >= TREEIFY_THRESHOLD)
                            treeifyBin(tab, i);
                        if (oldVal != null)
                            return oldVal;
                        break;
                    }
                }
            }
            addCount(1L, binCount);
            return null;
        }
    

    get 方法

       public V get(Object key) {
            Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
            int h = spread(key.hashCode()); // 计算 hash 值
            if ((tab = table) != null && (n = tab.length) > 0 &&
                (e = tabAt(tab, (n - 1) & h)) != null) {
                if ((eh = e.hash) == h) {
                    //在桶上直接返回
                    if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                        return e.val;
                }
                else if (eh < 0)
                    // 通过红黑树获取值
                    return (p = e.find(h, key)) != null ? p.val : null;
                while ((e = e.next) != null) { // 链表单向查询
                    if (e.hash == h &&
                        ((ek = e.key) == key || (ek != null && key.equals(ek))))
                        return e.val;
                }
            }
            return null;
        }
    

    总结

    1.7 版本主要使用了分段锁的机制,将数据分成许多个 Segment,每个 Segment 拥有一把锁,当一个线程占用锁访问其中一个 Segment 的时候,其他 Segment 的数据也能被其他线程访问。有些方法需要跨 Segment,比如 size() 和 containsValue(),它们可能需要锁定整个表而而不仅仅是某个Segment,这需要按顺序锁定所有 Segment,操作完毕后,又按顺序释放所有段的锁,否则可能会出现死锁。

    1.8 版本 抛弃了 1.7 版本的分段锁机制,引入了 CAS + Synchronized 的机制,数据结构也变成了 数组 + 链表 + 红黑树 的形式,CAS 是非常轻量级的同步操作,而且也跟新版本虚拟机对 Synchronized 的优化有关,有兴趣的同学可以去具体了解。

    参考

    相关文章

      网友评论

        本文标题:Java 并发容器 ConcurrentHashMap 实现原理

        本文链接:https://www.haomeiwen.com/subject/kkmfahtx.html