美文网首页个人学习
图解 Java8 ConcurrentHashMap 常用方法源

图解 Java8 ConcurrentHashMap 常用方法源

作者: 爱打乒乓的程序员 | 来源:发表于2020-02-24 18:14 被阅读0次

简介

众所周知,在 HashMap 作为共享变量的场景下,是线程不安全的。在 Java8 下,我们可以使用Collections.synchronizedMap(Map<K,V> m) 将 HashMap 对象包装为线程安全,其实就是在 HashMap 每个方法都加上 synchronized 加锁保证线程安全。但是也有一些问题,读写操作都会加锁,在读并发量大的情况下性能并不高,所以这篇文章介绍另外一种保证 Map 在多线程环境下线程安全的并发类 — — ConcurrentHashMap

以下内容只显示部分常用方法的源码和部分代码逻辑,因为没有必要每一行每一个方法都要细读,用到的时候再看,否则一开始就精读每个方法的源码,会给你学习 ConcurrentHashMap 带来不少阻碍。By the way,理解 ConcurrentHashMap 的源码,建议首先看 HashMap 的源码,因为很多代码逻辑都是相似,只不过就是在 HashMap 基础上添加支持并发的逻辑,如果熟悉 HashMap 的源码之后,阅读 ConcurrentHashMap 的源码难度就不大啦 ~ 若不熟悉 HashMap 源码的朋友,可以查看我的拙作 图解 Java8 HashMap 常用方法源码(附常见面试题和答案),或者参考其它优秀的博客。

ConcurrentHashMap 和 HashMap 两者的异同:

相似之处:

  • 底层数据结构相同,都是 数组 + 链表 + 红黑树 结构
  • 都实现了 Map 接口,继承了 AbstractMap 抽象类,所以两者大部分使用都是相同的,只不过就是 ConcurrentHashMap 保证了线程安全

不同之处:

  • HashMap 键值对允许 null 值,而 ConcurrentHashMap 键值对都不允许 null 值
  • 红黑树的结构不一样。HashMap 红黑树节点为 TreeNode,而 ConcurrentHashMap 红黑树节点为 TreeNode + TreeBin
  • ConcurrentHashMap 扩容是通过 ForwardingNode 节点保证扩容时线程安全。

首先,先从 ConcurrentHashMap 的 Demo 入手吧!

示例:

    public static void main(String[] args) {
        ConcurrentHashMap<Integer,Integer> concurrentHashMap = new ConcurrentHashMap(16);
        for (int i = 0; i < 10; i++) {
            concurrentHashMap.put(i,i);
        }
        System.out.println(concurrentHashMap.get(1));
        System.out.println(concurrentHashMap.remove(2));
        System.out.println(concurrentHashMap.size());
        System.out.println(concurrentHashMap.mappingCount());
    }

输出结果:

1
2
9
9

ConcurrentHashMap 源码剖析

数据结构

ConcurrentHashMap 基本数据结构为数组,当有碰撞的时候会在同一个 hash 位置上形成一个链表,当链表长度大于 8 且数组的容量大于 64 则将链表转化为红黑树。

链表节点、树节点和转移节点

首先了解 ConcurrentHashMap 几个节点的声明和作用

    // 链表节点
    static class Node<K,V> implements Map.Entry<K,V> {
        final int hash;
        final K key;
        volatile V val;
        volatile Node<K,V> next;
        // 下面还有一坨,先省略
    }
    // 红黑树节点,维护节点属性和查找功能
    static final class TreeNode<K,V> extends Node<K,V> {
        TreeNode<K,V> parent; 
        TreeNode<K,V> left;
        TreeNode<K,V> right;
        TreeNode<K,V> prev;
        boolean red;
        TreeNode(int hash, K key, V val, Node<K,V> next,TreeNode<K,V> parent) {
            super(hash, key, val, next);
            this.parent = parent;
        }
        // 下面还有一坨,先省略
    }
    // 维护红黑树结构,负责根节点的加锁和解锁
    static final class TreeBin<K,V> extends Node<K,V> {
        TreeNode<K,V> root;
        volatile TreeNode<K,V> first;
        volatile Thread waiter;
        volatile int lockState;
        static final int WRITER = 1; // 已经获得写锁状态
        static final int WAITER = 2; // 等待写锁状态
        static final int READER = 4; // 增加数据时读锁的状态
        // 下面还有一坨,先省略
    }
    // 转移节点,扩容时保证线程安全
    static final class ForwardingNode<K,V> extends Node<K,V> {
        final Node<K,V>[] nextTable;
        ForwardingNode(Node<K,V>[] tab) {
            super(MOVED, null, null, null);
            this.nextTable = tab;
        }
        // 下面还有一坨,先省略
    }

构造函数

    public ConcurrentHashMap() {
    }

    public ConcurrentHashMap(int initialCapacity) {
        if (initialCapacity < 0)
            throw new IllegalArgumentException();
        int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY : tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
        this.sizeCtl = cap;
    }

    public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
        this.sizeCtl = DEFAULT_CAPACITY;
        putAll(m);
    }

    public ConcurrentHashMap(int initialCapacity, float loadFactor) {
        this(initialCapacity, loadFactor, 1);
    }

    /**
     * 在JDK1.8之前本质是 ConcurrentHashMap 分段锁总数,表示同时更新 ConcurrentHashMap 且不产生锁竞争的最大线程数;
     * 在JDK1.8中,仅在构造器中确保初始容量>=concurrentLevel,为兼容旧版本而保留;
     */
    public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) {
        if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
            throw new IllegalArgumentException();
        if (initialCapacity < concurrencyLevel)
            initialCapacity = concurrencyLevel;
        long size = (long)(1.0 + (long)initialCapacity / loadFactor);
        int cap = (size >= (long)MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : tableSizeFor((int)size);
        this.sizeCtl = cap;
    }

我们可以根据构造方法自定义 ConcurrentHashMap 数组的容量、阀值、加载因子。建议在开发过程中不要更改加载因子,最好能够根据业务情况给 ConcurrentHashMap 数组适当的容量,避免频繁扩容,影响性能。

初始化 ConcurrentHashMap 数组:initTable()

通过自旋 + CAS + 双重 check 保证了数组初始化时的线程安全

    // 初始化 table
    private final Node<K,V>[] initTable() {
        Node<K,V>[] tab; int sc;
        // 自旋保证初始化成功
        while ((tab = table) == null || tab.length == 0) {
            // 小于 0 代表有线程正在初始化,释放当前 CPU 的调度权,重新发起锁的竞争
            if ((sc = sizeCtl) < 0)
                Thread.yield();
                // CAS 赋值保证当前只有一个线程在初始化,-1 代表当前只有一个线程能初始化
            else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                try {
                    // 再次判断 table 是否被初始化,因为其他线程可能已经抢先初始化
                    if ((tab = table) == null || tab.length == 0) {
                        // 初始化数组大小
                        int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                        @SuppressWarnings("unchecked")
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                        table = tab = nt;
                        sc = n - (n >>> 2);
                    }
                } finally {
                    sizeCtl = sc;
                }
                break;
            }
        }
        return tab;
    }

添加数组节点 put(K key, V value)

    // 计算key的hash值,与HashMap相似,hashCode值的高16位和低16位进行异或运算,这样可以尽量避免一些hash值出现冲突
    // 同时和 HASH_BITS 与运算是为了保证结果是正数(因为hashCode的结果有可能是负数)
    static final int spread(int h) {
        return (h ^ (h >>> 16)) & HASH_BITS;
    }

    // key 和 value 都不可以为null
    public V put(K key, V value) {
        return putVal(key, value, false);
    }

    final V putVal(K key, V value, boolean onlyIfAbsent) {
        // key 或 value 为null则抛出异常
        if (key == null || value == null) throw new NullPointerException();
        // 取得key的hash值
        int hash = spread(key.hashCode());
        int binCount = 0;
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            // 如果数组未被初始化则初始化数组
            if (tab == null || (n = tab.length) == 0)
                tab = initTable();
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { // 当前数组索引位置节点为null,则可以直接插入新增节点
                // 无锁的CAS操作插入节点
                if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null)))
                    break;
            }
            else if ((fh = f.hash) == MOVED) // 当前Map在扩容,先协助扩容,再更新值。
                tab = helpTransfer(tab, f); // 协助扩容
            else { // 有 hash 冲撞
                V oldVal = null;
                synchronized (f) {
                    if (tabAt(tab, i) == f) { // 再次判断 f 节点是否为第一个节点,防止其他线程已修改f节点(可能扩容后索引位置的节点不是 f 了)
                        if (fh >= 0) { // 节点结构为链表
                            binCount = 1;
                            // 循环链表,将节点放入链表中,与 HashMap 的逻辑相似
                            // 如果节点的hash和key与参数的hash和key一样则进行覆盖操作,否则就在链表尾部插入新节点
                            // 和 HashMap.put 源码不一样的是,还有一个binCount变量累加,这是判断链表的节点数是否超出8,超出的话则转为红黑树
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                                if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node<K,V> pred = e;
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K,V>(hash, key, value, null);
                                    break;
                                }
                            }
                        }
                        else if (f instanceof TreeBin) { // 节点的结构为树
                            Node<K,V> p;
                            binCount = 2;
                            // 以红黑树的算法插入节点
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                if (binCount != 0) {
                    // 链表长度超出阀值转为红黑树结构
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i);
                    // 如果是覆盖操作,则返回被覆盖的值
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        // 检查当前容量是否需要进行扩容
        addCount(1L, binCount);
        return null;
    }

put 方法源码比较长,以下结合源码画出一张流程图协助理解。

简单来说,执行添加节点方法包括几个核心操作:
1.检查数组是否初始化
2.根据索引位置的节点判断是否为空节点,如果是空节点则直接添加节点,否则则判断该节点是链表节点还是红黑树的节点;在迭代链表节点或红黑树的节点过程中,如果找到 key 相同的节点则执行覆盖操作,否则添加到链表尾部或红黑树子节点
3.如果当前线程执行添加操作过程中,遇到其它线程正在扩容则会停下来协助扩容,扩容结束后再继续执行插入操作
4.如果链表长度超过 8 且数组节点数量超过 64 则会转化为红黑树结构
5.新增完成后,检查是否需要扩容,如果需要扩容的话执行扩容方法

获取节点 get(Object key)

    // 根据参数 key 获取节点
    public V get(Object key) {
        Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
        int h = spread(key.hashCode());
        // 判断数组是否被初始化、数组长度是否大于0,并且根据计算出来的 hashCode 寻址找到不为 null 的节点
        if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1) & h)) != null) {
            // 判断节点的hash与根据参数计算出来的hash是否一样,因为节点的hash值在扩容的时候,hash值会变成小于0
            if ((eh = e.hash) == h) {
                if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                    return e.val;
            }
            else if (eh < 0) // 如果当前hash值<0,说明正在扩容,调用ForwardingNode的find方法来定位到nextTable
                return (p = e.find(h, key)) != null ? p.val : null;
            // 循环链表或者红黑树
            while ((e = e.next) != null) {
                if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek))))
                    return e.val;
            }
        }
        return null;
    }

相比于 put 方法,get 方法就简洁多了。核心逻辑就是判断数组中是否存在 key 、数组是否正在扩容、节点是否为链表或红黑树结构。


获取 ConcurrentHashMap 的元素个数 —— size() 和 mappingCount()

注意:由于可能多个线程操作,所以返回的元素个数只是估计值,不一定准确!
    // 实际上调用 sumCount() 获取元素个数
    public int size() {
        long n = sumCount();
        return ((n < 0L) ? 0 :
                (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int)n);
    }

    @sun.misc.Contended static final class CounterCell {
        volatile long value;
        CounterCell(long x) { value = x; }
    }

    // 如果 counterCells 对象不是 null,则遍历 counterCells 数组,baseCount 累加对象的值
    // 大致可以这样理解,baseCount 是统计当前的节点总数,但是当多个线程同时执行 CAS 修改 baseCount值,
    // 失败的线程会将值放到 CounterCell 中,所以为了尽量返回接近实际值,就将 baseCount 和 CounterCell 的所有元素加上
    final long sumCount() {
        CounterCell[] as = counterCells; CounterCell a;
        long sum = baseCount;
        if (as != null) {
            for (int i = 0; i < as.length; ++i) {
                if ((a = as[i]) != null)
                    sum += a.value;
            }
        }
        return sum;
    }

    /**
     * Returns the number of mappings. This method should be used
     * instead of {@link #size} because a ConcurrentHashMap may
     * contain more mappings than can be represented as an int. The
     * value returned is an estimate; the actual count may differ if
     * there are concurrent insertions or removals.
     * 大意就是建议使用 mappingCount() 代替 size() 方法;
     * 因为 ConcurrentHashMap 包含的映射数量可能超出 int 类型的最大范围(size方法的返回值是int,mappingCount方法的返回值是long)
     * 由于存在并发,所以返回的值不一定准确
     */
    public long mappingCount() {
        long n = sumCount();
        return (n < 0L) ? 0L : n; // ignore transient negative values
    }

不难发现,无论是使用 size() 还是 mappingCount() 方法其核心都是调用 sumCount() 方法。通过计算 bashCount 和 CounterCell 的元素总和返回 ConcurrentHashMap 的节点个数(不一定准确)。

删除节点 remove(Object key) 和 clear()

    // 删除数组节点
    public V remove(Object key) {
        return replaceNode(key, null, null);
    }

    // 删除节点的核心逻辑
    final V replaceNode(Object key, V value, Object cv) {
        // 计算key的hashCode
        int hash = spread(key.hashCode());
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            // 如果数组还没初始化或者数组不存在key直接跳出循环返回null
            if (tab == null || (n = tab.length) == 0 || (f = tabAt(tab, i = (n - 1) & hash)) == null)
                break;
            else if ((fh = f.hash) == MOVED) // 如果数组正在扩容,会先协助扩容
                tab = helpTransfer(tab, f);
            else {
                V oldVal = null;
                boolean validated = false;
                // 节点加锁,保证节点的删除是原子性。过程与 HashMap 的逻辑差不多。
                // 设置 f 的前驱节点 pred 的后继节点为 f 的后继节点
                synchronized (f) {
                    if (tabAt(tab, i) == f) {
                        if (fh >= 0) {
                            validated = true;
                            for (Node<K,V> e = f, pred = null;;) {
                                K ek;
                                if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) {
                                    V ev = e.val;
                                    if (cv == null || cv == ev || (ev != null && cv.equals(ev))) {
                                        oldVal = ev;
                                        if (value != null)
                                            e.val = value;
                                        else if (pred != null)
                                            pred.next = e.next;
                                        else
                                            setTabAt(tab, i, e.next);
                                    }
                                    break;
                                }
                                pred = e;
                                if ((e = e.next) == null)
                                    break;
                            }
                        }
                        else if (f instanceof TreeBin) { // 节点结构为树
                            validated = true;
                            TreeBin<K,V> t = (TreeBin<K,V>)f;
                            TreeNode<K,V> r, p;
                            if ((r = t.root) != null && (p = r.findTreeNode(hash, key, null)) != null) {
                                V pv = p.val;
                                if (cv == null || cv == pv || (pv != null && cv.equals(pv))) {
                                    oldVal = pv;
                                    if (value != null)
                                        p.val = value;
                                    else if (t.removeTreeNode(p))
                                        setTabAt(tab, i, untreeify(t.first));
                                }
                            }
                        }
                    }
                }
                if (validated) {
                    if (oldVal != null) {
                        if (value == null)
                            // 节点数量减1
                            addCount(-1L, -1);
                        return oldVal;
                    }
                    break;
                }
            }
        }
        return null;
    }

    // 清除所有的数组节点
    public void clear() {
        long delta = 0L;
        int i = 0;
        Node<K,V>[] tab = table;
        while (tab != null && i < tab.length) {
            int fh;
            Node<K,V> f = tabAt(tab, i);
            if (f == null)
                ++i;
            else if ((fh = f.hash) == MOVED) {// 先协助扩容,再删除所有元素。这是为了避免删除元素之后,又有新的节点添加进来
                tab = helpTransfer(tab, f);
                i = 0;// 重新遍历数组
            }
            else {
                // 节点加锁,保证节点的删除具有原子性,无论是链表还是红黑树,都是将数组对应的索引值设置为null
                synchronized (f) {
                    if (tabAt(tab, i) == f) {
                        Node<K,V> p = (fh >= 0 ? f : (f instanceof TreeBin) ? ((TreeBin<K,V>)f).first : null);
                        while (p != null) {
                            --delta;
                            p = p.next;
                        }
                        setTabAt(tab, i++, null);
                    }
                }
            }
        }
        // 更新数组节点数量
        if (delta != 0L)
            addCount(delta, -1);
    }

remove方法和clear方法逻辑并不难,看源码注释结合流程图应该就能够理解。

好了,接下来就是ConcurrentHashMap的核心 —— 扩容啦!

粤语,大意是事情的进展非常顺利

ConcurrentHashMap 的核心 —— 扩容(addCount、helpTransfer、tryPresize、transfer)

    // 计算数组中的节点个数是否大于阈值,如果超出阀值则协助扩容
    private final void addCount(long x, int check) {
        CounterCell[] as; long b, s;
        // CAS 更新 baseCount
        if ((as = counterCells) != null || !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
            CounterCell a; long v; int m;
            boolean uncontended = true;
            // 多线程 CAS 发生失败时执行
            if (as == null
                    || (m = as.length - 1) < 0
                    || (a = as[ThreadLocalRandom.getProbe() & m]) == null
                    || !(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
                // 循环更新 basecount
                fullAddCount(x, uncontended);
                return;
            }
            if (check <= 1)
                return;
            s = sumCount();
        }
        // 检查是否需要扩容,默认 check 是 1
        if (check >= 0) {
            Node<K,V>[] tab, nt; int n, sc;
            // 数组的节点数量大于等于阈值 sizeCtl,并且数组不为空,数组长度小于最大容量则扩容
            while (s >= (long)(sc = sizeCtl) && (tab = table) != null && (n = tab.length) < MAXIMUM_CAPACITY) {
                // 根据数组长度得到一个标识
                int rs = resizeStamp(n);
                // sc < 0 表示其他线程已经在扩容,尝试帮助扩容
                if (sc < 0) {
                    // 判断扩容是否结束,如果结束则跳出循环
                    // (sc >>> RESIZE_STAMP_SHIFT) != rs 代表 sizeCtl 发送了变化
                    // sc == rs + 1 代表扩容结束,不再有线程进行扩容
                    // sc == rs + MAX_RESIZERS 协助线程数已经达到最大
                    // (nt = nextTable) == null 代表扩容结束
                    // transferIndex <= 0 代表转移状态发生了变化
                    if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                            sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                            transferIndex <= 0)
                        break;
                    // 其他线程正在扩容,sc + 1 表示多了一个线程在帮助扩容
                    if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                        transfer(tab, nt);
                } else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)) // 执行到这一步,代表当前对象没有正在扩容,CAS 更新 sizectl,向左移16+2位,变成负数
                    // 仅当前线程在扩容
                    transfer(tab, null);
                s = sumCount();
            }
        }
    }
    // 协助扩容
    final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
        Node<K,V>[] nextTab; int sc;
        // 节点数组不为空,而且 f 节点的类型是 ForwardingNode 表明数组正在扩容
        // (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null 表示迁移完成了,详见transfer()
        if (tab != null && (f instanceof ForwardingNode) && (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
            int rs = resizeStamp(tab.length);
            // 表示在扩容
            while (nextTab == nextTable && table == tab && (sc = sizeCtl) < 0) {
                // sc >>> RESIZE_STAMP_SHIFT) != rs 说明扩容完毕或者有其它协助扩容者
                // sc == rs + 1 表示只剩下最后一个扩容线程了,其它都扩容完毕了
                // transferIndex <= 0 扩容结束了
                // sc == rs + MAX_RESIZERS 到达最大值
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || transferIndex <= 0)
                    break;
                // CAS 更新 sizeCtl=sizeCtl+1,表示新增加一个线程辅助扩容
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
                    transfer(tab, nextTab);
                    break;
                }
            }
            return nextTab;
        }
        return table;
    }

    // 扩容
    private final void tryPresize(int size) {
        // 如果给定的容量>=MAXIMUM_CAPACITY的一半,直接扩容到允许的最大值,否则调用函数计算合适的容量值
        int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
                tableSizeFor(size + (size >>> 1) + 1);
        int sc;
        // 如果 sizeCtl 小于0,说明数组还没有被初始化
        while ((sc = sizeCtl) >= 0) {
            Node<K,V>[] tab = table; int n;
            if (tab == null || (n = tab.length) == 0) {
                n = (sc > c) ? sc : c;
                // CAS将SIZECTL状态置为-1,表示正在进行初始化
                if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                    try {
                        // 确认其他线程没有对table修改
                        if (table == tab) {
                            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                            table = nt;
                            sc = n - (n >>> 2);// 无符号右移2位,此即0.75*n
                        }
                    } finally {
                        sizeCtl = sc; // 更新扩容阀值
                    }
                }
            }
            else if (c <= sc || n >= MAXIMUM_CAPACITY) // 若欲扩容值不大于原阀值,或现有容量>=最值,什么都不用做了
                break;
            else if (tab == table) { // table不为空,且在此期间其他线程未修改table
                int rs = resizeStamp(n);
                if (sc < 0) {
                    Node<K,V>[] nt;
                    if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                            sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                            transferIndex <= 0)
                        break;
                    if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                        transfer(tab, nt);
                }
                else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2))
                    transfer(tab, null);
            }
        }
    }

    // 扩容
    private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
        // 变量 stride 代表每个线程负责的迁移的数量
        int n = tab.length, stride;
        // 计算每个线程需要负责的迁移数量。
        // 如果 cpu 数量大于 1,迁移数量为 n >>> 3(也就是除以8) / cpu 个数,否则就数组长度。
        // 如果小于 MIN_TRANSFER_STRIDE 就直接设置线程需要负责的迁移数量为MIN_TRANSFER_STRIDE
        if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
            stride = MIN_TRANSFER_STRIDE;
        // 初始化新数组
        if (nextTab == null) {
            try {
                // 新数组的容量为原数组的两倍
                @SuppressWarnings("unchecked")
                Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
                nextTab = nt;
            } catch (Throwable ex) {
                sizeCtl = Integer.MAX_VALUE;
                return;
            }
            nextTable = nextTab;
            transferIndex = n;
        }
        // 新数组的长度
        int nextn = nextTab.length;
        // 创建一个 ForwardingNode 节点,用于标记
        ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
        //并发扩容的关键属性 如果等于true 说明这个节点已经处理过
        boolean advance = true;
        // 完成状态,如果为true,就结束方法
        boolean finishing = false;
        //通过for自循环处理每个槽位中的链表元素,默认advace为真,通过CAS设置transferIndex属性值,并初始化i和bound值,i指当前处理的槽位序号,bound指需要处理的槽位边界,先处理槽位15的节点;
        for (int i = 0, bound = 0;;) {
            Node<K,V> f; int fh;
            while (advance) {
                int nextIndex, nextBound;
                if (--i >= bound || finishing)
                    advance = false;
                else if ((nextIndex = transferIndex) <= 0) { //transferIndex<=0表示已经没有需要迁移的hash桶,将i置为-1,线程准备退出
                    i = -1;
                    advance = false;
                }
                // CAS 操作,为当前线程分配数组索引区间
                else if (U.compareAndSwapInt(this, TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ? nextIndex - stride : 0))) {
                    bound = nextBound;
                    i = nextIndex - 1;
                    advance = false;
                }
            }
            // i < 0 ,表示数据迁移已经完成
            // i >= n 和 i + n >= nextn 表示最后一个线程也执行完成了,扩容完成了
            if (i < 0 || i >= n || i + n >= nextn) {
                int sc;
                // 如果 finishing=true,说明扩容所有工作完成,然后跳出循环
                if (finishing) {
                    // 删除成员变量,方便GC
                    nextTable = null;
                    // 更新table
                    table = nextTab;
                    // 更新阈值
                    sizeCtl = (n << 1) - (n >>> 1);
                    return;
                }
                // 表示一个线程退出扩容
                if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                    // 说明还有其他线程正在扩容
                    if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                        return;
                    // 当前线程为最后一个线程,负责再检查一个整个队列
                    finishing = advance = true;
                    i = n;
                }
            }
            else if ((f = tabAt(tab, i)) == null)//  第一个线程获取i处的数据为null,
                advance = casTabAt(tab, i, null, fwd);// 设置当前节点为 fwd 节点
            else if ((fh = f.hash) == MOVED)// 如果当前节点为 MOVED,说明已经处理过了,直接跳过
                advance = true;
            else {
                // 节点不为空,锁住i位置的头结点
                synchronized (f) {
                    // 再次核对,防止其他线程对该hash值进行修改
                    if (tabAt(tab, i) == f) {
                        Node<K,V> ln, hn;
                        // 说明该位置存放的是普通节点,以下的操作类似 HashMap,节点的hash和数组长度与运算
                        // 如果为0则直接将原先的索引位置插入到新数组,否则重新计算数组索引位置并插入
                        if (fh >= 0) {
                            int runBit = fh & n;
                            Node<K,V> lastRun = f;
                            for (Node<K,V> p = f.next; p != null; p = p.next) {
                                int b = p.hash & n;
                                if (b != runBit) {
                                    runBit = b;
                                    lastRun = p;
                                }
                            }
                            if (runBit == 0) {
                                ln = lastRun;
                                hn = null;
                            }
                            else {
                                hn = lastRun;
                                ln = null;
                            }
                            for (Node<K,V> p = f; p != lastRun; p = p.next) {
                                int ph = p.hash; K pk = p.key; V pv = p.val;
                                if ((ph & n) == 0)
                                    ln = new Node<K,V>(ph, pk, pv, ln);
                                else
                                    hn = new Node<K,V>(ph, pk, pv, hn);
                            }
                            // CAS 操作保证节点插入是原子性的
                            setTabAt(nextTab, i, ln);
                            setTabAt(nextTab, i + n, hn);
                            setTabAt(tab, i, fwd);
                            advance = true;
                        }
                        else if (f instanceof TreeBin) { // 节点是红黑树结构
                            TreeBin<K,V> t = (TreeBin<K,V>)f;
                            TreeNode<K,V> lo = null, loTail = null;
                            TreeNode<K,V> hi = null, hiTail = null;
                            int lc = 0, hc = 0;
                            for (Node<K,V> e = t.first; e != null; e = e.next) {
                                int h = e.hash;
                                TreeNode<K,V> p = new TreeNode<K,V>
                                        (h, e.key, e.val, null, null);
                                if ((h & n) == 0) {
                                    if ((p.prev = loTail) == null)
                                        lo = p;
                                    else
                                        loTail.next = p;
                                    loTail = p;
                                    ++lc;
                                }
                                else {
                                    if ((p.prev = hiTail) == null)
                                        hi = p;
                                    else
                                        hiTail.next = p;
                                    hiTail = p;
                                    ++hc;
                                }
                            }
                            ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                                    (hc != 0) ? new TreeBin<K,V>(lo) : t;
                            hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                                    (lc != 0) ? new TreeBin<K,V>(hi) : t;
                            setTabAt(nextTab, i, ln);
                            setTabAt(nextTab, i + n, hn);
                            setTabAt(tab, i, fwd);
                            advance = true;
                        }
                    }
                }
            }
        }
    }

扩容部分的代码真的好长好长,如果你能够看到这里,我敬你是一条汉子!说实话,我看着都脑壳疼

扩容是 ConcurrentHashMap 的核心源码,其复杂度也大于 ConcurrentHashMap 其他的方法,所以理解难度也提高不少。如有疑问或者错误,麻烦在评论区告知我一下~

总结

HashMap 与 ConcurrentHashMap 在实际开发和面试都是高频的出现的,所以需要我们对其有深入的了解,这样才能避免入坑。同时还需要知道,ConcurrentHashMap 在JDK1.8之后作了优化,采用了 Synchronized + CAS + Node + Unsafe 取代了 JDK1.8 之前的 Segment + HashEntry + Unsafe 实现。用 Synchronized + CAS 代替 Segment ,这样锁的粒度更小,降低实现的复杂度。

参考资料:
https://www.jianshu.com/p/5dbaa6707017
https://blog.51cto.com/14220760/2395683?source=dra

相关文章

网友评论

    本文标题:图解 Java8 ConcurrentHashMap 常用方法源

    本文链接:https://www.haomeiwen.com/subject/fnctoctx.html