JUC包下的原子类
JUC就是大名鼎鼎的java并发包,我们今天来看看基于非阻塞性算法的CAS封装的原子类.
JUC下有AtomicInteger
、AtomicLong
、AtomicBoolean
等类,
- 在多线程的环境下对count变量进行自增
public static AtomicLong count = new AtomicLong(0);
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++) {
executorService.execute(() -> System.out.println("现在对原子类进行自增:" + count.addAndGet(1)));
}
Thread.sleep(1000);
System.out.println("原子类的值:"+count.get());
}
这里我们通过代码验证了,多线程环境下,原子类是可以保证线程安全的。
AtomicLong
public class AtomicLong extends Number implements java.io.Serializable {
private static final long serialVersionUID = 1927816293512124184L;
// unsafe实例
private static final Unsafe unsafe = Unsafe.getUnsafe();
// 偏移量
private static final long valueOffset;
// JVM是否支持Long类型的CAS
static final boolean VM_SUPPORTS_LONG_CAS = VMSupportsCS8();
private static native boolean VMSupportsCS8();
static {
try {
// 获取value在AtomicLong中的偏移量
valueOffset = unsafe.objectFieldOffset
(AtomicLong.class.getDeclaredField("value"));
} catch (Exception ex) { throw new Error(ex); }
}
// 实际变量
private volatile long value;
public AtomicLong(long initialValue) {
value = initialValue;
}
public final boolean compareAndSet(long expect, long update) {
return unsafe.compareAndSwapLong(this, valueOffset, expect, update);
}
这里有几个细节:
- value被
volatile
修饰,保证了内存可见性. - 所有的CAS操作依赖
Unsafe
类
Unsafe类
JDK中的Unsafe类提供了硬件级别的原子性操作,他提供的都是nat
ive方法,Java使用JNI的方式调用C++的本地方法。我们简单来看看Unsafe类的核心方法.
- sun.misc.Unsafe
// 获取对应属性的偏移量
public native long objectFieldOffset(Field var1);
public final native boolean compareAndSwapObject(Object var1, long var2, Object var4, Object var5);
public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);
// 比对对象 obj 中偏移量 offset 的变 的值是否与 expect 相等,相等使用update更新
// 然后返回 true ,否 返回 false
public final native boolean compareAndSwapLong(Object var1, long var2, long var4, long var6);
// 唤醒调用park后阻塞的线程。
public native void unpark(Object var1);
// 阻塞当前线程
public native void park(boolean isAbsolute, long time);
更好性能的原子类-LongAdder
AtomicLong
通过CAS保证了原子性,只借助CPU即可完成一个轻量级的"锁",但是在高并发的场景下,还是会出现多个线程去争夺一个变量的控制权这种情况,这会造成线程直接频繁竞争,浪费CPU资源。
在一些分布式的项目中,为了满足"秒杀",我们通常会将仓库进行内部拆分,尽可能切分成小的颗粒度,然后通过一些策略去扣除缓存中的库存数,在统计时,再统一读取所有"分片"的数据即可。
LongAdder内部维护了多个cell变量,每个cell的初始值为0,当并发量上来的时候,线程会去抢夺空闲的cell的控制权,如果A竞争cellA失败,它会转向竞争cellB,如此一来,便减少了如AtomincLong
出现的单一变量遭到多线程频繁竞争的问题。
在做读操作的时候,LongAdder直接统计base+所有cell
的值返回即可.
- java.util.concurrent.atomic.LongAdder#add
public void add(long x) {
Cell[] as; long b, v; int m; Cell a;
if ((as = cells) != null || !casBase(b = base, b + x)) {
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[getProbe() & m]) == null ||
!(uncontended = a.cas(v = a.value, v + x)))
longAccumulate(x, null, uncontended);
}
}
- LongAdder并不会一开始就创建cell数组,惰性加载.
- 如果cell数组为空,那么先对base值进行累加操作.
伪共享问题
之前我们聊过,为了解决CPU和内存直接运行速度的差距,操作系统设计了多级缓存架构,其中CPU和主内存之间,还存在着高速缓冲层,也就是CPU Cache
.
在CPU Cache
内部是按行存储的,每一行称为Cache
行,它是主存和Cache直接交换数据的基本单位。
CPU需要访问变量值时,会从CPU Cache
查看是否有该变量,如果有直接读取,如果没有则从主内存读,然后回种到CPU Cache
,这里回种的时候就有一个问题,CPU Cache
是按行存储,变量的大小可能没有那么大,此时就会出现多个变量值存储在同一个Cache
行中,当发生多线程访问该变量时,只能有一个线程访问同一个缓存行,此时就会出现排队竞争的情况,也就是伪共享
.
如何解决伪共享问题
解决的思路是将变量值填充到跟cache行一样的大小,这样刚好一行只存放一个变量,那么线程之间竞争x
和y
的时候就会去访问不同的缓存行。
在JDK8中,可以使用@sun.misc.Contended
注解声明,让JDK为你完成这部分工作.
LongAdder如何解决伪共享
答案在代码里面:
- java.util.concurrent.atomic.Striped64.Cell
@sun.misc.Contended static final class Cell {
volatile long value;
Cell(long x) { value = x; }
final boolean cas(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
}
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long valueOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> ak = Cell.class;
valueOffset = UNSAFE.objectFieldOffset
(ak.getDeclaredField("value"));
} catch (Exception e) {
throw new Error(e);
}
}
}
LongAdder源码解析
我们先来看看LongAdder的类图结构:
LongAdder结合上面的代码,我们可以知道,cell类其实是定义在Striped64
类中,Striped64
包含几个元素:
-
base
值,它是LongAdder的一个基础值. -
cell
数组,用来减少多线程竞争同一变量进行了切分.同时为了方便读取,使用了volatile
修饰value值,使其具备可见性. -
cellsBuy
,自旋锁开关.通常在数组进行创建、初始化、扩容的时候进行变化.
读取LongAdder的值-sum
public long sum() {
Cell[] as = cells; Cell a;
long sum = base;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
代码比较简单,基本上就是累加sum
和cell
中所有的值,注意,这里的值只是读取那一刻的快照值,不是100%准确,因为没有做任何加锁措施。
累加操作-add
public void add(long x) {
Cell[] as; long b, v; int m; Cell a;
if ((as = cells) != null || !casBase(b = base, b + x)) {
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[getProbe() & m]) == null ||
!(uncontended = a.cas(v = a.value, v + x)))
longAccumulate(x, null, uncontended);
}
}
- 判断当前cells是否为空,如果不为空对base值进行累加
- 如果cells不为空或者casBase失败了,那么进入执行代码块.
- 对cell数组进行定位,也就是找到当前线程去访问哪一个cell.
我们继续来看看longAccumulate
.
- java.util.concurrent.atomic.Striped64#longAccumulate
final void longAccumulate(long x, LongBinaryOperator fn,
boolean wasUncontended) {
int h;
if ((h = getProbe()) == 0) {
ThreadLocalRandom.current(); // force initialization
h = getProbe();
wasUncontended = true;
}
boolean collide = false; // True if last slot nonempty
for (;;) {
Cell[] as; Cell a; int n; long v;
if ((as = cells) != null && (n = as.length) > 0) {
if ((a = as[(n - 1) & h]) == null) {
if (cellsBusy == 0) { // Try to attach new Cell
Cell r = new Cell(x); // Optimistically create
if (cellsBusy == 0 && casCellsBusy()) {
boolean created = false;
try { // Recheck under lock
Cell[] rs; int m, j;
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
} finally {
cellsBusy = 0;
}
if (created)
break;
continue; // Slot is now non-empty
}
}
collide = false;
}
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
else if (a.cas(v = a.value, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break;
// 当前cell数组元素大于CPU个数
else if (n >= NCPU || cells != as)
collide = false; // At max size or stale
// 是否有冲突
else if (!collide)
collide = true;
// 当前元素个数未达到CPU个数并且有冲突,扩容
else if (cellsBusy == 0 && casCellsBusy()) {
try {
if (cells == as) { // Expand table unless stale
Cell[] rs = new Cell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
cells = rs;
}
} finally {
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}
h = advanceProbe(h);
}
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
boolean init = false;
try { // Initialize table
if (cells == as) {
Cell[] rs = new Cell[2];
rs[h & 1] = new Cell(x);
cells = rs;
init = true;
}
} finally {
// 变量被volatile修饰,这里不用加锁
cellsBusy = 0;
}
if (init)
break;
}
else if (casBase(v = base, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break; // Fall back on using base
}
}
在定位的过程中,我们会遇到冲突的问题,我们回忆一下在HashMap的底层,是通过拉链法解决哈希冲突的问题。
在LongAdder中,在初始化、扩容的时候都会涉及到cellsBusy
这个标志位,如果当前cellsBusy
为1,说明当前有线程在进行扩容或者初始化工作,此时其他线程会等待。
我们在Initialize table
这行注释里面可以去看到cell数组初始化的过程.
初始化的过程可以理解为,多个线程CAS抢夺cellsBusy的控制权,抢到的线程进行初始化,其他线程进行等待.
扩容的过程有几个先决条件: cells中的元素个数小于当前机器CPU个数并且发生了CAS冲突(多个线程同时CAS竞争同一个cell),其中产生了CAS失败,那么就会进行扩容操作
.为什么要有这个条件,这是为了解决更合理的使用CPU核心数资源(思考一下伪共享的情景),每个CPU分配一个cell无疑是最佳的.扩容的过程是将数组增大一倍,然后将元素进行迁移.
以上,便是大致的流程。
网友评论