美文网首页
JAVA 多线程与高并发学习笔记(十)——LongAdder

JAVA 多线程与高并发学习笔记(十)——LongAdder

作者: 简单一点点 | 来源:发表于2022-08-03 09:43 被阅读0次

转眼就8月份了,要开始努力了~~~

在争用激烈的场景下,会导致大量的CAS空自旋。这会浪费大量的CPU资源,大大降低了程序的性能。

这时候可以通过使用 LongAdder 代替 AtomicInteger 来提高CAS操作的性能。

以空间换时间:LongAdder

Java8提供了一个类 LongAdder 通过以空间换时间的方式提高并发场景下CAS操作的性能。

LongAdder 的核心思想是热点分离,与 ConcurrentHashMap 的设计思想类似,将 value 值分离成一个数组,当多线程访问时,通过 Hash 算法将线程映射到数组的一个元素进行操作,而获取最终的的 value 结果时,则将数组的元素求和。

最终,通过 LongAdder 将内部操作对象从单一 value 值“演变”成一些列的数组元素,从而减小了内部竞争的粒度。

下面看一个小例子:

public class LongAdderTest {

    final int TURNS = 1000;

    @Test
    public void testLongAdder() {
        final int TASK_AMOUNT = 10;

        ExecutorService pool = Executors.newCachedThreadPool();

        LongAdder longAdder = new LongAdder();

        CountDownLatch countDownLatch = new CountDownLatch(TASK_AMOUNT);
        long start = System.currentTimeMillis();
        for(int i = 0; i < TASK_AMOUNT; i++) {
            pool.submit(() -> {
                try {
                    for(int j = 0; j < TURNS; j++) {
                        longAdder.add(1);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
                countDownLatch.countDown();
            });
        }

        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        float time = (System.currentTimeMillis() - start) / 1000F;
        System.out.println("运行时长为:" + time);
        System.out.println("累加结果为: " + longAdder.longValue());
    }
}

LongAdder 的原理

AtomicLong 使用内部变量 value 保存着实际的 long 值,所有的操作都是针对该变量进行的。也就是说在高并发环境下,value 变量其实是一个热点,重试线程越多,CAS失败概率更高,从而进入恶性CAS空自旋状态。

LongAdder 的基本实录是分散热点,将 value 值分散到一个数组中,不同线程会命中到数组的不同槽(元素)中,各个线程只对自己槽中的那个值进行CAS操作。这样热点就被分担了,冲突的概率就小很多。

使用 LongAdder,即使线程数再多也不必担心,各个线程会分配到多个元素上去更新,增加元素个数,就可以降低 value 的“热度”,恶性CAS空自旋就解决了。

如果要获得完整的 LongAdder 存储的值,只要将各个槽中的变量值累加,返回最终累加之后的值即可。

LongAdder 的实现思路与 ConcurrentHashMap 中分段锁的基本原理非常相似,本质上都是不同的线程在不同的单元上进行操作,这样减少了线程竞争,提高了并发效率。

LongAdder 的设计体现了空间换时间的思想,不过在实际高并发场景下,数组元素所消耗的空间可以忽略不计。

LongAdder 实例的内部结构

一个 LongAdder 实例的内部结构如下所示。

longAdder.png

LongAdder 的内部成员包含一个 base 值和一个 cells 数组,在最初无竞争时,只操作 base 的值,当线程执行CAS失败后,才初始化 cells 数组,并为线程分配所对应的元素。

基类 Striped64 内部三个重要成员

LongAdder 继承于 Stripped64 类,base 值和 cell 数组都在 Stripped64 类中定义,它的内部三个重要的成员如下:

/**
 * 成员一:存放Cell的哈希表,大小为2的幂
 */
transient volatile Cell[] cells;
/**
 * 成员二:基础值
 * 1.在没有竞争时会更新这个值
 * 2.在cells初始化时,cells不可用,也会尝试通过CAS操作值累加到base
 */
transient volatile long base;
/**
 * 自旋锁,通过CAS操作加锁,为0表示cells数组没有处于创建、扩容阶段
 * 为1表示正在创建或者扩展cells数组,不能进行新Cell元素的设置操作
 */
transient volatile int cellsBusy;

Stripped64 内部包含一个 base 和一个 Cell[] 类型的 cells 数组,cells 数组又叫哈希表,在没有竞争的情况下,要累加的数通过CAS累加到 base 上,如果有竞争的话,会将要累加的数累加到 cells 数组中的某个 Cell 元素里面。

Stripped64 的整体值 value 的获取函数如下:

public long longValue() {
    return sum();
}

/**
 * 将多个cells数组中的值加起来的和
 */
public long sum() {
    Cell[] as = cells; 
    Cell a;
    long sum = base;
    if (as != null) {
        for (int i = 0; i < as.length; ++i) {
            if ((a = as[i]) != null)
                sum += a.value;
        }
    }
    return sum;
}

Stripped64 的设计核心思路是通过内部的分散计算来避免竞争,以空间换取时间。没有竞争时 cells 数组为 null,这时只使用 base,一旦发生竞争,cells 数组就上场了。

cells 数组第一次初始化长度为2,以后每次扩容都变为原来的两倍,一直到 cells 数组的长度大于等于当前服务器的CPU核数,同一时刻能持有CPU时间片去并发操作同一内存地址的最大线程数最多也就是CPU的核数。

在存在线程争用的时候,每个线程被映射到 cells[threadLocalRandomProbe&cells.length] 位置的 Cell 元素,该线程对 value 所做的累加操作就执行在对应的 Cell 元素的值上。

LongAdder 类的 add() 操作

这里分析以下 LongAdder 类的 add() 方法,具体的源码如下:

/**
 * Adds the given value.
 *
 * @param x the value to add
 */
public void add(long x) {
    Cell[] as; long b, v; int m; Cell a;
    if ((as = cells) != null || !casBase(b = base, b + x)) {
        boolean uncontended = true;
        if (as == null || (m = as.length - 1) < 0 ||
            (a = as[getProbe() & m]) == null ||
            !(uncontended = a.cas(v = a.value, v + x)))
            longAccumulate(x, null, uncontended);
    }
}

/**
 * 自增1
 */
public void increment() {
    add(1L);
}

/**
 * 自减1
 */
public void decrement() {
    add(-1L);
}

首先介绍一下代码的外层 if 块的两个条件语句:

  • cells 数组不为null,说明存在争用,在不存在争用的时候,cells 数组一定为null,一旦对 base 的CAS操作失败,才会初始化 cells 数组。
  • 如果 cells 数组为null,表示之前不存在争用,并且此次 casBase 执行成功,表示基于 base 成员累加成功,add 方法直接返回;如果 casBase 方法执行失败,说明产生了第一次争用冲突,需要对 cells 数组初始化,此时即将进入内嵌 if 块。

casBase 方法很简单,就是通过 UNSAFE 类的CAS设置成员变量 base 的值为 base+xcasBase方法的代码如下:

final boolean casBase(long cmp, long val) {
    return UNSAFE.compareAndSwapLong(this, BASE, cmp, val);
}

内嵌的if语句块逻辑如下:

  • as == null || (m = as.length - 1)<0 代表 cells 没有初始化。
  • 当前线程的哈希值在 cells 数组映射位置的 Cell 对象为空,意思是还没有其他线程在同一个位置做过累加操作。
  • 当前线程的哈希值在 cells 数组映射位置的 Cell 对象不为空,然后在该 Cell 对象上进行CAS操作,设置其值为 v+x,但是CAS操作失败,表示存在争用。

如果以上条件满足一个,就进入 longAccumulate 方法。

LongAdder 类中的 longAccumulate() 方法

longAccumulate()Striped64 中重要的方法,实现不同的线程更新各自 Cell 中的值,其实现逻辑类似于分段锁,具体代码如下:

final void longAccumulate(long x, LongBinaryOperator fn,
                              boolean wasUncontended) {
    int h;
    if ((h = getProbe()) == 0) {
        ThreadLocalRandom.current(); // force initialization
        h = getProbe();
        wasUncontended = true;
    }
    // 扩容意向,collide=true可以扩容,collide=false不可扩容
    boolean collide = false;
    // 自旋,一直到操作成功
    for (;;) {
        // as 表示cells引用
        // a 表示当前线程命中的Cell
        // n表示cells数组长度
        // v 表示期望值
        Cell[] as; Cell a; int n; long v;
        // true表示下标位置的Cell为null,需要创建Cell
        if ((as = cells) != null && (n = as.length) > 0) {
            if ((a = as[(n - 1) & h]) == null) {
                if (cellsBusy == 0) {       // cells 数组没有处于创建,扩容阶段
                    Cell r = new Cell(x);   // Optimistically create
                    if (cellsBusy == 0 && casCellsBusy()) {
                        boolean created = false;
                        try {               // Recheck under lock
                            Cell[] rs; int m, j;
                            if ((rs = cells) != null &&
                                (m = rs.length) > 0 &&
                                rs[j = (m - 1) & h] == null) {
                                rs[j] = r;
                                created = true;
                            }
                        } finally {
                            cellsBusy = 0;
                        }
                        if (created)
                            break;
                        continue;           // Slot is now non-empty
                    }
                }
                collide = false;
            }
            // 当前线程竞争失败,wasUncontended为false
            else if (!wasUncontended)       // CAS already known to fail
                wasUncontended = true;      // Continue after rehash
            // 当前线程rehash过哈希值,CAS更新cell
            else if (a.cas(v = a.value, ((fn == null) ? v + x :
                                            fn.applyAsLong(v, x))))
                break;
            // 调整扩容意向,然后进入下一轮循环
            else if (n >= NCPU || cells != as)
                collide = false;            // At max size or stale
            // 设置扩容意向为true,但是不一定真的发生扩容
            else if (!collide)
                collide = true;
            // 真正扩容的逻辑
            else if (cellsBusy == 0 && casCellsBusy()) {
                try {
                    if (cells == as) {      // Expand table unless stale
                        Cell[] rs = new Cell[n << 1];
                        for (int i = 0; i < n; ++i)
                            rs[i] = as[i];
                        cells = rs;
                    }
                } finally {
                    cellsBusy = 0; // 释放锁
                }
                collide = false;
                continue;                   // Retry with expanded table
            }
            h = advanceProbe(h); //重置当前线程的Hash值
        }
        // cells还未初始化,并且cellsBusy加锁成功
        else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
            boolean init = false;
            try {                           // Initialize table
                if (cells == as) {
                    Cell[] rs = new Cell[2];
                    rs[h & 1] = new Cell(x);
                    cells = rs;
                    init = true;
                }
            } finally {
                cellsBusy = 0;
            }
            if (init)
                break;
        }
        // 当前线程cellsBusy加锁失败,表示其它线程这鞥在初始化cells
        // 所以当前线程将值累加到base,注意add()方法调用此方式时fn为null
        else if (casBase(v = base, ((fn == null) ? v + x :
                                    fn.applyAsLong(v, x))))
            break;                          // Fall back on using base
    }
}

LongAddr 类的 casCellsBusy 方法

casCellsBusy()方法的代码很简单,就是将 cellsBusy 成员的值改为1,表示目前的cells数组在初始化或扩容中。代码如下:

final boolean casCellsBusy() {
    return UNSAFE.compareAndSwapInt(this, CELLSBUSY, 0, 1);
}

casCellsBusy() 方法相当于锁的功能,当线程需要 cells 数组初始化或扩容时,需要调到 casCellsBusy() 方法,通过CAS方式将 cellsBusy 成员的值改为1,如果修改失败,就表示其它的线程正在进行数组初始化或扩容的操作。在 cells 数组初始化或扩容的操作执行完成之后,cellsBusy 成员的值被改为0,这时不需要进行CAS修改,直接修改即可,因为不存在争用。

当 cellsBusy 值为1时,表示cells数组正在被某个线程执行初始化或扩容操作,其它线程不能进行以下操作:

  1. 对cells数组初始化。
  2. 对cells数组扩容。
  3. 如果cells数组中某个元素为null,就为该元素创建新的Cell对象,因为数组的结构正在修改,所以其他线程下面不能创建新的Cell对象。

相关文章

网友评论

      本文标题:JAVA 多线程与高并发学习笔记(十)——LongAdder

      本文链接:https://www.haomeiwen.com/subject/rsyswrtx.html