美文网首页
LongAddr源码解析

LongAddr源码解析

作者: Pimow | 来源:发表于2018-10-17 11:45 被阅读0次

    扯淡序言

    在看ConcurrentHashMap源码对addCount(),fullAddCount()不是很理解。经过查询资料发现实现的逻辑跟LongAddr差不多。LongAddr是什么呢?有什么作用呢?让我们带着疑问一起走进这篇文章。

    AtomicLong缺点

    我们都知道AtomicLong是通过CAS自旋的方法去设置value,直到成功为止。那么当并发数比较多时,就会导致CAS的失败机率变高,重试次数更多,越多的线程重试,CAS失败的机率越高,形成恶性循环,从而降低了效率。

    Striped64类源码解析

    简介

    Striped64是jdk1.8提供的用于支持如long,double原子性操作的基类。

    Striped64的设计核心思路维护一组按需分配的计数单元,并发计数时,不同的线程可以在不同的计数单元上进行计数,这样减少了线程竞争,提高了并发效率。

    Striped64内部包含一个base和一个cell数组。整个Striped64的值包括base和cell数组的value相加。

    成员变量

     * //存放Cell的hash表,大小为2的幂。 
    
    transient volatile Cell[] cells;
    /** 
      * 基础值,没有竞争时会使用这个值,同时作为初始化table竞争失败的一种方案 
      * 也就是说没有竞争的时候会使用这个值,如果初始化table竞争失败也会使用这个值 
      */   
    transient volatile long base;
    
    //通过cas实现的锁,0无锁,1获得锁
    transient volatile int cellsBusy;
    //可用cpu数量
    static final int NCPU=Runtime.getRuntime().availableProcessors();
    

    Cell类源码解析

     // @sun.misc.Contended这个用于避免伪共享,缓冲行填充
    @sun.misc.Contended static final class Cell {
        //定义一个原子类型变量
        volatile long value;
        Cell(long x) { value = x; }
        final boolean cas(long cmp, long val) {
            return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
        }
    
        // Unsafe mechanics
        private static final sun.misc.Unsafe UNSAFE;
        //value变量的偏移量
        private static final long valueOffset;
        static {
            try {
                UNSAFE = sun.misc.Unsafe.getUnsafe();
                Class<?> ak = Cell.class;
                valueOffset = UNSAFE.objectFieldOffset
                    (ak.getDeclaredField("value"));
            } catch (Exception e) {
                throw new Error(e);
            }
        }
    }
    

    longAccumulate()解析

     /**
      * @param x 值
      * @param fn 更新方法
      * @param wasUncontended 竞争标识,标识在调用这个方法之前是否发生竞争。false发生竞争
      */ 
    final void longAccumulate(long x, LongBinaryOperator fn,
                              boolean wasUncontended) {
        int h;
        //获取线程probe值
        if ((h = getProbe()) == 0) {
          //如果probe值为0,强制初始化当前线程的probe值。  
            ThreadLocalRandom.current(); 
            //再次获取线程probe值。  
            h = getProbe();
            //没有发生竞争
            wasUncontended = true;
        }
        //是否碰撞,false碰撞
        //如果最后一个槽为不为空才为true
        boolean collide = false;              
        for (;;) {
            Cell[] as; Cell a; int n; long v;
             //如果cells数组不等于null
            if ((as = cells) != null && (n = as.length) > 0) {
                  //通过h从cell表中选定一个cell位置。  
                if ((a = as[(n - 1) & h]) == null) {
                    //如果当前位置没有cell,尝试新建一个Cell对象。 
                    //获取锁
                    if (cellsBusy == 0) {  
                        //创建一个Cell对象     
                        Cell r = new Cell(x);   
                         //获取cellsBusy锁
                        if (cellsBusy == 0 && casCellsBusy()) {
                            //cell创建标识 
                            boolean created = false;
                            try {             
                                Cell[] rs; int m, j;
                                //进入这里说明已经获取到锁,再次检测一下。  
                                if ((rs = cells) != null &&
                                    (m = rs.length) > 0 &&
                                    rs[j = (m - 1) & h] == null) {
                                   //设置新建的cell到指定位置。  
                                    rs[j] = r;
                                    created = true;
                                }
                            } finally {
                                //释放cellsBusy锁
                                cellsBusy = 0;
                            }
                            if (created)
                          //如果创建成功,直接跳出循环,退出方法。  
                                break;
                          //说明没有创建成功,则继续尝试。槽不是空的
                            continue;           
                        }
                    }
                    //执行到这里,说明获取cellsBusy锁失败。碰撞标识设置为false
                    collide = false;
                }
                //执行到这里说明cell位置不为空,就是等于a
                else if (!wasUncontended)      
                //设置未竞争标志位true,然后重试。
                    wasUncontended = true;   
                //通过cas将x值加到a的value上。     
                else if (a.cas(v = a.value, ((fn == null) ? v + x :
                                             fn.applyAsLong(v, x))))
                    //成功退出循环
                    break;
                else if (n >= NCPU || cells != as)
                    //说明cells表大于可用cpu数量,或者as数组过时;设置碰撞标识为false
                    collide = false;           
                else if (!collide)
                    //碰撞标识,设置为true
                    collide = true;
                //尝试获取cellsBusy锁。  
                else if (cellsBusy == 0 && casCellsBusy()) {
                    try {
                         //校验as是否过时
                        if (cells == as) {  
                            //cell数组扩容    
                            Cell[] rs = new Cell[n << 1];
                            //老数组拷贝给新数组
                            for (int i = 0; i < n; ++i)
                                rs[i] = as[i];
                            cells = rs;
                        }
                    } finally {
                        //释放锁
                        cellsBusy = 0;
                    }
                    //扩容cell表后,再次重试。  
                    collide = false;
                    continue;          
                }
                //重新计算hash值。xorshift算法
                h = advanceProbe(h);
            }
            //初始化cell表,先尝试获取cellsBusy锁。  
          else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
               //初始化标识 
               boolean init = false;
                try {                          
                    if (cells == as) {
                        //初始化cell表,初始容量为2。  
                        Cell[] rs = new Cell[2];
                        rs[h & 1] = new Cell(x);
                        cells = rs;
                        init = true;
                    }
                } finally {
                    //释放锁
                    cellsBusy = 0;
                }
                //初始化成功,退出循环
                if (init)
                    break;
            }
            //创建cell表由于竞争导致失败,尝试将x累加到base上。  
            else if (casBase(v = base, ((fn == null) ? v + x :
                                        fn.applyAsLong(v, x))))
                break;                         
        }
    }
    

    LongAdder源码解析

    类结构关系图

    image

    从类关系图可以看出LongAddr继承Striped64

    add()解析

    public void add(long x) {
        Cell[] as; long b, v; int m; Cell a;
           //校验数组是否为null或者通过cas修改base的值是否成功
        if ((as = cells) != null || !casBase(b = base, b + x)) {
            //是否竞争标识,uncontended=true没有表示发生竞争
            boolean uncontended = true;
            if (as == null || (m = as.length - 1) < 0 ||
                (a = as[getProbe() & m]) == null ||
                !(uncontended = a.cas(v = a.value, v + x)))
                //数组未初始化,或者数组某一index位置为null,或者cas修改失败
                //longAccumulate已讲
                longAccumulate(x, null, uncontended);
        }
    }
    

    sum()解析

    //sum的逻辑就是base+cells数组里面的值
    public long sum() {
        Cell[] as = cells; Cell a;
        long sum = base;
        if (as != null) {
            for (int i = 0; i < as.length; ++i) {
                if ((a = as[i]) != null)
                    sum += a.value;
            }
        }
        return sum;
    }
    

    reset()解析

    //cells数组的值都置为0
    public void reset() {
        Cell[] as = cells; Cell a;
        base = 0L;
        if (as != null) {
            for (int i = 0; i < as.length; ++i) {
                if ((a = as[i]) != null)
                    a.value = 0L;
            }
        }
    }
    

    LongAdder是否能够替换AtomicLong

    个人觉得是不行的,因为AtomicLong提供了很多cas方法,操作灵活,而LongAdder只有add和sum,使用起来比较受限。

    AtomicLong:JVM将64位的double,long 型变量的读操作分为两次32位的读操作,所以低并发保持了AtomicLong性能。

    LongAdder:高并发下热点数据被hash到多个 Cell,通过分散提升了并行度。

    阅读更多文章

    image.png

    相关文章

      网友评论

          本文标题:LongAddr源码解析

          本文链接:https://www.haomeiwen.com/subject/rbblzftx.html