[toc]
关于ConcurrentHashMap的size方法,有资料说size不能提供强的一致性,但是也有人说size是强一致性的。那么对于这个问题,我们从源码出发,来看看size的实现机制。最终看看能否得到正确的答案。
1.一致性定义
关于一致性的定义,大概如下:
一致性(Consistency)是指多副本(Replications)问题中的数据一致性。可以分为强一致性、顺序一致性与弱一致性。
1.1 强一致性(Strict Consistency)
强一致性也被可以被称做:
原子一致性(Atomic Consistency)
线性一致性(Linearizable Consistency)
要满足强一致性,必须符合以下两个要求:
- 任何一次读都能读到某个数据的最近一次写的数据。
- 系统中的所有进程,看到的操作顺序,都和全局时钟下的顺序一致。
上述定义用通俗的话来解释就是,假定对同一个数据集合,分别有两个线程A、B进行操作,假定A首先进行的修改操作,那么从时序上在A这个操作之后发生的所有B的操作都应该能看到A修改操作的结果。
1.2 弱一致性
数据更新之后,如果能容忍访问不到或者只能部分访问的情况,就是弱一致性。最终一致性是弱一致性的一个特例。
也就是说,对于数据集,分别有两个线程A、B进行操作,假定A首先进行了修改操作,那么可能从时许上滞后的B进行的读取操作在一段时间内还读取不到这个结果。读取的还是A操作之前的结果。这就是弱一致性。
最终一致性就是说,只要A、B的都不进行任何更新操作,一段时间之后,数据都能读取到最新的数据。
2.size方法源码
2.1 jdk1.8实现
2.1.1 size方法
我们来看看1.8版本中的ConcurrnetHashMap中size方法的源码:
/**
* {@inheritDoc}
*/
public int size() {
long n = sumCount();
return ((n < 0L) ? 0 :
(n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
(int)n);
}
2.1.2 sumCount
实际上底层调用的是sumCount方法:
final long sumCount() {
CounterCell[] as = counterCells; CounterCell a;
long sum = baseCount;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
可以看到,这个count,实际上是对CounterCell数组进行遍历,中间没有任何锁操作。
2.1.3 CounterCell
CounterCell源码如下:
/**
* A padded cell for distributing counts. Adapted from LongAdder
* and Striped64. See their internal docs for explanation.
*/
@sun.misc.Contended static final class CounterCell {
volatile long value;
CounterCell(long x) { value = x; }
}
这实际上就是一个volatile修饰的计数器。除了Contended这个注解之外,没有什么特别之处,在put、remove的时候,对这个计数器进行增减。
Contended这个注解我们在后面再来详细解释。
counterCells这个数组,实际上size和table一致,这样Counter中的value就是这个数组中index对应到table中bucket的长度。
在table扩容的时候,这个计数器数组也会扩容:
CounterCell[] rs = new CounterCell[n << 1];
2.1.4 addCount
那么在put和remove以及clear等方式对size数量有影响的方法中,都会调用addCount对size进行增减。
x为正数表示增加,负数表示减小。同时check如果大于0则需要对结果进行check,避免在并发过程中由于并发操作带来的计算不准确。
private final void addCount(long x, int check) {
CounterCell[] as; long b, s;
//判断是否为空
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
CounterCell a; long v; int m;
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
//a是计算出来的槽位
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
//cas方式
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
//增加
fullAddCount(x, uncontended);
return;
}
//如果不需要检查就直接返回
if (check <= 1)
return;
s = sumCount();
}
//如果需要检查
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
//遍历并重新计算
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
int rs = resizeStamp(n);
if (sc < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
s = sumCount();
}
}
}
2.1.5 fullAddCount
这是执行增加的核心方法,其中大量使用了cas操作,另外还必须考虑到执行的并行性。
// See LongAdder version for explanation
private final void fullAddCount(long x, boolean wasUncontended) {
int h;
if ((h = ThreadLocalRandom.getProbe()) == 0) {
ThreadLocalRandom.localInit(); // force initialization
h = ThreadLocalRandom.getProbe();
wasUncontended = true;
}
boolean collide = false; // True if last slot nonempty
//死循环,cas方式修改
for (;;) {
CounterCell[] as; CounterCell a; int n; long v;
if ((as = counterCells) != null && (n = as.length) > 0) {
if ((a = as[(n - 1) & h]) == null) {
if (cellsBusy == 0) { // Try to attach new Cell
CounterCell r = new CounterCell(x); // Optimistic create
if (cellsBusy == 0 &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
boolean created = false;
try { // Recheck under lock
CounterCell[] rs; int m, j;
if ((rs = counterCells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
} finally {
cellsBusy = 0;
}
if (created)
break;
continue; // Slot is now non-empty
}
}
collide = false;
}
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))
break;
else if (counterCells != as || n >= NCPU)
collide = false; // At max size or stale
else if (!collide)
collide = true;
else if (cellsBusy == 0 &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
try {
if (counterCells == as) {// Expand table unless stale
CounterCell[] rs = new CounterCell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
counterCells = rs;
}
} finally {
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}
h = ThreadLocalRandom.advanceProbe(h);
}
else if (cellsBusy == 0 && counterCells == as &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
boolean init = false;
try { // Initialize table
if (counterCells == as) {
CounterCell[] rs = new CounterCell[2];
rs[h & 1] = new CounterCell(x);
counterCells = rs;
init = true;
}
} finally {
cellsBusy = 0;
}
if (init)
break;
}
else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))
break; // Fall back on using base
}
}
2.1.6 总结
通过对上述方法分析不难看出,size方法是弱一致性的,这是因为,如果有A线程正在进行put操作,之后触发了扩容或者红黑树转置,那么立即就会synchronized锁定root节点。之后开始进行对应的操作,这个操作是需要时间的。但是这个时候,如果线程B来调用size方法,那么size方法由于没有任何锁机制,肯定是能够返回的,此时返回的size就是put之前的值。那么这个结果就导致了弱一致性。即put在前的操作并不能马上让时许在其后面的操作得到结果,需要等一段时间。待synchronized执行完成。
ConcurrenthashMap的counter机制就是为了增加读取性能而设计的,如果为了强一致性,那么只能按HashTable的方式整个读取方法都加锁,那么这样肯定会影响性能的。
另外addCount,在增加操作的时候还会对数量进行检查。以避免并发操作带来的不一致性。
2.2 jdk1.7源码实现
由于1.7采用分段锁的机制,因此设计没有1.8复杂。
2.2.1 size方法源码
/**
* Returns the number of key-value mappings in this map. If the
* map contains more than <tt>Integer.MAX_VALUE</tt> elements, returns
* <tt>Integer.MAX_VALUE</tt>.
*
* @return the number of key-value mappings in this map
*/
public int size() {
// Try a few times to get accurate count. On failure due to
// continuous async changes in table, resort to locking.
final Segment<K,V>[] segments = this.segments;
int size;
boolean overflow; // true if size overflows 32 bits
long sum; // sum of modCounts
long last = 0L; // previous sum
int retries = -1; // first iteration isn't retry
try {
//死循环
for (;;) {
//遍历 逐步锁定段
if (retries++ == RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
ensureSegment(j).lock(); // force creation
}
//假定初始的modCount
sum = 0L;
size = 0;
overflow = false;
//计算bucket
for (int j = 0; j < segments.length; ++j) {
Segment<K,V> seg = segmentAt(segments, j);
if (seg != null) {
//将modCount相加
sum += seg.modCount;
int c = seg.count;
if (c < 0 || (size += c) < 0)
overflow = true;
}
}
//如果modCount在这个计算过程中没有改变则说明size计算有效,否则会重置last之后重新计算
if (sum == last)
break;
last = sum;
}
} finally {
//将所有的lock进行unlock操作
if (retries > RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
segmentAt(segments, j).unlock();
}
}
return overflow ? Integer.MAX_VALUE : size;
}
这个方法的逻辑是,在一开始,遍历segment的时候,先锁定一个段,计算size,然后判断在这个过程中,modCount是否发生了改变,如果发生改变则说明计算结果会产生误差,则重新计算。直到modCount在计算前后相等,则说计算可行,之后再移动到下要给bucket。
可以看到这实际上是个低效的操作,只有在所有的bucket都计算完成之后,才会统一在finally中进行unlock。这样会导致全部的段都被锁定。
也就是说,1.7中的size方法,最开始是个乐观锁,最终会转换为悲观锁,这样实际上是个强一致性的方法。
2.3 说明
通过上述对1.7和1.8源码中对size方法的对比,在1.7中,size能做到强一致性,但是这样是有代价的,对分段锁的lock导致了整体性能的降低。而在1.8中,为了增加性能,而增加了一大段复杂的代码将size变成了弱一致性。但是好处是在put的过程中不会对size造成阻塞。
由此可见源码作者为了提升ConcurrentHashMap所做的各种努力。
这也是我们在编码过程中值得借鉴的地方。
至于@sun.misc.Contended,这是通过缓存行对齐来避免伪共享问题,这个将在后续单独介绍。
网友评论