一、认识相关字段
相关字段,
// 两种情况
// 1. counterCells 数组未初始化,在没有线程争用时,将 size 的变化写入此字段
// 2. 初始化 counterCells 数组时,没有获取到 cellsBusy 锁,会再次尝试将 size 的变化写入此字段
private transient volatile long baseCount;
// 用于同步 counterCells 数组结构修改的乐观锁资源
private transient volatile int cellsBusy;
// counterCells 数组一旦初始化,size 的变化将不再尝试写入 baseCount
// 可以将 size 的变化写入数组中的任意元素
// 可扩容,长度保持为 2 的幂
private transient volatile CounterCell[] counterCells;
其中,CounterCell 是 ConcurrentHashMap 的一个静态内部类。
/* ---------------- Counter support -------------- */
/**
* A padded cell for distributing counts. Adapted from LongAdder
* and Striped64. See their internal docs for explanation.
*/
@sun.misc.Contended static final class CounterCell {
volatile long value;
CounterCell(long x) { value = x; }
}
二、计算 size 的源码分析
计算 size 的方法调用链:size() -> sumCount(),
public int size() {
long n = sumCount();
return ((n < 0L) ? 0 :
(n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
(int)n); // 将 n 裁剪到 [0, Integer.MAX_VALUE] 内
}
// 计算 baseCount 字段与所有 counterCells 数组的非空元素的和
final long sumCount() {
CounterCell[] as = counterCells; CounterCell a;
long sum = baseCount;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
可以看到,map 中键值对的个数通过求 baseCount 与 counterCells 非空元素的和得到。
那么现在的问题就是 baseCount 和 counterCells 里的值都是什么时候计算的呢?
ConcurrentHashMap 内部所有改变键值对个数的方法都会调用 addCount 方法更新键值对的计数。
addCount 方法源码,
// 参数 x 表示键值对个数的变化值,如果为正,表示新增了元素,如果为负,表示删除了元素
private final void addCount(long x, int check) {
CounterCell[] as; long b, s;
// 如果 counterCells 为空,则直接尝试通过 CAS 将 x 累加到 baseCount 中
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
// counterCells 非空
// 或 counterCells 为空,但 CAS baseCount 失败都会来到这里
CounterCell a; long v; int m;
boolean uncontended = true; // CAS 数组元素时,有没有发生线程争用的标志
// 如果当前线程探针哈希到的数组元素非空,则尝试将 x 累加到对应数组元素
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
// counterCells 为空,或其长度小于1
// 或当前线程探针哈希到的数组元素为空
// 或当前线程探针哈希到的数组元素非空,但 CAS 数组元素失败
// 都会调用 fullAddCount 方法来完成 x 的写入
fullAddCount(x, uncontended);
return; // 如果调用过 fullAddCount,则当前线程一定不会协助扩容
}
// 走到这说明,CAS 数组元素成功
// 此时如果 check <= 1,也不协助可能会发生的扩容
if (check <= 1)
return;
// 如果 check 大于 1,则计算当前 map 的 size,为判断是否需要扩容做准备
s = sumCount();
}
// size 的变化已经写入完成
// 后面如果 check >= 0,则判断当前的 size 是否会触发扩容
if (check >= 0) {
// 扩容相关的逻辑
Node<K,V>[] tab, nt; int n, sc;
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
int rs = resizeStamp(n);
if (sc < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
s = sumCount();
}
}
}
如果不管其中与扩容有关的逻辑,addCount 方法记录 size 变化的过程可以分为两类情况,
-
counterCells 数组未初始化
a. CAS 一次 baseCount
b. 如果 CAS 失败,则调用 fullAddCount 方法 -
counterCells 数组已初始化
a. CAS 一次当前线程探针哈希到的数组元素
b. 如果 CAS 失败,则调用 fullAddCount 方法
fullAddCount 方法的源码,
// 只被 addCount 方法调用
// 如果 counterCells 数组未初始化
// 或者线程哈希到的 counterCells 数组元素未初始化
// 或者 CAS 数组元素失败,都会调用此方法
private final void fullAddCount(long x, boolean wasUncontended) {
int h;
// 判断线程探针哈希值是否初始化
if ((h = ThreadLocalRandom.getProbe()) == 0) {
ThreadLocalRandom.localInit(); // force initialization
h = ThreadLocalRandom.getProbe();
wasUncontended = true; // 重新假设未发生争用
}
boolean collide = false; // 是否要给 counterCells 扩容的标志
for (;;) {
CounterCell[] as; CounterCell a; int n; long v;
if ((as = counterCells) != null && (n = as.length) > 0) {
// 数组不为空且长度大于 0
if ((a = as[(n - 1) & h]) == null) {
// 尝试初始化线程探针哈希到的数组元素
if (cellsBusy == 0) { // Try to attach new Cell
// 注意,这里已经把 x 放入对象
CounterCell r = new CounterCell(x); // Optimistic create
if (cellsBusy == 0 && // 准备初始化数组元素,要求 cellsBusy 为 0,并尝试将其置 1
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
// 获得 cellsBusy 锁
boolean created = false;
try { // Recheck under lock
CounterCell[] rs; int m, j;
// 判断有没有被其它线程初始化
if ((rs = counterCells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
} finally {
cellsBusy = 0; // 释放 cellsBusy 锁
}
if (created) // 初始化元素成功,直接退出循环
break;
continue; // Slot is now non-empty
}
}
collide = false;
}
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash(指的是更改当前线程的探针哈希值)
// wasUncontended 为 true 执行到这
// 尝试将 x 累加进数组元素
else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))
break;
// CAS 失败
// 判断 counterCells 是否正在扩容,或数组长度是否大于等于处理器数
else if (counterCells != as || n >= NCPU)
collide = false; // At max size or stale
// 如果数组没有在扩容,且数组长度小于处理器数
// 此时,如果 collide 为 false,则把它变成 true
// 在下一轮循环中,如果 CAS 数组元素继续失败,就会触发 counterCells 扩容
else if (!collide)
collide = true;
// 如果 collide 为 true,则尝试给 counterCells 数组扩容
else if (cellsBusy == 0 &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
try {
if (counterCells == as) {// Expand table unless stale
CounterCell[] rs = new CounterCell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
counterCells = rs;
}
} finally {
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}
h = ThreadLocalRandom.advanceProbe(h); // 更改当前线程的探针哈希值
}
// counterCells 数组为空或长度为 0
else if (cellsBusy == 0 && counterCells == as &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
// 获取 cellsBusy 锁
boolean init = false;
try { // Initialize table
if (counterCells == as) {
CounterCell[] rs = new CounterCell[2]; // 初始长度为 2
rs[h & 1] = new CounterCell(x);
counterCells = rs;
init = true;
}
} finally {
cellsBusy = 0;
}
if (init)
break;
}
// counterCells 数组为空或长度为 0,并且获取 cellsBusy 锁失败
// 则会再次尝试将 x 累加到 baseCount
else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))
break; // Fall back on using base
} // end for
}
这个方法细节较多,比较复杂。
细节方面请参考上面的源码和注释。
下面,我们只从整体上看它实现了哪些功能,
- 线程探针哈希值的初始化。
- counterCells 数组的初始化和扩容。
- counterCells 元素的初始化。
- 将 size 的变化,写入 counterCells 中的某一个元素。(如果 counterCells 初始化时,获取锁失败,则还会尝试将 size 的变化,写入 baseCount。)
三、小小总结
代码虽然看起来很复杂,但 Doug Lea 计算 size 的思想很明确,也很巧妙。
指导思想: 尽量降低线程冲突,以最快的速度写入 size 的变化。
如何降低冲突?
如果没有冲突发生,只将 size 的变化写入 baseCount。
一旦发生冲突,就用一个数组(counterCells)来存储后续所有 size 的变化。这样,线程只要对任意一个数组元素写入 size 变化成功即可,数组长度越长,线程发生冲突的可能性就越小。
关于 counterCells 扩容:
如果 CAS 数组元素连续失败两次,就会进行 counterCells 数组的扩容,直到达到机器的处理器数为止。
比如我的机器是双核四线程,真正能并行的线程数是 4,所以在我机器上 counterCells 初始化后,最多扩容一次。
关于线程的探针哈希值是如何初始化和更改的,可以参考:关于 ConcurrentHashMap 1.8 中的线程探针哈希
网友评论