ConcurrentHashMap

作者: 清雨季 | 来源:发表于2019-07-03 15:55 被阅读0次

    一 1.7版本的实现方式

    • 采用分段锁机制,底层有两个对象,一个是Segment对象,另一个是HashEntry对象
    • 一个Segment就是一把锁,它继承自ReentrantLock,每个Segment锁住若干个桶(不一定是一个)。
    • 一个HashEntry是一个链表,里面包含Key-Value和next等信息。
    • 一个ConcurrentHashMap里包含一个HashEntry数组和一个Segment数组。

    二 ConcurrentHashMap的核心字段/内部类

    字段/类名 类型 含义 备注
    table Node[] 底层保存数据的数组 初始化时为null
    sizeCtl volatile int 用于控制初始化和扩容,处理并发问题 这个值为长度没有关系
    nextTable Node[] 通常情况下为null,扩容时使用 -
    ForwardingNode 内部类 - 在扩容时用,扩容后,需要把table中的数据转移到nextTable中,转移的过程中可能存在并发问题,如果table中某个元素的这个值,说明这个元素正在被迁移。
    • sizeCtl的几种值及意义:
    • -1:表示正在初始化
    • -N: 表示有N-1个线程在扩容
    • 其他情况下,如果table未被初始化,则表示初始化的长度,如果已经初始化,则表示table的容量。

    三 初始化过程

    ConcurrentHashMap的初始化没有初始化table,只是计算了sizeCtl:

    • 使用无参构造方法时,sizeCtl = 0,后续table初始化时使用默认容量16
    • 使用带有initialCapacity,sizeCtl是一个其乘以0.75表这个initialCapacity大的最小2的整数次幂。

    table的初始化放在第一次put的操作中,并且使用sizeCtl的原子操作来控制并发:

        private final Node<K,V>[] initTable() {
            Node<K,V>[] tab; int sc;
            while ((tab = table) == null || tab.length == 0) {
                if ((sc = sizeCtl) < 0)
                    Thread.yield(); // lost initialization race; just spin
                else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                  //do real initTable
                   break;
                }
            }
            return tab;
        }
    

    四 put操作

    4.1 hashCode到table下标的映射方式:

    h = key.hashCode();
    (h ^ (h >>> 16)) & HASH_BITS; //HASH_BITS=0x7fffffff
    index = h ^ (table.length - 1);
    

    4.2 put操作的并发控制

    分以下几种情况:

    4.2.1 table[index] == null
    说明这个节点重来未被插入过元素,直接CAS把这个节点值从null更新到新插入的值:

              if (tab == null || (n = tab.length) == 0)
                    tab = initTable();
                else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                    if (casTabAt(tab, i, null,
                                 new Node<K,V>(hash, key, value, null)))
                        break;                   // no lock when adding to empty bin
                }
    

    4.2.2 table[index] instanceof ForwardingNode
    说明正在被扩容中,此时会去帮助一起扩容。

    扩容完成后会令table = nextTable,然后重新再执行一次插入操作

    4.2.3 table[index] != null
    也就是说table对应下标的已经有其他元素了,这时需要把新元素加到链尾

    如果链表长度>=8,则需要转换成红黑树

    如果已经是红黑树,则加入红黑树中

    这种情况下的并发控制:直接synchronized队头结点(或者红黑树的根结点)

     synchronized (f) {
                        if (tabAt(tab, i) == f) {
                        }
    }
    

    f即为table中对应下标的元素,f是在前面的判断中付值的,这里之所以要再判断if (tabAt(tab, i) == f) ,是因为这个位置的元素可能已经被修改了(例如其他线程扩容了)

    取table中的元素并不是采用table[index] 而是用UNSAFE.getObjectVolatile, 虽然table是volatile修饰的,但不能保证线程每次都拿到table中的最新元素,Unsafe.getObjectVolatile可以直接获取指定内存的数据,保证了每次拿到数据都是最新的。

    从网上查到的观点:volatile修饰数组时,只能保存数组引用的可见性,不能保存数组中元素的可见性,所以这里table虽然使用了volatile修饰,但是仍然不能保存table[index]可以拿到最新的值。

    五扩容操作

    参考文章https://www.jianshu.com/p/f6730d5784ad

    分为以下几步:

    • 第一个触发扩容的线程初始化nextTable数组,长度为原数组的两倍。同时使用CAS sizeCtl的值以确保nextTable只会初始化一次。
    • 初始化完成后开始迁移数据,从旧数组的最后一个元素开始迁移。这个过程可以多个线程一起执行。
    • 开始迁移之前,要先判断旧数组中的元素,如果为null,则直接跳过,如果为ForwardingNode则说明有其他线程正在迁移。
    • 否则说明该位置节点没有迁移,CAS操作把该位置节点设置成ForwardingNode,表明当前线程拿到了这个位置的迁移权限。
    • 开始迁移,遍历该链的所有节点,使用节点的hashCode & n 把节点分成两类。把结果为1的和结果为0的分别做成hn和ln两个链表,把ln放在nextTable的旧位置上(原来是第5个桶就还放在第5个桶上),把hn向后移动n个桶(就是放在nextTable的5+n的位置上)

    可以看出来,扩容操作的并发控制分为两点:

    • 一是使用sizeCtl的CAS操作保证nextTable只会初始化一次
    • 二是使用把旧数组的元素CAS为ForwardingNode的方式来保证一个桶只会被一个线程迁移。

    六 如何统计数量

    Map有个size()方法会返回当前的数量,在HashMap中直接使用一个size类型记录。但是在ConcurrentHashMap中要考虑高并发的情况,首先肯定不能像HashMap一样用一个普通的int字段统计。

    可以使用一个原子类来记录,或者普通int字段加CAS操作,但是这样在高并发的情况下乐观锁的竞争压力太大,性能不行。

    ConcurrentHashMap在低并发的情况下就是这种方式,使用一个volatile long型的baseCount的CAS操作来计数。但是在高并发的情况下(如果baseCount的CAS操作出现竞争,就会转用高并发的方式)会有不同的方案。

    在高并发时ConcurrentHashMap使用了一组计数器来计数,每个线程第一次操作计数时都会随机选定一个计数器,然后使用这个计数器进行CAS操作。该线程以后再次操作计数时,都会使用同一个计数器(这是为了减少CAS的竞争)。

    计数器数组及里面的计数器的初始化都需要做并发控制,确保只初始化一次,ConcurrentHashMap中使用cellsBusy来控制并发

    相关文章

      网友评论

        本文标题:ConcurrentHashMap

        本文链接:https://www.haomeiwen.com/subject/glbfhctx.html