美文网首页
concurrentHashMap 1.7原理解析

concurrentHashMap 1.7原理解析

作者: 后来丶_a24d | 来源:发表于2019-10-24 11:14 被阅读0次

concurrenthashmap不允许key值为null,在put的时候会校验如果为null则抛异常,原因是无法分辨是key没找到的null还是有key值为null,这在多线程里面是模糊不清的

构造函数

public ConcurrentHashMap(int initialCapacity,
                         float loadFactor, int concurrencyLevel) {
    //  参数校验
    if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
        throw new IllegalArgumentException();
    // 并发度控制,最大是65535
    if (concurrencyLevel > MAX_SEGMENTS)
        concurrencyLevel = MAX_SEGMENTS;
    // Find power-of-two sizes best matching arguments
    // 等于ssize从1向左移位的 次数
    int sshift = 0;
    int ssize = 1;
    // 找出最接近concurrencyLevel的2的n次幂的数值
    while (ssize < concurrencyLevel) {
        ++sshift;
        ssize <<= 1;
    }
    // 这里之所 以用32是因为ConcurrentHashMap里的hash()方法输出的最大数是32位的
    this.segmentShift = 32 - sshift;
    // 散列运算的掩码,等于ssize减1
    this.segmentMask = ssize - 1;
    if (initialCapacity > MAXIMUM_CAPACITY)
        initialCapacity = MAXIMUM_CAPACITY;
    int c = initialCapacity / ssize;
    if (c * ssize < initialCapacity)
        ++c;
    // 里HashEntry数组的长度度,它等于initialCapacity除以ssize的倍数c,如果c大于1,就会取大于等于c的2的N次方值,所以cap不是1,就是2的N次方。
    int cap = MIN_SEGMENT_TABLE_CAPACITY;
    // 保证HashEntry数组大小一定是2的n次幂
    while (cap < c)
        cap <<= 1;
    // create segments and segments[0]
    // 初始化Segment数组,并实际只填充Segment数组的第0个元素。
    Segment<K,V> s0 =
        new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
                         (HashEntry<K,V>[])new HashEntry[cap]);
    Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
    UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
    this.segments = ss;
}
  • 确认ConcurrentHashMap的并发度,也就是Segment数组长度,并保证它是2的n次幂 16的并发度,取高4位
  • 确认HashEntry数组的初始化长度,并保证它是2的n次幂(hash & (n - 1)) 1111减少hash碰撞
  • 将Segment数组初始化好并且只填充第0个元素

put 流程

public V put(K key, V value) {
    Segment<K,V> s;
    if (value == null)
        throw new NullPointerException();
    // 1. 先获取key的hash值
    int hash = hash(key);
    int j = (hash >>> segmentShift) & segmentMask;
    if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
         (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
        // 2. 定位到Segment
        s = ensureSegment(j);
    // 3.调用Segment的put方法
    return s.put(key, hash, value, false);
}

Segment.put() 方法
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
    // 1. 加锁
    HashEntry<K,V> node = tryLock() ? null :
            // scanAndLockForPut在没有获取到锁的情况下,去查询key是否存在,如果不存在就新建一个Node
        scanAndLockForPut(key, hash, value);
    V oldValue;
    try {
        HashEntry<K,V>[] tab = table;
        // 确定元素在tabl数组上的位置
        int index = (tab.length - 1) & hash;
        HashEntry<K,V> first = entryAt(tab, index);
        for (HashEntry<K,V> e = first;;) {
            if (e != null) {
                K k;
                // 如果原来位置上有值并且key相同,那么直接替换原来的value
                if ((k = e.key) == key ||
                    (e.hash == hash && key.equals(k))) {
                    oldValue = e.value;
                    if (!onlyIfAbsent) {
                        e.value = value;
                        ++modCount;
                    }
                    break;
                }
                e = e.next;
            }
            else {
                if (node != null)
                    node.setNext(first);
                else
                    node = new HashEntry<K,V>(hash, key, value, first);
                // 元素总数加一
                int c = count + 1;
                // 判断是否需要扩容
                if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                    rehash(node);
                else
                    setEntryAt(tab, index, node);
                ++modCount;
                count = c;
                oldValue = null;
                break;
            }
        }
    } finally {
        unlock();
    }
    return oldValue;
}
  • 先获取key的hash值
  • 定位到Segment
  • 调用Segment的put方法
  • 加锁
  • 定位key在tabl数组上的索引位置index,获取到头结点
  • 判断是否有hash冲突
  • 如果没有冲突直接将新节点node添加到数组index索引位
  • 如果有冲突,先判断是否有相同key
  • 有相同key直接替换对应node的value值
  • 没有添加新元素到链表尾部
  • 解锁

rehash

private void rehash(HashEntry<K,V> node) {
    // 复制老数组
    HashEntry<K,V>[] oldTable = table;
    int oldCapacity = oldTable.length;
    // table数组扩容2倍
    int newCapacity = oldCapacity << 1;
    // 扩容阈值也增加两倍
    threshold = (int)(newCapacity * loadFactor);
    // 创建新数组
    HashEntry<K,V>[] newTable =
        (HashEntry<K,V>[]) new HashEntry[newCapacity];
    // 计算新的掩码
    int sizeMask = newCapacity - 1;
    for (int i = 0; i < oldCapacity ; i++) {
        HashEntry<K,V> e = oldTable[i];
        if (e != null) {
            HashEntry<K,V> next = e.next;
            // 计算新的索引位
            int idx = e.hash & sizeMask;
            // 转移数据
            if (next == null)   //  Single node on list
                newTable[idx] = e;
            else { // Reuse consecutive sequence at same slot
                HashEntry<K,V> lastRun = e;
                int lastIdx = idx;
                for (HashEntry<K,V> last = next;
                     last != null;
                     last = last.next) {
                    int k = last.hash & sizeMask;
                    if (k != lastIdx) {
                        lastIdx = k;
                        lastRun = last;
                    }
                }
                newTable[lastIdx] = lastRun;
                // Clone remaining nodes
                for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
                    V v = p.value;
                    int h = p.hash;
                    int k = h & sizeMask;
                    HashEntry<K,V> n = newTable[k];
                    newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
                }
            }
        }
    }
    // 将新的节点加到对应索引位
    int nodeIndex = node.hash & sizeMask; // add the new node
    node.setNext(newTable[nodeIndex]);
    newTable[nodeIndex] = node;
    table = newTable;
}
  • 新建扩容后的数组,容量是原来的两倍
  • 遍历扩容前的数组
  • 通过e.hash & sizeMask;计算key新的索引位
  • 转移数据
  • 将扩容后的数组指向成员变量table

get() 方法

public V get(Object key) {
    Segment<K,V> s; // manually integrate access methods to reduce overhead
    HashEntry<K,V>[] tab;
    int h = hash(key);
    // 计算出Segment的索引位
    long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
    // 以原子的方式获取Segment
    if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
        (tab = s.table) != null) {
        // 原子方式获取HashEntry
        for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
                 (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
             e != null; e = e.next) {
            K k;
            // key相同
            if ((k = e.key) == key || (e.hash == h && key.equals(k)))
                // value是volatile所以可以不加锁直接取值返回
                return e.value;
        }
    }
    return null;
}
  • 我们可以看到get方法是没有加锁的,因为HashEntry的value和next属性是volatile的,volatile直接保证了可见性,所以读的时候可以不加锁。

size() 方法

  • size的核心思想是先进性两次不加锁统计,如果两次的值一样则直接返回,否则第三个统计的时候会将所有segment全部锁定,再进行size统计,所以size()尽量少用。因为这是在并发情况下,size其他线程也会改变size大小,所以size()的返回值只能表示当前线程、当时的一个状态,可以算其实是一个预估值

参考文献

相关文章

网友评论

      本文标题:concurrentHashMap 1.7原理解析

      本文链接:https://www.haomeiwen.com/subject/jlervctx.html