美文网首页Java轻巧一些收藏
[Java]重学Java-原子类

[Java]重学Java-原子类

作者: AbstractCulture | 来源:发表于2022-09-01 16:19 被阅读0次

    JUC包下的原子类

    JUC就是大名鼎鼎的java并发包,我们今天来看看基于非阻塞性算法的CAS封装的原子类.
    JUC下有AtomicIntegerAtomicLongAtomicBoolean等类,

    UML
    • 在多线程的环境下对count变量进行自增
    public static AtomicLong count = new AtomicLong(0);
    
    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 0; i < 10; i++) {
            executorService.execute(() -> System.out.println("现在对原子类进行自增:" + count.addAndGet(1)));
        }
        Thread.sleep(1000);
        System.out.println("原子类的值:"+count.get());
    }
    

    这里我们通过代码验证了,多线程环境下,原子类是可以保证线程安全的。

    AtomicLong

    public class AtomicLong extends Number implements java.io.Serializable {
        private static final long serialVersionUID = 1927816293512124184L;
    
        // unsafe实例
        private static final Unsafe unsafe = Unsafe.getUnsafe();
        // 偏移量
        private static final long valueOffset;
    
        // JVM是否支持Long类型的CAS
        static final boolean VM_SUPPORTS_LONG_CAS = VMSupportsCS8();
    
    
        private static native boolean VMSupportsCS8();
    
        static {
            try {
                // 获取value在AtomicLong中的偏移量
                valueOffset = unsafe.objectFieldOffset
                    (AtomicLong.class.getDeclaredField("value"));
            } catch (Exception ex) { throw new Error(ex); }
        }
        // 实际变量
        private volatile long value;
    
        public AtomicLong(long initialValue) {
            value = initialValue;
        }
        
        public final boolean compareAndSet(long expect, long update) {
            return unsafe.compareAndSwapLong(this, valueOffset, expect, update);
        }
    

    这里有几个细节:

    1. value被volatile修饰,保证了内存可见性.
    2. 所有的CAS操作依赖Unsafe

    Unsafe类

    JDK中的Unsafe类提供了硬件级别的原子性操作,他提供的都是nat
    ive方法,Java使用JNI的方式调用C++的本地方法。我们简单来看看Unsafe类的核心方法.

    • sun.misc.Unsafe
        // 获取对应属性的偏移量
        public native long objectFieldOffset(Field var1);
        
        public final native boolean compareAndSwapObject(Object var1, long var2, Object var4, Object var5);
    
        public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);
        // 比对对象 obj 中偏移量 offset 的变 的值是否与 expect 相等,相等使用update更新  
        // 然后返回 true ,否 返回 false
        public final native boolean compareAndSwapLong(Object var1, long var2, long var4, long var6);
        
        // 唤醒调用park后阻塞的线程。
        public native void unpark(Object var1);
        // 阻塞当前线程
        public native void park(boolean isAbsolute, long time);
    

    更好性能的原子类-LongAdder

    AtomicLong通过CAS保证了原子性,只借助CPU即可完成一个轻量级的"锁",但是在高并发的场景下,还是会出现多个线程去争夺一个变量的控制权这种情况,这会造成线程直接频繁竞争,浪费CPU资源。
    在一些分布式的项目中,为了满足"秒杀",我们通常会将仓库进行内部拆分,尽可能切分成小的颗粒度,然后通过一些策略去扣除缓存中的库存数,在统计时,再统一读取所有"分片"的数据即可。

    cell

    LongAdder内部维护了多个cell变量,每个cell的初始值为0,当并发量上来的时候,线程会去抢夺空闲的cell的控制权,如果A竞争cellA失败,它会转向竞争cellB,如此一来,便减少了如AtomincLong出现的单一变量遭到多线程频繁竞争的问题。

    在做读操作的时候,LongAdder直接统计base+所有cell的值返回即可.

    • java.util.concurrent.atomic.LongAdder#add
    public void add(long x) {
        Cell[] as; long b, v; int m; Cell a;
        if ((as = cells) != null || !casBase(b = base, b + x)) {
            boolean uncontended = true;
            if (as == null || (m = as.length - 1) < 0 ||
                (a = as[getProbe() & m]) == null ||
                !(uncontended = a.cas(v = a.value, v + x)))
                longAccumulate(x, null, uncontended);
        }
    }
    
    1. LongAdder并不会一开始就创建cell数组,惰性加载.
    2. 如果cell数组为空,那么先对base值进行累加操作.

    伪共享问题

    之前我们聊过,为了解决CPU和内存直接运行速度的差距,操作系统设计了多级缓存架构,其中CPU和主内存之间,还存在着高速缓冲层,也就是CPU Cache.
    CPU Cache内部是按行存储的,每一行称为Cache行,它是主存和Cache直接交换数据的基本单位。
    CPU需要访问变量值时,会从CPU Cache查看是否有该变量,如果有直接读取,如果没有则从主内存读,然后回种到CPU Cache,这里回种的时候就有一个问题,CPU Cache是按行存储,变量的大小可能没有那么大,此时就会出现多个变量值存储在同一个Cache行中,当发生多线程访问该变量时,只能有一个线程访问同一个缓存行,此时就会出现排队竞争的情况,也就是伪共享.

    缓存行

    如何解决伪共享问题

    解决的思路是将变量值填充到跟cache行一样的大小,这样刚好一行只存放一个变量,那么线程之间竞争xy的时候就会去访问不同的缓存行。
    在JDK8中,可以使用@sun.misc.Contended注解声明,让JDK为你完成这部分工作.

    LongAdder如何解决伪共享

    答案在代码里面:

    • java.util.concurrent.atomic.Striped64.Cell
    @sun.misc.Contended static final class Cell {
        volatile long value;
        Cell(long x) { value = x; }
        final boolean cas(long cmp, long val) {
            return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
        }
    
        // Unsafe mechanics
        private static final sun.misc.Unsafe UNSAFE;
        private static final long valueOffset;
        static {
            try {
                UNSAFE = sun.misc.Unsafe.getUnsafe();
                Class<?> ak = Cell.class;
                valueOffset = UNSAFE.objectFieldOffset
                    (ak.getDeclaredField("value"));
            } catch (Exception e) {
                throw new Error(e);
            }
        }
    }
    

    LongAdder源码解析

    我们先来看看LongAdder的类图结构:

    LongAdder

    结合上面的代码,我们可以知道,cell类其实是定义在Striped64类中,Striped64包含几个元素:

    1. base值,它是LongAdder的一个基础值.
    2. cell数组,用来减少多线程竞争同一变量进行了切分.同时为了方便读取,使用了volatile修饰value值,使其具备可见性.
    3. cellsBuy,自旋锁开关.通常在数组进行创建、初始化、扩容的时候进行变化.

    读取LongAdder的值-sum

    public long sum() {
        Cell[] as = cells; Cell a;
        long sum = base;
        if (as != null) {
            for (int i = 0; i < as.length; ++i) {
                if ((a = as[i]) != null)
                    sum += a.value;
            }
        }
        return sum;
    }
    

    代码比较简单,基本上就是累加sumcell中所有的值,注意,这里的值只是读取那一刻的快照值,不是100%准确,因为没有做任何加锁措施。

    累加操作-add

    public void add(long x) {
        Cell[] as; long b, v; int m; Cell a;
        if ((as = cells) != null || !casBase(b = base, b + x)) {
            boolean uncontended = true;
            if (as == null || (m = as.length - 1) < 0 ||
                (a = as[getProbe() & m]) == null ||
                !(uncontended = a.cas(v = a.value, v + x)))
                longAccumulate(x, null, uncontended);
        }
    }
    
    1. 判断当前cells是否为空,如果不为空对base值进行累加
    2. 如果cells不为空或者casBase失败了,那么进入执行代码块.
    3. 对cell数组进行定位,也就是找到当前线程去访问哪一个cell.
      我们继续来看看longAccumulate.
    • java.util.concurrent.atomic.Striped64#longAccumulate
    final void longAccumulate(long x, LongBinaryOperator fn,
                              boolean wasUncontended) {
        int h;
        if ((h = getProbe()) == 0) {
            ThreadLocalRandom.current(); // force initialization
            h = getProbe();
            wasUncontended = true;
        }
        boolean collide = false;                // True if last slot nonempty
        for (;;) {
            Cell[] as; Cell a; int n; long v;
            if ((as = cells) != null && (n = as.length) > 0) {
                if ((a = as[(n - 1) & h]) == null) {
                    if (cellsBusy == 0) {       // Try to attach new Cell
                        Cell r = new Cell(x);   // Optimistically create
                        if (cellsBusy == 0 && casCellsBusy()) {
                            boolean created = false;
                            try {               // Recheck under lock
                                Cell[] rs; int m, j;
                                if ((rs = cells) != null &&
                                    (m = rs.length) > 0 &&
                                    rs[j = (m - 1) & h] == null) {
                                    rs[j] = r;
                                    created = true;
                                }
                            } finally {
                                cellsBusy = 0;
                            }
                            if (created)
                                break;
                            continue;           // Slot is now non-empty
                        }
                    }
                    collide = false;
                }
                else if (!wasUncontended)       // CAS already known to fail
                    wasUncontended = true;      // Continue after rehash
                else if (a.cas(v = a.value, ((fn == null) ? v + x :
                                             fn.applyAsLong(v, x))))
                    break;
                // 当前cell数组元素大于CPU个数
                else if (n >= NCPU || cells != as)
                    collide = false;            // At max size or stale
                // 是否有冲突
                else if (!collide)
                    collide = true;
                // 当前元素个数未达到CPU个数并且有冲突,扩容
                else if (cellsBusy == 0 && casCellsBusy()) {
                    try {
                        if (cells == as) {      // Expand table unless stale
                            Cell[] rs = new Cell[n << 1];
                            for (int i = 0; i < n; ++i)
                                rs[i] = as[i];
                            cells = rs;
                        }
                    } finally {
                        cellsBusy = 0;
                    }
                    collide = false;
                    continue;                   // Retry with expanded table
                }
                h = advanceProbe(h);
            }
            else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
                boolean init = false;
                try {                           // Initialize table
                    if (cells == as) {
                        Cell[] rs = new Cell[2];
                        rs[h & 1] = new Cell(x);
                        cells = rs;
                        init = true;
                    }
                } finally {
                    // 变量被volatile修饰,这里不用加锁
                    cellsBusy = 0;
                }
                if (init)
                    break;
            }
            else if (casBase(v = base, ((fn == null) ? v + x :
                                        fn.applyAsLong(v, x))))
                break;                          // Fall back on using base
        }
    }
    

    在定位的过程中,我们会遇到冲突的问题,我们回忆一下在HashMap的底层,是通过拉链法解决哈希冲突的问题。
    在LongAdder中,在初始化、扩容的时候都会涉及到cellsBusy这个标志位,如果当前cellsBusy为1,说明当前有线程在进行扩容或者初始化工作,此时其他线程会等待。
    我们在Initialize table这行注释里面可以去看到cell数组初始化的过程.

    初始化的过程可以理解为,多个线程CAS抢夺cellsBusy的控制权,抢到的线程进行初始化,其他线程进行等待.

    扩容的过程有几个先决条件: cells中的元素个数小于当前机器CPU个数并且发生了CAS冲突(多个线程同时CAS竞争同一个cell),其中产生了CAS失败,那么就会进行扩容操作.为什么要有这个条件,这是为了解决更合理的使用CPU核心数资源(思考一下伪共享的情景),每个CPU分配一个cell无疑是最佳的.扩容的过程是将数组增大一倍,然后将元素进行迁移.

    以上,便是大致的流程。

    相关文章

      网友评论

        本文标题:[Java]重学Java-原子类

        本文链接:https://www.haomeiwen.com/subject/irdsnrtx.html