美文网首页
Java1.8新特性 LongAdder源码学习

Java1.8新特性 LongAdder源码学习

作者: Acezhuyf | 来源:发表于2019-03-22 11:20 被阅读0次

    1.简介

    LongAdder是Java8中新的并发包类,相比较之前的原子类AtomicXXX,LongAdder在低并发的情况下性能和原子类基本持平,但在高并发的情况下性能优于原子类

    2.源码分析

    /**
     * One or more variables that together maintain an initially zero
     * {@code long} sum.  When updates (method {@link #add}) are contended
     * across threads, the set of variables may grow dynamically to reduce
     * contention. Method {@link #sum} (or, equivalently, {@link
     * #longValue}) returns the current total combined across the
     * variables maintaining the sum.
     *
     * <p>This class is usually preferable to {@link AtomicLong} when
     * multiple threads update a common sum that is used for purposes such
     * as collecting statistics, not for fine-grained synchronization
     * control.  Under low update contention, the two classes have similar
     * characteristics. But under high contention, expected throughput of
     * this class is significantly higher, at the expense of higher space
     * consumption.
     * @since 1.8
     * @author Doug Lea
    */
    public class LongAdder extends Striped64 implements Serializable
    

    注释的意思是,1个或多个值的和被保存在一个初始化值为0的"sum"中,当调用add()方法的并发量很大时,那么存储值的容器会动态扩容以降低碰撞几率(类似hash碰撞),方法sum()返回并发操作后得到的最终结果。
    通常情况下,LongAdder在统计数据更新时比AtomicLong更适用。但不应该用于细粒化的同步控制。LongAdder在低并发的情况下性能和AtomicLong基本持平,但在高并发的情况下性能优于AtomicLong。

    那我们就来看看add(long x)和sum()方法的具体实现吧!

    public void add(long x) {
            Cell[] as; long b, v; int m; Cell a;
            //尝试更新base,若并发不高,能更新成功,退出条件(cas更新)
            if ((as = cells) != null || !casBase(b = base, b + x)) {
                boolean uncontended = true;
                /**
                * 尝试第二次更新值,取得Cell[]中的一个值,getProbe() & m是
                * 取得一个随机整数然后和m(数组大小-1)做与运算,得出的值在
                * 0~(as.length - 1)之间,更新成功则退出条件
                */
                if (as == null || (m = as.length - 1) < 0 ||
                    (a = as[getProbe() & m]) == null ||
                    !(uncontended = a.cas(v = a.value, v + x)))
                    //更新失败后进入
                    longAccumulate(x, null, uncontended);
            }
        }
    //这是存储实体Cell的源码,对CAS不了解的同学可以看看Atomic包
    @sun.misc.Contended static final class Cell {
        volatile long value;
        Cell(long x) { value = x; }
        final boolean cas(long cmp, long val) {
            return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
        }
    
        // Unsafe mechanics
        private static final sun.misc.Unsafe UNSAFE;
        private static final long valueOffset;
        static {
            try {
                UNSAFE = sun.misc.Unsafe.getUnsafe();
                Class<?> ak = Cell.class;
                valueOffset = UNSAFE.objectFieldOffset
                    (ak.getDeclaredField("value"));
            } catch (Exception e) {
                throw new Error(e);
            }
        }
    }
    
    final void longAccumulate(long x, LongBinaryOperator fn,
                                  boolean wasUncontended) {
            int h;
            // 取得一个随机整数,设置是否并发竞争为true
            if ((h = getProbe()) == 0) {
                ThreadLocalRandom.current(); 
                h = getProbe();
                wasUncontended = true;
            }
            //是否碰撞,参考hashmap
            boolean collide = false; 
            for (;;) {
                Cell[] as; Cell a; int n; long v;
                if ((as = cells) != null && (n = as.length) > 0) {
                    //如果数组指定位置为空,则赋值
                    if ((a = as[(n - 1) & h]) == null) {
                        if (cellsBusy == 0) {       
                            Cell r = new Cell(x);
                            //创建Cell时,casCellsBusy()锁住数组
                            if (cellsBusy == 0 && casCellsBusy()) {
                                boolean created = false;
                                try {               
                                    Cell[] rs; int m, j;
                                    if ((rs = cells) != null &&
                                        (m = rs.length) > 0 &&
                                        rs[j = (m - 1) & h] == null) {
                                        rs[j] = r;
                                        created = true;
                                    }
                                } finally {
                                    cellsBusy = 0;
                                }
                                if (created)
                                    break;
                                continue;
                            }
                        }
                        //指定数组位置为null则说明没有碰撞
                        collide = false;
                    }
                    else if (!wasUncontended)
                        wasUncontended = true;
                    /**
                    * a不为null,更新a的value,成功则退出。更新失败则
                    * 说明多个线程同时更新一个Cell,并发量大,碰撞几率高
                    * 可能需要扩容
                    */
                    else if (a.cas(v = a.value, ((fn == null) ? v + x :
                                                 fn.applyAsLong(v, x))))
                        break;
                    /**
                    * 若数组大小大于CPU个数,则说明碰撞不是由于数组过小导致
                    * 则重新尝试更新数据
                    */
                    else if (n >= NCPU || cells != as)
                        collide = false;            
                    else if (!collide)
                        collide = true;
                    //扩容数组,扩容时锁住数组
                    else if (cellsBusy == 0 && casCellsBusy()) {
                        try {
                            if (cells == as) {      
                                Cell[] rs = new Cell[n << 1];
                                for (int i = 0; i < n; ++i)
                                    rs[i] = as[i];
                                cells = rs;
                            }
                        } finally {
                            cellsBusy = 0;
                        }
                        collide = false;
                        continue;                   
                    }
                    h = advanceProbe(h);
                }
                //若cell[]为空,初始化
                else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
                    boolean init = false;
                    try {                           // Initialize table
                        if (cells == as) {
                            Cell[] rs = new Cell[2];
                            rs[h & 1] = new Cell(x);
                            cells = rs;
                            init = true;
                        }
                    } finally {
                        cellsBusy = 0;
                    }
                    if (init)
                        break;
                }
                else if (casBase(v = base, ((fn == null) ? v + x :
                                            fn.applyAsLong(v, x))))
                    break;                          // Fall back on using base
            }
        }
    //加锁方法
    final boolean casCellsBusy() {
        return UNSAFE.compareAndSwapInt(this, CELLSBUSY, 0, 1);
    }
    
    /**
    * 返回各个线程操作总和,LongAdder在统计的时候如果有并发更新
    * 可能导致统计的数据有误差,因为volatile并不能保证并发安全
    */
    public long sum() {
        Cell[] as = cells; Cell a;
        long sum = base;
        if (as != null) {
            for (int i = 0; i < as.length; ++i) {
                if ((a = as[i]) != null)
                    sum += a.value;
            }
        }
        return sum;
    }
    

    3.总结

    LongAdder在AtomicLong的基础上将单点的更新压力分散到各个节点,在低并发的时候通过对base的直接更新可以很好的保障和AtomicLong的性能基本保持一致,而在高并发的时候通过分散,让各个线程操作不同的节点,提高了性能。

    相关文章

      网友评论

          本文标题:Java1.8新特性 LongAdder源码学习

          本文链接:https://www.haomeiwen.com/subject/dyhlvqtx.html