美文网首页
concurrentHashMap 1.7原理解析

concurrentHashMap 1.7原理解析

作者: 后来丶_a24d | 来源:发表于2019-10-24 11:14 被阅读0次

    concurrenthashmap不允许key值为null,在put的时候会校验如果为null则抛异常,原因是无法分辨是key没找到的null还是有key值为null,这在多线程里面是模糊不清的

    构造函数

    public ConcurrentHashMap(int initialCapacity,
                             float loadFactor, int concurrencyLevel) {
        //  参数校验
        if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
            throw new IllegalArgumentException();
        // 并发度控制,最大是65535
        if (concurrencyLevel > MAX_SEGMENTS)
            concurrencyLevel = MAX_SEGMENTS;
        // Find power-of-two sizes best matching arguments
        // 等于ssize从1向左移位的 次数
        int sshift = 0;
        int ssize = 1;
        // 找出最接近concurrencyLevel的2的n次幂的数值
        while (ssize < concurrencyLevel) {
            ++sshift;
            ssize <<= 1;
        }
        // 这里之所 以用32是因为ConcurrentHashMap里的hash()方法输出的最大数是32位的
        this.segmentShift = 32 - sshift;
        // 散列运算的掩码,等于ssize减1
        this.segmentMask = ssize - 1;
        if (initialCapacity > MAXIMUM_CAPACITY)
            initialCapacity = MAXIMUM_CAPACITY;
        int c = initialCapacity / ssize;
        if (c * ssize < initialCapacity)
            ++c;
        // 里HashEntry数组的长度度,它等于initialCapacity除以ssize的倍数c,如果c大于1,就会取大于等于c的2的N次方值,所以cap不是1,就是2的N次方。
        int cap = MIN_SEGMENT_TABLE_CAPACITY;
        // 保证HashEntry数组大小一定是2的n次幂
        while (cap < c)
            cap <<= 1;
        // create segments and segments[0]
        // 初始化Segment数组,并实际只填充Segment数组的第0个元素。
        Segment<K,V> s0 =
            new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
                             (HashEntry<K,V>[])new HashEntry[cap]);
        Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
        UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
        this.segments = ss;
    }
    
    • 确认ConcurrentHashMap的并发度,也就是Segment数组长度,并保证它是2的n次幂 16的并发度,取高4位
    • 确认HashEntry数组的初始化长度,并保证它是2的n次幂(hash & (n - 1)) 1111减少hash碰撞
    • 将Segment数组初始化好并且只填充第0个元素

    put 流程

    public V put(K key, V value) {
        Segment<K,V> s;
        if (value == null)
            throw new NullPointerException();
        // 1. 先获取key的hash值
        int hash = hash(key);
        int j = (hash >>> segmentShift) & segmentMask;
        if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
             (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
            // 2. 定位到Segment
            s = ensureSegment(j);
        // 3.调用Segment的put方法
        return s.put(key, hash, value, false);
    }
    
    Segment.put() 方法
    final V put(K key, int hash, V value, boolean onlyIfAbsent) {
        // 1. 加锁
        HashEntry<K,V> node = tryLock() ? null :
                // scanAndLockForPut在没有获取到锁的情况下,去查询key是否存在,如果不存在就新建一个Node
            scanAndLockForPut(key, hash, value);
        V oldValue;
        try {
            HashEntry<K,V>[] tab = table;
            // 确定元素在tabl数组上的位置
            int index = (tab.length - 1) & hash;
            HashEntry<K,V> first = entryAt(tab, index);
            for (HashEntry<K,V> e = first;;) {
                if (e != null) {
                    K k;
                    // 如果原来位置上有值并且key相同,那么直接替换原来的value
                    if ((k = e.key) == key ||
                        (e.hash == hash && key.equals(k))) {
                        oldValue = e.value;
                        if (!onlyIfAbsent) {
                            e.value = value;
                            ++modCount;
                        }
                        break;
                    }
                    e = e.next;
                }
                else {
                    if (node != null)
                        node.setNext(first);
                    else
                        node = new HashEntry<K,V>(hash, key, value, first);
                    // 元素总数加一
                    int c = count + 1;
                    // 判断是否需要扩容
                    if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                        rehash(node);
                    else
                        setEntryAt(tab, index, node);
                    ++modCount;
                    count = c;
                    oldValue = null;
                    break;
                }
            }
        } finally {
            unlock();
        }
        return oldValue;
    }
    
    • 先获取key的hash值
    • 定位到Segment
    • 调用Segment的put方法
    • 加锁
    • 定位key在tabl数组上的索引位置index,获取到头结点
    • 判断是否有hash冲突
    • 如果没有冲突直接将新节点node添加到数组index索引位
    • 如果有冲突,先判断是否有相同key
    • 有相同key直接替换对应node的value值
    • 没有添加新元素到链表尾部
    • 解锁

    rehash

    private void rehash(HashEntry<K,V> node) {
        // 复制老数组
        HashEntry<K,V>[] oldTable = table;
        int oldCapacity = oldTable.length;
        // table数组扩容2倍
        int newCapacity = oldCapacity << 1;
        // 扩容阈值也增加两倍
        threshold = (int)(newCapacity * loadFactor);
        // 创建新数组
        HashEntry<K,V>[] newTable =
            (HashEntry<K,V>[]) new HashEntry[newCapacity];
        // 计算新的掩码
        int sizeMask = newCapacity - 1;
        for (int i = 0; i < oldCapacity ; i++) {
            HashEntry<K,V> e = oldTable[i];
            if (e != null) {
                HashEntry<K,V> next = e.next;
                // 计算新的索引位
                int idx = e.hash & sizeMask;
                // 转移数据
                if (next == null)   //  Single node on list
                    newTable[idx] = e;
                else { // Reuse consecutive sequence at same slot
                    HashEntry<K,V> lastRun = e;
                    int lastIdx = idx;
                    for (HashEntry<K,V> last = next;
                         last != null;
                         last = last.next) {
                        int k = last.hash & sizeMask;
                        if (k != lastIdx) {
                            lastIdx = k;
                            lastRun = last;
                        }
                    }
                    newTable[lastIdx] = lastRun;
                    // Clone remaining nodes
                    for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
                        V v = p.value;
                        int h = p.hash;
                        int k = h & sizeMask;
                        HashEntry<K,V> n = newTable[k];
                        newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
                    }
                }
            }
        }
        // 将新的节点加到对应索引位
        int nodeIndex = node.hash & sizeMask; // add the new node
        node.setNext(newTable[nodeIndex]);
        newTable[nodeIndex] = node;
        table = newTable;
    }
    
    • 新建扩容后的数组,容量是原来的两倍
    • 遍历扩容前的数组
    • 通过e.hash & sizeMask;计算key新的索引位
    • 转移数据
    • 将扩容后的数组指向成员变量table

    get() 方法

    public V get(Object key) {
        Segment<K,V> s; // manually integrate access methods to reduce overhead
        HashEntry<K,V>[] tab;
        int h = hash(key);
        // 计算出Segment的索引位
        long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
        // 以原子的方式获取Segment
        if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
            (tab = s.table) != null) {
            // 原子方式获取HashEntry
            for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
                     (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
                 e != null; e = e.next) {
                K k;
                // key相同
                if ((k = e.key) == key || (e.hash == h && key.equals(k)))
                    // value是volatile所以可以不加锁直接取值返回
                    return e.value;
            }
        }
        return null;
    }
    
    • 我们可以看到get方法是没有加锁的,因为HashEntry的value和next属性是volatile的,volatile直接保证了可见性,所以读的时候可以不加锁。

    size() 方法

    • size的核心思想是先进性两次不加锁统计,如果两次的值一样则直接返回,否则第三个统计的时候会将所有segment全部锁定,再进行size统计,所以size()尽量少用。因为这是在并发情况下,size其他线程也会改变size大小,所以size()的返回值只能表示当前线程、当时的一个状态,可以算其实是一个预估值

    参考文献

    相关文章

      网友评论

          本文标题:concurrentHashMap 1.7原理解析

          本文链接:https://www.haomeiwen.com/subject/jlervctx.html