JDK1.8时,java.util.concurrent.atomic包中提供了一个新的原子类:LongAdder。提供了原子累计值的方法。
根据Oracle官方文档的介绍,LongAdder在高并发的场景下会比它的前辈————AtomicLong 具有更好的性能,代价是消耗更多的内存空间:
- 在并发量较低的环境下,线程冲突的概率比较小,自旋的次数不会很多。但是,高并发环境下,N个线程同时进行自旋操作,会出现大量失败并不断自旋的情况,此时AtomicLong的自旋会成为瓶颈。
- 这就是LongAdder引入的初衷——解决高并发环境下AtomicLong的自旋瓶颈问题。
在大数据处理过程,为了方便监控,需要统计数据,少不了原子计数器。为了尽量优化性能,需要采用高效的原子计数器。
在jdk8中,引入了LongAddr,非常适合多线程原子计数器。与AtomicLong做了一个测试,LongAdder在多线程环境中,原子自增长性能要好很多。它常用于状态采集、统计等场景。
-
AtomicLong
image.png
-
LongAdder
image.png
package com.conrrentcy.atomic;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
public class LongAdderVSAtomicLongTest {
public static void main(String[] args){
testAtomicLongVSLongAdder(1, 10000000);
testAtomicLongVSLongAdder(10, 10000000);
testAtomicLongVSLongAdder(20, 10000000);
testAtomicLongVSLongAdder(40, 10000000);
testAtomicLongVSLongAdder(80, 10000000);
}
static void testAtomicLongVSLongAdder(final int threadCount, final int times){
try {
System.out.println("threadCount:" + threadCount + ", times:" + times);
long start = System.currentTimeMillis();
testLongAdder(threadCount, times);
System.out.println("LongAdder elapse:" + (System.currentTimeMillis() - start) + "ms");
long start2 = System.currentTimeMillis();
testAtomicLong(threadCount, times);
System.out.println("AtomicLong elapse:" + (System.currentTimeMillis() - start2) + "ms");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
static void testAtomicLong(final int threadCount, final int times) throws InterruptedException {
AtomicLong atomicLong = new AtomicLong();
List<Thread> list = new ArrayList<>();
for (int i=0;i<threadCount;i++){
list.add(new Thread(() -> {
for (int j = 0; j<times; j++){
atomicLong.incrementAndGet();
}
}));
}
for (Thread thread : list){
thread.start();
}
for (Thread thread : list){
thread.join();
}
}
static void testLongAdder(final int threadCount, final int times) throws InterruptedException {
LongAdder longAdder = new LongAdder();
List<Thread> list = new ArrayList<>();
for (int i=0;i<threadCount;i++){
list.add(new Thread(() -> {
for (int j = 0; j<times; j++){
longAdder.add(1);
}
}));
}
for (Thread thread : list){
thread.start();
}
for (Thread thread : list){
thread.join();
}
}
}
LongAdder原理
LongAdder的原理是,在最初无竞争时,只更新base的值,当有多线程竞争时通过分段的思想,让不同的线程更新不同的段,最后把这些段相加就得到了完整的LongAdder存储的值。

AtomicLong中有个内部变量value保存着实际的long值,所有的操作都是针对该变量进行。也就是说,高并发环境下,value变量其实是一个热点,也就是N个线程竞争一个热点。
LongAdder的基本思路就是分散热点,将value值分散到一个数组中,不同线程会命中到数组的不同槽中,各个线程只对自己槽中的那个值进行CAS操作,
这样热点就被分散了,冲突的概率就小很多。如果要获取真正的long值,只要将各个槽中的变量值累加返回。
AtomicLong是多个线程针对单个热点值value进行原子操作。而LongAdder是每个线程拥有自己的槽,各个线程一般只对自己槽中的那个值进行CAS操作。
LongAdder继承自Striped64抽象类,Striped64中定义了Cell内部类和各重要属性。
唯一会制约AtomicLong高效的原因是高并发,高并发意味着CAS的失败几率更高, 重试次数更多,越多线程重试,CAS失败几率又越高,变成恶性循环,AtomicLong效率降低。
那怎么解决? LongAdder给了一个非常容易想到的解决方案:减少并发,将单一value的更新压力分担到多个value中去,降低单个value的 “热度”,分段更新。
这样,线程数再多也会分担到多个value上去更新,只需要增加value就可以降低 value的 “热度” ,AtomicLong中的 恶性循环就解决了。
cells 就是这个 “段” cell中的value 就是存放更新值的, 这样,当需要总数时,把cells 中的value都累加一下就可以了。
Code 解析
/**
* Padded variant of AtomicLong supporting only raw accesses plus CAS.
*
* JVM intrinsics note: It would be possible to use a release-only
* form of CAS here, if it were provided.
*/
@jdk.internal.vm.annotation.Contended static final class Cell {
volatile long value;
Cell(long x) { value = x; }
final boolean cas(long cmp, long val) {
return VALUE.compareAndSet(this, cmp, val);
}
final void reset() {
VALUE.setVolatile(this, 0L);
}
final void reset(long identity) {
VALUE.setVolatile(this, identity);
}
final long getAndSet(long val) {
return (long)VALUE.getAndSet(this, val);
}
// VarHandle mechanics
private static final VarHandle VALUE;
static {
try {
MethodHandles.Lookup l = MethodHandles.lookup();
VALUE = l.findVarHandle(Cell.class, "value", long.class);
} catch (ReflectiveOperationException e) {
throw new ExceptionInInitializerError(e);
}
}
}
/** Number of CPUS, to place bound on table size */
static final int NCPU = Runtime.getRuntime().availableProcessors();
/**
* Table of cells. When non-null, size is a power of 2.
*/
transient volatile Cell[] cells;
/**
* Base value, used mainly when there is no contention, but also as
* a fallback during table initialization races. Updated via CAS.
*/
transient volatile long base;
/**
* Spinlock (locked via CAS) used when resizing and/or creating Cells.
*/
transient volatile int cellsBusy;
@sun.misc.Contended
缓存模型

CPU和主内存之间有好几层缓存,因为与cpu的速度相比,访问主内存的速度是非常慢的。如果频繁对同一个数据做运算,每次都从内存中加载,运算完之后再写回到主内存中,将会严重拖累cpu的计算资源。因此,为了充分发挥CPU的计算性能和吞吐量,平衡CPU和主内存之间的速度差距,现代CPU引入了一级缓存、二级缓存和三级缓存,结构如下图所示:

越靠近CPU的缓存存储速度越快,但是容量也越小。所以一级缓存(L1 Cache)最小最快,并且紧靠着在使用它的CPU内核。L2大一些,也慢一些,并且仍然只能被一个单独的 CPU 核使用。L3在现代多核机器中更普遍,仍然更大,更慢,并且被单个插槽上的所有 CPU 核共享。最后,你拥有一块主存,由全部插槽上的所有 CPU 核共享。
当CPU运算过程中需要加载数据时,首先去L1去寻找需要的数据,如果没有则去L2寻找,接着从L3中寻找,如果都没有,则从内存中读取数据。所以,如果某些数据需要经常被访问,那么这些数据存放在L1中的效率会最高。
而一般缓存未命中消耗的时钟周期及时间数据如下:

缓存行Cache Line
缓存是由缓存行组成的。一个缓存行存储字节的数量为2的倍数,在不同的机器上,缓存行大小为32字节到256字节不等,目前通常为64字节。
缓存行是按块加载到缓存中,一次性从内存中加载或者写入64字节的内容。假如一个Java对象有8个long类型的成员变量,那么一个缓存行最多可以一次性加载这8个long类型的变量到内存中。那么cpu在运算中,如果需要这8个变量,就可以直接从缓存中读取数据,而不需要从主内存中加载。
缓存一致性协议
由于一级缓存和二级缓存是被CPU核单独拥有的,当CPU核修改缓存中的内容时,缓存需要写回主内存,然后通知其他CPU核中对该缓存块进行失效处理。例如Core1和Core2并发读写同一个对象,该对象所属的内存地址块,会被同时缓存到一级缓存中。当Core1修改这个对象的变量时,改动会写回主内存,并且会让Core2缓存的该对象的缓存行失效。
伪共享(False Sharing)
伪共享,翻译为错误的共享。下面的图说明了伪共享的问题:

- 假设Core1更新了X值,那么缓存中的缓存行和内存中的值都会被改变,并且Core2的缓存行也都会失效,因为它缓存的X不是最新值了。那么Core2仅仅只是想读X值,却需要重新从主内存去加载,从而被缓存未命中拖慢了读取速度。
- 假设在核心1上运行的线程想更新变量X,同时核心2上的线程想要更新变量Y, 而这两个变量在同一个缓存行中。每个线程都要去竞争缓存行的所有权来更新变量。如果Core1获得了所有权,改变了X值,那么Core2中对应的缓存行失效。而Core2去读取Y时,发现缓存未命中,需要重新读取缓存行。然后当Core2去更新Y值时,也会使Core1的缓存行失效,从而使Core1引发一次缓存未命中。这会来来回回的经过L3缓存,大大影响了性能。如果互相竞争的核心位于不同的插槽,就要额外横跨插槽连接,问题可能更加严重。
伪共享解决办法
缓存行填充
disrupter框架通过增加字段,补全缓存行来确保ring buffer的序列号不会和其他东西同时存在于一个缓存行中,从而解决伪共享的问题:
private long p1, p2, p3, p4, p5, p6, p7;
private final long indexMask = 0;
private long p8, p9, p10, p11, p12, p13, p14;
@sun.misc.Contended注解
在Java 8中,提供了@sun.misc.Contended注解来避免伪共享,原理是在使用此注解的对象或字段的前后各增加128字节大小的padding,使用2倍于大多数硬件缓存行的大小来避免相邻扇区预取导致的伪共享冲突。
/** The current seed for a ThreadLocalRandom */
@sun.misc.Contended("tlr")
long threadLocalRandomSeed;
/** Probe hash value; nonzero if threadLocalRandomSeed initialized */
@sun.misc.Contended("tlr")
int threadLocalRandomProbe;
/** Secondary seed isolated from public ThreadLocalRandom sequence */
@sun.misc.Contended("tlr")
int threadLocalRandomSecondarySeed;
例如,Thread中有三个变量threadLocalRandomSeed,threadLocalRandomProbe,threadLocalRandomSecondarySeed就是通过注解的方法来操作缓存行的。不同的是,这里是让三个变量处于同一个缓存行,从而达到任意使一个变量改变,其他两个值也必须重新加载的目的。
总结
1.缓存时为了平衡CPU和内存之间的速度差异,cpu核独自拥有L1、L2缓存,公用L3缓存,且越靠近CPU的缓存存储速度越快、容量越小。
2.缓存中的内容是按块一次性从主内存中加载到缓存行。
3.L1、L2缓存中的缓存行修改时,会写回到L3和主内存,并且使其他核上的还缓存行失效。
4.多个CPU核一写一读,或者同时写同一块缓存行的时候,会导致伪共享问题。
5.假如存在多个线程同时修改一个类的不同字段的场景,必须考虑使用缓存行填充或者@sun.misc.Contended注解防止伪共享。
Cell数组

increase 流程
Increase() -> add(1L)
public void add(long x) {
Cell[] cs; long b, v; int m; Cell c;
if ((cs = cells) != null || !casBase(b = base, b + x)) {
boolean uncontended = true;
if (cs == null || (m = cs.length - 1) < 0 ||
(c = cs[getProbe() & m]) == null ||
!(uncontended = c.cas(v = c.value, v + x)))
longAccumulate(x, null, uncontended);
}
}

if ((as = cells) != null || !casBase(b = base, b + x)) {
如果cells不为空,说明有竞争, 如果直接更新失败,说明有竞争,进入到多核逻辑.
对于上面的,有兴趣的可以看看是怎么找到指定的Cell的,在上面的a = as[getProbe() & m]中,其中m=数组的长度-1,其实这里也是一个取余的运算,而getProbe()这个方法是用于获取当前线程的threadLocalRandomProb(当前本地线程探测值,初始值为0),其实也就是一个随机数啊,然后对数组的长度取余得到的就是对应的数组的索引,首次调用这个方法是数组的第一个元素,如果数组的第一个元素为null,那么就说明没有找到对应的Cell;
对于取余运算,举个简单的例子吧,对于2的n次方取余,比如随机数9要对4进行取余,我们可以9&(4-1)=9&3=1001&0011=1,利用位运算取余了解一下;
Probe是什么
ConcurrentHashMap 在累加键值对个数的 addCount 函数中,使用 ThreadLocalRandom.getProbe() 得到线程的探针哈希值。
在这里,这个探针哈希值的作用是哈希线程,将线程和数组中的不用元素对应起来,尽量避免线程争用同一数组元素。探针哈希值和 map 里使用的哈希值的区别是,当线程发生数组元素争用后,可以改变线程的探针哈希值,让线程去使用另一个数组元素,而 map 中 key 对象的哈希值,由于有定位 value 的需求,所以它是一定不能变的。
那么这个探针哈希值是在哪计算的呢?带着这个问题我们继续往下看。
ThreadLocalRandom.getProbe() 方法如下:
/**
* Returns the probe value for the current thread without forcing
* initialization. Note that invoking ThreadLocalRandom.current()
* can be used to force initialization on zero return.
*/
static final int getProbe() {
return UNSAFE.getInt(Thread.currentThread(), PROBE);
}
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
...
private static final long PROBE;
...
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> tk = Thread.class;
...
PROBE = UNSAFE.objectFieldOffset
(tk.getDeclaredField("threadLocalRandomProbe"));
...
} catch (Exception e) {
throw new Error(e);
}
}
可以看到 PROBE 表示的是 Thread 类 threadLocalRandomProbe 字段的偏移量。所以 getProbe 方法的功能就是简单的返回当前线程 threadLocalRandomProbe 字段的值。
接着去 Thread 类看看这个 threadLocalRandomProbe 字段,
/** Probe hash value; nonzero if threadLocalRandomSeed initialized */
@sun.misc.Contended("tlr")
int threadLocalRandomProbe;
Thread 类仅仅是定义了这个字段,并没有将其初始化,其初始化工作由 ThreadLocalRandom 类来做。
ThreadLocalRandom 类的 localInit 方法完成初始化工作,
/**
* Initialize Thread fields for the current thread. Called only
* when Thread.threadLocalRandomProbe is zero, indicating that a
* thread local seed value needs to be generated. Note that even
* though the initialization is purely thread-local, we need to
* rely on (static) atomic generators to initialize the values.
*/
static final void localInit() {
// probeGenerator 是一个 AtomicInteger 类型
// PROBE_INCREMENT 是一个静态常量,值为 0x9e3779b9
int p = probeGenerator.addAndGet(PROBE_INCREMENT);
int probe = (p == 0) ? 1 : p; // skip 0
long seed = mix64(seeder.getAndAdd(SEEDER_INCREMENT));
Thread t = Thread.currentThread(); // 获取当前线程
// 通过 Unsafe 对象初始化当前线程的 threadLocalRandomSeed 字段
UNSAFE.putLong(t, SEED, seed);
// 通过 Unsafe 对象初始化当前线程的 threadLocalRandomProbe 字段
UNSAFE.putInt(t, PROBE, probe);
}
SEED 和 PROBE 类似,它表示的是 Thread 类 threadLocalRandomSeed 字段的偏移量。
在 ThreadLocalRandom 类的这个 localInit 方法里,同时初始化了当前线程的 threadLocalRandomSeed 字段和 threadLocalRandomProbe 字段。
所以在 Thread 类 threadLocalRandomProbe 字段上的注释中说:nonzero if threadLocalRandomSeed initialized。就是说如果 threadLocalRandomSeed 字段被初始化了,threadLocalRandomProbe 字段就非零。因为它俩是同时被初始化的。
除此之外,也可以通过 ThreadLocalRandom 类的 advanceProbe 方法更改当前线程 threadLocalRandomProbe 的值。
/**
* Pseudo-randomly advances and records the given probe value for the
* given thread.
*/
static final int advanceProbe(int probe) {
probe ^= probe << 13; // xorshift
probe ^= probe >>> 17;
probe ^= probe << 5;
UNSAFE.putInt(Thread.currentThread(), PROBE, probe);
return probe;
}
ConcurrentHashMap 里的 fullAddCount 方法会调用 ThreadLocalRandom.localInit() 初始化当前线程的探针哈希值;当发生线程争用后,也会调用 ThreadLocalRandom.advanceProbe(h) 更改当前线程的探针哈希值
private final void fullAddCount(long x, boolean wasUncontended) {
int h;
if ((h = ThreadLocalRandom.getProbe()) == 0) {
ThreadLocalRandom.localInit(); // force initialization
h = ThreadLocalRandom.getProbe();
wasUncontended = true;
}
...
h = ThreadLocalRandom.advanceProbe(h);
...
}
longAccumulate
final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) {
if ((as = cells) != null && (n = as.length) > 0) {
// 不是第一次进入
` if ((a = as[(n - 1) & h]) == null) {`

//还没分配cell
if (cellsBusy == 0 && casCellsBusy()) {
//获得锁
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
//再次检查
}
}
}

else if (c.cas(v = c.value,
(fn == null) ? v + x : fn.applyAsLong(v, x)))
break;
else if (n >= NCPU || cells != cs)
collide = false; // At max size or stale
else if (!collide)
collide = true;
else if (cellsBusy == 0 && casCellsBusy()) {
try {
if (cells == cs) // Expand table unless stale
cells = Arrays.copyOf(cs, n << 1);
} finally {
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}
h = advanceProbe(h);
h = advanceProbe(h); // 总是不成功,改变探针,让线程落入另外一个cell
} else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
// cells 不存在 或者未加锁 或者 未新建 ->第一次进入

}else if (casBase(v = base, ((fn == null) ? v + x : fn.applyAsLong(v, x))))
//再尝试一下累加
}
最终拿到多线程,用同一个LongAdder对象进行累加的结果。非原子操作,但是保证最终正确。
public long sum() {
Cell[] cs = cells;
long sum = base;
if (cs != null) {
for (Cell c : cs)
if (c != null)
sum += c.value;
}
return sum;
}
网友评论