美文网首页
ConcurrentHashMap解析二(putVal方法的解析

ConcurrentHashMap解析二(putVal方法的解析

作者: 代码potty | 来源:发表于2018-09-26 16:30 被阅读0次

    ConcurrentHashMap中put()这个方法很容易引起并发操作的问题,现在来研究下put()方法的实现

    put方法

    public V put(K key, V value) {
        return putVal(key, value, false);
    }
      //onlyIfAbsent跟HashMap一样,就是判断是否要覆盖,默认为false,覆盖
    final V putVal(K key, V value, boolean onlyIfAbsent) {
      //这句话可以看出,ConcurrentHashMap中不允许存在空值,这个是跟HashMap的区别之一
      //通过这个机制,我们可以通过get方法获取一个key,如果抛出异常,说明这个key不存在
        if (key == null || value == null) throw new NullPointerException();
        //ConcurrentHashMap中的hash值计算方法,跟HashMap中的差不多,不过最后的结果要
         //与0x7fffffff进行与操作,这个地方我还不明白为什么有这个与操作
        int hash = spread(key.hashCode());
        //binCount=0说明首节点插入,未进行链表或红黑树操作,因为后面会对这个值进行更改
        int binCount = 0;
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
        //如果数组为空或者长度为0,进行初始化工作
            if (tab == null || (n = tab.length) == 0)
                tab = initTable();
        //如果获取位置的节点为空,说明是首节点插入情况
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                if (casTabAt(tab, i, null,
                             new Node<K,V>(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin
            }
          //如果hash值等于MOVEN(默认-1),说明是协助扩容,这个在后边讲解ForwardingNode类
          //进行进一步解析
            else if ((fh = f.hash) == MOVED)
            //协助扩容
                tab = helpTransfer(tab, f);
            else {
                V oldVal = null;
              //对桶的首节点进行加锁
                synchronized (f) {
                    //双重判定,为了防止在当前线程进来之前,i地址所对应对象已经更改
                    if (tabAt(tab, i) == f) {
                    //为什么fh一定要大于等于0,这个原因在于TreeBin的hash值的设定,TreeBin类型
                    //的hash值默认设置为了-2
                        if (fh >= 0) {
                            binCount = 1;
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                              //在当前桶中找到位置跳出
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node<K,V> pred = e;
                                //当到桶的结尾还没找到,则新增一个Node
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K,V>(hash, key,
                                                              value, null);
                                    break;
                                }
                            }
                        }
                        //如果hash小于0,判断是否是TreeBin的实例
                        else if (f instanceof TreeBin) {
                            Node<K,V> p;
                            binCount = 2;
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                           value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
            //如果binCount值不等于0,说明进行了链表或者红黑树操作
                if (binCount != 0) {
                    //如果binCount大于8则进行树化,但真正的转换成红黑树不是8的长度
                    //当长度超过64才会真正的树化,处于8-64之间的还只是数组扩容
                    if (binCount >= TREEIFY_THRESHOLD)
                        //这个方法我在红黑树一节详细讲解了,不清楚的可以看红黑树那节
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
          //计数方法
        addCount(1L, binCount);
        return null;
    }
    

    我对这个方法进行了注释,可以直观的看代码进行了解,主要分析一下三个方法
    1、数组初始化方法initTable()
    2、线程协助扩容方法helpTransfer()
    3、计数方法addCount()

    数组初始化initTable()

    我们来看下这个方法的代码:

        private final Node<K,V>[] initTable() {
            Node<K,V>[] tab; int sc;
              //判断是否数组为空或者数组长度为0(未初始化)
            while ((tab = table) == null || tab.length == 0) {
                  //如果sizeCtl值小于0,则每个进入到这里的线程要作线程让步操作
                if ((sc = sizeCtl) < 0)
                    Thread.yield(); // lost initialization race; just spin
                //SIZECTL是地址偏移量,和前边我们讲到的tabAt()那三个方法的地址偏移量一样
                //如果SIZECTL对应地址的值与sc相等,说明当前的线程是第一个到达这条语句的
                 //线程,那么就会将SIZECTL地址所对应的值替换成-1,而SIZECTL地址偏移量对应的  
               // 对象就是sizeCtl。
                else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                    try {
                          //再次进行判断,防止在进行U.compareAndSwapInt(this, SIZECTL, sc, -1)的时候
                          //有其他线程并发进入方法,导致出错,使用双重锁
                        if ((tab = table) == null || tab.length == 0) {
                            int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                            @SuppressWarnings("unchecked")
                            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                            table = tab = nt;
                            sc = n - (n >>> 2);
                        }
                    } finally {
                        //如果未初始化,会将sizeCtl设置成为当前容量类似hashmap中阈值threshold
                        sizeCtl = sc;
                    }
                    break;
                }
            }
            return tab;
        }
    

    这个地方有点疑问,sizeCtl到底指代什么,是阈值还是容量,在传入容量的构造方法中,是直接将cap赋值给sizeCtl,也就是说在这里的sizeCtl是容量大小,而数组初始化的时候,又将容量的0.75倍赋值给了sizeCtl,找了下网上的资料
    引用这个链接(https://www.jianshu.com/p/c0642afe03e0)的说法:
    sizeCtl :默认为0,用来控制table的初始化和扩容操作,具体应用在后续会体现出来。
    **-1 **代表table正在初始化
    **-N **表示有N-1个线程正在进行扩容操作
    其余情况:
    1、如果table未初始化,表示table需要初始化的大小。
    2、如果table初始化完成,表示table的容量,默认是table大小的0.75倍,居然用这个公式算0.75(n - (n >>> 2))。
    也就是说,sizeCtl的值有这几种情况,跟hashmap中单一指代阈值和容量的不同

    线程协助扩容方法helpTransfer()

    讲这个方法之前,我们需要先了解一些其他的类与方法,首先是ForwardingNode,一个特殊的Node节点,hash值为-1,其中存储nextTable的引用。

        static final class ForwardingNode<K,V> extends Node<K,V> {
            final Node<K,V>[] nextTable;
            ForwardingNode(Node<K,V>[] tab) {
                //MOVEN默认值-1
                super(MOVED, null, null, null);
                this.nextTable = tab;
            }
    
            Node<K,V> find(int h, Object k){...}
        }
    

    然后是方法resizeStamp(),这个方法是计算校验码的

        static final int resizeStamp(int n) {
            return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
        }
    

    好,现在我们来看下helpTransfer()方法的使用:

        final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
            Node<K,V>[] nextTab; int sc;
            if (tab != null && (f instanceof ForwardingNode) &&
                (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
                int rs = resizeStamp(tab.length);
                while (nextTab == nextTable && table == tab &&
                       (sc = sizeCtl) < 0) {
                    if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                        sc == rs + MAX_RESIZERS || transferIndex <= 0)
                        break;
                    if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
                        transfer(tab, nextTab);
                        break;
                    }
                }
                return nextTab;
            }
            return table;
        }   
    

    首先判断是否能够协助扩容的条件
    1、数组不为空
    2、传入的首节点是ForwardingNode的实例
    3、该实例的nextTable类型不为空
    在这三个条件的前提下,进行下一步,通过resizeStamp()方法拿到一个校验码,然后再判断sizeCtl的值是否小于0,因为sizeCtl负数表示正在扩容。

    停止扩容的四个条件(这部分我还没理解,等后面理解了回来补充):
    1、(sc >>> RESIZE_STAMP_SHIFT) != rs
    2、sc == rs + 1
    3、 sc == rs + MAX_RESIZERS
    4、transferIndex <= 0

    如果符合扩容条件,就使用U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)在sc上加一,然后调用transfer()方法进行数据迁移,这个方法比较复杂,到时候拿出来单独讲.
    参考链接:
    https://blog.csdn.net/u011392897/article/details/60479937

    计数方法addCount()

        private final void addCount(long x, int check) {
                CounterCell[] as; long b, s;
                //计数部分
                if ((as = counterCells) != null ||
                    !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
                    CounterCell a; long v; int m;
                    boolean uncontended = true;
                    if (as == null || (m = as.length - 1) < 0 ||
                        (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
                        !(uncontended =
                          U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
                        fullAddCount(x, uncontended);
                        return;
                    }
                    if (check <= 1)
                        return;
                    s = sumCount();
                }
              //扩容部分
                if (check >= 0) {
                    Node<K,V>[] tab, nt; int n, sc;
                    while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
                           (n = tab.length) < MAXIMUM_CAPACITY) {
                        int rs = resizeStamp(n);
                        if (sc < 0) {
                            if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                                sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                                transferIndex <= 0)
                                break;
                            if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                                transfer(tab, nt);
                        }
                        else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                                     (rs << RESIZE_STAMP_SHIFT) + 2))
                            transfer(tab, null);
                        s = sumCount();
                    }
                }
            }       
    

    这个方法分为两部分,第一部分是计数,第二部分是扩容
    第一部分计数,
    当counterCells为空并且U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)返回true,则直接到下一部分是否扩容的判断,否则 CAS 失败后转入更新 counterCells ,防止CAS 自旋,使用的方法是fullAndCount(),这个方法取自striped64的LongerAdd的方法。
    关于striped64的解析详情可以看以下链接:
    http://brokendreams.iteye.com/blog/2259857
    说到这个,我就不得不提一下这个类CounterCell,这个类代码如下:

        @sun.misc.Contended static final class CounterCell {
            volatile long value;
            CounterCell(long x) { value = x; }
        }
    

    我们看到这个类用@sun.misc.Contended进行了注释,这个注释是自动填充缓存行用的,为什么要自动填充缓存行?
    因为这样可以防止伪共享的情况,那么什么是伪共享呢?
    我解释一下,就是当多线程修改互相独立的变量时,如果这些变量共享同一个缓存行,就会无意中影响彼此的性能,这就是伪共享,也就是避免头结点和为节点加载到同一个缓存行,使头尾节点在修改时不会互相锁定,基于此,我们需要自动填充缓存行。
    这个类中我们可以看到只有一个volatile修饰符的变量value,也就是单个counterCell对象累加值

    第二部分扩容,
    满足当前容量大于sizeCtl值并且数组不为空,数组长度小于最大容量值的时候,就开始扩容,跟helpTransfer()方法中一样,这里也进行了同样的判定,当线程刚进来的时候,sc是正的,所以执行else if的语句,U.compareAndSwapInt(this, SIZECTL, sc,(rs << RESIZE_STAMP_SHIFT) + 2)
    这条语句将sizeCtl直接赋值成了一个负数,如果赋值成功,则调用数据迁移方法transfer().
    在代码的最后有个s = sumCount()的语句,这个是ConcurrentHashmap内部计数完反馈给我们的值,在size()中也有相关调用:

        public int size() {
            long n = sumCount();
            return ((n < 0L) ? 0 :
                    (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
                    (int)n);
        }
    

    我们来看下sumCount()方法的内部实现:

        final long sumCount() {
            CounterCell[] as = counterCells; CounterCell a;
            long sum = baseCount;
            if (as != null) {
                for (int i = 0; i < as.length; ++i) {
                    if ((a = as[i]) != null)
                        sum += a.value;
                }
            }
            return sum;
        }
    

    可以看出这个计算是分为两部分,基础部分是baseCount部分,然后还要加上countCells数组的所有值,通过之前的分析我们知道,baseCount是CAS成功后会自动累加的值,而countCells数组是在CAS失败,也就是出现并发的情况下,进行累加的数组,这个数组类似segmenet一样,采用分段锁,最后将二者的值相加从而得到一个近似值。

    相关文章

      网友评论

          本文标题:ConcurrentHashMap解析二(putVal方法的解析

          本文链接:https://www.haomeiwen.com/subject/nfrdoftx.html