美文网首页
ConcurrentHashMap源码分析

ConcurrentHashMap源码分析

作者: 奔跑地蜗牛 | 来源:发表于2019-05-20 22:37 被阅读0次

    前言

    本文中的ConcurrentHashMap源码分析版本是jdk1.8;

    问题

    在对ConcurrentHashMap进行源码分析之前,首先需要确定几个问题,也就是ConcurrentHashMap相对HashMap来说需要解决的问题:

      1. 如何保证高并发时数据插入的同步性以及高效性?
      1. 如何保证高并发场景下扩容时,不会重复扩容?
      1. 如何保证高并发场景下扩容时,如何处理数据读取和插入?

    ConcurrentHashMap常量说明

    要理解ConcurrentHashMap,首先需要对其常量和变量有一定认识,必须知道这是用来干什么的,什么场景下使用;

    • MAXIMUM_CAPACITY = 1 << 30;表示哈希表最大容量为2^30,因为32位bit中前两位bit是被用作控制目的的;
    • DEFAULT_CAPACITY = 16;表示哈希表默认容量是16,
    • MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;表示虚拟机栈数组最大长度,JDK1.8版本后concurrenthashMap只是在toArray相关的方法使用到;
    • DEFAULT_CONCURRENCY_LEVEL = 16;默认并发等级,当前版本不怎么使用,主要是为了兼容之前版本;
    • LOAD_FACTOR = 0.75f;哈希表加载因子,计算加载因子容量时可以利用n-(n>>2)表达式轻松计算出来;
    • TREEIFY_THRESHOLD = 8; bin链表转化成红黑树的阈值;超过该值后查找红黑树相对链表来说要效率高些

    bin:箱子或者柜子,可以理解为装node的箱子,每个箱子里面挂着一个node的链表或者红黑树;

    • UNTREEIFY_THRESHOLD = 6;bin红黑树转换成链表的阈值;当nodes数量小于该值时红黑树将转化为链表;这时查找链表速率更快些;
    • MIN_TREEIFY_CAPACITY = 64;表示哈希表容量大于该值时才会将链表转换成红黑树,这是bin链表转换成红黑树的第二个条件(如果一个bin中nodes太多的话,会选择扩容而不是转化成红黑树);

    为什么是64呢?因为避免在扩容和转换成红黑树时的冲突,因为哈希表容量较小时是比较容易进行扩容操作的,如果一个bin中nodes比较多就转换成红黑树,那么发生扩容之后,又得转换成链表,会造成很多不必要的计算,所以为了平衡两者的冲突,需要确定一个数组容量阈值;

    • MIN_TRANSFER_STRIDE = 16;表示扩容时一个线程需要转移bins的最小长度为16,因为扩容时如果bins长度比较大,那么可以借助多线程并发转移bins,转移bins指的是扩容后根据bin中node的hash值与扩容的数组长度相与后确定新的bin,然后将该node转移到新的bin去;
    • RESIZE_STAMP_BITS = 16;每次扩容时用来在sizeCtl中记录扩容戳的位数,不同轮次的扩容操作的扩容戳都是不同的,扩容戳可以保证多次扩容操作不会交叉重叠,该变量不是常量也不提供修改方法;
    • MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;表示扩容时最多允许2^16-1个线程进行transfer工作;
    • RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;扩容戳移位量,扩容时将扩容戳进行移位用来当做扩容线程计数的基数,通过反向移位后可以计算出扩容戳;
    • MOVED = -1,表示扩容时FORWARDING节点的hash值,FORWARDING节点是一个临时节点,只在扩容时出现,如果旧数组中某个bin(箱子)的nodes全部都迁移到了新数组中,旧数组就会在该bin头部放一个forwarding节点;如果读线程在进行读操作或者迭代读时遇到了该节点,就会被转移到新数组上执行读操作,如果写线程遇到该节点,则会尝试协助扩容;
    • TREEBIN=-2,表示TreeBin的hash值,TreeBin是ConcurrentHashMap中用来代理操作TreeNode的特殊节点,拥有存储实际数据的红黑树的根节点;
    • RESERVED = -3;保留节点,也就是站位符,一般情况下不会出现,在jdk1.8版本中新的函数式方法computeIfAbsent和compute中才会出现;
    • HASH_BITS=0x7ffffff;用于将节点hash值为负数的转化为正数,hashtable中利用hash值定位bin也是利用该方法;

    ConcurrentHashMap变量说明

    • volatile Node<K,V>[] nextTable;nextTabel!=null表示当前ConcurrentHashMap正在扩容,还有些扩容线程没有完全退出;

    • volatile int sizeCtl;用来控制ConcurrentHashMap初始化和扩容的;
      sizeCtl=-1表示进行初始化;
      sizeCtl=-(1+n)时表示扩容时的活跃线程数为n,n+1为sizeCtl的低32 - RESIZE_STAMP_BITS位值代表的数值,如只有一个线程第一次进行扩容时,sizeCtl=-2145714174,那么其二进制为
      -2145714174=1000 0000 0001 1011 0000 0000 0000 0010,
      其低32 - RESIZE_STAMP_BITS=32-16=16位的补码为0000 0000 0000 0010,刚好为2;
      如果该值减少2如果使得活跃线程位数代表的值为零,说明扩容已经结束;
      sizeCtl>0时表示初始化中正在使用的数组初始容量值或者初始化后下一次初始化数组需要的容量值;
      sizeCtl=0表示初始化中数组默认使用的是初始化容量值;

    • volatile int transferIndex:表示上一个transfer任务结束的位置或者数组长度值,ConcurrentHashMap将bin数组划分为长度为stride的m(m=n%stride)个transfer任务,第k(1<=K<=m)个transfer任务处理[(k-1)* stride,k* stride-1]个bins,而且为了尽量避免和迭代处理相同的bin,transfer任务是从后往前执行的,所以每次设置transferIndex都是k*stride,第一次设置transferIndex为bin数组的长度n,从n-1开始处理bin转移;第二次transferIndex则为n-stride;每次都是通过cas机制避免多线程处理同一个transfer任务;

    • volatile long baseCount:计数器基本值,主要在没有多线程竞争情况下使用,需要通过CAS机制更新;

    • volatile CounterCell[] counterCells:CAS自旋锁标志位,用于初始化,或者counterCells扩容时

    • volatile int cellsBusy:高并发的计数单元,如果进行了初始化,那么长度和bin数组一样都是2^n;

    ConcurrentHashMap方法解析

    putVal()解析

    putVal有三个参数K key,V value,boolean onlyIfAbsent,onlyIfAbsent默认为false,当putIfAbsent方法调用时则为ture;

      final V putVal(K key, V value, boolean onlyIfAbsent) {
            if (key == null || value == null) throw new NullPointerException();
            //将key的hashcode进行平铺,也就是为了避免hash值低位与数组长度n-1相与可能会有更多的hash collide,因此将hash值高16位和低16位先进行或运算
            int hash = spread(key.hashCode());
            int binCount = 0;//用来计算bin链表长度
            for (Node<K,V>[] tab = table;;) {
                Node<K,V> f; int n, i, fh; K fk; V fv;
                if (tab == null || (n = tab.length) == 0)//初始化
                    tab = initTable();
                //将hash和数组长度n-1进行与运算找到bin的下标i,然后对该bin第一个Node进行判断
                else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {//如果该node为空,则cas添加一个新node,成功则退出循环,否则继续;
                    if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value)))
                        break;                   // no lock when adding to empty bin
                }
                else if ((fh = f.hash) == MOVED)//如果node的hash为moved,那么表示该bin正在扩容中,则协助扩容
                    tab = helpTransfer(tab, f);
                else if (onlyIfAbsent // check first node without acquiring lock
                         && fh == hash
                         && ((fk = f.key) == key || (fk != null && key.equals(fk)))
                         && (fv = f.val) != null)//判断bin头node如果不能替换且old key和new key一致,则直接退出
                    return fv;
                else {//如果bin header node可以替换或者不等于key不一致
                    V oldVal = null;
                    synchronized (f) {//利用synchronized方法将header node加锁,避免竞争造成一致性
                        if (tabAt(tab, i) == f) {//如果header node尚未没有发生变化,避免另有线程更新该header node
                            if (fh >= 0) {//hash值大于零表示,该node是普通node
                                binCount = 1;
                                for (Node<K,V> e = f;; ++binCount) {//从header node开始迭代找到key,hash值一致的node进行替换,找不到则在tail添加新node
                                    K ek;
                                    if (e.hash == hash &&
                                        ((ek = e.key) == key ||
                                         (ek != null && key.equals(ek)))) {
                                        oldVal = e.val;
                                        if (!onlyIfAbsent)
                                            e.val = value;
                                        break;
                                    }
                                    Node<K,V> pred = e;
                                    if ((e = e.next) == null) {
                                        pred.next = new Node<K,V>(hash, key, value);
                                        break;
                                    }
                                }
                            }
                            else if (f instanceof TreeBin) {//如果header node是红黑树TreeBin
                                Node<K,V> p;
                                binCount = 2;
                                if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                               value)) != null) {
                                    oldVal = p.val;
                                    if (!onlyIfAbsent)
                                        p.val = value;
                                }
                            }
                            else if (f instanceof ReservationNode)//保留节点一般情况不太可能出现
                                throw new IllegalStateException("Recursive update");
                        }
                    }
                    if (binCount != 0) {//如果binCount大于零即bin链表长度大于零
                        if (binCount >= TREEIFY_THRESHOLD)//判断是否大于红黑树转化阈值
                            treeifyBin(tab, i);
                        if (oldVal != null)
                            return oldVal;
                        break;
                    }
                }
            }
            addCount(1L, binCount);//添加到高效计数器,另外根据binCount判断是否需要协助扩容或者红黑树转换
            return null;
        }
    

    transfer方法说明

    transfer方法其实划分为3个阶段:

      1. 根据cpu核数计算transfer任务步长stride和初始化nextTab;
      1. 当前线程申请transfer任务,transfer任务划分如下图:


        transfer任务.png
      1. 扩容操作
        源码解析如下:
      private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
            int n = tab.length, stride;
             //计算transfer任务步长stride,最小为16,多线程情况下为n/(8*NCPU)        
            if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
                stride = MIN_TRANSFER_STRIDE; // subdivide range
            if (nextTab == null) {            // nextTab初始化
                try {
                    @SuppressWarnings("unchecked")
                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
                    nextTab = nt;
                } catch (Throwable ex) {      // try to cope with OOME
                    sizeCtl = Integer.MAX_VALUE;
                    return;
                }
                nextTable = nextTab;//长度增加两倍后复制给nextTable
                transferIndex = n;//一开始transferIndex等于原数组末尾index+1处
            }
            int nextn = nextTab.length;
            ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);//转移节点
            boolean advance = true;//表示是否还能继续执行申请transfer任务
            boolean finishing = false; // 在提交nextTable之前用来保证所有transfer任务都已经完成
            for (int i = 0, bound = 0;;) {//i表示当前transfer任务初始化开始bin位置,bound表示当前transfer任务初始化结束bin的下标;
                Node<K,V> f; int fh;
                while (advance) {
                    int nextIndex, nextBound;//定义下一个transfer任务开始bin的下标和结束bin的下标;
                    if (--i >= bound || finishing)
                        advance = false;
                    else if ((nextIndex = transferIndex) <= 0) {//如果nextIndex已经不大于零,那么说明transfer任务已经结束;
                        i = -1;
                        advance = false;
                    }
                    else if (U.compareAndSetInt
                             (this, TRANSFERINDEX, nextIndex,
                              nextBound = (nextIndex > stride ?
                                           nextIndex - stride : 0))) {//尝试申请下一个transfer任务
                        //申请成功,则将bound赋值开始和结束bin的下标位置(i,bound)
                        bound = nextBound;
                        i = nextIndex - 1;
                        advance = false;
                    }
                }
                //i<0表示transfer任务已经结束,i大于n表示存在新的扩容任务,
                if (i < 0 || i >= n || i + n >= nextn) {
                    int sc;
                    if (finishing) {//执行本轮扩容的收尾工作
                        nextTable = null;
                        table = nextTab;
                        sizeCtl = (n << 1) - (n >>> 1);//将原数组长度扩大两倍-减去一半即是新数组长度的0.75;
                        return;
                    }
                    if (U.compareAndSetInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {//尝试将sizeCtl减1,表明需要减少一个活跃扩容线程
                       //sc-2如果除了扩容戳以外的低位都是零的话,那么表示当前线程是否是最后一个扩容线程,如果是直接返回
                        if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                            return;
                        finishing = advance = true;
                        i = n; // recheck before commit//重新检查先是否任务分配的hash bins都已经转移完毕
                    }
                }
                else if ((f = tabAt(tab, i)) == null)//如果当前bin为null,则直接放置一个forwardnode表明是扩容转移
                    advance = casTabAt(tab, i, null, fwd);
                else if ((fh = f.hash) == MOVED)//如果当前bin是forwardnode,那么表示出现了扩容重叠,需要重新申请transfer任务
                    advance = true; // already processed
                else {
                    synchronized (f) {
                        if (tabAt(tab, i) == f) {
                            Node<K,V> ln, hn;
                            if (fh >= 0) {
                                //n=2^k,表示第k位为1,而对于数组长度为n,hash&n时第k位为零说明是新数组低位bin,而第k为1则说明是新数组高位bin;
                                int runBit = fh & n;
                                Node<K,V> lastRun = f;
                                for (Node<K,V> p = f.next; p != null; p = p.next) {//找到最后一个node
                                    int b = p.hash & n;
                                    if (b != runBit) {
                                        runBit = b;
                                        lastRun = p;
                                    }
                                }
                                if (runBit == 0) {
                                    ln = lastRun;
                                    hn = null;
                                }
                                else {
                                    hn = lastRun;
                                    ln = null;
                                }
                                for (Node<K,V> p = f; p != lastRun; p = p.next) {//根据lastRun生成node链
                                    int ph = p.hash; K pk = p.key; V pv = p.val;
                                    if ((ph & n) == 0)
                                        ln = new Node<K,V>(ph, pk, pv, ln);
                                    else
                                        hn = new Node<K,V>(ph, pk, pv, hn);
                                }
                                setTabAt(nextTab, i, ln);
                                setTabAt(nextTab, i + n, hn);
                                setTabAt(tab, i, fwd);
                                advance = true;
                            }
                            else if (f instanceof TreeBin) {//按照红黑树的方式转移bin
                                TreeBin<K,V> t = (TreeBin<K,V>)f;
                                TreeNode<K,V> lo = null, loTail = null;
                                TreeNode<K,V> hi = null, hiTail = null;
                                int lc = 0, hc = 0;
                                for (Node<K,V> e = t.first; e != null; e = e.next) {
                                    int h = e.hash;
                                    TreeNode<K,V> p = new TreeNode<K,V>
                                        (h, e.key, e.val, null, null);
                                    if ((h & n) == 0) {
                                        if ((p.prev = loTail) == null)
                                            lo = p;
                                        else
                                            loTail.next = p;
                                        loTail = p;
                                        ++lc;
                                    }
                                    else {
                                        if ((p.prev = hiTail) == null)
                                            hi = p;
                                        else
                                            hiTail.next = p;
                                        hiTail = p;
                                        ++hc;
                                    }
                                }
                                ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                                    (hc != 0) ? new TreeBin<K,V>(lo) : t;
                                hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                                    (lc != 0) ? new TreeBin<K,V>(hi) : t;
                                setTabAt(nextTab, i, ln);
                                setTabAt(nextTab, i + n, hn);
                                setTabAt(tab, i, fwd);
                                advance = true;
                            }
                        }
                    }
                }
            }
        }
    
    

    resizeStamp(n)解析

    resizeStamp方法是用来生成扩容戳的,对于不同数组长度n,则扩容戳不一样,同时每个扩容戳需要容纳最大为MAX_RESIZERS=(1 << (32 - RESIZE_STAMP_BITS)) - 1的活跃扩容线程,那么sizeCtl-2=resizeStamp(n)<<RESIZE_STAMP_BITS,就应该要能表示这三组信息:负值,表示不同数组长度的扩容戳和活跃线程数;
    先看下源码:

      static final int resizeStamp(int n) {
           //获取数组长度n前面零的个数一般为32-log(n),那么RESIZE_STAMP_BITS为扩容戳需要的位数
            return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
        }
    

    因为RESIZE_STAMP_BITS需要能展示出当前数组扩容次数,而数组可扩容次数最大值为32-log(n),所以RESIZE_STAMP_BITS>32>2^5,所以RESIZE_STAMP_BITS至少为6位;
    则假设n=2^k,那么经过resizeStamp之后得到的rs如下图:


    rs.png

    ;

    经过左移RESIZE_STAMP_SHIFT后如下图:


    sizeCtl.png

    如上图所示最高位为1表示负数,扩容戳的后五位表示当前扩容次数,32-扩容戳所占位数即可表示活跃扩容线程数,活跃扩容线程数为1时表示初始化,为2时表示当前扩容线程数为1;

    未完待续.....

    相关文章

      网友评论

          本文标题:ConcurrentHashMap源码分析

          本文链接:https://www.haomeiwen.com/subject/kezxzqtx.html