首先上UML 类图:
整个 ConcurrentHashMap 由Segment数组 组成,Segment 代表”部分“或”一段“的意思,所以很多地方都会将其描述为分段锁。
在我的理解看来,这个类似于分库分表中的水平切分,把一个大的hashtable 根据hash值来切分成一个个小的hashtable ,然后每个hashtable采用一个表锁。
而且小hashtable至少是2个bucket MIN_SEGMENT_TABLE_CAPACITY=2
结构如下图:
image.png
初始化过程:
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
// 找到2的倍数大于等于concurrencyLevel,假如concurrencyLevel=16
int sshift = 0;
int ssize = 1; //segement的大小
while (ssize < concurrencyLevel) {
++sshift;
ssize <<= 1;
}
//sshift =4; ssize =16;
this.segmentShift = 32 - sshift;
this.segmentMask = ssize - 1;
//假如把一个bucket看做hashtable中的一行,那么initialCapacity是分表中所有行的总行数。
//c是每个分表能获取几行。
int c = initialCapacity / ssize;
if (c * ssize < initialCapacity)
++c;
int cap = MIN_SEGMENT_TABLE_CAPACITY;
while (cap < c)
cap <<= 1;
// 初始化,第一个分表, cap =2
Segment<K,V> s0 =
new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
(HashEntry<K,V>[])new HashEntry[cap]);
Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
//注意,这里不是TBASE ,应该是数组的基地址
/**
* 设置obj对象中offset偏移地址对应的object型field的值为指定值。这是一个有序或者
* 有延迟的<code>putObjectVolatile</cdoe>方法,并且不保证值的改变被其他线程立
* 即看到。只有在field被<code>volatile</code>修饰并且期望被意外修改的时候
* 使用才有用。
* public native void putOrderedObject(Object obj, long offset, Object value);
* @https://www.cnblogs.com/mickole/articles/3757278.html Unsafe 的API
*/
UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
this.segments = ss;
}
上面的代码中涉及到几个final变量。在用Unsafe 直接操作内存的方法中,大量被用到。
private static final sun.misc.Unsafe UNSAFE;
private static final long SBASE;
private static final int SSHIFT;
private static final long TBASE;
private static final int TSHIFT;
private static final long HASHSEED_OFFSET;
static {
int ss, ts;
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class tc = HashEntry[].class;
Class sc = Segment[].class;
//hashtable的基地址
TBASE = UNSAFE.arrayBaseOffset(tc);
//segement数组的基地址,也就是segement[0]的地址
SBASE = UNSAFE.arrayBaseOffset(sc);
//table scale hashtable的单位偏移量
ts = UNSAFE.arrayIndexScale(tc);
// segement scale segement的单位偏移量
ss = UNSAFE.arrayIndexScale(sc);
HASHSEED_OFFSET = UNSAFE.objectFieldOffset(
ConcurrentHashMap.class.getDeclaredField("hashSeed"));
} catch (Exception e) {
throw new Error(e);
}
if ((ss & (ss-1)) != 0 || (ts & (ts-1)) != 0)
throw new Error("data type scale not a power of two");
/**
* Integer.numberOfLeadingZeros(ss) 找到变量ss前面0的个数,比如说
* ss =4 表示换成二进制就是)0x0000-0000-0000-0000-0000-0000-0000-0100
* Integer.numberOfLeadingZeros(ss) = 29
* 32 - 29-1(0100中的1 需要减去) =2
*
* 在get方法中,hash的高4位,取模,确定index索引,然后本来应该是索引乘以segementScale 也就是下面的ss变量=4 是4个字节。
* 但是这里直接采用左移来代替乘法,会更快。左移两位代表乘以4.最后加上segement的基地址
* long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
*/
SSHIFT = 31 - Integer.numberOfLeadingZeros(ss);
TSHIFT = 31 - Integer.numberOfLeadingZeros(ts);
}
- //hashtable的基地址 TBASE = UNSAFE.arrayBaseOffset(tc);
- //segement数组的基地址,也就是segement[0]的地址 SBASE = UNSAFE.arrayBaseOffset(sc);
- //table scale hashtable的单位偏移量 ts = UNSAFE.arrayIndexScale(tc);
- // segement scale segement的单位偏移量 ss = UNSAFE.arrayIndexScale(sc);
- SSHIFT index索引<< SSHIFT +SBASE 获得某一个segement的地址
- TSHIFT 同理,((long)i << TSHIFT) + TBASE,分表hashtable内部的HashEntry的定位。
另外,Unsafe类是直接操作内存的一个工具,但是用的不好的话,是会造成数据不一致性。少用为妙。附上介绍其API的一片文章:https://www.cnblogs.com/mickole/articles/3757278.html
put 过程分析(分段锁如何实现)
public V put(K key, V value) {
Segment<K,V> s;
if (value == null)
throw new NullPointerException();
int hash = hash(key);
int j = (hash >>> segmentShift) & segmentMask;
if ((s = (Segment<K,V>)UNSAFE.getObject // segments不是volatile的,所以check一下
(segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
s = ensureSegment(j); //ensureSegment这个方法是初始化segement对象
return s.put(key, hash, value, false);
}
private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
HashEntry<K,V> first = entryForHash(this, hash);
HashEntry<K,V> e = first;
HashEntry<K,V> node = null;
int retries = -1; // negative while locating node
// 循环获取锁
while (!tryLock()) {
HashEntry<K,V> f; // to recheck first below
if (retries < 0) {
if (e == null) {
if (node == null) // speculatively create node
// 进到这里说明数组该位置的链表是空的,没有任何元素
// 当然,进到这里的另一个原因是 tryLock() 失败,所以该槽存在并发,不一定是该位置
node = new HashEntry<K,V>(hash, key, value, null);
retries = 0;
}
else if (key.equals(e.key))
retries = 0;
else
// 顺着链表往下走
e = e.next;
}
// 重试次数如果超过 MAX_SCAN_RETRIES(单核1多核64),那么不抢了,进入到阻塞队列等待锁
// lock() 是阻塞方法,直到获取锁后返回
else if (++retries > MAX_SCAN_RETRIES) {
lock();
break;
}
else if ((retries & 1) == 0 &&
// 这个时候是有大问题了,那就是有新的元素进到了链表,成为了新的表头
// 所以这边的策略是,相当于重新走一遍这个 scanAndLockForPut 方法
(f = entryForHash(this, hash)) != first) {
e = first = f; // re-traverse if entry changed
retries = -1;
}
}
return node;
}
这个方法有两个出口,一个是 tryLock() 成功了,循环终止,另一个就是重试次数超过了 MAX_SCAN_RETRIES,进到 lock() 方法,此方法会阻塞等待,直到成功拿到独占锁。
这个方法就是看似复杂,但是其实就是做了一件事,那就是获取该 segment 的独占锁,如果需要的话顺便实例化了一下 node。
···
// 方法参数上的 node 是这次扩容后,需要添加到新的数组中的数据。
private void rehash(HashEntry<K,V> node) {
HashEntry<K,V>[] oldTable = table;
int oldCapacity = oldTable.length;
// 2 倍
int newCapacity = oldCapacity << 1;
threshold = (int)(newCapacity * loadFactor);
// 创建新数组
HashEntry<K,V>[] newTable =
(HashEntry<K,V>[]) new HashEntry[newCapacity];
// 新的掩码,如从 16 扩容到 32,那么 sizeMask 为 31,对应二进制 ‘000...00011111’
int sizeMask = newCapacity - 1;
// 遍历原数组,老套路,将原数组位置 i 处的链表拆分到 新数组位置 i 和 i+oldCap 两个位置
for (int i = 0; i < oldCapacity ; i++) {
// e 是链表的第一个元素
HashEntry<K,V> e = oldTable[i];
if (e != null) {
HashEntry<K,V> next = e.next;
// 计算应该放置在新数组中的位置,
// 假设原数组长度为 16,e 在 oldTable[3] 处,那么 idx 只可能是 3 或者是 3 + 16 = 19
int idx = e.hash & sizeMask;
if (next == null) // 该位置处只有一个元素,那比较好办
newTable[idx] = e;
else { // Reuse consecutive sequence at same slot
// e 是链表表头
HashEntry<K,V> lastRun = e;
// idx 是当前链表的头结点 e 的新位置
int lastIdx = idx;
// 下面这个 for 循环会找到一个 lastRun 节点,这个节点之后的所有元素是将要放到一起的
for (HashEntry<K,V> last = next;
last != null;
last = last.next) {
int k = last.hash & sizeMask;
if (k != lastIdx) {
lastIdx = k;
lastRun = last;
}
}
// 将 lastRun 及其之后的所有节点组成的这个链表放到 lastIdx 这个位置
newTable[lastIdx] = lastRun;
// 下面的操作是处理 lastRun 之前的节点,
// 这些节点可能分配在另一个链表中,也可能分配到上面的那个链表中
for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
V v = p.value;
int h = p.hash;
int k = h & sizeMask;
HashEntry<K,V> n = newTable[k];
newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
}
}
}
}
// 将新来的 node 放到新数组中刚刚的 两个链表之一 的 头部
int nodeIndex = node.hash & sizeMask; // add the new node
node.setNext(newTable[nodeIndex]);
newTable[nodeIndex] = node;
table = newTable;
}
···
这里的扩容比之前的 HashMap 要复杂一些,代码难懂一点。上面有两个挨着的 for 循环,第一个 for 有什么用呢?
仔细一看发现,如果没有第一个 for 循环,也是可以工作的,但是,这个 for 循环下来,如果 lastRun 的后面还有比较多的节点,那么这次就是值得的。因为我们只需要克隆 lastRun 前面的节点,后面的一串节点跟着 lastRun 走就是了,不需要做任何操作。
我觉得 Doug Lea 的这个想法也是挺有意思的,不过比较坏的情况就是每次 lastRun 都是链表的最后一个元素或者很靠后的元素,那么这次遍历就有点浪费了。不过 Doug Lea 也说了,根据统计,如果使用默认的阈值,大约只有 1/6 的节点需要克隆。
get 过程分析
public V get(Object key) {
Segment<K,V> s; // manually integrate access methods to reduce overhead
HashEntry<K,V>[] tab;
// 1. hash 值
int h = hash(key);
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
// 2. 根据 hash 找到对应的 segment
if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
(tab = s.table) != null) {
// 3. 找到segment 内部数组相应位置的链表,遍历
for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
e != null; e = e.next) {
K k;
if ((k = e.key) == key || (e.hash == h && key.equals(k)))
return e.value;
}
}
return null;
}
网友评论