concurrenthashmap不允许key值为null,在put的时候会校验如果为null则抛异常,原因是无法分辨是key没找到的null还是有key值为null,这在多线程里面是模糊不清的
构造函数
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
// 参数校验
if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
// 并发度控制,最大是65535
if (concurrencyLevel > MAX_SEGMENTS)
concurrencyLevel = MAX_SEGMENTS;
// Find power-of-two sizes best matching arguments
// 等于ssize从1向左移位的 次数
int sshift = 0;
int ssize = 1;
// 找出最接近concurrencyLevel的2的n次幂的数值
while (ssize < concurrencyLevel) {
++sshift;
ssize <<= 1;
}
// 这里之所 以用32是因为ConcurrentHashMap里的hash()方法输出的最大数是32位的
this.segmentShift = 32 - sshift;
// 散列运算的掩码,等于ssize减1
this.segmentMask = ssize - 1;
if (initialCapacity > MAXIMUM_CAPACITY)
initialCapacity = MAXIMUM_CAPACITY;
int c = initialCapacity / ssize;
if (c * ssize < initialCapacity)
++c;
// 里HashEntry数组的长度度,它等于initialCapacity除以ssize的倍数c,如果c大于1,就会取大于等于c的2的N次方值,所以cap不是1,就是2的N次方。
int cap = MIN_SEGMENT_TABLE_CAPACITY;
// 保证HashEntry数组大小一定是2的n次幂
while (cap < c)
cap <<= 1;
// create segments and segments[0]
// 初始化Segment数组,并实际只填充Segment数组的第0个元素。
Segment<K,V> s0 =
new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
(HashEntry<K,V>[])new HashEntry[cap]);
Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
this.segments = ss;
}
- 确认ConcurrentHashMap的并发度,也就是Segment数组长度,并保证它是2的n次幂 16的并发度,取高4位
- 确认HashEntry数组的初始化长度,并保证它是2的n次幂(hash & (n - 1)) 1111减少hash碰撞
- 将Segment数组初始化好并且只填充第0个元素
put 流程
public V put(K key, V value) {
Segment<K,V> s;
if (value == null)
throw new NullPointerException();
// 1. 先获取key的hash值
int hash = hash(key);
int j = (hash >>> segmentShift) & segmentMask;
if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck
(segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
// 2. 定位到Segment
s = ensureSegment(j);
// 3.调用Segment的put方法
return s.put(key, hash, value, false);
}
Segment.put() 方法
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
// 1. 加锁
HashEntry<K,V> node = tryLock() ? null :
// scanAndLockForPut在没有获取到锁的情况下,去查询key是否存在,如果不存在就新建一个Node
scanAndLockForPut(key, hash, value);
V oldValue;
try {
HashEntry<K,V>[] tab = table;
// 确定元素在tabl数组上的位置
int index = (tab.length - 1) & hash;
HashEntry<K,V> first = entryAt(tab, index);
for (HashEntry<K,V> e = first;;) {
if (e != null) {
K k;
// 如果原来位置上有值并且key相同,那么直接替换原来的value
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
oldValue = e.value;
if (!onlyIfAbsent) {
e.value = value;
++modCount;
}
break;
}
e = e.next;
}
else {
if (node != null)
node.setNext(first);
else
node = new HashEntry<K,V>(hash, key, value, first);
// 元素总数加一
int c = count + 1;
// 判断是否需要扩容
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
rehash(node);
else
setEntryAt(tab, index, node);
++modCount;
count = c;
oldValue = null;
break;
}
}
} finally {
unlock();
}
return oldValue;
}
- 先获取key的hash值
- 定位到Segment
- 调用Segment的put方法
- 加锁
- 定位key在tabl数组上的索引位置index,获取到头结点
- 判断是否有hash冲突
- 如果没有冲突直接将新节点node添加到数组index索引位
- 如果有冲突,先判断是否有相同key
- 有相同key直接替换对应node的value值
- 没有添加新元素到链表尾部
- 解锁
rehash
private void rehash(HashEntry<K,V> node) {
// 复制老数组
HashEntry<K,V>[] oldTable = table;
int oldCapacity = oldTable.length;
// table数组扩容2倍
int newCapacity = oldCapacity << 1;
// 扩容阈值也增加两倍
threshold = (int)(newCapacity * loadFactor);
// 创建新数组
HashEntry<K,V>[] newTable =
(HashEntry<K,V>[]) new HashEntry[newCapacity];
// 计算新的掩码
int sizeMask = newCapacity - 1;
for (int i = 0; i < oldCapacity ; i++) {
HashEntry<K,V> e = oldTable[i];
if (e != null) {
HashEntry<K,V> next = e.next;
// 计算新的索引位
int idx = e.hash & sizeMask;
// 转移数据
if (next == null) // Single node on list
newTable[idx] = e;
else { // Reuse consecutive sequence at same slot
HashEntry<K,V> lastRun = e;
int lastIdx = idx;
for (HashEntry<K,V> last = next;
last != null;
last = last.next) {
int k = last.hash & sizeMask;
if (k != lastIdx) {
lastIdx = k;
lastRun = last;
}
}
newTable[lastIdx] = lastRun;
// Clone remaining nodes
for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
V v = p.value;
int h = p.hash;
int k = h & sizeMask;
HashEntry<K,V> n = newTable[k];
newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
}
}
}
}
// 将新的节点加到对应索引位
int nodeIndex = node.hash & sizeMask; // add the new node
node.setNext(newTable[nodeIndex]);
newTable[nodeIndex] = node;
table = newTable;
}
- 新建扩容后的数组,容量是原来的两倍
- 遍历扩容前的数组
- 通过e.hash & sizeMask;计算key新的索引位
- 转移数据
- 将扩容后的数组指向成员变量table
get() 方法
public V get(Object key) {
Segment<K,V> s; // manually integrate access methods to reduce overhead
HashEntry<K,V>[] tab;
int h = hash(key);
// 计算出Segment的索引位
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
// 以原子的方式获取Segment
if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
(tab = s.table) != null) {
// 原子方式获取HashEntry
for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
e != null; e = e.next) {
K k;
// key相同
if ((k = e.key) == key || (e.hash == h && key.equals(k)))
// value是volatile所以可以不加锁直接取值返回
return e.value;
}
}
return null;
}
- 我们可以看到get方法是没有加锁的,因为HashEntry的value和next属性是volatile的,volatile直接保证了可见性,所以读的时候可以不加锁。
size() 方法
- size的核心思想是先进性两次不加锁统计,如果两次的值一样则直接返回,否则第三个统计的时候会将所有segment全部锁定,再进行size统计,所以size()尽量少用。因为这是在并发情况下,size其他线程也会改变size大小,所以size()的返回值只能表示当前线程、当时的一个状态,可以算其实是一个预估值
参考文献
网友评论