在多线程情况下,我们的HashMap在JDK1.8之前最大的问题就是会造成环链,在JDK1.8开始之后虽然解决了环链,但是还是会因为并发的情况下,导致数据覆盖而丢失。虽然我们有HashTable和Collections下的同步器可以解决这个问题,但是这两种方案都不能算是一个优秀的解决方案,所以就有了我们要介绍的ConcurrentHashMap。本文主要是针对JDK1.8的源码进行分析,但是在介绍之前也会简单提一下,1.8之前是如何设计的!
在了解ConcurrentHashMap不妨先了解一下HashMap!
JDK1.7
我们在JDK1.8之前采用的是Segment
+HashEntry
的方式实现的。结构如下:
我们是采用分段锁来实现并发的更新。Segment是继承自我们的ReentrantLock来充当锁的角色,每一个Segment都对应一个锁。从图中我们也可以看到,我们的每一个Segment对象都对应了哈希表的若干个哈希桶,相当于一小段哈希表!
这样我们在实现并发更新的时候,就不会锁住这个哈希表,而是锁住Segment对应的那一个对象那一部分,就会提高了我们的性能和效率。具体的源码这里就不分析了,因为我们主要是介绍1.8的ConcurrentHashMap。
JDK1.8
我们的ConcurrentHashMap在1.8之后就放弃了分段锁的解决方案,而是采用了CAS+Synchronized来保证并发更新的安全。底层和我们的HashMap一样,采用的是数组+链表+红黑树的存储结构!
我们在上面说到了1.8是采用CAS+Synchronized来保证并发安全,所以在如果对CAS还不了解的话,可以先看我的关于CAS的博客。(点击跳转)
好了接下来我们就开始对源码进行分析了。
基本属性
ConcurrentHashMap很多基本属性都和我们的HashMap一样,所以这里我只介绍几个不一样的,而且后面我们分析源码会用到的。
//我们的哈希表,可是使用迭代器来进行迭代
transient volatile Node<K,V>[] table;
//默认为null,扩容的时候新生成的数组,其大小为原数组的两倍。
private transient volatile Node<K,V>[] nextTable;
//基础计数器,通过CAS来进行更新
private transient volatile long baseCount;
/*
*默认为0,用来控制table的初始化和扩容操作的
*当为负数时,它正在进行初始化或者在扩容:
*-1,表示正在进行初始化;-N表示N-1个线程在进行扩容
*当为正数的时候:
*如果table未初始化,表示需要初始化的大小;
*如果table初始化完成,表示table的容量,默认是table的0.75倍,
*/
private transient volatile int sizeCtl;
还有就是对比我们的HashMap,我们的Node也进行了重写,将我们的值和下一个结点都用了Volatile来修饰,线程修改后立刻刷回主存,增加了内存的可见性。
static class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
volatile V val;
volatile Node<K,V> next;
Node(int hash, K key, V val, Node<K,V> next) {
this.hash = hash;
this.key = key;
this.val = val;
this.next = next;
}
...部分代码省略...
}
构造方法
ConcurrentHashMap有五个构造方法,其中四个与HashMap类似,所以我们主要介绍这个多了一个参数的构造方法
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (initialCapacity < concurrencyLevel) // Use at least as many bins
initialCapacity = concurrencyLevel; // as estimated threads
long size = (long)(1.0 + (long)initialCapacity / loadFactor);
int cap = (size >= (long)MAXIMUM_CAPACITY) ?
MAXIMUM_CAPACITY : tableSizeFor((int)size);
this.sizeCtl = cap;
}
我们第一个参数是容量大小,可以指定;第二个参数是我们的负载装载因子;第三个是指定我们的更新的并发线程数量;然后进行一些边界处理和赋值处理。最后就将我们的要扩容的大小赋值给了sizeCtl(上面介绍了,我们下次要扩容的大小),注意这里我们并没有进行初始化table,而是在第一次put的时候才会进行初始化,下面会讲到。
我们同样会在上面的构造方法里面看到一个方法tableSizeFor,我们点进去看,原来和我们的HashMap的那个设计容量为2的整数次幂方法一样,至于为什么要设置成2的整数次幂,我在HashMap方法里面也提到了。
private static final int tableSizeFor(int c) {
int n = c - 1;
n |= n >>> 1;
n |= n >>> 2;
n |= n >>> 4;
n |= n >>> 8;
n |= n >>> 16;
return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
}
put方法
我们知道在进行第一次put的时候会进行扩容,那么如果有多个线程同时进来,我们是如何保证只有一个线程成功的进行了扩容呢?我们在第一次put的时候putVal方法里面有这么一行代码
if (tab == null || (n = tab.length) == 0)
tab = initTable();
我们调用了initTable方法,在下面注释上给出解析
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
//如果sizectl(sc)小于0,说明已经有线程进行在初始化了,我们的其他进来的线程作罢
if ((sc = sizeCtl) < 0)
Thread.yield(); // lost initialization race; just spin
//使用cas操作,将我们的sc更新为-1,代表在进行初始化了
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
//右移两位再操作,相当于0.75*n,设置了一个扩容的阈值
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
break;
}
}
return tab;
}
回来我们继续看一下我们的完整的putVal方法
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
//key的散列,获取哈希值
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
//如果表没有进行初始化,则进行初始化
if (tab == null || (n = tab.length) == 0)
tab = initTable();
//如果可以直接将我们的哈希值插入数组,则直接存进去,不用加锁
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
//如果插入的点是我们的table的连接点,说明在扩容,我们就帮助当前线程扩容
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
V oldVal = null;
//然后在进行具体的增加操作的时候,加锁
synchronized (f) {
//确定f在tab中是链表的头结点
if (tabAt(tab, i) == f) {
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
//如果哈希桶是树,按照树的方式插入新值
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
//如果节点大于等于8,进行变换红黑树
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
//调用生成树的方法
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
//能执行到这一步,说明节点不是被替换的,是被插入的,所以要将map的元素数量加1
addCount(1L, binCount);
return null;
}
当table容量不足的时候,即table的元素数量达到容量阈值sizeCtl,需要对table进行扩容。
整个扩容分为两部分:
- 构建一个nextTable,大小为table的两倍。
- 把table的数据复制到nextTable中。
这两个过程在单线程下实现比较简单,但是在多线程下比较复杂。我们的ConcurrentHashMap是支持并发插入的,这里用图文简单分析一下:
多线程遍历节点,处理了一个节点,就把对应点的值set为forward,另一个线程看到forward,就向后遍历。这样交叉就完成了复制工作。
(这里具体的addCount方法和transfer方法暂时看的不是大懂,后面会补上!)
get方法
get方法比较简单,就是如果是在桶第一个就返回;如果是树的结构调用树的方法去遍历查找;如果是链表就遍历下去查找;如果都没找到就返回null;
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
int h = spread(key.hashCode());
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
get方法这么简单贴上来只是为了说明,我们的get方法是没有加锁的,无阻塞的。之所以能够正确的读取值是因为我们在上面也说到了,重写了node,里面的变量都用了volatile关键字来进行修饰。而且通过代码可以得出ConcurrentHashMap的key和Value都不能为null。
面试题分析
同样的,再进行了稍微的源码分析,我们试着来解决一些面试题。
1、ConcurrentHashMap使用什么技术来保证线程安全?
我们在上面分析过了,1.7的时候采用的Segment
分段锁来实现,1.8采用的是CAS+Synchronized来实现的。具体实现细节,balabala简单描述一下。
2、ConcurrentHashMap的get方法是否要加锁,为什么?
不用,我们说过了,get方法是无阻塞不加锁的。因为我们重写了node类,里面的变量都用了volatile关键字来进行修饰,可以保证最新值的获取!
3、ConcurrentHashMap1.7和1.8的区别?
数据结构
- 1.7:
Segment
+HashEntry
- 1.8:数组+链表+红黑树
并发安全实现
- 1.7:分段式锁(锁的对象是一个Segment)
- 1.8:CAS+Synchronized(降低了锁的粒度,对象是一个Node)
其他的面试题,无非与HashMap大径相似,可以看看我的HashMap,里面也有面试题详解。(点击跳转)
总结
关于源码其实还有很多都没有分析,因为这比HashMap要复杂也难。所以挑一些高频考点来进行分析。感谢下面的参考资料!
参考资料
https://www.jianshu.com/p/e694f1e868ec
公众号《Java3y》多线程系列文章
网友评论