1 ConcurrentHashMap 机制
ConcurrentHashMap 在 1.7版本中采用分段锁机制实现线程安全的支持高并发的HashMap集合类。ConcurrentHashMap 在对象中保存了一个 Segment 数组,即将整个Hash 表划分为多个分段;而每个 Segment 元素,即每个分段则类似于一个Hashtable;这样,在执行 put 操作时首先根据 hash 算法定位到元素属于哪个 Segment,然后对该Segment 加锁即可。因此,ConcurrentHashMap 在多线程并发编程中可是实现多线程 put 操作。
- HashMap 线程不安全
多线程环境下,使用 Hashmap 进行 put 操作会引起死循环,所以在并发情况下不能使用HashMap。
- HashTable 线程安全但效率低
HashTable为保证线程安全付出的代价太大,get()、put()等方法都是 synchronized 的,这相当于给整个哈希表加了一把大锁。在并发调用HashTable的方法时就会造成大量的时间损耗。
2 构造方法
//初始化参数数值
static final int DEFAULT_INITIAL_CAPACITY = 16;
static final float DEFAULT_LOAD_FACTOR = 0.75f;
static final int DEFAULT_CONCURRENCY_LEVEL = 16;
/**
* 无参构造方法,调用本身的传递三个参数的构造方法
* 参数1:initialCapacity 初始化容量
* 参数2:loadFactor 加载因子
* 参数3:concurrencyLevel 默认并发度(用于构建Segment的数组长度,但必须保证2的幂次方,不一定是最终使用的数值)
*/
public ConcurrentHashMap() {
this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
}
/**
* 用于对 ConcurrentHashMap进行初始化的细节均在该构造方法中实现,
* 构造方法中会确认Segment数值的长度,每个Segment数值里面HashTable的长度,
* 并初始化首个s[0]的Segment元素。
*/
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
// 判别传入的参数是否合法
if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
// 判断默认级别是否超过允许的最大开辟长度,猜测防止无限扩容Segment长度导致oom
// => static final int MAX_SEGMENTS = 1 << 16;
if (concurrencyLevel > MAX_SEGMENTS)
concurrencyLevel = MAX_SEGMENTS;
// Find power-of-two sizes best matching arguments
// 保存ssize是2的几次幂
int sshift = 0;
// 用于初始化segments大小
int ssize = 1;
// 循环遍历找到大于等于concurrencyLevel的最小2次幂,例如:concurrencyLevel = 17,则大于等于 17 的最小2次幂为 32
while (ssize < concurrencyLevel) {
++sshift;
ssize <<= 1;
}
// 用于后续计算Segments数组下标,使用高位参与运算
this.segmentShift = 32 - sshift;
// 掩码,用于位操作计算下标
this.segmentMask = ssize - 1;
// 初始容量不能超过规定的最大值
// static final int MAXIMUM_CAPACITY = 1 << 30;
if (initialCapacity > MAXIMUM_CAPACITY)
initialCapacity = MAXIMUM_CAPACITY;
// c 表示每个Segment元素中HashTable的大小
int c = initialCapacity / ssize;
// 确保总segment中table的容量大于等于initialCapacity
// 例如 initialCapacity = 17,ssize = 16 则 c = 1, 1 * 16 < 17,所以 ++c 才可以满足总容量的需求
if (c * ssize < initialCapacity)
++c;
// 实际 hashtable 使用的容量,确保为2的幂次,hashtable 最小为 2
// static final int MIN_SEGMENT_TABLE_CAPACITY = 2;
int cap = MIN_SEGMENT_TABLE_CAPACITY;
// 如果容量不满足要求需要以2的幂次扩容,例如 cap = 2, c = 3 需要对 cap 扩容至 4
while (cap < c)
cap <<= 1;
// create segments and segments[0]
// 初始化index为0的segment,方便后面创建的segment对象可以直接使用这个segment的参数,不需要重复计算
Segment<K,V> s0 =
new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
(HashEntry<K,V>[])new HashEntry[cap]);
// 根据 ssize 大小创建 Segments 数组
Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
// 将 s0 放入 Segments 数组中
UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
// 赋值
this.segments = ss;
}
3 put() 方法
/**
* put 方法为核心方法,是可以保证多线程安全的方法。
* 主要逻辑是首先查看是否对应下标位置存在 Segment 元素,
* 如果不存在,则进行线程安全的初始化;
* 如果存在,则调用该位置的 Segment 对象的 put 方法具体存放数据到 HashTable 中。
*/
public V put(K key, V value) {
Segment<K,V> s;
// 不支持 value == null 的情况
if (value == null)
throw new NullPointerException();
// 计算 key 的 hash 值
int hash = hash(key);
// 计算在 Segments 数组中的元素下标,使用hash值的高位参与运算获取下标位置
int j = (hash >>> segmentShift) & segmentMask;
// 如果下标位置不存在Segment元素,需要进行Segment对象的初始化,主要在 ensureSegment() 方法中实现
if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck
(segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
s = ensureSegment(j);
// put 操作逻辑
return s.put(key, hash, value, false);
}
4 ensureSegment() 方法
/**
* ensureSegment() 方法主要负责对 Segment 对象的初始化,
* 涉及到并发安全的问题,主要采用 CAS 的方式进行无锁自旋。
*/
private Segment<K,V> ensureSegment(int k) {
final Segment<K,V>[] ss = this.segments;
// Segment元素下标
long u = (k << SSHIFT) + SBASE; // raw offset
Segment<K,V> seg;
// 当前对应Segment元素为null,说明需要进行对象的初始化
if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
// 直接使用Segment[0]的属性来初始化,不需要二次计算参数数据
Segment<K,V> proto = ss[0]; // use segment 0 as prototype
int cap = proto.table.length;
float lf = proto.loadFactor;
int threshold = (int)(cap * lf);
HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
// 再检测一下是否存在,如果存在就不必进行new对象的操作逻辑,提升效率
if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
== null) { // recheck
// new Segment 对象
Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
// CAS 的方式将新创建的Segment对象添加到Segments数组中
while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
== null) {
if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
break;
}
}
}
// 返回新创建或者其他线程新创建的Segment对象
return seg;
}
5 Segment 的 put() 方法
/**
* Segment的put()方法实现具体将key插入到Segment本身的table中,插入前需要先加锁,
* 之后遍历插入位置的链表,遇到相同的元素则修改并返回旧值,
* 到尾节点依旧没有找到相同元素,则创建新的entry元素插入。
*/
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
// 如果没有获取到锁,则阻塞在这一步
HashEntry<K,V> node = tryLock() ? null :
scanAndLockForPut(key, hash, value);
// 往下执行,代表已获取到锁
V oldValue;
try {
HashEntry<K,V>[] tab = table;
// 计算 key 应该在该 Segment 元素的 HashTable 插入的下标
int index = (tab.length - 1) & hash;
// 获取对应在segment中的头节点
HashEntry<K,V> first = entryAt(tab, index);
// 循环遍历头节点
for (HashEntry<K,V> e = first;;) {
// 头节点不为null
if (e != null) {
K k;
// 判断是否存在相同元素
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
// 旧值
oldValue = e.value;
// 覆盖旧值
if (!onlyIfAbsent) {
e.value = value;
++modCount;
}
break;
}
// 遍历链表下一个节点
e = e.next;
}
// 头节点为null
else {
// node已初始化
if (node != null)
// 直接使用 scanAndLockForPut() 方法得到的 node,这里 first 一定为null,感觉大哥李写成 null 更容易理解
node.setNext(first);
// 需要初始化node
else
node = new HashEntry<K,V>(hash, key, value, first);
// 长度+1
int c = count + 1;
// 判断是否需要扩容
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
rehash(node);
// 直接将 node 设置在tab的index上
else
setEntryAt(tab, index, node);
++modCount;
count = c;
oldValue = null;
break;
}
}
} finally {
// 解锁
unlock();
}
// 返回旧值
return oldValue;
}
6 scanAndLockForPut() 方法
/**
* scanAndLockForPut() 方法主要是在不断 tryLock() 尝试加锁,每次尝试加锁失败后,可以额外做些判断逻辑。
*/
private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
// 获取在Segment对应位置的首节点
HashEntry<K,V> first = entryForHash(this, hash);
HashEntry<K,V> e = first;
HashEntry<K,V> node = null;
// 标志位,用于控制下面 if-else 的分支逻辑
int retries = -1; // negative while locating node
// 尝试加锁
while (!tryLock()) {
HashEntry<K,V> f; // to recheck first below
// 循环遍历e,直到找到key值相等或为null
if (retries < 0) {
if (e == null) {
// 当前节点是否为null
if (node == null) // speculatively create node
// 初始化node
node = new HashEntry<K,V>(hash, key, value, null);
retries = 0;
}
// key值相等,则设置retries为0
else if (key.equals(e.key))
retries = 0;
// 继续找链表的下一个元素
else
e = e.next;
}
// 判断重试次数是否大于最大重试次数
// static final int MAX_SCAN_RETRIES = Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1;
else if (++retries > MAX_SCAN_RETRIES) {
// 直接加锁
lock();
break;
}
// 每隔2次判断,重现获取的首节点值与first不相等时,重置标志位。说明其他线程已经修改,需要重新获取最新的数据
else if ((retries & 1) == 0 &&
(f = entryForHash(this, hash)) != first) {
e = first = f; // re-traverse if entry changed
retries = -1;
}
}
return node;
}
7 rehash() 方法
/**
* rehash() 方法用于数组的扩容操作,首先将新数组容量定义为原数组的2倍,之后将旧元素计算hash值,插入到新数组中。再将新元素采用头插法插入新数组,最后完成数组的覆盖,完成扩容。
*/
private void rehash(HashEntry<K,V> node) {
HashEntry<K,V>[] oldTable = table;
int oldCapacity = oldTable.length;
// Segment中的新hashtable数组容量扩容为原来的2倍
int newCapacity = oldCapacity << 1;
threshold = (int)(newCapacity * loadFactor);
// 新创建一个两倍原容量的table数组
HashEntry<K,V>[] newTable =
(HashEntry<K,V>[]) new HashEntry[newCapacity];
int sizeMask = newCapacity - 1;
// 遍历原table
for (int i = 0; i < oldCapacity ; i++) {
HashEntry<K,V> e = oldTable[i];
// 原数组下标位置有元素
if (e != null) {
HashEntry<K,V> next = e.next;
// 计算在新数组的下标
int idx = e.hash & sizeMask;
// 当前列表只有个首节点,则直接赋值到newTable的index值上,由于调用该函数的上层已经加锁,所以直接操作即可,线程安全的
if (next == null) // Single node on list
newTable[idx] = e;
// 需要遍历链表
else { // Reuse consecutive sequence at same slot
// 此处逻辑是找到链表最后末尾连续几个元素,这些元素的hash相同,可以插入到新数组的相同下标位置
HashEntry<K,V> lastRun = e;
int lastIdx = idx;
for (HashEntry<K,V> last = next;
last != null;
last = last.next) {
int k = last.hash & sizeMask;
if (k != lastIdx) {
lastIdx = k;
lastRun = last;
}
}
// 直接将找到的最后一段相同节点的首节点赋值
newTable[lastIdx] = lastRun;
// Clone remaining nodes
// 从头节点开始遍历lastRun的前一节点,一个一个的插入剩余元素
for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
V v = p.value;
int h = p.hash;
int k = h & sizeMask;
HashEntry<K,V> n = newTable[k];
newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
}
}
}
}
// 前面已经将原数组的旧元素添加到新数组,此处再将新元素插入
int nodeIndex = node.hash & sizeMask; // add the new node
// 头插法
node.setNext(newTable[nodeIndex]);
newTable[nodeIndex] = node;
// 赋值新数组
table = newTable;
}
网友评论