数据结构
ConcurrentHashMap 实现并发操作的原理
使用了锁分段技术:ConcurrentHashMap持有一组锁(segment[]),并将数据尽可能分散在不同的锁段中(即,每个锁只会控制部分的数据HashEntry[])。这样如果写操作的数据分布在不同的锁中,那么写操作将可并行操作。因此来实现一定数量(即,锁数量)并发线程修改。
同时通过Unsafe.putOrderedObject、UNSAFE.getObjectVolatile(👈这两个方法很重要,下文会介绍)来操作segment[]、HashEntry[]的元素使得在提升了性能的情况下在并发环境下依旧能获取到最新的数据,同时HashEntry的value为volatile属性,从而实现不加锁的进行并发的读操作,并且对该并发量并无限制。
注意,中不使用volatile的属性来实现segment[]和HashEntry[]在多线程间的可见性。因为如果是修改操作,则在释放锁的时候就会将当前线程缓存中的数据写到主存中,所以就无需在修改操作的过程中因修改volatile属性字段而频繁的写线程内存数据到主存中。
源码解析
重要属性
//散列映射表的默认初始容量为 16。
static final int DEFAULT_INITIAL_CAPACITY = 16;
//散列映射表的默认装载因子为 0.75,用于表示segment中包含的HashEntry元素的个数与HashEntry[]数组长度的比值。当某个segment中包含的HashEntry元素的个数超过了HashEntry[]数组的长度与装载因子的乘积时,将触发扩容操作。
static final float DEFAULT_LOAD_FACTOR = 0.75f;
//散列表的默认并发级别为 16。该值表示segment[]数组的长度,也就是锁的总数。
static final int DEFAULT_CONCURRENCY_LEVEL = 16;
//散列表的最大容量
static final int MAXIMUM_CAPACITY = 1 << 30;
//segment中HashEntry[]数组最小长度
static final int MIN_SEGMENT_TABLE_CAPACITY = 2;
//散列表的最大段数,也就是segment[]数组的最大长度
static final int MAX_SEGMENTS = 1 << 16; // slightly conservative
//在执行size()和containsValue(value)操作时,ConcurrentHashMap的做法是先尝试 RETRIES_BEFORE_LOCK 次( 即,2次 )通过不锁住segment的方式来统计、查询各个segment,如果2次执行过程中,容器的modCount发生了变化,则再采用加锁的方式来操作所有的segment
static final int RETRIES_BEFORE_LOCK = 2;
//segmentMask用于定位segment在segment[]数组中的位置。segmentMask是与运算的掩码,等于segment[]数组size减1,掩码的二进制各个位的值都是1( 因为,数组长度总是2^N )。
final int segmentMask;
//segmentShift用于决定H(key)参与segmentMask与运算的位数(高位),这里指在从segment[]数组定位segment:通过key的哈希结果的高位与segmentMask进行与运算哈希的结果。(详见下面举例)
final int segmentShift;
//Segment 类继承于 ReentrantLock 类,从而使得 Segment 对象能充当锁的角色。
final ConcurrentHashMap.Segment<K, V>[] segments;
重要对象
- ConcurrentHashMap.Segment
Segment 类继承于 ReentrantLock 类,从而使得 Segment 对象能充当锁的角色,并且是一个可重入锁。每个 Segment 对象维护其包含的若干个桶(即,HashEntry[])。
//最大自旋次数,若是单核则为1,多核则为64。该字段用于scanAndLockForPut、scanAndLock方法
static final int MAX_SCAN_RETRIES =
Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1;
/**
* table 是由 HashEntry 对象组成的数组
* 如果散列时发生碰撞,碰撞的 HashEntry 对象就以链表的形式链接成一个链表
* table 数组的元素代表散列映射表的一个桶
* 每个 table 守护整个 ConcurrentHashMap 数据总数的一部分
* 如果并发级别为 16,table 则维护 ConcurrentHashMap 数据总数的 1/16
*/
transient volatile HashEntry<K,V>[] table;
//segment中HashEntry的总数。 PS:注意JDK 7中该字段不是volatile的
transient int count;
//segment中数据被更新的次数
transient int modCount;
//当table中包含的HashEntry元素的个数超过本变量值时,触发table的扩容
transient int threshold;
//装载因子
final float loadFactor;
- ConcurrentHashMap.HashEntry
HashEntry封装了key-value对,是一个单向链表结构,每个HashEntry节点都维护着next HashEntry节点的引用。
static final class HashEntry<K,V>
final int hash;
final K key;
volatile V value;
//HashEntry链表中的下一个entry。PS:JDK 7中该字段不是final的,意味着该字段可修改,而且也确实在remove方法中对该地段进行了修改
volatile HashEntry<K,V> next;
构造方法
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (concurrencyLevel > MAX_SEGMENTS)
concurrencyLevel = MAX_SEGMENTS;
// Find power-of-two sizes best matching arguments
int sshift = 0;
int ssize = 1;
while (ssize < concurrencyLevel) {
++sshift;
ssize <<= 1;
}
this.segmentShift = 32 - sshift;
this.segmentMask = ssize - 1;
if (initialCapacity > MAXIMUM_CAPACITY)
initialCapacity = MAXIMUM_CAPACITY;
int c = initialCapacity / ssize;
if (c * ssize < initialCapacity)
++c;
int cap = MIN_SEGMENT_TABLE_CAPACITY;
while (cap < c)
cap <<= 1;
// create segments and segments[0]
Segment<K,V> s0 =
new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
(HashEntry<K,V>[])new HashEntry[cap]);
Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
this.segments = ss;
}
a) 限制并发等级最大为MAX_SEGMENTS,即2^16。
b) 计算真实的并发等级ssize,必须是2的N次方,即 ssize( actual_concurrency_level ) >= concurrency_level。
举例:concurrencyLevel等于14,15或16,ssize都会等于16,即容器里锁的个数也是16。
Q:为什么数组的长度都需要设计成2^N次方了?
A:这是因为元素在数组中的定位主要是通过H(key) & (数组长度 - 1)方式实现的,这样我们称(数组长度 - 1)为element_mask。那么假设有一个长度为16和长度为15的数组,他们element_mask分别为15和14。即array_16_element_mask = 15(二进制”1111”);array_15_element_mask = 14(二进制”1110”)。你会发现所以和”1110”进行与操作结果的最后一位都是0,这就导致数组的’0001’、’0011’、’1001’、’0101’、’1101’、’0111’位置都无法存放数据,这就导致了数组空间的浪费,以及数据没有得到更好的分散。而使用array_16_element_mask = 15(二进制”1111”)则不会有该问题,数据可以分散到数组个每个索引位置。
c) sshift表示在通过H(key)来定位segment的index时,参与到segmentMask掩码与运算的H(key)高位位数。
d) 计算每个Segment中HashEntry[]数组的长度,根据数据均匀分配到各个segment的HashEntry[]中,并且数组长度必须是2的N次方的思路来获取。注意,HashEntry[]数组的长度最小为2。
e) 创建一个Segment对象,将新建的Segment对象放入Segment[]数组中index为0的位置。这里只先构建了Segnemt[]数组的一个元素,则其他index的元素在被使用到时通过ensureSegment(index)方法来构建。
重要方法
- segment的定位
int h = hash(key);
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u))
通过key的二次哈希运算后再进行移位和与运算得到key在segment[]数组中所对应的segment
a) hash(key)
private int hash(Object k) {
int h = hashSeed;
if ((0 != h) && (k instanceof String)) {
return sun.misc.Hashing.stringHash32((String) k);
}
h ^= k.hashCode();
// Spread bits to regularize both segment and index locations,
// using variant of single-word Wang/Jenkins hash.
h += (h << 15) ^ 0xffffcd7d;
h ^= (h >>> 10);
h += (h << 3);
h ^= (h >>> 6);
h += (h << 2) + (h << 14);
return h ^ (h >>> 16);
}
👆这里之所以需要将key.hashCode再进行一次hash计算,是为了减少哈希冲突,使元素能够均匀的分布在不同的Segment上,从而提高容器的存取效率。
b) 取hash(key)结果的(32 - segmentShift)位数的高位和segmentMask掩码进行与运算。(其实,与运算时,就是“hash(key)的高segmentMask(十进制值)位"于“segmentMask的二进制值”进行与操作,此时进行与操作的两个数的有效二进制位数是一样的了。)
c) 将b)的结果j进行 (j << SSHIFT) + SBASE 以得到key在segement[]数组中的位置
举例:假如哈希的质量差到极点,那么所有的元素都在一个Segment中,不仅存取元素缓慢,分段锁也会失去意义。我做了一个测试,不通过再哈希而直接执行哈希计算。
System.out.println(Integer.parseInt("0001111", 2) & 15);
System.out.println(Integer.parseInt("0011111", 2) & 15);
System.out.println(Integer.parseInt("0111111", 2) & 15);
System.out.println(Integer.parseInt("1111111", 2) & 15);
计算后输出的哈希值全是15,通过这个例子可以发现如果不进行再哈希,哈希冲突会非常严重,因为只要低位一样,无论高位是什么数,其哈希值总是一样。我们再把上面的二进制数据进行再哈希后结果如下,为了方便阅读,不足32位的高位补了0,每隔四位用竖线分割下。
0100 | 0111 | 0110 | 0111 | 1101 | 1010 | 0100 | 1110 |
---|---|---|---|---|---|---|---|
1111 | 0111 | 0100 | 0011 | 0000 | 0001 | 1011 | 1000 |
0111 | 0111 | 0110 | 1001 | 0100 | 0110 | 0011 | 1110 |
1000 | 0011 | 0000 | 0000 | 1100 | 1000 | 0001 | 1010 |
可以发现每一位的数据都散列开了,通过这种再哈希能让数字的每一位都能参加到哈希运算当中,从而减少哈希冲突。
- HashEntry定位
int h = hash(key);
((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE)
主要通过对key进行二次hash运算,再讲哈希结果和HashEntry[]的长度掩码进行与运算得到key所对应的HashEntry在数组中的索引。HashEntry的定位和Segment的定位方式很像,但是HashEntry少了将hash(key)的结果进行掩码取高位后再与数组长度与操作,而是直接将hash(key)的结果和数组长度的掩码进行与操作。其目的是避免两次哈希后的值一样,导致元素虽然在Segment里散列开了,但是却没有在HashEntry里散列开( 也就是说,如果Segment和HashEntry的定位方式一样,那么到同一个Segment的key都会落到该segment中的同一个HashEntry了 )。
-
Unsafe类中的putOrderedObject、getObjectVolatile方法
getObjectVolatile:使用volatile读的语义获取数据,也就是通过getObjectVolatile获取数据时回去主存中获取最新的数据放到线程的缓存中,这能保证正确的获取最新的数据。
putOrderedObject:为了控制特定条件下的指令重排序和内存可见性问题,Java编译器使用一种叫内存屏障(Memory Barrier,或叫做内存栅栏,Memory Fence)的CPU指令来禁止指令重排序。java中volatile写入使用了内存屏障中的LoadStore屏障规则,对于这样的语句Load1; LoadStore; Store2,在Store2及后续写入操作被刷出前,保证Load1要读取的数据被读取完毕。volatile的写所插入的storeLoad是一个耗时的操作,因此出现了一个对volatile写的升级版本,利用lazySet方法进行性能优化,在实现上对volatile的写只会在之前插入StoreStore屏障,对于这样的语句Store1; StoreStore; Store2,在Store2及后续写入操作执行前,保证Store1的写入操作对其它处理器可见,也就是按顺序的写入。UNSAFE.putOrderedObject正是提供了这样的语义,避免了写写指令重排序,但不保证内存可见性,因此读取时需借助volatile读保证可见性。 -
ensureSegment(k)
private Segment<K,V> ensureSegment(int k) {
final Segment<K,V>[] ss = this.segments;
long u = (k << SSHIFT) + SBASE; // raw offset
Segment<K,V> seg;
if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
Segment<K,V> proto = ss[0]; // use segment 0 as prototype
int cap = proto.table.length;
float lf = proto.loadFactor;
int threshold = (int)(cap * lf);
HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
== null) { // recheck
Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
== null) {
if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
break;
}
}
}
return seg;
}
根据计算得到的index从segment[]数组中获取segment,如果segment不存在,则创建一个segment并通过CAS算法放入segment[]数组中。这里的获取和插入分别通过UNSAGE.getObjectVolatile(👈保证获取segment[]最新数据)和UNSAFE.cmpareAndSwapObject(👈保证原子性的将新建的segment插入segment[]数组,并使其他线程可见)实现,并不直接对segment[]数组操作。
- HashEntry<K,V> scanAndLockForPut(K key, int hash, V value)
private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
HashEntry<K,V> first = entryForHash(this, hash);
HashEntry<K,V> e = first;
HashEntry<K,V> node = null;
int retries = -1; // negative while locating node
while (!tryLock()) {
HashEntry<K,V> f; // to recheck first below
if (retries < 0) {
if (e == null) {
if (node == null) // speculatively create node
node = new HashEntry<K,V>(hash, key, value, null);
retries = 0;
}
else if (key.equals(e.key))
retries = 0;
else
e = e.next;
}
else if (++retries > MAX_SCAN_RETRIES) {
lock();
break;
}
else if ((retries & 1) == 0 &&
(f = entryForHash(this, hash)) != first) {
e = first = f; // re-traverse if entry changed
retries = -1;
}
}
return node;
}
在put操作尝试加锁没成功时,不是直接进入等待状态,而是调用👆scanAndLockForPut()方法,该方法实现了:
a) 首次进入该方法,重试次数retries初始值为-1。
b) 若retries为-1,则判断查询key对应的HashEntry节点链中是否已经存在了该节点,如果还没则预先创建一个新节点。然后将retries=0;
c) 然后尝试MAX_SCAN_RETRIES次获取锁( 自旋锁 ),如果依旧没能成功获得锁,则进入等待状态(互斥锁)。
JDK7尝试使用自旋锁来提升性能,好处在于:自旋锁当前的线程不会挂起,而是一直处于running状态,这样一旦能够获得锁时就key在不进行上下文切换的情况下获取到锁。
d) 如果在尝试MAX_SCAN_RETRIES次获取锁的过程,key对应的entry发送了变化,则将尝试次数重置为-1,从第b)步骤重新开始
- void scanAndLock(Object key, int hash)
private void scanAndLock(Object key, int hash) {
// similar to but simpler than scanAndLockForPut
HashEntry<K,V> first = entryForHash(this, hash);
HashEntry<K,V> e = first;
int retries = -1;
while (!tryLock()) {
HashEntry<K,V> f;
if (retries < 0) {
if (e == null || key.equals(e.key))
retries = 0;
else
e = e.next;
}
else if (++retries > MAX_SCAN_RETRIES) {
lock();
break;
}
else if ((retries & 1) == 0 &&
(f = entryForHash(this, hash)) != first) {
e = first = f;
retries = -1;
}
}
}
在replace、remove操作尝试加锁没成功时,不是直接进入等待状态,而是调用👆scanAndLock()方法。该方法是实现和scanAndLockForPut()差不了多少,主要的区别在于scanAndLockForPut()方法在key对应entry不存在时是不会去创建一个HashEntry对象的。
- V get(Object key)
public V get(Object key) {
Segment<K,V> s; // manually integrate access methods to reduce overhead
HashEntry<K,V>[] tab;
int h = hash(key);
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
(tab = s.table) != null) {
for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
e != null; e = e.next) {
K k;
if ((k = e.key) == key || (e.hash == h && key.equals(k)))
return e.value;
}
}
return null;
}
在JDK 7中get的实现原理已经和JDK 6不同了,JDK 6通过volatile实现多线程间内存的可见性。而JDK 7为了提升性能,用UNSAFE.getObjectVolatile(...)来获取segment[]数组和HashEntry[]数组中对应index的最新值。同时值得说明的是,当volatile引用一个数组时,数组中的元素是不具有volatile特性的,所以,也需要通过UNSAFE.getObjectVolatile(…)来获取数组中真实的数据。
- put操作
public V put(K key, V value) {
Segment<K,V> s;
if (value == null)
throw new NullPointerException();
int hash = hash(key);
int j = (hash >>> segmentShift) & segmentMask;
if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck
(segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
s = ensureSegment(j);
return s.put(key, hash, value, false);
}
a) 通过key算出对应的segment在segment[]中的位置,如果对应的segment不存在,则创建。
b) 将key、value插入到segment中对应的HashEntry中
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
HashEntry<K,V> node = tryLock() ? null :
scanAndLockForPut(key, hash, value);
V oldValue;
try {
HashEntry<K,V>[] tab = table;
int index = (tab.length - 1) & hash;
HashEntry<K,V> first = entryAt(tab, index);
for (HashEntry<K,V> e = first;;) {
if (e != null) {
K k;
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
oldValue = e.value;
if (!onlyIfAbsent) {
e.value = value;
++modCount;
}
break;
}
e = e.next;
}
else {
if (node != null)
node.setNext(first);
else
node = new HashEntry<K,V>(hash, key, value, first);
int c = count + 1;
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
rehash(node);
else
setEntryAt(tab, index, node);
++modCount;
count = c;
oldValue = null;
break;
}
}
} finally {
unlock();
}
return oldValue;
}
a) 尝试获得锁,如果失败,则调用scanAndLockForPut(...)通过自旋等待的方式获得锁。注意,这里锁操作锁的只是当前这个segment,而不会影响segment[]数组中其他的segment对象的写操作。这是ConcurrentHashMap实现并发写操作的精髓所在。通过分段锁来支持一定并发量的写操作,并通过volatile以及UNSAFE.getObjectVolatile、UNSAFE.putOrderedObject来实现不加锁的读操作,也就是支持任何并发量的读操作。
b) 计算key应插入的HashEntry在HashEntry[]数组的index,并通过UNSAFE.getObjectVolatile(...)方式获取最新的到HashEntry对象
c) 判断HashEntry链中是否已经存在该key了,如果存在则将key的value替换成新值,并将modCount加1
d) 如果HashEntry链中不存在该key,则将key-value出入到HashEntry链头处,并将count加1,但此时count还未更新到segment中。
e) 如果在count加1后发现目前HashEntry链表长度以及达到了阈值并且HashEntry的链表长度小于限制的最大长度,则会进行HashEntry的扩容操作。注意,在JDK 7中是确定当前put操作是会加入一个新节点情况下才会触发扩容操作,而在JDK 6中,可能存在put操作只是替换一个已经存在的key的value值的情况下也会触发扩容操作。
f) 如果count加1未触发阈值,则通过UNSAFE.putOrderedObject(…)方式将最新的HashEntry更新到HashEntry[]数组中。
g) 更新segment中的modCount、count值
h) 释放锁。释放锁的操作会将当前线程缓存里的数据写到主存中。
- rehash(HashEntry<K,V> node)
private void rehash(HashEntry<K,V> node) {
/*
* Reclassify nodes in each list to new table. Because we
* are using power-of-two expansion, the elements from
* each bin must either stay at same index, or move with a
* power of two offset. We eliminate unnecessary node
* creation by catching cases where old nodes can be
* reused because their next fields won't change.
* Statistically, at the default threshold, only about
* one-sixth of them need cloning when a table
* doubles. The nodes they replace will be garbage
* collectable as soon as they are no longer referenced by
* any reader thread that may be in the midst of
* concurrently traversing table. Entry accesses use plain
* array indexing because they are followed by volatile
* table write.
*/
HashEntry<K,V>[] oldTable = table;
int oldCapacity = oldTable.length;
int newCapacity = oldCapacity << 1;
threshold = (int)(newCapacity * loadFactor);
HashEntry<K,V>[] newTable =
(HashEntry<K,V>[]) new HashEntry[newCapacity];
int sizeMask = newCapacity - 1;
for (int i = 0; i < oldCapacity ; i++) {
HashEntry<K,V> e = oldTable[i];
if (e != null) {
HashEntry<K,V> next = e.next;
int idx = e.hash & sizeMask;
if (next == null) // Single node on list
newTable[idx] = e;
else { // Reuse consecutive sequence at same slot
HashEntry<K,V> lastRun = e;
int lastIdx = idx;
for (HashEntry<K,V> last = next;
last != null;
last = last.next) {
int k = last.hash & sizeMask;
if (k != lastIdx) {
lastIdx = k;
lastRun = last;
}
}
newTable[lastIdx] = lastRun;
// Clone remaining nodes
for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
V v = p.value;
int h = p.hash;
int k = h & sizeMask;
HashEntry<K,V> n = newTable[k];
newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
}
}
}
}
int nodeIndex = node.hash & sizeMask; // add the new node
node.setNext(newTable[nodeIndex]);
newTable[nodeIndex] = node;
table = newTable;
}
当HashEntry的数量达到阈值时就会触发HashEntry[]数组的扩容操作
a) 创建new HashEntry[]数组,new HashEntry[]数组的容量为old HashEntry的2倍
b) 设置新的阈值
c) 将old HashEntry[]数组中的内容放入new HashEntry[]中,这并不是盲目的将元素一一取出然后计算元素在new HashEntry的位置,然后插入。这里Doug Lea做了一些优化。
-
如果old HashEntry[]数组的元素HashEntry链表,若该HashEntry链表的头节点不存在next节点,即说明该HashEntry链表是个单节点,则直接将HashEntry插入到new HashEntry[]数组对应的位置中。
-
因为new HashEntry[]的length是old HashEntry[]的2倍,所以对应的new sizeMask比old sizeMask多了old HashEntry[] length的大小( 比如,old_HashEntry_array_length为8,则old sizeMask为’0000 0111’;new_HashEntry_array_length为16,则new sizeMask为’0000 1111’)。所以元素在new HashEntry[]的new index要么和old index一样,要么就是old_index + old_HashEntry_array_length。因此我们可通过对节点的复用来减少不必要的节点创建,通过计算每个HashEntry链表中每个entry的new index值,如果存在从某个entry开始到该HashEntry链表末尾的所有entrys,它们的new index值都一样,那么就该entry直接插入到new HashEntry[newIndex]中,当然最坏的请求就是该entry就是HashEntry链的最后一个entry。然后只需重建HashEntry中该entry之前的到链表头的entry节点,分别将新构建的entry插入到new HashEntry[]中。
再者,经统计,在使用默认阈值的情况下,一般只有1/6的节点需要重新构建 -
最后将当前操作新构建的节点加入到new HashEntry[]数组中
d) old HashEntry如果没有其他读线程操作引用时,将会尽快被垃圾回收。
e) 扩容操作因为要重新构建正整个HashEntry[]数组,所以不需要通过UNSAFE.putOrderedObject(...)方式将元素插入一个已经存在的HashEntry[]中,而是直接通过索引操作插入到new HashEntry[]数组就好,最后我们会将new HashEntry[]直接赋值给volatile tables字段,这样就可以保证new HashEntry[]对其他线程可见了 -
remove操作
public V remove(Object key) {
int hash = hash(key);
Segment<K,V> s = segmentForHash(hash);
return s == null ? null : s.remove(key, hash, null);
}
a) 根据key计算出该key对应的segment在segment[]数组中的index,并获取该segment。
b) 将key从该segment中移除
final V remove(Object key, int hash, Object value) {
if (!tryLock())
scanAndLock(key, hash);
V oldValue = null;
try {
HashEntry<K,V>[] tab = table;
int index = (tab.length - 1) & hash;
HashEntry<K,V> e = entryAt(tab, index);
HashEntry<K,V> pred = null;
while (e != null) {
K k;
HashEntry<K,V> next = e.next;
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
V v = e.value;
if (value == null || value == v || value.equals(v)) {
if (pred == null)
setEntryAt(tab, index, next);
else
pred.setNext(next);
++modCount;
--count;
oldValue = v;
}
break;
}
pred = e;
e = next;
}
} finally {
unlock();
}
return oldValue;
}
a) 尝试获得锁,如果失败则调用scanAndLock(...)通过自旋等待的方式获得锁。
b) 获取key锁对应的HashEntry链表,并在该HashEntry中找到key对应entry节点
c) 如果key对应的节点是在HashEntry链表头,则直接将key的next节点通过UNSAFE.putOrderedObject的方式这是为对HashEntry[]数组中对应的位置,即使得next节点称为成为链表头。
d) 如果key不是HashEntry的链表头节点,则将key的前一个节点的next节点修改为key的next节点。额,这么说太绕了,举个例子吧~
key对应的节点:current_HashEntry;current_HashEntry的前一个节点:pre_HashEntry;current_HashEntry的下一个节点:next_HashEntry
删除前:
pre_HashEntry.next ——> current_HashEntry
current_HashEntry.next ——> next_HashEntry
删除后:
pre_HashEntry.next ——> next_HashEntry
e) 修改segment属性:modCount加1,count减1
f) 释放锁
- size()
public int size() {
// Try a few times to get accurate count. On failure due to
// continuous async changes in table, resort to locking.
final Segment<K,V>[] segments = this.segments;
int size;
boolean overflow; // true if size overflows 32 bits
long sum; // sum of modCounts
long last = 0L; // previous sum
int retries = -1; // first iteration isn't retry
try {
for (;;) {
if (retries++ == RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
ensureSegment(j).lock(); // force creation
}
sum = 0L;
size = 0;
overflow = false;
for (int j = 0; j < segments.length; ++j) {
Segment<K,V> seg = segmentAt(segments, j);
if (seg != null) {
sum += seg.modCount;
int c = seg.count;
if (c < 0 || (size += c) < 0)
overflow = true;
}
}
if (sum == last)
break;
last = sum;
}
} finally {
if (retries > RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
segmentAt(segments, j).unlock();
}
}
return overflow ? Integer.MAX_VALUE : size;
}
a) 会先尝试RETRIES_BEFORE_LOCK次( 即2次 )不加锁的情况下,将segment[]数组中每个segment的count累加,同时也会将每个segment的modCount进行累加。如果两次不加锁的操作后,modCountSum值是一样的,这说明在这两次累加segmentcount的过程中ConcurrentHashMap没有发生结构性变化,那么就直接返回累加的count值
b) 如果在两次累加segment的count操作期间ConcurrentHashMap发生了结构性改变,则会通过将所有的segment都加锁,然后重新进行count的累加操作。在完成count的累加操作后,释放所有的锁。最后返回累加的count值。
c) 注意,如果累加的count值大于了Integer.MAX_VALUE,则返回Integer.MAX_VALUE。
弱一致性
相对于HashMap的fast-fail,ConcurrentHashMap的迭代器并不会抛出ConcurrentModificationException异常。这是由于ConcurrentHashMap的读行为是弱一致性的。
也就是说,在同时对一个segment进行读线程和写线程操作时,并不保证写操作的行为能被并行允许的读线程所感知。
比如,当一个写线程和读线程并发的对同一个key进行操作时:写线程在操作一个put操作,如果这个时候put的是一个已经存在的key值,则会替换该key对应的value值,因为value是volatile属性的,所以该替换操作时能立即被读线程感知的。但如果此时的put是要新插入一个entry,则存在两种情况:①在写线程通过UNSAFE.putOrderedObject方式将新entry插入到HashEntry链表后,读线程才通过UNSAFE.getObjectVolatile来获取对应的HashEntry链表,那么这个时候读线程是能够获取到这个新插入的entry的;②反之,如果读线程的UNSAFE.getObjectVolatile操作在写线程的UNSAFE.putOrderedObject之前,则就无法感知到这个新加入的entry了。
其实在大多数并发的业务逻辑下,我们是允许这样的弱一致性存在的。如果你的业务逻辑不允许这样的弱一致性存在的,你可以考虑对segment中的HashEntry链表的读操作加锁,或者将segment改造成读写锁模式。但这都将大大降低ConcurrentHashMap的性能并且使得你的程序变得复杂且难以维护。或许你该考虑使用其他的存储模型代替ConcurrentHashMap。
后记
虽然JDK 8已经出来很久了,但是我还是花了很多时间在JDK 7的ConcurrentHashMap上,一个很重要的原因是,我认为ConcurrentHashMap在并发模式下的设计思想是很值得我们深究和学习的,无论是jdk7相对于jdk6的各种细节和性能上的优化,还是jdk8的大改造都是对并发编程各种模式很好的学习。文章还有很多可以继续深入挖掘的点,希望在后期的学习中能够继续完善~
参考
http://www.infoq.com/cn/articles/ConcurrentHashMap/
http://www.blogjava.net/DLevin/archive/2013/10/18/405030.html
https://my.oschina.net/7001/blog/896587
网友评论