LongAdder 是一个多线程高并发时使用的自增计数器,它的设计思想就是以空间换时间。相比较于 AtomicLong,它在高并发时更高效,因为 AtomicLong 自增时使用了 unsafe.getAndAddLong,源码
public final long getAndAddLong(Object o, long offset, long delta) {
long v;
do {
v = getLongVolatile(o, offset);
} while (!compareAndSwapLong(o, offset, v, v + delta));
return v;
}
可以看到 AtomicLong 在自增失败时会自旋,直到成功。这回导致高并发时,可能有非常多的无意义的自旋,LongAdder 很好的解决了这个问题,它使用 Cell 来帮助计数,它的 sum 是 base + 各个 Cell 中 value 的总和。
Cell 类
/**
* Padded variant of AtomicLong supporting only raw accesses plus CAS.
* 简易版的 AtomicLong
*
* JVM intrinsics note: It would be possible to use a release-only
* form of CAS here, if it were provided.
*/
@sun.misc.Contended static final class Cell {
volatile long value;
Cell(long x) { value = x; }
final boolean cas(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
}
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long valueOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> ak = Cell.class;
valueOffset = UNSAFE.objectFieldOffset
(ak.getDeclaredField("value"));
} catch (Exception e) {
throw new Error(e);
}
}
}
可以看到,它仅支持一个 cas 操作。
其实LongAdder 的实际操作依赖于它的父类 Striped64 的 longAccumulate 方法,Striped64 有四个子类 LongAdder, LongAccumulator ,DoubleAdder,DoubleAccumulator。操作基本相同,LongAccumulator ,DoubleAccumulator 构造方法提供一个二元操作符,可以自定义这个计算来实现累加。
回归正题我们继续看 LongAdder longAccumulate(在 Striped64 中) 方法,其他三各类大同小异。
final void longAccumulate(long x, LongBinaryOperator fn,
boolean wasUncontended) {
int h;
if ((h = getProbe()) == 0) {
ThreadLocalRandom.current(); // force initialization
h = getProbe();
wasUncontended = true;
}
boolean collide = false; // True if last slot nonempty 此值可以看作是扩容意向,如果为 true 则下一次循环可能会进行扩容
for (;;) { //自旋,虽然也是自旋,但高并发时,肯定比 AtomicLong 少
Cell[] as; Cell a; int n; long v;
if ((as = cells) != null && (n = as.length) > 0) {
if ((a = as[(n - 1) & h]) == null) { //指定位置的 cell 未被占用,(n - 1) & h 是 h 对 n 取模的意思
if (cellsBusy == 0) { // Try to attach new Cell
Cell r = new Cell(x); // Optimistically create
if (cellsBusy == 0 && casCellsBusy()) {
boolean created = false;
try { // Recheck under lock
Cell[] rs; int m, j;
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {//再次判断指定的 cell 是否被其他线程修改,如果已修改,就自旋,否则修改成功
rs[j] = r;
created = true;
}
} finally {
cellsBusy = 0;
}
if (created)
break;
continue; // Slot is now non-empty
}
}
collide = false;
}
else if (!wasUncontended) // CAS already known to fail 这里 cell != null && wasUncontended == false 说明在这个 cell 上可能存在很大的竞争(并发很大),如果进入还在相同的cell上进行相同的操作,有可能还是失败,所以,这里重新计算hash,让累加操作转移到别的cell上
wasUncontended = true; // Continue after rehash
else if (a.cas(v = a.value, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break;
else if (n >= NCPU || cells != as)//如果走到这一步说明上一步的 cas 操作失败了,而且容量 >= CUP 数量 或 正在扩容 则重新计算hash,这里说明扩容不会超过 CPU 核数
collide = false; // At max size or stale
else if (!collide)
collide = true;//下一次可能会扩容
else if (cellsBusy == 0 && casCellsBusy()) {//cells 以2的幂扩容,一遍使用位于进行取模操作,casCellsBusy() 防止多线程同时扩容
try {
if (cells == as) { // Expand table unless stale
Cell[] rs = new Cell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
cells = rs;
}
} finally {
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}
h = advanceProbe(h);
}
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {//cells 初始化
boolean init = false;
try { // Initialize table
if (cells == as) {
Cell[] rs = new Cell[2];
rs[h & 1] = new Cell(x);
cells = rs;
init = true;
}
} finally {
cellsBusy = 0;
}
if (init)
break;
}
else if (casBase(v = base, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))//如果别的线程正在扩容,则对 base 进行 cas
break; // Fall back on using base
}
}
注意:扩容不会超过 CPU 核数
网友评论