美文网首页
深入理解 ConcurrentHashMap

深入理解 ConcurrentHashMap

作者: 懒癌正患者 | 来源:发表于2019-06-05 12:06 被阅读0次

    【转载原文】莫那·鲁道@掘金 讲得很仔细,注解也很清楚

    深入理解 hashcode 和 hash 算法

    为什么使用 hashcode

    那么我们就说说为什么使用 hashcode ,hashCode 存在的第一重要的原因就是在 HashMap(HashSet 其实就是HashMap) 中使用(其实Object 类的 hashCode 方法注释已经说明了 ),我知道,HashMap 之所以速度快,因为他使用的是散列表,根据 key 的 hashcode 值生成数组下标(通过内存地址直接查找,没有任何判断),时间复杂度完美情况下可以达到 n1(和数组相同,但是比数组用着爽多了,但是需要多出很多内存,相当于以空间换时间)。

    String 类型的 hashcode 方法

    在 JDK 中,Object 的 hashcode 方法是本地方法,也就是用 c 语言或 c++ 实现的,该方法直接返回对象的 内存地址。这么做会有说明问题呢?我们用代码看看:

    class Test1{
    
      String name;
    
      public Test1(String name) {
        this.name = name;
      }
    
      public static void main(String[] args) {
        Map<Test1, String> map = new HashMap<>(4);
        map.put(new Test1("hello"), "hello");
        String hello = map.get(new Test1("hello"));
        System.out.println(hello);
      }
    }
    

    这段代码打印出来的会是什么呢? 答: null。因为我们没有重写 hashCode 方法,所有,HashMap 内部使用的是该对象的内存地址,那么肯定不一样。我们第一个对象根本就没有存,因此,返回就是 null。这里就可以看出来重写 hashCode 的重要性。

    JDK 中,我们经常把 String 类型作为 key,那么 String 类型是如何重写 hashCode 方法的呢?

    我们看看代码:

    public int hashCode() {
            int h = hash;
            if (h == 0 && value.length > 0) {
                char val[] = value;
    
                for (int i = 0; i < value.length; i++) {
                    h = 31 * h + val[i];
                }
                hash = h;
            }
            return h;
        }
    

    代码非常简单,就是使用 String 的 char 数组的数字每次乘以 31 再叠加最后返回,因此,每个不同的字符串,返回的 hashCode 肯定不一样。那么为什么使用 31 呢?

    为什么大部分 hashcode 方法使用 31

    如果有使用 eclipse 的同学肯定知道,该工具默认生成的 hashCode 方法实现也和 String 类型差不多。都是使用的 31 ,那么有没有想过:为什么要使用 31 呢?
    在名著 《Effective Java》第 42 页就有对 hashCode 为什么采用 31 做了说明:

    之所以使用 31, 是因为他是一个奇素数。如果乘数是偶数,并且乘法溢出的话,信息就会丢失,因为与2相乘等价于移位运算(低位补0)。使用素数的好处并不很明显,但是习惯上使用素数来计算散列结果。 31 有个很好的性能,即用移位和减法来代替乘法,可以得到更好的性能: 31 * i == (i << 5) - i, 现代的 VM 可以自动完成这种优化。这个公式可以很简单的推导出来。

    这个问题在 SO 上也有讨论: https://stackoverflow.com/questions/299304/why-does-javas-hashcode-in-string-use-31-as-a-multiplier%EF%BC%89

    可以看到,使用 31 最主要的还是为了性能。当然用 63 也可以。但是 63 的溢出风险就更大了。那么15 呢?仔细想想也可以。

    在《Effective Java》也说道:编写这种散列函数是个研究课题,最好留给数学家和理论方面的计算机科学家来完成。我们此次最重要的是知道了为什么使用31。

    HashMap 的 hash 算法的实现原理(为什么右移 16 位,为什么要使用 ^ 位异或)

    好了,知道了 hashCode 的生成原理了,我们要看看今天的主角,hash 算法。

    其实,这个也是数学的范畴,从我们的角度来讲,只要知道这是为了更好的均匀散列表的下标就好了,但是,就是耐不住好奇心啊! 能多知道一点就是一点,我们来看看 HashMap 的 hash 算法(JDK 8).

    static final int hash(Object key) {
            int h;
            return (key == null) ? 0 : (h = key.hashCode()) ^ (h >>> 16);
    }
    

    乍看一下就是简单的异或运算和右移运算,但是为什么要异或呢?为什么要移位呢?而且移位16?

    在分析这个问题之前,我们需要先看看另一个事情,什么呢?就是 HashMap 如何根据 hash 值找到数组种的对象,我们看看 get 方法的代码:

    final Node<K,V> getNode(int hash, Object key) {
            Node<K,V>[] tab; Node<K,V> first, e; int n; K k;
            if ((tab = table) != null && (n = tab.length) > 0 &&
                // 我们需要关注下面这一行
                (first = tab[(n - 1) & hash]) != null) {
                if (first.hash == hash && // always check first node
                    ((k = first.key) == key || (key != null && key.equals(k))))
                    return first;
                if ((e = first.next) != null) {
                    if (first instanceof TreeNode)
                        return ((TreeNode<K,V>)first).getTreeNode(hash, key);
                    do {
                        if (e.hash == hash &&
                            ((k = e.key) == key || (key != null && key.equals(k))))
                            return e;
                    } while ((e = e.next) != null);
                }
            }
            return null;
        }
    

    我们看看代码中注释下方的一行代码:first = tab[(n - 1) & hash])。

    使用数组长度减一 与运算 hash 值。这行代码就是为什么要让前面的 hash 方法移位并异或。

    我们分析一下:

    首先,假设有一种情况,对象 A 的 hashCode 为 1000010001110001000001111000000,对象 B 的 hashCode 为 0111011100111000101000010100000。

    如果数组长度是16,也就是 15 与运算这两个数, 你会发现结果都是0。这样的散列结果太让人失望了。很明显不是一个好的散列算法。

    但是如果我们将 hashCode 值右移 16 位,也就是取 int 类型的一半,刚好将该二进制数对半切开。并且使用位异或运算(如果两个数对应的位置相反,则结果为1,反之为0),这样的话,就能避免我们上面的情况的发生。

    总的来说,使用位移 16 位和 异或 就是防止这种极端情况。但是,该方法在一些极端情况下还是有问题,比如:10000000000000000000000000 和 1000000000100000000000000 这两个数,如果数组长度是16,那么即使右移16位,在异或,hash 值还是会重复。但是为了性能,对这种极端情况,JDK 的作者选择了性能。毕竟这是少数情况,为了这种情况去增加 hash 时间,性价比不高。

    HashMap 为什么使用 & 与运算代替模运算?

    好了,知道了 hash 算法的实现原理还有他的一些取舍,我们再看看刚刚说的那个根据hash计算下标的方法:

    tab[(n - 1) & hash];

    其中 n 是数组的长度。其实该算法的结果和模运算的结果是相同的。但是,对于现代的处理器来说,除法和求余数(模运算)是最慢的动作。

    上面情况下和模运算相同呢?

    a % b == (b-1) & a ,当b是2的指数时,等式成立。

    我们说 & 与运算的定义:与运算 第一个操作数的的第n位于第二个操作数的第n位如果都是1,那么结果的第n为也为1,否则为0;

    当 n 为 16 时, 与运算 101010100101001001101 时,也就是
    1111 & 101010100101001001000 结果:1000 = 8
    1111 & 101000101101001001001 结果:1001 = 9
    1111 & 101010101101101001010 结果: 1010 = 10
    1111 & 101100100111001101100 结果: 1100 = 12

    可以看到,当 n 为 2 的幂次方的时候,减一之后就会得到 1111* 的数字,这个数字正好可以掩码。并且得到的结果取决于 hash 值。因为 hash 值是1,那么最终的结果也是1 ,hash 值是0,最终的结果也是0。

    HashMap 的容量为什么建议是 2的幂次方?

    到这里,我们提了一个关键的问题: HashMap 的容量为什么建议是 2的幂次方?正好可以和上面的话题接上。楼主就是这么设计的。

    为什么要 2 的幂次方呢?

    我们说,hash 算法的目的是为了让hash值均匀的分布在桶中(数组),那么,如何做到呢?试想一下,如果不使用 2 的幂次方作为数组的长度会怎么样?
    假设我们的数组长度是10,还是上面的公式:

    1010 & 101010100101001001000 结果:1000 = 8
    1010 & 101000101101001001001 结果:1000 = 8
    1010 & 101010101101101001010 结果: 1010 = 10
    1010 & 101100100111001101100 结果: 1000 = 8

    看到结果我们惊呆了,这种散列结果,会导致这些不同的key值全部进入到相同的插槽中,形成链表,性能急剧下降。

    所以说,我们一定要保证 & 中的二进制位全为 1,才能最大限度的利用 hash 值,并更好的散列,只有全是1 ,才能有更多的散列结果。如果是 1010,有的散列结果是永远都不会出现的,比如 0111,0101,1111,1110.......,只要 & 之前的数有 0, 对应的 1 肯定就不会出现(因为只有都是1才会为1)。大大限制了散列的范围。

    我们自定义 HashMap 容量最好是多少?

    那我们如何自定义呢?自从有了阿里的规约插件,每次楼主都要初始化容量,如果我们预计我们的散列表中有2个数据,那么我就初始化容量为2嘛?

    绝对不行,如果大家看过源码就会发现,如果Map中已有数据的容量达到了初始容量的 75%,那么散列表就会扩容,而扩容将会重新将所有的数据重新散列,性能损失严重,所以,我们可以必须要大于我们预计数据量的 1.34 倍,如果是2个数据的话,就需要初始化 2.68 个容量。当然这是开玩笑的,2.68 不可以,3 可不可以呢?肯定也是不可以的,我前面说了,如果不是2的幂次方,散列结果将会大大下降。导致出现大量链表。那么我可以将初始化容量设置为4。 当然了,如果你预计大概会插入 12 条数据的话,那么初始容量为16简直是完美,一点不浪费,而且也不会扩容。

    总结

    好了,分析完了 hashCode 和 hash 算法,让我们对 HashMap 又有了全新的认识。当然,HashMap 中还有很多有趣的东西值得挖掘,楼主会继续写下去。争取将 HashMap 的衣服扒光。

    总的来说,通过今天的分析,对我们今后使用 HashMap 有了更多的把握,也能够排查一些问题,比如链表数很多,肯定是数组初始化长度不对,如果某个map很大,注意,肯定是事先没有定义好初始化长度,假设,某个Map存储了10000个数据,那么他会扩容到 20000,实际上,根本不用 20000,只需要 10000* 1.34= 13400 个,然后向上找到一个2 的幂次方,也就是 16384 初始容量足够。

    深入理解 HashMap put 方法(JDK 8逐行剖析)

    注意:我们今天所有的一切都是基于 JDK 8,JDK 8 的实现和 JDK 7 有重大区别。
    前面我们分析了 hashCode 和 hash 算法的原理,其实都是为我们解析 HashMap 做铺垫,因为 HashMap 确实比较复杂(如果你每一行代码都看的话,每个位移都纠结的话),虽然总的来说,HashMap 不过是 Node 数组加 链表和红黑树。但是里面的细节确是无比的优雅和有趣。楼主为什么选择 put 方法来讲呢?因为从楼主看来,HashMap 的精髓就在 put 方法中。
    HashMap 的解析从楼主来看,主要可以分为几个部分:

    1. hash 算法(这个我们之前说过了,今天就不再赘述了)
    2. 初始化数组。
    3. 通过 hash 计算下标并检查 hash 是否冲突,也就是对应的下标是否已存在元素。
    4. 通过判断是否含有元素,决定是否创建还是追加链表或树。
    5. 判断已有元素的类型,决定是追加树还是追加链表。
    6. 判断是否超过阀值,如果超过,则重新散列数组。
    7. Java 8 重新散列时是如何优化的。

    开始吧!!!

    初始化数组

    首先我们来一个测试例子:

    public static void main(String[] args) {
        HashMap<String, Integer> hashMap = new HashMap<>(2);
        hashMap.put("one", 1);
        Integer one = hashMap.get("one");
        System.out.println(one);
    }
    

    一个简单的不能再简单的使用 HashMap 的例子,其中包含了对于 HashMap 来说关键的 3 个步骤,初始化,put 元素,get 元素。

    由于我们预计会放入一个元素,出于性能考虑,我们将容量设置为 2,既保证了性能,也节约了空间(置于为什么,我们在之前的文章中讲过)。

    那么我们就看看 new 操作的时候做了些什么事情:

     /**
         * Constructs an empty <tt>HashMap</tt> with the specified initial
         * capacity and the default load factor (0.75).
         *
         * @param  initialCapacity the initial capacity.
         * @throws IllegalArgumentException if the initial capacity is negative.
         */
        public HashMap(int initialCapacity) {
            this(initialCapacity, DEFAULT_LOAD_FACTOR);
        }
    
    //=================================================================================
        /**
         * Constructs an empty <tt>HashMap</tt> with the specified initial
         * capacity and load factor.
         *
         * @param  initialCapacity the initial capacity
         * @param  loadFactor      the load factor
         * @throws IllegalArgumentException if the initial capacity is negative
         *         or the load factor is nonpositive
         */
        public HashMap(int initialCapacity, float loadFactor) {
            if (initialCapacity < 0)
                throw new IllegalArgumentException("Illegal initial capacity: " +
                                                   initialCapacity);
            if (initialCapacity > MAXIMUM_CAPACITY)
                initialCapacity = MAXIMUM_CAPACITY;
            if (loadFactor <= 0 || Float.isNaN(loadFactor))
                throw new IllegalArgumentException("Illegal load factor: " +
                                                   loadFactor);
            this.loadFactor = loadFactor;
            this.threshold = tableSizeFor(initialCapacity);
        }
    

    上面是 HashMap 的两个构造方法,其中,我们设置了初始容量为 2, 而默认的加载因子我们之前说过:0.75,当然也可以自己设置,但 0.75 是最均衡的设置,没有特殊要求不要修改该值,加载因子过小,理论上能减少 hash 冲突,加载因子过大可以节约空间,减少 HashMap 中最耗性能的操作:reHash。

    从代码中我可以看到,如果我们设置的初始化容量小于0,将会抛出异常,如果加载因子小于0也会抛出异常。同时,如果初始容量大于最大容量,则重新设置为最大容量。

    我们开最后两行代码,首先,对负载因子进行赋值,这个没什么可说的。
    牛逼的是下面一行代码:this.threshold = tableSizeFor(initialCapacity); 可以看的出来这个动作是计算阀值,上面是阀值呢?阀值就是,如果容器中的元素大于阀值了,就需要进行扩容,那么这里的这行代码,就是根据初始容量进行阀值的计算。

    我们进入到该方法查看:

     /**
         * Returns a power of two size for the given target capacity.
         */
        static final int tableSizeFor(int cap) {
            int n = cap - 1;
            n |= n >>> 1;
            n |= n >>> 2;
            n |= n >>> 4;
            n |= n >>> 8;
            n |= n >>> 16;
            return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
        }
    

    一通或运算和无符号右移运算,那么这个运算的的最后结果是什么呢?这里其实就是如果用户输入的值不是2的幂次方(我们通过之前的分析,应该直到初始容量如果不是2的幂次方会有多么不好的结果)。通过位移运算和或运算,最后得到一定是2的幂次方,并且是那个离那个数最近的数字,我们仔细看看该方法:

    | : 或运算,第一个操作数的的第n位于第二个操作数的第n位 只要有一个是1,那么结果的第n为也为1,否则为0

    首先,将容量减1,我们后面就知道了。然后将该数字无符号右移1,2,4,8,16,总共移了32位,刚好是一个int类型的长度。在移动的过程中,还对该数字进行或运算,为了方便查看,楼主写一下2进制的运算过程,假如我输入的是10,明显不是2的幂次方。我们看看会怎么样:

    10 = 1010;
    n = 9;
    1001 == 9;
    1001 >>> 1 = 0100;
    1001 或 0100 = 1101;
    1101 >>> 2 = 0011;
    110 或 0011 = 1111;
    1111 >>> 4 = 0000;
    1111 或 0000 = 1111;
    1111 >>> 8 = 0000;
    1111 或 0000 = 1111;
    1111 >>> 16 = 0000;
    1111 或 0000 = 1111;
    

    最后,1111 也就是 15 ,15 + 1 = 16,刚好就是距离10 最近的并且没有变小的2的幂次方数。可以说这个算法非常的牛逼。楼主五体投地。

    但是如果是 16 呢,并且没有不减一,我们看看什么结果:

    16 = 10000;
    10000 >>> 1 = 01000;
    10000 或 01000 = 11000;
    11000 >>> 2 = 00110;
    11000 或 00110 = 11110;
    11110 >>> 4 = 00001;
    11110 或 00001 = 11111;
    11111 >>> 8 = 00000;
    11111 或 00000 = 11111;
    11111 >>> 16 = 00000;
    11111 或 00000 = 11111;
    

    最后的数字就是31 ,31+ 1 = 32,同样也是上升到了更大的2次幂的数字。但是这不是我想要的结果,所以,JDK 的作者在之前先减去了1. 防止出现这样的问题。

    我们仔细观察其算法的过程,可以说,任何一个int 数字,都能找到离他最近的 2 的幂次方数字(并且比他大)。

    好了。到这里就完成了初始化,不过请注意,这里设置的阀值并不是最终的阀值,最终的阀值我们会在后面详细说明。这里我们更加关注这个算法。真的牛逼啊。

    通过 hash 计算下标并检查 hash 是否冲突,也就是对应的下标是否已存在元素。

    初始化好了 HashMap,我们接着就调用了 put 方法,该方法如下:

    public V put(K key, V value) {
            return putVal(hash(key), key, value, false, true);
    }
    

    其中调用 hash 方法,该方法我们之前已经深入讨论过,今天就不赘述了,如果有同学没有看过,也不要紧,看完这篇 再去看 或者 看完那篇 再来 看这篇都可以。有点绕。好,然后调用了 puVal 方法,我们看看该方法:

    final V putVal(int hash, K key, V value, boolean onlyIfAbsent,
                       boolean evict) {
            Node<K,V>[] tab; Node<K,V> p; int n, i;
            // 当前对象的数组是null 或者数组长度时0时,则需要初始化数组
            if ((tab = table) == null || (n = tab.length) == 0)
                // 得到数组的长度 16
                n = (tab = resize()).length;
            // 如果通过hash值计算出的下标的地方没有元素,则根据给定的key 和 value 创建一个元素
            if ((p = tab[i = (n - 1) & hash]) == null)
                tab[i] = newNode(hash, key, value, null);
            else { // 如果hash冲突了
                Node<K,V> e; K k;
                // 如果给定的hash和冲突下标中的 hash 值相等并且 (已有的key和给定的key相等(地址相同,或者equals相同)),说明该key和已有的key相同
                if (p.hash == hash &&
                    ((k = p.key) == key || (key != null && key.equals(k))))
                    // 那么就将已存在的值赋给上面定义的e变量
                    e = p;
                // 如果以存在的值是个树类型的,则将给定的键值对和该值关联。
                else if (p instanceof TreeNode)
                    e = ((TreeNode<K,V>)p).putTreeVal(this, tab, hash, key, value);
                // 如果key不相同,只是hash冲突,并且不是树,则是链表
                else { 
                    // 循环,直到链表中的某个节点为null,或者某个节点hash值和给定的hash值一致且key也相同,则停止循环。
                    for (int binCount = 0; ; ++binCount) {
                        // 如果next属性是空
                        if ((e = p.next) == null) {
                            // 那么创建新的节点赋值给已有的next 属性
                            p.next = newNode(hash, key, value, null);
                            // 如果树的阀值大于等于7,也就是,链表长度达到了8(从0开始)。
                            if (binCount >= TREEIFY_THRESHOLD - 1) // -1 for 1st
                                // 如果链表长度达到了8,且数组长度小于64,那么就重新散列,如果大于64,则创建红黑树
                                treeifyBin(tab, hash);
                            // 结束循环
                            break;
                        }
                        // 如果hash值和next的hash值相同且(key也相同)
                        if (e.hash == hash &&
                            ((k = e.key) == key || (key != null && key.equals(k))))
                            // 结束循环
                            break;
                        // 如果给定的hash值不同或者key不同。
                        // 将next 值赋给 p,为下次循环做铺垫
                        p = e;
                    }
                }
                // 通过上面的逻辑,如果e不是null,表示:该元素存在了(也就是他们呢key相等)
                if (e != null) { // existing mapping for key
                    // 取出该元素的值
                    V oldValue = e.value;
                    // 如果 onlyIfAbsent 是 true,就不要改变已有的值,这里我们是false。
                    // 如果是false,或者 value 是null
                    if (!onlyIfAbsent || oldValue == null)
                        // 将新的值替换老的值
                        e.value = value;
                    // HashMap 中什么都不做
                    afterNodeAccess(e);
                    // 返回之前的旧值
                    return oldValue;
                }
            }
            // 如果e== null,需要增加 modeCount 变量,为迭代器服务。
            ++modCount;
            // 如果数组长度大于了阀值
            if (++size > threshold)
                // 重新散列
                resize();
            // HashMap 中什么都不做
            afterNodeInsertion(evict);
            // 返回null
            return null;
        }
    

    该方法可以说是 HashMap 的核心方法,楼主已经在该方法中写满了注释。楼主说一下该方法的步骤:

    1. 判断数组是否为空,如果是空,则创建默认长度位 16 的数组。

    2. 通过与运算计算对应 hash 值的下标,如果对应下标的位置没有元素,则直接创建一个。

    3. 如果有元素,说明 hash 冲突了,则再次进行 3 种判断。

      1. 判断两个冲突的key是否相等,equals 方法的价值在这里体现了。如果相等,则将已经存在的值赋给变量e。最后更新e的value,也就是替换操作。
      2. 如果key不相等,则判断是否是红黑树类型,如果是红黑树,则交给红黑树追加此元素。
      3. 如果key既不相等,也不是红黑树,则是链表,那么就遍历链表中的每一个key和给定的key是否相等。如果,链表的长度大于等于8了,则将链表改为红黑树,这是Java8 的一个新的优化。
    4. 最后,如果这三个判断返回的 e 不为null,则说明key重复,则更新key对应的value的值。

    5. 对维护着迭代器的modCount 变量加一。

    6. 最后判断,如果当前数组的长度已经大于阀值了。则重新hash。

    通过判断是否含有元素,决定是否创建还是追加链表或树。

    首先判断是否含有元素,通过什么判断呢?

    tab[i = (n - 1) & hash];
    

    这个算式根据 hash 值获取对应的下标,具体是什么原理,我们在上一篇文章已经说了原因。这里也不在赘述了。如果 hash 值没有冲突,则创建一个 Node 对象,参数是hash值,key,value,还有为null 的next 属性。下面是构造函数。

    // 构造函数
     Node(int hash, K key, V value, Node<K,V> next) {
            this.hash = hash;
            this.key = key;
            this.value = value;
            this.next = next;
    }
    

    如果没有冲突,后面紧接着就走 ++modCount,然后,判断容量是否大于阀值(默认是12)。如果大于,则调用 resize 方法,重新散列。resize 方法我们后面详细分析。

    判断已有元素的类型,决定是追加树还是追加链表。

    如果 hash 冲突了怎么办?我们刚刚说会有3种判断:

    1. 判断两个冲突的key是否相等,equals 方法的价值在这里体现了。如果相等,则将已经存在的值赋给变量e。最后更新e的value,也就是替换操作。
    2. 如果key不相等,则判断是否是红黑树类型,如果是红黑树,则交给红黑树追加此元素。
    3. 如果key既不相等,也不是红黑树,则是链表,那么就遍历链表中的每一个key和给定的key是否相等。如果,链表的长度大于等于8了,则将链表改为红黑树,这是Java8 的一个新的优化。

    注意:在链表的循环中,有一个方法 treeifyBin,这个方法在链表长度大于等于8 的时候会调用,那么该方法的内容是什么呢?

    final void treeifyBin(Node<K,V>[] tab, int hash) {
        int n, index; Node<K,V> e;
        // 如果数组是null 或者数组的长度小于 64
        if (tab == null || (n = tab.length) < MIN_TREEIFY_CAPACITY)
            // 重新散列
            resize();
        // 如果给定的hash冲突了,则创建红黑树结构
        else if ((e = tab[index = (n - 1) & hash]) != null) {
            TreeNode<K,V> hd = null, tl = null;
            do {
                TreeNode<K,V> p = replacementTreeNode(e, null);
                if (tl == null)
                    hd = p;
                else {
                    p.prev = tl;
                    tl.next = p;
                }
                tl = p;
            } while ((e = e.next) != null);
            if ((tab[index] = hd) != null)
                hd.treeify(tab);
        }
    }
    

    该方法虽然主要功能是替换链表结构为红黑树,但是在替换前,会先判断,如果数组是 null 或者数组的长度小于 64,则重新散列,因为重新散列会拆分链表,使得链表的长度变短。提高性能。如果长度大于64了。就只能将链表变为红黑树了。

    判断是否超过阀值,如果超过,则重新散列数组。

    最后,判断是否阀值,如果超过则进行散列;

    // 如果 e == null,需要增加 modeCount 变量,为迭代器服务。
    ++modCount;
    // 如果数组长度大于了阀值
    if (++size > threshold)
        // 重新散列
        resize();
    // HashMap 中什么都不做
    afterNodeInsertion(evict);
    // 返回null
    return null;
    

    我们知道,阀值默认是16,那么 resize 方法就是重新散列的核心方法,我们看看该方法实现:

    final Node<K,V>[] resize() {
            Node<K,V>[] oldTab = table;
            int oldCap = (oldTab == null) ? 0 : oldTab.length;
            int oldThr = threshold;
            int newCap, newThr = 0;
            // 如果老的容量大于0
            if (oldCap > 0) {
                // 如果容量大于容器最大值
                if (oldCap >= MAXIMUM_CAPACITY) {
                    // 阀值设为int最大值
                    threshold = Integer.MAX_VALUE;
                    // 返回老的数组,不再扩充
                    return oldTab;
                }// 如果老的容量*2 小于最大容量并且老的容量大于等于默认容量
                else if ((newCap = oldCap << 1) < MAXIMUM_CAPACITY &&
                         oldCap >= DEFAULT_INITIAL_CAPACITY)
                    // 新的阀值也再老的阀值基础上*2
                    newThr = oldThr << 1; // double threshold
            }// 如果老的阀值大于0
            else if (oldThr > 0) // initial capacity was placed in threshold
                // 新容量等于老阀值
                newCap = oldThr;
            else {  // 如果容量是0,阀值也是0,认为这是一个新的数组,使用默认的容量16和默认的阀值12           
                newCap = DEFAULT_INITIAL_CAPACITY;
                newThr = (int)(DEFAULT_LOAD_FACTOR * DEFAULT_INITIAL_CAPACITY);
            }
            // 如果新的阀值是0,重新计算阀值
            if (newThr == 0) {
                // 使用新的容量 * 负载因子(0.75)
                float ft = (float)newCap * loadFactor;
                // 如果新的容量小于最大容量 且 阀值小于最大 则新阀值等于刚刚计算的阀值,否则新阀值为 int 最大值
                newThr = (newCap < MAXIMUM_CAPACITY && ft < (float)MAXIMUM_CAPACITY ?
                          (int)ft : Integer.MAX_VALUE);
            } 
            // 将新阀值赋值给当前对象的阀值。
            threshold = newThr;
            @SuppressWarnings({"rawtypes","unchecked"})
                // 创建一个Node 数组,容量是新数组的容量(新容量要么是老的容量,要么是老容量*2,要么是16)
                Node<K,V>[] newTab = (Node<K,V>[])new Node[newCap];
            // 将新数组赋值给当前对象的数组属性
            table = newTab;
            // 如果老的数组不是null
            if (oldTab != null) {
              // 循环老数组
                for (int j = 0; j < oldCap; ++j) {
                    // 定义一个节点
                    Node<K,V> e;
                    // 如果老数组对应下标的值不为空
                    if ((e = oldTab[j]) != null) {
                        // 设置为空
                        oldTab[j] = null;
                        // 如果老数组没有链表
                        if (e.next == null)
                            // 将该值散列到新数组中
                            newTab[e.hash & (newCap - 1)] = e;
                        // 如果该节点是树
                        else if (e instanceof TreeNode)
                            // 调用红黑树 的split 方法,传入当前对象,新数组,当前下标,老数组的容量,目的是将树的数据重新散列到数组中
                            ((TreeNode<K,V>)e).split(this, newTab, j, oldCap);
                        else { // 如果既不是树,next 节点也不为空,则是链表,注意,这里将优化链表重新散列(java 8 的改进)
                          // Java8 之前,这里曾是并发操作会出现环状链表的情况,但是Java8 优化了算法。此bug不再出现,但并发时仍然不建议HashMap
                            Node<K,V> loHead = null, loTail = null;
                            Node<K,V> hiHead = null, hiTail = null;
                            Node<K,V> next;
                            do {
                                next = e.next;
                                // 这里的判断需要引出一些东西:oldCap 假如是16,那么二进制为 10000,扩容变成 100000,也就是32.
                                // 当旧的hash值 与运算 10000,结果是0的话,那么hash值的右起第五位肯定也是0,那么该于元素的下标位置也就不变。
                                if ((e.hash & oldCap) == 0) {
                                    // 第一次进来时给链头赋值
                                    if (loTail == null)
                                        loHead = e;
                                    else
                                        // 给链尾赋值
                                        loTail.next = e;
                                    // 重置该变量
                                    loTail = e;
                                }
                                // 如果不是0,那么就是1,也就是说,如果原始容量是16,那么该元素新的下标就是:原下标 + 16(10000b)
                                else {
                                    // 同上
                                    if (hiTail == null)
                                        hiHead = e;
                                    else
                                        hiTail.next = e;
                                    hiTail = e;
                                }
                            } while ((e = next) != null);
                            // 理想情况下,可将原有的链表拆成2组,提高查询性能。
                            if (loTail != null) {
                                // 销毁实例,等待GC回收
                                loTail.next = null;
                                // 置入bucket中
                                newTab[j] = loHead;
                            }
                            if (hiTail != null) {
                                hiTail.next = null;
                                newTab[j + oldCap] = hiHead;
                            }
                        }
                    }
                }
            }
            return newTab;
        }
    

    该方法可以说还是比较复杂的。初始的时候也是调用的这个方法,当链表数超过8的时候同时数组长度小于64的时候也是调用的这个方法。该方法步骤如下:

    1. 判断容量是否大于0,如果大于0,并且容量已将大于最大值,则设置阀值为 int 最大值,并返回,如果老的容量乘以 2 小于最大容量,且老的容量大于等于16,则更新阀值。也就是乘以2.
    2. 如果老的阀值大于0,则新的容量等于老的阀值。注意:这里很重要。还记的我们之前使用new 操作符的时候,会设置阀值为 2 的幂次方,那么这里就用上了那个计算出来的数字,也就是说,就算我们设置的不是2的幂次方,HashMap 也会自动将你的容量设置为2的幂次方。
    3. 如果老的阀值和容量都不大于0,则认为是一个新的数组,默认初始容量为16,阀值为 16 * 0.75f,也就是 12。
    4. 如果,新的阀值还是0,那么就使用我们刚刚设置的容量(HashMap 帮我们算的),通过乘以 0.75,得到一个阀值,然后判断算出的阀值是否合法:如果容量小于最大容量并且阀值小于最大容量,那么则使用该阀值,否则使用 int 最大值。
    5. 将刚刚的阀值设置打当前Map实例的阀值属性中。
    6. 将刚刚的数组设置到当前Map实例的数组属性中。
    7. 如果老的数组不是null,则将老数组中的值重新散列到新数组中。如果是null,直接返回新数组。

    那么,将老数组重新散列的过程到底是怎么样的呢?

    Java 8 重新散列时是如何优化的。

    重新散列的代码:

    // 如果老的数组不是null
    if (oldTab != null) {
      // 循环老数组
        for (int j = 0; j < oldCap; ++j) {
            // 定义一个节点
            Node<K,V> e;
            // 如果老数组对应下标的值不为空
            if ((e = oldTab[j]) != null) {
                // 设置为空
                oldTab[j] = null;
                // 如果老数组没有链表
                if (e.next == null)
                    // 将该值散列到新数组中
                    newTab[e.hash & (newCap - 1)] = e;
                // 如果该节点是树
                else if (e instanceof TreeNode)
                    // 调用红黑树 的split 方法,传入当前对象,新数组,当前下标,老数组的容量,目的是将树的数据重新散列到数组中
                    ((TreeNode<K,V>)e).split(this, newTab, j, oldCap);
                else { // 如果既不是树,next 节点也不为空,则是链表,注意,这里将优化链表重新散列(java 8 的改进)
                  // Java8 之前,这里曾是并发操作会出现环状链表的情况,但是Java8 优化了算法。此bug不再出现,但并发时仍然不建议HashMap
                    Node<K,V> loHead = null, loTail = null;
                    Node<K,V> hiHead = null, hiTail = null;
                    Node<K,V> next;
                    do {
                        next = e.next;
                        // 这里的判断需要引出一些东西:oldCap 假如是16,那么二进制为 10000,扩容变成 100000,也就是32.
                        // 当旧的hash值 与运算 10000,结果是0的话,那么hash值的右起第五位肯定也是0,那么该于元素的下标位置也就不变。
                        if ((e.hash & oldCap) == 0) {
                            // 第一次进来时给链头赋值
                            if (loTail == null)
                                loHead = e;
                            else
                                // 给链尾赋值
                                loTail.next = e;
                            // 重置该变量
                            loTail = e;
                        }
                        // 如果不是0,那么就是1,也就是说,如果原始容量是16,那么该元素新的下标就是:原下标 + 16(10000b)
                        else {
                            // 同上
                            if (hiTail == null)
                                hiHead = e;
                            else
                                hiTail.next = e;
                            hiTail = e;
                        }
                    } while ((e = next) != null);
                    // 理想情况下,可将原有的链表拆成2组,提高查询性能。
                    if (loTail != null) {
                        // 销毁实例,等待GC回收
                        loTail.next = null;
                        // 置入bucket中
                        newTab[j] = loHead;
                    }
                    if (hiTail != null) {
                        hiTail.next = null;
                        newTab[j + oldCap] = hiHead;
                    }
                }
            }
        }
    }
    

    这里楼主重新贴了上面的代码,因为这段代码比较重要。还是说一下该部分代码的步骤。

    1. 首先循环老数组。下标从0开始,如果对应下标的值不是null,则判断该值有没有next 节点,也就是判断是否是链表。
    2. 如果不是链表,则根据新的数组长度重新hash该元素。
    3. 如果该节点是树,则调用红黑树的 split 方法,传入当前对象和新数组还有下标,该方法会重新计算红黑树中的每个hash值,如果重新计算后,树中元素的hash值不同,则重新散列到不同的下标中。达到拆分红黑树的目的,提高性能。具体如何拆分下面再说。
    4. 之后的判断就是链表,在Java8中,该部分代码不是简单的将旧链表中的数据拷贝到新数组中的链表就完了,而是会对旧的链表进行重新 hash,如果 hash 得到的值和之前不同,则会从旧的链表中拆出,放到另一个下标中去,提高性能,刚刚的红黑树也是这么做的。

    这里的重新hash 不是使用的 [e.hash & (newCap - 1)] 方法,而是使用更加效率的方法,直接 hash 老的数组容量,就没有了减一的操作,可见 JDK 的作者为了性能可以说是无所不用其极了。

    其实我们的注释已经写了,但是楼主还是再解释一遍吧:

    仔细阅读下面这段话:

    oldCap 假如是16,那么二进制为 10000,扩容变成 100000,也就是32.当旧的hash值 与运算 10000,结果是0的话,那么hash值的右起第五位肯定也是0,那么该于元素的下标位置也就不变。但如果不是0是1的话,说明该hash值变化了,那么就需要对这个元素重新散列放置。那么应该放哪里呢?如果是16,那么最左边是1的话,说明hash值变大了16,那么只需要在原有的基础上加上16便好了。

    这段代码还有一个需要注意的地方:在JDK 7 中,这里的的代码是不同的,在并发情况下会链表会变成环状,形成死锁。而JDK 8 已经修复了该问题,但是仍然不建议使用 HashMap 并发编程。

    总结

    截至到这里,我们的 HashMap 的 put 方法已经剖析完了,此次可以说收获不小:

    我们知道了,无论我们如何设置初始容量,HashMap 都会将我们改成2的幂次方,也就是说,HashMap 的容量百分之百是 2的幂次方,因为HashMap 太依赖他了。但是,请注意:如果我们预计插入7条数据,那么我们写入7,HashMap 会设置为 8,虽然是2的幂次方,但是,请注意,当我们放入第7条数据的时候,就会引起扩容,造成性能损失,所以,知晓了原理,我们以后在设置容量的时候还是自己算一下,比如放7条数据,我们还是都是设置成16,这样就不会扩容了。

    HashMap 的默认加载因子是 0.75,虽然可以修改,但是出于安全考虑,除非你经过大量测试,请不要修改此值,HashMap 使用此值基本是平衡了性能和空间的取舍。

    HashMap 扩容的时机是,容器中的元素数量 > 负载因此 * 容量,如果负载因子是0.75,容量是16,那么当容器中数量达到13 的时候就会扩容。还有,如果某个链表长度达到了8,并且容量小于64,则也会用扩容代替红黑树。

    HashMap 在 JDK 7 中并发扩容的时候是非常危险的,非常容易导致链表成环状。但 JDK 8 中已经修改了此bug。但还是不建议使用。强烈推荐并发容器 ConcurrentHashMap。

    HashMap 扩容的时候,不管是链表还是红黑树,都会对这些数据进行重新的散列计算,然后缩短他们的长度,优化性能。在进行散列计算的时候,会进一步优化性能,减少减一的操作,直接使用& 运算。可谓神来之笔。

    总之,HashMap 中的优秀的设计思想值得我们去学习,最让楼主震惊的就是那个将任意一个数变成了2的幂次方的数,并且该数字很合理,说实话,如果让楼主写,楼主是写不出来的。

    ConcurrentHashMap(JDK 1.8) putVal 源码分析

    我们之前分析了Hash的源码,主要是 put 方法。同时,我们知道,HashMap 在并发的时候是不安全的,为什么呢?因为当多个线程对 Map 进行扩容会导致链表成环。不单单是这个问题,当多个线程相同一个槽中插入数据,也是不安全的。而在这之后,我们学习了并发编程,而并发编程中有一个重要的东西,就是JDK 自带的并发容器,提供了线程安全的特性且比同步容器性能好出很多。一个典型的代表就是 ConcurrentHashMap,对,又是 HashMap ,但是这个 Map 是线程安全的,那么同样的,我们今天就看看该类的 put 方法是如何实现线程安全的。

    源码加注释分析 putVal 方法

    /** Implementation for put and putIfAbsent */
    final V putVal(K key, V value, boolean onlyIfAbsent) {
        if (key == null || value == null) throw new NullPointerException();
        int hash = spread(key.hashCode());
        int binCount = 0;
        // 死循环执行
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            if (tab == null || (n = tab.length) == 0)
                // 初始化
                tab = initTable();
            // 获取对应下标节点,如果是kong,直接插入
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                // CAS 进行插入
                if (casTabAt(tab, i, null,
                             new Node<K,V>(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin
            }
            // 如果 hash 冲突了,且 hash 值为 -1,说明是 ForwardingNode 对象(这是一个占位符对象,保存了扩容后的容器)
            else if ((fh = f.hash) == MOVED)
                tab = helpTransfer(tab, f);
            // 如果 hash 冲突了,且 hash 值不为 -1
            else {
                V oldVal = null;
                // 同步 f 节点,防止增加链表的时候导致链表成环
                synchronized (f) {
                    // 如果对应的下标位置 的节点没有改变
                    if (tabAt(tab, i) == f) {
                        // 并且 f 节点的hash 值 不是大于0
                        if (fh >= 0) {
                            // 链表初始长度
                            binCount = 1;
                            // 死循环,直到将值添加到链表尾部,并计算链表的长度
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node<K,V> pred = e;
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K,V>(hash, key,
                                                              value, null);
                                    break;
                                }
                            }
                        }
                        // 如果 f 节点的 hasj 小于0 并且f 是 树类型
                        else if (f instanceof TreeBin) {
                            Node<K,V> p;
                            binCount = 2;
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                           value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                // 链表长度大于等于8时,将该节点改成红黑树树
                if (binCount != 0) {
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        // 判断是否需要扩容
        addCount(1L, binCount);
        return null;
    }
    

    楼主在代码中写了很多注释,但是还是说一下步骤(该方法和HashMap 的高度相似,但是多了很多同步操作)。

    1. 校验key value 值,都不能是null。这点和 HashMap 不同。
    2. 得到 key 的 hash 值。
    3. 死循环并更新 tab 变量的值。
    4. 如果容器没有初始化,则初始化。调用 initTable 方法。该方法通过一个变量 + CAS 来控制并发。稍后我们分析源码。
    5. 根据 hash 值找到数组下标,如果对应的位置为空,就创建一个 Node 对象用CAS方式添加到容器。并跳出循环。
    6. 如果 hash 冲突,也就是对应的位置不为 null,则判断该槽是否被扩容了(-1 表示被扩容了),如果被扩容了,返回新的数组。
    7. 如果 hash 冲突 且 hash 值不是 -1,表示没有被扩容。则进行链表操作或者红黑树操作,注意,这里的 f 头节点被锁住了,保证了同时只有一个线程修改链表。防止出现链表成环。
    8. 和 HashMap 一样,如果链表树超过8,则修改链表为红黑树。
    9. 将数组加1(CAS方式),如果需要扩容,则调用 transfer 方法(非常复杂,以后再详解)进行移动和重新散列,该方法中,如果是槽中只有单个节点,则使用CAS直接插入,如果不是,则使用 synchronized 进行同步,防止并发成环。

    这里说一说 initTable 方法:

    /**
     * Initializes table, using the size recorded in sizeCtl.
     */
    private final Node<K,V>[] initTable() {
        Node<K,V>[] tab; int sc;
        while ((tab = table) == null || tab.length == 0) {
            // 小于0说明被其他线程改了
            if ((sc = sizeCtl) < 0)
                // 自旋等待
                Thread.yield(); // lost initialization race; just spin
            // CAS 修改 sizeCtl 的值为-1
            else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                try {
                    if ((tab = table) == null || tab.length == 0) {
                        // sc 在初始化的时候用户可能会自定义,如果没有自定义,则是默认的
                        int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                        // 创建数组
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                        table = tab = nt;
                        // sizeCtl 计算后作为扩容的阀值
                        sc = n - (n >>> 2);
                    }
                } finally {
                    sizeCtl = sc;
                }
                break;
            }
        }
        return tab;
    }
    

    该方法为了在并发环境下的安全,加入了一个 sizeCtl 变量来进行判断,只有当一个线程通过CAS修改该变量成功后(默认为0,改成 -1),该线程才能初始化数组。保证了初始化数组时的安全性。

    总结

    ConcurrentHashMap 是并发大师 Doug Lea 的杰作,可以说鬼斧神工,总的来说,使用了 CAS 加 synchronized 来保证了 put 操作并发时的危险(特别是链表),相比 同步容器 hashTable 来说,如果容器大小是16,并发的性能是他的16倍,注意,读的时候是没有锁的,完全并发,而 HashTable 在 get 方法上直接加上了 synchronized 关键字,性能差距不言而喻。

    当然,楼主这篇文章可能之写到了 ConcurrentHashMap 的皮毛,关于如何扩容,楼主没有详细介绍,而楼主在阅读源码的收获也很多,发现了很多有趣的东西,比如 ThreadLocalRandom 类在 addCount 方法中的应用,大家可以看看该类,非常的实用。

    注意:这篇文章仅仅是 ConcurrentHashMap 的开头,关于 ConcurrentHashMap 里面的精华太多,值得我们好好学习。

    ConcurrentHashMap size 方法原理分析

    ConcurrentHashMap 博大精深,从他的 50 多个内部类就能看出来,似乎 JDK 的并发精髓都在里面了。但他依然拥有体验良好的 API 给我们使用,程序员根本感觉不到他内部的复杂。但,他内部的每一个方法都复杂无比,就连 size 方法,都挺复杂的。

    今天就一起来看看这个 size 方法。

    size 方法

    代码如下:

    public int size() {
        long n = sumCount();
        return ((n < 0L) ? 0 : (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int)n);
    }
    

    最大返回 int 最大值,但是这个 Map 的长度是有可能超过 int 最大值的,所以 JDK 8 增了 mappingCount 方法。代码如下:

    public long mappingCount() {
        long n = sumCount();
        return (n < 0L) ? 0L : n; // ignore transient negative values
    }
    

    相比较 size 方法,mappingCount 方法的返回值是 long 类型。所以不必限制最大值必须是 Integer.MAX_VALUE。而 JDK 推荐使用这个方法。但这个返回值依然不一定绝对准确。

    从这两个方法中可以看出,sumCount 方法是核心。

    sumCount 方法实现

    代码如下:

    final long sumCount() {
        CounterCell[] as = counterCells; CounterCell a;
        long sum = baseCount;
        if (as != null) {
            for (int i = 0; i < as.length; ++i) {
                if ((a = as[i]) != null)
                    sum += a.value;
            }
        }
        return sum;
    }
    

    上面的方法逻辑:当 counterCells 不是 null,就遍历元素,并和 baseCount 累加。

    两个属性 : baseCount 和 counterCells。

    先看 baseCount。

    /**
     * Base counter value, used mainly when there is no contention,
     * but also as a fallback during table initialization
     * races. Updated via CAS.
     * 当没有争用时,使用这个变量计数。
     */
    private transient volatile long baseCount;
    

    一个 volatile 的变量,在 addCount 方法中会使用它,而 addCount 方法在 put 结束后会调用。在 addCount 方法中,会对这个变量做 CAS 加法。

    image.png

    但是如果并发导致 CAS 失败了,怎么办呢?使用 counterCells。

    image.png

    如果上面 CAS 失败了,在 fullAddCount 方法中,会继续死循环操作,直到成功。

    image.png

    而这个 CounterCell 类又是上面鬼呢?

    // 一种用于分配计数的填充单元。改编自LongAdder和Striped64。请查看他们的内部文档进行解释。
    @sun.misc.Contended 
    static final class CounterCell {
        volatile long value;
        CounterCell(long x) { value = x; }
    }
    

    使用了 @sun.misc.Contended 标记的类,内部一个 volatile 变量。注释说,改编自LongAdder和Striped64,关于这两个类,请看 Java8 Striped64 和 LongAdder。

    而关于这个注解,有必要解释一下。这个注解标识着这个类防止需要防止 "伪共享".

    说说伪共享。引用 一下别人的说法:

    避免伪共享(false sharing)。先引用个伪共享的解释:缓存系统中是以缓存行(cache line)为单位存储的。缓存行是2的整数幂个连续字节,一般为32-256个字节。最常见的缓存行大小是64个字节。当多线程修改互相独立的变量时,

    如果这些变量共享同一个缓存行,就会无意中影响彼此的性能,这就是伪共享。

    所以伪共享对性能危害极大。

    JDK 8 版本之前没有这个注解,Doug Lea 使用拼接来解决这个问题,把缓存行加满,让缓存之间的修改互不影响。

    在我的机器上测试,加和不加这个注解的性能差距达到了 5 倍。

    总结

    好了,关于 Size 方法就简单介绍到这里。总结一下:

    JDK 8 推荐使用mappingCount 方法,因为这个方法的返回值是 long 类型,不会因为 size 方法是 int 类型限制最大值(size 方法是接口定义的,不能修改)。

    在没有并发的情况下,使用一个 baseCount volatile 变量就足够了,当并发的时候,CAS 修改 baseCount 失败后,就会使用 CounterCell 类了,会创建一个这个对象,通常对象的 volatile value 属性是 1。在计算 size 的时候,会将 baseCount 和 CounterCell 数组中的元素的 value 累加,得到总的大小,但这个数字仍旧可能是不准确的。

    还有一个需要注意的地方就是,这个 CounterCell 类使用了 @sun.misc.Contended 注解标识,这个注解是防止伪共享的。是 1.8 新增的。使用时,需要加上 -XX:-RestrictContended 参数。

    ConcurrentHashMap#helpTransfer() 分析

    ConcurrentHashMap 鬼斧神工,并发添加元素时,如果 map 正在扩容,其他线程甚至于还会帮助扩容,也就是多线程扩容。就这一点,就可以写一篇文章好好讲讲。今天一起来看看。

    源码分析

    为什么帮助扩容?

    在 putVal 方法中,如果发现线程当前 hash 冲突了,也就是当前 hash 值对应的槽位有值了,且如果这个值是 -1 (MOVED),说明 Map 正在扩容。那么就帮助 Map 进行扩容。以加快速度。

    具体代码如下:

    int hash = spread(key.hashCode());
    int binCount = 0;
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        if (tab == null || (n = tab.length) == 0)
            tab = initTable();
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            if (casTabAt(tab, i, null,
                         new Node<K,V>(hash, key, value, null)))
                break;                   // no lock when adding to empty bin
        } // 如果对应槽位不为空,且他的 hash 值是 -1,说明正在扩容,那么就帮助其扩容。以加快速度
        else if ((fh = f.hash) == MOVED)
            tab = helpTransfer(tab, f);
    

    传入的参数是:成员变量 table 和 对应槽位的 f 变量。

    怎么验证 hash 值是 MOVED 就是正在扩容呢?

    在 Cmap(ConcurrentHashMap 简称) 中,定义了一堆常量,其中:

    static final int MOVED     = -1; // hash for forwarding nodes
    

    hash for forwarding nodes,说明这个为了移动节点而准备的常量。

    在 Node 的子类 ForwardingNode 的构造方法中,可以看到这个变量作为 hash 值进行了初始化。

    ForwardingNode(Node<K,V>[] tab) {
        super(MOVED, null, null, null);
        this.nextTable = tab;
    }
    

    而这个构造方法只在一个地方调用了,即 transfer(扩容) 方法。

    点到为止。

    关于扩容后面再开一篇。

    好了,如何帮助扩容呢?那要看看 helpTransfer 方法的实现。

    /**
     * Helps transfer if a resize is in progress.
     */
    final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
        Node<K,V>[] nextTab; int sc;
        // 如果 table 不是空 且 node 节点是转移类型,数据检验
        // 且 node 节点的 nextTable(新 table) 不是空,同样也是数据校验
        // 尝试帮助扩容
        if (tab != null && (f instanceof ForwardingNode) &&
            (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
            // 根据 length 得到一个标识符号
            int rs = resizeStamp(tab.length);
            // 如果 nextTab 没有被并发修改 且 tab 也没有被并发修改
            // 且 sizeCtl  < 0 (说明还在扩容)
            while (nextTab == nextTable && table == tab &&
                   (sc = sizeCtl) < 0) {
                // 如果 sizeCtl 无符号右移  16 不等于 rs ( sc前 16 位如果不等于标识符,则标识符变化了)
                // 或者 sizeCtl == rs + 1  (扩容结束了,不再有线程进行扩容)(默认第一个线程设置 sc ==rs 左移 16 位 + 2,当第一个线程结束扩容了,就会将 sc 减一。这个时候,sc 就等于 rs + 1)
                // 或者 sizeCtl == rs + 65535  (如果达到最大帮助线程的数量,即 65535)
                // 或者转移下标正在调整 (扩容结束)
                // 结束循环,返回 table
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                    sc == rs + MAX_RESIZERS || transferIndex <= 0)
                    break;
                // 如果以上都不是, 将 sizeCtl + 1, (表示增加了一个线程帮助其扩容)
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
                    // 进行转移
                    transfer(tab, nextTab);
                    // 结束循环
                    break;
                }
            }
            return nextTab;
        }
        return table;
    }
    

    关于 sizeCtl 变量:

    • -1 :代表table正在初始化,其他线程应该交出CPU时间片
    • -N: 表示正有N-1个线程执行扩容操作(高 16 位是 length 生成的标识符,低 16 位是扩容的线程数)
    • 大于 0: 如果table已经初始化,代表table容量,默认为table大小的0.75,如果还未初始化,代表需要初始化的大小

    代码步骤:

    1. 判 tab 空,判断是否是转移节点。判断 nextTable 是否更改了。
    2. 更加 length 得到标识符。
    3. 判断是否并发修改了,判断是否还在扩容。
    4. 如果还在扩容,判断标识符是否变化,判断扩容是否结束,判断是否达到最大线程数,判断扩容转移下标是否在调整(扩容结束),如果满足任意条件,结束循环。
    5. 如果不满足,并发转移。

    这里有一个花费了很长时间纠结的地方:

    sc == rs + 1
    

    这个判断可以在 addCount 方法中找到答案:默认第一个线程设置 sc ==rs 左移 16 位 + 2,当第一个线程结束扩容了,就会将 sc 减一。这个时候,sc 就等于 rs + 1。

    如果 sizeCtl == 标识符 + 1 ,说明库容结束了,没有必要再扩容了。

    总结一下:

    当 Cmap put 元素的时候,如果发现这个节点的元素类型是 forward 的话,就帮助正在扩容的线程一起扩容,提高速度。其中, sizeCtl 是关键,该变量高 16 位保存 length 生成的标识符,低 16 位保存并发扩容的线程数,通过这连个数字,可以判断出,是否结束扩容了。

    如下图:

    image.png

    ConcurrentHashMap#transfer() 扩容逐行分析

    ConcurrentHashMap 是并发中的重中之重,也是最常用的数据结果,之前的文章中,我们介绍了 putVal 方法。并发编程之 ConcurrentHashMap(JDK 1.8) putVal 源码分析。其中分析了 initTable 方法和 putVal 方法,但也留下了一句话:

    这篇文章仅仅是 ConcurrentHashMap 的开头,关于 ConcurrentHashMap 里面的精华太多,值得我们好好学习。

    说道精华,他的扩容方法绝对是精华,要知道,ConcurrentHashMap 扩容是高度并发的。

    今天来逐行分析源码。

    先说结论

    首先说结论。源码加注释我会放在后面。该方法的执行逻辑如下:

    1. 通过计算 CPU 核心数和 Map 数组的长度得到每个线程(CPU)要帮助处理多少个桶,并且这里每个线程处理都是平均的。默认每个线程处理 16 个桶。因此,如果长度是 16 的时候,扩容的时候只会有一个线程扩容。

    2. 初始化临时变量 nextTable。将其在原有基础上扩容两倍。

    3. 死循环开始转移。多线程并发转移就是在这个死循环中,根据一个 finishing 变量来判断,该变量为 true 表示扩容结束,否则继续扩容。

      1. 进入一个 while 循环,分配数组中一个桶的区间给线程,默认是 16. 从大到小进行分配。当拿到分配值后,进行 i-- 递减。这个 i 就是数组下标。(其中有一个 bound 参数,这个参数指的是该线程此次可以处理的区间的最小下标,超过这个下标,就需要重新领取区间或者结束扩容,还有一个 advance 参数,该参数指的是是否继续递减转移下一个桶,如果为 true,表示可以继续向后推进,反之,说明还没有处理好当前桶,不能推进)
      2. 出 while 循环,进 if 判断,判断扩容是否结束,如果扩容结束,清空临死变量,更新 table 变量,更新库容阈值。如果没完成,但已经无法领取区间(没了),该线程退出该方法,并将 sizeCtl 减一,表示扩容的线程少一个了。如果减完这个数以后,sizeCtl 回归了初始状态,表示没有线程再扩容了,该方法所有的线程扩容结束了。(这里主要是判断扩容任务是否结束,如果结束了就让线程退出该方法,并更新相关变量)。然后检查所有的桶,防止遗漏。
      3. 如果没有完成任务,且 i 对应的槽位是空,尝试 CAS 插入占位符,让 putVal 方法的线程感知。
      4. 如果 i 对应的槽位不是空,且有了占位符,那么该线程跳过这个槽位,处理下一个槽位。
      5. 如果以上都是不是,说明这个槽位有一个实际的值。开始同步处理这个桶。
      6. 到这里,都还没有对桶内数据进行转移,只是计算了下标和处理区间,然后一些完成状态判断。同时,如果对应下标内没有数据或已经被占位了,就跳过了。
    4. 处理每个桶的行为都是同步的。防止 putVal 的时候向链表插入数据。

      1. 如果这个桶是链表,那么就将这个链表根据 length 取于拆成两份,取于结果是 0 的放在新表的低位,取于结果是 1 放在新表的高位。
      2. 如果这个桶是红黑数,那么也拆成 2 份,方式和链表的方式一样,然后,判断拆分过的树的节点数量,如果数量小于等于 6,改造成链表。反之,继续使用红黑树结构。
      3. 到这里,就完成了一个桶从旧表转移到新表的过程。

    好,以上,就是 transfer 方法的总体逻辑。还是挺复杂的。再进行精简,分成 3 步骤:

    1. 计算每个线程可以处理的桶区间。默认 16.
    2. 初始化临时变量 nextTable,扩容 2 倍。
    3. 死循环,计算下标。完成总体判断。
    4. 如果桶内有数据,同步转移数据。通常会像链表拆成 2 份。

    大体就是上的的 3 个步骤。

    再来看看源码和注释。

    再看源码分析

    源码加注释:

    /**
     * Moves and/or copies the nodes in each bin to new table. See
     * above for explanation.
     * 
     * transferIndex 表示转移时的下标,初始为扩容前的 length。
     * 
     * 我们假设长度是 32
     */
    private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
        int n = tab.length, stride;
        // 将 length / 8 然后除以 CPU核心数。如果得到的结果小于 16,那么就使用 16。
        // 这里的目的是让每个 CPU 处理的桶一样多,避免出现转移任务不均匀的现象,如果桶较少的话,默认一个 CPU(一个线程)处理 16 个桶
        if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
            stride = MIN_TRANSFER_STRIDE; // subdivide range 细分范围 stridea:TODO
        // 新的 table 尚未初始化
        if (nextTab == null) {            // initiating
            try {
                // 扩容  2 倍
                Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
                // 更新
                nextTab = nt;
            } catch (Throwable ex) {      // try to cope with OOME
                // 扩容失败, sizeCtl 使用 int 最大值。
                sizeCtl = Integer.MAX_VALUE;
                return;// 结束
            }
            // 更新成员变量
            nextTable = nextTab;
            // 更新转移下标,就是 老的 tab 的 length
            transferIndex = n;
        }
        // 新 tab 的 length
        int nextn = nextTab.length;
        // 创建一个 fwd 节点,用于占位。当别的线程发现这个槽位中是 fwd 类型的节点,则跳过这个节点。
        ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
        // 首次推进为 true,如果等于 true,说明需要再次推进一个下标(i--),反之,如果是 false,那么就不能推进下标,需要将当前的下标处理完毕才能继续推进
        boolean advance = true;
        // 完成状态,如果是 true,就结束此方法。
        boolean finishing = false; // to ensure sweep before committing nextTab
        // 死循环,i 表示下标,bound 表示当前线程可以处理的当前桶区间最小下标
        for (int i = 0, bound = 0;;) {
            Node<K,V> f; int fh;
            // 如果当前线程可以向后推进;这个循环就是控制 i 递减。同时,每个线程都会进入这里取得自己需要转移的桶的区间
            while (advance) {
                int nextIndex, nextBound;
                // 对 i 减一,判断是否大于等于 bound (正常情况下,如果大于 bound 不成立,说明该线程上次领取的任务已经完成了。那么,需要在下面继续领取任务)
                // 如果对 i 减一大于等于 bound(还需要继续做任务),或者完成了,修改推进状态为 false,不能推进了。任务成功后修改推进状态为 true。
                // 通常,第一次进入循环,i-- 这个判断会无法通过,从而走下面的 nextIndex 赋值操作(获取最新的转移下标)。其余情况都是:如果可以推进,将 i 减一,然后修改成不可推进。如果 i 对应的桶处理成功了,改成可以推进。
                if (--i >= bound || finishing)
                    advance = false;// 这里设置 false,是为了防止在没有成功处理一个桶的情况下却进行了推进
                // 这里的目的是:1. 当一个线程进入时,会选取最新的转移下标。2. 当一个线程处理完自己的区间时,如果还有剩余区间的没有别的线程处理。再次获取区间。
                else if ((nextIndex = transferIndex) <= 0) {
                    // 如果小于等于0,说明没有区间了 ,i 改成 -1,推进状态变成 false,不再推进,表示,扩容结束了,当前线程可以退出了
                    // 这个 -1 会在下面的 if 块里判断,从而进入完成状态判断
                    i = -1;
                    advance = false;// 这里设置 false,是为了防止在没有成功处理一个桶的情况下却进行了推进
                }// CAS 修改 transferIndex,即 length - 区间值,留下剩余的区间值供后面的线程使用
                else if (U.compareAndSwapInt
                         (this, TRANSFERINDEX, nextIndex,
                          nextBound = (nextIndex > stride ?
                                       nextIndex - stride : 0))) {
                    bound = nextBound;// 这个值就是当前线程可以处理的最小当前区间最小下标
                    i = nextIndex - 1; // 初次对i 赋值,这个就是当前线程可以处理的当前区间的最大下标
                    advance = false; // 这里设置 false,是为了防止在没有成功处理一个桶的情况下却进行了推进,这样对导致漏掉某个桶。下面的 if (tabAt(tab, i) == f) 判断会出现这样的情况。
                }
            }// 如果 i 小于0 (不在 tab 下标内,按照上面的判断,领取最后一段区间的线程扩容结束)
            //  如果 i >= tab.length(不知道为什么这么判断)
            //  如果 i + tab.length >= nextTable.length  (不知道为什么这么判断)
            if (i < 0 || i >= n || i + n >= nextn) {
                int sc;
                if (finishing) { // 如果完成了扩容
                    nextTable = null;// 删除成员变量
                    table = nextTab;// 更新 table
                    sizeCtl = (n << 1) - (n >>> 1); // 更新阈值
                    return;// 结束方法。
                }// 如果没完成
                if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {// 尝试将 sc -1. 表示这个线程结束帮助扩容了,将 sc 的低 16 位减一。
                    if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)// 如果 sc - 2 不等于标识符左移 16 位。如果他们相等了,说明没有线程在帮助他们扩容了。也就是说,扩容结束了。
                        return;// 不相等,说明没结束,当前线程结束方法。
                    finishing = advance = true;// 如果相等,扩容结束了,更新 finising 变量
                    i = n; // 再次循环检查一下整张表
                }
            }
            else if ((f = tabAt(tab, i)) == null) // 获取老 tab i 下标位置的变量,如果是 null,就使用 fwd 占位。
                advance = casTabAt(tab, i, null, fwd);// 如果成功写入 fwd 占位,再次推进一个下标
            else if ((fh = f.hash) == MOVED)// 如果不是 null 且 hash 值是 MOVED。
                advance = true; // already processed // 说明别的线程已经处理过了,再次推进一个下标
            else {// 到这里,说明这个位置有实际值了,且不是占位符。对这个节点上锁。为什么上锁,防止 putVal 的时候向链表插入数据
                synchronized (f) {
                    // 判断 i 下标处的桶节点是否和 f 相同
                    if (tabAt(tab, i) == f) {
                        Node<K,V> ln, hn;// low, height 高位桶,低位桶
                        // 如果 f 的 hash 值大于 0 。TreeBin 的 hash 是 -2
                        if (fh >= 0) {
                            // 对老长度进行与运算(第一个操作数的的第n位于第二个操作数的第n位如果都是1,那么结果的第n为也为1,否则为0)
                            // 由于 Map 的长度都是 2 的次方(000001000 这类的数字),那么取于 length 只有 2 种结果,一种是 0,一种是1
                            //  如果是结果是0 ,Doug Lea 将其放在低位,反之放在高位,目的是将链表重新 hash,放到对应的位置上,让新的取于算法能够击中他。
                            int runBit = fh & n;
                            Node<K,V> lastRun = f; // 尾节点,且和头节点的 hash 值取于不相等
                            // 遍历这个桶
                            for (Node<K,V> p = f.next; p != null; p = p.next) {
                                // 取于桶中每个节点的 hash 值
                                int b = p.hash & n;
                                // 如果节点的 hash 值和首节点的 hash 值取于结果不同
                                if (b != runBit) {
                                    runBit = b; // 更新 runBit,用于下面判断 lastRun 该赋值给 ln 还是 hn。
                                    lastRun = p; // 这个 lastRun 保证后面的节点与自己的取于值相同,避免后面没有必要的循环
                                }
                            }
                            if (runBit == 0) {// 如果最后更新的 runBit 是 0 ,设置低位节点
                                ln = lastRun;
                                hn = null;
                            }
                            else {
                                hn = lastRun; // 如果最后更新的 runBit 是 1, 设置高位节点
                                ln = null;
                            }// 再次循环,生成两个链表,lastRun 作为停止条件,这样就是避免无谓的循环(lastRun 后面都是相同的取于结果)
                            for (Node<K,V> p = f; p != lastRun; p = p.next) {
                                int ph = p.hash; K pk = p.key; V pv = p.val;
                                // 如果与运算结果是 0,那么就还在低位
                                if ((ph & n) == 0) // 如果是0 ,那么创建低位节点
                                    ln = new Node<K,V>(ph, pk, pv, ln);
                                else // 1 则创建高位
                                    hn = new Node<K,V>(ph, pk, pv, hn);
                            }
                            // 其实这里类似 hashMap 
                            // 设置低位链表放在新链表的 i
                            setTabAt(nextTab, i, ln);
                            // 设置高位链表,在原有长度上加 n
                            setTabAt(nextTab, i + n, hn);
                            // 将旧的链表设置成占位符
                            setTabAt(tab, i, fwd);
                            // 继续向后推进
                            advance = true;
                        }// 如果是红黑树
                        else if (f instanceof TreeBin) {
                            TreeBin<K,V> t = (TreeBin<K,V>)f;
                            TreeNode<K,V> lo = null, loTail = null;
                            TreeNode<K,V> hi = null, hiTail = null;
                            int lc = 0, hc = 0;
                            // 遍历
                            for (Node<K,V> e = t.first; e != null; e = e.next) {
                                int h = e.hash;
                                TreeNode<K,V> p = new TreeNode<K,V>
                                    (h, e.key, e.val, null, null);
                                // 和链表相同的判断,与运算 == 0 的放在低位
                                if ((h & n) == 0) {
                                    if ((p.prev = loTail) == null)
                                        lo = p;
                                    else
                                        loTail.next = p;
                                    loTail = p;
                                    ++lc;
                                } // 不是 0 的放在高位
                                else {
                                    if ((p.prev = hiTail) == null)
                                        hi = p;
                                    else
                                        hiTail.next = p;
                                    hiTail = p;
                                    ++hc;
                                }
                            }
                            // 如果树的节点数小于等于 6,那么转成链表,反之,创建一个新的树
                            ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                                (hc != 0) ? new TreeBin<K,V>(lo) : t;
                            hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                                (lc != 0) ? new TreeBin<K,V>(hi) : t;
                            // 低位树
                            setTabAt(nextTab, i, ln);
                            // 高位数
                            setTabAt(nextTab, i + n, hn);
                            // 旧的设置成占位符
                            setTabAt(tab, i, fwd);
                            // 继续向后推进
                            advance = true;
                        }
                    }
                }
            }
        }
    }
    

    代码加注释比较长,有兴趣可以逐行对照,有 2 个判断楼主看不懂为什么这么判断,知道的同学可以提醒一下。

    然后,说说精华的部分。

    1. Cmap 支持并发扩容,实现方式是,将表拆分,让每个线程处理自己的区间。如下图:
    image.png
    假设总长度是 64 ,每个线程可以分到 16 个桶,各自处理,不会互相影响。
    
    1. 而每个线程在处理自己桶中的数据的时候,是下图这样的:

      扩容前的状态。

      当对 4 号桶或者 10 号桶进行转移的时候,会将链表拆成两份,规则是根据节点的 hash 值取于 length,如果结果是 0,放在低位,否则放在高位。

      因此,10 号桶的数据,黑色节点会放在新表的 10 号位置,白色节点会放在新桶的 26 号位置。

      下图是循环处理桶中数据的逻辑:

    image.png
    处理完之后,新桶的数据是这样的:
    
    image.png

    总结

    transfer 方法可以说很牛逼,很精华,内部多线程扩容性能很高,

    通过给每个线程分配桶区间,避免线程间的争用,通过为每个桶节点加锁,避免 putVal 方法导致数据不一致。同时,在扩容的时候,也会将链表拆成两份,这点和 HashMap 的 resize 方法类似。

    而如果有新的线程想 put 数据时,也会帮助其扩容。鬼斧神工,令人赞叹。

    ConcurrentHashMap#addCount() 分析

    ConcurrentHashMap 精华代码很多,前面分析了 helpTransfer 和 transfer 和 putVal 方法,今天来分析一下 addCount 方法,该方法会在 putVal 方法中调用。

    该方法可以配合 size 方法一起查看,关于该方法,楼主也写了一篇文章分析过:并发编程 —— ConcurrentHashMap size 方法原理分析

    具体代码如下:

    addCount(1L, binCount);
    return null;
    

    当插入结束的时候,会调用该方法,并传入一个 1 和 binCount 参数。从方法名字上,该方法应该是对哈希表的元素进行计数的。

    一起来看看 addCount 是如何操作的。

    源码分析

    源码加注释:

    // 从 putVal 传入的参数是 1, binCount,binCount 默认是0,只有 hash 冲突了才会大于 1.且他的大小是链表的长度(如果不是红黑数结构的话)。
    private final void addCount(long x, int check) {
        CounterCell[] as; long b, s;
        // 如果计数盒子不是空 或者
        // 如果修改 baseCount 失败
        if ((as = counterCells) != null ||
            !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
            CounterCell a; long v; int m;
            boolean uncontended = true;
            // 如果计数盒子是空(尚未出现并发)
            // 如果随机取余一个数组位置为空 或者
            // 修改这个槽位的变量失败(出现并发了)
            // 执行 fullAddCount 方法。并结束
            if (as == null || (m = as.length - 1) < 0 ||
                (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
                !(uncontended =
                  U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
                fullAddCount(x, uncontended);
                return;
            }
            if (check <= 1)
                return;
            s = sumCount();
        }
        // 如果需要检查,检查是否需要扩容,在 putVal 方法调用时,默认就是要检查的。
        if (check >= 0) {
            Node<K,V>[] tab, nt; int n, sc;
            // 如果map.size() 大于 sizeCtl(达到扩容阈值需要扩容) 且
            // table 不是空;且 table 的长度小于 1 << 30。(可以扩容)
            while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
                   (n = tab.length) < MAXIMUM_CAPACITY) {
                // 根据 length 得到一个标识
                int rs = resizeStamp(n);
                // 如果正在扩容
                if (sc < 0) {
                    // 如果 sc 的低 16 位不等于 标识符(校验异常 sizeCtl 变化了)
                    // 如果 sc == 标识符 + 1 (扩容结束了,不再有线程进行扩容)(默认第一个线程设置 sc ==rs 左移 16 位 + 2,当第一个线程结束扩容了,就会将 sc 减一。这个时候,sc 就等于 rs + 1)
                    // 如果 sc == 标识符 + 65535(帮助线程数已经达到最大)
                    // 如果 nextTable == null(结束扩容了)
                    // 如果 transferIndex <= 0 (转移状态变化了)
                    // 结束循环 
                    if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                        sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                        transferIndex <= 0)
                        break;
                    // 如果可以帮助扩容,那么将 sc 加 1. 表示多了一个线程在帮助扩容
                    if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                        // 扩容
                        transfer(tab, nt);
                }
                // 如果不在扩容,将 sc 更新:标识符左移 16 位 然后 + 2. 也就是变成一个负数。高 16 位是标识符,低 16 位初始是 2.
                else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                             (rs << RESIZE_STAMP_SHIFT) + 2))
                    // 更新 sizeCtl 为负数后,开始扩容。
                    transfer(tab, null);
                s = sumCount();
            }
        }
    }
    

    方法加了注释以后,还是挺长的。

    总结一下该方法的逻辑:

    1. x 参数表示的此次需要对表中元素的个数加几。check 参数表示是否需要进行扩容检查,大于等于0 需要进行检查,而我们的 putVal 方法的 binCount 参数最小也是 0 ,因此,每次添加元素都会进行检查。(除非是覆盖操作)
    2. 判断计数盒子属性是否是空,如果是空,就尝试修改 baseCount 变量,对该变量进行加 X。
    3. 如果计数盒子不是空,或者修改 baseCount 变量失败了,则放弃对 baseCount 进行操作。
    4. 如果计数盒子是 null 或者计数盒子的 length 是 0,或者随机取一个位置取于数组长度是 null,那么就对刚刚的元素进行 CAS 赋值。
    5. 如果赋值失败,或者满足上面的条件,则调用 fullAddCount 方法重新死循环插入。
      这里如果操作 baseCount 失败了(或者计数盒子不是 Null),且对计数盒子赋值成功,那么就检查 check 变量,如果该变量小于等于 1. 直接结束。否则,计算一下 count 变量。
    6. 如果 check 大于等于 0 ,说明需要对是否扩容进行检查。
    7. 如果 map 的 size 大于 sizeCtl(扩容阈值),且 table 的长度小于 1 << 30,那么就进行扩容。
    8. 根据 length 得到一个标识符,然后,判断 sizeCtl 状态,如果小于 0 ,说明要么在初始化,要么在扩容。
    9. 如果正在扩容,那么就校验一下数据是否变化了(具体可以看上面代码的注释)。如果检验数据不通过,break。
    10. 如果校验数据通过了,那么将 sizeCtl 加一,表示多了一个线程帮助扩容。然后进行扩容。
    11. 如果没有在扩容,但是需要扩容。那么就将 sizeCtl 更新,赋值为标识符左移 16 位 —— 一个负数。然后加 2。 表示,已经有一个线程开始扩容了。然后进行扩容。然后再次更新 count,看看是否还需要扩容。

    总结一下

    总结下来看,addCount 方法做了 2 件事情:

    1. 对 table 的长度加一。无论是通过修改 baseCount,还是通过使用 CounterCell。当 CounterCell 被初始化了,就优先使用他,不再使用 baseCount。
    2. 检查是否需要扩容,或者是否正在扩容。如果需要扩容,就调用扩容方法,如果正在扩容,就帮助其扩容。

    有几个要点注意:

    1. 第一次调用扩容方法前,sizeCtl 的低 16 位是加 2 的,不是加一。所以 sc == rs + 1 的判断是表示是否完成任务了。因为完成扩容后,sizeCtl == rs + 1。
    2. 扩容线程最大数量是 65535,是由于低 16 位的位数限制。
    3. 这里也是可以帮助扩容的,类似 helpTransfer 方法。

    相关文章

      网友评论

          本文标题:深入理解 ConcurrentHashMap

          本文链接:https://www.haomeiwen.com/subject/ujpuaftx.html