jdk1.8 ConcurrentHashMap代码浅析

作者: millions_chan | 来源:发表于2017-03-19 15:42 被阅读438次

    在日常开发中,我们经常会使用HashMap,然而HashMap不是线程安全的,在多线程公用一个Map的情况下,ConcurrentHashMap通常是一个更好的选择。

    ConcurrentHashMap采用下面的结构来存储数据,并使用CAS+Synchronized来保证并发更新的安全。当几个Node的hash值相同时,限将其存储为一个链表,当链表上的元素个数大于某个数时,就将其转化为一颗红黑树。

    ConcurrentHashMap存储结构

    成员变量


    //承载数据的数据,大小为2^n
    transient volatile Node<K,V>[] table;
    
    //扩容时使用的
    private transient volatile Node<K,V>[] nextTable;
    
    //为-1说明由线程在进行初始化操作;
    //为-n(n>1)  使用了一种生成戳,根据生成戳算出一个基数,不同轮次的扩容操作的生成戳都是唯一的,来保证多次扩容之间不会交叉重叠,  当有n个线程正在执行扩容时,sizeCtl在值变为 (基数 + n)  
    //否则,在未初始化时记录table的大小,在初始化后记录hash表的容量(n - n >>>2 n为table大小,此时该值为0.75n)
    private transient volatile int sizeCtl;
    

    上面的代码中,Node为存储数据的节点,将Node的hash值设置为MOVED 、TREEBIN 、RESERVED 可以表示不同的含义 :

        static final int MOVED     = -1; // hash for forwarding nodes
        static final int TREEBIN   = -2; // hash for roots of trees
        static final int RESERVED  = -3; // hash for transient reservations
    
        static class Node<K,V> implements Map.Entry<K,V> {
            final int hash;
            final K key;
            volatile V val;
            volatile Node<K,V> next;
    
            Node(int hash, K key, V val, Node<K,V> next) {
                this.hash = hash;
                this.key = key;
                this.val = val;
                this.next = next;
            }                                                   ...
        }
    
       //红黑树节点
       static final class TreeNode<K,V> extends Node<K,V> {
            TreeNode<K,V> parent;  // red-black tree links
            TreeNode<K,V> left;
            TreeNode<K,V> right;
            TreeNode<K,V> prev;    // needed to unlink next upon deletion
            boolean red;
    
            TreeNode(int hash, K key, V val, Node<K,V> next,
                     TreeNode<K,V> parent) {
                super(hash, key, val, next);
                this.parent = parent;
            }
                                      ...
       }
    
        //特殊的Node Hash值为MOVED,说明在扩容,存储nextTable引用
        static final class ForwardingNode<K,V> extends Node<K,V> {
            final Node<K,V>[] nextTable;
            ForwardingNode(Node<K,V>[] tab) {
                super(MOVED, null, null, null);
                this.nextTable = tab;
            }
         }
    

    Hash


    既然是一个HashMap,如何做Hash就是一个问题。ConcurrentHashMap 处理的一些思路非常巧妙。

    首先是table的大小,会在传入的size的基础上进行处理,变成2^k。

        private static final int tableSizeFor(int c) {
            int n = c - 1;
            n |= n >>> 1;
            n |= n >>> 2;
            n |= n >>> 4;
            n |= n >>> 8;
            n |= n >>> 16;
            return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
        }
    

    上面的代码先用一系列的移位和|操作把n变成二进制表示为0...01...1的形式,然后加1,就会变成2^k了。这样做有什么好处呢?我们继续看。

    //计算节点的位置 h 是 key的hash n 是table的大小
    int index = (n - 1) & h;
    

    上面我们知道了table的大小n是2^k,那么n-1就会是0..1....1的形式,并且小于n,通过与h按位与就可以得到对应的index了。而在扩容时,ConcurrentHashMap会将table的大小变为原来的2倍,这样,计算index时就变成了(2*n-1)&h,那么n-1为0011的话,2*n-1就是0111,这样扩容后的index要么等于扩容前的index,否则就等于index+n,减小了扩容时的计算量。

    table初始化


    table会延迟到第一次put时初始化,同过使用循环+CAS的套路,可以保证一次只有一个线程会初始化table。在table为空的时候如果sizeCtl小于0,则说明已经有线程开始初始化了,其它线程通过Thread.yield()让出CPU时间片,等待table非空即可。否则使用CAS将sizeCtl的值换为-1,置换成功则初始化table。注意table的大小为sizeCtl,初始化后将sizeCtl的值设为n - (n >>> 2)即0.75n,这个值用来确定是否需要为table扩容。这种初始化的方式值得我们借鉴。

       private final Node<K,V>[] initTable() {
            Node<K,V>[] tab; int sc;
            while ((tab = table) == null || tab.length == 0) {
                if ((sc = sizeCtl) < 0)
                    Thread.yield(); // lost initialization race; just spin
                else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                    try {
                        if ((tab = table) == null || tab.length == 0) {
                            int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                            @SuppressWarnings("unchecked")
                            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                            table = tab = nt;
                            sc = n - (n >>> 2);
                        }
                    } finally {
                        sizeCtl = sc;
                    }
                    break;
                }
            }
            return tab;
        }
    

    put操作

    初始化后,使用tabAt函数通过Unsafe的getObjectVolatile方法获取index位置的值,如果为空,就进行cas设置table[index]的值,设置成功说明put完成,返回即可。不成功则重新进入for循环,进行其他逻辑。

        for (Node<K,V>[] tab = table;;) {
                Node<K,V> f; int n, i, fh;
                 if (tab == null || (n = tab.length) == 0)
                    tab = initTable();
                else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                    if (casTabAt(tab, i, null,
                                 new Node<K,V>(hash, key, value, null)))
                        break;                   // no lock when adding to empty bin
                }
                else if ((fh = f.hash) == MOVED)
                    tab = helpTransfer(tab, f);
                      ...
        }
    

    如果发生了table[index]非空,则可能是:1.发生了碰撞;2. 该key之前已经存在。这个时候需要对table[index]加锁。然后再检查一次index位置的node是否没有变化。这里的fh是节点f的hash值。还记得之前说过的红黑树首节点hash为TREEBIN等于-2么,这时就能区分到底是一个红黑树还是链表了,而后就可以进行链表\红黑树的插入or更新操作了。在操作成功后,如果节点对应的数据结构是一个链表且长度超过TREEIFY_THRESHOLD的话将会把该链表更改为一个红黑树。

    V oldVal = null;
    synchronized (f) {
        if ( tabAt( tab, i ) == f )
        {
            if ( fh >= 0 )
            {
                binCount = 1;
                for ( Node<K, V> e = f;; ++binCount )
                {
                    K ek;
                    if ( e.hash == hash &&
                         ( (ek = e.key) == key ||
                           (ek != null && key.equals( ek ) ) ) )
                    {
                        oldVal = e.val;
                        if ( !onlyIfAbsent )
                            e.val = value;
                        break;
                    }
                    Node<K, V> pred = e;
                    if ( (e = e.next) == null )
                    {
                        pred.next = new Node<K, V>( hash, key,
                                        value, null );
                        break;
                    }
                }
            }else if ( f instanceof TreeBin )
            {
                Node<K, V> p;
                binCount = 2;
                if ( (p = ( (TreeBin<K, V>)f).putTreeVal( hash, key,
                                      value ) ) != null )
                {
                    oldVal = p.val;
                    if ( !onlyIfAbsent )
                        p.val = value;
                }
            }
        }
    }
    if ( binCount != 0 )
    {
            //如果超过这个值,就把tab的i位置更改为红黑树哦
        if ( binCount >= TREEIFY_THRESHOLD )
            treeifyBin( tab, i );
        if ( oldVal != null )
            return(oldVal);
        break;
    }
    

    最后一种情况就是节点的hash值为MOVED,-1了。这说明该节点是一个ForwardingNode,当前table正在向newTable迁移中,所以当前线程过去帮助迁移就好。

         else if ((fh = f.hash) == MOVED)
                    tab = helpTransfer(tab, f);
    

    ConcurrentHashMap的扩容

    老实说,扩容这块的代码比较复杂,很多细节我还没有弄清楚,只能大概讲一讲我自己的理解,权当抛砖引玉,希望以后了解的更加深入以后再来补全。

    上面我们提到,扩容时我们会将sizeCtl设置为一个与扩容前table大小n有关的数。这里的计算方式就是:

    int rs = resizeStamp(n);
    sizeCtl= (rs << RESIZE_STAMP_SHIFT) + 2;
    
    static final int resizeStamp(int n) {
           return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
    }
    
    private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;
    

    这里可以看出,这个值肯定是一个负数,因为根据RESIZE_STAMP_SHIFT和RESIZE_STAMP_BITS的关系我们知道他的最高位肯定是1,再按位或Integer.numberOfLeadingZeros(n)的值,从直觉看,应当是一个绝对值很大的负数。具体可以带入不同的n值计算一下:

    n                  sizeCtl
    16               -2145714174
    32               -2145779710
    64               -2145845246
    128             -2145910782
    256             -2145976318
    512             -2146041854
    1024               -2146107390
    2048               -2146172926
    4096               -2146238462
    8192               -2146303998
    16384             -2146369534
    

    这里可以看出jdk1.8的代码里对sizeCtl的注释是有问题的。理解了rs是干什么的,以及rs和扩容时的sizeCtl的关系,我们再来分析下面的代码:

    1. 当sc小于0时,说明已经开始了扩容,如果当前线程需要参与扩容,则通过CAS将sizeCtl的值加1
    2. sc大于0,说明还没开始扩容,则按照我们上面讲的方法计算出sizeCtl的值,初始化之,开始扩容
         if (check >= 0) {
                Node<K,V>[] tab, nt; int n, sc;
                while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
                       (n = tab.length) < MAXIMUM_CAPACITY) {
                    int rs = resizeStamp(n);
                    if (sc < 0) {
                        if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                            sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                            transferIndex <= 0)
                            break;
                        if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                            transfer(tab, nt);
                    }
                    else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                                 (rs << RESIZE_STAMP_SHIFT) + 2))
                        transfer(tab, null);
                    s = sumCount();
                }
            }
    

    扩容操作的中,如果nextTable为空,则初始化之,大小为之前table的2倍:

            if (nextTab == null) {            // initiating
                try {
                    @SuppressWarnings("unchecked")
                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
                    nextTab = nt;
                } catch (Throwable ex) {      // try to cope with OOME
                    sizeCtl = Integer.MAX_VALUE;
                    return;
                }
                nextTable = nextTab;
                transferIndex = n;
            }
    

    计算一次扩容迁移的slot数目:

          if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
                stride = MIN_TRANSFER_STRIDE; // subdivide range
    

    计算扩容开始和结束的index:

      for (int i = 0, bound = 0;;) {
                Node<K,V> f; int fh;
                while (advance) {
                    int nextIndex, nextBound;
                    if (--i >= bound || finishing)
                        advance = false;
                    else if ((nextIndex = transferIndex) <= 0) {
                        i = -1;
                        advance = false;
                    }
                    else if (U.compareAndSwapInt
                             (this, TRANSFERINDEX, nextIndex,
                              nextBound = (nextIndex > stride ?
                                           nextIndex - stride : 0))) {
                        bound = nextBound;
                        i = nextIndex - 1;
                        advance = false;
                    }
                }
                ...
    }
    

    如果当前index为空则将其设置为一个FowardingNode,如果节点的hash为-1则说明已经处理过了,直接跳过就好

                else if ((f = tabAt(tab, i)) == null)
                    advance = casTabAt(tab, i, null, fwd);
                else if ((fh = f.hash) == MOVED)
                    advance = true; // already processed
    

    接下来就是重头戏了:如何迁移一个链表或红黑树节点:

    synchronized (f) {
        if ( tabAt( tab, i ) == f )
        {
            Node<K, V> ln, hn;
            if ( fh >= 0 )
            {
                int     runBit  = fh & n;
                Node<K, V>  lastRun = f;
                for ( Node<K, V> p = f.next; p != null; p = p.next )
                {
                    int b = p.hash & n;
                    if ( b != runBit )
                    {
                        runBit  = b;
                        lastRun = p;
                    }
                }
                if ( runBit == 0 )
                {
                    ln  = lastRun;
                    hn  = null;
                }else  {
                    hn  = lastRun;
                    ln  = null;
                }
                for ( Node<K, V> p = f; p != lastRun; p = p.next )
                {
                    int ph = p.hash; K pk = p.key; V pv = p.val;
                    if ( (ph & n) == 0 )
                        ln = new Node<K, V>( ph, pk, pv, ln );
                    else
                        hn = new Node<K, V>( ph, pk, pv, hn );
                }
                setTabAt( nextTab, i, ln );
                setTabAt( nextTab, i + n, hn );
                setTabAt( tab, i, fwd );
                advance = true;
            }else if ( f instanceof TreeBin )
            {
                TreeBin<K, V>   t   = (TreeBin<K, V>)f;
                TreeNode<K, V>  lo  = null, loTail = null;
                TreeNode<K, V>  hi  = null, hiTail = null;
                int     lc  = 0, hc = 0;
                for ( Node<K, V> e = t.first; e != null; e = e.next )
                {
                    int     h   = e.hash;
                    TreeNode<K, V>  p   = new TreeNode<K, V>
                                      ( h, e.key, e.val, null, null );
                    if ( (h & n) == 0 )
                    {
                        if ( (p.prev = loTail) == null )
                            lo = p;
                        else
                            loTail.next = p;
                        loTail = p;
                        ++lc;
                    }else  {
                        if ( (p.prev = hiTail) == null )
                            hi = p;
                        else
                            hiTail.next = p;
                        hiTail = p;
                        ++hc;
                    }
                }
                ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify( lo ) :
                     (hc != 0) ? new TreeBin<K, V>( lo ) : t;
                hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify( hi ) :
                     (lc != 0) ? new TreeBin<K, V>( hi ) : t;
                setTabAt( nextTab, i, ln );
                setTabAt( nextTab, i + n, hn );
                setTabAt( tab, i, fwd );
                advance = true;
            }
        }
    }
    
    1. 和put操作一样,先判断是红黑树还是链表。
    2. 根据前面的分析,由于扩容前table的大小为n=2^k,且扩容后的n2 = n * 2,所以节点在nextTable上的位置要么和在table上的位置一样,要么是在table上的位置加上n(比如hash&(n-1)为2,那么hash&(2*n-1)要么是2,要么是n+2),并且这个值只与n的非0bit的位置有关,所以使用fh & n可以将链表或红黑树上的的节点分为2类,一类是index没有改变的,一类是index变成了index+n的。
    3. 如果是链表,则生成2个新链表,放到对应的位置
    4. 如果是红黑树,则根据节点数目生成2个红黑树或链表(如果节点数目比较少则生成链表)
    5. 完成迁移后在原位置放一个FowardingNode标记已经处理成功

    最后,如果已经完成全部迁移,则更新sizeCtl的值,用nextTable替换table。如果只是当前线程完成了自己的迁移任务,则修改sizeCtl的值。这里需要注意的是这一句:(sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT 判断是否所有线程都完成了自己的操作,是的话就说明整个迁移完成了,将finishing设置为true。

                  if (finishing) {
                        nextTable = null;
                        table = nextTab;
                        sizeCtl = (n << 1) - (n >>> 1);
                        return;
                    }
                    if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                        if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                            return;
                        finishing = advance = true;
                        i = n; // recheck before commit
                    }
    

    size

    ConcurrentHashMap还有一点需要注意的是其size方法其实是调用了下面的sumCount方法:

        final long sumCount() {
            CounterCell[] as = counterCells; CounterCell a;
            long sum = baseCount;
            if (as != null) {
                for (int i = 0; i < as.length; ++i) {
                    if ((a = as[i]) != null)
                        sum += a.value;
                }
            }
            return sum;
        }
    

    这个函数返回的数据只是对所有CounterCell的数据的收集,在收集过程中很可能其他线程进行了增删等操作,所以是不准确的。想准确获取结果的话可能需要使用LongAdder或AtomicLong之类的对象在插入或删除对象时进行计数来实现了。

    总结

    Doug Lea 大神的代码思路很飘逸,各种CAS也是出神入化,尤其是扩容时把工作分开,多处同时执行的思路更是让人觉得代码居然还可以这么写。本文还留了一个坑,那就是为什么相同hash的节点超过一定数目后需要使用红黑树存储,红黑树有什么优势,它的实现原理是什么?鉴于本文篇幅已经很长了,我将会在下一篇文章中对红黑树进行解释。

    相关文章

      网友评论

        本文标题:jdk1.8 ConcurrentHashMap代码浅析

        本文链接:https://www.haomeiwen.com/subject/adjmnttx.html