美文网首页Android开发
ConcurrentHashMap源码分析(JDK1.8)——扩

ConcurrentHashMap源码分析(JDK1.8)——扩

作者: 低情商的大仙 | 来源:发表于2019-03-04 16:19 被阅读5次

    前言

    在分析ConcurrentHashMap之前,希望大家先读完HashMap的源码,因为ConcurrentHashMap基本算法和HashMap是一致的,只是增加了并发控制而已,有了HashMap的基础才能更好的理解ConcurrentHashMap,推荐大家先看看这两篇文章:
    HashMap的hash机制详解
    HashMap源码分析

    1. 重要成员

    /**
         * 初始化和扩容标志,也是并发控制非常重要的一员,当sizeCtl<0时
         * 表明当前正在初始化或扩容,sizeCtl=-1正在初始化,sizeCtl<-1说明正在扩容
         * 而且此时sizeCtl = -(1+正在扩容的线程数量).  
         * 当还未进行初始化时sizeCtl为初始化容量大小,默认16,
         */
        private transient volatile int sizeCtl;
    
        /**
         * 扩容时使用
         */
        private transient volatile int transferIndex;
    
    
        /**
        *真正存储数据的数组
        */
        transient volatile Node<K,V>[] table;
    

    这里要注意,此处没有了JDK1.7中的分段锁的概念了,全部都是基于CAS的。

    2. 并发基础——CAS

    整个ConcurrentHashMap完全没有方法级别的锁,到底是什么机制来保证并发的呢?这里简单的介绍下。
    首先大家对乐观锁和悲观锁要有个大致理解:

    • 悲观锁 :悲观锁认为竞争一定会发生,所以不管如何都会锁住资源,不允许其他线程进入,sychorinized关键字就是标准的悲观锁
    • 乐观锁: 所谓的乐观锁就是认为竞争不一定会发生,比如有个变量A=3,我希望将它变成A=4,那么可以先比较 如果A=3,那么说明没有其他线程竞争修改这个变量,我可以直接设置A=4, 这个比较和设置过程在硬件上是原子级别的,如果比较时发现A!=3,说明有其他线程修改了,这个情况会被返回,调用者可以针对这个情况特殊处理。
      我们看下ConcurrentHashMap是如何将一个Node节点放到数组table的一个位置上的:
        static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
                                            Node<K,V> c, Node<K,V> v) {
            return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
        }
    

    这里重点关注传入的c和v,整个compareAndSwapObject就是先比较tab数组上某个位置(通过内存偏移量算出来的) 上的节点是不是 c,如果是就认为没有竞争,直接将该位置设置为 v,否则返回false。一般都是通过一个死循环来调用这个方法的,比如:

    for (Node<K,V>[] tab = table;;) {  
           if (casTabAt(tab, i, null, r)) {
                 //修改成功,会继续执行其他业务
                break;            
           }
           //修改失败,会死循环下次重试
    }
    

    这种机制就是保证ConcurrentHashMap高效并发的基础了。

    由于ConcurrentHashMap处理hash冲突以及hash算法都是一样的,所以这里一些基本功能不再分析,我们重点分析一些由于并发导致的和HashMap区别较大的方法。

    3. 扩容

    ConcurrentHashMap的扩容是支持多线程并发扩容的,所以扩容效率很高,在看源码之前,我们先大致看下并发扩容的思想,扩容的核心就在于将旧的table数组中的数据迁移到新的数组中来。我们先看张图:


    并发扩容.png

    之所以能并发扩容就在于这里,将现有的数据分成了几部分,每个线程领一个自己的部分 ,线程领到了自己的部分后如何复制到新的数组的呢?


    单个线程复制过程

    对于扩容的单个线程来说,每次复制都是从尾部开始,一个节点复制完毕后会在这个位置放置一个ForwardingNode节点,表明这个节点已经处理过了。

    有了上述基础我们再结合代码分析.

    3.1 确定每个线程扩容时负责的数组部分长度

            int n = tab.length, stride;
            if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
                stride = MIN_TRANSFER_STRIDE; // subdivide range
    

    这里主要是根据cpu的数量来计算的,但是如果算出来小于16的话,stride=16,也就是说一个线程处理的数量最少是16.

    3.2 申请新空间

    扩容第一步就是申请新的table数组,这个和HashMap一样,都是直接两倍扩容:

           if (nextTab == null) {            // initiating
                try {
                    @SuppressWarnings("unchecked")
                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
                    nextTab = nt;
                } catch (Throwable ex) {      // try to cope with OOME
                    sizeCtl = Integer.MAX_VALUE;
                    return;
                }
                nextTable = nextTab;
                transferIndex = n;
            }
    

    3.2 遍历自己负责的所有元素

            boolean advance = true;
            boolean finishing = false; // to ensure sweep before committing nextTab
            for (int i = 0, bound = 0;;) {
                Node<K,V> f; int fh;
                while (advance) {
                    int nextIndex, nextBound;
                    if (--i >= bound || finishing)   //分支1
                        advance = false;
                    else if ((nextIndex = transferIndex) <= 0) {   //分支2
                        i = -1;
                        advance = false;
                    }
                    else if (U.compareAndSwapInt    // 分支3
                             (this, TRANSFERINDEX, nextIndex,
                              nextBound = (nextIndex > stride ?
                                           nextIndex - stride : 0))) {
                        bound = nextBound;
                        i = nextIndex - 1;
                        advance = false;
                    }
                }
                 //省略一些具体实现 
            }
    

    这里以一个长度32的ConcurrentHashMap扩容到64为例,记住,之前在申请空间时:

            if (nextTab == null) {            // initiating
             //省略代码
                nextTable = nextTab;
                transferIndex = n;
            }
    

    这也就意味着,一个线程刚开始扩容时,transferIndex = 32, i=0,bound=0;,所以会进入分支3,
    这个时候会通过CAS操作将transferIndex赋值为16,bound=16, i=31. 记住transferIndex,每个线程扩容起始位置都是由它决定的,这个线程将他改成了16,那么下个并发线程扩容就会从16开始了,从而做到每个线程负责自己的数据。最终大部分操作都会进入分支1,通过--i来遍历该线程负责的部分数组,从而拷贝数据。

    3.3 迁移table数组中的单个元素。

    3.3.1 该位置没有数据

              else if ((f = tabAt(tab, i)) == null)
                    advance = casTabAt(tab, i, null, fwd);
    

    这个时候无需拷贝,只要将再这个位置放置一个ForwardingNode节点即可。

    3.3.2 该位置已经被拷贝过了

    else if ((fh = f.hash) == MOVED)
            advance = true; // already processed
    

    此处的MOVED为-1,ForwardingNode的hash值都为-1,说明这个节点已经被处理过了。所有数据拷贝完成后会重新遍历一遍检查,这个时候时会进入这个分支。

    3.3.3 该位置是一个链表

                synchronized (f) {
                        if (tabAt(tab, i) == f) {
                            Node<K,V> ln, hn;
                            if (fh >= 0) {
                                int runBit = fh & n;
                                Node<K,V> lastRun = f;
                                for (Node<K,V> p = f.next; p != null; p = p.next) {
                                    int b = p.hash & n;
                                    if (b != runBit) {
                                        runBit = b;
                                        lastRun = p;
                                    }
                                }
                                if (runBit == 0) {
                                    ln = lastRun;
                                    hn = null;
                                }
                                else {
                                    hn = lastRun;
                                    ln = null;
                                }
                                for (Node<K,V> p = f; p != lastRun; p = p.next) {
                                    int ph = p.hash; K pk = p.key; V pv = p.val;
                                    if ((ph & n) == 0)
                                        ln = new Node<K,V>(ph, pk, pv, ln);
                                    else
                                        hn = new Node<K,V>(ph, pk, pv, hn);
                                }
                                setTabAt(nextTab, i, ln);
                                setTabAt(nextTab, i + n, hn);
                                setTabAt(tab, i, fwd);
                                advance = true;
                            }
                  //省略部分代码
                }
    

    这里的思想其实和HashMap一样(不熟悉的可以先看看本文开始推荐的两篇文章),都是把链表拆成两部分,一部分放在nextTab的i位置,一部分放在nextTab的i+n位置。
    注意,这里使用了synchronized锁住了当前节点,这也是一种没办法的事。但由于锁住的只是一个节点,并不会影响到其他扩容线程。

    3.3.4 该位置是一个红黑树

                      else if (f instanceof TreeBin) {
                                TreeBin<K,V> t = (TreeBin<K,V>)f;
                                TreeNode<K,V> lo = null, loTail = null;
                                TreeNode<K,V> hi = null, hiTail = null;
                                int lc = 0, hc = 0;
                                for (Node<K,V> e = t.first; e != null; e = e.next) {
                                    int h = e.hash;
                                    TreeNode<K,V> p = new TreeNode<K,V>
                                        (h, e.key, e.val, null, null);
                                    if ((h & n) == 0) {
                                        if ((p.prev = loTail) == null)
                                            lo = p;
                                        else
                                            loTail.next = p;
                                        loTail = p;
                                        ++lc;
                                    }
                                    else {
                                        if ((p.prev = hiTail) == null)
                                            hi = p;
                                        else
                                            hiTail.next = p;
                                        hiTail = p;
                                        ++hc;
                                    }
                                }
                                ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                                    (hc != 0) ? new TreeBin<K,V>(lo) : t;
                                hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                                    (lc != 0) ? new TreeBin<K,V>(hi) : t;
                                setTabAt(nextTab, i, ln);
                                setTabAt(nextTab, i + n, hn);
                                setTabAt(tab, i, fwd);
                                advance = true;
                            }
    

    红黑树道理也是一样,也是拆分成两部分,但这里会统计放在nextTab的i位置的数量和i+n位置的数量,如果低于6会退化成链表。

    3.3.5 扩容结束

              if (i < 0 || i >= n || i + n >= nextn) {
                    int sc;
                    if (finishing) {  //分支1
                        nextTable = null;
                        table = nextTab;
                        sizeCtl = (n << 1) - (n >>> 1);
                        return;
                    }
                    if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {  //分支2
                        if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                            //走到这里说明所有的扩容线程都结束了,也就是此次扩容彻底结束。
                            return;
                        finishing = advance = true;
                        i = n; // recheck before commit
                    }
                }
    

    当自己的部分数组都copy完毕后,通常会先进入分支2,将finishing置为true,i=n,由于此时i=n,会重新遍历一遍自己负责的部分数组,确保每个节点都被复制了,最后会进入分支1,将sizeCtl 置为下次扩容的阈值,其实sizeCtl = n2-n/2 = 2n 0.75. 也就是新的容量乘以扩容因子。至此,此次该线程扩容结束。

    4. 总结

    总体而言,本文并没有再重复介绍一些和HashMap中一样的算法,比如具体的hash算法,比如如何判断扩容时怎样将一个链表拆成两部分,主要介绍了思路以及并发相关的,个人理解,如果有错误恳请指正。

    相关文章

      网友评论

        本文标题:ConcurrentHashMap源码分析(JDK1.8)——扩

        本文链接:https://www.haomeiwen.com/subject/frpgyqtx.html