美文网首页
ConcurrentHashMap解析三(transfer方法解

ConcurrentHashMap解析三(transfer方法解

作者: 代码potty | 来源:发表于2018-09-26 22:38 被阅读0次

    在之前计数方法addCount()方法中,它有两部分内容,一个是计数另一个是扩容,在扩容语句中有这样一句:

        transfer(tab, null);
    

    这句话表示,当第一个线程执行扩容操作的时候,会向transfer()传入现在的表和一个空对象,好,我们一步步来看下这个方法的代码。

    第一部分代码

        private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
                //stride为每个cpu所需要处理的桶个数
                int n = tab.length, stride;
                //将 n / 8 然后除以 CPU核心数。如果得到的结果小于 16,那么就使用 16。
                //这里的目的是让每个 CPU 处理的桶一样多,避免出现转移任务不均匀的现象,如果桶较少的话,默认一个 CPU(一个线程)处理 16 个桶
                if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
                    stride = MIN_TRANSFER_STRIDE; // subdivide range
                    //新的 table 尚未初始化
                if (nextTab == null) {            // initiating
                    try {
                        @SuppressWarnings("unchecked")
                        //扩容两倍
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
                        nextTab = nt;
                    } catch (Throwable ex) {      // try to cope with OOME
                    //如果扩容出现错误,则sizeCtl赋值为int最大值返回
                        sizeCtl = Integer.MAX_VALUE;
                        return;
                    }
                    nextTable = nextTab;
                    //transferIndex代表的是旧数组的尾节点
                    transferIndex = n;
                }
    

    首先将旧表长度赋值给n,然后另外申请了一个变量stride,这个变量表示的是每个CPU所需要处理的桶的个数,代码中的第一个判断语句可以看出,stride最小值为16,也就是说,当桶的个数小于16的时候,默认一个线程来处理所有的桶。
    第二条判断语句是对nextTab的判断,一开始的时候,我讲到,在addCount()方法中为该参数传入了一个Null值,说明table尚未初始化,需要进行初始化,容量直接申请为旧表的两倍,如果在扩容中出现错误,那么sizeCtl赋值为int最大值返回,然后需要注意的是transferIndex表示的迁移数据的下标,一开始的下标指向旧表的尾节点。

    第二部分代码

            int nextn = nextTab.length;
                //创建一个标识类用于占位,当其他线程扫描到这个类的时候会跳过
                ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
                //数组一层层推进的标识符
                boolean advance = true;
                //扩容结束的标识符
                boolean finishing = false; // to ensure sweep before committing nextTab
                for (int i = 0, bound = 0;;) {
                    Node<K,V> f; int fh;
                    //每个线程进入这里,先获取自己需要处理桶的区间,第一次进入因为--i,会直接跳到else if中,对nextIndex进行赋值操作
                    //这里设置了一个i = -1,
                  // 如果当前线程可以向后推进;这个循环就是控制 i 递减。同时,每个线程都会进入这里取得自己需要转移的桶的区间
                    while (advance) {
                        int nextIndex, nextBound;
                        if (--i >= bound || finishing)
                        //防止在没有成功处理一个桶的情况下却进行了推进
                            advance = false;
                        else if ((nextIndex = transferIndex) <= 0) {
                            i = -1;
                            //防止在没有成功处理一个桶的情况下却进行了推进
                            advance = false;
                        }
                        else if (U.compareAndSwapInt
                                 (this, TRANSFERINDEX, nextIndex,
                                  nextBound = (nextIndex > stride ?
                                               nextIndex - stride : 0))) {
                            bound = nextBound;//线程负责桶区间当前最小下标
                            i = nextIndex - 1;//线程负责桶区间当前最大下标
                            advance = false;
                        }
                    }
                    if (i < 0 || i >= n || i + n >= nextn) {
                        int sc;
                        if (finishing) {//如果完成扩容
                            nextTable = null;// 删除成员变量
                            table = nextTab;// 更新 table
                            sizeCtl = (n << 1) - (n >>> 1); // 更新阈值
                            return;// 结束方法。
                        }
                        if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                            if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                                return;
                            finishing = advance = true;//扩容结束
                            i = n; // 再次循环检查一次表
                        }
                    }
                    //获取老 tab i 下标位置的变量,如果是 null,就使用 fwd 占位。
                    else if ((f = tabAt(tab, i)) == null)
                        //如果写进fwd,则推进
                        advance = casTabAt(tab, i, null, fwd);
                        //如果当前位置不是Null,且hash值为-1,说明其他线程已经处理过这个桶,继续推进
                    else if ((fh = f.hash) == MOVED)
                        advance = true; // already processed
    

    第二部分代码主要是每个CPU负责的桶区间进行规划,然后进行不同情况的判定工作,介绍下关键的几个量
    1、fwd:这个类是个标识类,用于指向新表用的,其他线程遇到这个类会主动跳过这个类,因为这个类要么就是扩容迁移正在进行,要么就是已经完成扩容迁移,也就是这个类要保证线程安全,再进行操作。
    2、advance:这个变量是用于提示代码是否进行推进处理,也就是当前桶处理完,处理下一个桶的标识
    3、finishing:这个变量用于提示扩容是否结束用的

    首先使用一个while(advance)循环出每个进入该循环的线程所要负责的桶的区间,再判断扩容是否结束,如果扩容结束,清空临死变量,更新 table 变量,更新库容阈值。如果没完成,但已经无法领取区间(没了),该线程退出该方法,并将 sizeCtl 减一,表示扩容的线程少一个了。如果减完这个数以后,sizeCtl 回归了初始状态,表示没有线程再扩容了,该方法所有的线程扩容结束了。(这里主要是判断扩容任务是否结束,如果结束了就让线程退出该方法,并更新相关变量)。然后检查所有的桶,防止遗漏。
    接着判断当前位置是否为空,为空则插入cas方法插入fwd,如果插入fwd成功继续推进
    如果当前位置不为空的情况下,hash值等于-1,表示当前桶已经完成扩容和数据迁移操作

    第三部分代码

              else {
                    //锁住首节点
                        synchronized (f) {
                        //二次判断地址偏移量所指向位置是否与f对象相等
                            if (tabAt(tab, i) == f) {
                                Node<K,V> ln, hn;
                                //fh>0为链表数据转移
                                if (fh >= 0) {
                                    //首节点的hash值
                                    int runBit = fh & n;
                                    Node<K,V> lastRun = f;//最后一个节点
                                    //这个地方跟hashmap不同,hashmap是直接推进到链表尾
                                    //这个地方的处理在于想保留链表后所有hash值计算相同的点,这些点可以重复利用,不需要重新new
                                    //所以下边需要获取哪个节点后的hash值全部相同
                                    for (Node<K,V> p = f.next; p != null; p = p.next) {
                                        int b = p.hash & n;
                                        if (b != runBit) {
                                            runBit = b;
                                            lastRun = p;
                                        }
                                    }
                                    //如果runBit==0,说明低位重用
                                    if (runBit == 0) {
                                        ln = lastRun;
                                        hn = null;
                                    }
                                    //高位重用
                                    else {
                                        hn = lastRun;
                                        ln = null;
                                    }
                                    for (Node<K,V> p = f; p != lastRun; p = p.next) {
                                        int ph = p.hash; K pk = p.key; V pv = p.val;
                                        if ((ph & n) == 0)
                                        //注意创建node节点的最后一个参数ln指代的是next
                                        //也就是说,我们不再是从头到尾节点,而是从为节点开始向头结点走
                                            ln = new Node<K,V>(ph, pk, pv, ln);
                                        else
                                            hn = new Node<K,V>(ph, pk, pv, hn);
                                    }
                                    //将遍历好的链表一放入i中
                                    setTabAt(nextTab, i, ln);
                                    //将遍历好的链表二放入i+n中
                                    setTabAt(nextTab, i + n, hn);
                                    //将fwd占位放入旧表中
                                    setTabAt(tab, i, fwd);
                                    //向前推进
                                    advance = true;
                                }
                                //如果是红黑树
                                else if (f instanceof TreeBin) {
                                    TreeBin<K,V> t = (TreeBin<K,V>)f;
                                    TreeNode<K,V> lo = null, loTail = null;
                                    TreeNode<K,V> hi = null, hiTail = null;
                                    int lc = 0, hc = 0;
                                    //遍历,将链表转成双端链表
                                    for (Node<K,V> e = t.first; e != null; e = e.next) {
                                        int h = e.hash;
                                        TreeNode<K,V> p = new TreeNode<K,V>
                                            (h, e.key, e.val, null, null);
                                        if ((h & n) == 0) {
                                            if ((p.prev = loTail) == null)
                                                lo = p;
                                            else
                                                loTail.next = p;
                                            loTail = p;
                                            ++lc;
                                        }
                                        else {
                                            if ((p.prev = hiTail) == null)
                                                hi = p;
                                            else
                                                hiTail.next = p;
                                            hiTail = p;
                                            ++hc;
                                        }
                                    }
                                    //如果长度小于等于6,则将红黑树转换成链表
                                    ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                                        (hc != 0) ? new TreeBin<K,V>(lo) : t;
                                    hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                                        (lc != 0) ? new TreeBin<K,V>(hi) : t;
                                    setTabAt(nextTab, i, ln);
                                    setTabAt(nextTab, i + n, hn);
                                    setTabAt(tab, i, fwd);
                                    advance = true;
                                }
                            }
                        }
                    }
                }
            }
    

    第三部分代码跟hashmap中的类似,也是使用synchronized来锁定头结点,然后根据链表和红黑树分开进行判断数据迁移,如果是链表,ConcurrentHashmap比hashmap多了一个步骤,使用runBit来记录头结点hash值,然后遍历链表一个个跟runBit进行比较,runBit只有两种可能,一种是为0,另一种是为1,也就是说,如果为0,则低位上可以有很多重用的节点,如果为1则表示高位上有很多重用的节点,用lastRun来记录某个节点后runBit不变,也就是lastRun节点后的节点都是重用的,然后遍历lastRun之前的节点,这个地方也不同,hashmap是从头通过node的next属性进行从头往后添加节点,而ConcurrentHashmap是从尾开始往前添加节点,直到添加到首节点,然后通过setTabAt()方法添加到新表的桶中,最后在旧表放入一个fwd占位符。
    红黑树跟链表也差不多,分出两个红黑树,一个在高位一个在低位,分出来的红黑树再进行长度判断,长度小于等于6的会通过方法untreeify()转换成链表结构存储。

    参考链接:
    https://www.jianshu.com/p/2829fe36a8dd
    https://blog.csdn.net/u011392897/article/details/60479937
    https://www.cnblogs.com/stateis0/p/9062095.html

    相关文章

      网友评论

          本文标题:ConcurrentHashMap解析三(transfer方法解

          本文链接:https://www.haomeiwen.com/subject/zutsoftx.html