LongAdder其实是AtomicLong的升级版,AtomicLong在多线程下会导致较多的自旋重试,主要原因还是因为多线程同时cas同一个变量的时候失败次数较多,那LongAdder的出现就是为了解决AtomicLong在多线程坏境下的痛点。
首先分析源代码之前我们先抛出以下几个问题:
- LongAdder如何解决AtomicLong的痛点?
- LongAdder内部维护的cell数组在什么情况下初始化?
- LongAdder怎么确定当前线程访问内部cell数组的那个元素?
- LongAdder如何扩容,在什么情况下扩容?
- LongAdder访问cell冲突后怎么解决?
先来看下LongAdder类的继承关系

LongAdder继承自Striped64,Striped64内部维护的三个变量cells、base、cellsBusy,base其实是基础值,cells是一个Cell数组,LongAdder的值其实就等于base加上cells数组内的value值。cellsBusy是cells初始化,扩容以及创建新的Cell的标志位。
接下来我们分析源代码,LongAdder的源码我们主要看add方法就行。
public void add(long x) {
Cell[] as; long b, v; int m; Cell a;
if ((as = cells) != null || !casBase(b = base, b + x)) { //(1)
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 || (a = as[getProbe() & m]) == null
|| !(uncontended = a.cas(v = a.value, v + x))) //(2)
longAccumulate(x, null, uncontended); //(3)
}
}
代码1:先看cells数组是否为空,如果为空进行cas原子操作,将新增的值赋值给原变量,如果cells数组不为空或者是cas失败进行下一步。
问题1:AtomicLong的痛点就是多线程同时cas一个变量时,导致失败后自旋重试次数特别多,所以LongAdder内部特意扩充了一个cell数组去减少线程之间变量的竞争关系,从而提高了效率。
代码2:别看条件比较多,其实总结起来就一句话,查找当前线程所属的cell元素,如果有进行cas值替换,如果cell为空且替换失败的话进行下一步,这里注意下getProbe方法,该方法是获取当前线程下的threadLocalRandomProbe的值,这个值一开始为0,初始化的逻辑在代码3中。
1、2失败都会进入3,3代码业务相对复杂我们单独拧出来分析
final void longAccumulate(long x, LongBinaryOperator fn,
boolean wasUncontended) {
int h;
if ((h = getProbe()) == 0) {
ThreadLocalRandom.current(); //这里进行ThreadLocalRandom初始化
h = getProbe();//获取当前线程下的threadLocalRandomProbe值
wasUncontended = true;//该值为无竞争标志位置位,如果存在竞争该值则为false,无竞争则为true
}
boolean collide = false;
for (;;) {
Cell[] as; Cell a; int n; long v;
if ((as = cells) != null && (n = as.length) > 0) {
if ((a = as[(n - 1) & h]) == null) { (3) //当前线程访问的cell元素为空进行cell初始化
if (cellsBusy == 0) {//判断标志位,如果为零表示可以进行初始化操作
Cell r = new Cell(x); //新建一个Cell类
if (cellsBusy == 0 && casCellsBusy()) { //再次判断并且对cellsBusy进行cas值替换
boolean created = false;
try {
Cell[] rs; int m, j;
if ((rs = cells) != null && (m = rs.length) > 0 && rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
} finally {
cellsBusy = 0;//清除初始化标志位
}
if (created)//如果创建Cell并且赋值成功直接结束本次add操作
break;
continue;
}
}
collide = false;
}
else if (!wasUncontended)
wasUncontended = true;
else if (a.cas(v = a.value, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break;
else if (n >= NCPU || cells != as)//
collide = false;
else if (!collide)
collide = true;
else if (cellsBusy == 0 && casCellsBusy()) {(4)
try {
if (cells == as) {
Cell[] rs = new Cell[n << 1];//进行cell数组扩容,每次都扩充为原来的二倍
for (int i = 0; i < n; ++i)
rs[i] = as[i];//将老数据转移至新cell数组中
cells = rs;
}
} finally {
cellsBusy = 0;
}
collide = false;
continue;
}
h = advanceProbe(h); (5) //重新计算当前线程的threadLocalRandomProbe值,减少冲突的机会。
}
else if (cellsBusy == 0 && cells == as && casCellsBusy()) { ----(2)
boolean init = false;
try {
if (cells == as) {
Cell[] rs = new Cell[2];//初始化cell数组长度为2
rs[h & 1] = new Cell(x);
cells = rs;
init = true;
}
} finally {
cellsBusy = 0;
}
if (init)
break;
}
else if (casBase(v = base, ((fn == null) ? v + x : fn.applyAsLong(v, x))))
break;
}
}
static final int advanceProbe(int probe) {
probe ^= probe << 13; // xorshift
probe ^= probe >>> 17;
probe ^= probe << 5;
UNSAFE.putInt(Thread.currentThread(), PROBE, probe);
return probe;
}
上述代码逻辑比较复杂,这里我们带着剩下的问题去找答案会更好一些,下面我们逐个分析。
问题2:LongAdder在什么情况下会进行数组初始化呢?当在对base值进行cas失败后,且cell数组为空的时候就会进行cell数组初始化操作,在源码中我标注了下,在代码2中对cell数组进行初始化,并且初始化的数组大小长度为2,并且新创建一个cell类并赋值在h & 1位置上。
问题3:在代码3中可以看到“getProbe()&n-1”与运算逻辑,由此我们知道是由线程的threadLocalRandomProbe值与数组最大下标进行与运算得出的,这个逻辑有点类似hashMap寻找散列桶下标的逻辑。
问题4:代码4逻辑里面可以看到对cell数组进行了扩容操作,那么进入代码4的条件结合前面if判断可以得出(当前线程所访问的cell元素不为空 & 对当前访问的cell类中的value值cas替换失败 & 数组长度不大于cpu的个数
),每次扩容都将数组扩充为原来的两倍,并且将老数据循环赋值至新数组中。
问题5:在代码5中我们可以看到对线程的threadLocalRandomProbe进行了重新计算,采用的是xorshift随机算法,以此来减小下次访问冲突的机会。
这里我给一个我对AtomicLong和LongAdder性能对比的小测试
@State(Scope.Benchmark)
@Warmup(iterations = 5, time = 1)
@Measurement(iterations = 5, time = 1)
@Fork(1)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
@BenchmarkMode(Mode.AverageTime)
public class TestMain {
private LongAdder longAdder = new LongAdder();
private AtomicLong atomicLong = new AtomicLong();
public static void main(String[] args) throws RunnerException {
Options options = new OptionsBuilder().include(TestMain.class.getName()).build();
new Runner(options).run();
}
@Benchmark
@Threads(8)
public void runLongAdder() {
for (int i = 0; i < 1000; i++) {
longAdder.add(i);
}
}
@Benchmark
@Threads(8)
public void runAtomicLong() {
for (int i = 0; i < 1000; i++) {
atomicLong.addAndGet(i);
}
}
}
运行结果:

最后给一个问题:既然有了高效率的LongAdder那AtomicLong是不是可以废弃?
参考文献:
网友评论