concurrentHashMap 1.8原理解析

作者: 后来丶_a24d | 来源:发表于2020-04-08 13:45 被阅读0次

    目录

    目录.png

    源码解析

    • JDK1.8的实现降低锁的粒度,JDK1.7版本锁的粒度是基于Segment的,包含多个HashEntry,而JDK1.8锁的粒度就是HashEntry(首节点)
    • JDK1.8版本的数据结构变得更加简单,使得操作也更加清晰流畅,因为已经使用synchronized来进行同步,所以不需要分段锁的概念,也就不需要Segment这种数据结构了,由于粒度的降低,实现的复杂度也增加了

    是否允许空, 是否允许重复数据, 是否有序, 是否线程安全

    关 注 点 结论
    是否允许空 put时key, value不允许为空
    是否允许重复数据 重复时会更新数据
    是否有序 无序
    是否线程安全 线程安全

    value的Comparator 排序比较

    public static void main(String[] args) {
        Map<String, String> map = new ConcurrentHashMap<>();
        map.put("a", "dd");
        map.put("c", "bb");
        map.put("d", "aa");
        map.put("b", "cc");
    
        List<Map.Entry<String, String>> list = new ArrayList<>(map.entrySet());
    
        Collections.sort(list, (Map.Entry<String, String> o1,
                                    Map.Entry<String, String> o2) -> o1.getValue().compareTo(o2.getValue()));
    
        list.stream().forEach((mapping) -> System.out.println(mapping.getKey() + ":" + mapping.getValue()));
    
    }
    

    初始化

    • 调用默认构造函数时创建一个带有默认初始容量 (16)、加载因子 (0.75) 和 concurrencyLevel (16) 的新的空映射。
    • 调用指定大小的构造方法会调用tableSizeFor指定大小Node数组大小,最后会是大于c的最小的2的幂。默认加载因子 (0.75) 和 concurrencyLevel (16) 的新的空映射。
    // 要求获得的是大于c的最小的2的幂 也就是获取某个位为1并且左边右边都是1
    private static final int tableSizeFor(int c) { 
        // 那11举例子
        int n = c - 1;
        // 1010 |= 0101 此时为1111已经符合要求了
        // 1111 |= 0011 还是1111
        n |= n >>> 1;
        n |= n >>> 2;
        n |= n >>> 4;
        n |= n >>> 8;
        n |= n >>> 16;
        // 此时n值为左边都是0 右边都是1 +1之后就变成 10000...多个0就是2的n次幂
        return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
    }
    
    • 契机,第一次put的时候会进行初始化
    /**
        * 初始化数组table,sizeCtl可以在初始化数组时指定
        * 如果volatile sizeCtl小于0,说明别的数组正在进行初始化,则让出执行权
        * 如果sizeCtl大于0的话,则初始化一个大小为sizeCtl的数组
        * 否则的话初始化一个默认大小(16)的数组
        * 然后设置sizeCtl的值为数组长度的3/4
    */
    private final Node<K,V>[] initTable() {
        Node<K,V>[] tab; int sc;
        while ((tab = table) == null || tab.length == 0) {   
            // sizeCtl小于0,则进行线程让步等待
            if ((sc = sizeCtl) < 0)                          
                Thread.yield(); 
            // cas比较sizeCtl的值与sc是否相等,相等则用-1替换
            else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {    
                try {
                    // 双重检测机制,类似单例模式的写法
                    if ((tab = table) == null || tab.length == 0) {
                        int n = (sc > 0) ? sc : DEFAULT_CAPACITY;        /
                        @SuppressWarnings("unchecked")
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                        table = tab = nt;
                        //初始化后,sizeCtl长度为数组长度的3/4
                        sc = n - (n >>> 2);
                    }
                } finally {
                    sizeCtl = sc;          
                }
                break;
            }
        }
        return tab;
    }
    

    get

    • get操作全程不需要加锁是因为Node的成员val是用volatile修饰的和数组用volatile修饰没有关系。数组用volatile修饰(保证指向的地址改变了,其他线程能即使获取到)主要是保证在数组扩容的时候保证可见性。
    /*
     * 相比put方法,get就很单纯了,支持并发操作,
     * 当key为null的时候回抛出NullPointerException的异常
     * get操作通过首先计算key的hash值来确定该元素放在数组的哪个位置
     * 然后遍历该位置的所有节点
     * 如果不存在的话返回null
     */
    public V get(Object key) {
        Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
        // 计算key的hash值
        int h = spread(key.hashCode()); 
        if ((tab = table) != null && (n = tab.length) > 0 &&
            (e = tabAt(tab, (n - 1) & h)) != null) { // 表不为空并且表的长度大于0并且key所在的桶不为空
            if ((eh = e.hash) == h) { // 表中的元素的hash值与key的hash值相等
                if ((ek = e.key) == key || (ek != null && key.equals(ek))) // 键相等
                    // 返回值
                    return e.val;
            }
            else if (eh < 0) // 结点hash值小于0, 可能在扩容
                // 在桶(链表/红黑树)中查找
                return (p = e.find(h, key)) != null ? p.val : null;
            while ((e = e.next) != null) { // 对于结点hash值大于0的情况
                if (e.hash == h &&
                    ((ek = e.key) == key || (ek != null && key.equals(ek))))
                    return e.val;
            }
        }
        return null;
    }
    

    put

    final V putVal(K key, V value, boolean onlyIfAbsent) {
        // 键或值为空,抛出异常
        if (key == null || value == null) throw new NullPointerException(); 
        // 键的hash值经过计算获得hash值
        int hash = spread(key.hashCode());
        // 用来计算在这个节点总共有多少个元素,用来控制扩容或者转移为树
        int binCount = 0;
        // 无限循环, 如果多个线程走初始化流程,一个线程初始化完之后,会走第二个for循环。初始化线程初始化完后走第二个for循环
        for (Node<K,V>[] tab = table;;) { 
            Node<K,V> f; int n, i, fh;
            // 表为空或者表的长度为0
            if (tab == null || (n = tab.length) == 0) 
                // 初始化表
                tab = initTable();
            // 表不为空并且表的长度大于0,并且该桶为空 (n - 1) & hash是size要为2 n次方原因,tabAt是获取volatile的table
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { 
                // 比较并且交换值CAS,如tab的第i项为空则用新生成的node替换
                if (casTabAt(tab, i, null,
                             new Node<K,V>(hash, key, value, null))) 
                    break;                   
            }
            // 该结点的hash值为MOVED
            else if ((fh = f.hash) == MOVED) 
                // 帮助结点的扩容(在扩容的过程中)
                tab = helpTransfer(tab, f);
            else {
                V oldVal = null;
                // 加锁同步
                synchronized (f) { 
                    // 找到table表下标为i的节点
                    if (tabAt(tab, i) == f) { 
                        // 该table表中该结点的hash值大于0
                        if (fh >= 0) { 
                            // binCount赋值为1
                            binCount = 1;
                            // 无限循环
                            for (Node<K,V> e = f;; ++binCount) { 
                                K ek;
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) { // 结点的hash值相等并且key也相等
                                    // 保存该结点的val值
                                    oldVal = e.val;
                                    if (!onlyIfAbsent) // 进行判断
                                        // 将指定的value保存至结点,即进行了结点值的更新
                                        e.val = value;
                                    break;
                                }
                                // 保存当前结点
                                Node<K,V> pred = e;
                                if ((e = e.next) == null) { // 当前结点的下一个结点为空,即为最后一个结点
                                    // 新生一个结点并且赋值给next域
                                    pred.next = new Node<K,V>(hash, key,
                                                              value, null);
                                    // 退出循环
                                    break;
                                }
                            }
                        }
                        else if (f instanceof TreeBin) { // 结点为红黑树结点类型
                            Node<K,V> p;
                            // binCount赋值为2
                            binCount = 2;
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                           value)) != null) { // 将hash、key、value放入红黑树
                                // 保存结点的val
                                oldVal = p.val;
                                if (!onlyIfAbsent) // 判断
                                    // 赋值结点value值
                                    p.val = value;
                            }
                        }
                    }
                }
                if (binCount != 0) { // binCount不为0
                    if (binCount >= TREEIFY_THRESHOLD) // 如果binCount大于等于转化为红黑树的阈值
                        // 进行转化
                        treeifyBin(tab, i);
                    if (oldVal != null) // 旧值不为空
                        // 返回旧值
                        return oldVal;
                    break;
                }
            }
        }
        // 增加binCount的数量
        addCount(1L, binCount);
        return null;
    }
    
    • 总结步骤
    1. 判断存储的key、value是否为空,若为空,则抛出异常,否则,进入步骤②
    2. 计算key的hash值,随后进入无限循环,该无限循环可以确保成功插入数据,若table表为空或者长度为0,则初始化table表,否则,进入步骤③
    3. 根据key的hash值取出table表中的结点元素,若取出的结点为空(该桶为空),则使用CAS将key、value、hash值生成的结点放入桶中。否则,进入步骤4. 若该结点的的hash值为MOVED,则对该桶中的结点进行转移,否则,进入步骤⑤
    4. 对桶中的第一个结点(即table表中的结点)进行加锁,对该桶进行遍历,桶中的结点的hash值与key值与给定的hash值和key值相等,则根据标识选择是否进行更新操作(用给定的value值替换该结点的value值),若遍历完桶仍没有找到hash值与key值和指定的hash值与key值相等的结点,则直接新生一个结点并赋值为之前最后一个结点的下一个结点。进入步骤⑥
    5. 若binCount值达到红黑树转化的阈值,则将桶中的结构转化为红黑树存储,最后,增加binCount的值。

    put时helperTransfer

    • 此函数用于在扩容时将table表中的结点转移到nextTable中。
    final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
        Node<K,V>[] nextTab; int sc;
        // table表不为空并且结点类型是ForwardingNode类型,并且结点的nextTable不为空
        if (tab != null && (f instanceof ForwardingNode) &&
            (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) { 
            int rs = resizeStamp(tab.length);
            // 条件判断
            while (nextTab == nextTable && table == tab &&
                   (sc = sizeCtl) < 0) { 
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                    sc == rs + MAX_RESIZERS || transferIndex <= 0) // 
                    break;
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) { 
                    // 将table的结点转移到nextTab中
                    transfer(tab, nextTab);
                    break;
                }
            }
            return nextTab;
        }
        return table;
    }
    

    扩容

    • 扩容涉及的点比较多,所以这里分多个步骤说明扩容的原理
    扩容时相关参数
    • nextTable: 扩容期间,将table数组中的元素 迁移到 nextTable, 每次扩容2倍

    • sizeCtl含义


      sizeCtl含义.png
    • transferIndex: 扩容索引,表示已经分配给扩容线程的table数组索引位置。主要用来协调多个线程,并发安全地获取迁移任务 。transferIndex是通过cas设置的

     //cas无锁算法设置 transferIndex = transferIndex - stride
    if (U.compareAndSwapInt(this, TRANSFERINDEX, 
        nextIndex,nextBound = (nextIndex > stride 
        ? nextIndex - stride : 0))) {
    }
    
    扩容前在数组最右边.png
    扩容时,不同线程负责不同地方迁移.png
    • ForwardingNode节点: 标记作用,表示其他线程正在扩容,并且此节点已经扩容完毕。关联了nextTable,扩容期间可以通过find方法,访问已经迁移到了nextTable中的数据。
    扩容时机
    • 当前容量超过阈值0.75n(n 为 length)
    • 当链表中元素个数超过默认设定(8个),当数组的大小还未超过64的时候,此时进行数组的扩容,如果超过则将链表转化成红黑树
    • 当发现其他线程扩容时,帮其扩容
    扩容原理分析(以长度为32分析)
    • 线程执行put操作,发现容量已经达到扩容阈值,需要进行扩容操作,此时transferindex=tab.length=32


      tab.length=32.png
    • 扩容线程A 以cas的方式修改transferindex=32-16=16 ,然后按照降序迁移table[31]--table[16]这个区间的hash桶


      transferindex=31-16=16.png
    • 迁移hash桶时,会将桶内的链表或者红黑树,按照一定算法,拆分成2份,将其插入nextTable[i]和nextTable[i+n](n是table数组的长度)。 迁移完毕的hash桶,会被设置成ForwardingNode节点,以此告知访问此桶的其他线程,此节点已经迁移完毕

    private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
       ...//省略无关代码 同步方法迁移
        synchronized (f) {
            //将node链表,分成2个新的node链表
            for (Node<K,V> p = f; p != lastRun; p = p.next) {
                int ph = p.hash; K pk = p.key; V pv = p.val;
                if ((ph & n) == 0)
                   ln = new Node<K,V>(ph, pk, pv, ln);
                else
                   hn = new Node<K,V>(ph, pk, pv, hn);
            }
            //将新node链表赋给nextTab
            setTabAt(nextTab, i, ln);
            setTabAt(nextTab, i + n, hn);
            setTabAt(tab, i, fwd);
        }
        ...//省略无关代码
    }
    
    • 此时线程2访问到了ForwardingNode节点,如果线程2执行的put或remove等写操作,那么就会先帮其扩容。如果线程2执行的是get等读方法,则会调用ForwardingNode的find方法,去nextTable里面查找相关元素。

    • 线程2加入扩容,线程2会自旋等待线程1处理完毕之前的桶,负责15-0的桶迁移


      线程2加入扩容.png
    • 如果准备加入扩容的线程,发现以下情况,放弃扩容,直接返回。

    1. 发现transferIndex=0,即所有node均已分配
    2. 发现扩容线程已经达到最大扩容线程数
    tranfer源码解析
    private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
        int n = tab.length, stride;
        //计算需要迁移多少个hash桶(MIN_TRANSFER_STRIDE该值作为下限,以避免扩容线程过多)
        if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
            stride = MIN_TRANSFER_STRIDE; // subdivide range
       
        if (nextTab == null) {            // initiating
            try {
                //扩容一倍
                @SuppressWarnings("unchecked")
                Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
                nextTab = nt;
            } catch (Throwable ex) {      // try to cope with OOME
                sizeCtl = Integer.MAX_VALUE;
                return;
            }
            nextTable = nextTab;
            transferIndex = n;
        }
        int nextn = nextTab.length;
        ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
        //该标识用于控制是否继续处理下一个桶,为 true 则表示已经处理完当前桶,可以继续迁移下一个桶的数据
        boolean advance = true;
        boolean finishing = false; // to ensure sweep before committing nextTab
      
        //1 逆序迁移已经获取到的hash桶集合,如果迁移完毕,则更新transferIndex,获取下一批待迁移的hash桶
        //2 如果transferIndex=0,表示所以hash桶均被分配,将i置为-1,准备退出transfer方法
        //3 如果是helperTransfer,则自旋等待上一个线程处理完毕
        for (int i = 0, bound = 0;;) {
            Node<K,V> f; int fh;
            
            //更新待迁移的hash桶索引
            while (advance) {
                int nextIndex, nextBound;
                //更新迁移索引i。
                if (--i >= bound || finishing)
                    advance = false;
                else if ((nextIndex = transferIndex) <= 0) {
                    //transferIndex<=0表示已经没有需要迁移的hash桶,将i置为-1,线程准备退出
                    i = -1;
                    advance = false;
                }
                //当迁移完bound这个桶后,尝试更新transferIndex,,获取下一批待迁移的hash桶
                else if (U.compareAndSwapInt
                         (this, TRANSFERINDEX, nextIndex,
                          nextBound = (nextIndex > stride ?
                                       nextIndex - stride : 0))) {
                    bound = nextBound;
                    i = nextIndex - 1;
                    advance = false;
                }
            }
            //退出transfer
            if (i < 0 || i >= n || i + n >= nextn) {
                int sc;
                if (finishing) {
                    //最后一个迁移的线程,recheck后,做收尾工作,然后退出
                    nextTable = null;
                    table = nextTab;
                    sizeCtl = (n << 1) - (n >>> 1);
                    return;
                }
                if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                    /**
                     第一个扩容的线程,执行transfer方法之前,会设置 sizeCtl = (resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2)
                     后续帮其扩容的线程,执行transfer方法之前,会设置 sizeCtl = sizeCtl+1
                     每一个退出transfer的方法的线程,退出之前,会设置 sizeCtl = sizeCtl-1
                     那么最后一个线程退出时:
                     必然有sc == (resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2),即 (sc - 2) == resizeStamp(n) << RESIZE_STAMP_SHIFT
                    */
                    
                    //不相等,说明不到最后一个线程,直接退出transfer方法
                    if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                        return;
                    finishing = advance = true;
                    //最后退出的线程要重新check下是否全部迁移完毕
                    i = n; // recheck before commit
                }
            }
            else if ((f = tabAt(tab, i)) == null)
                advance = casTabAt(tab, i, null, fwd);
            else if ((fh = f.hash) == MOVED)
                advance = true; // already processed
            //迁移node节点
            else {
                synchronized (f) {
                    if (tabAt(tab, i) == f) {
                        Node<K,V> ln, hn;
                        //链表迁移
                        if (fh >= 0) {
                            int runBit = fh & n;
                            Node<K,V> lastRun = f;
                            for (Node<K,V> p = f.next; p != null; p = p.next) {
                                int b = p.hash & n;
                                if (b != runBit) {
                                    runBit = b;
                                    lastRun = p;
                                }
                            }
                            if (runBit == 0) {
                                ln = lastRun;
                                hn = null;
                            }
                            else {
                                hn = lastRun;
                                ln = null;
                            }
                            //将node链表,分成2个新的node链表
                            for (Node<K,V> p = f; p != lastRun; p = p.next) {
                                int ph = p.hash; K pk = p.key; V pv = p.val;
                                if ((ph & n) == 0)
                                    ln = new Node<K,V>(ph, pk, pv, ln);
                                else
                                    hn = new Node<K,V>(ph, pk, pv, hn);
                            }
                            //将新node链表赋给nextTab
                            setTabAt(nextTab, i, ln);
                            setTabAt(nextTab, i + n, hn);
                            setTabAt(tab, i, fwd);
                            advance = true;
                        }
                        //红黑树迁移
                        else if (f instanceof TreeBin) {
                            TreeBin<K,V> t = (TreeBin<K,V>)f;
                            TreeNode<K,V> lo = null, loTail = null;
                            TreeNode<K,V> hi = null, hiTail = null;
                            int lc = 0, hc = 0;
                            for (Node<K,V> e = t.first; e != null; e = e.next) {
                                int h = e.hash;
                                TreeNode<K,V> p = new TreeNode<K,V>
                                    (h, e.key, e.val, null, null);
                                if ((h & n) == 0) {
                                    if ((p.prev = loTail) == null)
                                        lo = p;
                                    else
                                        loTail.next = p;
                                    loTail = p;
                                    ++lc;
                                }
                                else {
                                    if ((p.prev = hiTail) == null)
                                        hi = p;
                                    else
                                        hiTail.next = p;
                                    hiTail = p;
                                    ++hc;
                                }
                            }
                            ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                                (hc != 0) ? new TreeBin<K,V>(lo) : t;
                            hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                                (lc != 0) ? new TreeBin<K,V>(hi) : t;
                            setTabAt(nextTab, i, ln);
                            setTabAt(nextTab, i + n, hn);
                            setTabAt(tab, i, fwd);
                            advance = true;
                        }
                    }
                }
            }
        }
    }
    
    • 多线程无锁扩容的关键就是通过CAS设置sizeCtl与transferIndex变量,协调多个线程对table数组中的node进行迁移。

    获取size

    • 在 JDK1.7 中,第一种方案他会使用不加锁的模式去尝试多次计算 ConcurrentHashMap 的 size,最多三次,比较前后两次计算的结果,结果一致就认为当前没有元素加入,计算的结果是准确的。 第二种方案是如果第一种方案不符合,他就会给每个 Segment 加上锁,然后计算 ConcurrentHashMap 的 size 返回。
    • jdk 1.8中size, mappingCount都是计算size,但是mappingCount返回long计算的限制更小一些,不怕超过int的长度, 但是核心都是sumCount函数的调用
    • 看下sumCount代码
    /**
    * 代码比较简单,就是在baseCount的基础上,循环counterCells累加
    * 在调用addCount时,如果 counterCells == null, 则对 baseCount 做 CAS 自增操作
    * 如果并发导致 baseCount CAS 失败了使用 counterCells
    * 如果counterCells CAS 失败了,在 fullAddCount 方法中,会继续死循环操作,直到成功
    **/
    final long sumCount() {
        CounterCell[] as = counterCells; CounterCell a;
        long sum = baseCount;
        if (as != null) {
           for (int i = 0; i < as.length; ++i) {
               if ((a = as[i]) != null)
                   sum += a.value;
               }
           }
        return sum;
    }
    
    @sun.misc.Contended static final class CounterCell {
        volatile long value;
        CounterCell(long x) { value = x; }
    }
    

    参考文章

    相关文章

      网友评论

        本文标题:concurrentHashMap 1.8原理解析

        本文链接:https://www.haomeiwen.com/subject/cshtmhtx.html