美文网首页
剖析Java 集合框架(四)-ConcurrentHashMap

剖析Java 集合框架(四)-ConcurrentHashMap

作者: 李元霸抢貂蝉 | 来源:发表于2020-11-28 21:50 被阅读0次

    周末早上又被咚咚咚修路的声音弄醒,早上醒来第一件是看手表,哦不是,是看手机,看昨晚睡了多少分。哈哈哈,这是我戴手环为二作用之一,还有一个作用是座久了会滴滴滴提醒,对于我这种不管玩游戏还是写代码座下基本不起来的人还是很不错的。
    今天看了几小时的《构建之法》看点有点云里雾里,一大堆的名词,各种方法理论,感觉很高大上,看过就忘了写了啥,真得把读后感提上日程,希望能有所感悟!

    前言

    我们都知道HashMap在多线程环境下存在线程安全的问题,为了处理这种情况,一般有如下这些方案:

    • 使用 Collections.synchronizedMap(Map)创建线程安全的Map。其实就是内部维护了一个普通 Map 对象和一个 Object 对象作为排它锁。调用任何方法都是先锁排它锁然后直接执行 Map 对象的方法。
    • 使用 HashTable。每个方法都加了锁。效率低下
    • 使用 ConcurrentHashMap。主要为多线程下安全操作 Map 而生。
    • 使用 ConcurrentSkipListMap。适应多线程下需要排序的键值映射数据集合,下章具体分析。

    本文的主角为 ConcurrentHashMap,使用局部加锁代替全局锁,大地提高了并发环境下的操作速度。在JDK1.7和1.8中的使用有很大不同,我们来逐一看看这么原理,理解的同时也参悟优化的方向。

    ConcurrentHashMap 1.7

    1.数据结构

    在JDK1.7中 ConcurrentHashMap 采用了"分段锁"结构,把整个数据分成几段,每次操作都只锁数据所在的那段。段的名称为 Segment,是一个类似于 HashMap 的结构。每个 Segment 里维护了一个 HashEntry 数组,每个 HashEntry 是一个链表结构的元素,用于存储键值对数据。HashEntry 数组可以进行扩容,但是 Segment 的个数一旦初始化就不能改变,默认 Segment 的个数是 16 个,也可以认为 ConcurrentHashMap 默认支持最多 16 个线程并发。

    ConcurrentHashMap 1.7数据结构
    /**
    * The segments, each of which is a specialized hash table.
    */
    final Segment<K,V>[] segments;
    
    // Segment 继承自 ReentrantLock。
    static final class Segment<K,V> extends ReentrantLock implements Serializable {
        private static final long serialVersionUID = 2249069246763182397L;
        
        //  HashEntry 对象组成的数组,真正存放数据的桶
        // table 数组的数组成员代表散列映射表的一个桶
        // 如要访问table里面的数据,则需先访问到该segment
        transient volatile HashEntry<K,V>[] table;
        // 在该 segment 对象内包含 HashEntry 元素的个数,
        // 该变量被声明为 volatile 型,保证每次读取到最新的数据
        transient volatile int count;
        transient int modCount;
        // 触发table扩容的阈值
        transient int threshold;
        // 负载因子
        final float loadFactor;
    }
    
    static final class HashEntry<K,V> {
      // final修饰 避免修改
        final int hash;
        final K key;
        volatile V value;
        volatile HashEntry<K,V> next;
    }
    

    2.构造方法

    通过默认的无参构造方法看初始化过程

    // 默认初始化容量
    static final int DEFAULT_INITIAL_CAPACITY = 16; 
     // 默认负载因子
    static final float DEFAULT_LOAD_FACTOR = 0.75f; 
     // 默认并发级别
    static final int DEFAULT_CONCURRENCY_LEVEL = 16;
        
    public ConcurrentHashMap() {
        this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
    }
    
    @SuppressWarnings("unchecked")
    public ConcurrentHashMap(int initialCapacity,float loadFactor, int concurrencyLevel) {
        if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
            throw new IllegalArgumentException();
        // 校验并发级别大小,最大为 65536
        if (concurrencyLevel > MAX_SEGMENTS)
            concurrencyLevel = MAX_SEGMENTS;
        // Find power-of-two sizes best matching arguments
        // 2的多少次方
        int sshift = 0;
        int ssize = 1;
        // 将ssize扩充为2的n次方
        while (ssize < concurrencyLevel) {
            ++sshift;
            ssize <<= 1;
        }
        // 记录段偏移量
        this.segmentShift = 32 - sshift;
        // 记录段掩码
        this.segmentMask = ssize - 1;
        // 设置容量
        if (initialCapacity > MAXIMUM_CAPACITY)
            initialCapacity = MAXIMUM_CAPACITY;
        //  计算每个 Segment 中的table的容量
        int c = initialCapacity / ssize;
        if (c * ssize < initialCapacity)
            ++c;
        int cap = MIN_SEGMENT_TABLE_CAPACITY;
        //Segment 中 table 的容量至少是2的n次幂
        while (cap < c)
            cap <<= 1;
        // create segments and segments[0]
        // 创建 Segment 数组,设置 segments[0]
        Segment<K,V> s0 = new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
                             (HashEntry<K,V>[])new HashEntry[cap]);
        Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
        UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
        this.segments = ss;
    }
    

    初始化后 concurrencyLevelsegments 数组大小)不能改变,不能再扩容,这样,即使超过了threshold,触发了扩容,rehash 也不会影响到 segments 数组,保证了在大并发的情况下,只会影响某一个 segmentrehash 而其他 segment 不会受到影响。

    3.put

    总共需要两次 hash

    public V put(K key, V value) {
        Segment<K,V> s;
        if (value == null)
            throw new NullPointerException();
        int hash = hash(key);
        // 先第一次hash 计算出segment所在的位置
        int j = (hash >>> segmentShift) & segmentMask;
        if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
             (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
            // 如果查找到的 Segment 为空,初始化 因为初始化时 只new一个segment
            s = ensureSegment(j);
        // segment的put方法
        return s.put(key, hash, value, false);
    }
    

    上面的代码即为第一次hash。

    1. 计算要 put 的 key 对应 Segment 所在的位置。
    2. 如果对应位置的 Segment 为空,则初始化这个 Segment。
    3. 调用segment.put方法。
    final V put(K key, int hash, V value, boolean onlyIfAbsent) {
        // 获取锁
        HashEntry<K,V> node = tryLock() ? null : scanAndLockForPut(key, hash, value);
        V oldValue;
        try {
            HashEntry<K,V>[] tab = table;
            // 第二次hash 计算对应segment里table的位置
            int index = (tab.length - 1) & hash;
            // CAS 获取 index 对应的值
            HashEntry<K,V> first = entryAt(tab, index);
            // 死循环
            for (HashEntry<K,V> e = first;;) {
            // 如果 key 已经存在,并且和 要插入的key 相同,则替换 value
                if (e != null) {
                    K k;
                    if ((k = e.key) == key ||
                        (e.hash == hash && key.equals(k))) {
                        oldValue = e.value;
                        if (!onlyIfAbsent) {
                            e.value = value;
                            ++modCount;
                        }
                        break;
                    }
                    e = e.next;
                }
                else {
                    // 有冲突,链表头插法。
                    if (node != null)
                        node.setNext(first);
                    else
                        node = new HashEntry<K,V>(hash, key, value, first);
                    int c = count + 1;
                    // 是否需要扩容
                    if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                        rehash(node);
                    else
                        // 设置到table的index下标 值为node
                        setEntryAt(tab, index, node);
                    ++modCount;
                    count = c;
                    oldValue = null;
                    break;
                }
            }
        } finally {
            unlock();
        }
        return oldValue;
    }
    

    第二次hash过程:

    1. 获取锁
      1. tryLock() 获取锁,获取成功返回node
      2. 获取不到使用 scanAndLockForPut() 自旋获取锁。尝试自旋获取锁,当重试的次数达到了 MAX_SCAN_RETRIES 则阻塞等到直到获取到锁。
    2. 计算出 segment 里的 table 对应 key 所在的下标位置 index。
    3. 死循环遍历链表元素
      • 不为空则判断该元素的key是否和要插入的key一样,一样则覆盖旧元素的value。不一样则继续下一个元素判断
      • 是否触发扩容。
      • 使用头插法插入。
    4. 释放锁。

    4.get

    get 逻辑比较简单,也就是上面的两次hash过程。

    1. 先通过一次 hash 定位到 segment
    2. 再通过一次 hash 定位的 segment 里具体的元素。
    public V get(Object key) {
        Segment<K,V> s; // manually integrate access methods to reduce overhead
        HashEntry<K,V>[] tab;
        int h = hash(key);
        long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
        if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
            (tab = s.table) != null) {
            for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
                     (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
                 e != null; e = e.next) {
                K k;
                if ((k = e.key) == key || (e.hash == h && key.equals(k)))
                    return e.value;
            }
        }
        return null;
    }
    

    HashEntry 中的 value 属性是用 volatile 关键词修饰的,保证了内存可见性,所以每次获取时都是最新值。整个过程都不需要加锁,因此 ConcurrentHashMap 的 get 方法是非常高效的。因为整个过程都不需要加锁


    JDK1.7的 ConcurrentHashMap 还是数组+链表的结构,链表线性遍历会导致性能下降。跟JJDK1.7的 HashMap 一样的问题,我们来看JDK1.8做了哪些优化。

    ConcurrentHashMap 1.8

    1.数据结构

    JDK1.8 的 ConcurrentHashMap 相对于 JDK1.7 来说变化比较大。摒弃了以前的Segment 数组 + HashEntry 数组 + 链表,而是 Node 数组 + 链表红黑树。和1.8 HashMap 类似的结构。把之前的HashEntry改成了Node,值和 next 采用了 volatile 去修饰,保证了可见性,并且在链表大于一定值(默认是8)的时候会转换红黑树。
    借用HashMap的图

    借用HashMap的图
    static class Node<K,V> implements Map.Entry<K,V> {
        final int hash;
        final K key;
        volatile V val;
        volatile Node<K,V> next;
    }
    

    弃用了原有的 Segment 分段锁,而采用了 Node + CAS + synchronized 来保证并发安全性。

    volatile

    • 实现可见性:一个线程修改了某个变量的值,这新值对其他线程来说是立即可见的。
    • 禁止指令重排: 不管是虚拟机还是cpu的优化都不会对其指令重排。
    • volatile 只能保证对单次读/写的原子性。i++ 这种操作不能保证原子性。

    CAS

    CAS:Compare and Swap,即比较再交换。
    CAS需要有3个操作数:内存地址V,旧的预期值A,即将要更新的目标值B。
    CAS指令执行时,当且仅当内存地址V的值与预期值A相等时,将内存地址V的值修改为B,否则就什么都不做。整个比较并替换的操作是一个原子操作。
    CAS 存在ABA问题。
    volatile 和 CAS 还有 synchronized 的优化,锁升级过程,还会在多线程相关分享中着重介绍,先挖个坑。

    2.put

    直接上源码

    public V put(K key, V value) {
        return putVal(key, value, false);
    }
    
    /** Implementation for put and putIfAbsent */
    final V putVal(K key, V value, boolean onlyIfAbsent) {
        // key 或 value 为空  直接抛异常
        if (key == null || value == null) throw new NullPointerException();
        int hash = spread(key.hashCode());
        int binCount = 0;
        for (Node<K,V>[] tab = table;;) {
            // f = 对应位置元素, 
            // n = table.length
            // i = 对应位置下标
            // fh 对应位置元素的 hash 值
            Node<K,V> f; int n, i, fh;
            if (tab == null || (n = tab.length) == 0)
                // 初始化数组桶(自旋+CAS)
                tab = initTable();
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                // 位置为空 不加锁 CAS 放入,成功直接退出循环
                if (casTabAt(tab, i, null,new Node<K,V>(hash, key, value, null)))
                    break;  // no lock when adding to empty bin
            }
            else if ((fh = f.hash) == MOVED)
                tab = helpTransfer(tab, f);
            else {
                V oldVal = null;
                // synchronized 锁 对应位置的元素
                synchronized (f) {
                    if (tabAt(tab, i) == f) {
                        // 说明是链表
                        if (fh >= 0) {
                            binCount = 1;
                            // 循环加入新的或者覆盖节点
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node<K,V> pred = e;
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K,V>(hash, key,
                                                              value, null);
                                    break;
                                }
                            }
                        }
                        else if (f instanceof TreeBin) {
                            // 红黑树
                            Node<K,V> p;
                            binCount = 2;
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                           value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                if (binCount != 0) {
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        addCount(1L, binCount);
        return null;
    }
    

    大概流程如下:

    1. key/value 非空判断。
    2. 根据 key.hashcode 计算出 hash 。
    3. 死循环(执行下面分支后,还能再重新执行)
    4. 判断 table 是否需要始化。
    5. 根据 hash 定位到 table 对应的 Node 赋值给f,
    6. 判断是否为空,如果为空则利用 CAS 尝试写入,失败则自旋保证成功。成功则直接退出循环
    7. 如果f的hashcode == MOVED == -1,则需要进行扩容。
    8. 使用 synchronized 锁对应位置的元素,同JDK1.8 HaspMap链表/红黑树插入。
    9. 如果数量大于 TREEIFY_THRESHOLD 则要转换为红黑树。

    get

    public V get(Object key) {
        Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
        // key 所在的 hash 位置
        int h = spread(key.hashCode());
        if ((tab = table) != null && (n = tab.length) > 0 &&
            (e = tabAt(tab, (n - 1) & h)) != null) {
            // 如果指定位置元素存在,头结点 key 相同
            if ((eh = e.hash) == h) {
                if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                    // key hash 值相等,key值相同,直接返回元素 value
                    return e.val;
            }
            else if (eh < 0)
                // 头结点hash值小于0,说明正在扩容或者是红黑树,find查找
                return (p = e.find(h, key)) != null ? p.val : null;
            while ((e = e.next) != null) {
                // 是链表,遍历查找
                if (e.hash == h &&
                    ((ek = e.key) == key || (ek != null && key.equals(ek))))
                    return e.val;
            }
        }
        return null;
    }
    

    因为 table数组 和 Node.next 都使用了 volatile,保证了可见性,所以 get() 未加锁。

    1. 根据 hash 值计算位置。
    2. 找到对应位置的元素,如果头节点就是要找的,直接返回 value.
    3. 如果是链表,按照链表的方式查找。
    4. 如果是红黑树,按照红黑树的方式查找。

    总结

    1.7和1.8的 ConcurrentHashMap差别还是挺大的。主要是数据结构上的变化。从最开始的锁全部数据的HashTable,再到1.7的锁一段的segment,再到1.8的只锁当前数据所在的桶。锁的数据范围越来越细,这也是很多问题优化的方向:分而治之。

    后记

    大部分都在分析源码,希望大家能看到不枯燥,作为有时一句话都写不通顺的人,又要分析原理,又要讲的通俗,又要写的不枯燥。我觉得我现在还在尽量先分析原理的程度,不过我会继续努力的。也希望自己能坚持。加油!

    量变引发质变,经常进步一点点,期待蜕变的自己。

    相关文章

      网友评论

          本文标题:剖析Java 集合框架(四)-ConcurrentHashMap

          本文链接:https://www.haomeiwen.com/subject/pxtgiktx.html