美文网首页程序员数据结构和算法分析
Java并发-atomic原子类包源码剖析

Java并发-atomic原子类包源码剖析

作者: 宛丘之上兮 | 来源:发表于2018-11-28 14:31 被阅读15次

java.util.concurrent.atomic是jdk1.5新增的,这个包下主要提供了一些原子类,这些类基本特性是线程安全的,保证数据的非阻塞同步(比jdk1.5之前的synchronized阻塞同步更高效),这样就避免了阻塞同步中线程阻塞和唤醒带来的性能问题。

下面来举个例子说明下原子类的非阻塞同步,代码如下:

public class Test { 
    public static void main(String args[]) {
        int threadCnt = 20;
        Thread[] threads = new Thread[threadCnt];
        CountDownLatch cdt = new CountDownLatch(threadCnt);
        for (int i = 0; i < threadCnt; i++) {
            threads[i] = new Thread(new Runnable() {
                @Override
                public void run() {
                    for (int j = 0; j < 1000000; j++) {
                        increase();
                    }
                    cdt.countDown();
                }
            });
            threads[i].start();
        }
        for (Thread i : threads) {
            try {
                i.join();//保证前面的所有的线程执行玩再接着执行主线程
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

//        while (Thread.activeCount() > 1) {
//            Thread.yield();
//        }
//        try {
//            cdt.await();//保证前面的所有的线程执行玩再接着执行主线程
//        } catch (InterruptedException e) {
//            e.printStackTrace();
//        }
        System.out.println(count + "   " + count2.get());
    }

    public static int count = 0;
    public static AtomicInteger count2 = new AtomicInteger(0);

    public static void increase() {
        count++;
        count2.incrementAndGet();
    }

上面的代码运行了20个线程,每个线程对count变量和count2变量进行100W次自增操作,如果上面这段代码能够正常并发的话,最后的结果应该都是2000W(20 * 100,0000=2000,0000)才对,但实际结果却发现每次运行count的结果都不相同,都是一个小于2000W的数字,而count2永远都是2000W。

要是换成volatile修饰count变量呢?volatile关键字很重要的两个特性:

  1. 保证变量在线程间可见,对volatile变量所有的写操作都能立即反应到其他线程中,换句话说,volatile变量在各个线程中是一致的(得益于java内存模型—"先行发生原则");
  2. 禁止指令的重排序优化;

但是volatile并没有我所期待的第三个特性:volatile修饰的变量的运算在并发下是安全的。用volatile修饰count变量,重新测试上面的代码,count每次都是输出小于20000的数字,因为核心点在于java里的运算(比如自增)并不是原子性的(比如 --i、++i这两个操作,其中包含有3个操作步骤:第一步,读取i;第二步,加1或减1;第三步:写回内存)。

那么原子变量是怎样保证数据同步的呢?你可以这样理解:当有多个线程同时操作这些原子类的实例时,这些原子实例对于线程X其实并不具有synchronized的排他性,反而具有排己性。我线程X访问原子实例怎么会排斥自己呢?是不是觉得很难理解呢?其实排己就是为了数据的同步,而且比排他性的效率更高,因为排己是通过一个while自旋循环实现的,而且这个while循环耗时一般比较短。当线程X要写会内存的时候,先比较内存地址的旧值A(这个值是volatile的即线程间可见)是否改变了,如果变了(比如另外一个线程Y修改了内存地址对应的变量的值,线程X是可见的),那就就进行while自旋,until直到预期的旧值A是对的,JVM会为while自旋这样的操作会付出一定的代价,但是这样的代价相对于synchronized的线程阻塞和唤起,可以说是比较高效了。

原子类的非阻塞同步实际上是靠硬件的相关指令来实现的,或者说只是在硬件级别上阻塞了,可以对基本数据、数组中的基本数据、对类中的基本数据进行操作。原子变量类相当于一种泛化的volatile变量,能够支持原子的和有条件的读-改-写操作——引文

在并发环境下,某个线程对共享原子变量先进行操作,如果没有其他线程争用共享数据那操作就成功;如果存在数据的争用冲突,那就才去补偿措施,比如不断的重试机制,直到成功为止,因为这种乐观的并发策略不需要把线程挂起,操作和冲突检测具备原子性。在硬件指令集的发展驱动下,使得 "操作和冲突检测" 这种看起来需要多次操作的行为只需要一条处理器指令便可以完成,这些指令中就包括非常著名的CAS指令(Compare-And-Swap比较并交换)。《深入理解Java虚拟机第二版.周志明》第十三章中这样描述关于CAS机制:

CAS指令需要三个操作数,分别是内存位置(Java中可以简单理解为变量的内存地址,用V表示)、旧的预期值(A)和新值(B)。CAS指令执行时,当且仅当V符合旧的预期值A时,处理器用新值B更新V的值,否则它就不执行更新,但是无论是否更新了V的值,都会返回V的旧值,上述的处理过程就是一个原子操作。
在JDK1.5之后,Java程序中才可以使用CAS操作,该操作由sun.misc.Unsafe类里面的compareAndSwapInt()compareAndSwapLong()等几个方法包装提供,虚拟机在内部对这些方法做了特殊处理,即时编译出来的结果就是一条平台相关的处理器CAS指令,没有方法调用的过程,或者可以认为是无条件内联进去了。
由于sun.misc.Unsafe类不是提供给用户程序调用的类(Unsafe.getUnsafe()的代码中限制了只有启动类加载器(BootStrap ClassLoader)加载的Class才能访问它),因此,如果不采用反射手段,我们只能通过其它的Java API来间接使用它,如J.U.C包里的整数原子类,其中的compareAndSet()getAndIncrement()等方法都使用了Unsafe类的CAS操作。

我们来看一下AtomicInteger的incrementAndGet的源码:

    private static final long valueOffset;
    private static final Unsafe unsafe = Unsafe.getUnsafe();
    static {
        try {
            valueOffset = unsafe.objectFieldOffset(AtomicInteger.class.getDeclaredField("value"));
        } catch (Exception ex) { throw new Error(ex); }
    }

    private volatile int value;

    public final int incrementAndGet() {
        return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
    }

可以看到变量value是用volatile修饰的,那么这个变量在线程之间是透明的,这个方法其实就是调用Unsafe类的方法,我们再进去看下Unsafe的getAndAddInt()方法的源码:

    public final int getAndAddInt(Object o, long valueOffset, int modify) {
        int expect;
        do {
            expect = this.getIntVolatile(o, valueOffset);
        } while(!this.compareAndSwapInt(o, valueOffset, expect, expect + modify));
        return expect;
    }

可以看到是一个while循环,非阻塞同步的诀窍就在于这个自旋式while循环,循环自旋不断尝试将一个比当前值大modify的新值赋给自己,如果失败则说明在执行"获取-设置"操作的时已经被其它线程修改过了,于是便再次进入循环下一次操作,直到成功为止。原子的将变量设定为新数据,同时返回先前的旧数据。

java.util.concurrent.atomic中的类可以分成4组:

  • 标量类(Scalar):AtomicBoolean,AtomicInteger,AtomicLong,AtomicReference
  • 数组类:AtomicIntegerArray,AtomicLongArray,AtomicReferenceArray
  • 更新器类:AtomicLongFieldUpdater,AtomicIntegerFieldUpdater,AtomicReferenceFieldUpdater
  • 复合变量类:AtomicMarkableReference,AtomicStampedReference

标量类

第一组AtomicBoolean,AtomicInteger,AtomicLong,AtomicReference这四种基本类型用来处理布尔,整数,长整数,对象四种数据,其内部实现不是简单的使用synchronized,而是一个更为高效的方式CAS (compare and swap) + volatile和native方法,从而避免了synchronized的高开销,执行效率大为提升。

他们的实现都是依靠 真正的值为volatile 类型,通过Unsafe 包中的原子操作实现。最基础就是CAS,他是一切的基础。如下 。其中valueOffset是 在内存中 value相对于基地址的偏移量。(它的获得也由Unsafe 本地代码获得)。

核心代码如下,其他都是在compareAndSet基础上构建的。

public class AtomicInteger extends Number implements java.io.Serializable {
    private static final long serialVersionUID = 6214790243416807050L;

    // setup to use Unsafe.compareAndSwapInt for updates
    private static final Unsafe unsafe = Unsafe.getUnsafe();
    private static final long valueOffset;

    static {
        try {
            valueOffset = unsafe.objectFieldOffset
                (AtomicInteger.class.getDeclaredField("value"));
        } catch (Exception ex) { throw new Error(ex); }
    }

    private volatile int value;

    public AtomicInteger(int initialValue) {
        value = initialValue;
    }

    public AtomicInteger() {
    }

    public final int get() {
        return value;
    }

    public final void set(int newValue) {
        value = newValue;
    }

    public final void lazySet(int newValue) {
        unsafe.putOrderedInt(this, valueOffset, newValue);
    }

    public final int getAndSet(int newValue) {
        return unsafe.getAndSetInt(this, valueOffset, newValue);
    }

    public final boolean compareAndSet(int expect, int update) {
        return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
    }

    public final boolean weakCompareAndSet(int expect, int update) {
        return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
    }

    public final int getAndIncrement() {
        return unsafe.getAndAddInt(this, valueOffset, 1);
    }

    public final int getAndDecrement() {
        return unsafe.getAndAddInt(this, valueOffset, -1);
    }

    public final int getAndAdd(int delta) {
        return unsafe.getAndAddInt(this, valueOffset, delta);
    }

    public final int incrementAndGet() {
        return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
    }

    public final int decrementAndGet() {
        return unsafe.getAndAddInt(this, valueOffset, -1) - 1;
    }

    public final int addAndGet(int delta) {
        return unsafe.getAndAddInt(this, valueOffset, delta) + delta;
    }

    public final int getAndUpdate(IntUnaryOperator updateFunction) {
        int prev, next;
        do {
            prev = get();
            next = updateFunction.applyAsInt(prev);
        } while (!compareAndSet(prev, next));
        return prev;
    }

    public final int updateAndGet(IntUnaryOperator updateFunction) {
        int prev, next;
        do {
            prev = get();
            next = updateFunction.applyAsInt(prev);
        } while (!compareAndSet(prev, next));
        return next;
    }

    public final int getAndAccumulate(int x, IntBinaryOperator accumulatorFunction) {
        int prev, next;
        do {
            prev = get();
            next = accumulatorFunction.applyAsInt(prev, x);
        } while (!compareAndSet(prev, next));
        return prev;
    }

    public final int accumulateAndGet(int x, IntBinaryOperator accumulatorFunction) {
        int prev, next;
        do {
            prev = get();
            next = accumulatorFunction.applyAsInt(prev, x);
        } while (!compareAndSet(prev, next));
        return next;
    }
}

void set()和void lazySet():set设置为给定值,直接修改原始值;lazySet延时设置变量值,这个等价于set()方法,但是由于字段是volatile类型的,因此次字段的修改会比普通字段(非volatile字段)有稍微的性能延时(尽管可以忽略),所以如果不是想立即读取设置的新值,允许在“后台”修改值,那么此方法就很有用。

数组类

第二组AtomicIntegerArray,AtomicLongArray还有AtomicReferenceArray类进一步扩展了原子操作,对这些类型的数组提供了支持。这些类在为其数组元素提供 volatile 访问语义方面也引人注目,这对于普通数组来说是不受支持的。

他们内部并不是像AtomicInteger一样维持一个valatile变量,而是全部由native方法实现,如下
AtomicIntegerArray的实现片断:

public class AtomicIntegerArray implements java.io.Serializable {
    private static final long serialVersionUID = 2862133569453604235L;

    private static final Unsafe unsafe = Unsafe.getUnsafe();
    private static final int base = unsafe.arrayBaseOffset(int[].class);
    private static final int shift;
    private final int[] array;

    static {
        int scale = unsafe.arrayIndexScale(int[].class);
        if ((scale & (scale - 1)) != 0)
            throw new Error("data type scale not a power of two");
        shift = 31 - Integer.numberOfLeadingZeros(scale);
    }

    private long checkedByteOffset(int i) {
        if (i < 0 || i >= array.length)
            throw new IndexOutOfBoundsException("index " + i);
        return byteOffset(i);
    }

    private static long byteOffset(int i) {
        return ((long) i << shift) + base;
    }

    public AtomicIntegerArray(int length) {
        array = new int[length];
    }

    public AtomicIntegerArray(int[] array) {
        // Visibility guaranteed by final field guarantees
        this.array = array.clone();
    }

    public final int length() {
        return array.length;
    }

    public final int get(int i) {
        return getRaw(checkedByteOffset(i));
    }

    private int getRaw(long offset) {
        return unsafe.getIntVolatile(array, offset);
    }

    public final void set(int i, int newValue) {
        unsafe.putIntVolatile(array, checkedByteOffset(i), newValue);
    }

    public final void lazySet(int i, int newValue) {
        unsafe.putOrderedInt(array, checkedByteOffset(i), newValue);
    }

    public final int getAndSet(int i, int newValue) {
        return unsafe.getAndSetInt(array, checkedByteOffset(i), newValue);
    }

    public final boolean compareAndSet(int i, int expect, int update) {
        return compareAndSetRaw(checkedByteOffset(i), expect, update);
    }

    private boolean compareAndSetRaw(long offset, int expect, int update) {
        return unsafe.compareAndSwapInt(array, offset, expect, update);
    }

    public final boolean weakCompareAndSet(int i, int expect, int update) {
        return compareAndSet(i, expect, update);
    }

    public final int getAndIncrement(int i) {
        return getAndAdd(i, 1);
    }

    public final int getAndDecrement(int i) {
        return getAndAdd(i, -1);
    }

    public final int getAndAdd(int i, int delta) {
        return unsafe.getAndAddInt(array, checkedByteOffset(i), delta);
    }

    public final int incrementAndGet(int i) {
        return getAndAdd(i, 1) + 1;
    }

    public final int decrementAndGet(int i) {
        return getAndAdd(i, -1) - 1;
    }

    public final int addAndGet(int i, int delta) {
        return getAndAdd(i, delta) + delta;
    }

    public final int getAndUpdate(int i, IntUnaryOperator updateFunction) {
        long offset = checkedByteOffset(i);
        int prev, next;
        do {
            prev = getRaw(offset);
            next = updateFunction.applyAsInt(prev);
        } while (!compareAndSetRaw(offset, prev, next));
        return prev;
    }

    public final int updateAndGet(int i, IntUnaryOperator updateFunction) {
        long offset = checkedByteOffset(i);
        int prev, next;
        do {
            prev = getRaw(offset);
            next = updateFunction.applyAsInt(prev);
        } while (!compareAndSetRaw(offset, prev, next));
        return next;
    }

    public final int getAndAccumulate(int i, int x,
                                      IntBinaryOperator accumulatorFunction) {
        long offset = checkedByteOffset(i);
        int prev, next;
        do {
            prev = getRaw(offset);
            next = accumulatorFunction.applyAsInt(prev, x);
        } while (!compareAndSetRaw(offset, prev, next));
        return prev;
    }

    public final int accumulateAndGet(int i, int x,
                                      IntBinaryOperator accumulatorFunction) {
        long offset = checkedByteOffset(i);
        int prev, next;
        do {
            prev = getRaw(offset);
            next = accumulatorFunction.applyAsInt(prev, x);
        } while (!compareAndSetRaw(offset, prev, next));
        return next;
    }
}

可以看到,其实核心操作还是Unsafe类进行处理的。这个类提供两个构造器,一个是通过数组长度构造,第二个是通过原有数组构造,需要注意的是第二个构造器是调用clone()方法将原有数组拷贝一份,所以当AtomicIntegerArray对内部的数组元素进行修改时,不会影响传入的数组,大家都知道clone()是浅拷贝,所以如果数组内容是对象的话会有影响。

array使用的是final修饰,变成了常量数组,内存中的偏移地址不可变,这点很重要,但是该内存地址所指向的那个对象还是可以变的,这个array数组就保存到了方法区,同样的可以保证多线程访问时的可见性,避免使用volatile也减少了开销。

需要注意的是static语句,arrayBaseOffset这个native方法是获取数组首个元素的首地址偏移赋值给base,arrayIndexScale这个native方法可以用来获取数组元素的增量地址(即每个元素的字节大小)赋值给scale,比如元素是int类型,那么scale就是4,下标为i的元素的内存地址valueOffset_i就是valueOffset_i=i* scale + base = i*4+base公式1。数组的偏移地址和每个元素的偏移地址对于Unsafe的CAS操作十分重要,没有内存的偏移地址Unsafe就无法进行CAS操作。这应该很好理解的吧?这只是简单的解释,如果这个解释你还不理解的话,那么后面的详细的解释就更难懂了。

详细解释的话,那就先来研究这两个native方法:

public int arrayBaseOffset(Class clazz) {
    Class<?> component = clazz.getComponentType();
    if (component == null) {
        throw new IllegalArgumentException("Valid for array classes only: " + clazz);
    }
    // TODO: make the following not specific to the object model.
    int offset = 12;
    if (component == long.class || component == double.class) {
        offset += 4;  // 4 bytes of padding.
    }
    return offset;
}

public int arrayIndexScale(Class clazz) {
    Class<?> component = clazz.getComponentType();
    if (component == null) {
        throw new IllegalArgumentException("Valid for array classes only: " + clazz);
    }
    // TODO: make the following not specific to the object model.
    if (!component.isPrimitive()) {
        return 4;
    } else  if (component == long.class || component == double.class) {
        return 8;
    } else if (component == int.class || component == float.class) {
        return 4;
    } else if (component == char.class || component == short.class) {
        return 2;
    } else {
        // component == byte.class || component == boolean.class.
        return 1;
    }
}

arrayBaseOffset就是计算数组对象在内存中从数组的地址到首个元素的地址的偏移量offset,为什么offset会先加12呢,这里涉及到的是java内存中对象存储的知识。每个类对象在内存中存储时除了数据内容,其实还要包含一个头部信息的,对象头包含Mark Word指向类的指针两部分,Mark Word在32位JVM中的长度是32bit,在64位JVM中长度是64bit,指向类的指针在32位JVM中的长度是32bit,在64位JVM中长度是64bit,如果这个类是数组类型的话,还需要4字节(该数据在32位和64位JVM中长度都是32bit)来存储数组的大小,所以这里是12字节。接下来又涉及到了字节对齐,在jvm中,是要以8字节为单位进行对齐的,这里的头部12字节肯定是无法对齐了,但是如果是long,double等8字节的类型,就是在开始存时就进行对齐操作,这样就能保证接下来的每一个元素都是8的倍数,而如果是其他的对象比如int 4字节,就在数组末尾进行对齐,这样就能缺多少补多少。

而arrayIndexScale实际上是能获取数组中每个元素在内存中的大小赋值给scale,有点像cpp里sizeof。

然后接着分析static语句,if ((scale & (scale - 1)) != 0) throw new Error("data type scale not a power of two");这行代码意思是说如果scale不是2的次幂的话那么就抛出异常,大家都知道Integer的字节长度是4,long和double都是8字节,这些字节长度都是2的shift次幂,下面一行代码shift = 31 - Integer.numberOfLeadingZeros(scale);是计算指数shift的的过程并赋值给shift,其实这行代码可以使用一个数学公式解释更好理解一些2^{shift}=scale公式2。Integer类的方法numberOfLeadingZeros(int)是什么意思呢?其实按照字面leading的翻译是领头的意思,即领头的0的个数,进一步解释是从最高位开始直到遇到1的连续的0的个数,相对应的Integer还有一个numberOfTrailingZeros(int)方法表示尾随的0的个数即从最低位开始直到遇到1的连续的0的个数。其实计算shift的过程主要是为了优化公式1,我们看一下源码中计算valueOffset_i的过程:

    private long checkedByteOffset(int i) {
        if (i < 0 || i >= array.length)
            throw new IndexOutOfBoundsException("index " + i);
        return byteOffset(i);
    }

    private static long byteOffset(int i) {
        return ((long) i << shift) + base;
    }

由于公式2的成立,我们可以把公式1转换成公式3:valueOffset_i=i << shift+base公式3

下面我们仔细分析下Integer类的方法numberOfLeadingZeros(int)的原理,其实是应用了典型的二分查找,先把32位整形分为高16位和低16位查找非零数,在对高16位进行或低16位进行二分

    public static int numberOfLeadingZeros(int i) {
        if (i == 0)                                                                       
                return 32;
        int n = 1;
        // 下面的代码就是定位从左边开始第一个非零值的位置,在定位过程中顺便累加从左边开始0的个数
        // 将i无符号右移16位后,有二种情况;
        //   情况1.i=0,则第一个非零值位于低16位,i至少有16个0,同时将i左移16位(把低16位移到原高16位的位置,这样情况1和情况2就能统一后续的判断方式)
        //   情况2.i!=0,则第一个非零值位于高16位,后续在高16位中继续判断
        // 这个思路就是二分查找,首先把32位的数分为高低16位,如果非零值位于高16位,后续再将高16位继续二分为高低8位,一直二分到集合中只有1个元素
        if (i >>> 16 == 0) { n += 16; i <<= 16; }
        // 判断第一个非零值是否位于高8位
        if (i >>> 24 == 0) { n +=  8; i <<=  8; }  
        // 判断第一个非零值是否位于高4位
        if (i >>> 28 == 0) { n +=  4; i <<=  4; }
        // 判断第一个非零值是否位于高2位
        if (i >>> 30 == 0) { n +=  2; i <<=  2; }       
        // 判断第一个非零值是否位于左边第一位
        n -= i >>> 31;                                                                
        return n;

更新器类

第三组AtomicIntegerFieldUpdater,AtomicLongFieldUpdater,AtomicReferenceFieldUpdater基于反射的实用工具,可以对指定类的指定 volatile 字段进行原子更新。API非常简单,但是也是有一些约束:

  1. 字段必须是volatile类型的,否则get的时候会抛出Exception in thread "main" java.lang.ExceptionInInitializerError Caused by: java.lang.IllegalArgumentException: Must be volatile type异常
  2. 字段的描述类型(修饰符public/protected/default/private)是与调用者与操作对象字段的关系一致。也就是说 调用者能够直接操作对象字段,那么就可以反射进行原子操作。但是对于父类的字段,子类是不能直接操作的,尽管子类可以访问父类的字段。(我试了下,貌似只要字段不是private就行了,大神解释下)
  3. 只能是实例变量,不能是类变量,也就是说不能加static关键字。
  4. 只能是可修改变量,不能使final变量,因为final的语义就是不可修改。实际上final的语义和volatile是有冲突的,这两个关键字不能同时存在。
  5. 对于AtomicIntegerFieldUpdater 和AtomicLongFieldUpdater 只能修改int/long类型的字段,不能修改其包装类型(Integer/Long)。如果要修改包装类型就需要使用AtomicReferenceFieldUpdater 。

下面举个例子:

public class Test {
    private static class User {
        public volatile String name;
        public volatile String name2;
        public volatile int age;
        public volatile int age2;
        public AtomicInteger age3;

        public User(String name, int age) {
            this.name = name;
            this.name2 = name;
            this.age = age;
            this.age2 = age;
            this.age3 = new AtomicInteger(age);
        }
    }
    // 创建原子更新器,并设置需要更新的对象类和对象的属性
    private static AtomicIntegerFieldUpdater<User> a = AtomicIntegerFieldUpdater.newUpdater(User.class, "age");
    private static AtomicReferenceFieldUpdater<User, String> b = AtomicReferenceFieldUpdater.newUpdater(User.class, String.class, "name");
    public static void main(String args[]) {
        User user = new User("zzh", 0);
        int threadCnt = 20;
        Thread[] threads = new Thread[threadCnt];
        CountDownLatch cdt = new CountDownLatch(threadCnt);
        for (int i = 0; i < threadCnt; i++) {
            final int iT = i;
            threads[i] = new Thread(new Runnable() {
                @Override
                public void run() {
                    for (int j = 0; j < 4000; j++) {
                        user.name2 += "0";
                        a.getAndIncrement(user);
                        b.accumulateAndGet(user, "0", new BinaryOperator() {
                            @Override
                            public Object apply(Object o, Object o2) {
                                return o.toString() + o2.toString();
                            }
                        });
                        user.age2++;
                        user.age3.incrementAndGet();
                    }
                    cdt.countDown();
                }
            });
            threads[i].start();
        }
        for (Thread i : threads) {
            try {
                i.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
//        while (Thread.activeCount() > 1) {
//            Thread.yield();
//        }
//        try {
//            cdt.await();
//        } catch (InterruptedException e) {
//            e.printStackTrace();
//        }
        System.out.println(a.get(user) + "   " + user.age2 + "   " + user.age3.get() + "  " + b.get(user).length() + "  " + user.name2.length());
    }
        //输出
        //80000   77295   80000  80003  41319
}

测试结果是age和age3总是80000即非阻塞同步,字符串name的长度永远都是80003(即zzh后跟8万个0的字符串)也保持了数据的非阻塞同步,但是age2总是一个小于80000的数字即无法同步,name2的长度也总是小于80003的一个数即无法同步。

复合变量类

防止ABA问题出现而构造的类。如什么是ABA问题呢,wiki官方解释。假设多线程环境中,一个线程修改A->B,然后又B->A,另一个线程看到的值未改变,又继续修改成自己的期望值。如果我们不关心中间状态的变化,只关心最终结果,就无所谓ABA问题。而AtomicStampedReference和AtomicMarkableReference就是为了处理ABA问题而存在的。

看下AtomicStampedReference是怎么解决这个问题的:


/**
通过static pair保存一个引用和计数器
*/
private static class Pair<T> {
    final T reference;
    final int stamp;
    private Pair(T reference, int stamp) {
        this.reference = reference;
        this.stamp = stamp;
    }
    static <T> Pair<T> of(T reference, int stamp) {
        return new Pair<T>(reference, stamp);
    }
}
 
private volatile Pair<V> pair;
 
/**
 * 通过传入的初始化引用和计数器来构造函数一个pair
 *
 * @param initialRef 初始化用用
 * @param initialStamp 初始化计数器或者叫时间戳
 */
public AtomicStampedReference(V initialRef, int initialStamp) {
    pair = Pair.of(initialRef, initialStamp);
}

AtomicStampedReference通过一个pair来保存初始化引用和计数器,以后每次原子操作时,都需要比较引用和计数器是否都正确。举个通俗点的例子,你倒了一杯水放桌子上,干了点别的事,然后同事把你水喝了又给你重新倒了一杯水,你回来看水还在,拿起来就喝,如果你不管水中间被人喝过,只关心水还在,这就是ABA问题。如果你是一个讲卫生讲文明的小伙子,不但关心水在不在,还要在你离开的时候水被人动过没有,因为你是程序员,所以就想起了放了张纸在旁边,写上初始值0,别人喝水前麻烦先做个累加才能喝水。这就是AtomicStampedReference的解决方案。

看下AtomicStampedReference的方法:

    public boolean compareAndSet(V   expectedReference,
                                 V   newReference,
                                 int expectedStamp,
                                 int newStamp) {
        Pair<V> current = pair;
        return
            expectedReference == current.reference &&
            expectedStamp == current.stamp &&
            ((newReference == current.reference &&
              newStamp == current.stamp) ||
             casPair(current, Pair.of(newReference, newStamp)));
    }

AtomicMarkableReference跟AtomicStampedReference差不多,AtomicStampedReference是使用pair的int stamp作为计数器使用,AtomicMarkableReference的pair使用的是boolean mark。还是那个水的例子,AtomicStampedReference可能关心的是动过几次,AtomicMarkableReference关心的是有没有被人动过,方法都比较简单。

LongAdder、LongAccumulator、DoubleAdder、DoubleAccumulator

这四个类之前是在guava以及hystrix等中出现过,jdk1.8在JUC包中新增了这四个类,作者是著名的Doug lea。

我们来分析下LongAdder这个类,其余的类的原理都相似。LongAdder主要处理高并发情况,因为AtomicLong虽然能保持数据同步但是在高并发情况下CAS失败率会很高(即while自旋会很久)导致效率低下。

而LongAdder的性能比上面那种还要好很多,首先它有一个基础的值base,在发生竞争的情况下,会有一个Cell数组用于将不同线程的操作离散到不同的节点上去(会根据需要扩容,最大为CPU核数),sum()会将所有Cell数组中的value和base累加作为返回值。核心的思想就是将AtomicLong将单一value的更新压力分担到多个value中去,降低单个value的 “热度”,分段更新!!

底竞争下直接更新base,类似AtomicLong,高并发下,会将每个线程的操作hash到不同的cells数组中,从而将AtomicLong中更新,一个value的行为优化之后,分散到多个value中,从而降低更新热点,而需要得到当前值的时候,直接将所有cell中的value与base相加即可,但是跟AtomicLong(compare and change -> xadd)的CAS不同,incrementAndGet操作及其变种可以返回更新后的值,而LongAdder返回的是void。

public class LongAdder extends Striped64 implements Serializable {
//...
}

LongAdder继承自Striped64,Striped64内部维护了一个懒加载的数组以及一个额外的base实力域,数组的大小是2的N次方(方便取余操作),使用每个线程Thread内部的哈希值访问。

abstract class Striped64 extends Number {
/** Number of CPUS, to place bound on table size */
    static final int NCPU = Runtime.getRuntime().availableProcessors();

    /**
     * Table of cells. When non-null, size is a power of 2.
     */
    transient volatile Cell[] cells;
     
@sun.misc.Contended static final class Cell {
        volatile long value;
        Cell(long x) { value = x; }
        final boolean cas(long cmp, long val) {
            return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
        }

        // Unsafe mechanics
        private static final sun.misc.Unsafe UNSAFE;
        private static final long valueOffset;
        static {
            try {
                UNSAFE = sun.misc.Unsafe.getUnsafe();
                Class<?> ak = Cell.class;
                valueOffset = UNSAFE.objectFieldOffset
                    (ak.getDeclaredField("value"));
            } catch (Exception e) {
                throw new Error(e);
            }
        }
    }
}

数组的元素是Cell类,可以看到Cell类用Contended注解修饰,这里主要是解决false sharing(伪共享的问题),比如两个volatile变量被分配到了同一个缓存行,但是这两个的更新在高并发下会竞争,比如线程A去更新变量a,线程B去更新变量b,但是这两个变量被分配到了同一个缓存行,因此会造成每个线程都去争抢缓存行的所有权,例如A获取了所有权然后执行更新这时由于volatile的语义会造成其刷新到主存,但是由于变量b也被缓存到同一个缓存行,因此就会造成cache miss,这样就会造成极大的性能损失,因此有一些类库的作者,例如JUC下面的、Disruptor等都利用了插入dummy 变量的方式,使得缓存行被其独占,比如下面这种代码添加了14行缓冲行防止伪共享:

static final class Cell {
        volatile long p0, p1, p2, p3, p4, p5, p6;
        volatile long value;
        volatile long q0, q1, q2, q3, q4, q5, q6;
        Cell(long x) { value = x; }

        final boolean cas(long cmp, long val) {
            return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
        }

        // Unsafe mechanics
        private static final sun.misc.Unsafe UNSAFE;
        private static final long valueOffset;
        static {
            try {
                UNSAFE = getUnsafe();
                Class<?> ak = Cell.class;
                valueOffset = UNSAFE.objectFieldOffset
                    (ak.getDeclaredField("value"));
            } catch (Exception e) {
                throw new Error(e);
            }
        }
 }

但是这种方式毕竟不通用,例如32、64位操作系统的缓存行大小不一样,因此JAVA8中就增加了一个注@sun.misc.Contended解用于解决这个问题,由JVM去插入这些变量,具体可以参考openjdk.java.net/jeps/142 ,但是通常来说对象通常是不规则的分配到内存中的,但是数组由于是连续的内存,因此可能会共享缓存行,因此这里加一个Contended注解以防cells数组发生伪共享的情况。

来看下核心的addsum方法:

public class LongAdder {
    public void add(long x) {
        Cell[] as; long b, v; int m; Cell a;
        /**
         *  如果是第一次执行,则直接case操作base
         */
        if ((as = cells) != null || !casBase(b = base, b + x)) {
            boolean uncontended = true;
            /**
             * as数组为空(null或者size为0)
             * 或者当前线程取模as数组大小为空
             * 或者cas更新Cell失败
             */
            if (as == null || (m = as.length - 1) < 0 ||
                (a = as[getProbe() & m]) == null ||
                !(uncontended = a.cas(v = a.value, v + x)))
                longAccumulate(x, null, uncontended);
        }
    }

    public long sum() {
       //通过累加base与cells数组中的value从而获得sum
        Cell[] as = cells; Cell a;
        long sum = base;
        if (as != null) {
            for (int i = 0; i < as.length; ++i) {
                if ((a = as[i]) != null)
                    sum += a.value;
            }
        }
        return sum;
    }
}

其中casBase操作其实就是Unsafe的CAS操作。LongAdder的分段更新操作会带来空间上的浪费(Cell数组导致),可以空间换时间,但是,能不换就不换! 所以,只有并发高到一定程度了,才会分段更新,因为低并发时,casBase操作基本都会成功。

由于Cell相对来说比较占内存,因此这里采用懒加载的方式,在无竞争的情况下直接更新base域,在第一次发生竞争的时候(CAS失败)就会创建一个大小为2的cells数组,每次扩容都是加倍(加倍操作由longAccumulate方法判断并执行),只到达到CPU核数。同时我们知道扩容数组等行为需要只能有一个线程同时执行,因此需要一个锁,这里通过CAS更新cellsBusy来实现一个简单的spin lock。

数组访问索引是通过Thread里的threadLocalRandomProbe域取模实现的,这个域是ThreadLocalRandom更新的,cells的数组大小被限制为CPU的核数,因为即使有超过核数个线程去更新,但是每个线程也只会和一个CPU绑定,更新的时候顶多会有cpu核数个线程,因此我们只需要通过hash将不同线程的更新行为离散到不同的slot即可。

我们知道线程、线程池会被关闭或销毁,这个时候可能这个线程之前占用的slot就会变成没人用的,但我们也不能清除掉,因为一般web应用都是长时间运行的,线程通常也会动态创建、销毁,很可能一段时间后又会被其他线程占用,而对于短时间运行的,例如单元测试,清除掉有啥意义呢?

用Doug Lea说的一句话: 低并发时LongAdder和AtomicLong性能差不多,高并发时LongAdder更高效。其实LongAdder就是以空间(Cell数组)换时间(CAS操作)。


参考文献:

  1. 原子操作类AtomicInteger详解
  2. java-juc-原子类-AtomicIntegerArray初探| cruise yang
  3. Integer.numberOfLeadingZeros(int i) - u010667082 - CSDN博客
  4. JUC源码分析4-原子变量-AtomicStampedReference/AtomicMarkableReference
  5. Java并发工具类之LongAdder原理总结- 后端- 掘金
  6. 从LONGADDER看更高效的无锁实现

相关文章

网友评论

    本文标题:Java并发-atomic原子类包源码剖析

    本文链接:https://www.haomeiwen.com/subject/qkhfqqtx.html