美文网首页并发
LongAdder源码深入剖析

LongAdder源码深入剖析

作者: SunnyMore | 来源:发表于2018-12-25 00:13 被阅读9次

    一、简介

     在之前的《ConcurrentHashMap深入剖析(JDK8)》文章中,我们看到了CounterCell的实现沿用了LongAdder的分段计数的原理;那么这一篇我们来重点看下LongAdder的实现原理。
     我们先来这个类的描述:

    /**
     * One or more variables that together maintain an initially zero
     * {@code long} sum.  When updates (method {@link #add}) are contended
     * across threads, the set of variables may grow dynamically to reduce
     * contention. Method {@link #sum} (or, equivalently, {@link
     * #longValue}) returns the current total combined across the
     * variables maintaining the sum.
     *
     * <p>This class is usually preferable to {@link AtomicLong} when
     * multiple threads update a common sum that is used for purposes such
     * as collecting statistics, not for fine-grained synchronization
     * control.  Under low update contention, the two classes have similar
     * characteristics. But under high contention, expected throughput of
     * this class is significantly higher, at the expense of higher space
     * consumption.
     *
     * <p>LongAdders can be used with a {@link
     * java.util.concurrent.ConcurrentHashMap} to maintain a scalable
     * frequency map (a form of histogram or multiset). For example, to
     * add a count to a {@code ConcurrentHashMap<String,LongAdder> freqs},
     * initializing if not already present, you can use {@code
     * freqs.computeIfAbsent(k -> new LongAdder()).increment();}
     *
     * <p>This class extends {@link Number}, but does <em>not</em> define
     * methods such as {@code equals}, {@code hashCode} and {@code
     * compareTo} because instances are expected to be mutated, and so are
     * not useful as collection keys.
     *
     * @since 1.8
     * @author Doug Lea
     */
    

     第二段中很清晰的表明LongAdder在高并发的场景下会比AtomicLong 具有更好的性能,代价是消耗更多的内存空间。那么,有如下问题:

    为什么要引入LongAdder?AtomicLong在高并发的场景下有什么问题吗? 如果低并发环境下,LongAdder和AtomicLong性能差不多,那LongAdder是否就可以替代AtomicLong了?

    为什么要引入LongAdder?

     我们知道,AtomicLong是利用底层的CAS操作来提供并发性的,比如addAndGet方法:

    /**
         * Atomically adds the given value to the current value.
         *
         * @param delta the value to add
         * @return the updated value
         */
        public final long addAndGet(long delta) {
            return unsafe.getAndAddLong(this, valueOffset, delta) + delta;
        }
    

     上述方法调用了Unsafe类的getAndAddLong方法,该方法是一个native方法,它的逻辑是采用自旋的方式不断更新目标值,直到更新成功。(也即乐观锁的实现模式)
     在并发量比较低的情况下,线程冲突的概率比较小,自旋的次数不会很多。但是,高并发情况下,N个线程同时进行自旋操作,会出现大量失败并不断自旋的场景,此时AtomicLong的自旋会成为瓶颈。
    这就是LongAdder引入的初衷------解决高并发环境下AtomictLong的自旋瓶颈问题。

    LongAdder为什么高并发性场景能会更好?

     既然,在高并发场景下LongAdder可以显著提升性能,那么它时如何做到的?这一部分简述下LongAdder的实现思路,具体原理在源码剖析的章节会重点阐述。
     我们知道,AtomicLong中有个内部变量value保存着实际的long的值,所有的操作都是针对该变量进行操作的。也就是说,高并发场景下,value变量其实就是一个热点,也就是N个线程竞争的一个热点。
     LongAdder的基本思路就是分散热点,将value值分散到一个数组中,不同线程会命中到数组的不同槽中,各个线程只对自己槽中的那个值进行CAS操作,这样热点就被分散了,冲突的概率就小很多。如果要获取真正的long值,只要将各个槽中的变量值累加返回即可。
     这种做法其实跟JDK1.8之前的ConcurrentHashMap“分段锁”的实现思路是一样的:分散热点。

    LongAdder能否替代AtomicLong?

     我们先来看看LongAdder提供的API:


    LongAdder的API



    AtomicLong的API

     从上面的图中可以看到,LongAdder的API和AtomicLong的API还是有比较大的差异,而且AtomicLong提供的功能更丰富,尤其是addAndGet、decrementAndGet、compareAndSet这些方法。addAndGet、decrementAndGet除了单纯的做自增自减外,还可以立即获取增减后的值,而LongAdder则需要做同步控制才能精确获取增减后的值。如果业务需求需要精确的控制计数,则使用AtomicLong比较合适;
     另外,从空间方面考虑,LongAdder其实是一种“空间换时间”的思想,从这一点来讲AtomicLong更合适。
     低并发、一般的业务尝尽下AtomicLong是足够了,如果并发量很多,存在大量写多读少的情况,那LongAdder可能更合适。


    二、LongAdder原理

     上文已经说过了,AtomicLong是多个线程针对单个热点值value进行原子操作。而LongAdder是每个线程拥有自己的槽,各个线程一般只对自己槽中的那个值进行CAS操作。

    比如有三个Thread1、Thread2、Thread3,每个线程对value增加10。

     对于AtomicLong,最终结果的计算值始终是下面这个形式:

    value = 10 + 10 + 10 = 30

     但是对于LongAdder来说,内部有一个base变量,一个Cell[]数组。

    • base变量:非竞态条件下,直接累加到该变量上
    • Cell[]数组:竞态条件下,累加各个线程自己的槽Cell[i]中

     最终结果的计算是下面的形式:



    三、LongAdder源码剖析

     我们先来看下LongAdder类的类签名:

    public class LongAdder extends Striped64 implements Serializable 
    

     LongAdder继承了Striped64类,来实现累加功能,它是实现高并发累加的工具类;Striped64的设计核心思路就是通过内部的分散计算来避免竞争。Striped64内部包含一个base和一个Cell[] cells数组,又叫hash表。
     没有竞争的情况下,要累加的数通过cas累加到base上;如果有竞争的话,会将要累加的数累加到Cells数组中的某个cell元素里面。这样Striped64的值就跟原理章节中描述的一样,满足了求和公式;

    3.1 Striped64类

     Striped64类有三个重要的成员变量:

    /**
         * Table of cells. When non-null, size is a power of 2.
         * 存放Cell的hash表,大小为2的幂。
         */
        transient volatile Cell[] cells;
    
        /**
         * Base value, used mainly when there is no contention, but also as
         * a fallback during table initialization races. Updated via CAS.
         * 基础值
         * 1. 在没有竞争时会更新这个值;
         * 2. 在cells初始化的过程中,cells处于不可用的状态,这时候也会尝试将通过cas操作值累加到base。
         */
        transient volatile long base;
    
        /**
         * Spinlock (locked via CAS) used when resizing and/or creating Cells.
         * 自旋锁,通过CAS操作加锁,用于保护创建或者扩展Cell表。
         */
        transient volatile int cellsBusy;
    
    成员cells

     cells数组是LongAdder高性能实现的必杀器:
     AtomicInteger只有一个value,所以线程累加都要通过cas竞争value这一个变量,高并发下线程争用非常严重。
     而LongAdder则有两个值用于累加,一个是base,它的作用类似于AtomicInteger里面的value,在没有竞争的情况下不会用到cells数组,它为null,这时使用base做累加,有了竞争后cells数组就上场了,第一次初始化长度为2,以后每次扩容都是变为原来的两倍,直到cells数组的长度大于等于当前服务器cpu的数量为止就不在扩容。那么为什么到超过cpu数量的时候就不再扩容了?每个线程会通过线程对cells[threadLocalRandomProbe%cells.length]位置的Cell对象中的value做累加,这样相当于将线程绑定到了cells中的某个Cell对象上。

    成员变量cellsBusy

     cellsBusy,它有两个值0或1,它的作用是当要修改cells数组时加锁,防止多线程同时修改cells数组(也称cells表),0为无锁,1位加锁,加锁的状况有三种:

    1. cells数组初始化的时候;
    2. cells数组扩容的时候;
    3. 如果cells数组中某个元素为null,给这个位置创建新的Cell对象的时候;
    成员变量base

     它有两个作用:

      1. 在开始没有竞争的情况下,将累加值累加到base;
      1. 在cells初始化的过程中,cells不可用,这时会尝试将值累加到base上;

    3.2 Cell内部类

    /**
         * Padded variant of AtomicLong supporting only raw accesses plus CAS.
         *
         * JVM intrinsics note: It would be possible to use a release-only
         * form of CAS here, if it were provided.
         *  //为提高性能,使用注解@sun.misc.Contended,用来避免伪共享
         */
        @sun.misc.Contended static final class Cell {
            //用来保存要累加的值
            volatile long value;
            Cell(long x) { value = x; }
            //使用UNSAFE类的cas来更新value值
            final boolean cas(long cmp, long val) {
                return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
            }
    
            // Unsafe mechanics
            private static final sun.misc.Unsafe UNSAFE;
            //value在Cell类中存储位置的偏移量;
            private static final long valueOffset;
            //这个静态方法用于获取偏移量
            static {
                try {
                    UNSAFE = sun.misc.Unsafe.getUnsafe();
                    Class<?> ak = Cell.class;
                    valueOffset = UNSAFE.objectFieldOffset
                        (ak.getDeclaredField("value"));
                } catch (Exception e) {
                    throw new Error(e);
                }
            }
        }
    

     这个类相对来说比较简单,volatile类型的value用来保存要累加的值;final类型的valueOffset表示value在Cell类中存储位置的偏移量;Cell类重点需要关注注解@sun.misc.Contended;这个注解是JDK1.8中用来解决伪共享的问题,具体伪共享可以参见《伪共享、缓存行填充和CPU缓存详述》这一篇文章,这里不再详述;

    3.3 LongAdder

     LongAdder我们关注下重点方法,首先看下add方法。

    add方法

     add方法是LongAdder累加的方法,传入的参数x为要累加的值;

    /**
         * Adds the given value.
         *
         * @param x the value to add
         */
        public void add(long x) {
            Cell[] as; long b, v; int m; Cell a;
            /**
             * 如果一下两种条件则继续执行if内的语句
             * 1. cells数组不为null(不存在争用的时候,cells数组一定为null,一旦对base的cas操作失败,才会初始化cells数组)
             * 2. 如果cells数组为null,如果casBase执行成功,则直接返回,如果casBase方法执行失败(casBase失败,说明第一次争用冲突产生,需要对cells数组初始化)进入if内;
             * casBase方法很简单,就是通过UNSAFE类的cas设置成员变量base的值为base+要累加的值
             * casBase执行成功的前提是无竞争,这时候cells数组还没有用到为null,可见在无竞争的情况下是类似于AtomticInteger处理方式,使用cas做累加。
             */
            if ((as = cells) != null || !casBase(b = base, b + x)) {
                //uncontended判断cells数组中,当前线程要做cas累加操作的某个元素是否不存在争用,如果cas失败则存在争用;uncontended=false代表存在争用,uncontended=true代表不存在争用。
                boolean uncontended = true;
                /**
                 *1. as == null : cells数组未被初始化,成立则直接进入if执行cell初始化
                 *2. (m = as.length - 1) < 0: cells数组的长度为0
                 *条件1与2都代表cells数组没有被初始化成功,初始化成功的cells数组长度为2;
                 *3. (a = as[getProbe() & m]) == null :如果cells被初始化,且它的长度不为0,则通过getProbe方法获取当前线程Thread的threadLocalRandomProbe变量的值,初始为0,然后执行threadLocalRandomProbe&(cells.length-1 ),相当于m%cells.length;如果cells[threadLocalRandomProbe%cells.length]的位置为null,这说明这个位置从来没有线程做过累加,需要进入if继续执行,在这个位置创建一个新的Cell对象;
                 *4. !(uncontended = a.cas(v = a.value, v + x)):尝试对cells[threadLocalRandomProbe%cells.length]位置的Cell对象中的value值做累加操作,并返回操作结果,如果失败了则进入if,重新计算一个threadLocalRandomProbe;
    
                 如果进入if语句执行longAccumulate方法,有三种情况
                 1. 前两个条件代表cells没有初始化,
                 2. 第三个条件指当前线程hash到的cells数组中的位置还没有其它线程做过累加操作,
                 3. 第四个条件代表产生了冲突,uncontended=false
                 **/
                if (as == null || (m = as.length - 1) < 0 ||
                    (a = as[getProbe() & m]) == null ||
                    !(uncontended = a.cas(v = a.value, v + x)))
                    longAccumulate(x, null, uncontended);
            }
        }
    

     如上所示,只有从未出现过并发冲突的时候,base基数才会使用到,一旦出现了并发冲突,之后所有的操作都只是针对cell[]数组中的单元Cell。如果cell[]数组未初始化,会调用父类的longAccumulate去初始化cell[],如果cell[]已经初始化但冲突发生在Cell单元内,则也是调用父类的longAccumulate,此时可能需要对cell[]扩容了。接下来我们来看下longAccumulate方法。

    longAccumulate方法

     三个参数第一个为要累加的值,第二个为null,第三个为wasUncontended表示调用方法之前的add方法是否未发生竞争;

    /**
         * Handles cases of updates involving initialization, resizing,
         * creating new Cells, and/or contention. See above for
         * explanation. This method suffers the usual non-modularity
         * problems of optimistic retry code, relying on rechecked sets of
         * reads.
         *
         * @param x the value
         * @param fn the update function, or null for add (this convention
         * avoids the need for an extra field or function in LongAdder).
         * @param wasUncontended false if CAS failed before call
         */
        final void longAccumulate(long x, LongBinaryOperator fn,
                                  boolean wasUncontended) {
            //获取当前线程的threadLocalRandomProbe值作为hash值,如果当前线程的threadLocalRandomProbe为0,说明当前线程是第一次进入该方法,则强制设置线程的threadLocalRandomProbe为ThreadLocalRandom类的成员静态私有变量probeGenerator的值,后面会详细将hash值的生成;
            //另外需要注意,如果threadLocalRandomProbe=0,代表新的线程开始参与cell争用的情况
            //1.当前线程之前还没有参与过cells争用(也许cells数组还没初始化,进到当前方法来就是为了初始化cells数组后争用的),是第一次执行base的cas累加操作失败;
            //2.或者是在执行add方法时,对cells某个位置的Cell的cas操作第一次失败,则将wasUncontended设置为false,那么这里会将其重新置为true;第一次执行操作失败;
            //凡是参与了cell争用操作的线程threadLocalRandomProbe都不为0;
            int h;
            if ((h = getProbe()) == 0) {
                //初始化ThreadLocalRandom;
                ThreadLocalRandom.current(); // force initialization
                //将h设置为0x9e3779b9
                h = getProbe();
                //设置未竞争标记为true
                wasUncontended = true;
            }
            //cas冲突标志,表示当前线程hash到的Cells数组的位置,做cas累加操作时与其它线程发生了冲突,cas失败;collide=true代表有冲突,collide=false代表无冲突
            boolean collide = false;                // True if last slot nonempty
            for (;;) {
                Cell[] as; Cell a; int n; long v;
                //这个主干if有三个分支
                //1.CASE1:处理cells数组已经正常初始化了的情况(这个if分支处理add方法的四个条件中的3和4)
                //2.CASE2:处理cells数组没有初始化或者长度为0的情况;(这个分支处理add方法的四个条件中的1和2)
                //3.CASE3:处理如果cell数组没有初始化,并且其它线程正在执行对cells数组初始化的操作,及cellbusy=1;则尝试将累加值通过cas累加到base上
                //先看主分支一
                if ((as = cells) != null && (n = as.length) > 0) {
                    /**
                     *CASE1:这个是处理add方法内部if分支的条件3:如果被hash到的位置为null,说明没有线程在这个位置设置过值,没有竞争,可以直接使用,则用x值作为初始值创建一个新的Cell对象,对cells数组使用cellsBusy加锁,然后将这个Cell对象放到cells[m%cells.length]位置上
                     */
                    if ((a = as[(n - 1) & h]) == null) {
                        //cellsBusy == 0 代表当前没有线程cells数组做修改
                        if (cellsBusy == 0) {       // Try to attach new Cell
                            //将要累加的x值作为初始值创建一个新的Cell对象,
                            Cell r = new Cell(x);   // Optimistically create
                            //如果cellsBusy=0无锁,则通过cas将cellsBusy设置为1加锁
                            if (cellsBusy == 0 && casCellsBusy()) {
                                //标记Cell是否创建成功并放入到cells数组被hash的位置上
                                boolean created = false;
                                try {               // Recheck under lock
                                    Cell[] rs; int m, j;
                                    //再次检查cells数组不为null,且长度不为空,且hash到的位置的Cell为null
                                    if ((rs = cells) != null &&
                                        (m = rs.length) > 0 &&
                                        rs[j = (m - 1) & h] == null) {
                                        //将新的cell设置到该位置
                                        rs[j] = r;
                                        created = true;
                                    }
                                } finally {
                                    //去掉锁
                                    cellsBusy = 0;
                                }
                                //生成成功,跳出循环
                                if (created)
                                    break;
                                //如果created为false,说明上面指定的cells数组的位置cells[m%cells.length]已经有其它线程设置了cell了,继续执行循环。
                                continue;           // Slot is now non-empty
                            }
                        }
                        //如果执行的当前行,代表cellsBusy=1,有线程正在更改cells数组,代表产生了冲突,将collide设置为false
                        collide = false;
                    }
                    /**
                     * case2:如果add方法中条件4的通过cas设置cells[m%cells.length]位置的Cell对象中的value值设置为v+x失败,说明已经发生竞争,将wasUncontended设置为true,跳出内部的if判断,最后重新计算一个新的probe,然后重新执行循环;
                     */
                    else if (!wasUncontended)       // CAS already known to fail
                        //设置未竞争标志位true,继续执行,后面会算一个新的probe值,然后重新执行循环。
                        wasUncontended = true;      // Continue after rehash
                    /**
                     * case3:新的争用线程参与争用的情况:处理刚进入当前方法时threadLocalRandomProbe=0的情况,也就是当前线程第一次参与cell争用的cas失败,这里会尝试将x值加到cells[m%cells.length]的value ,如果成功直接退出
                     */
                    else if (a.cas(v = a.value, ((fn == null) ? v + x :
                                                 fn.applyAsLong(v, x))))
                        break;
                    /**
                     *  case4:case3处理新的线程争用执行失败了,这时如果cells数组的长度已经到了最大值(大于等于cup数量),或者是当前cells已经做了扩容,则将collide设置为false,后面重新计算prob的值
                     */
                    else if (n >= NCPU || cells != as)
                        collide = false;            // At max size or stale
                    /**
                     * case5:如果发生了冲突collide=false,则设置其为true;会在最后重新计算hash值后,进入下一次for循环
                     */
                    else if (!collide)
                        //设置冲突标志,表示发生了冲突,需要再次生成hash,重试。 如果下次重试任然走到了改分支此时collide=true,!collide条件不成立,则走后一个分支
                        collide = true;
                    /**
                     * case6:扩容cells数组,新参与cell争用的线程两次均失败,且符合库容条件,会执行该分支
                     */
                    else if (cellsBusy == 0 && casCellsBusy()) {
                        try {
                            //检查cells是否已经被扩容
                            if (cells == as) {      // Expand table unless stale
                                Cell[] rs = new Cell[n << 1];
                                for (int i = 0; i < n; ++i)
                                    rs[i] = as[i];
                                cells = rs;
                            }
                        } finally {
                            cellsBusy = 0;
                        }
                        collide = false;
                        continue;                   // Retry with expanded table
                    }
                    //为当前线程重新计算hash值
                    h = advanceProbe(h);
                }
                //这个大的分支处理add方法中的条件1与条件2成立的情况,如果cell表还未初始化或者长度为0,先尝试获取cellsBusy锁。
                else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
                    boolean init = false;
                    try {                           // Initialize table
                        //初始化cells数组,初始容量为2,并将x值通过hash&1,放到0个或第1个位置上
                        if (cells == as) {
                            Cell[] rs = new Cell[2];
                            rs[h & 1] = new Cell(x);
                            cells = rs;
                            init = true;
                        }
                    } finally {
                        //解锁
                        cellsBusy = 0;
                    }
                    //如果init为true说明初始化成功,跳出循环
                    if (init)
                        break;
                }
                /**
                 *如果以上操作都失败了,则尝试将值累加到base上;
                 */
                else if (casBase(v = base, ((fn == null) ? v + x :
                                            fn.applyAsLong(v, x))))
                    break;                          // Fall back on using base
            }
        }
    
    Striped64类longAccumulate方法原理
    getProbe()方法hash的生成

     hash是LongAdder定位当前线程应该将值累加到cells表的哪个位置上的,所以hash算法是非常重要的,接下来看看它的实现;
     在java的Thread类中有一个成员变量:

    /** Probe hash value; nonzero if threadLocalRandomSeed initialized */
        @sun.misc.Contended("tlr")
        int threadLocalRandomProbe;
    

     threadLocalRandomProbe这个变量的值就是LongAdder用来hash定位Cells数组位置的,平时线程的这个变量一般用不到,它的值一直都是0。在LongAdder的父类Striped64里通过getProbe方法获取当前线程的threadLocalRandomProbe值:

    /**
         * Returns the probe value for the current thread.
         * Duplicated from ThreadLocalRandom because of packaging restrictions.
         */
        static final int getProbe() {
            //PROBE是threadLocalRandomProbe变量在Thread类里面的偏移量,所以下面语句获取的就是threadLocalRandomProbe的值;
            return UNSAFE.getInt(Thread.currentThread(), PROBE);
        }
    
    threadLocalRandomProbe的初始化

     线程对LongAdder的累加操作,在没有进入longAccumulate方法前,threadLocalRandomProbe一直都是0,当发生争用后才会进入longAccumulate方法中,进入该方法第一件事就是判断threadLocalRandomProbe是否为0,如果为0,则将其设置为0x9e3779b9

    int h;
            if ((h = getProbe()) == 0) { // 给当前线程生成一个非0的hash值
                //初始化ThreadLocalRandom;
                ThreadLocalRandom.current(); // force initialization
                //将h设置为0x9e3779b9
                h = getProbe();
                //设置未竞争标记为true
                wasUncontended = true;
            }
    

     重点在 ThreadLocalRandom.current(); 这一行

    /**
         * Returns the current thread's {@code ThreadLocalRandom}.
         *
         * @return the current thread's {@code ThreadLocalRandom}
         */
        public static ThreadLocalRandom current() {
            if (UNSAFE.getInt(Thread.currentThread(), PROBE) == 0)
                localInit();
            return instance;
        }
    

     在current方法中判断如果probe的值为0,则执行locaInit()方法,将当前线程的probe设置为非0的值,该方法实现如下:

    /**
         * Initialize Thread fields for the current thread.  Called only
         * when Thread.threadLocalRandomProbe is zero, indicating that a
         * thread local seed value needs to be generated. Note that even
         * though the initialization is purely thread-local, we need to
         * rely on (static) atomic generators to initialize the values.
         */
        static final void localInit() {
            //private static final int PROBE_INCREMENT = 0x9e3779b9;
            int p = probeGenerator.addAndGet(PROBE_INCREMENT);
            //prob不能为0
            int probe = (p == 0) ? 1 : p; // skip 0
            long seed = mix64(seeder.getAndAdd(SEEDER_INCREMENT));
            //获取当前线程
            Thread t = Thread.currentThread();
            UNSAFE.putLong(t, SEED, seed);
            //将probe的值更新为probeGenerator的值
            UNSAFE.putInt(t, PROBE, probe);
        }
    

     probeGenerator 是static 类型的AtomicInteger类,每执行一次localInit()方法,都会将probeGenerator 累加一次0x9e3779b9这个值;,0x9e3779b9这个数字的得来是 2^32 除以一个常数,这个常数就是传说中的黄金比例 1.6180339887;然后将当前线程的threadLocalRandomProbe设置为probeGenerator 的值,如果probeGenerator 为0,这取1;

    threadLocalRandomProbe重新生成

     就是将prob的值左右移位 、异或操作三次

    /**
         * Pseudo-randomly advances and records the given probe value for the
         * given thread.
         */
        static final int advanceProbe(int probe) {
            probe ^= probe << 13;   // xorshift
            probe ^= probe >>> 17;
            probe ^= probe << 5;
            UNSAFE.putInt(Thread.currentThread(), PROBE, probe);
            return probe;
        }
    
    LongAdder的sum方法

     最后来看下LongAdder的sum方法,

     /**
         * Returns the current sum.  The returned value is <em>NOT</em> an
         * atomic snapshot; invocation in the absence of concurrent
         * updates returns an accurate result, but concurrent updates that
         * occur while the sum is being calculated might not be
         * incorporated.
         *
         * @return the sum
         */
        public long sum() {
            Cell[] as = cells; Cell a;
            long sum = base;
            if (as != null) {
                for (int i = 0; i < as.length; ++i) {
                    if ((a = as[i]) != null)
                        sum += a.value;
                }
            }
            return sum;
        }
    

      这个其实印证了我们原理章节中所说的一个求和公式;需要注意的是,这个方法只能得到某个时刻的近似值,这也就是LongAdder并不能完全替代LongAtomic的原因之一。

    四、Striped64的其它子类

     JDK1.8时,java.util.concurrent.atomic包中,除了新引入LongAdder外,还有引入了它的三个兄弟类:LongAccumulator、DoubleAdder、DoubleAccumulator

    LongAccumulator

     LongAccumulator是LongAdder的增强版。LongAdder只能针对数值的进行加减运算,而LongAccumulator提供了自定义的函数操作。其构造函数如下:

    /**
         * Creates a new instance using the given accumulator function
         * and identity element.
         * @param accumulatorFunction a side-effect-free function of two arguments
         * @param identity identity (initial value) for the accumulator function
         */
        public LongAccumulator(LongBinaryOperator accumulatorFunction,
                               long identity) {
            this.function = accumulatorFunction;
            base = this.identity = identity;
        }
    

     通过LongBinaryOperator,可以自定义对入参的任意操作,并返回结果(LongBinaryOperator接收2个long作为参数,并返回1个long)

     LongAccumulator内部原理和LongAdder几乎完全一样,都是利用了父类Striped64的longAccumulate方法。

    DoubleAdder和DoubleAccumulator

     从名字也可以看出,DoubleAdder和DoubleAccumulator用于操作double原始类型。

     与LongAdder的唯一区别就是,其内部会通过一些方法,将原始的double类型,转换为long类型,其余和LongAdder完全一样:

    /**
         * Adds the given value.
         *
         * @param x the value to add
         */
        public void add(double x) {
            Cell[] as; long b, v; int m; Cell a;
            if ((as = cells) != null ||
                !casBase(b = base,
                         Double.doubleToRawLongBits //Double内部转换成Long的方法
                         (Double.longBitsToDouble(b) + x))) {
                boolean uncontended = true;
                if (as == null || (m = as.length - 1) < 0 ||
                    (a = as[getProbe() & m]) == null ||
                    !(uncontended = a.cas(v = a.value,
                                          Double.doubleToRawLongBits
                                          (Double.longBitsToDouble(v) + x))))
                    doubleAccumulate(x, null, uncontended);
            }
        }
    

    相关文章

      网友评论

        本文标题:LongAdder源码深入剖析

        本文链接:https://www.haomeiwen.com/subject/yvvmkqtx.html