内容目录
分段锁设计提高统计元素数量的性能
最后一个部分,也是ConcurrentHashMap中设计比较巧妙的地方。我们知道,当调用完put
方法后,ConcurrentHashMap必须会增加当前元素的个数,方便在size()
方法中获得存储的数据大小。代码的实现如下。
final V putVal(K key, V value, boolean onlyIfAbsent) {
//省略部分代码....
addCount(1L, binCount);
return null;
}
很多读者可能会问,这块代码有什么好值得分析的呢?请大家思考一个问题,在常规的集合中,我们只需要一个全局int类型的字段去保存元素个数即可,每次添加一个元素,就对这个size变量+1。但是在ConcurrentHashMap集合中,需要保证对于该变量修改的线程安全性,那怎么设计呢?难道又用锁来实现?很显然,通过前面的分析,使用同步锁带来的性能开销太大,所以不适合。因此,在ConcurrentHashMap中,采用的是自旋锁和分段锁的设计。
size计数的基本原理分析
在ConcurrentHashMap中,采用两种方式来保存元素的个数。
- 当线程竞争不激烈时,直接对
baseCount+1
来增加元素个数。 - 当线程竞争比较激烈时,通过构建一个
CounterCell
数组,默认长度是2,然后通过随机算法选择一个CounterCell
,针对该CounterCell
中的value进行保存。
private transient volatile long baseCount;
private transient volatile CounterCell[] counterCells;
元素个数累加的整体流程如下图所示。
在这里插入图片描述addCount方法分析
addCount方法的完整代码如下,从整体结构来看,包含两个部分的逻辑。
- 对ConcurrentHashMap中元素个数进行累加。
- 通过
check>=0
来判断是否需要扩容,这部分代码在前面分析tranfer()
方法中有讲过,就不再赘述。
private final void addCount(long x, int check) {
CounterCell[] as; long b, s;
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
CounterCell a; long v; int m;
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
fullAddCount(x, uncontended);
return;
}
if (check <= 1)
return;
s = sumCount();
}
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
int rs = resizeStamp(n);
if (sc < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
s = sumCount();
}
}
}
我们重点来分析下面这段增加元素个数的代码。
private final void addCount(long x, int check) {
CounterCell[] as; long b, s;
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
CounterCell a; long v; int m;
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
fullAddCount(x, uncontended);
return;
}
if (check <= 1)
return;
s = sumCount();
}
}
上述代码的处理逻辑如下:
- 在if判断中,第一步就是先通过
U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)
修改全局成员变量baseCount进行累加。这个方法在线程竞争的情况下会返回false。 - 如果第一步执行失败,则尝试使用
CounterCell
进行累加,这里面有几个逻辑。- 在if判断中通过
as=counterCells
把当前用来记录元素个数的全局变量counterCells
赋值给了as。 - 在第二个if判断中,有几个具体的判断逻辑,这些判断逻辑任何一个为true,都会调用
fullAddCount
方法去实现元素个数累加。判断逻辑简单说明一下。-
as==null
说明CounterCell
数组还未初始化。 -
(m = as.length - 1) < 0
,这个判断理论上来说不存在,因为一旦CounterCell
被初始化,就意味着as.length
长度是2,所以不可能出现结果小于0的情况,如果各位读者有其他理解可以反馈给笔者。 -
(a = as[ThreadLocalRandom.getProbe() & m]) == null
说明CounterCell
数组已经创建了,但是通过探针hash定位在数组中中没有对象实例,说明这个数组中还存在没有CounterCell
实例对象的情况。 -
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x)
执行到了这个判断逻辑,说明当前CounterCell
数组每个位置都有一个CounterCell
实例对象,直接通过CAS操作针对上一个步骤获取到的CounterCell
的value值进行累加,如果失败,说明存在竞争。
-
- 在if判断中通过
-
sumCount();
返回总的元素个数,实际上应该就是CounterCell
数组和baseCount
两者累加计算的结果。
细心的读者不难发现,在if判断中,首先是执行
(as = counterCells) != null
,然后再尝试对baseCount做累加。这里有一个概率性的问题,就是当一个集合发生过并发时,后续发生并发的可能性会越高,这种思想在并发编程以及很多应用场景中都有体现。
fullAddCount方法分析
fullAddCount的主要功能有几个。
- 如果
CounterCell
数组还未初始化,则先初始化。 - 如果已经初始化,则随机找到其中一个值进行累加更新。
- 如果线程竞争加剧,则会尝试对
CounterCell
数组进行扩容。
接下来,笔者分别针对这三个部分进行分析。
第一个部分,初始化CounterCell
CounterCell
数组初始化处理不难理解。
-
cellsBusy == 0 && counterCells == as &&U.compareAndSwapInt(this, CELLSBUSY, 0, 1)
这个部分,是通过CellsBusy字段来表示抢占到锁的标记,通过CAS修改CellsBusy=1来表示占有状态。 -
CounterCell[] rs = new CounterCell[2];
构造一个长度为2的CounterCell
数组。 -
rs[h & 1] = new CounterCell(x);
,把当前增加的元素个数x
保存到rs[h&1]
的位置。 -
counterCells = rs;
把rs
赋值给全局对象counterCells
。
private final void fullAddCount(long x, boolean wasUncontended) {
//省略部分代码....
for (;;) {
CounterCell[] as; CounterCell a; int n; long v;
if ((as = counterCells) != null && (n = as.length) > 0) {
//省略部分代码....
}
else if (cellsBusy == 0 && counterCells == as &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
boolean init = false;
try { // Initialize table
if (counterCells == as) {
CounterCell[] rs = new CounterCell[2];
rs[h & 1] = new CounterCell(x);
counterCells = rs;
init = true;
}
} finally {
cellsBusy = 0;
}
if (init)
break;
}
}
}
第二个部分,增加元素个数
如果CounterCell数组已经初始化了,也存在两种情况。
第一种情况是通过(a = as[(n - 1) & h]) == null
计算到某个数组下标存在null的对象时,那么直接把当前要增加的元素个数x
保存到数组中的某一个对象中即可。
-
(as = counterCells) != null && (n = as.length) > 0
表示counterCells
数组已经完成了初始化。 -
CounterCell r = new CounterCell(x);
先创建一个CounterCell
对象,把x
保存进去。 -
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)
当前线程占用锁。 -
rs[j] = r
,把新构建的保存了元素个数x
的CounterCell
对象保存到rs[j]
的位置。
private final void fullAddCount(long x, boolean wasUncontended) {
//省略部分代码....
for (;;) {
CounterCell[] as; CounterCell a; int n; long v;
if ((as = counterCells) != null && (n = as.length) > 0) {
if ((a = as[(n - 1) & h]) == null) {
if (cellsBusy == 0) { // Try to attach new Cell
CounterCell r = new CounterCell(x); // Optimistic create
if (cellsBusy == 0 &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
boolean created = false;
try { //Recheck under lock
CounterCell[] rs; int m, j;
if ((rs = counterCells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
} finally {
cellsBusy = 0;
}
if (created)
break;
continue; // Slot is now non-empty
}
}
collide = false;
}
//省略部分代码....
}
//省略部分代码....
}
}
第二种情况,就是直接通过U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x)
操作,对CounterCell
指定位置的元素进行累加。
private final void fullAddCount(long x, boolean wasUncontended) {
//省略部分代码....
for (;;) {
//由于指定下标位置的cell值不为空,则直接通过cas进行原子累加,如果成功,则直接退出
else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))//
break;
}
//省略部分代码....
}
第三个部分,CounterCell数组扩容
如果竞争比较激烈,也就是通过多次自旋(也就是线程无法满足上述的判断条件时),就会触发CounterCell的扩容。
-
cellsBusy == 0 &&U.compareAndSwapInt(this, CELLSBUSY, 0, 1)
抢占锁。 -
CounterCell[] rs = new CounterCell[n << 1];
在原有的基础上扩容一倍,再通过for
循环进行数据迁移。 -
counterCells = rs;
把扩容后的对象赋值给counterCells
。
private final void fullAddCount(long x, boolean wasUncontended) {
//省略代码....
else if (cellsBusy == 0 &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
try {
if (counterCells == as) {// Expand table unless stale
//扩容一倍 2变成4
CounterCell[] rs = new CounterCell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
counterCells = rs;
}
} finally {
cellsBusy = 0;//恢复标识
}
collide = false;
continue;//继续下一次自旋
}
//省略部分代码....
}
size()方法结果汇总
基于前面的分析,基本上我们也能猜测出它的整体实现,无非就是把baseCount
和CounterCell
保存的value进行累加即可。接下来我们看一下size()方法的定义如下。
public int size() {
long n = sumCount();
return ((n < 0L) ? 0 :
(n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
(int)n);
}
final long sumCount() {
CounterCell[] as = counterCells; CounterCell a;
long sum = baseCount;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
主要看一下sumCount
这个方法,实现原理和我们猜想的一致。
- 先得到
baseCount
的值,保存到sum
这个字段中。 - 遍历
CounterCell
数组,把数组中每个CounterCell
中存储的value进行累加。
我们发现,
size()
方法在计算总的元素个数时,并没有加锁,所以size()
方法返回的元素个数不一定代表总的容量。
网友评论