美文网首页
ConcurrentHashMap源码分析(二)

ConcurrentHashMap源码分析(二)

作者: 非典型_程序员 | 来源:发表于2019-02-24 21:47 被阅读0次

    今天接着上周继续学习ConcurrentHashMap源码,上次主要看了一下它的成员变量、常用常量、构造函数、初始化方法以及增加、修改(删除是一种特殊情况)方法,大致对它有个了解。今天主要接着上次未完的地方,来看一下ConcurrentHashMap的迁移操作,也可以说是扩容操作,这个方法将原有table的节点迁移到新的table上。

        private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
            int n = tab.length, stride;
            if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
                stride = MIN_TRANSFER_STRIDE; // subdivide range
            if (nextTab == null) {            // initiating
                try {
                    @SuppressWarnings("unchecked")
                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
                    nextTab = nt;
                } catch (Throwable ex) {      // try to cope with OOME
                    sizeCtl = Integer.MAX_VALUE;
                    return;
                }
                nextTable = nextTab;
                transferIndex = n;
            }
            int nextn = nextTab.length;
            ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
            boolean advance = true;
            boolean finishing = false; // to ensure sweep before committing nextTab
            for (int i = 0, bound = 0;;) {
                Node<K,V> f; int fh;
                while (advance) {
                    int nextIndex, nextBound;//nextIndex的值始终等于transferIndex的值。
                    if (--i >= bound || finishing)
                        advance = false;
                    else if ((nextIndex = transferIndex) <= 0) {
                        i = -1;
                        advance = false;
                    }
                    else if (U.compareAndSwapInt
                             (this, TRANSFERINDEX, nextIndex,
                              nextBound = (nextIndex > stride ?
                                           nextIndex - stride : 0))) {
                        bound = nextBound;
                        i = nextIndex - 1;
                        advance = false;
                    }
                }
                if (i < 0 || i >= n || i + n >= nextn) {
                    int sc;
                    if (finishing) {
                        nextTable = null;
                        table = nextTab;
                        sizeCtl = (n << 1) - (n >>> 1);
                        return;
                    }
                    if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                        if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                            return;
                        finishing = advance = true;
                        i = n; // recheck before commit
                    }
                }
                else if ((f = tabAt(tab, i)) == null)
                    advance = casTabAt(tab, i, null, fwd);
                else if ((fh = f.hash) == MOVED)
                    advance = true; // already processed
                else {
                    synchronized (f) {
                        if (tabAt(tab, i) == f) {
                            Node<K,V> ln, hn;
                            if (fh >= 0) {
                                int runBit = fh & n;
                                Node<K,V> lastRun = f;
                                for (Node<K,V> p = f.next; p != null; p = p.next) {
                                    int b = p.hash & n;
                                    if (b != runBit) {
                                        runBit = b;
                                        lastRun = p;
                                    }
                                }
                                if (runBit == 0) {
                                    ln = lastRun;
                                    hn = null;
                                }
                                else {
                                    hn = lastRun;
                                    ln = null;
                                }
                                for (Node<K,V> p = f; p != lastRun; p = p.next) {
                                    int ph = p.hash; K pk = p.key; V pv = p.val;
                                    if ((ph & n) == 0)
                                        ln = new Node<K,V>(ph, pk, pv, ln);
                                    else
                                        hn = new Node<K,V>(ph, pk, pv, hn);
                                }
                                setTabAt(nextTab, i, ln);
                                setTabAt(nextTab, i + n, hn);
                                setTabAt(tab, i, fwd);
                                advance = true;
                            }
                            else if (f instanceof TreeBin) {
                                TreeBin<K,V> t = (TreeBin<K,V>)f;
                                TreeNode<K,V> lo = null, loTail = null;
                                TreeNode<K,V> hi = null, hiTail = null;
                                int lc = 0, hc = 0;
                                for (Node<K,V> e = t.first; e != null; e = e.next) {
                                    int h = e.hash;
                                    TreeNode<K,V> p = new TreeNode<K,V>
                                        (h, e.key, e.val, null, null);
                                    if ((h & n) == 0) {
                                        if ((p.prev = loTail) == null)
                                            lo = p;
                                        else
                                            loTail.next = p;
                                        loTail = p;
                                        ++lc;
                                    }
                                    else {
                                        if ((p.prev = hiTail) == null)
                                            hi = p;
                                        else
                                            hiTail.next = p;
                                        hiTail = p;
                                        ++hc;
                                    }
                                }
                                ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                                    (hc != 0) ? new TreeBin<K,V>(lo) : t;
                                hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                                    (lc != 0) ? new TreeBin<K,V>(hi) : t;
                                setTabAt(nextTab, i, ln);
                                setTabAt(nextTab, i + n, hn);
                                setTabAt(tab, i, fwd);
                                advance = true;
                            }
                        }
                    }
                }
            }
        }
    

    这个方法代码比较多,看起来还是有点头晕,只能试着来分析一下。
    1、如果nextTab为null,则以当前table长度两倍新建一个Node数组,即初始化新的数组,同时将新建数组引用赋值变量给属性nextTable,且transferIndex(扩容时指向nextTable的索引)值设为原table的长度。
    2、以nextTab为参数,构建一个新的ForwardingNode节点,该节点保存了nextTable的引用。接着并定义两个布尔类型的变量advance和finishing,分别表示是否向前循环和是否完成扩容,以便控制循环。
    3、for循环内部的while循环,每次执行一次--i并判断当前的--i是否大于等于边界值或者是否完成扩容操作,如果是,则将advance置为false,结束while循环;否则且将transferIndex赋值给nextIndex,并判断其值是否小于等于0,是则表明已经没有节点需要进行遍历,将advance置为false,i置为-1;否则调用unsafe的compareAndSwapInt,修改transferIndex的值,更新值为nextIndex - stride或者0,这里有点迷惑,不能理解stride变量。如果更新成功,则将advance置为false,i的值置为index -1 ,且将边界值bound置为nextIndex - stride或者0,这点确实不太能理解(有点尴尬)。
    4、接着是对(i < 0 || i >= n || i + n >= nextn)进行条件判断,如果不满足则跳转到5;满足条件说明循环已经完成。如果finishing为true,表明扩容操作已经完成,则nextTable置为null,并将nextTable赋值给table,这样table实际指向新的table数组,然后sizeCtl的值改为原table长度的1.5倍,返回结束本方法。接着调用unsafe的compareAndSwapInt,成功将sizeCtl的值减1(增加一个线程进行扩容操作),将finishing和advance的值都更新为true,并将原table长度赋值给i。
    5、如果根据i查找节点为null,说明该桶为空,那么调用casTabAt方法将2中新建的ForwardingNode放到相应的位置,并将结果赋给advance;如果该节点不为bull,跳转到6。
    6、如果根据i查找节点为null,且该节点的hash等于MOVED(值为-1),说明该节点已经是一个ForwardingNode,即这个节点已经被处理过了,不需要再次处理,将advance值置为true。如果该节点的hash不等于MOVED常量,跳转到7。
    7、给该节点(桶)加上锁,如果根据i查询节点的hash大于等于0,即是一个普通节点,那么从该结点的下一个节点开始遍历链表,如果遇到某个节点hash不等于链表头结点的hash值时,修改runBit和lastRun的值(这点没看明白,硬着头皮继续向下看吧)。如果runBit值为0,则将lastRun的引用赋值给ln,而hn为null。否则,则将则将lastRun的引用赋值给hn,而ln为null。如果根据i查询节点是一个树节点(红黑树),则跳到9。
    8、继续遍历链表,且要满足遍历到的当前节点不等于lastRun节点。如果(ph & n) == 0,说明这时候ln的值就是lastRun,而hn为null,以当前节点的hash、key、value和ln为参数,构造新的Node节点(引用赋值给ln),且新建节点的next引用指向ln;否则ln为null,而hn的值是lastRun,同样以当前节点的hash、key、value和ln为参数,构造新的Node节点(赋值给hn),不过新建节点的next引用指向的是hn。遍历完成以后实际形成了两个链表,ln和hn。然后调用setTabAt方法,将链表ln放置在新table索引为i的位置,而将链表hn放置到新table索引为i+n的位置,并将2新建的ForwardingNode节点放置旧的table索引为i的位置(ForwardingNode节点的hash等于MOVED),即表明该节点已经处理过,最后将advance置为true。
    9、如果为红黑树而不是链表,依然对其进行遍历,以遍历到当前节点的hash、key、value为参数,构建一个新的树节点。如果当前节点的hash值与table长度的与运算等于0,即(h & n)== 0(其实就是当前节点的hash为0),那么将loTail赋值给新建节点的前继引用,并判断其是否为null,是,则将新建树节点的引用赋值给lo;否则将新建节点赋值给loTail的后继引用,同时将新建节点赋值给loTail,lc自增。如果当前节点的hash不为0,那么将hiTail赋值给新建节点的前继引用,并判断其是否为null,是,则将新建树节点的引用赋值给hi;否则将新建节点赋值给hiTail的后继引用,同时将新建节点赋值给hiTail,hc自增。遍历完成以后,判断lc和hc的值是否小于链表转树的阈值。最终形成两个链表ln和hn,同8中的一样,也调用setTabAt方法,将ln放置在新table索引为i的位置,而将hn放置到新table索引为i+n的位置,并将2新建的ForwardingNode节点放置旧的table索引为i的位置。
    到这里整个ConcurrentHashMap的扩容方法就完成了,感觉还是有一点复杂,而且对有些地方不是特别的理解,也可能自己是对它的数据结构还不够熟悉,最后红黑树这里,再次感觉到数据结构算法的重要性,自己还是应该再去好好看看算法的一些知识,毕竟有些知识点已经忘记的差不多了。
    关于ConcurrentHashMap的其他方法我觉得也是应该要去看一下的,这里就不专门去分析了,自己也需要再去消化消化这部分代码。因为自己能力所限,所以如果有问题的地方希望大家能够指正,也希望能和各位多多交流探讨。

    相关文章

      网友评论

          本文标题:ConcurrentHashMap源码分析(二)

          本文链接:https://www.haomeiwen.com/subject/oozuyqtx.html