前言
之前我们了解了 HashMap(HashMap 基于 JDK 1.7 源码解析),知道其是非线程安全的,当我们只有一个线程在使用 HashMap 的时候,自然不会有问题,但是并不适用于多线程情况,可以通过 Collections.synchronizedMap() 方法或者替换为 Hashtable,但是这两种方式基本都是对整个表结构做锁定操作,性能不是很理想,这时我们可以使用 ConcurrentHashMap,需要注意的是,ConcurrentHashMap 也分 1.7,1.8版本,两者实现上略有不同。
1.7 版本
数据结构
ConcurrentHashMap 数据结构为一个 Segement 数组,Segment 的数据结构为 HashEntry 数组,而 HashEntry 存储的是键值对,可以构成链表。如下图所示:
Segment 和 HashEntry
Segment 类继承 ReentrantLock 类,从而使 Segment 对象能充当锁的角色,不会像 HashTable 那样不管是 put 还是 get 操作都需要做同步处理,理论上每当一个线程占用锁访问一个 Segment 时,不会影响其他的 Segment,这就是 ConcurrentHashMap 的锁分段技术。Segment 源码如下:
static final class Segment<K,V> extends ReentrantLock implements Serializable {
private static final long serialVersionUID = 2249069246763182397L;
/**
* 在本 segment 范围内,包含的 HashEntry 元素的个数
* 该变量被声明为 volatile 型,保证可见性,即每次读到都是
* 最新数据
*/
transient volatile int count;
/**
* table 被更新的次数
*/
transient int modCount;
/**
* 当 table 中包含的 HashEntry 元素的个数超过本变量值时,触发 table 的再散列
*/
transient int threshold;
/**
* table 是由 HashEntry 对象组成的数组
* 如果散列时发生碰撞,碰撞的 HashEntry 对象就以链表的形式链接成一个链表
* table 数组的数组成员代表散列映射表的一个桶
* 每个 table 守护整个 ConcurrentHashMap 包含桶总数的一部分
* 如果并发级别为 16,table 则守护 ConcurrentHashMap 包含的桶总数的 1/16
*/
transient volatile HashEntry<K,V>[] table;
/**
* 负载因子
*/
final float loadFactor;
}
其中的 HashEntry:
static final class HashEntry<K,V> {
final K key;
final int hash;
volatile V value;
final HashEntry<K,V> next;
}
HashEntry 几乎是不可变的,代表的是每个 hash 链中的一个节点。可以看到除了 value 其余都是 fianl 的,这意味着不能从 hash 链的中间或尾部添加加或者删除节点,因为这需要修改 next 引用值,所有的节点的修改值只能从头部开始。
put 方法
代码如下:
public V put(K key, V value) {
Segment<K,V> s;
if (value == null)
throw new NullPointerException();
int hash = hash(key);
int j = (hash >>> segmentShift) & segmentMask;
if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck
(segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
s = ensureSegment(j);
return s.put(key, hash, value, false);
}
通过 key 找到 Segment,再在 Segment 中执行 put 操作。
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
// 加锁
HashEntry<K,V> node = tryLock() ? null :
scanAndLockForPut(key, hash, value);
V oldValue; // 旧值
try {
HashEntry<K,V>[] tab = table; // table 数组
int index = (tab.length - 1) & hash; //找到在 table 中的索引
HashEntry<K,V> first = entryAt(tab, index); // 根据索引获取到 HashEntry
// 遍历 HashEntry 链表
for (HashEntry<K,V> e = first;;) {
if (e != null) {
K k;
// key 存在,更新值
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
oldValue = e.value;
if (!onlyIfAbsent) {
e.value = value;
++modCount;
}
break;
}
e = e.next;
}
else {
if (node != null)
node.setNext(first);
else
node = new HashEntry<K,V>(hash, key, value, first);
int c = count + 1;
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
rehash(node); //扩容
else
setEntryAt(tab, index, node);
++modCount;
count = c;
oldValue = null;
break;
}
}
} finally {
unlock(); // 释放锁
}
return oldValue;
}
由于 HashEntry 不能保证原子性(volatile 只能保证可见性),因此要对 put 操作要做加锁处理。
get 方法
public V get(Object key) {
Segment<K,V> s;
HashEntry<K,V>[] tab;
int h = hash(key);
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
(tab = s.table) != null) {
for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
e != null; e = e.next) {
K k;
if ((k = e.key) == key || (e.hash == h && key.equals(k)))
return e.value;
}
}
return null;
}
get 逻辑比较简单,如下:
- 根据 key 值计算 hash 定位到具体的 Segment。
- 再通过一次 hash 定位到具体的位置上。
由于 HashEntry 的 value 是用 volatile 修饰的,保证了可见性,所以每次获取到都是最新值,get() 方法是非常高效的,因为整个过程中都不用加锁。
Remove 方法
V remove(Object key, int hash, Object value) {
lock(); //加锁
try {
int c = count - 1;
HashEntry<K, V>[] tab = table;
//根据 hash 找到 table 的索引
int index = hash & (tab.length - 1);
//找到索引对应的那个桶
HashEntry<K, V> first = tab[index];
HashEntry<K, V> e = first;
while (e != null && (e.hash != hash || !key.equals(e.key)))
e = e.next;
V oldValue = null;
if (e != null) {
V v = e.value;
if (value == null || value.equals(v)) { //找到要删除的节点
oldValue = v;
++modCount;
HashEntry<K, V> newFirst = e.next;// 待删节点的后继结点
for (HashEntry<K, V> p = first; p != e; p = p.next)
//头插法
newFirst = new HashEntry<K, V>(p.key, p.hash,
newFirst, p.value);
//把桶链接到新的头结点
//新的头结点是原链表中,删除节点之前的那个节点
tab[index] = newFirst;
count = c;
}
}
return oldValue;
} finally {
unlock(); //释放锁
}
}
整个操作是在持有段锁的情况下执行的,需要注意的是删除结点后,该删除结点前面的结点会倒序,如下图:
1.8 版本
数据结构
-
8 的实现抛弃了 Segment 分段锁机制,利用 CAS + synchronized 来保证并发更新的安全,底层采用数组 + 链表 + 红黑树的数据结构,用 Node 替换 了 HashEntry(名称改变了而已),如下图所示:
put 方法
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode()); //计算 hash
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh; K fk; V fv;
if (tab == null || (n = tab.length) == 0)
tab = initTable(); // 初始化 table
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value)))
break; // 利用 CAS 尝试写入数据
}
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f); // 扩容处理
else if (onlyIfAbsent // check first node without acquiring lock
&& fh == hash
&& ((fk = f.key) == key || (fk != null && key.equals(fk)))
&& (fv = f.val) != null)
return fv;
else {
V oldVal = null;
synchronized (f) { // 利用 sychronized 获取锁
if (tabAt(tab, i) == f) {
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key, value);
break;
}
}
}
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
else if (f instanceof ReservationNode)
throw new IllegalStateException("Recursive update");
}
}
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount);
return null;
}
get 方法
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
int h = spread(key.hashCode()); // 计算 hash 值
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
if ((eh = e.hash) == h) {
//在桶上直接返回
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
else if (eh < 0)
// 通过红黑树获取值
return (p = e.find(h, key)) != null ? p.val : null;
while ((e = e.next) != null) { // 链表单向查询
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
总结
1.7 版本主要使用了分段锁的机制,将数据分成许多个 Segment,每个 Segment 拥有一把锁,当一个线程占用锁访问其中一个 Segment 的时候,其他 Segment 的数据也能被其他线程访问。有些方法需要跨 Segment,比如 size() 和 containsValue(),它们可能需要锁定整个表而而不仅仅是某个Segment,这需要按顺序锁定所有 Segment,操作完毕后,又按顺序释放所有段的锁,否则可能会出现死锁。
1.8 版本 抛弃了 1.7 版本的分段锁机制,引入了 CAS + Synchronized 的机制,数据结构也变成了 数组 + 链表 + 红黑树 的形式,CAS 是非常轻量级的同步操作,而且也跟新版本虚拟机对 Synchronized 的优化有关,有兴趣的同学可以去具体了解。
网友评论