美文网首页Java知识污力_Java多线程&锁
ConcurrentHashMap之扩容实现(基于JDK1.8)

ConcurrentHashMap之扩容实现(基于JDK1.8)

作者: 豆角同學 | 来源:发表于2018-07-01 09:26 被阅读62次

    1. 概述

    ConcurrentHashMap是JDK提供的一种线程安全的HashMap实现,JDK1.8对ConcurrentHashMap进行了大量优化,除了增加了函数式编程特性,还对加锁方式进行了优化,它抛弃了JDK1.6中分段锁的设计,而是直接对Map中Table数组的每个节点进行加锁,进一步减少了锁粒度,并且不再采用ReentrantLock加锁 ,直接使用synchronized同步块(JDK1.6开始已经对synchronized 做了大量优化,加入了自旋锁、偏向锁、轻量级锁、重量级锁等)。

    为了提高查询效率,采用了数组+链表+红黑树的设计,当链表中的元素个数大于64,且数组中链表节点长度大于8,则会自动把链表转化为红黑树,当两个条件有一个不满足时,会回退到数组+单链表的数据结构。

    在JDK1.8的实现中,还实现了并发扩容机制,也就是可以由多个线程同时帮助扩容,加速数据转移过程,极大提升了效率,本文尝试着把这个扩容过程解释清楚。

    2. 实现原理

    2.1 基础

    首先介绍一下CocurrentHashMap中几个重要变量,这些变量在原子更新、并发扩容控制以及统计元素个数方面发挥着重要作用。

    // 代表Map中元素个数的的基础计数器,当无竞争时直接使用CAS方式更新该数值 
    transient volatile long baseCount; 
    /** 
    * sizeCtl用于table初始化和扩容控制,当为正数时表示table的扩容阈值(n * 0.75),当为负数时表示table正在初始化或者扩容,
    * -1表示table正在初始化,其他负值代表正在扩容,第一个扩容的线程会把扩容戳rs左移RESIZE_STAMP_SHIFT(默认16)位再加2更新设置到sizeCtl中(sizeCtl = (rs << 16) + 2),
    * 每次一个新线程来扩容时都令sizeCtl = sizeCtl + 1,因此可根据sizeCtl计算出正在扩容的线程数,注释中所
    * 描述的 sizeCtl = -(1+threads)是不准确的。扩容时sizeCtl有两部分组成,第一部分是扩容戳,占据sizeCtl的高有效位,长度为
    * RESIZE_STAMP_BITS位(默认16),剩下的低有效位长度为32-RESIZE_STAMP_BITS位(16),每个新线程协助扩容时sizeCtl+1
    * ,直到所有的低有效位被占满,低有效位默认占16位(最高位为符号位),所以扩容线程数默认最大为65535
    */
    transient volatile int sizeCtl; 
    /** 
    * 用于控制多个线程去扩容时领取扩容子任务,每个线程领取子任务时,要减去扩容步长,如果能减成功,
    * 则成功领取一个扩容子任务,`transferIndex = transferIndex - stride(扩容步长)`,transferIndex减到0时
    * 代表没有可以领取的扩容子任务。
    */
    transient volatile int transferIndex; 
    // 扩容或者创建CounterCells时使用的自旋锁(使用CAS实现);
    transient volatile int cellsBusy; 
    /** 
    * 存储Map中元素的计数器,当并发量较高时`baseCount`竞争较为激烈,更新效率较低,所以把变化的数值
    * 更新到`counterCells`中的某个节点上,计算size()时需要统计每个`counterCells`的大小再加上`baseCount`的数值。
    */
    transient volatile CounterCell[] counterCells;
    /**
    * ConcurrentHashMap采用cas算法进行更新变量(table[i],sizeCtl,transferIndex,cellsBusy等)来保证线程安全性,它其实是一种乐观策略,
    * 假设每次都不会产生冲突,所以能够直接更新成功,如果出现冲突则再重试,直到更新成功。实现cas主要是借助了`sun.misc.Unsafe`类,该类提供了
    * 诸多根据内存偏移量直接从内存中读取设置对象属性的底层操作。
    */
    static final sun.misc.Unsafe U;
    

    下面是ConcurrentHashMap中的重要常量的含义及功能说明

    // HashMap的最大容量(table数组的长度):2^30,因为hashCode最高两位用于控制目的,因此hashCode最大取值为2^30,
    // 所以table数组长度n > hashCode,hashCode & (n-1)时无法索引到数组后面的节点上
    private static final int MAXIMUM_CAPACITY = 1 << 30;
    // 负载因子,最大容量为数组长度*负载因子,元素个数超过容量则触发扩容
    private static final float LOAD_FACTOR = 0.75f;
    // 链表长度大于8时链表转化为红黑树
    static final int TREEIFY_THRESHOLD = 8;
    // 红黑树节点数小于6时红黑树转化为单链表
    static final int UNTREEIFY_THRESHOLD = 6;
    // 容量大于64时转化为红黑树
    static final int MIN_TREEIFY_CAPACITY = 64;
    // 最小转移步长:由于在扩容过程中,会把一个待转移的数组分为多个区间段(转移步长),每个线程一次转移一个区间段的数据,
    // 一个区间段(转移步长)的默认长度是16,实际运行过程中会动态计算
    private static final int MIN_TRANSFER_STRIDE = 16;
    /** 
    * 扩容戳有效位数:每次在需要扩容的时会根据当前数组table的大小生成一个扩容戳,当一个线程需要
    * 协助扩容时需要实时计算扩容戳来验证是否
    * 需要协助扩容或扩容过程是否完成,生成扩容戳的方式:Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
    * 其中n表示当前table的大小,利用该常量表示扩容戳的有效位长度,默认16位。
    */
    private static int RESIZE_STAMP_BITS = 16;
    // 最大并发扩容线程数:65535
    private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;
    // 扩容戳移位大小:sizeCtl为int类型,长度为32位,扩容戳有效位有RESIZE_STAMP_BITS位(默认16位),
    // 所以把扩容戳移到sizeCtl最高有效位时需要移位的个数为: 32 - RESIZE_STAMP_BITS
    private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;
    

    ConcurrentHashMap对table数组中的数据节点查询更新由Unsafe类实现,每次查询更新都是从内存中直接取数据,借助了如下3个小函数:

    • 获取table中index为i的节点
    static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
            return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
    }
    
    • table中index为i的节点更新为v(cas原子更新方式)
    static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i, Node<K,V> c, Node<K,V> v) {
            return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
    }
    
    • 把table中index为i的节点更新为v(普通更新方式)
    static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
            U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
    }
    

    2.2 实现过程

    由于扩容(resize)过程是在向Map中插入节点时所触发的数据转移过程,所以直接从插入数据的内部实现方法putVal()开始介绍。

    CocurrentHashMap中的put()和putIfAbsent()方法内部都是借助putVal()方法实现,所以我们只需要看一下putVal()的实现过程。

    /** Implementation for put and putIfAbsent */
        final V putVal(K key, V value, boolean onlyIfAbsent) {
            if (key == null || value == null) throw new NullPointerException();
            int hash = spread(key.hashCode());//此处进行了一次重哈希(高16位与低16位异或),减少哈希碰撞
            int binCount = 0;
            for (Node<K,V>[] tab = table;;) {
                Node<K,V> f; int n, i, fh;
                if (tab == null || (n = tab.length) == 0)
                    tab = initTable();
                else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { // 如果插入的位置还没有节点,则使用cas方式直接把节点设置到该位置上
                    if (casTabAt(tab, i, null,
                                 new Node<K,V>(hash, key, value, null)))
                        break;                   // no lock when adding to empty bin
                }
                else if ((fh = f.hash) == MOVED) // 如果要插入的位置的节点是转移节点,说明Map正在扩容,则协助扩容
                    tab = helpTransfer(tab, f);
                else {
                    V oldVal = null;
                    synchronized (f) {  // 对节点f加锁
                        if (tabAt(tab, i) == f) { // 加锁成功
                            if (fh >= 0) {        // hashCode>0,代表节点为单链表
                                binCount = 1;
                                for (Node<K,V> e = f;; ++binCount) {
                                    K ek;
                                    if (e.hash == hash &&
                                        ((ek = e.key) == key ||
                                         (ek != null && key.equals(ek)))) { //key在Map中已经存在,更新key的值
                                        oldVal = e.val;
                                        if (!onlyIfAbsent)
                                            e.val = value;
                                        break;
                                    }
                                    Node<K,V> pred = e;
                                    if ((e = e.next) == null) {    //新节点插入到链表的最后一个位置
                                        pred.next = new Node<K,V>(hash, key,
                                                                  value, null);
                                        break;
                                    }
                                }
                            }
                            else if (f instanceof TreeBin) {   //如果为红黑树节点
                                Node<K,V> p;
                                binCount = 2;
                                if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                               value)) != null) { // 向红黑数中插入一个节点
                                    oldVal = p.val;
                                    if (!onlyIfAbsent)
                                        p.val = value;
                                }
                            }
                        }
                    }
                    if (binCount != 0) {
                        if (binCount >= TREEIFY_THRESHOLD)//如果链表节点数量大于阈值(默认8),则转化为红黑树
                            treeifyBin(tab, i);
                        if (oldVal != null)
                            return oldVal;
                        break;
                    }
                }
            }
            addCount(1L, binCount); // 节点插入完成后,更新Map中节点总数
            return null;
        }
    

    从上述代码可以总结出插入数据节点的过程:

    1. 首先是根据key的hashCode做一次重哈希(进一步减少哈希碰撞)
    2. 先判断table为空,空则初始化Map,否则:
    3. 根据hashCode取模定位到table中的某个节点f,如果f为空,则新创建一个节点,使用cas方式更新到数组的f节点上,插入结束,否则:
    4. 若f是转移转移节点,则调用helpTransfer协助转移,否则:
    5. 锁定节点f(通过synchronized加锁)
    6. 节点f锁定成功后判断节点f类型,如果f是链表节点,则直接插入到链表底端(key不存在的话),如果节点f是红黑树节点,则按照二叉搜索树的方式插入节点,并调整树结构使其满足红黑规则
    7. 最后调用addCount更新Map中的节点计数。

    所以接下来要讲一下addCount方法:

    addCount方法用于更新Map中节点计数,更新节点个数时也做了对应的优化,其中采用了和LongAdder一样的实现方式,具体实现过程可以看LongAdder的实现过程;元素个数更新完成后再判断是否需要扩容,我主要对判断开始扩容或协助扩容的各种条件进行解释。

    /**
         *  
         * @param x the count to add
         * @param check if <0, don't check resize, if <= 1 only check if uncontended
         */
        private final void addCount(long x, int check) {
            CounterCell[] as; long b, s;
            if ((as = counterCells) != null ||
                !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
                CounterCell a; long v; int m;
                boolean uncontended = true;
                if (as == null || (m = as.length - 1) < 0 ||
                    (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
                    !(uncontended =
                      U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
                    fullAddCount(x, uncontended);
                    return;
                }
                if (check <= 1)
                    return;
                s = sumCount(); //统计map中的元素个数
            }
            if (check >= 0) {          // 检查是否需要协助扩容
                Node<K,V>[] tab, nt; int n, sc;
                while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
                       (n = tab.length) < MAXIMUM_CAPACITY) { // map中的元素个数已经大于扩容阈值且小于最大阈值,table需要扩容
                    int rs = resizeStamp(n); //计算扩容戳
                    if (sc < 0) {      // 表示正在扩容或者table初始化
                        if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||    // sizeCtl无符号右移16位得到扩容戳,扩容戳不同说明当前线程已经滞后其他线程,其他线程已经开启了新一轮扩容任务,不能再去扩容,sc == rs + 1 目前没看懂
                            sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||   //扩容线程数大于最大扩容线程数,nextTable为空表示没有在扩容,不需要协助
                            transferIndex <= 0)         // transferIndex < 0 表示其他线程已经把扩容子任务领取完毕,也不需要协助扩容
                            break;
                        if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) // 使用cas方式把sizeCtl加1,代表增加一个协助扩容的线程,并令当前线程去协助扩容,当前线程协助完成后需要把sizeCtl减1,所以sizeCtl<0时可以利用sizeCtl计算出扩容线程的个数
                            transfer(tab, nt);
                    }
                    else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                                 (rs << RESIZE_STAMP_SHIFT) + 2)) //当前没有线程在扩容,则把扩容戳rs 左移16位加2得到一个负值,用cas方式更新到sizeCtl中,更新成功则作为第一个扩容线程执行扩容任务
                        transfer(tab, null);
                    s = sumCount();
                }
            }
        }
    

    helpTransfer 主要用于通过计算来验证Map是否需要协助扩容,如果Map正在扩容且扩容未结束则协助扩容,并通过transfer执行扩容过程。

    /**
         * Helps transfer if a resize is in progress.
         * 当其他线程进来时发现当前Map正在扩容,则判断是否需要帮助扩容
         */
        final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
            Node<K,V>[] nextTab; int sc;
            if (tab != null && (f instanceof ForwardingNode) &&              
                (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {//当前table数组和nextTable数组都不为空,而且当前节点为ForwardingNode,说明HashMap正在扩容
                int rs = resizeStamp(tab.length);   //以当前数组的大小通过移位的方式生成扩容戳,保证每次扩容过程都生成唯一的扩容戳
                while (nextTab == nextTable && table == tab && //指向table的指针和nextTab的指针没有被其他线程更新
                       (sc = sizeCtl) < 0) { // sizeCtl小于0时,可以通过移位反解出正在扩容的线程数,代表正在扩容,大于0时代表下次扩容的阈值
                    if ((sc >>> RESIZE_STAMP_SHIFT) != rs    //sc右移16位若等于rs,说明没有线程在扩容
                           || sc == rs + 1 || sc == rs + MAX_RESIZERS  //扩容线程数超过限制
                           || transferIndex <= 0)         //transferIndex用于每次分配数据转移任务的上界,如果小于0则说明没有可以分配的数据转移任务
                        break;
                    if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {//cas方式更新sc,更新成功则扩容线程数+1,并开始帮助转移任务
                        transfer(tab, nextTab);
                        break;
                    }
                }
                return nextTab;
            }
            return table;
        }
    

    transfer方法为实际的扩容实现,实现过程有些复杂,但如果认真看了前面关于控制变量sizeCtl、扩容戳rs以及转移索引transferIndex的相关注释说明,应该不难理解。

    /**
         * Moves and/or copies the nodes in each bin to new table. See
         * above for explanation.
         */
        private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
            int n = tab.length, stride;
            //根据table的长度及cpu核数计算转移任务步长
            if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) // 计算转移步长并判断是否小于最小转移步长
                stride = MIN_TRANSFER_STRIDE; // subdivide range
            if (nextTab == null) {            // 第一个扩容线程进来需要初始化nextTable
                try {
                    @SuppressWarnings("unchecked")
                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
                    nextTab = nt;
                } catch (Throwable ex) {      // try to cope with OOME
                    sizeCtl = Integer.MAX_VALUE;
                    return;
                }
                nextTable = nextTab;
                transferIndex = n;
            }
            int nextn = nextTab.length;
            ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
            boolean advance = true; 
            boolean finishing = false; // to ensure sweep before committing nextTab
            for (int i = 0, bound = 0;;) {
                Node<K,V> f; int fh;
                while (advance) {
                    int nextIndex, nextBound;
                    if (--i >= bound || finishing) //当前索引已经走到了本次扩容子任务的下界,子任务转移结束
                        advance = false;
                    else if ((nextIndex = transferIndex) <= 0) {//任务转移完成
                        i = -1;
                        advance = false;
                    }
                   //通过cas方式尝试获取一个转移任务(transferIndex - 转移步长stride),获取成功后得到处理的下界及当前索引
                    else if (U.compareAndSwapInt
                             (this, TRANSFERINDEX, nextIndex,
                              nextBound = (nextIndex > stride ?
                                           nextIndex - stride : 0))) {
                        bound = nextBound;  // 更新当前子任务的下界
                        i = nextIndex - 1;  // 更新当前index位置  
                        advance = false;
                    }
                }
                if (i < 0 || i >= n || i + n >= nextn) { // 扩容结束
                    int sc;
                    if (finishing) {             // 最后一个出去的线程:更新table指针及sizeCtl值
                        nextTable = null;
                        table = nextTab;     // 指向扩容后的数组
                        sizeCtl = (n << 1) - (n >>> 1);  //sizeCtl更新为最新的扩容阈值(2n - 0.5n = 1.5n =  2n * 0.75),移位实现保证高效率
                        return;
                    }
                    // sizeCtl减1,表示减少一个扩容线程
                    if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                        // 判断是否是最后一个扩容线程,如果不是则直接退出,由于第一个线程进来时把扩容戳rs左移16位+2更新到sizeCtl,所以如果是最后一个线程的话,sizeCtl -2 应该等于rs左移16位
                        if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                            return;
                        finishing = advance = true;//如果是最后一个线程,则结束标志更新为真,并且在重新检查一遍数组
                        i = n; // recheck before commit
                    }
                }
                else if ((f = tabAt(tab, i)) == null)    // 当前桶节点为空,设置为转移成转移节点
                    advance = casTabAt(tab, i, null, fwd);
                else if ((fh = f.hash) == MOVED)   // 该桶节点已经被转移
                    advance = true; // already processed
                else {
                    synchronized (f) {         // 获取该节点的锁
                        if (tabAt(tab, i) == f) {// 获取锁之后再次验证是否被其他线程修改过
                            Node<K,V> ln, hn;
                            if (fh >= 0) {        // 节点HasCode大于0 代表该节点为链表节点
                                // 由于数组长度n为2的幂次方,所以当数组长度增加到2n时,原来hash到table中i的数据节点在长度为2n的table中要么在低位nextTab[i]处,要么在高位nextTab[n+i]处,具体在哪个位置与(fh & n)的计算结果有关
                                int runBit = fh & n;
                                Node<K,V> lastRun = f;
                                // 此处循环的目的是找到链表中最后一个从低索引位置变到高索引位置或者从高索引位置变到低索引位置的节点lastRun,从lastRun节点到链表的尾节点可根据runBit直接插入到新数组nextTable的节点中,其目的是尽量减少新创建节点数量,直接更新指针位置
                                for (Node<K,V> p = f.next; p != null; p = p.next) {
                                    int b = p.hash & n;
                                    if (b != runBit) {
                                        runBit = b;
                                        lastRun = p;
                                    }
                                }
                                if (runBit == 0) {
                                    ln = lastRun;
                                    hn = null;
                                }
                                else {
                                    hn = lastRun;
                                    ln = null;
                                }
                               // 对于lastRun之前的链表节点,根据hashCode&n可确定即将转移到nextTable中的低索引位置节点(nextTab[i])还是高索引位置节点(nextTab[i + n]),并形成两个新的链表
                                for (Node<K,V> p = f; p != lastRun; p = p.next) {
                                    int ph = p.hash; K pk = p.key; V pv = p.val;
                                    if ((ph & n) == 0)
                                        ln = new Node<K,V>(ph, pk, pv, ln);
                                    else
                                        hn = new Node<K,V>(ph, pk, pv, hn);
                                }
                                // 使用cas方式更新两个链表到新数组nextTable中,并且把原来的table节点i中的数值变为转移节点
                                setTabAt(nextTab, i, ln);
                                setTabAt(nextTab, i + n, hn);
                                setTabAt(tab, i, fwd);
                                advance = true;
                            }
                            else if (f instanceof TreeBin) {   //该节点为二叉搜索数节点(红黑树)
                                TreeBin<K,V> t = (TreeBin<K,V>)f;
                                TreeNode<K,V> lo = null, loTail = null;
                                TreeNode<K,V> hi = null, hiTail = null;
                                int lc = 0, hc = 0;
                                for (Node<K,V> e = t.first; e != null; e = e.next) {
                                    int h = e.hash;
                                    TreeNode<K,V> p = new TreeNode<K,V>
                                        (h, e.key, e.val, null, null);
                                    if ((h & n) == 0) {
                                        if ((p.prev = loTail) == null)
                                            lo = p;
                                        else
                                            loTail.next = p;
                                        loTail = p;
                                        ++lc;
                                    }
                                    else {
                                        if ((p.prev = hiTail) == null)
                                            hi = p;
                                        else
                                            hiTail.next = p;
                                        hiTail = p;
                                        ++hc;
                                    }
                                }
                                ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                                    (hc != 0) ? new TreeBin<K,V>(lo) : t;
                                hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                                    (lc != 0) ? new TreeBin<K,V>(hi) : t;
                                setTabAt(nextTab, i, ln);
                                setTabAt(nextTab, i + n, hn);
                                setTabAt(tab, i, fwd);
                                advance = true;
                            }
                        }
                    }
                }
            }
        }
    

    transfer数据转移过程可以分为如下几个步骤:

    1. 第一个扩容线程进来后创建nextTable数组,并设置transferIndex;
    2. 线程(第一个或其他)通过transferIndex-stride(扩容步长)来领取一个扩容子任务,transferIndex减到0说明所有子任务领取完成;
    3. 线程领取到扩容子任务后设置当前处理子任务的下界并更新当前处理节点所在的索引位置;
    4. 对子任务中的每个节点,扩容线程从后向前依次判断该节点是否已经转移,如果没有转移,则对该节点进行加锁,并且把节点对应的链表或红黑树转移到新数组nextTable中去;
    5. 如果线程处理的节点索引已经到达子任务的下界,则子任务执行结束,并尝试去领取新的子任务,若领取不到再判断当前线程是否是最后一个扩容线程,若是则最后扫描一遍数组,执行清理工作,否则直接退出。

    3. 总结

    JDK1.8对ConcurrentHashMap做了大量优化,本文只是对其中如果进行多线程并发扩容的过程做了详解,其他方面比如高效更新元素个数(类似LongAdder)以及红黑树的调整将在其他文章中做详细的解释。

    相关文章

      网友评论

        本文标题:ConcurrentHashMap之扩容实现(基于JDK1.8)

        本文链接:https://www.haomeiwen.com/subject/yzptyftx.html