今天接着上周继续学习ConcurrentHashMap源码,上次主要看了一下它的成员变量、常用常量、构造函数、初始化方法以及增加、修改(删除是一种特殊情况)方法,大致对它有个了解。今天主要接着上次未完的地方,来看一下ConcurrentHashMap的迁移操作,也可以说是扩容操作,这个方法将原有table的节点迁移到新的table上。
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
if (nextTab == null) { // initiating
try {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
transferIndex = n;
}
int nextn = nextTab.length;
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
boolean advance = true;
boolean finishing = false; // to ensure sweep before committing nextTab
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
while (advance) {
int nextIndex, nextBound;//nextIndex的值始终等于transferIndex的值。
if (--i >= bound || finishing)
advance = false;
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
if (finishing) {
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1);
return;
}
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
i = n; // recheck before commit
}
}
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
else {
synchronized (f) {
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
if (fh >= 0) {
int runBit = fh & n;
Node<K,V> lastRun = f;
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
else if (f instanceof TreeBin) {
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}
这个方法代码比较多,看起来还是有点头晕,只能试着来分析一下。
1、如果nextTab为null,则以当前table长度两倍新建一个Node数组,即初始化新的数组,同时将新建数组引用赋值变量给属性nextTable,且transferIndex(扩容时指向nextTable的索引)值设为原table的长度。
2、以nextTab为参数,构建一个新的ForwardingNode节点,该节点保存了nextTable的引用。接着并定义两个布尔类型的变量advance和finishing,分别表示是否向前循环和是否完成扩容,以便控制循环。
3、for循环内部的while循环,每次执行一次--i并判断当前的--i是否大于等于边界值或者是否完成扩容操作,如果是,则将advance置为false,结束while循环;否则且将transferIndex赋值给nextIndex,并判断其值是否小于等于0,是则表明已经没有节点需要进行遍历,将advance置为false,i置为-1;否则调用unsafe的compareAndSwapInt,修改transferIndex的值,更新值为nextIndex - stride或者0,这里有点迷惑,不能理解stride变量。如果更新成功,则将advance置为false,i的值置为index -1 ,且将边界值bound置为nextIndex - stride或者0,这点确实不太能理解(有点尴尬)。
4、接着是对(i < 0 || i >= n || i + n >= nextn)进行条件判断,如果不满足则跳转到5;满足条件说明循环已经完成。如果finishing为true,表明扩容操作已经完成,则nextTable置为null,并将nextTable赋值给table,这样table实际指向新的table数组,然后sizeCtl的值改为原table长度的1.5倍,返回结束本方法。接着调用unsafe的compareAndSwapInt,成功将sizeCtl的值减1(增加一个线程进行扩容操作),将finishing和advance的值都更新为true,并将原table长度赋值给i。
5、如果根据i查找节点为null,说明该桶为空,那么调用casTabAt方法将2中新建的ForwardingNode放到相应的位置,并将结果赋给advance;如果该节点不为bull,跳转到6。
6、如果根据i查找节点为null,且该节点的hash等于MOVED(值为-1),说明该节点已经是一个ForwardingNode,即这个节点已经被处理过了,不需要再次处理,将advance值置为true。如果该节点的hash不等于MOVED常量,跳转到7。
7、给该节点(桶)加上锁,如果根据i查询节点的hash大于等于0,即是一个普通节点,那么从该结点的下一个节点开始遍历链表,如果遇到某个节点hash不等于链表头结点的hash值时,修改runBit和lastRun的值(这点没看明白,硬着头皮继续向下看吧)。如果runBit值为0,则将lastRun的引用赋值给ln,而hn为null。否则,则将则将lastRun的引用赋值给hn,而ln为null。如果根据i查询节点是一个树节点(红黑树),则跳到9。
8、继续遍历链表,且要满足遍历到的当前节点不等于lastRun节点。如果(ph & n) == 0,说明这时候ln的值就是lastRun,而hn为null,以当前节点的hash、key、value和ln为参数,构造新的Node节点(引用赋值给ln),且新建节点的next引用指向ln;否则ln为null,而hn的值是lastRun,同样以当前节点的hash、key、value和ln为参数,构造新的Node节点(赋值给hn),不过新建节点的next引用指向的是hn。遍历完成以后实际形成了两个链表,ln和hn。然后调用setTabAt方法,将链表ln放置在新table索引为i的位置,而将链表hn放置到新table索引为i+n的位置,并将2新建的ForwardingNode节点放置旧的table索引为i的位置(ForwardingNode节点的hash等于MOVED),即表明该节点已经处理过,最后将advance置为true。
9、如果为红黑树而不是链表,依然对其进行遍历,以遍历到当前节点的hash、key、value为参数,构建一个新的树节点。如果当前节点的hash值与table长度的与运算等于0,即(h & n)== 0(其实就是当前节点的hash为0),那么将loTail赋值给新建节点的前继引用,并判断其是否为null,是,则将新建树节点的引用赋值给lo;否则将新建节点赋值给loTail的后继引用,同时将新建节点赋值给loTail,lc自增。如果当前节点的hash不为0,那么将hiTail赋值给新建节点的前继引用,并判断其是否为null,是,则将新建树节点的引用赋值给hi;否则将新建节点赋值给hiTail的后继引用,同时将新建节点赋值给hiTail,hc自增。遍历完成以后,判断lc和hc的值是否小于链表转树的阈值。最终形成两个链表ln和hn,同8中的一样,也调用setTabAt方法,将ln放置在新table索引为i的位置,而将hn放置到新table索引为i+n的位置,并将2新建的ForwardingNode节点放置旧的table索引为i的位置。
到这里整个ConcurrentHashMap的扩容方法就完成了,感觉还是有一点复杂,而且对有些地方不是特别的理解,也可能自己是对它的数据结构还不够熟悉,最后红黑树这里,再次感觉到数据结构算法的重要性,自己还是应该再去好好看看算法的一些知识,毕竟有些知识点已经忘记的差不多了。
关于ConcurrentHashMap的其他方法我觉得也是应该要去看一下的,这里就不专门去分析了,自己也需要再去消化消化这部分代码。因为自己能力所限,所以如果有问题的地方希望大家能够指正,也希望能和各位多多交流探讨。
网友评论