1. JDK1.7实现
1.1 概述
在多线程环境下,使用HashMap进行put操作时可能会引起死循环,导致CPU利用率飙升,另一方面也无法解决数据同步问题,因此在并发环境下不适合使用HashMap。为了解决数据同步以及并发问题,使用 HashTable
、synchronizedMap
的确能解决上述问题。但是,这两种方式基本都是对整个哈希表进行加锁,这在并发环境下,严重影响系统性能。因此,引入ConcurrentHashMap
之后,既能解决数据同步问题,也不影响系统性能。
![](https://img.haomeiwen.com/i2269232/60d0ef75922478fc.png)
1.2 实现原理
ConcurrentHashMap是由Segment和HashEntry数组结构组成,和HashMap一样,仍然是数组+链表组成。其中
Segment
使用了可重入锁ReentrantLock
,HashEntry
则用于存储键值对数据。
ConcurrentHashMap采用了分段锁技术,分段锁是将数据按段存储,然后给每段数据配一把锁。当一个线程访问其中一个段数据的时候,首先对这段数据进行加锁,其他段的数据也能被其他线程访问。但是,有些方法需要跨段访问,比如size()
和containsValue()
,它们可能需要按顺序锁定整个表的数据段,操作完毕后,又按顺序释放所有段的锁。
1.2.1 get()
get()
不需要加锁,只需要将key通过hash之后定位到具体的Segment,再通过一次hash定位到具体的元素上。由于HashEntry中的value属性是用volatile关键词修饰的,保证了内存可见性,所以每次获取时都是最新值。
1.2.2 put()
put()
需要加锁,虽然HashEntry中的value属性是用volatile关键词修饰的,但是并不能保证并发的原子性,所以put()操作需要加锁处理。首先也是通过将key通过hash之后定位到具体的Segment,再通过一次hash定位到具体的元素上,在put()之前会进行一次扩容检查,之后再做插入操作。而HashMap是在插入元素之后才校验是否需要扩容,有可能扩容之后后续就不再有元素插入,这样就浪费了本次扩容(扩容非常消耗性能)。
1.2.3 size()
每个Segment
都有一个volatile
修饰的全部变量count
,求整个ConcurrentMap
的size
时,很明显就是将所有的count
累加即可。但是volatile
修饰的变量却不能保证多线程的原子性,所以直接累加很容易出现并发问题。如果每次调用size()
方法时,将其余的修改操作加锁,效率也很低。所以最终的做法是,先尝试两次将count
累加,如果容器的count
发生了变化再加锁统计size
。
ConcurrentHashMap统计时如何知道count大小发生了变化呢?每个Segment 都有一个
modCount
变量,每当进行一次put和remove操作,modCount将会+1。只要modCount发生了变化就认为容器的大小也在发生变化
2. JDK1.8实现
2.1 概述
JDK1.8中的ConcurrentHashMap优化如下:
- 直接用Node数组+链表+红黑树的数据结构来实现。
- HashEntry改为Node,用来保存key、value以及key的hash值。
- 抛弃原有的Segment分段锁,采用CAS+synchronized 对每一行数据进行加锁,保证并发安全性。
-
val
和next
都使用了volatile修饰,目的是在多线程环境下线程A修改结点的val或者新增节点的时候对线程B是可见的。
![](https://img.haomeiwen.com/i2269232/e02396cacb6a044d.png)
2.2 实现原理
2.2.1 get()
- 根据key计算hashcode,定位到该table索引位置,如果查询的key正好在首结点Node上,那么直接返回值。
- 如果是红黑树,那就按照树的方式获取值。
- 如果都不满足,那就按照链表的方式遍历获取值。
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
// 计算两次hashcode
int h = spread(key.hashCode());
if ((tab = table) != null && (n = tab.length) > 0 &&
// 读取首节点的Node元素
(e = tabAt(tab, (n - 1) & h)) != null) {
if ((eh = e.hash) == h) { // 如果该节点就是首节点就返回
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
// hash值为负值表示正在扩容,这个时候查的是ForwardingNode的find方法来定位到nextTable来
// 查找,查找到就返回
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
// 既不是首节点也不是ForwardingNode,那就往下遍历
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
2.2.2 put()过程
- 元素插入前检查是否需要进行初始化,如果没有初始化就先调用
initTable()
初始化。 - 根据key计算hashcode,定位到对应的
Node
。如果Node为空,利用CAS
尝试写入数据,失败则自旋保证成功。 - 如果当前位置的
hashcode == MOVED == -1
,则需要进行扩容。 - 如果都不满足,则利用
synchronized
锁写入数据。 - 如果链表节点数量大于或等于
TREEIFY_THRESHOLD
(默认值为8)则要转换为红黑树。 - 如果添加成功就调用
addCount()
统计 size,并且检查是否需要扩容。
TREEIFY_THRESHOLD
用于判断是否需要将链表转换为红黑树的阈值。
public V put(K key, V value) {
return putVal(key, value, false);
}
/** Implementation for put and putIfAbsent */
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
// 两次hash,减少hash冲突,可以均匀分布
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) { // 对这个table进行迭代
Node<K,V> f; int n, i, fh;
// 这里就是上面构造方法没有进行初始化,在这里进行判断,为null就调用initTable进行初始化,属于懒汉模式初始化
if (tab == null || (n = tab.length) == 0)
tab = initTable();
// 如果i位置没有数据,就直接无锁插入
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
// 如果在进行扩容,则先进行扩容操作
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
V oldVal = null;
// 如果以上条件都不满足,那就要进行加锁操作,也就是存在hash冲突,锁住链表或者红黑树的头结点
synchronized (f) {
if (tabAt(tab, i) == f) {
// 表示该节点是链表结构
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
// 这里涉及到相同的key进行put就会覆盖原先的value
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
// 插入链表尾部
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
// 红黑树结构
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
// 红黑树结构旋转插入
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
// 如果链表的长度大于8时就会进行红黑树的转换
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
// 统计size,并且检查是否需要扩容
addCount(1L, binCount);
return null;
}
2.3 代码示例
public class ConcurrentHashMapDemo {
private static final ConcurrentHashMap map = new ConcurrentHashMap();
private static final ExecutorService EXECUTOR_SERVICE = Executors.newFixedThreadPool(10);
public static void main(String[] args) {
for (int i=1; i <= 10; i++) {
EXECUTOR_SERVICE.execute(new MyThread());
}
}
static class MyThread implements Runnable {
@Override
public void run() {
map.put("name", Thread.currentThread().getName());
map.size();
System.out.println(map.get("name"));
map.remove("name");
}
}
}
网友评论