前言
在分析ConcurrentHashMap之前,希望大家先读完HashMap的源码,因为ConcurrentHashMap基本算法和HashMap是一致的,只是增加了并发控制而已,有了HashMap的基础才能更好的理解ConcurrentHashMap,推荐大家先看看这两篇文章:
HashMap的hash机制详解
HashMap源码分析
1. 重要成员
/**
* 初始化和扩容标志,也是并发控制非常重要的一员,当sizeCtl<0时
* 表明当前正在初始化或扩容,sizeCtl=-1正在初始化,sizeCtl<-1说明正在扩容
* 而且此时sizeCtl = -(1+正在扩容的线程数量).
* 当还未进行初始化时sizeCtl为初始化容量大小,默认16,
*/
private transient volatile int sizeCtl;
/**
* 扩容时使用
*/
private transient volatile int transferIndex;
/**
*真正存储数据的数组
*/
transient volatile Node<K,V>[] table;
这里要注意,此处没有了JDK1.7中的分段锁的概念了,全部都是基于CAS的。
2. 并发基础——CAS
整个ConcurrentHashMap完全没有方法级别的锁,到底是什么机制来保证并发的呢?这里简单的介绍下。
首先大家对乐观锁和悲观锁要有个大致理解:
- 悲观锁 :悲观锁认为竞争一定会发生,所以不管如何都会锁住资源,不允许其他线程进入,sychorinized关键字就是标准的悲观锁
- 乐观锁: 所谓的乐观锁就是认为竞争不一定会发生,比如有个变量A=3,我希望将它变成A=4,那么可以先比较 如果A=3,那么说明没有其他线程竞争修改这个变量,我可以直接设置A=4, 这个比较和设置过程在硬件上是原子级别的,如果比较时发现A!=3,说明有其他线程修改了,这个情况会被返回,调用者可以针对这个情况特殊处理。
我们看下ConcurrentHashMap是如何将一个Node节点放到数组table的一个位置上的:
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
Node<K,V> c, Node<K,V> v) {
return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}
这里重点关注传入的c和v,整个compareAndSwapObject就是先比较tab数组上某个位置(通过内存偏移量算出来的) 上的节点是不是 c,如果是就认为没有竞争,直接将该位置设置为 v,否则返回false。一般都是通过一个死循环来调用这个方法的,比如:
for (Node<K,V>[] tab = table;;) {
if (casTabAt(tab, i, null, r)) {
//修改成功,会继续执行其他业务
break;
}
//修改失败,会死循环下次重试
}
这种机制就是保证ConcurrentHashMap高效并发的基础了。
由于ConcurrentHashMap处理hash冲突以及hash算法都是一样的,所以这里一些基本功能不再分析,我们重点分析一些由于并发导致的和HashMap区别较大的方法。
3. 扩容
ConcurrentHashMap的扩容是支持多线程并发扩容的,所以扩容效率很高,在看源码之前,我们先大致看下并发扩容的思想,扩容的核心就在于将旧的table数组中的数据迁移到新的数组中来。我们先看张图:
并发扩容.png
之所以能并发扩容就在于这里,将现有的数据分成了几部分,每个线程领一个自己的部分 ,线程领到了自己的部分后如何复制到新的数组的呢?
单个线程复制过程
对于扩容的单个线程来说,每次复制都是从尾部开始,一个节点复制完毕后会在这个位置放置一个ForwardingNode节点,表明这个节点已经处理过了。
有了上述基础我们再结合代码分析.
3.1 确定每个线程扩容时负责的数组部分长度
int n = tab.length, stride;
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
这里主要是根据cpu的数量来计算的,但是如果算出来小于16的话,stride=16,也就是说一个线程处理的数量最少是16.
3.2 申请新空间
扩容第一步就是申请新的table数组,这个和HashMap一样,都是直接两倍扩容:
if (nextTab == null) { // initiating
try {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
transferIndex = n;
}
3.2 遍历自己负责的所有元素
boolean advance = true;
boolean finishing = false; // to ensure sweep before committing nextTab
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
while (advance) {
int nextIndex, nextBound;
if (--i >= bound || finishing) //分支1
advance = false;
else if ((nextIndex = transferIndex) <= 0) { //分支2
i = -1;
advance = false;
}
else if (U.compareAndSwapInt // 分支3
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
//省略一些具体实现
}
这里以一个长度32的ConcurrentHashMap扩容到64为例,记住,之前在申请空间时:
if (nextTab == null) { // initiating
//省略代码
nextTable = nextTab;
transferIndex = n;
}
这也就意味着,一个线程刚开始扩容时,transferIndex = 32, i=0,bound=0;,所以会进入分支3,
这个时候会通过CAS操作将transferIndex赋值为16,bound=16, i=31. 记住transferIndex,每个线程扩容起始位置都是由它决定的,这个线程将他改成了16,那么下个并发线程扩容就会从16开始了,从而做到每个线程负责自己的数据。最终大部分操作都会进入分支1,通过--i来遍历该线程负责的部分数组,从而拷贝数据。
3.3 迁移table数组中的单个元素。
3.3.1 该位置没有数据
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
这个时候无需拷贝,只要将再这个位置放置一个ForwardingNode节点即可。
3.3.2 该位置已经被拷贝过了
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
此处的MOVED为-1,ForwardingNode的hash值都为-1,说明这个节点已经被处理过了。所有数据拷贝完成后会重新遍历一遍检查,这个时候时会进入这个分支。
3.3.3 该位置是一个链表
synchronized (f) {
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
if (fh >= 0) {
int runBit = fh & n;
Node<K,V> lastRun = f;
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
//省略部分代码
}
这里的思想其实和HashMap一样(不熟悉的可以先看看本文开始推荐的两篇文章),都是把链表拆成两部分,一部分放在nextTab的i位置,一部分放在nextTab的i+n位置。
注意,这里使用了synchronized锁住了当前节点,这也是一种没办法的事。但由于锁住的只是一个节点,并不会影响到其他扩容线程。
3.3.4 该位置是一个红黑树
else if (f instanceof TreeBin) {
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
红黑树道理也是一样,也是拆分成两部分,但这里会统计放在nextTab的i位置的数量和i+n位置的数量,如果低于6会退化成链表。
3.3.5 扩容结束
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
if (finishing) { //分支1
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1);
return;
}
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { //分支2
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
//走到这里说明所有的扩容线程都结束了,也就是此次扩容彻底结束。
return;
finishing = advance = true;
i = n; // recheck before commit
}
}
当自己的部分数组都copy完毕后,通常会先进入分支2,将finishing置为true,i=n,由于此时i=n,会重新遍历一遍自己负责的部分数组,确保每个节点都被复制了,最后会进入分支1,将sizeCtl 置为下次扩容的阈值,其实sizeCtl = n2-n/2 = 2n 0.75. 也就是新的容量乘以扩容因子。至此,此次该线程扩容结束。
4. 总结
总体而言,本文并没有再重复介绍一些和HashMap中一样的算法,比如具体的hash算法,比如如何判断扩容时怎样将一个链表拆成两部分,主要介绍了思路以及并发相关的,个人理解,如果有错误恳请指正。
网友评论