在之前计数方法addCount()方法中,它有两部分内容,一个是计数另一个是扩容,在扩容语句中有这样一句:
transfer(tab, null);
这句话表示,当第一个线程执行扩容操作的时候,会向transfer()传入现在的表和一个空对象,好,我们一步步来看下这个方法的代码。
第一部分代码
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
//stride为每个cpu所需要处理的桶个数
int n = tab.length, stride;
//将 n / 8 然后除以 CPU核心数。如果得到的结果小于 16,那么就使用 16。
//这里的目的是让每个 CPU 处理的桶一样多,避免出现转移任务不均匀的现象,如果桶较少的话,默认一个 CPU(一个线程)处理 16 个桶
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
//新的 table 尚未初始化
if (nextTab == null) { // initiating
try {
@SuppressWarnings("unchecked")
//扩容两倍
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
//如果扩容出现错误,则sizeCtl赋值为int最大值返回
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
//transferIndex代表的是旧数组的尾节点
transferIndex = n;
}
首先将旧表长度赋值给n,然后另外申请了一个变量stride,这个变量表示的是每个CPU所需要处理的桶的个数,代码中的第一个判断语句可以看出,stride最小值为16,也就是说,当桶的个数小于16的时候,默认一个线程来处理所有的桶。
第二条判断语句是对nextTab的判断,一开始的时候,我讲到,在addCount()方法中为该参数传入了一个Null值,说明table尚未初始化,需要进行初始化,容量直接申请为旧表的两倍,如果在扩容中出现错误,那么sizeCtl赋值为int最大值返回,然后需要注意的是transferIndex表示的迁移数据的下标,一开始的下标指向旧表的尾节点。
第二部分代码
int nextn = nextTab.length;
//创建一个标识类用于占位,当其他线程扫描到这个类的时候会跳过
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
//数组一层层推进的标识符
boolean advance = true;
//扩容结束的标识符
boolean finishing = false; // to ensure sweep before committing nextTab
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
//每个线程进入这里,先获取自己需要处理桶的区间,第一次进入因为--i,会直接跳到else if中,对nextIndex进行赋值操作
//这里设置了一个i = -1,
// 如果当前线程可以向后推进;这个循环就是控制 i 递减。同时,每个线程都会进入这里取得自己需要转移的桶的区间
while (advance) {
int nextIndex, nextBound;
if (--i >= bound || finishing)
//防止在没有成功处理一个桶的情况下却进行了推进
advance = false;
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
//防止在没有成功处理一个桶的情况下却进行了推进
advance = false;
}
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;//线程负责桶区间当前最小下标
i = nextIndex - 1;//线程负责桶区间当前最大下标
advance = false;
}
}
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
if (finishing) {//如果完成扩容
nextTable = null;// 删除成员变量
table = nextTab;// 更新 table
sizeCtl = (n << 1) - (n >>> 1); // 更新阈值
return;// 结束方法。
}
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;//扩容结束
i = n; // 再次循环检查一次表
}
}
//获取老 tab i 下标位置的变量,如果是 null,就使用 fwd 占位。
else if ((f = tabAt(tab, i)) == null)
//如果写进fwd,则推进
advance = casTabAt(tab, i, null, fwd);
//如果当前位置不是Null,且hash值为-1,说明其他线程已经处理过这个桶,继续推进
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
第二部分代码主要是每个CPU负责的桶区间进行规划,然后进行不同情况的判定工作,介绍下关键的几个量
1、fwd:这个类是个标识类,用于指向新表用的,其他线程遇到这个类会主动跳过这个类,因为这个类要么就是扩容迁移正在进行,要么就是已经完成扩容迁移,也就是这个类要保证线程安全,再进行操作。
2、advance:这个变量是用于提示代码是否进行推进处理,也就是当前桶处理完,处理下一个桶的标识
3、finishing:这个变量用于提示扩容是否结束用的
首先使用一个while(advance)循环出每个进入该循环的线程所要负责的桶的区间,再判断扩容是否结束,如果扩容结束,清空临死变量,更新 table 变量,更新库容阈值。如果没完成,但已经无法领取区间(没了),该线程退出该方法,并将 sizeCtl 减一,表示扩容的线程少一个了。如果减完这个数以后,sizeCtl 回归了初始状态,表示没有线程再扩容了,该方法所有的线程扩容结束了。(这里主要是判断扩容任务是否结束,如果结束了就让线程退出该方法,并更新相关变量)。然后检查所有的桶,防止遗漏。
接着判断当前位置是否为空,为空则插入cas方法插入fwd,如果插入fwd成功继续推进
如果当前位置不为空的情况下,hash值等于-1,表示当前桶已经完成扩容和数据迁移操作
第三部分代码
else {
//锁住首节点
synchronized (f) {
//二次判断地址偏移量所指向位置是否与f对象相等
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
//fh>0为链表数据转移
if (fh >= 0) {
//首节点的hash值
int runBit = fh & n;
Node<K,V> lastRun = f;//最后一个节点
//这个地方跟hashmap不同,hashmap是直接推进到链表尾
//这个地方的处理在于想保留链表后所有hash值计算相同的点,这些点可以重复利用,不需要重新new
//所以下边需要获取哪个节点后的hash值全部相同
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
//如果runBit==0,说明低位重用
if (runBit == 0) {
ln = lastRun;
hn = null;
}
//高位重用
else {
hn = lastRun;
ln = null;
}
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
//注意创建node节点的最后一个参数ln指代的是next
//也就是说,我们不再是从头到尾节点,而是从为节点开始向头结点走
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
//将遍历好的链表一放入i中
setTabAt(nextTab, i, ln);
//将遍历好的链表二放入i+n中
setTabAt(nextTab, i + n, hn);
//将fwd占位放入旧表中
setTabAt(tab, i, fwd);
//向前推进
advance = true;
}
//如果是红黑树
else if (f instanceof TreeBin) {
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
//遍历,将链表转成双端链表
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
//如果长度小于等于6,则将红黑树转换成链表
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}
第三部分代码跟hashmap中的类似,也是使用synchronized来锁定头结点,然后根据链表和红黑树分开进行判断数据迁移,如果是链表,ConcurrentHashmap比hashmap多了一个步骤,使用runBit来记录头结点hash值,然后遍历链表一个个跟runBit进行比较,runBit只有两种可能,一种是为0,另一种是为1,也就是说,如果为0,则低位上可以有很多重用的节点,如果为1则表示高位上有很多重用的节点,用lastRun来记录某个节点后runBit不变,也就是lastRun节点后的节点都是重用的,然后遍历lastRun之前的节点,这个地方也不同,hashmap是从头通过node的next属性进行从头往后添加节点,而ConcurrentHashmap是从尾开始往前添加节点,直到添加到首节点,然后通过setTabAt()方法添加到新表的桶中,最后在旧表放入一个fwd占位符。
红黑树跟链表也差不多,分出两个红黑树,一个在高位一个在低位,分出来的红黑树再进行长度判断,长度小于等于6的会通过方法untreeify()转换成链表结构存储。
参考链接:
https://www.jianshu.com/p/2829fe36a8dd
https://blog.csdn.net/u011392897/article/details/60479937
https://www.cnblogs.com/stateis0/p/9062095.html
网友评论