美文网首页
多线程并发编程4-原子操作类源码剖析

多线程并发编程4-原子操作类源码剖析

作者: Demo_zfs | 来源:发表于2020-03-03 00:40 被阅读0次

        今天来讲一讲原子操作类,JUC包提供了一系列的原子性操作类,这些操作类使用的是CAS非阻塞算法实现的,相比于锁,原子性的操作性能有更大的提升。各个原子操作类的实现原理都大同小异,今天就拿AtomicLong类进行讲解。除了讲解AtomicLong类之后还会讲解JDK8新增的原子操作类LongAdder.

    AtomicLong

        从AtomicLong类中的源码可以看出来,AtomicLong类提供的方法都是在围绕着两个成员变量,

    private volatile long value;

    private static final Unsafe unsafe = Unsafe.getUnsafe();

    value,存储的就是AtomicLong类增减之后的值,由volatile关键字修饰避免的内存可见性问题。

    unsafe,Unsafe类实例,提供CAS非阻塞算法方法。

        下面对开发中常用到的AtomicLong类提供的方法来看看它的源码到底是怎么实现的。

    incrementAndGet

    //调用unsafe方法,原子性设置value值为原始值+1,并返回递增后的值

    public final long incrementAndGet() {

    return unsafe.getAndAddLong(this, valueOffset, 1L) +1L;

    }

    decrementAndGet

    //调用unsafe方法,原子性设置value值为原始值-1,并返回递减后的值

    public final long decrementAndGet() {

    return unsafe.getAndAddLong(this, valueOffset, -1L) -1L;

    }

    getAndAdd

    //调用unsafe方法,原子性设置value值为原始值+delta。返回相加前的原始值

    public final long getAndAdd(long delta) {

    return unsafe.getAndAddLong(this, valueOffset, delta);

    }

    addAndGet

    //调用unsafe方法,原子性设置value值为原始值+delta。返回相加后的值
    public final long addAndGet(long delta) {

    return unsafe.getAndAddLong(this, valueOffset, delta) + delta;

    }

        上述的四个方法都调用了unsafe.getAndAddLong方法,我们知道CAS算法是非阻塞的并且多线程并发下只能有一个线程设置成功,那么上述方法是怎么保证值设置成并返回的呢?让我们来看看getAndAddLong中的源码吧。

    public final long getAndAddLong(Object var1, long var2, long var4) {

    long var6;

        do {

    var6 =this.getLongVolatile(var1, var2);    //获得原始值

        }while(!this.compareAndSwapLong(var1, var2, var6, var6 + var4));    //使用CAS进行设置值,直到设置成功

        return var6;

    }

    从上面的代码中可以看出getAndAddLong方法通过自旋的方式保证使用CAS设置值成功后才返回。前面锁类型文章中有提到,自旋在高并发的时候会造成性能的损耗,这也是AtomicLong类会带来的问题。

    LongAdder

        使用AtomicLong时,在高并发下大量线程会同时去竞争更新同一个原子变量,但是由于同时只能有一个线程通过CAS算法操作成功,而其他线程通过无限自旋不断尝CAS操作,这会白白消耗CPU资源。这个问题能有什么解决方案吗?当然有,因为AtomicLong只有一个共享变量value,所以会造成竞争压力变大,那么是不是可以尝试增加共享变量的方式来缓解竞争压力呢?JDK8中新增的原子操作类LongAdder类就是通过增加共享变量的方式解决AtomicLong类存在的竞争压力过大造成性能下降的问题。

    下面用两张图来对比一下AtomicLong、LongAdder竞争共享变量的不同

    AtomicLong LongAdder

        LongAdder类继承Striped64类,Striped64类中有三个重要的成员变量,

    transient volatile long base;

    transient volatile Cell[] cells;

    transient volatile int cellsBusy;

        base:基数值,并发较少时,所有累加操作都是对base变量操作的。

        cells:共享变量数组,通过源码可以看到Cell类添加了@sun.misc.Contended注解,不知道读者是否还记得这个注解的作用?这个注解是为了解决伪共享而设计的,关于伪共享可以看这里

        cellsBusy:由于cells数组是动态的,cellBusy用来标识cells数组正在扩容或初始化,通过CAS算法对cellsBusy变量进行操作,保证只有一个线程可以进行cell数组的扩容或初始化。

        下面看看LongAdder常用的方法:

    sum/longValue调用的sum方法

    //返回当前的总值

    public long sum() {

    Cell[] as =cells; Cell a;

        long sum =base;

        if (as !=null) {

    for (int i =0; i < as.length; ++i) {

    if ((a = as[i]) !=null)

    sum += a.value;

            }

    }

    return sum;

    }

        累加cells数组内部的value后再累加base值,由于累加过程并没有进行加锁,所以在累加过程有可能别的线程已修改了值,所有这个sum方法返回的并不是一个精准的总数。

    reset

    //重置值

    public void reset() {

    Cell[] as =cells; Cell a;

        base =0L;

        if (as !=null) {

    for (int i =0; i < as.length; ++i) {

    if ((a = as[i]) !=null)

    a.value =0L;

            }

    }

    }

        重置base变量,如果cells数组不为空则重置cell中的value值。

    add

    //增加指定的值

    public void add(long x) {

    Cell[] as; long b, v; int m; Cell a;

    //判断cells数组是否为null,为null则在base变量上累加x

        if ((as =cells) !=null || !casBase(b =base, b + x)) {

    boolean uncontended =true;

            if (as ==null || (m = as.length -1) <0 ||

    //通过当前线程的threadLocalRandomProbe变量进行计算要访问cells数组中的哪一个cell。

    (a = as[getProbe() & m]) ==null ||

    !(uncontended = a.cas(v = a.value, v + x)))

    //如果计算得到的cell为null(需要对cells数组初始化),或cell不为null但是通过CAS算法设置失败(已经有其他的线程操作成功,则需要对cells进行扩容或重新计算cell)下面这行代码就是对cells数组初始化或扩容的。

    longAccumulate(x, null, uncontended);

        }

    }

    longAccumulate

    //对cells数组进行初始化或扩容操作。

    final void longAccumulate(long x, LongBinaryOperator fn,

                              boolean wasUncontended) {

    //初始化当前线程的threadLocalRandomProbe变量

    int h;

        if ((h =getProbe()) ==0) {

    ThreadLocalRandom.current(); // force initialization

            h =getProbe();

            wasUncontended =true;

        }

    boolean collide =false;                // True if last slot nonempty

        for (;;) {

    Cell[] as; Cell a; int n; long v;

            if ((as =cells) !=null && (n = as.length) >0) {

    //根据当前线程的threadLocalRandomProbe和cells元素个数计算需要访问的cell,如果cell为null执行if中代码

    if ((a = as[(n -1) & h]) ==null) {

    if (cellsBusy ==0) {// Try to attach new Cell

    //为计算等到为null的cell赋值一个新创建的cell

                        Cell r =new Cell(x);  // Optimistically create

                        if (cellsBusy ==0 && casCellsBusy()) {

    boolean created =false;

                            try {// Recheck under lock

                                Cell[] rs; int m, j;

                                if ((rs =cells) !=null &&

    (m = rs.length) >0 &&

    rs[j = (m -1) & h] ==null) {

    rs[j] = r;

                                    created =true;

                                }

    }finally {

    cellsBusy =0;

                            }

    if (created)

    break;

                            continue;          // Slot is now non-empty

                        }

    }

    collide =false;

                }

    else if (!wasUncontended)// CAS already known to fail

                    wasUncontended =true;      // Continue after rehash

    //计算等到的cell存在,则执行CAS进行设置

                else if (a.cas(v = a.value, ((fn ==null) ? v + x :

    fn.applyAsLong(v, x))))

    break;

    //当前cells数组元素个数大于CPU个数

                else if (n >=NCPU ||cells != as)

    collide =false;            // At max size or stale

    //是否有冲突

                else if (!collide)

    collide =true;

    //如果当前cells数组元素个数没有达到cpu个数,并且存在冲突则进行cells数组扩容

                else if (cellsBusy ==0 && casCellsBusy()) {

    try {

    if (cells == as) {// Expand table unless stale

                            Cell[] rs =new Cell[n <<1];

                            for (int i =0; i < n; ++i)

    rs[i] = as[i];

                            cells = rs;

                        }

    }finally {

    cellsBusy =0;

                    }

    collide =false;

                    continue;                  // Retry with expanded table

                }

    //为了能够找到一个空闲的cell,重新计算hash值。

    h =advanceProbe(h);

            }

    //初始化cells数组

    else if (cellsBusy ==0 &&cells == as && casCellsBusy()) {

    boolean init =false;

                try {// Initialize table

                    if (cells == as) {

    Cell[] rs =new Cell[2];

                        rs[h &1] =new Cell(x);

                        cells = rs;

                        init =true;

                    }

    }finally {

    cellsBusy =0;

                }

    if (init)

    break;

            }

    else if (casBase(v =base, ((fn ==null) ? v + x :

    fn.applyAsLong(v, x))))

    break;                          // Fall back on using base

        }

    }

        代码中的注释不知道我标注的能不能让读者明白。下面用几个问题来加深一下多LongAdder原理。

    问题1:当前线程如何知道需要访问cells数组中的哪个元素?

    解答1:每个线程都有一个threadLocalRandomProbe变量,通过threadLocalRandomProbe与cells数组大小进行计算得到当前线程需要访问的cell。

    问题2:如何初始化cells数组?

    解答2:成功使用CAS算法修改cellsBusy变量为1之后进行初始化,初始化之后再讲cellsBusy变量设置为0。

    问题3:cells数组扩容条件?

    解答3:cells数组扩容的条件是当前cells数组的元素个数小于CPU个数,并且当前线程计算得到的cell已经有其他线程对其进行操作,则触发扩容。

    问题4:线程访问分配的cell元素有冲突后如何处理?

    解决方案4:如果当前cells元素个数小于CPU个数,冲突则进行扩容。如果当前cells元素个数不小于CPU个数,则重新计算当前线程的threadLocalRandomProbe变量,然后重新计算需要访问的cell,知道获得的cell没有冲突。

     今天的分享就到这,有看不明白的地方一定是我写的不够清楚,所有欢迎提任何问题以及改善方法。

    相关文章

      网友评论

          本文标题:多线程并发编程4-原子操作类源码剖析

          本文链接:https://www.haomeiwen.com/subject/pqbikhtx.html