今天来讲一讲原子操作类,JUC包提供了一系列的原子性操作类,这些操作类使用的是CAS非阻塞算法实现的,相比于锁,原子性的操作性能有更大的提升。各个原子操作类的实现原理都大同小异,今天就拿AtomicLong类进行讲解。除了讲解AtomicLong类之后还会讲解JDK8新增的原子操作类LongAdder.
AtomicLong
从AtomicLong类中的源码可以看出来,AtomicLong类提供的方法都是在围绕着两个成员变量,
private volatile long value;
private static final Unsafe unsafe = Unsafe.getUnsafe();
value,存储的就是AtomicLong类增减之后的值,由volatile关键字修饰避免的内存可见性问题。
unsafe,Unsafe类实例,提供CAS非阻塞算法方法。
下面对开发中常用到的AtomicLong类提供的方法来看看它的源码到底是怎么实现的。
incrementAndGet
//调用unsafe方法,原子性设置value值为原始值+1,并返回递增后的值
public final long incrementAndGet() {
return unsafe.getAndAddLong(this, valueOffset, 1L) +1L;
}
decrementAndGet
//调用unsafe方法,原子性设置value值为原始值-1,并返回递减后的值
public final long decrementAndGet() {
return unsafe.getAndAddLong(this, valueOffset, -1L) -1L;
}
getAndAdd
//调用unsafe方法,原子性设置value值为原始值+delta。返回相加前的原始值
public final long getAndAdd(long delta) {
return unsafe.getAndAddLong(this, valueOffset, delta);
}
addAndGet
//调用unsafe方法,原子性设置value值为原始值+delta。返回相加后的值
public final long addAndGet(long delta) {return unsafe.getAndAddLong(this, valueOffset, delta) + delta;
}
上述的四个方法都调用了unsafe.getAndAddLong方法,我们知道CAS算法是非阻塞的并且多线程并发下只能有一个线程设置成功,那么上述方法是怎么保证值设置成并返回的呢?让我们来看看getAndAddLong中的源码吧。
public final long getAndAddLong(Object var1, long var2, long var4) {
long var6;
do {
var6 =this.getLongVolatile(var1, var2); //获得原始值
}while(!this.compareAndSwapLong(var1, var2, var6, var6 + var4)); //使用CAS进行设置值,直到设置成功
return var6;
}
从上面的代码中可以看出getAndAddLong方法通过自旋的方式保证使用CAS设置值成功后才返回。前面锁类型文章中有提到,自旋在高并发的时候会造成性能的损耗,这也是AtomicLong类会带来的问题。
LongAdder
使用AtomicLong时,在高并发下大量线程会同时去竞争更新同一个原子变量,但是由于同时只能有一个线程通过CAS算法操作成功,而其他线程通过无限自旋不断尝CAS操作,这会白白消耗CPU资源。这个问题能有什么解决方案吗?当然有,因为AtomicLong只有一个共享变量value,所以会造成竞争压力变大,那么是不是可以尝试增加共享变量的方式来缓解竞争压力呢?JDK8中新增的原子操作类LongAdder类就是通过增加共享变量的方式解决AtomicLong类存在的竞争压力过大造成性能下降的问题。
下面用两张图来对比一下AtomicLong、LongAdder竞争共享变量的不同
AtomicLong LongAdderLongAdder类继承Striped64类,Striped64类中有三个重要的成员变量,
transient volatile long base;
transient volatile Cell[] cells;
transient volatile int cellsBusy;
base:基数值,并发较少时,所有累加操作都是对base变量操作的。
cells:共享变量数组,通过源码可以看到Cell类添加了@sun.misc.Contended注解,不知道读者是否还记得这个注解的作用?这个注解是为了解决伪共享而设计的,关于伪共享可以看这里。
cellsBusy:由于cells数组是动态的,cellBusy用来标识cells数组正在扩容或初始化,通过CAS算法对cellsBusy变量进行操作,保证只有一个线程可以进行cell数组的扩容或初始化。
下面看看LongAdder常用的方法:
sum/longValue调用的sum方法
//返回当前的总值
public long sum() {
Cell[] as =cells; Cell a;
long sum =base;
if (as !=null) {
for (int i =0; i < as.length; ++i) {
if ((a = as[i]) !=null)
sum += a.value;
}
}
return sum;
}
累加cells数组内部的value后再累加base值,由于累加过程并没有进行加锁,所以在累加过程有可能别的线程已修改了值,所有这个sum方法返回的并不是一个精准的总数。
reset
//重置值
public void reset() {
Cell[] as =cells; Cell a;
base =0L;
if (as !=null) {
for (int i =0; i < as.length; ++i) {
if ((a = as[i]) !=null)
a.value =0L;
}
}
}
重置base变量,如果cells数组不为空则重置cell中的value值。
add
//增加指定的值
public void add(long x) {
Cell[] as; long b, v; int m; Cell a;
//判断cells数组是否为null,为null则在base变量上累加x
if ((as =cells) !=null || !casBase(b =base, b + x)) {
boolean uncontended =true;
if (as ==null || (m = as.length -1) <0 ||
//通过当前线程的threadLocalRandomProbe变量进行计算要访问cells数组中的哪一个cell。
(a = as[getProbe() & m]) ==null ||
!(uncontended = a.cas(v = a.value, v + x)))
//如果计算得到的cell为null(需要对cells数组初始化),或cell不为null但是通过CAS算法设置失败(已经有其他的线程操作成功,则需要对cells进行扩容或重新计算cell)下面这行代码就是对cells数组初始化或扩容的。
longAccumulate(x, null, uncontended);
}
}
longAccumulate
//对cells数组进行初始化或扩容操作。
final void longAccumulate(long x, LongBinaryOperator fn,
boolean wasUncontended) {
//初始化当前线程的threadLocalRandomProbe变量
int h;
if ((h =getProbe()) ==0) {
ThreadLocalRandom.current(); // force initialization
h =getProbe();
wasUncontended =true;
}
boolean collide =false; // True if last slot nonempty
for (;;) {
Cell[] as; Cell a; int n; long v;
if ((as =cells) !=null && (n = as.length) >0) {
//根据当前线程的threadLocalRandomProbe和cells元素个数计算需要访问的cell,如果cell为null执行if中代码
if ((a = as[(n -1) & h]) ==null) {
if (cellsBusy ==0) {// Try to attach new Cell
//为计算等到为null的cell赋值一个新创建的cell
Cell r =new Cell(x); // Optimistically create
if (cellsBusy ==0 && casCellsBusy()) {
boolean created =false;
try {// Recheck under lock
Cell[] rs; int m, j;
if ((rs =cells) !=null &&
(m = rs.length) >0 &&
rs[j = (m -1) & h] ==null) {
rs[j] = r;
created =true;
}
}finally {
cellsBusy =0;
}
if (created)
break;
continue; // Slot is now non-empty
}
}
collide =false;
}
else if (!wasUncontended)// CAS already known to fail
wasUncontended =true; // Continue after rehash
//计算等到的cell存在,则执行CAS进行设置
else if (a.cas(v = a.value, ((fn ==null) ? v + x :
fn.applyAsLong(v, x))))
break;
//当前cells数组元素个数大于CPU个数
else if (n >=NCPU ||cells != as)
collide =false; // At max size or stale
//是否有冲突
else if (!collide)
collide =true;
//如果当前cells数组元素个数没有达到cpu个数,并且存在冲突则进行cells数组扩容
else if (cellsBusy ==0 && casCellsBusy()) {
try {
if (cells == as) {// Expand table unless stale
Cell[] rs =new Cell[n <<1];
for (int i =0; i < n; ++i)
rs[i] = as[i];
cells = rs;
}
}finally {
cellsBusy =0;
}
collide =false;
continue; // Retry with expanded table
}
//为了能够找到一个空闲的cell,重新计算hash值。
h =advanceProbe(h);
}
//初始化cells数组
else if (cellsBusy ==0 &&cells == as && casCellsBusy()) {
boolean init =false;
try {// Initialize table
if (cells == as) {
Cell[] rs =new Cell[2];
rs[h &1] =new Cell(x);
cells = rs;
init =true;
}
}finally {
cellsBusy =0;
}
if (init)
break;
}
else if (casBase(v =base, ((fn ==null) ? v + x :
fn.applyAsLong(v, x))))
break; // Fall back on using base
}
}
代码中的注释不知道我标注的能不能让读者明白。下面用几个问题来加深一下多LongAdder原理。
问题1:当前线程如何知道需要访问cells数组中的哪个元素?
解答1:每个线程都有一个threadLocalRandomProbe变量,通过threadLocalRandomProbe与cells数组大小进行计算得到当前线程需要访问的cell。
问题2:如何初始化cells数组?
解答2:成功使用CAS算法修改cellsBusy变量为1之后进行初始化,初始化之后再讲cellsBusy变量设置为0。
问题3:cells数组扩容条件?
解答3:cells数组扩容的条件是当前cells数组的元素个数小于CPU个数,并且当前线程计算得到的cell已经有其他线程对其进行操作,则触发扩容。
问题4:线程访问分配的cell元素有冲突后如何处理?
解决方案4:如果当前cells元素个数小于CPU个数,冲突则进行扩容。如果当前cells元素个数不小于CPU个数,则重新计算当前线程的threadLocalRandomProbe变量,然后重新计算需要访问的cell,知道获得的cell没有冲突。
今天的分享就到这,有看不明白的地方一定是我写的不够清楚,所有欢迎提任何问题以及改善方法。
网友评论