LongAdder

作者: Pillar_Zhong | 来源:发表于2019-06-12 15:42 被阅读0次
    LongAdder.png

    Cell

    // 避免伪共享 jdk1.8后加入的
    @sun.misc.Contended static final class Cell {
        volatile long value;
        Cell(long x) { value = x; }
        // CAS操作,设置value
        final boolean cas(long cmp, long val) {
            return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
        }
    
        // Unsafe mechanics
        private static final sun.misc.Unsafe UNSAFE;
        private static final long valueOffset;
        static {
            try {
                UNSAFE = sun.misc.Unsafe.getUnsafe();
                Class<?> ak = Cell.class;
                valueOffset = UNSAFE.objectFieldOffset
                    (ak.getDeclaredField("value"));
            } catch (Exception e) {
                throw new Error(e);
            }
        }
    }
    

    sum

    public long sum() {
        Cell[] as = cells; Cell a;
        long sum = base;
        if (as != null) {
            for (int i = 0; i < as.length; ++i) {
                if ((a = as[i]) != null)
                    // base + N * cell.value
                    sum += a.value;
            }
        }
        return sum;
    }
    

    reset

    // reset base & cell.value
    public void reset() {
        Cell[] as = cells; Cell a;
        base = 0L;
        if (as != null) {
            for (int i = 0; i < as.length; ++i) {
                if ((a = as[i]) != null)
                    a.value = 0L;
            }
        }
    }
    

    add

    public void add(long x) {
        Cell[] as; long b, v; int m; Cell a;
        // 如果有cell的部分,说明要累加到cell
        // 如果cell为空,那么累加base看看
        if ((as = cells) != null || !casBase(b = base, b + x)) {
            boolean uncontended = true;
            // 如果cells还没有初始化,直接进入if
            // 或cells的长度为0
            // 或当前线程的cell位置为空,没有被累加过
            // 或者cell位置不为空,且累加失败,有竞争
            // 以上情况,全部进入下面的longAccumulate
            if (as == null || (m = as.length - 1) < 0 ||
                (a = as[getProbe() & m]) == null ||
                !(uncontended = a.cas(v = a.value, v + x)))
                longAccumulate(x, null, uncontended);
        }
    }
    

    longAccumulate

    final void longAccumulate(long x, LongBinaryOperator fn,
                              boolean wasUncontended) {
        int h;
        // 看下ThreadLocalRandom是否初始化
        if ((h = getProbe()) == 0) {
            ThreadLocalRandom.current(); // force initialization
            h = getProbe();
            wasUncontended = true;
        }
        boolean collide = false;                // True if last slot nonempty
        // 自旋
        for (;;) {
            Cell[] as; Cell a; int n; long v;
            // 如果cells有东西
            if ((as = cells) != null && (n = as.length) > 0) {
                // 如果线程对应的坑位为空
                if ((a = as[(n - 1) & h]) == null) {
                    // cellsbusy为0,代表现在cells稳定,那么可以开始针对当前线程增加坑位
                    // 换句话说,如果发现cellsbusy为1,坑位就没机会增加了。
                    if (cellsBusy == 0) {       // Try to attach new Cell
                        Cell r = new Cell(x);   // Optimistically create
                        // 到这里,准备正式开始,第一时间先锁定cells,也就是先设置cellsbusy为1
                        if (cellsBusy == 0 && casCellsBusy()) {
                            boolean created = false;
                            try {               // Recheck under lock
                                Cell[] rs; int m, j;
                                // 再次定位到坑位
                                if ((rs = cells) != null &&
                                    (m = rs.length) > 0 &&
                                    rs[j = (m - 1) & h] == null) {
                                    // 将cell填充进去,不要忘记这个坑位记录了你要put的value
                                    rs[j] = r;
                                    created = true;
                                }
                            } finally {
                                // 最终释放cellsbusy,也就是解锁cells
                                cellsBusy = 0;
                            }
                            // 如果已经成功创建坑位,那么退出自旋,否则继续自旋
                            if (created)
                                break;
                            continue;           // Slot is now non-empty
                        }
                    }
                    // 到这里说明cells正在被其他线程锁定,记录下有冲突的事实,准备去rehash
                    collide = false;
                }
                // 这里说明外部调用处在设置线程对应的坑位时失败,有竞争线程
                // wasUncontended的意义在于,如果有竞争,说明线程的probe冲突,准备去rehash
                // 对应下面的advanceProbe,这样再次自旋后,会去关注新的坑位,也就没有冲突一说了
                else if (!wasUncontended)       // CAS already known to fail
                    wasUncontended = true;      // Continue after rehash
                // 到这里,说明该线程的坑位已经有了,直接累加就好,如果成功当然最好了
                else if (a.cas(v = a.value, ((fn == null) ? v + x :
                                             fn.applyAsLong(v, x))))
                    break;
                // 如果上面累加失败,看下cells是否已经到了上限或cells已经变更了,比如扩容了。
                // 这里也记录下有冲突,准备去rehash
                else if (n >= NCPU || cells != as)
                    collide = false;            // At max size or stale
                // 设置冲突标识,准备去rehash
                else if (!collide)
                    collide = true;
                // 到这里说明,就算有了cells,但是该位置上累加失败,数组还可以扩容
                // 既然你不让我加,竞争这么厉害,那么扩容试试看
                // 当然了,先锁定cells
                else if (cellsBusy == 0 && casCellsBusy()) {
                    try {
                        // 如果cells没变化
                        if (cells == as) {      // Expand table unless stale
                            // 容量扩大一倍
                            Cell[] rs = new Cell[n << 1];
                            将旧的转移到新的cells里面
                            for (int i = 0; i < n; ++i)
                                rs[i] = as[i];
                            cells = rs;
                        }
                    } finally {
                        // 解锁cells
                        cellsBusy = 0;
                    }
                    collide = false;
                    // 扩容完,再自旋看看,能否成功
                    continue;                   // Retry with expanded table
                }
                // 重新计算新的probe值以对应到不同的下标元素,然后重试。  
                h = advanceProbe(h);
            }
            // cells还未初始化或为空,先锁定cells
            else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
                boolean init = false;
                try {                           // Initialize table
                    if (cells == as) {
                        // cells的大小必须是2的幂,好length-1&h进行求余
                        Cell[] rs = new Cell[2];
                        // 设置h对应的下标位置
                        rs[h & 1] = new Cell(x);
                        cells = rs;
                        init = true;
                    }
                } finally {
                    // 解锁
                    cellsBusy = 0;
                }
                // 如果初始化完毕,退出自旋
                if (init)
                    break;
            }
            // 说明cells还没有初始化,但是cells被别人锁定了
            // 那么尝试着加到base看看,如果成功,那么退出自旋
            else if (casBase(v = base, ((fn == null) ? v + x :
                                        fn.applyAsLong(v, x))))
                break;                          // Fall back on using base
        }
    }
    

    相关文章

      网友评论

          本文标题:LongAdder

          本文链接:https://www.haomeiwen.com/subject/olplfctx.html