一 1.7版本的实现方式
- 采用分段锁机制,底层有两个对象,一个是Segment对象,另一个是HashEntry对象
- 一个Segment就是一把锁,它继承自ReentrantLock,每个Segment锁住若干个桶(不一定是一个)。
- 一个HashEntry是一个链表,里面包含Key-Value和next等信息。
- 一个ConcurrentHashMap里包含一个HashEntry数组和一个Segment数组。
二 ConcurrentHashMap的核心字段/内部类
字段/类名 | 类型 | 含义 | 备注 |
---|---|---|---|
table | Node[] | 底层保存数据的数组 | 初始化时为null |
sizeCtl | volatile int | 用于控制初始化和扩容,处理并发问题 | 这个值为长度没有关系 |
nextTable | Node[] | 通常情况下为null,扩容时使用 | - |
ForwardingNode | 内部类 | - | 在扩容时用,扩容后,需要把table中的数据转移到nextTable中,转移的过程中可能存在并发问题,如果table中某个元素的这个值,说明这个元素正在被迁移。 |
- sizeCtl的几种值及意义:
- -1:表示正在初始化
- -N: 表示有N-1个线程在扩容
- 其他情况下,如果table未被初始化,则表示初始化的长度,如果已经初始化,则表示table的容量。
三 初始化过程
ConcurrentHashMap的初始化没有初始化table,只是计算了sizeCtl:
- 使用无参构造方法时,sizeCtl = 0,后续table初始化时使用默认容量16
- 使用带有initialCapacity,sizeCtl是一个其乘以0.75表这个initialCapacity大的最小2的整数次幂。
table的初始化放在第一次put的操作中,并且使用sizeCtl的原子操作来控制并发:
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
if ((sc = sizeCtl) < 0)
Thread.yield(); // lost initialization race; just spin
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
//do real initTable
break;
}
}
return tab;
}
四 put操作
4.1 hashCode到table下标的映射方式:
h = key.hashCode();
(h ^ (h >>> 16)) & HASH_BITS; //HASH_BITS=0x7fffffff
index = h ^ (table.length - 1);
4.2 put操作的并发控制
分以下几种情况:
4.2.1 table[index] == null
说明这个节点重来未被插入过元素,直接CAS把这个节点值从null更新到新插入的值:
if (tab == null || (n = tab.length) == 0)
tab = initTable();
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
4.2.2 table[index] instanceof ForwardingNode
说明正在被扩容中,此时会去帮助一起扩容。
扩容完成后会令table = nextTable,然后重新再执行一次插入操作
4.2.3 table[index] != null
也就是说table对应下标的已经有其他元素了,这时需要把新元素加到链尾
如果链表长度>=8,则需要转换成红黑树
如果已经是红黑树,则加入红黑树中
这种情况下的并发控制:直接synchronized队头结点(或者红黑树的根结点)
synchronized (f) {
if (tabAt(tab, i) == f) {
}
}
f即为table中对应下标的元素,f是在前面的判断中付值的,这里之所以要再判断if (tabAt(tab, i) == f) ,是因为这个位置的元素可能已经被修改了(例如其他线程扩容了)
取table中的元素并不是采用table[index] 而是用UNSAFE.getObjectVolatile, 虽然table是volatile修饰的,但不能保证线程每次都拿到table中的最新元素,Unsafe.getObjectVolatile可以直接获取指定内存的数据,保证了每次拿到数据都是最新的。
从网上查到的观点:volatile修饰数组时,只能保存数组引用的可见性,不能保存数组中元素的可见性,所以这里table虽然使用了volatile修饰,但是仍然不能保存table[index]可以拿到最新的值。
五扩容操作
参考文章https://www.jianshu.com/p/f6730d5784ad
分为以下几步:
- 第一个触发扩容的线程初始化nextTable数组,长度为原数组的两倍。同时使用CAS sizeCtl的值以确保nextTable只会初始化一次。
- 初始化完成后开始迁移数据,从旧数组的最后一个元素开始迁移。这个过程可以多个线程一起执行。
- 开始迁移之前,要先判断旧数组中的元素,如果为null,则直接跳过,如果为ForwardingNode则说明有其他线程正在迁移。
- 否则说明该位置节点没有迁移,CAS操作把该位置节点设置成ForwardingNode,表明当前线程拿到了这个位置的迁移权限。
- 开始迁移,遍历该链的所有节点,使用节点的hashCode & n 把节点分成两类。把结果为1的和结果为0的分别做成hn和ln两个链表,把ln放在nextTable的旧位置上(原来是第5个桶就还放在第5个桶上),把hn向后移动n个桶(就是放在nextTable的5+n的位置上)
可以看出来,扩容操作的并发控制分为两点:
- 一是使用sizeCtl的CAS操作保证nextTable只会初始化一次
- 二是使用把旧数组的元素CAS为ForwardingNode的方式来保证一个桶只会被一个线程迁移。
六 如何统计数量
Map有个size()方法会返回当前的数量,在HashMap中直接使用一个size类型记录。但是在ConcurrentHashMap中要考虑高并发的情况,首先肯定不能像HashMap一样用一个普通的int字段统计。
可以使用一个原子类来记录,或者普通int字段加CAS操作,但是这样在高并发的情况下乐观锁的竞争压力太大,性能不行。
ConcurrentHashMap在低并发的情况下就是这种方式,使用一个volatile long型的baseCount的CAS操作来计数。但是在高并发的情况下(如果baseCount的CAS操作出现竞争,就会转用高并发的方式)会有不同的方案。
在高并发时ConcurrentHashMap使用了一组计数器来计数,每个线程第一次操作计数时都会随机选定一个计数器,然后使用这个计数器进行CAS操作。该线程以后再次操作计数时,都会使用同一个计数器(这是为了减少CAS的竞争)。
计数器数组及里面的计数器的初始化都需要做并发控制,确保只初始化一次,ConcurrentHashMap中使用cellsBusy来控制并发
网友评论