在日常开发中,我们经常会使用HashMap,然而HashMap不是线程安全的,在多线程公用一个Map的情况下,ConcurrentHashMap通常是一个更好的选择。
ConcurrentHashMap采用下面的结构来存储数据,并使用CAS+Synchronized来保证并发更新的安全。当几个Node的hash值相同时,限将其存储为一个链表,当链表上的元素个数大于某个数时,就将其转化为一颗红黑树。
ConcurrentHashMap存储结构成员变量
//承载数据的数据,大小为2^n
transient volatile Node<K,V>[] table;
//扩容时使用的
private transient volatile Node<K,V>[] nextTable;
//为-1说明由线程在进行初始化操作;
//为-n(n>1) 使用了一种生成戳,根据生成戳算出一个基数,不同轮次的扩容操作的生成戳都是唯一的,来保证多次扩容之间不会交叉重叠, 当有n个线程正在执行扩容时,sizeCtl在值变为 (基数 + n)
//否则,在未初始化时记录table的大小,在初始化后记录hash表的容量(n - n >>>2 n为table大小,此时该值为0.75n)
private transient volatile int sizeCtl;
上面的代码中,Node为存储数据的节点,将Node的hash值设置为MOVED 、TREEBIN 、RESERVED 可以表示不同的含义 :
static final int MOVED = -1; // hash for forwarding nodes
static final int TREEBIN = -2; // hash for roots of trees
static final int RESERVED = -3; // hash for transient reservations
static class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
volatile V val;
volatile Node<K,V> next;
Node(int hash, K key, V val, Node<K,V> next) {
this.hash = hash;
this.key = key;
this.val = val;
this.next = next;
} ...
}
//红黑树节点
static final class TreeNode<K,V> extends Node<K,V> {
TreeNode<K,V> parent; // red-black tree links
TreeNode<K,V> left;
TreeNode<K,V> right;
TreeNode<K,V> prev; // needed to unlink next upon deletion
boolean red;
TreeNode(int hash, K key, V val, Node<K,V> next,
TreeNode<K,V> parent) {
super(hash, key, val, next);
this.parent = parent;
}
...
}
//特殊的Node Hash值为MOVED,说明在扩容,存储nextTable引用
static final class ForwardingNode<K,V> extends Node<K,V> {
final Node<K,V>[] nextTable;
ForwardingNode(Node<K,V>[] tab) {
super(MOVED, null, null, null);
this.nextTable = tab;
}
}
Hash
既然是一个HashMap,如何做Hash就是一个问题。ConcurrentHashMap 处理的一些思路非常巧妙。
首先是table的大小,会在传入的size的基础上进行处理,变成2^k。
private static final int tableSizeFor(int c) {
int n = c - 1;
n |= n >>> 1;
n |= n >>> 2;
n |= n >>> 4;
n |= n >>> 8;
n |= n >>> 16;
return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
}
上面的代码先用一系列的移位和|操作把n变成二进制表示为0...01...1
的形式,然后加1,就会变成2^k了。这样做有什么好处呢?我们继续看。
//计算节点的位置 h 是 key的hash n 是table的大小
int index = (n - 1) & h;
上面我们知道了table的大小n是2^k,那么n-1就会是0..1....1
的形式,并且小于n,通过与h按位与就可以得到对应的index了。而在扩容时,ConcurrentHashMap会将table的大小变为原来的2倍,这样,计算index时就变成了(2*n-1)&h
,那么n-1为0011
的话,2*n-1就是0111
,这样扩容后的index要么等于扩容前的index,否则就等于index+n,减小了扩容时的计算量。
table初始化
table会延迟到第一次put时初始化,同过使用循环+CAS的套路,可以保证一次只有一个线程会初始化table。在table为空的时候如果sizeCtl小于0,则说明已经有线程开始初始化了,其它线程通过Thread.yield()让出CPU时间片,等待table非空即可。否则使用CAS将sizeCtl的值换为-1,置换成功则初始化table。注意table的大小为sizeCtl,初始化后将sizeCtl的值设为n - (n >>> 2)
即0.75n,这个值用来确定是否需要为table扩容。这种初始化的方式值得我们借鉴。
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
if ((sc = sizeCtl) < 0)
Thread.yield(); // lost initialization race; just spin
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
break;
}
}
return tab;
}
put操作
初始化后,使用tabAt函数通过Unsafe的getObjectVolatile方法获取index位置的值,如果为空,就进行cas设置table[index]的值,设置成功说明put完成,返回即可。不成功则重新进入for循环,进行其他逻辑。
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
tab = initTable();
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
...
}
如果发生了table[index]非空,则可能是:1.发生了碰撞;2. 该key之前已经存在。这个时候需要对table[index]加锁。然后再检查一次index位置的node是否没有变化。这里的fh是节点f的hash值。还记得之前说过的红黑树首节点hash为TREEBIN等于-2么,这时就能区分到底是一个红黑树还是链表了,而后就可以进行链表\红黑树的插入or更新操作了。在操作成功后,如果节点对应的数据结构是一个链表且长度超过TREEIFY_THRESHOLD的话将会把该链表更改为一个红黑树。
V oldVal = null;
synchronized (f) {
if ( tabAt( tab, i ) == f )
{
if ( fh >= 0 )
{
binCount = 1;
for ( Node<K, V> e = f;; ++binCount )
{
K ek;
if ( e.hash == hash &&
( (ek = e.key) == key ||
(ek != null && key.equals( ek ) ) ) )
{
oldVal = e.val;
if ( !onlyIfAbsent )
e.val = value;
break;
}
Node<K, V> pred = e;
if ( (e = e.next) == null )
{
pred.next = new Node<K, V>( hash, key,
value, null );
break;
}
}
}else if ( f instanceof TreeBin )
{
Node<K, V> p;
binCount = 2;
if ( (p = ( (TreeBin<K, V>)f).putTreeVal( hash, key,
value ) ) != null )
{
oldVal = p.val;
if ( !onlyIfAbsent )
p.val = value;
}
}
}
}
if ( binCount != 0 )
{
//如果超过这个值,就把tab的i位置更改为红黑树哦
if ( binCount >= TREEIFY_THRESHOLD )
treeifyBin( tab, i );
if ( oldVal != null )
return(oldVal);
break;
}
最后一种情况就是节点的hash值为MOVED,-1了。这说明该节点是一个ForwardingNode,当前table正在向newTable迁移中,所以当前线程过去帮助迁移就好。
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
ConcurrentHashMap的扩容
老实说,扩容这块的代码比较复杂,很多细节我还没有弄清楚,只能大概讲一讲我自己的理解,权当抛砖引玉,希望以后了解的更加深入以后再来补全。
上面我们提到,扩容时我们会将sizeCtl设置为一个与扩容前table大小n有关的数。这里的计算方式就是:
int rs = resizeStamp(n);
sizeCtl= (rs << RESIZE_STAMP_SHIFT) + 2;
static final int resizeStamp(int n) {
return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
}
private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;
这里可以看出,这个值肯定是一个负数,因为根据RESIZE_STAMP_SHIFT和RESIZE_STAMP_BITS的关系我们知道他的最高位肯定是1,再按位或Integer.numberOfLeadingZeros(n)的值,从直觉看,应当是一个绝对值很大的负数。具体可以带入不同的n值计算一下:
n sizeCtl
16 -2145714174
32 -2145779710
64 -2145845246
128 -2145910782
256 -2145976318
512 -2146041854
1024 -2146107390
2048 -2146172926
4096 -2146238462
8192 -2146303998
16384 -2146369534
这里可以看出jdk1.8的代码里对sizeCtl的注释是有问题的。理解了rs是干什么的,以及rs和扩容时的sizeCtl的关系,我们再来分析下面的代码:
- 当sc小于0时,说明已经开始了扩容,如果当前线程需要参与扩容,则通过CAS将sizeCtl的值加1
- sc大于0,说明还没开始扩容,则按照我们上面讲的方法计算出sizeCtl的值,初始化之,开始扩容
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
int rs = resizeStamp(n);
if (sc < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
s = sumCount();
}
}
扩容操作的中,如果nextTable为空,则初始化之,大小为之前table的2倍:
if (nextTab == null) { // initiating
try {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
transferIndex = n;
}
计算一次扩容迁移的slot数目:
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
计算扩容开始和结束的index:
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
while (advance) {
int nextIndex, nextBound;
if (--i >= bound || finishing)
advance = false;
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
...
}
如果当前index为空则将其设置为一个FowardingNode,如果节点的hash为-1则说明已经处理过了,直接跳过就好
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
接下来就是重头戏了:如何迁移一个链表或红黑树节点:
synchronized (f) {
if ( tabAt( tab, i ) == f )
{
Node<K, V> ln, hn;
if ( fh >= 0 )
{
int runBit = fh & n;
Node<K, V> lastRun = f;
for ( Node<K, V> p = f.next; p != null; p = p.next )
{
int b = p.hash & n;
if ( b != runBit )
{
runBit = b;
lastRun = p;
}
}
if ( runBit == 0 )
{
ln = lastRun;
hn = null;
}else {
hn = lastRun;
ln = null;
}
for ( Node<K, V> p = f; p != lastRun; p = p.next )
{
int ph = p.hash; K pk = p.key; V pv = p.val;
if ( (ph & n) == 0 )
ln = new Node<K, V>( ph, pk, pv, ln );
else
hn = new Node<K, V>( ph, pk, pv, hn );
}
setTabAt( nextTab, i, ln );
setTabAt( nextTab, i + n, hn );
setTabAt( tab, i, fwd );
advance = true;
}else if ( f instanceof TreeBin )
{
TreeBin<K, V> t = (TreeBin<K, V>)f;
TreeNode<K, V> lo = null, loTail = null;
TreeNode<K, V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for ( Node<K, V> e = t.first; e != null; e = e.next )
{
int h = e.hash;
TreeNode<K, V> p = new TreeNode<K, V>
( h, e.key, e.val, null, null );
if ( (h & n) == 0 )
{
if ( (p.prev = loTail) == null )
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}else {
if ( (p.prev = hiTail) == null )
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify( lo ) :
(hc != 0) ? new TreeBin<K, V>( lo ) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify( hi ) :
(lc != 0) ? new TreeBin<K, V>( hi ) : t;
setTabAt( nextTab, i, ln );
setTabAt( nextTab, i + n, hn );
setTabAt( tab, i, fwd );
advance = true;
}
}
}
- 和put操作一样,先判断是红黑树还是链表。
- 根据前面的分析,由于扩容前table的大小为n=2^k,且扩容后的n2 = n * 2,所以节点在nextTable上的位置要么和在table上的位置一样,要么是在table上的位置加上n(比如hash&(n-1)为2,那么hash&(2*n-1)要么是2,要么是n+2),并且这个值只与n的非0bit的位置有关,所以使用
fh & n
可以将链表或红黑树上的的节点分为2类,一类是index没有改变的,一类是index变成了index+n的。 - 如果是链表,则生成2个新链表,放到对应的位置
- 如果是红黑树,则根据节点数目生成2个红黑树或链表(如果节点数目比较少则生成链表)
- 完成迁移后在原位置放一个FowardingNode标记已经处理成功
最后,如果已经完成全部迁移,则更新sizeCtl的值,用nextTable替换table。如果只是当前线程完成了自己的迁移任务,则修改sizeCtl的值。这里需要注意的是这一句:(sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT
判断是否所有线程都完成了自己的操作,是的话就说明整个迁移完成了,将finishing设置为true。
if (finishing) {
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1);
return;
}
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
i = n; // recheck before commit
}
size
ConcurrentHashMap还有一点需要注意的是其size方法其实是调用了下面的sumCount方法:
final long sumCount() {
CounterCell[] as = counterCells; CounterCell a;
long sum = baseCount;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
这个函数返回的数据只是对所有CounterCell的数据的收集,在收集过程中很可能其他线程进行了增删等操作,所以是不准确的。想准确获取结果的话可能需要使用LongAdder或AtomicLong之类的对象在插入或删除对象时进行计数来实现了。
总结
Doug Lea 大神的代码思路很飘逸,各种CAS也是出神入化,尤其是扩容时把工作分开,多处同时执行的思路更是让人觉得代码居然还可以这么写。本文还留了一个坑,那就是为什么相同hash的节点超过一定数目后需要使用红黑树存储,红黑树有什么优势,它的实现原理是什么?鉴于本文篇幅已经很长了,我将会在下一篇文章中对红黑树进行解释。
网友评论