美文网首页
03 ConcurrentHashMap1.7源码深入剖析

03 ConcurrentHashMap1.7源码深入剖析

作者: 攻城老狮 | 来源:发表于2022-09-05 15:25 被阅读0次

    1 ConcurrentHashMap 机制

    ConcurrentHashMap 在 1.7版本中采用分段锁机制实现线程安全的支持高并发的HashMap集合类。ConcurrentHashMap 在对象中保存了一个 Segment 数组,即将整个Hash 表划分为多个分段;而每个 Segment 元素,即每个分段则类似于一个Hashtable;这样,在执行 put 操作时首先根据 hash 算法定位到元素属于哪个 Segment,然后对该Segment 加锁即可。因此,ConcurrentHashMap 在多线程并发编程中可是实现多线程 put 操作。

    • HashMap 线程不安全

    多线程环境下,使用 Hashmap 进行 put 操作会引起死循环,所以在并发情况下不能使用HashMap。

    • HashTable 线程安全但效率低

    HashTable为保证线程安全付出的代价太大,get()、put()等方法都是 synchronized 的,这相当于给整个哈希表加了一把大锁。在并发调用HashTable的方法时就会造成大量的时间损耗。

    2 构造方法

    //初始化参数数值
    static final int DEFAULT_INITIAL_CAPACITY = 16;
    static final float DEFAULT_LOAD_FACTOR = 0.75f;
    static final int DEFAULT_CONCURRENCY_LEVEL = 16;
    
    /**
    * 无参构造方法,调用本身的传递三个参数的构造方法
    * 参数1:initialCapacity 初始化容量
    * 参数2:loadFactor 加载因子
    * 参数3:concurrencyLevel 默认并发度(用于构建Segment的数组长度,但必须保证2的幂次方,不一定是最终使用的数值)
    */
    public ConcurrentHashMap() {
      this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
    }
    
    /**
    * 用于对 ConcurrentHashMap进行初始化的细节均在该构造方法中实现,
    * 构造方法中会确认Segment数值的长度,每个Segment数值里面HashTable的长度,
    * 并初始化首个s[0]的Segment元素。
    */
    public ConcurrentHashMap(int initialCapacity,
                                 float loadFactor, int concurrencyLevel) {
      // 判别传入的参数是否合法
      if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
        throw new IllegalArgumentException();
      // 判断默认级别是否超过允许的最大开辟长度,猜测防止无限扩容Segment长度导致oom
      // => static final int MAX_SEGMENTS = 1 << 16;
      if (concurrencyLevel > MAX_SEGMENTS)
        concurrencyLevel = MAX_SEGMENTS;
      // Find power-of-two sizes best matching arguments
      // 保存ssize是2的几次幂
      int sshift = 0;
      // 用于初始化segments大小
      int ssize = 1;
      // 循环遍历找到大于等于concurrencyLevel的最小2次幂,例如:concurrencyLevel = 17,则大于等于 17 的最小2次幂为 32
      while (ssize < concurrencyLevel) {
        ++sshift;
        ssize <<= 1;
      }
      // 用于后续计算Segments数组下标,使用高位参与运算
      this.segmentShift = 32 - sshift;
      // 掩码,用于位操作计算下标
      this.segmentMask = ssize - 1;
      // 初始容量不能超过规定的最大值
      // static final int MAXIMUM_CAPACITY = 1 << 30;
      if (initialCapacity > MAXIMUM_CAPACITY)
        initialCapacity = MAXIMUM_CAPACITY;
      // c 表示每个Segment元素中HashTable的大小
      int c = initialCapacity / ssize;
      // 确保总segment中table的容量大于等于initialCapacity
      // 例如 initialCapacity = 17,ssize = 16 则 c = 1, 1 * 16 < 17,所以 ++c 才可以满足总容量的需求
      if (c * ssize < initialCapacity)
        ++c;
      // 实际 hashtable 使用的容量,确保为2的幂次,hashtable 最小为 2
      // static final int MIN_SEGMENT_TABLE_CAPACITY = 2;
      int cap = MIN_SEGMENT_TABLE_CAPACITY;
      // 如果容量不满足要求需要以2的幂次扩容,例如 cap = 2, c = 3   需要对 cap 扩容至 4
      while (cap < c)
        cap <<= 1;
      // create segments and segments[0]
      // 初始化index为0的segment,方便后面创建的segment对象可以直接使用这个segment的参数,不需要重复计算
      Segment<K,V> s0 =
        new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
                         (HashEntry<K,V>[])new HashEntry[cap]);
      // 根据 ssize 大小创建 Segments 数组
      Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
      // 将 s0 放入 Segments 数组中
      UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
      // 赋值
      this.segments = ss;
    }
    

    3 put() 方法

    /**
    * put 方法为核心方法,是可以保证多线程安全的方法。
    * 主要逻辑是首先查看是否对应下标位置存在 Segment 元素,
    * 如果不存在,则进行线程安全的初始化;
    * 如果存在,则调用该位置的 Segment 对象的 put 方法具体存放数据到 HashTable 中。
    */
    public V put(K key, V value) {
      Segment<K,V> s;
      // 不支持 value == null 的情况
      if (value == null)
        throw new NullPointerException();
      // 计算 key 的 hash 值
      int hash = hash(key);
      // 计算在 Segments 数组中的元素下标,使用hash值的高位参与运算获取下标位置
      int j = (hash >>> segmentShift) & segmentMask;
      // 如果下标位置不存在Segment元素,需要进行Segment对象的初始化,主要在 ensureSegment() 方法中实现
      if ((s = (Segment<K,V>)UNSAFE.getObject        // nonvolatile; recheck
           (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
        s = ensureSegment(j);
      // put 操作逻辑
      return s.put(key, hash, value, false);
    }
    

    4 ensureSegment() 方法

    /**
    *   ensureSegment() 方法主要负责对 Segment 对象的初始化,
    * 涉及到并发安全的问题,主要采用 CAS 的方式进行无锁自旋。
    */
    private Segment<K,V> ensureSegment(int k) {
      final Segment<K,V>[] ss = this.segments;
      // Segment元素下标
      long u = (k << SSHIFT) + SBASE; // raw offset
      Segment<K,V> seg;
      // 当前对应Segment元素为null,说明需要进行对象的初始化
      if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
        // 直接使用Segment[0]的属性来初始化,不需要二次计算参数数据
        Segment<K,V> proto = ss[0]; // use segment 0 as prototype
        int cap = proto.table.length;
        float lf = proto.loadFactor;
        int threshold = (int)(cap * lf);
        HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
        // 再检测一下是否存在,如果存在就不必进行new对象的操作逻辑,提升效率
        if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
            == null) { // recheck
          // new Segment 对象
          Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
          // CAS 的方式将新创建的Segment对象添加到Segments数组中
          while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
                 == null) {
            if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
              break;
          }
        }
      }
      // 返回新创建或者其他线程新创建的Segment对象
      return seg;
    }
    

    5 Segment 的 put() 方法

    /**
    * Segment的put()方法实现具体将key插入到Segment本身的table中,插入前需要先加锁,
    * 之后遍历插入位置的链表,遇到相同的元素则修改并返回旧值,
    * 到尾节点依旧没有找到相同元素,则创建新的entry元素插入。
    */
    final V put(K key, int hash, V value, boolean onlyIfAbsent) {
      // 如果没有获取到锁,则阻塞在这一步
      HashEntry<K,V> node = tryLock() ? null :
      scanAndLockForPut(key, hash, value);
      // 往下执行,代表已获取到锁
      V oldValue;
      try {
        HashEntry<K,V>[] tab = table;
        // 计算 key 应该在该 Segment 元素的 HashTable 插入的下标
        int index = (tab.length - 1) & hash;
        // 获取对应在segment中的头节点
        HashEntry<K,V> first = entryAt(tab, index);
        // 循环遍历头节点
        for (HashEntry<K,V> e = first;;) {
          // 头节点不为null
          if (e != null) {
            K k;
            // 判断是否存在相同元素
            if ((k = e.key) == key ||
                (e.hash == hash && key.equals(k))) {
              // 旧值
              oldValue = e.value;
              // 覆盖旧值
              if (!onlyIfAbsent) {
                e.value = value;
                ++modCount;
              }
              break;
            }
            // 遍历链表下一个节点
            e = e.next;
          }
          // 头节点为null
          else {
            // node已初始化
            if (node != null)
              // 直接使用 scanAndLockForPut() 方法得到的 node,这里 first 一定为null,感觉大哥李写成 null 更容易理解
              node.setNext(first);
            // 需要初始化node
            else
              node = new HashEntry<K,V>(hash, key, value, first);
            // 长度+1
            int c = count + 1;
            // 判断是否需要扩容
            if (c > threshold && tab.length < MAXIMUM_CAPACITY)
              rehash(node);
            // 直接将 node 设置在tab的index上
            else
              setEntryAt(tab, index, node);
            ++modCount;
            count = c;
            oldValue = null;
            break;
          }
        }
      } finally {
        // 解锁
        unlock();
      }
      // 返回旧值
      return oldValue;
    }
    

    6 scanAndLockForPut() 方法

    /**
    * scanAndLockForPut() 方法主要是在不断 tryLock() 尝试加锁,每次尝试加锁失败后,可以额外做些判断逻辑。
    */
    private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
      // 获取在Segment对应位置的首节点
      HashEntry<K,V> first = entryForHash(this, hash);
      HashEntry<K,V> e = first;
      HashEntry<K,V> node = null;
      // 标志位,用于控制下面 if-else 的分支逻辑
      int retries = -1; // negative while locating node
      // 尝试加锁
      while (!tryLock()) {
        HashEntry<K,V> f; // to recheck first below
        // 循环遍历e,直到找到key值相等或为null
        if (retries < 0) {
          if (e == null) {
            // 当前节点是否为null
            if (node == null) // speculatively create node
              // 初始化node
              node = new HashEntry<K,V>(hash, key, value, null);
            retries = 0;
          }
          // key值相等,则设置retries为0
          else if (key.equals(e.key))
            retries = 0;
          // 继续找链表的下一个元素
          else
            e = e.next;
        }
        // 判断重试次数是否大于最大重试次数
        // static final int MAX_SCAN_RETRIES = Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1;
        else if (++retries > MAX_SCAN_RETRIES) {
          // 直接加锁
          lock();
          break;
        }
        // 每隔2次判断,重现获取的首节点值与first不相等时,重置标志位。说明其他线程已经修改,需要重新获取最新的数据
        else if ((retries & 1) == 0 &&
                 (f = entryForHash(this, hash)) != first) {
          e = first = f; // re-traverse if entry changed
          retries = -1;
        }
      }
      return node;
    }
    

    7 rehash() 方法

    /**
    * rehash() 方法用于数组的扩容操作,首先将新数组容量定义为原数组的2倍,之后将旧元素计算hash值,插入到新数组中。再将新元素采用头插法插入新数组,最后完成数组的覆盖,完成扩容。
    */
    private void rehash(HashEntry<K,V> node) {
      HashEntry<K,V>[] oldTable = table;
      int oldCapacity = oldTable.length;
      // Segment中的新hashtable数组容量扩容为原来的2倍
      int newCapacity = oldCapacity << 1;
      threshold = (int)(newCapacity * loadFactor);
      // 新创建一个两倍原容量的table数组
      HashEntry<K,V>[] newTable =
        (HashEntry<K,V>[]) new HashEntry[newCapacity];
      int sizeMask = newCapacity - 1;
      // 遍历原table
      for (int i = 0; i < oldCapacity ; i++) {
        HashEntry<K,V> e = oldTable[i];
        // 原数组下标位置有元素
        if (e != null) {
          HashEntry<K,V> next = e.next;
          // 计算在新数组的下标
          int idx = e.hash & sizeMask;
          // 当前列表只有个首节点,则直接赋值到newTable的index值上,由于调用该函数的上层已经加锁,所以直接操作即可,线程安全的
          if (next == null)   //  Single node on list
            newTable[idx] = e;
          // 需要遍历链表
          else { // Reuse consecutive sequence at same slot
            // 此处逻辑是找到链表最后末尾连续几个元素,这些元素的hash相同,可以插入到新数组的相同下标位置
            HashEntry<K,V> lastRun = e;
            int lastIdx = idx;
            for (HashEntry<K,V> last = next;
                 last != null;
                 last = last.next) {
              int k = last.hash & sizeMask;
              if (k != lastIdx) {
                lastIdx = k;
                lastRun = last;
              }
            }
            // 直接将找到的最后一段相同节点的首节点赋值
            newTable[lastIdx] = lastRun;
            // Clone remaining nodes
            // 从头节点开始遍历lastRun的前一节点,一个一个的插入剩余元素
            for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
              V v = p.value;
              int h = p.hash;
              int k = h & sizeMask;
              HashEntry<K,V> n = newTable[k];
              newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
            }
          }
        }
      }
      // 前面已经将原数组的旧元素添加到新数组,此处再将新元素插入
      int nodeIndex = node.hash & sizeMask; // add the new node
      // 头插法
      node.setNext(newTable[nodeIndex]);
      newTable[nodeIndex] = node;
      // 赋值新数组
      table = newTable;
    }
    

    相关文章

      网友评论

          本文标题:03 ConcurrentHashMap1.7源码深入剖析

          本文链接:https://www.haomeiwen.com/subject/wjtbnrtx.html