美文网首页
图解ConcurrentHashMap

图解ConcurrentHashMap

作者: 水欣 | 来源:发表于2018-03-07 12:09 被阅读0次

    概述

    上篇文章介绍了HashMap在多线程并发情况下是不安全的,多线程并发推荐使用ConcurrentHashMap,那么ConcurrentHashMap是什么?它的设计思想是什么,源码是怎么实现呢?

    ConcurrentHashMap 是什么

    Concurrent翻译过来是并发的意思,字面理解它的作用是处理并发情况的HashMap,在介绍它之前先回顾下之前的知识。通过前面文章学习,我们知道多线程并发下HashMap是不安全的(如死循环),更普遍的是多线程并发下,由于堆内存对各个线程是共享的,而HashMap的put方法不是原则操作,假设Thread1先put值,然后sleep 2s(也可以是系统时间片切换失去执行权),在这2秒内值被Thread2改了,Thread1"醒来"再get的时候发现已经不是原来的值了,这就容易出问题。


    image.png

    那么如何避免这种多线程“奥迪变奥拓”的情况呢?常规思路就是给HashMap的put方法加锁(synchronized),保证同一个时刻只允许一个线程拥有对hashMap有写的操作权限即可。然而假如线程1中操作耗时,占着茅坑半天不出来,其他需要操作该hashMap的线程就需要在门口排队半天,严重影响用户体验(HashTable就是这么干的)。举个生活中的例子,很多银行除了存取钱,还支持贵重物品,贵重物品都放在保险箱里,把HashMap和HashTable比作银行,结构:


    image.png
    把线程比作人,对应的情况如下:
    • HashMap牌银行:我们的服务宗旨是不用排队,同一时间多人都有机会修改保险柜里的东西,你以为你存的是美元?取出来的其实是日元,破产就在一瞬间。
    • HashTable牌银行:我们的服务宗旨是要排队,同一时间只有一个人有机会修改保险柜里的东西,其余的人只能看不能动手改,保你存的是美元取得还是美元。你说如果那人在里面睡着了不出来怎么办?不要着急,来,坐下来打会麻将等他出来。


      image.png

      多线程下用HashMap不确定性太高,有破产的风险,不能选;用HashTable不会破产,但是用户体验不太好,那么怎样才能做到多人存取既不影响他人存值,也不用排队呢?有人提议搞个“银行者联盟”,多开几个想HashTable这种“带锁”的银行就好了,有多少人办理业务,就开多少个银行,一对一服务,这个区都是大老板,开银行的成本都是小钱,于是“银行者联盟”成立了。
      接下来的情况是这样的:比如盖伦和亚索一起去银行存他们的大宝剑,这个“银行者联盟”一顿操作,然后对盖伦说,1号银行现在没人,你可以去那存,不用排队,然后盖伦就去1号银行存他的大宝剑,1号银行把盖伦接进门,马上拉闸,然后把盖伦的大宝剑放在第x行第x个保险箱,等盖伦办妥离开后,再开闸;同样“银行者联盟”对亚索说,2号银行现在没人,你可以去那存,不用排队,然后亚索去2号银行存他的大宝剑,2号银行把亚索接进门,马上拉闸,一顿操作把亚索的大宝剑放在第x行第x号保险箱,等亚索离开后再开闸,此时不管盖伦和亚索在各自银行里面待多久都不会影响彼此,不用担心自己的大宝剑被人偷换了。这就是ConcurrentHashMap的设计思路,用一个图来理解


      image.png
      从上图可以看出,此时锁的对应的单个银行,而不是整个“银行者联盟”。分析这种设计的特点:
    • 多个银行组成的“银行者联盟”
    • 当有人来办理业务时,“银行者联盟”需要确定这个人去哪个银行
    • 当此人去到指定银行办理业务后,该银行上锁,其他人不能同时执行修改操作,直到此人离开后解锁
      由这几点基本思想可以引发一些思考,比如:
    1. 成立“银行者联盟”时初始银行数是多少?怎么设计合理?
      上面这张图没有给出是否需要排队的结论,这是因为需要结合实际情况分析,比如初始化有16个银行,只有两个人来办理业务,那自然不需要排队;如果现在16个银行都有人在办理业务,这时候来了第17个人,那么他还是需要排队的。由于“银行者联盟”事先无法得知会有多少人来办理业务,所以在它创立的时候需要制定一个“标准”,即初始银行数量,人多的情况“银行者联盟”应该多开几家银行,避免别人排队;人少的情况应该少开,避免浪费钱
    2. 当有人来办理业务的时候,“银行者联盟”怎么确定此人去哪个银行?
      正常情况下,如果所有银行都是未上锁状态,那么有人来办理业务去哪儿都不用排队,当其中有些银行已经上锁,那么后续“银行者联盟”给人推荐的时候就不能把客户往上锁的银行引了。因此“银行者联盟”需要时刻保持清醒的头脑,对自己的银行空闲情况了如指掌,每次给用户推荐都应该是最好的选择。
    3. “银行者联盟”怎么保证同一时间不会有两个人在同一个银行拥有存权限?
      通过你对指定银行加锁/解锁的方式实现。

    源码分析

    java7 源码分析

    通过Java7的源码分析下代码实现,先看下一些重要的成员

    //默认的数组大小16(HashMap里的那个数组)
    static final int DEFAULT_INITIAL_CAPACITY = 16;
    //扩容因子0.75
    static final float DEFAULT_LOAD_FACTOR = 0.75f;
     
    //ConcurrentHashMap中的数组
    final Segment<K,V>[] segments
    //默认并发标准16
    static final int DEFAULT_CONCURRENCY_LEVEL = 16;
    //Segment是ReentrantLock子类,因此拥有锁的操作
     static final class Segment<K,V> extends ReentrantLock implements Serializable {
      //HashMap的那一套,分别是数组、键值对数量、阈值、负载因子
      transient volatile HashEntry<K,V>[] table;
      transient int count;
      transient int threshold;
      final float loadFactor;
      Segment(float lf, int threshold, HashEntry<K,V>[] tab) {
                this.loadFactor = lf;
                this.threshold = threshold;
                this.table = tab;
            }
     }
     
     //换了马甲还是认识你!!!HashEntry对象,存key、value、hash值以及下一个节点
     static final class HashEntry<K,V> {
            final int hash;
            final K key;
            volatile V value;
            volatile HashEntry<K,V> next;
     }
    //segment中HashEntry[]数组最小长度
    static final int MIN_SEGMENT_TABLE_CAPACITY = 2;
    //用于定位在segments数组中的位置,下面介绍
    final int segmentMask;
    final int segmentShift;
    

    上面这些一下出来有点接受不了没关系,下面都会介绍到
    接下来从最简单的初始化开始分析

    ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap();
    

    默认构造函数会调用带三个参数的构造函数

    public ConcurrentHashMap() {
        this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
    }
    public ConcurrentHashMap(int initialCapacity,
                             float loadFactor, int concurrencyLevel) {
        if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
            throw new IllegalArgumentException();
        if (concurrencyLevel > MAX_SEGMENTS)
            concurrencyLevel = MAX_SEGMENTS;
        // Find power-of-two sizes best matching arguments
        //步骤① start
        int sshift = 0;
        int ssize = 1;
        while (ssize < concurrencyLevel) {
            ++sshift;
            ssize <<= 1;
        }
        this.segmentShift = 32 - sshift;
        this.segmentMask = ssize - 1;
        //步骤① end
        //步骤② start
        if (initialCapacity > MAXIMUM_CAPACITY)
            initialCapacity = MAXIMUM_CAPACITY;
        int c = initialCapacity / ssize;
        if (c * ssize < initialCapacity)
            ++c;
        int cap = MIN_SEGMENT_TABLE_CAPACITY;
        while (cap < c)
            cap <<= 1;
        //步骤② end
        // create segments and segments[0]
        //步骤③ start
        Segment<K,V> s0 =
            new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
                             (HashEntry<K,V>[])new HashEntry[cap]);
        Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
        UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
        this.segments = ss;
        //步骤③ end
    }
    

    上面定义了许多临时变量,注释写的又少,第一次看名字根本不知道这鬼东西代表什么意思,不过我们可以把已知的数据带进去,算出这些变量的值,再分析。假设这是死一次默认创建:

    • 步骤① concurrencyLevel = 16,可以计算出 sshift=4,ssize=16,segmentShift=28,segmentMask=15;
    • 步骤② c=16/16=1,cap=2
    • 步骤③有句注释,创建Segment数组segments并初始化segments[0],所以s0初始化后数组长度为2,负载因子0.75,阈值为1;再看这里的ss的初始化(重点,圈起来要考!!!),ssize此时为16,所以默认数组长度16,给人一种感觉正好和我们传的concurrencyLevel一样?看下下面的例子
    例子1 例子2
    ssize = 1,concurrencyLevel = 10 ssize = 1,concurrencyLevel = 8
    ssize <<= 1 —> 2<10 满足 ssize <<= 1 —> 2<10 满足
    ssize <<= 1 —> 4<10 满足 ssize <<= 1 —> 4<10 满足
    ssize <<= 1 —> 8<10 满足 ssize <<= 1 —> 8<10 不满足 ssize = 8
    ssize <<= 1 —> 16<10 不满足 ssize = 16

    所以我们传concurrencyLevel不一定就是最后数组的长度,长度的计算公式:

    长度=2的n次方(2的n次方>=concurrencyLevel)
    

    到这里只是创建了一个长度为16的Segment数组,并初始化数组0号位置,segmentShift和segmentMast还没派上用场,画图存档:


    image.png

    接着看put方法

    public V put(K key, V value) {
        Segment<K,V> s;
        //步骤①注意valus不能为空!!!
        if (value == null)
            throw new NullPointerException();
        //根据key计算hash值,key也不能为null,否则hash(key)报空指针
        int hash = hash(key);
        //步骤②派上用场了,根据hash值计算在segments数组中的位置
        int j = (hash >>> segmentShift) & segmentMask;
        //步骤③查看当前数组中指定位置Segment是否为空
        //若为空,先创建初始化Segment再put值,不为空,直接put值。
        if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
             (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
            s = ensureSegment(j);
        return s.put(key, hash, value, false);
    }
    

    步骤①可以看到和HashMap的区别,这里的key/value 为空会报空指针异常;步骤②先根据key值计算hash值,再和前面算出来的两个变量计算出这个key应该放在哪个Segment中(具体怎么计算的有兴趣可以去研究下,先高位运算再取余),假设我们算出来该键值对应该放在5号,步骤③判断5号为空,看下ensureSegment()方法

    private Segment<K,V> ensureSegment(int k) {
            //获取segments
            final Segment<K,V>[] ss = this.segments;
            long u = (k << SSHIFT) + SBASE; // raw offset
            Segment<K,V> seg;
            if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
                //拷贝一份和segment 0一样的segment
                Segment<K,V> proto = ss[0]; // use segment 0 as prototype
                //大小和segment 0一致,为2
                int cap = proto.table.length;
                //负载因子和segment 0一致,为0.75
                float lf = proto.loadFactor;
                //阈值和segment 0一致,为1
                int threshold = (int)(cap * lf);
                //根据大小创建HashEntry数组tab
                HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
                //再次检查
                if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
                    == null) { // recheck
                    根据已有属性创建指定位置的Segment
                    Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
                    while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
                           == null) {
                        if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
                            break;
                    }
                }
            }
            return seg;
        }
    

    该方法重点在于拷贝了segments[0],因此先创建的Segment与segment[0]的配置相同,由于多个线程都会可能执行该方法,因此这里通过UNSAFE的一些原子性操作的方法做了多次的检查,到目前为止画图存档:


    image.png

    现在“舞台”也有了,请开始你的表演,看下Segment的put方法

    final V put(K key, int hash, V value, boolean onlyIfAbsent) {
        //步骤① start
        HashEntry<K,V> node = tryLock() ? null :
            scanAndLockForPut(key, hash, value);
        //步骤① end
        V oldValue;
        try {
            //步骤② start
            //获取Segment中的HashEntry[]
            HashEntry<K,V>[] tab = table;
            //算出在HashEntry[]中的位置
            int index = (tab.length - 1) & hash;
            //找到HashEntry[]中的指定位置的第一个节点
            HashEntry<K,V> first = entryAt(tab, index);
            for (HashEntry<K,V> e = first;;) {
                //如果不为空,遍历这条链
                if (e != null) {
                    K k;
                    //情况① 之前已存过,则替换原值
                    if ((k = e.key) == key ||
                        (e.hash == hash && key.equals(k))) {
                        oldValue = e.value;
                        if (!onlyIfAbsent) {
                            e.value = value;
                            ++modCount;
                        }
                        break;
                    }
                    e = e.next;
                }
                else {
                    //情况② 另一个线程的准备工作
                    if (node != null)
                        //链表头插入方式
                        node.setNext(first);
                    else //情况③ 该位置为空,则新建一个节点(注意这里采用链表头插入方式)
                        node = new HashEntry<K,V>(hash, key, value, first);
                    //键值对数量+1
                    int c = count + 1;
                    //如果键值对数量超过阈值
                    if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                        //扩容
                        rehash(node);
                    else //未超过阈值,直接放在指定位置
                        setEntryAt(tab, index, node);
                    ++modCount;
                    count = c;
                    //插入成功返回null
                    oldValue = null;
                    break;
                }
            }
        //步骤② end
        } finally {
            //步骤③
            //解锁
            unlock();
        }
        //修改成功,返回原值
        return oldValue;
    }
    

    上面的put方法其实和Java7 HashMap里大致是一样的,只是多了加锁/解锁两步,也正因为这样才保证了同一时刻只有一个线程拥有修改的权限。按步骤分析上面的流程:

    • 步骤①执行tryLock方法获取锁,拿到锁返回null,没拿到锁执行scanAndLockForPut方法;
    • 步骤②和HashMap里的那一套思路是一样的,不理解可以看下之前的文章介绍(情况②下面介绍);
    • 步骤③执行unLock方法解锁。
      假设现在Thread1进来存值,前面没人来过,它可以成功拿到锁,根据计算,得出它要存的键值对应该放在HashEntry[] 的0号位置,0号位置为空,于是新建一个HashEntry,并通过setEntryAt()方法,放在0号位置,然而还没等Thread1释放锁,系统的时间片切到了Thread2,先画图存档


      image.png

      Thead2也来存值,通过前面的计算,恰好Thread2也被定位到segments[5],接下来Thread2尝试获取锁,没有成功(Thread1还未释放),执行scanAndLockForPut()方法:

    private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
        //通过Segment和hash值寻找匹配的HashEntry
        HashEntry<K,V> first = entryForHash(this, hash);
        HashEntry<K,V> e = first;
        HashEntry<K,V> node = null;
        //重试次数
        int retries = -1; // negative while locating node
        //循环尝试获取锁
        while (!tryLock()) {
            HashEntry<K,V> f; // to recheck first below
            //步骤①
            if (retries < 0) {
                //情况① 没找到,之前表中不存在
                if (e == null) {
                    if (node == null) // speculatively create node
                        //新建 HashEntry 备用,retries改成0
                        node = new HashEntry<K,V>(hash, key, value, null);
                    retries = 0;
                }
                //情况② 找到,刚好第一个节点就是,retries改成0
                else if (key.equals(e.key))
                    retries = 0;
                //情况③ 第一个节点不是,移到下一个,retries还是-1,继续找
                else
                    e = e.next;
            }
            //步骤②
            //尝试了MAX_SCAN_RETRIES次还没拿到锁,简直B了dog!
            else if (++retries > MAX_SCAN_RETRIES) {
                //泉水挂机
                lock();
                break;
            }
            //步骤③
            //在MAX_SCAN_RETRIES次过程中,key对应的entry发生了变化,则从头开始
            else if ((retries & 1) == 0 &&
                     (f = entryForHash(this, hash)) != first) {
                e = first = f; // re-traverse if entry changed
                retries = -1;
            }
        }
        return node;
    }
    

    通过上面的注释分析可以看出,Thread3虽然此刻没有权限修改,但是它也没闲着,利用等锁的这个时间,把自己要存放的键值对在数组中那个位置计算出来了,这样当Thread2一拿到锁就可以立马定位到具体位置操作,节省时间。上面的步骤③稍微解释下,比如Thread2通过查找得知自己要修改的值在0号位置,但在Thread1里面又把该值改到了1号位置,如果它还去0号操作那肯定出问题了了,所以需要重新确定。
    假设Thread2 put值为(“亚索”,“98”),对应1号位置,那么在scanAndLockForPut方法对应情况①,画图存档:


    image.png

    再回到Segment put方法中的情况②,当Thread1 释放锁后,Thread2 持有锁,并准备把亚索放在1号位置,然而此时Segment[5]里的键值对数量2>阈值1,所以调用rehash()方法扩容

    private void rehash(HashEntry<K,V> node) {
        /*
         * Reclassify nodes in each list to new table.  Because we
         * are using power-of-two expansion, the elements from
         * each bin must either stay at same index, or move with a
         * power of two offset. We eliminate unnecessary node
         * creation by catching cases where old nodes can be
         * reused because their next fields won't change.
         * Statistically, at the default threshold, only about
         * one-sixth of them need cloning when a table
         * doubles. The nodes they replace will be garbage
         * collectable as soon as they are no longer referenced by
         * any reader thread that may be in the midst of
         * concurrently traversing table. Entry accesses use plain
         * array indexing because they are followed by volatile
         * table write.
         */
        //旧数组引用
        HashEntry<K,V>[] oldTable = table;
        //旧数组长度
        int oldCapacity = oldTable.length;
        //新数组长度为旧数组的2倍
        int newCapacity = oldCapacity << 1;
        //修改新的阈值
        threshold = (int)(newCapacity * loadFactor);
        //创建新表
        HashEntry<K,V>[] newTable =
            (HashEntry<K,V>[]) new HashEntry[newCapacity];
        int sizeMask = newCapacity - 1;
        //遍历旧表
        for (int i = 0; i < oldCapacity ; i++) {
            HashEntry<K,V> e = oldTable[i];
            if (e != null) {
                HashEntry<K,V> next = e.next;
                //确定在新表中的位置
                int idx = e.hash & sizeMask;
                //情况① 链表只有一个节点,指定转移到新表指定位置
                if (next == null)   //  Single node on list
                    newTable[idx] = e;
                else { // Reuse consecutive sequence at same slot
                    HashEntry<K,V> lastRun = e;
                    int lastIdx = idx;
                    for (HashEntry<K,V> last = next;
                         last != null;
                         last = last.next) {
                        //情况② 扩容前后位置发生改变
                        int k = last.hash & sizeMask;
                        if (k != lastIdx) {
                            lastIdx = k;
                            lastRun = last;
                        }
                    }
                    //将改变的键值对放到新表的对应位置
                    newTable[lastIdx] = lastRun;
                    // Clone remaining nodes
                    //情况③ 把链表中剩下的节点拷到新表中
                    for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
                        V v = p.value;
                        int h = p.hash;
                        int k = h & sizeMask;
                        HashEntry<K,V> n = newTable[k];
                        newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
                    }
                }
            }
        }
        //添加新的节点(链表头插入方式)
        int nodeIndex = node.hash & sizeMask; // add the new node
        node.setNext(newTable[nodeIndex]);
        newTable[nodeIndex] = node;
        table = newTable;
    }
    

    同样是扩容转移,这里的代码比HashMap中的transfer多了一些操作,在上上篇学习HashMap扩容可知,扩容后键值对的新位置要么和原位置一样,要么等于新位置+旧数组的长度,所以画个图来理解下上面代码这么写的原因


    image.png

    前提:当前HashEntry[]长度为8,阈值为8*0.75=6,所以put第7个键值对需要扩容,盖伦和亚索扩容前后位置不变,妖姬和卡特扩容后位置需要加上原数组长度,所以执行上面代码流程:


    image.png
    image.png
    image.png
    image.png
    image.png
    image.png

    上面的代码先找出扩容前后需要转移的节点,先执行转移,然后再把该条链上剩下的节点转移,之所以这么写是起到复用的效果,注释中也说了,在使用默认阈值的情况下只有大约1/6的节点需要被clone。注意到目前为止,可以看到无论是扩容转移还是新增节点,java7都是采用头插入方式,流程图如下:


    image.png
    相比之下,get方法没有加锁/解锁的操作,代码比较简单就不分析了。

    稍微说下Java8

    Java8 对比Java7有很大的不同,比如取消了Segments数组,允许并发扩容
    先看下ConcurrentHashMap的初始化

    public ConcurrentHashMap() {
    }
    

    和Java7不一样,这里是个空方法,那么它具体的初始化操作呢?直接看下put方法

    public V put(K key, V value) {
        return putVal(key, value, false);
    }
    /** Implementation for put and putIfAbsent */
    final V putVal(K key, V value, boolean onlyIfAbsent) {
        // key/value不能为空!!!
        if (key == null || value == null) throw new NullPointerException();
        //计算hash值
        int hash = spread(key.hashCode());
        int binCount = 0;
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            //注释① 表为null则初始化
            if (tab == null || (n = tab.length) == 0)
                tab = initTable();
            //CAS方法判断指定位置是否为null,为空则通过创建新节点,通过CAS方法设置在指定位置
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                if (casTabAt(tab, i, null,
                             new Node<K,V>(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin
            }
            //当前节点正在扩容
            else if ((fh = f.hash) == MOVED)
                tab = helpTransfer(tab, f);
            //指定位置不为空
            else {
                V oldVal = null;
                //注释② 加锁
                synchronized (f) {
                    if (tabAt(tab, i) == f) {
                        //节点是链表的情况
                        if (fh >= 0) {
                            binCount = 1;
                            //遍历整体链
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                                //如果已存在,替换原值
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node<K,V> pred = e;
                                //如果是新加节点,则以尾部插入实现添加
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K,V>(hash, key,
                                                              value, null);
                                    break;
                                }
                            }
                        }
                        //节点是红黑树的情况
                        else if (f instanceof TreeBin) {
                            Node<K,V> p;
                            binCount = 2;
                            //遍历红黑树
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                           value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                        else if (f instanceof ReservationNode)
                            throw new IllegalStateException("Recursive update");
                    }
                }
                if (binCount != 0) {
                    //链表中节点个数超过8转成红黑树
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        //注释③ 添加节点
        addCount(1L, binCount);
        return null;
    }
    

    代码有点长,主要是因为引入了红黑树的判断和操作,以及线程安全的操作。同样key/value为空会报空指针异常,这也是和HashMap一个明显的区别。

    注释①

    调用initTable初始化数组

    private final Node<K,V>[] initTable() {
        Node<K,V>[] tab; int sc;
        while ((tab = table) == null || tab.length == 0) {
            // sizeCtl小于0,当前线程让出执行权
            if ((sc = sizeCtl) < 0)
                Thread.yield(); // lost initialization race; just spin
            //CAS 操作将 sizeCtl 值改为-1
            else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                try {
                    if ((tab = table) == null || tab.length == 0) {
                        int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                        @SuppressWarnings("unchecked")
                        //默认创建大小为16的数组
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                        table = tab = nt;
                        sc = n - (n >>> 2);
                    }
                } finally {
                    //初始化完再改回来
                    sizeCtl = sc;
                }
                break;
            }
        }
        return tab;
    }
    

    put方法并没有加锁,那么它是如何保证创建新表的时候并发安全呢?答案就是这里的sizeCtl, sizeCtl默认值为0,当一个线程初始化数组时,会将sizeCtl改成-1,由于被volatile修饰,对于其他线程来说这个变化是可见的,上面代码看到后续线程判断sizeCtl小于0就会让出执行权。

    注释②

    Java8 摒弃了Segment,而是对数组中单个位置加锁。当指定位置节点不为null时,情况与Java8 HashMap 操作类似,新节点的添加还是尾部插入方式。

    注释③

    不管是链表的还是红黑树,确定之后总的节点数会加1,可能会引起扩容,Java8 ConcurrentHashMap 支持并发扩容,之前扩容总是由一个线程将旧数组中的键值对转移到新数组中,支持并发的话,转移所需要的时间就可以缩短了,当前相应的并发处理逻辑也就更复杂了,扩容转移通过transfer方法完成,Java8中该方法很长,感兴趣的可以看下源码
    用一个图表示Java8 ConcurrentHashMap的样子


    image.png

    总结

    通过分析源码对比了HashMap与ConcurrentHashMap的差别,以及Java7和Java8 上ConcurrentHashMap 设计的不同,当然还有很多坑没有填,比如其中调用了很多UNSAFE的CAS方法,可以减少性能上的消耗,平时很少用,了解的比较少;以及红黑树的具体原理和实现,后续慢慢填...

    相关文章

      网友评论

          本文标题:图解ConcurrentHashMap

          本文链接:https://www.haomeiwen.com/subject/euddfftx.html