jdk1.8 ConcurrentHashMap代码浅析

作者: millions_chan | 来源:发表于2017-03-19 15:42 被阅读438次

在日常开发中,我们经常会使用HashMap,然而HashMap不是线程安全的,在多线程公用一个Map的情况下,ConcurrentHashMap通常是一个更好的选择。

ConcurrentHashMap采用下面的结构来存储数据,并使用CAS+Synchronized来保证并发更新的安全。当几个Node的hash值相同时,限将其存储为一个链表,当链表上的元素个数大于某个数时,就将其转化为一颗红黑树。

ConcurrentHashMap存储结构

成员变量


//承载数据的数据,大小为2^n
transient volatile Node<K,V>[] table;

//扩容时使用的
private transient volatile Node<K,V>[] nextTable;

//为-1说明由线程在进行初始化操作;
//为-n(n>1)  使用了一种生成戳,根据生成戳算出一个基数,不同轮次的扩容操作的生成戳都是唯一的,来保证多次扩容之间不会交叉重叠,  当有n个线程正在执行扩容时,sizeCtl在值变为 (基数 + n)  
//否则,在未初始化时记录table的大小,在初始化后记录hash表的容量(n - n >>>2 n为table大小,此时该值为0.75n)
private transient volatile int sizeCtl;

上面的代码中,Node为存储数据的节点,将Node的hash值设置为MOVED 、TREEBIN 、RESERVED 可以表示不同的含义 :

    static final int MOVED     = -1; // hash for forwarding nodes
    static final int TREEBIN   = -2; // hash for roots of trees
    static final int RESERVED  = -3; // hash for transient reservations

    static class Node<K,V> implements Map.Entry<K,V> {
        final int hash;
        final K key;
        volatile V val;
        volatile Node<K,V> next;

        Node(int hash, K key, V val, Node<K,V> next) {
            this.hash = hash;
            this.key = key;
            this.val = val;
            this.next = next;
        }                                                   ...
    }

   //红黑树节点
   static final class TreeNode<K,V> extends Node<K,V> {
        TreeNode<K,V> parent;  // red-black tree links
        TreeNode<K,V> left;
        TreeNode<K,V> right;
        TreeNode<K,V> prev;    // needed to unlink next upon deletion
        boolean red;

        TreeNode(int hash, K key, V val, Node<K,V> next,
                 TreeNode<K,V> parent) {
            super(hash, key, val, next);
            this.parent = parent;
        }
                                  ...
   }

    //特殊的Node Hash值为MOVED,说明在扩容,存储nextTable引用
    static final class ForwardingNode<K,V> extends Node<K,V> {
        final Node<K,V>[] nextTable;
        ForwardingNode(Node<K,V>[] tab) {
            super(MOVED, null, null, null);
            this.nextTable = tab;
        }
     }

Hash


既然是一个HashMap,如何做Hash就是一个问题。ConcurrentHashMap 处理的一些思路非常巧妙。

首先是table的大小,会在传入的size的基础上进行处理,变成2^k。

    private static final int tableSizeFor(int c) {
        int n = c - 1;
        n |= n >>> 1;
        n |= n >>> 2;
        n |= n >>> 4;
        n |= n >>> 8;
        n |= n >>> 16;
        return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
    }

上面的代码先用一系列的移位和|操作把n变成二进制表示为0...01...1的形式,然后加1,就会变成2^k了。这样做有什么好处呢?我们继续看。

//计算节点的位置 h 是 key的hash n 是table的大小
int index = (n - 1) & h;

上面我们知道了table的大小n是2^k,那么n-1就会是0..1....1的形式,并且小于n,通过与h按位与就可以得到对应的index了。而在扩容时,ConcurrentHashMap会将table的大小变为原来的2倍,这样,计算index时就变成了(2*n-1)&h,那么n-1为0011的话,2*n-1就是0111,这样扩容后的index要么等于扩容前的index,否则就等于index+n,减小了扩容时的计算量。

table初始化


table会延迟到第一次put时初始化,同过使用循环+CAS的套路,可以保证一次只有一个线程会初始化table。在table为空的时候如果sizeCtl小于0,则说明已经有线程开始初始化了,其它线程通过Thread.yield()让出CPU时间片,等待table非空即可。否则使用CAS将sizeCtl的值换为-1,置换成功则初始化table。注意table的大小为sizeCtl,初始化后将sizeCtl的值设为n - (n >>> 2)即0.75n,这个值用来确定是否需要为table扩容。这种初始化的方式值得我们借鉴。

   private final Node<K,V>[] initTable() {
        Node<K,V>[] tab; int sc;
        while ((tab = table) == null || tab.length == 0) {
            if ((sc = sizeCtl) < 0)
                Thread.yield(); // lost initialization race; just spin
            else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                try {
                    if ((tab = table) == null || tab.length == 0) {
                        int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                        @SuppressWarnings("unchecked")
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                        table = tab = nt;
                        sc = n - (n >>> 2);
                    }
                } finally {
                    sizeCtl = sc;
                }
                break;
            }
        }
        return tab;
    }

put操作

初始化后,使用tabAt函数通过Unsafe的getObjectVolatile方法获取index位置的值,如果为空,就进行cas设置table[index]的值,设置成功说明put完成,返回即可。不成功则重新进入for循环,进行其他逻辑。

    for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
             if (tab == null || (n = tab.length) == 0)
                tab = initTable();
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                if (casTabAt(tab, i, null,
                             new Node<K,V>(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin
            }
            else if ((fh = f.hash) == MOVED)
                tab = helpTransfer(tab, f);
                  ...
    }

如果发生了table[index]非空,则可能是:1.发生了碰撞;2. 该key之前已经存在。这个时候需要对table[index]加锁。然后再检查一次index位置的node是否没有变化。这里的fh是节点f的hash值。还记得之前说过的红黑树首节点hash为TREEBIN等于-2么,这时就能区分到底是一个红黑树还是链表了,而后就可以进行链表\红黑树的插入or更新操作了。在操作成功后,如果节点对应的数据结构是一个链表且长度超过TREEIFY_THRESHOLD的话将会把该链表更改为一个红黑树。

V oldVal = null;
synchronized (f) {
    if ( tabAt( tab, i ) == f )
    {
        if ( fh >= 0 )
        {
            binCount = 1;
            for ( Node<K, V> e = f;; ++binCount )
            {
                K ek;
                if ( e.hash == hash &&
                     ( (ek = e.key) == key ||
                       (ek != null && key.equals( ek ) ) ) )
                {
                    oldVal = e.val;
                    if ( !onlyIfAbsent )
                        e.val = value;
                    break;
                }
                Node<K, V> pred = e;
                if ( (e = e.next) == null )
                {
                    pred.next = new Node<K, V>( hash, key,
                                    value, null );
                    break;
                }
            }
        }else if ( f instanceof TreeBin )
        {
            Node<K, V> p;
            binCount = 2;
            if ( (p = ( (TreeBin<K, V>)f).putTreeVal( hash, key,
                                  value ) ) != null )
            {
                oldVal = p.val;
                if ( !onlyIfAbsent )
                    p.val = value;
            }
        }
    }
}
if ( binCount != 0 )
{
        //如果超过这个值,就把tab的i位置更改为红黑树哦
    if ( binCount >= TREEIFY_THRESHOLD )
        treeifyBin( tab, i );
    if ( oldVal != null )
        return(oldVal);
    break;
}

最后一种情况就是节点的hash值为MOVED,-1了。这说明该节点是一个ForwardingNode,当前table正在向newTable迁移中,所以当前线程过去帮助迁移就好。

     else if ((fh = f.hash) == MOVED)
                tab = helpTransfer(tab, f);

ConcurrentHashMap的扩容

老实说,扩容这块的代码比较复杂,很多细节我还没有弄清楚,只能大概讲一讲我自己的理解,权当抛砖引玉,希望以后了解的更加深入以后再来补全。

上面我们提到,扩容时我们会将sizeCtl设置为一个与扩容前table大小n有关的数。这里的计算方式就是:

int rs = resizeStamp(n);
sizeCtl= (rs << RESIZE_STAMP_SHIFT) + 2;

static final int resizeStamp(int n) {
       return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
}

private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;

这里可以看出,这个值肯定是一个负数,因为根据RESIZE_STAMP_SHIFT和RESIZE_STAMP_BITS的关系我们知道他的最高位肯定是1,再按位或Integer.numberOfLeadingZeros(n)的值,从直觉看,应当是一个绝对值很大的负数。具体可以带入不同的n值计算一下:

n                  sizeCtl
16               -2145714174
32               -2145779710
64               -2145845246
128             -2145910782
256             -2145976318
512             -2146041854
1024               -2146107390
2048               -2146172926
4096               -2146238462
8192               -2146303998
16384             -2146369534

这里可以看出jdk1.8的代码里对sizeCtl的注释是有问题的。理解了rs是干什么的,以及rs和扩容时的sizeCtl的关系,我们再来分析下面的代码:

  1. 当sc小于0时,说明已经开始了扩容,如果当前线程需要参与扩容,则通过CAS将sizeCtl的值加1
  2. sc大于0,说明还没开始扩容,则按照我们上面讲的方法计算出sizeCtl的值,初始化之,开始扩容
     if (check >= 0) {
            Node<K,V>[] tab, nt; int n, sc;
            while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
                   (n = tab.length) < MAXIMUM_CAPACITY) {
                int rs = resizeStamp(n);
                if (sc < 0) {
                    if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                        sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                        transferIndex <= 0)
                        break;
                    if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                        transfer(tab, nt);
                }
                else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                             (rs << RESIZE_STAMP_SHIFT) + 2))
                    transfer(tab, null);
                s = sumCount();
            }
        }

扩容操作的中,如果nextTable为空,则初始化之,大小为之前table的2倍:

        if (nextTab == null) {            // initiating
            try {
                @SuppressWarnings("unchecked")
                Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
                nextTab = nt;
            } catch (Throwable ex) {      // try to cope with OOME
                sizeCtl = Integer.MAX_VALUE;
                return;
            }
            nextTable = nextTab;
            transferIndex = n;
        }

计算一次扩容迁移的slot数目:

      if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
            stride = MIN_TRANSFER_STRIDE; // subdivide range

计算扩容开始和结束的index:

  for (int i = 0, bound = 0;;) {
            Node<K,V> f; int fh;
            while (advance) {
                int nextIndex, nextBound;
                if (--i >= bound || finishing)
                    advance = false;
                else if ((nextIndex = transferIndex) <= 0) {
                    i = -1;
                    advance = false;
                }
                else if (U.compareAndSwapInt
                         (this, TRANSFERINDEX, nextIndex,
                          nextBound = (nextIndex > stride ?
                                       nextIndex - stride : 0))) {
                    bound = nextBound;
                    i = nextIndex - 1;
                    advance = false;
                }
            }
            ...
}

如果当前index为空则将其设置为一个FowardingNode,如果节点的hash为-1则说明已经处理过了,直接跳过就好

            else if ((f = tabAt(tab, i)) == null)
                advance = casTabAt(tab, i, null, fwd);
            else if ((fh = f.hash) == MOVED)
                advance = true; // already processed

接下来就是重头戏了:如何迁移一个链表或红黑树节点:

synchronized (f) {
    if ( tabAt( tab, i ) == f )
    {
        Node<K, V> ln, hn;
        if ( fh >= 0 )
        {
            int     runBit  = fh & n;
            Node<K, V>  lastRun = f;
            for ( Node<K, V> p = f.next; p != null; p = p.next )
            {
                int b = p.hash & n;
                if ( b != runBit )
                {
                    runBit  = b;
                    lastRun = p;
                }
            }
            if ( runBit == 0 )
            {
                ln  = lastRun;
                hn  = null;
            }else  {
                hn  = lastRun;
                ln  = null;
            }
            for ( Node<K, V> p = f; p != lastRun; p = p.next )
            {
                int ph = p.hash; K pk = p.key; V pv = p.val;
                if ( (ph & n) == 0 )
                    ln = new Node<K, V>( ph, pk, pv, ln );
                else
                    hn = new Node<K, V>( ph, pk, pv, hn );
            }
            setTabAt( nextTab, i, ln );
            setTabAt( nextTab, i + n, hn );
            setTabAt( tab, i, fwd );
            advance = true;
        }else if ( f instanceof TreeBin )
        {
            TreeBin<K, V>   t   = (TreeBin<K, V>)f;
            TreeNode<K, V>  lo  = null, loTail = null;
            TreeNode<K, V>  hi  = null, hiTail = null;
            int     lc  = 0, hc = 0;
            for ( Node<K, V> e = t.first; e != null; e = e.next )
            {
                int     h   = e.hash;
                TreeNode<K, V>  p   = new TreeNode<K, V>
                                  ( h, e.key, e.val, null, null );
                if ( (h & n) == 0 )
                {
                    if ( (p.prev = loTail) == null )
                        lo = p;
                    else
                        loTail.next = p;
                    loTail = p;
                    ++lc;
                }else  {
                    if ( (p.prev = hiTail) == null )
                        hi = p;
                    else
                        hiTail.next = p;
                    hiTail = p;
                    ++hc;
                }
            }
            ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify( lo ) :
                 (hc != 0) ? new TreeBin<K, V>( lo ) : t;
            hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify( hi ) :
                 (lc != 0) ? new TreeBin<K, V>( hi ) : t;
            setTabAt( nextTab, i, ln );
            setTabAt( nextTab, i + n, hn );
            setTabAt( tab, i, fwd );
            advance = true;
        }
    }
}
  1. 和put操作一样,先判断是红黑树还是链表。
  2. 根据前面的分析,由于扩容前table的大小为n=2^k,且扩容后的n2 = n * 2,所以节点在nextTable上的位置要么和在table上的位置一样,要么是在table上的位置加上n(比如hash&(n-1)为2,那么hash&(2*n-1)要么是2,要么是n+2),并且这个值只与n的非0bit的位置有关,所以使用fh & n可以将链表或红黑树上的的节点分为2类,一类是index没有改变的,一类是index变成了index+n的。
  3. 如果是链表,则生成2个新链表,放到对应的位置
  4. 如果是红黑树,则根据节点数目生成2个红黑树或链表(如果节点数目比较少则生成链表)
  5. 完成迁移后在原位置放一个FowardingNode标记已经处理成功

最后,如果已经完成全部迁移,则更新sizeCtl的值,用nextTable替换table。如果只是当前线程完成了自己的迁移任务,则修改sizeCtl的值。这里需要注意的是这一句:(sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT 判断是否所有线程都完成了自己的操作,是的话就说明整个迁移完成了,将finishing设置为true。

              if (finishing) {
                    nextTable = null;
                    table = nextTab;
                    sizeCtl = (n << 1) - (n >>> 1);
                    return;
                }
                if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                    if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                        return;
                    finishing = advance = true;
                    i = n; // recheck before commit
                }

size

ConcurrentHashMap还有一点需要注意的是其size方法其实是调用了下面的sumCount方法:

    final long sumCount() {
        CounterCell[] as = counterCells; CounterCell a;
        long sum = baseCount;
        if (as != null) {
            for (int i = 0; i < as.length; ++i) {
                if ((a = as[i]) != null)
                    sum += a.value;
            }
        }
        return sum;
    }

这个函数返回的数据只是对所有CounterCell的数据的收集,在收集过程中很可能其他线程进行了增删等操作,所以是不准确的。想准确获取结果的话可能需要使用LongAdder或AtomicLong之类的对象在插入或删除对象时进行计数来实现了。

总结

Doug Lea 大神的代码思路很飘逸,各种CAS也是出神入化,尤其是扩容时把工作分开,多处同时执行的思路更是让人觉得代码居然还可以这么写。本文还留了一个坑,那就是为什么相同hash的节点超过一定数目后需要使用红黑树存储,红黑树有什么优势,它的实现原理是什么?鉴于本文篇幅已经很长了,我将会在下一篇文章中对红黑树进行解释。

相关文章

网友评论

    本文标题:jdk1.8 ConcurrentHashMap代码浅析

    本文链接:https://www.haomeiwen.com/subject/adjmnttx.html