美文网首页
源码分析:Redis hash和Java HashMap、Con

源码分析:Redis hash和Java HashMap、Con

作者: 史啸天 | 来源:发表于2021-06-12 17:23 被阅读0次

    简述

    大家在开发工作中经常使用的key-value的数据结构,像Java中有HashMap,线程安全的ConcurrentHashMap,在Redis中hash结构更是最基础的;
    这里我们来分析一下它们的底层数据结构,包括如何扩容的;

    HashMap

    Java的HashMap是比较传统的数据结构了,网上关于HashMap的数据结构介绍有很多,就不再过多赘述了,这里咱们以JDK1.8的为主;
    我们重点关注HashMap的几个常用方法,话不多说,上代码。

    构造方法

    1、无参数构造方法

    public HashMap() {
      // 哈希加载因子,赋予默认值0.75
      this.loadFactor = DEFAULT_LOAD_FACTOR; // all other fields defaulted
    }
    

    2、1个参数的构造方法

    public HashMap(int initialCapacity) {
      // 调用两个参数的构造方法,DEFAULT_LOAD_FACTOR=0.75
      this(initialCapacity, DEFAULT_LOAD_FACTOR);
    }
    

    3、2个参数的构造方法

    public HashMap(int initialCapacity, float loadFactor) {
      // 入参小于0,抛出异常
      if (initialCapacity < 0)
        throw new IllegalArgumentException("Illegal initial capacity: " +
                                                   initialCapacity);
      // 超过最大值,赋予最大值
      if (initialCapacity > MAXIMUM_CAPACITY)
                initialCapacity = MAXIMUM_CAPACITY;
      // 加载因子小于0或者非数字,抛出异常
      if (loadFactor <= 0 || Float.isNaN(loadFactor))
                throw new IllegalArgumentException("Illegal load factor: " +
                                                   loadFactor);
      // 赋值加载因子
      this.loadFactor = loadFactor;
      // 赋值初始化大小
      this.threshold = tableSizeFor(initialCapacity);
    }
    

    4、参数是Map的构造方法

    public HashMap(Map<? extends K, ? extends V> m) {
      // 赋值默认加载因子
      this.loadFactor = DEFAULT_LOAD_FACTOR;
      // 实现是循环调用put方法初始化一个HashMap
      putMapEntries(m, false);
    }
    /**
     * 初始化一个HashMap
     */
    final void putMapEntries(Map<? extends K, ? extends V> m, boolean evict) {
      int s = m.size();
      if (s > 0) {
        // 如果Node数组为null,开始初始化
        if (table == null) { // pre-size
          float ft = ((float)s / loadFactor) + 1.0F;
          int t = ((ft < (float)MAXIMUM_CAPACITY) ? 
                          (int)ft : MAXIMUM_CAPACITY);
          if (t > threshold)
            threshold = tableSizeFor(t);
         // 如果传入的map大于初始长度,开始执行扩容方法
         } else if (s > threshold){
          resize();
         }
        // 循环调用putVal方法,开始构造map
        for (Map.Entry<? extends K, ? extends V> e : m.entrySet()) {
          K key = e.getKey();
          V value = e.getValue();
          putVal(hash(key), key, value, false, evict);
        }
      }
    }
    
    put方法
    /**
      *  首先key使用hash()方法,返回一个int值,底层也是调用putVal方法
      */
    public V put(K key, V value) {
      return putVal(hash(key), key, value, false, true);
    }
    /**
      *  
      */
    final V putVal(int hash, K key, V value, boolean onlyIfAbsent,
                       boolean evict) {
      Node<K,V>[] tab; Node<K,V> p; int n, I;
      // Node数组为null或者长度为0,调用resize扩容方法
      if ((tab = table) == null || (n = tab.length) == 0)
        n = (tab = resize()).length;
      // 找出元素存放桶的位置,并判断该位置是否为null,如果是null创建新的Node放入数组中
      if ((p = tab[i = (n - 1) & hash]) == null)
        tab[i] = newNode(hash, key, value, null);
      else {
        // 桶的位置有元素,则执行以下方法
        Node<K,V> e; K k;
        // 判断当前位置的key的hash值和旧key的hash值是否相同
        if (p.hash == hash && ((k = p.key) == key || (key != null && key.equals(k))))
          // 新旧key的hash值相同,进行覆盖
          e = p;
        // 校验当前Node已经是红黑树结构
        else if (p instanceof TreeNode)
          // 执行红黑树的put操作
          e = ((TreeNode<K,V>)p).putTreeVal(this, tab, hash, key, value);
        else {
          // 遍历Node上的链表
          for (int binCount = 0; ; ++binCount) {
            // 判断链表的下一个节点是否为空,为空则代表已经达到链表末尾
            if ((e = p.next) == null) {
              // 在链表末尾增加一个元素
              p.next = newNode(hash, key, value, null);
              // 判断是否达到了红黑树的阈值,大于等于8-1
              if (binCount >= TREEIFY_THRESHOLD - 1) // -1 for 1st
                // 尝试转换为红黑树
                treeifyBin(tab, hash);
              // 跳出,结束循环
              break;
            }
            // 判断key是否与链表中的某个key相等(包括hash值),完全相等则跳出
            if (e.hash == hash && ((k = e.key) == key || (key != null && key.equals(k))))
              break;
             // 将p赋值下一个节点,e=p.next,通过这种方式实现遍历链表
             p = e;
          }
        }
        // 如果e不为null,则表示链表上存在key相同的元素
        if (e != null) { // existing mapping for key
          V oldValue = e.value;
          // 判断是否要替换值
          if (!onlyIfAbsent || oldValue == null)
            // 替换value值
            e.value = value;
          // 空的钩子函数,回调函数
          afterNodeAccess(e);
          // 返回链表重复旧的value值
          return oldValue;
        }
      }
      // 次数加1
      ++modCount;
      // size加1,如果大于容量,执行扩容操作
      if (++size > threshold)
        resize();
      // 又是一个空的钩子函数,回调函数
      afterNodeInsertion(evict);
      // 返回null
      return null;
    }
    /**
     * 尝试将链表转换成红黑树
     */
    final void treeifyBin(HashMap.Node<K,V>[] tab, int hash) {
        int n, index; HashMap.Node<K,V> e;
        // Node数组是空,或者Node数组小于64,进行扩容操作
        if (tab == null || (n = tab.length) < MIN_TREEIFY_CAPACITY)
            resize();
        // 否则,判断是否为null,进行红黑树转换
        else if ((e = tab[index = (n - 1) & hash]) != null) {
            HashMap.TreeNode<K,V> hd = null, tl = null;
            do {
                HashMap.TreeNode<K,V> p = replacementTreeNode(e, null);
                if (tl == null)
                    hd = p;
                else {
                    p.prev = tl;
                    tl.next = p;
                }
                tl = p;
            } while ((e = e.next) != null);
            if ((tab[index] = hd) != null)
                hd.treeify(tab);
        }
    }
    
    resize方法(扩容方法)
    /**
     * 扩容方法
     */
    final HashMap.Node<K,V>[] resize() {
        // 旧的Node数组
        HashMap.Node<K,V>[] oldTab = table;
        // 旧的数组长度
        int oldCap = (oldTab == null) ? 0 : oldTab.length;
        // 旧的Map容量
        int oldThr = threshold;
        int newCap, newThr = 0;
        // 如果旧的数组长度大于0
        if (oldCap > 0) {
            // 旧的数组长度大于最大值,直接返回旧的Node数组
            if (oldCap >= MAXIMUM_CAPACITY) {
                threshold = Integer.MAX_VALUE;
                return oldTab;
            }
            // 扩容,新的Node.length乘以2,容量也乘以2
            else if ((newCap = oldCap << 1) < MAXIMUM_CAPACITY &&
                    oldCap >= DEFAULT_INITIAL_CAPACITY)
                newThr = oldThr << 1; // double threshold
        }
        // 如果旧数组长度不大于0,但是容量大于0,则代表要进行初始化操作
        else if (oldThr > 0) // initial capacity was placed in threshold
            newCap = oldThr;
        // 都不大于0,则代表初始化,赋予默认大小
        else {               // zero initial threshold signifies using defaults
            // DEFAULT_INITIAL_CAPACITY = 1 << 4 是16
            newCap = DEFAULT_INITIAL_CAPACITY;
            // 容量是加载因子0.75 乘以 默认桶容量16
            newThr = (int)(DEFAULT_LOAD_FACTOR * DEFAULT_INITIAL_CAPACITY);
        }
        // 如果新的容量为0,代表执行的是初始化操作,给予新容量赋值
        if (newThr == 0) {
            float ft = (float)newCap * loadFactor;
            newThr = (newCap < MAXIMUM_CAPACITY && ft < (float)MAXIMUM_CAPACITY ?
                    (int)ft : Integer.MAX_VALUE);
        }
        // 赋值新的容量
        threshold = newThr;
        // 创建新的Node数组,并赋值给全局Node数组
        HashMap.Node<K,V>[] newTab = (HashMap.Node<K,V>[])new HashMap.Node[newCap];
        table = newTab;
        // 如果旧的Node数组不是null,代表不仅是初始化操作,开始进行数组拷贝
        if (oldTab != null) {
            // 循环
            for (int j = 0; j < oldCap; ++j) {
                HashMap.Node<K,V> e;
                // 非空判断
                if ((e = oldTab[j]) != null) {
                    oldTab[j] = null;
                    //  Node的当前数组,不存在链表结构
                    if (e.next == null)
                        // 直接进行新Node数组的位置计算,并赋值
                        newTab[e.hash & (newCap - 1)] = e;
                    // 如果是红黑树,进行红黑树的扩容
                    else if (e instanceof HashMap.TreeNode)
                        ((HashMap.TreeNode<K,V>)e).split(this, newTab, j, oldCap);
                    else { // preserve order
                        // 不需要移动的链表
                        HashMap.Node<K,V> loHead = null, loTail = null;
                        // 需要移动的链表
                        HashMap.Node<K,V> hiHead = null, hiTail = null;
                        // Node数组的链表
                        HashMap.Node<K,V> next;
                        do {
                            next = e.next;
                            // 将链表拆分成需要移动和不需要移动的两个
                            if ((e.hash & oldCap) == 0) {
                                if (loTail == null)
                                    loHead = e;
                                else
                                    loTail.next = e;
                                loTail = e;
                            }
                            else {
                                if (hiTail == null)
                                    hiHead = e;
                                else
                                    hiTail.next = e;
                                hiTail = e;
                            }
                        // 循环操作链表
                        } while ((e = next) != null);
                        // 不需要移动的原位置赋值
                        if (loTail != null) {
                            loTail.next = null;
                            newTab[j] = loHead;
                        }
                        // 需要移动的原位置+旧数组长度位置
                        if (hiTail != null) {
                            hiTail.next = null;
                            newTab[j + oldCap] = hiHead;
                        }
                    }
                }
            }
        }
        // 返回新的扩容后的Node数组
        return newTab;
    }
    
    remove方法
    /**
     * 删除操作
     */
    public V remove(Object key) {
        HashMap.Node<K,V> e;
        return (e = removeNode(hash(key), key, null, false, true)) == null ?
                null : e.value;
    }
    /**
     * 真正的删除方法
     */
    final HashMap.Node<K,V> removeNode(int hash, Object key, Object value,
                                       boolean matchValue, boolean movable) {
        HashMap.Node<K,V>[] tab; HashMap.Node<K,V> p; int n, index;
        // 校验空和初始化操作,根据hash运算找出key所在桶位置
        if ((tab = table) != null && (n = tab.length) > 0 &&
                (p = tab[index = (n - 1) & hash]) != null) {
            HashMap.Node<K,V> node = null, e; K k; V v;
            // 如果桶位置的key完全相等
            if (p.hash == hash &&
                    ((k = p.key) == key || (key != null && key.equals(k))))
                node = p;
            // 判断是否存在链表,链表不为空执行下面方法
            else if ((e = p.next) != null) {
                // 判断是否是红黑树结构
                if (p instanceof HashMap.TreeNode)
                    // 调用红黑树的方法,找到树的具体节点
                    node = ((HashMap.TreeNode<K,V>)p).getTreeNode(hash, key);
                else {
                    // 循环找到元素在链表的具体位置
                    do {
                        if (e.hash == hash &&
                                ((k = e.key) == key ||
                                        (key != null && key.equals(k)))) {
                            node = e;
                            break;
                        }
                        p = e;
                    } while ((e = e.next) != null);
                }
            }
            if (node != null && (!matchValue || (v = node.value) == value ||
                    (value != null && value.equals(v)))) {
                // node节点如果是红黑树
                if (node instanceof HashMap.TreeNode)
                    // 调用红黑树的移除方法
                    ((HashMap.TreeNode<K,V>)node).removeTreeNode(this, tab, movable);
                // 经过上序操作,node如果和p相等,则表示这个位置不存在链表
                else if (node == p)
                    // 将当前桶位置赋值为空,因为当前桶位置不存在链表,所以node.next是个空
                    tab[index] = node.next;
                else
                    // 移动链表,将后续链表值赋予当前位置
                    p.next = node.next;
                // 修改次数加1
                ++modCount;
                // 长度减1
                --size;
                // 钩子回调函数
                afterNodeRemoval(node);
                // 返回当前节点,最终remove操作会返回value值
                return node;
            }
        }
        return null;
    }
    
    总结

    需要注意几个细节,在面试中可能会被经常问到

    1、直接new出来的HashMap容量是多少?
    答:我们从源码可以看到,只有入参是Map的构造方法会进行Node数组的初始化和赋值;其他的构造方法new出来的HashMap是不会进行初始化的,只有在第一个元素put进去的时候,会调用resize方法,进行数组的初始化;

    2、什么情况下会触发扩容?链表长度达到8一定会转换红黑树吗?
    -首先,Map的size大于16(容量)*0.75(加载因子)的时候会触发resize()扩容操作;
    -其次,再新增元素到链表中时,会判断是否大于等于8-1,满足的话会 调用treeifyBin()方法,尝试进行红黑树转换;
    -最后,在转换红黑树之前,会判断Node数组长度是否小于64,如果小于64就放弃转换,进行扩容操作;
    基于源码我们可以理解扩容是基于两个逻辑:
            一个是HashMap中数组元素过多,容易频繁触发hash冲突;
            另一个是元素虽然不多,但是元素都集中在某一个位置上,严重影响效率,就会通过扩容,来重新计算hash,让其分布的更加均匀;

    3、put有返回值嘛?返回的是什么?
    答:从源码我们可以看到,put方法会返回放入的value值,但是只有存入的元素被放置在链表的位置才会有返回,如果放入数组位置,返回的是null;

    ConcurrentHashMap

    构造方法

    1、无参构造方法

    // 真的什么都不做,只创建一个ConcurrentHashMap对象
    public ConcurrentHashMap() {
    }
    

    2、1个参数的构造方法

    public ConcurrentHashMap(int initialCapacity) {
        // 如果入参小于0,抛出异常
        if (initialCapacity < 0)
            throw new IllegalArgumentException();
        // 进行初始化容量
        int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
                MAXIMUM_CAPACITY :
                tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
        this.sizeCtl = cap;
    }
    

    3、2个参数的构造方法

    public ConcurrentHashMap(int initialCapacity, float loadFactor) {
        // 调用3个参数的构造方法,并发级别默认为1
        this(initialCapacity, loadFactor, 1);
    }
    

    4、3个参数的构造方法

    public ConcurrentHashMap(int initialCapacity,
                             float loadFactor, int concurrencyLevel) {
        // 校验入参,小于0则抛出异常
        if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
            throw new IllegalArgumentException();
        // 初始容量小于并发级别
        if (initialCapacity < concurrencyLevel)   // Use at least as many bins
            initialCapacity = concurrencyLevel;   // as estimated threads
        // 初始化容量
        long size = (long)(1.0 + (long)initialCapacity / loadFactor);
        int cap = (size >= (long)MAXIMUM_CAPACITY) ?
                MAXIMUM_CAPACITY : tableSizeFor((int)size);
        this.sizeCtl = cap;
    }
    

    5、参数是Map的构造方法

    public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
        // 默认容量16
        this.sizeCtl = DEFAULT_CAPACITY;
        // 调用putAll方法,初始化
        putAll(m);
    }
    /**
      * 将Map批量添加
      */
    public void putAll(Map<? extends K, ? extends V> m) {
        // 初始化大小
        tryPresize(m.size());
        // 循环初始化Map
        for (Map.Entry<? extends K, ? extends V> e : m.entrySet())
            putVal(e.getKey(), e.getValue(), false);
    }
    
    put方法
    /**
      * 添加元素方法
      **/
    public V put(K key, V value) {
        // 调用put方法
        return putVal(key, value, false);
    }
    /**
      *  真正的put方法
      **/
    final V putVal(K key, V value, boolean onlyIfAbsent) {
        // key或者value是null,抛出NPE异常
        if (key == null || value == null) throw new NullPointerException();
        // 计算key的hash值
        int hash = spread(key.hashCode());
        int binCount = 0;
        // 循环
        for (ConcurrentHashMap.Node<K,V>[] tab = table;;) {
            ConcurrentHashMap.Node<K,V> f; int n, i, fh;
            // 如果数组是null,或者length是0,进行初始化操作
            if (tab == null || (n = tab.length) == 0)
                tab = initTable();
            // 存入桶的位置为null
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                // 比较并替换(cas)操作,将元素放入桶中,如果成功,跳出循环
                if (casTabAt(tab, i, null,
                        new ConcurrentHashMap.Node<K,V>(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin
            }
            // 根据hash值判断是否正在进行扩容
            else if ((fh = f.hash) == MOVED)
                // 协助进行扩容,在扩容中详细注释
                tab = helpTransfer(tab, f);
            // 当前桶位置存在元素(hash冲突),链表形式添加元素
            else {
                V oldVal = null;
                // 对当前桶位进行加锁
                synchronized (f) {
                    // 链表头节点
                    if (tabAt(tab, i) == f) {
                        if (fh >= 0) {
                            binCount = 1;
                            // 循环链表
                            for (ConcurrentHashMap.Node<K,V> e = f;; ++binCount) {
                                K ek;
                                // key完全相同,替换value值,并跳出
                                if (e.hash == hash &&
                                        ((ek = e.key) == key ||
                                                (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                ConcurrentHashMap.Node<K,V> pred = e;
                                // next是null,代表链表末尾
                                if ((e = e.next) == null) {
                                    // 添加到链表末尾
                                    pred.next = new ConcurrentHashMap.Node<K,V>(hash, key,
                                            value, null);
                                    break;
                                }
                            }
                        }
                        // 如果是红黑树
                        else if (f instanceof ConcurrentHashMap.TreeBin) {
                            ConcurrentHashMap.Node<K,V> p;
                            binCount = 2;
                            if ((p = ((ConcurrentHashMap.TreeBin<K,V>)f).putTreeVal(hash, key,
                                    value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                if (binCount != 0) {
                    // 是否大于等于8
                    if (binCount >= TREEIFY_THRESHOLD)
                        // 转换红黑树,并非一定会转红黑树
                        treeifyBin(tab, i);
                    // 返回value
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        // 统计节点个数,检察是否需要扩容操作
        addCount(1L, binCount);
        return null;
    }
    /**
     * 链表转红黑树方法
     */
    private final void treeifyBin(ConcurrentHashMap.Node<K,V>[] tab, int index) {
        ConcurrentHashMap.Node<K,V> b; int n, sc;
        // 为空判断
        if (tab != null) {
            // 校验Node数组长度是否小于64
            if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
                // 扩容操作
                tryPresize(n << 1);
            // 需要链表转红黑树的桶位置
            else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
                // 该桶位加锁
                synchronized (b) {
                    if (tabAt(tab, index) == b) {
                        ConcurrentHashMap.TreeNode<K,V> hd = null, tl = null;
                        // 循环链表,创建treeNode替换原来的Node
                        for (ConcurrentHashMap.Node<K,V> e = b; e != null; e = e.next) {
                            ConcurrentHashMap.TreeNode<K,V> p = new ConcurrentHashMap.TreeNode<K,V>(e.hash, e.key, e.val, null, null);
                            if ((p.prev = tl) == null)
                                hd = p;
                            else
                                tl.next = p;
                            tl = p;
                        }
                        setTabAt(tab, index, new ConcurrentHashMap.TreeBin<K,V>(hd));
                    }
                }
            }
        }
    }
    
    扩容

    在上面的方法中会看到很多扩容的方法,比如:addCount()、helpTransfer()、tryPresize()等,有的是协助扩容,有的是本身进行扩容,他们的底层调用都是transfer()方法进行扩容操作;
    transfer()方法有两个参数,tab(当前Node数组)和nextTab(扩容Node数组),当nextTab不为空时,会进行协助扩容;

    /**
     * 扩容方法
     */
    private final void transfer(ConcurrentHashMap.Node<K,V>[] tab, ConcurrentHashMap.Node<K,V>[] nextTab) {
        int n = tab.length, stride;
        if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
            stride = MIN_TRANSFER_STRIDE; // subdivide range
        // 如果nextTab为空
        if (nextTab == null) {            // initiating
            try {
                @SuppressWarnings("unchecked")
                // 创建一个原来两倍长度的数组
                ConcurrentHashMap.Node<K,V>[] nt = (ConcurrentHashMap.Node<K,V>[])new ConcurrentHashMap.Node<?,?>[n << 1];
                nextTab = nt;
            } catch (Throwable ex) {      // try to cope with OOME
                sizeCtl = Integer.MAX_VALUE;
                return;
            }
            nextTable = nextTab;
            transferIndex = n;
        }
        int nextn = nextTab.length;
        ConcurrentHashMap.ForwardingNode<K,V> fwd = new ConcurrentHashMap.ForwardingNode<K,V>(nextTab);
        boolean advance = true;
        boolean finishing = false; // to ensure sweep before committing nextTab
        // 循环开始元素移动
        for (int i = 0, bound = 0;;) {
            ConcurrentHashMap.Node<K,V> f; int fh;
            while (advance) {
                int nextIndex, nextBound;
                if (--i >= bound || finishing)
                    advance = false;
                else if ((nextIndex = transferIndex) <= 0) {
                    i = -1;
                    advance = false;
                }
                else if (U.compareAndSwapInt
                        (this, TRANSFERINDEX, nextIndex,
                                nextBound = (nextIndex > stride ?
                                        nextIndex - stride : 0))) {
                    bound = nextBound;
                    i = nextIndex - 1;
                    advance = false;
                }
            }
            if (i < 0 || i >= n || i + n >= nextn) {
                int sc;
                // 所有节点都完成移动,
                if (finishing) {
                    // 清空扩容的nextTable
                    nextTable = null;
                    // table赋值扩容后的tab
                    table = nextTab;
                    // 扩容阈值变更
                    sizeCtl = (n << 1) - (n >>> 1);
                    return;
                }
                if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                    if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                        return;
                    finishing = advance = true;
                    i = n; // recheck before commit
                }
            }
            // 遍历到的节点是空,则放入ForwardingNode
            else if ((f = tabAt(tab, i)) == null)
                advance = casTabAt(tab, i, null, fwd);
            // 已处理过的位置
            else if ((fh = f.hash) == MOVED)
                advance = true; // already processed
            else {
                // 加锁
                synchronized (f) {
                    if (tabAt(tab, i) == f) {
                        ConcurrentHashMap.Node<K,V> ln, hn;
                        // 链表扩容执行
                        if (fh >= 0) {
                            int runBit = fh & n;
                            // 最后一个节点,也就是反序链表
                            ConcurrentHashMap.Node<K,V> lastRun = f;
                            for (ConcurrentHashMap.Node<K,V> p = f.next; p != null; p = p.next) {
                                int b = p.hash & n;
                                if (b != runBit) {
                                    runBit = b;
                                    lastRun = p;
                                }
                            }
                            if (runBit == 0) {
                                ln = lastRun;
                                hn = null;
                            }
                            else {
                                hn = lastRun;
                                ln = null;
                            }
                            for (ConcurrentHashMap.Node<K,V> p = f; p != lastRun; p = p.next) {
                                int ph = p.hash; K pk = p.key; V pv = p.val;
                                if ((ph & n) == 0)
                                    ln = new ConcurrentHashMap.Node<K,V>(ph, pk, pv, ln);
                                else
                                    hn = new ConcurrentHashMap.Node<K,V>(ph, pk, pv, hn);
                            }
                            setTabAt(nextTab, i, ln);
                            setTabAt(nextTab, i + n, hn);
                            setTabAt(tab, i, fwd);
                            advance = true;
                        }
                        // 红黑树的处理,过程与上面类似
                        else if (f instanceof ConcurrentHashMap.TreeBin) {
                            ConcurrentHashMap.TreeBin<K,V> t = (ConcurrentHashMap.TreeBin<K,V>)f;
                            ConcurrentHashMap.TreeNode<K,V> lo = null, loTail = null;
                            ConcurrentHashMap.TreeNode<K,V> hi = null, hiTail = null;
                            int lc = 0, hc = 0;
                            for (ConcurrentHashMap.Node<K,V> e = t.first; e != null; e = e.next) {
                                int h = e.hash;
                                ConcurrentHashMap.TreeNode<K,V> p = new ConcurrentHashMap.TreeNode<K,V>
                                        (h, e.key, e.val, null, null);
                                if ((h & n) == 0) {
                                    if ((p.prev = loTail) == null)
                                        lo = p;
                                    else
                                        loTail.next = p;
                                    loTail = p;
                                    ++lc;
                                }
                                else {
                                    if ((p.prev = hiTail) == null)
                                        hi = p;
                                    else
                                        hiTail.next = p;
                                    hiTail = p;
                                    ++hc;
                                }
                            }
                            ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                                    (hc != 0) ? new ConcurrentHashMap.TreeBin<K,V>(lo) : t;
                            hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                                    (lc != 0) ? new ConcurrentHashMap.TreeBin<K,V>(hi) : t;
                            setTabAt(nextTab, i, ln);
                            setTabAt(nextTab, i + n, hn);
                            setTabAt(tab, i, fwd);
                            advance = true;
                        }
                    }
                }
            }
        }
    }
    
    总结

    ConcurrentHashMap做为JUC包中应对并发操作的Map,在JDK1.8中使用了很多Unsafe中的CAS操作,有兴趣的同学可以再深入研究一下;下面我们来总结下可能遇到的面试问题;

    1、ConcurrentHashMap也会转红黑树吗?
    答:是的,从源码可以看到ConcurrentHashMap和HashMap一样,在链表长度达到8时不一定会触发红黑树操作,只有size大于64,且链表长度大于8时才会触发转红黑树的操作;

    2、ConcurrentHahMap的扩容方法也叫resize方法吗?
    答:不是的,我们看到ConcurrentHashMap采用的是协同扩容,如果检测到正在进行扩容,会加入到扩容操作中,不管是扩容还是协同扩容,他们调用的扩容方法都是transfer(),只不过入参不同进行的步骤就不一样;

    3、协同扩容有操作上限吗?假设正在进行扩容,此时有100个put操作,这100个put都会加入到扩容操作中吗?
    答:不是的,在transfer方法中我们可以看到,有下面一行判断

    private final void transfer(ConcurrentHashMap.Node<K,V>[] tab, ConcurrentHashMap.Node<K,V>[] nextTab) {
        int n = tab.length, stride;
        if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
            stride = MIN_TRANSFER_STRIDE; // subdivide range
    ... 后面省略
    

    其中stride参数控制扩容线程数量,这个判断了机器cpu核数,来控制扩容加入扩线程的数量;

    Redis hash

    大家用过redis的都可以感受到,整个redis都是基于这种key-value的方式存储;在redis中基于dict方法构建的,我们先来看下数据结构;

    数据结构
    /*
     * 字典
     */
    typedef struct dict {
    
        // 类型特定函数
        dictType *type;
    
        // 私有数据
        void *privdata;
    
        // 哈希表
        dictht ht[2];
    
        // rehash 索引
        // 当 rehash 不在进行时,值为 -1
        int rehashidx; /* rehashing not in progress if rehashidx == -1 */
    
        // 目前正在运行的安全迭代器的数量
        int iterators; /* number of iterators currently running */
    
    } dict;
    /*
     * 哈希表
     *
     * 每个字典都使用两个哈希表,从而实现渐进式 rehash 。
     */
    typedef struct dictht {
        
        // 哈希表数组
        dictEntry **table;
    
        // 哈希表大小
        unsigned long size;
        
        // 哈希表大小掩码,用于计算索引值
        // 总是等于 size - 1
        unsigned long sizemask;
    
        // 该哈希表已有节点的数量
        unsigned long used;
    
    } dictht;
    /*
     * 哈希表节点
     */
    typedef struct dictEntry {
        
        // 键
        void *key;
    
        // 值
        union {
            void *val;
            uint64_t u64;
            int64_t s64;
        } v;
    
        // 指向下个哈希表节点,形成链表
        struct dictEntry *next;
    
    } dictEntry;
    
    初始化方法

    在创建一个字典的时候,会调用初始化方法,初始化两个dictht哈希表;

    /*
     * 创建一个新的字典
     *
     * T = O(1)
     */
    dict *dictCreate(dictType *type,
            void *privDataPtr)
    {
        dict *d = zmalloc(sizeof(*d));
    
        _dictInit(d,type,privDataPtr);
    
        return d;
    }
    /*
     * 初始化哈希表
     *
     * T = O(1)
     */
    int _dictInit(dict *d, dictType *type,
            void *privDataPtr)
    {
        // 初始化两个哈希表的各项属性值
        // 但暂时还不分配内存给哈希表数组
        _dictReset(&d->ht[0]);
        _dictReset(&d->ht[1]);
    
        // 设置类型特定函数
        d->type = type;
    
        // 设置私有数据
        d->privdata = privDataPtr;
    
        // 设置哈希表 rehash 状态
        d->rehashidx = -1;
    
        // 设置字典的安全迭代器数量
        d->iterators = 0;
    
        return DICT_OK;
    }
    /*
     * 重置(或初始化)给定哈希表的各项属性值
     *
     * p.s. 上面的英文注释已经过期
     *
     * T = O(1)
     */
    static void _dictReset(dictht *ht)
    {
        ht->table = NULL;
        ht->size = 0;
        ht->sizemask = 0;
        ht->used = 0;
    }
    
    新增操作
    /*
     * 将给定的键值对添加到字典中,如果键已经存在,那么删除旧有的键值对。
     *
     * 如果键值对为全新添加,那么返回 1 。
     * 如果键值对是通过对原有的键值对更新得来的,那么返回 0 。
     *
     * T = O(N)
     */
    int dictReplace(dict *d, void *key, void *val)
    {
        dictEntry *entry, auxentry;
    
        // 尝试直接将键值对添加到字典
        // 如果键 key 不存在的话,添加会成功
        // T = O(N)
        if (dictAdd(d, key, val) == DICT_OK)
            return 1;
    
        // 运行到这里,说明键 key 已经存在,那么找出包含这个 key 的节点
        // T = O(1)
        entry = dictFind(d, key);
    
        // 先保存原有的值的指针
        auxentry = *entry;
        // 然后设置新的值
        // T = O(1)
        dictSetVal(d, entry, val);
        // 然后释放旧值
        // T = O(1)
        dictFreeVal(d, &auxentry);
    
        return 0;
    }
    /*
     * 尝试将给定键值对添加到字典中
     *
     * 只有给定键 key 不存在于字典时,添加操作才会成功
     *
     * 添加成功返回 DICT_OK ,失败返回 DICT_ERR
     *
     * 最坏 T = O(N) ,平滩 O(1) 
     */
    int dictAdd(dict *d, void *key, void *val)
    {
        // 尝试添加键到字典,并返回包含了这个键的新哈希节点
        // T = O(N)
        dictEntry *entry = dictAddRaw(d,key);
    
        // 键已存在,添加失败
        if (!entry) return DICT_ERR;
    
        // 键不存在,设置节点的值
        // T = O(1)
        dictSetVal(d, entry, val);
    
        // 添加成功
        return DICT_OK;
    }
    /*
     * 尝试将键插入到字典中
     *
     * 如果键已经在字典存在,那么返回 NULL
     *
     * 如果键不存在,那么程序创建新的哈希节点,
     * 将节点和键关联,并插入到字典,然后返回节点本身。
     *
     * T = O(N)
     */
    dictEntry *dictAddRaw(dict *d, void *key)
    {
        int index;
        dictEntry *entry;
        dictht *ht;
    
        // 如果条件允许的话,进行单步 rehash
        // T = O(1)
        if (dictIsRehashing(d)) _dictRehashStep(d);
    
        // 计算键在哈希表中的索引值
        // 如果值为 -1 ,那么表示键已经存在
        // T = O(N)
        if ((index = _dictKeyIndex(d, key)) == -1)
            return NULL;
    
        // T = O(1)
    
        // 如果字典正在 rehash ,那么将新键添加到 1 号哈希表
        // 否则,将新键添加到 0 号哈希表
        ht = dictIsRehashing(d) ? &d->ht[1] : &d->ht[0];
        // 为新节点分配空间
        entry = zmalloc(sizeof(*entry));
        // 将新节点插入到链表表头
        entry->next = ht->table[index];
        ht->table[index] = entry;
        // 更新哈希表已使用节点数量
        ht->used++;
    
        // 设置新节点的键
        // T = O(1)
        dictSetKey(d, entry, key);
    
        return entry;
    }
    
    查找操作
    /*
     * 获取包含给定键的节点的值
     *
     * 如果节点不为空,返回节点的值
     * 否则返回 NULL
     *
     * T = O(1)
     */
    void *dictFetchValue(dict *d, const void *key) {
        dictEntry *he;
    
        // T = O(1)
        he = dictFind(d,key);
    
        return he ? dictGetVal(he) : NULL;
    }
    /*
     * 返回字典中包含键 key 的节点
     *
     * 找到返回节点,找不到返回 NULL
     *
     * T = O(1)
     */
    dictEntry *dictFind(dict *d, const void *key)
    {
        dictEntry *he;
        unsigned int h, idx, table;
    
        // 字典(的哈希表)为空
        if (d->ht[0].size == 0) return NULL; /* We don't have a table at all */
    
        // 如果条件允许的话,进行单步 rehash
        if (dictIsRehashing(d)) _dictRehashStep(d);
    
        // 计算键的哈希值
        h = dictHashKey(d, key);
        // 在字典的哈希表中查找这个键
        // T = O(1)
        for (table = 0; table <= 1; table++) {
    
            // 计算索引值
            idx = h & d->ht[table].sizemask;
    
            // 遍历给定索引上的链表的所有节点,查找 key
            he = d->ht[table].table[idx];
            // T = O(1)
            while(he) {
    
                if (dictCompareKeys(d, key, he->key))
                    return he;
    
                he = he->next;
            }
    
            // 如果程序遍历完 0 号哈希表,仍然没找到指定的键的节点
            // 那么程序会检查字典是否在进行 rehash ,
            // 然后才决定是直接返回 NULL ,还是继续查找 1 号哈希表
            if (!dictIsRehashing(d)) return NULL;
        }
    
        // 进行到这里时,说明两个哈希表都没找到
        return NULL;
    }
    
    删除操作
    /*
     * 查找并删除包含给定键的节点
     *
     * 参数 nofree 决定是否调用键和值的释放函数
     * 0 表示调用,1 表示不调用
     *
     * 找到并成功删除返回 DICT_OK ,没找到则返回 DICT_ERR
     *
     * T = O(1)
     */
    static int dictGenericDelete(dict *d, const void *key, int nofree)
    {
        unsigned int h, idx;
        dictEntry *he, *prevHe;
        int table;
    
        // 字典(的哈希表)为空
        if (d->ht[0].size == 0) return DICT_ERR; /* d->ht[0].table is NULL */
    
        // 进行单步 rehash ,T = O(1)
        if (dictIsRehashing(d)) _dictRehashStep(d);
    
        // 计算哈希值
        h = dictHashKey(d, key);
    
        // 遍历哈希表
        // T = O(1)
        for (table = 0; table <= 1; table++) {
    
            // 计算索引值 
            idx = h & d->ht[table].sizemask;
            // 指向该索引上的链表
            he = d->ht[table].table[idx];
            prevHe = NULL;
            // 遍历链表上的所有节点
            // T = O(1)
            while(he) {
            
                if (dictCompareKeys(d, key, he->key)) {
                    // 超找目标节点
    
                    /* Unlink the element from the list */
                    // 从链表中删除
                    if (prevHe)
                        prevHe->next = he->next;
                    else
                        d->ht[table].table[idx] = he->next;
    
                    // 释放调用键和值的释放函数?
                    if (!nofree) {
                        dictFreeKey(d, he);
                        dictFreeVal(d, he);
                    }
                    
                    // 释放节点本身
                    zfree(he);
    
                    // 更新已使用节点数量
                    d->ht[table].used--;
    
                    // 返回已找到信号
                    return DICT_OK;
                }
    
                prevHe = he;
                he = he->next;
            }
    
            // 如果执行到这里,说明在 0 号哈希表中找不到给定键
            // 那么根据字典是否正在进行 rehash ,决定要不要查找 1 号哈希表
            if (!dictIsRehashing(d)) break;
        }
    
        // 没找到
        return DICT_ERR; /* not found */
    }
    
    扩容

    扩容触发条件 — 在redis中触发扩容的条件有很多
    1、每次新增dictEntry时都会调用_dictExpandIfNeeded()函数;
    2、在往hash表中添加元素时也会调用到_dictExpandIfNeeded()函数;
    在_dictExpandIfNeeded()函数中可以看到,如果ht[0]是0,会初始化出一个大小是4的哈希表;或者ht[0].used大于等于ht[0].size,会触发扩容函数;

    /*
     * 创建一个新的哈希表,并根据字典的情况,选择以下其中一个动作来进行:
     *
     * 1) 如果字典的 0 号哈希表为空,那么将新哈希表设置为 0 号哈希表
     * 2) 如果字典的 0 号哈希表非空,那么将新哈希表设置为 1 号哈希表,
     *    并打开字典的 rehash 标识,使得程序可以开始对字典进行 rehash
     *
     * size 参数不够大,或者 rehash 已经在进行时,返回 DICT_ERR 。
     *
     * 成功创建 0 号哈希表,或者 1 号哈希表时,返回 DICT_OK 。
     *
     * T = O(N)
     */
    int dictExpand(dict *d, unsigned long size)
    {
        // 新哈希表
        dictht n; /* the new hash table */
    
        // 根据 size 参数,计算哈希表的大小
        // T = O(1)
        unsigned long realsize = _dictNextPower(size);
    
        /* the size is invalid if it is smaller than the number of
         * elements already inside the hash table */
        // 不能在字典正在 rehash 时进行
        // size 的值也不能小于 0 号哈希表的当前已使用节点
        if (dictIsRehashing(d) || d->ht[0].used > size)
            return DICT_ERR;
    
        /* Allocate the new hash table and initialize all pointers to NULL */
        // 为哈希表分配空间,并将所有指针指向 NULL
        n.size = realsize;
        n.sizemask = realsize-1;
        // T = O(N)
        n.table = zcalloc(realsize*sizeof(dictEntry*));
        n.used = 0;
    
        /* Is this the first initialization? If so it's not really a rehashing
         * we just set the first hash table so that it can accept keys. */
        // 如果 0 号哈希表为空,那么这是一次初始化:
        // 程序将新哈希表赋给 0 号哈希表的指针,然后字典就可以开始处理键值对了。
        if (d->ht[0].table == NULL) {
            d->ht[0] = n;
            return DICT_OK;
        }
    
        /* Prepare a second hash table for incremental rehashing */
        // 如果 0 号哈希表非空,那么这是一次 rehash :
        // 程序将新哈希表设置为 1 号哈希表,
        // 并将字典的 rehash 标识打开,让程序可以开始对字典进行 rehash
        d->ht[1] = n;
        d->rehashidx = 0;
        return DICT_OK;
    
        /* 顺带一提,上面的代码可以重构成以下形式:
        
        if (d->ht[0].table == NULL) {
            // 初始化
            d->ht[0] = n;
        } else {
            // rehash 
            d->ht[1] = n;
            d->rehashidx = 0;
        }
    
        return DICT_OK;
    
        */
    }
    /*
     * 计算第一个大于等于 size 的 2 的 N 次方,用作哈希表的值
     *
     * T = O(1)
     */
    static unsigned long _dictNextPower(unsigned long size)
    {
        unsigned long i = DICT_HT_INITIAL_SIZE;
    
        if (size >= LONG_MAX) return LONG_MAX;
        while(1) {
            if (i >= size)
                return i;
            i *= 2;
        }
    }
    

    扩容大小
    在dictExpand()函数中会创建一个ht[1]做为扩容后的hash表,大小根据入参决定,没有入参或入参小于默认大小4,则创建一个默认值*2大小的哈希表;
    渐进式扩容 — 元素迁移
    以上的函数只是创建了一个新的hash表,但并没有直接执行元素迁移操作,真正的元素迁移是在dictRehash()函数中进行的;
    渐进式扩容 — 执行时机
    在新增、删除、查询和随机获取key(dictGetRandomKey函数)时会进行单步扩容,调用dictRehash()函数,传递参数1表示进行1次元素的迁移;
    需要注意的是,当redis长时间没有增删改查操作进行时,也会定时触发元素迁移,是由redis的删除过期键的定时任务调用的,在指定时间内每次执行100的元素迁移;

    /* 
     *
     * 执行 N 步渐进式 rehash 。
     *
     * 返回 1 表示仍有键需要从 0 号哈希表移动到 1 号哈希表,
     * 返回 0 则表示所有键都已经迁移完毕。
     *
     *
     * 注意,每步 rehash 都是以一个哈希表索引(桶)作为单位的,
     * 一个桶里可能会有多个节点,
     * 被 rehash 的桶里的所有节点都会被移动到新哈希表。
     *
     * T = O(N)
     */
    int dictRehash(dict *d, int n) {
    
        // 只可以在 rehash 进行中时执行
        if (!dictIsRehashing(d)) return 0;
    
        // 进行 N 步迁移
        // T = O(N)
        while(n--) {
            dictEntry *de, *nextde;
    
            /* Check if we already rehashed the whole table... */
            // 如果 0 号哈希表为空,那么表示 rehash 执行完毕
            // T = O(1)
            if (d->ht[0].used == 0) {
                // 释放 0 号哈希表
                zfree(d->ht[0].table);
                // 将原来的 1 号哈希表设置为新的 0 号哈希表
                d->ht[0] = d->ht[1];
                // 重置旧的 1 号哈希表
                _dictReset(&d->ht[1]);
                // 关闭 rehash 标识
                d->rehashidx = -1;
                // 返回 0 ,向调用者表示 rehash 已经完成
                return 0;
            }
    
            /* Note that rehashidx can't overflow as we are sure there are more
             * elements because ht[0].used != 0 */
            // 确保 rehashidx 没有越界
            assert(d->ht[0].size > (unsigned)d->rehashidx);
    
            // 略过数组中为空的索引,找到下一个非空索引
            while(d->ht[0].table[d->rehashidx] == NULL) d->rehashidx++;
    
            // 指向该索引的链表表头节点
            de = d->ht[0].table[d->rehashidx];
            /* Move all the keys in this bucket from the old to the new hash HT */
            // 将链表中的所有节点迁移到新哈希表
            // T = O(1)
            while(de) {
                unsigned int h;
    
                // 保存下个节点的指针
                nextde = de->next;
    
                /* Get the index in the new hash table */
                // 计算新哈希表的哈希值,以及节点插入的索引位置
                h = dictHashKey(d, de->key) & d->ht[1].sizemask;
    
                // 插入节点到新哈希表
                de->next = d->ht[1].table[h];
                d->ht[1].table[h] = de;
    
                // 更新计数器
                d->ht[0].used--;
                d->ht[1].used++;
    
                // 继续处理下个节点
                de = nextde;
            }
            // 将刚迁移完的哈希表索引的指针设为空
            d->ht[0].table[d->rehashidx] = NULL;
            // 更新 rehash 索引
            d->rehashidx++;
        }
    
        return 1;
    }
    
    总结

    dict结构在redis中是最常用到的结构,所以一定要看一遍源码,需要注意的地方,在上面也已经讲的很详细了,这里附一个有注解的redis3.0源码;
    https://github.com/huangz1990/redis-3.0-annotated

    相关文章

      网友评论

          本文标题:源码分析:Redis hash和Java HashMap、Con

          本文链接:https://www.haomeiwen.com/subject/pfpueltx.html