美文网首页并发编程专题
(十二) ConcurrentHashMap分段锁设计原理

(十二) ConcurrentHashMap分段锁设计原理

作者: 跟着Mic学架构 | 来源:发表于2021-09-27 10:51 被阅读0次

    内容目录

    分段锁设计提高统计元素数量的性能

    最后一个部分,也是ConcurrentHashMap中设计比较巧妙的地方。我们知道,当调用完put方法后,ConcurrentHashMap必须会增加当前元素的个数,方便在size()方法中获得存储的数据大小。代码的实现如下。

    final V putVal(K key, V value, boolean onlyIfAbsent) {
        //省略部分代码....
        addCount(1L, binCount);
        return null;
    }
    

    很多读者可能会问,这块代码有什么好值得分析的呢?请大家思考一个问题,在常规的集合中,我们只需要一个全局int类型的字段去保存元素个数即可,每次添加一个元素,就对这个size变量+1。但是在ConcurrentHashMap集合中,需要保证对于该变量修改的线程安全性,那怎么设计呢?难道又用锁来实现?很显然,通过前面的分析,使用同步锁带来的性能开销太大,所以不适合。因此,在ConcurrentHashMap中,采用的是自旋锁和分段锁的设计。

    size计数的基本原理分析

    在ConcurrentHashMap中,采用两种方式来保存元素的个数。

    • 当线程竞争不激烈时,直接对baseCount+1来增加元素个数。
    • 当线程竞争比较激烈时,通过构建一个CounterCell数组,默认长度是2,然后通过随机算法选择一个CounterCell,针对该CounterCell中的value进行保存。
    private transient volatile long baseCount;
    private transient volatile CounterCell[] counterCells;
    

    元素个数累加的整体流程如下图所示。

    在这里插入图片描述

    addCount方法分析

    addCount方法的完整代码如下,从整体结构来看,包含两个部分的逻辑。

    • 对ConcurrentHashMap中元素个数进行累加。
    • 通过check>=0来判断是否需要扩容,这部分代码在前面分析tranfer()方法中有讲过,就不再赘述。
    private final void addCount(long x, int check) {
        CounterCell[] as; long b, s;
        if ((as = counterCells) != null ||
            !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
            CounterCell a; long v; int m;
            boolean uncontended = true;
            if (as == null || (m = as.length - 1) < 0 ||
                (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
                !(uncontended =
                  U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
                fullAddCount(x, uncontended);
                return;
            }
            if (check <= 1)
                return;
            s = sumCount();
        }
        if (check >= 0) {
            Node<K,V>[] tab, nt; int n, sc;
            while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
                   (n = tab.length) < MAXIMUM_CAPACITY) {
                int rs = resizeStamp(n);
                if (sc < 0) {
                    if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                        sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                        transferIndex <= 0)
                        break;
                    if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                        transfer(tab, nt);
                }
                else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                             (rs << RESIZE_STAMP_SHIFT) + 2))
                    transfer(tab, null);
                s = sumCount();
            }
        }
    }
    

    我们重点来分析下面这段增加元素个数的代码。

    private final void addCount(long x, int check) {
        CounterCell[] as; long b, s;
        if ((as = counterCells) != null ||
            !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
            CounterCell a; long v; int m;
            boolean uncontended = true;
            if (as == null || (m = as.length - 1) < 0 ||
                (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
                !(uncontended =
                  U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
                fullAddCount(x, uncontended);
                return;
            }
            if (check <= 1)
                return;
            s = sumCount();
        }
    }
    

    上述代码的处理逻辑如下:

    • 在if判断中,第一步就是先通过U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)修改全局成员变量baseCount进行累加。这个方法在线程竞争的情况下会返回false。
    • 如果第一步执行失败,则尝试使用CounterCell进行累加,这里面有几个逻辑。
      • 在if判断中通过as=counterCells把当前用来记录元素个数的全局变量counterCells赋值给了as。
      • 在第二个if判断中,有几个具体的判断逻辑,这些判断逻辑任何一个为true,都会调用fullAddCount方法去实现元素个数累加。判断逻辑简单说明一下。
        • as==null 说明CounterCell数组还未初始化。
        • (m = as.length - 1) < 0 ,这个判断理论上来说不存在,因为一旦CounterCell被初始化,就意味着as.length长度是2,所以不可能出现结果小于0的情况,如果各位读者有其他理解可以反馈给笔者。
        • (a = as[ThreadLocalRandom.getProbe() & m]) == null说明CounterCell数组已经创建了,但是通过探针hash定位在数组中中没有对象实例,说明这个数组中还存在没有CounterCell实例对象的情况。
        • U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x)执行到了这个判断逻辑,说明当前CounterCell数组每个位置都有一个CounterCell实例对象,直接通过CAS操作针对上一个步骤获取到的CounterCell的value值进行累加,如果失败,说明存在竞争。
    • sumCount();返回总的元素个数,实际上应该就是CounterCell数组和baseCount两者累加计算的结果。

    细心的读者不难发现,在if判断中,首先是执行(as = counterCells) != null,然后再尝试对baseCount做累加。这里有一个概率性的问题,就是当一个集合发生过并发时,后续发生并发的可能性会越高,这种思想在并发编程以及很多应用场景中都有体现。

    fullAddCount方法分析

    fullAddCount的主要功能有几个。

    • 如果CounterCell数组还未初始化,则先初始化。
    • 如果已经初始化,则随机找到其中一个值进行累加更新。
    • 如果线程竞争加剧,则会尝试对CounterCell数组进行扩容。

    接下来,笔者分别针对这三个部分进行分析。

    第一个部分,初始化CounterCell

    CounterCell数组初始化处理不难理解。

    • cellsBusy == 0 && counterCells == as &&U.compareAndSwapInt(this, CELLSBUSY, 0, 1)这个部分,是通过CellsBusy字段来表示抢占到锁的标记,通过CAS修改CellsBusy=1来表示占有状态。
    • CounterCell[] rs = new CounterCell[2];构造一个长度为2的CounterCell数组。
    • rs[h & 1] = new CounterCell(x);,把当前增加的元素个数x保存到rs[h&1]的位置。
    • counterCells = rs;rs赋值给全局对象counterCells
    private final void fullAddCount(long x, boolean wasUncontended) {
        //省略部分代码....
       for (;;) {
           CounterCell[] as; CounterCell a; int n; long v;
           if ((as = counterCells) != null && (n = as.length) > 0) {
               //省略部分代码....
           }
           else if (cellsBusy == 0 && counterCells == as &&
                    U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
               boolean init = false;
               try {                           // Initialize table
                   if (counterCells == as) {
                       CounterCell[] rs = new CounterCell[2];
                       rs[h & 1] = new CounterCell(x);
                       counterCells = rs;
                       init = true;
                   }
               } finally {
                   cellsBusy = 0;
               }
               if (init)
                   break;
           }
       }
    }
    

    第二个部分,增加元素个数

    如果CounterCell数组已经初始化了,也存在两种情况。

    第一种情况是通过(a = as[(n - 1) & h]) == null计算到某个数组下标存在null的对象时,那么直接把当前要增加的元素个数x保存到数组中的某一个对象中即可。

    • (as = counterCells) != null && (n = as.length) > 0表示counterCells数组已经完成了初始化。
    • CounterCell r = new CounterCell(x);先创建一个CounterCell对象,把x保存进去。
    • U.compareAndSwapInt(this, CELLSBUSY, 0, 1)当前线程占用锁。
    • rs[j] = r,把新构建的保存了元素个数xCounterCell对象保存到rs[j]的位置。
    private final void fullAddCount(long x, boolean wasUncontended) {
        //省略部分代码....
       for (;;) {
           CounterCell[] as; CounterCell a; int n; long v;
           if ((as = counterCells) != null && (n = as.length) > 0) {
               if ((a = as[(n - 1) & h]) == null) {
                   if (cellsBusy == 0) {            // Try to attach new Cell
                       CounterCell r = new CounterCell(x); // Optimistic create
                       if (cellsBusy == 0 &&
                           U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
                           boolean created = false;
                           try {               //Recheck under lock
                               CounterCell[] rs; int m, j;
                               if ((rs = counterCells) != null &&
                                   (m = rs.length) > 0 &&
                                   rs[j = (m - 1) & h] == null) {
                                   rs[j] = r;
                                   created = true;
                               }
                           } finally {
                               cellsBusy = 0;
                           }
                           if (created)
                               break;
                           continue;           // Slot is now non-empty
                       }
                   }
                   collide = false;
               }
               //省略部分代码....
           }
           //省略部分代码....
       }
    }
    

    第二种情况,就是直接通过U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x)操作,对CounterCell指定位置的元素进行累加。

    private final void fullAddCount(long x, boolean wasUncontended) {
        //省略部分代码....
       for (;;) {
           //由于指定下标位置的cell值不为空,则直接通过cas进行原子累加,如果成功,则直接退出
           else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))//
               break;
       }
      //省略部分代码....
    }
    

    第三个部分,CounterCell数组扩容

    如果竞争比较激烈,也就是通过多次自旋(也就是线程无法满足上述的判断条件时),就会触发CounterCell的扩容。

    • cellsBusy == 0 &&U.compareAndSwapInt(this, CELLSBUSY, 0, 1)抢占锁。
    • CounterCell[] rs = new CounterCell[n << 1];在原有的基础上扩容一倍,再通过for循环进行数据迁移。
    • counterCells = rs;把扩容后的对象赋值给counterCells
    private final void fullAddCount(long x, boolean wasUncontended) {
        //省略代码....
        else if (cellsBusy == 0 &&
                 U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
            try {
                if (counterCells == as) {// Expand table unless stale
                    //扩容一倍 2变成4
                    CounterCell[] rs = new CounterCell[n << 1];
                    for (int i = 0; i < n; ++i)
                        rs[i] = as[i];
                    counterCells = rs;
                }
            } finally {
                cellsBusy = 0;//恢复标识
            }
            collide = false;
            continue;//继续下一次自旋
        }
        //省略部分代码....
    }
    

    size()方法结果汇总

    基于前面的分析,基本上我们也能猜测出它的整体实现,无非就是把baseCountCounterCell保存的value进行累加即可。接下来我们看一下size()方法的定义如下。

    public int size() {
        long n = sumCount();
        return ((n < 0L) ? 0 :
                (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
                (int)n);
    }
    final long sumCount() {
        CounterCell[] as = counterCells; CounterCell a;
        long sum = baseCount;
        if (as != null) {
            for (int i = 0; i < as.length; ++i) {
                if ((a = as[i]) != null)
                    sum += a.value;
            }
        }
        return sum;
    }
    

    主要看一下sumCount这个方法,实现原理和我们猜想的一致。

    • 先得到baseCount的值,保存到sum这个字段中。
    • 遍历CounterCell数组,把数组中每个CounterCell中存储的value进行累加。

    我们发现,size()方法在计算总的元素个数时,并没有加锁,所以size()方法返回的元素个数不一定代表总的容量。

    相关文章

      网友评论

        本文标题:(十二) ConcurrentHashMap分段锁设计原理

        本文链接:https://www.haomeiwen.com/subject/kzyhnltx.html