ConcurrentHashMap中put()这个方法很容易引起并发操作的问题,现在来研究下put()方法的实现
put方法
public V put(K key, V value) {
return putVal(key, value, false);
}
//onlyIfAbsent跟HashMap一样,就是判断是否要覆盖,默认为false,覆盖
final V putVal(K key, V value, boolean onlyIfAbsent) {
//这句话可以看出,ConcurrentHashMap中不允许存在空值,这个是跟HashMap的区别之一
//通过这个机制,我们可以通过get方法获取一个key,如果抛出异常,说明这个key不存在
if (key == null || value == null) throw new NullPointerException();
//ConcurrentHashMap中的hash值计算方法,跟HashMap中的差不多,不过最后的结果要
//与0x7fffffff进行与操作,这个地方我还不明白为什么有这个与操作
int hash = spread(key.hashCode());
//binCount=0说明首节点插入,未进行链表或红黑树操作,因为后面会对这个值进行更改
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
//如果数组为空或者长度为0,进行初始化工作
if (tab == null || (n = tab.length) == 0)
tab = initTable();
//如果获取位置的节点为空,说明是首节点插入情况
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
//如果hash值等于MOVEN(默认-1),说明是协助扩容,这个在后边讲解ForwardingNode类
//进行进一步解析
else if ((fh = f.hash) == MOVED)
//协助扩容
tab = helpTransfer(tab, f);
else {
V oldVal = null;
//对桶的首节点进行加锁
synchronized (f) {
//双重判定,为了防止在当前线程进来之前,i地址所对应对象已经更改
if (tabAt(tab, i) == f) {
//为什么fh一定要大于等于0,这个原因在于TreeBin的hash值的设定,TreeBin类型
//的hash值默认设置为了-2
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
//在当前桶中找到位置跳出
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
//当到桶的结尾还没找到,则新增一个Node
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
//如果hash小于0,判断是否是TreeBin的实例
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
//如果binCount值不等于0,说明进行了链表或者红黑树操作
if (binCount != 0) {
//如果binCount大于8则进行树化,但真正的转换成红黑树不是8的长度
//当长度超过64才会真正的树化,处于8-64之间的还只是数组扩容
if (binCount >= TREEIFY_THRESHOLD)
//这个方法我在红黑树一节详细讲解了,不清楚的可以看红黑树那节
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
//计数方法
addCount(1L, binCount);
return null;
}
我对这个方法进行了注释,可以直观的看代码进行了解,主要分析一下三个方法
1、数组初始化方法initTable()
2、线程协助扩容方法helpTransfer()
3、计数方法addCount()
数组初始化initTable()
我们来看下这个方法的代码:
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
//判断是否数组为空或者数组长度为0(未初始化)
while ((tab = table) == null || tab.length == 0) {
//如果sizeCtl值小于0,则每个进入到这里的线程要作线程让步操作
if ((sc = sizeCtl) < 0)
Thread.yield(); // lost initialization race; just spin
//SIZECTL是地址偏移量,和前边我们讲到的tabAt()那三个方法的地址偏移量一样
//如果SIZECTL对应地址的值与sc相等,说明当前的线程是第一个到达这条语句的
//线程,那么就会将SIZECTL地址所对应的值替换成-1,而SIZECTL地址偏移量对应的
// 对象就是sizeCtl。
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
//再次进行判断,防止在进行U.compareAndSwapInt(this, SIZECTL, sc, -1)的时候
//有其他线程并发进入方法,导致出错,使用双重锁
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
sc = n - (n >>> 2);
}
} finally {
//如果未初始化,会将sizeCtl设置成为当前容量类似hashmap中阈值threshold
sizeCtl = sc;
}
break;
}
}
return tab;
}
这个地方有点疑问,sizeCtl到底指代什么,是阈值还是容量,在传入容量的构造方法中,是直接将cap赋值给sizeCtl,也就是说在这里的sizeCtl是容量大小,而数组初始化的时候,又将容量的0.75倍赋值给了sizeCtl,找了下网上的资料
引用这个链接(https://www.jianshu.com/p/c0642afe03e0)的说法:
sizeCtl :默认为0,用来控制table的初始化和扩容操作,具体应用在后续会体现出来。
**-1 **代表table正在初始化
**-N **表示有N-1个线程正在进行扩容操作
其余情况:
1、如果table未初始化,表示table需要初始化的大小。
2、如果table初始化完成,表示table的容量,默认是table大小的0.75倍,居然用这个公式算0.75(n - (n >>> 2))。
也就是说,sizeCtl的值有这几种情况,跟hashmap中单一指代阈值和容量的不同
线程协助扩容方法helpTransfer()
讲这个方法之前,我们需要先了解一些其他的类与方法,首先是ForwardingNode,一个特殊的Node节点,hash值为-1,其中存储nextTable的引用。
static final class ForwardingNode<K,V> extends Node<K,V> {
final Node<K,V>[] nextTable;
ForwardingNode(Node<K,V>[] tab) {
//MOVEN默认值-1
super(MOVED, null, null, null);
this.nextTable = tab;
}
Node<K,V> find(int h, Object k){...}
}
然后是方法resizeStamp(),这个方法是计算校验码的
static final int resizeStamp(int n) {
return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
}
好,现在我们来看下helpTransfer()方法的使用:
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
Node<K,V>[] nextTab; int sc;
if (tab != null && (f instanceof ForwardingNode) &&
(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
int rs = resizeStamp(tab.length);
while (nextTab == nextTable && table == tab &&
(sc = sizeCtl) < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
transfer(tab, nextTab);
break;
}
}
return nextTab;
}
return table;
}
首先判断是否能够协助扩容的条件
1、数组不为空
2、传入的首节点是ForwardingNode的实例
3、该实例的nextTable类型不为空
在这三个条件的前提下,进行下一步,通过resizeStamp()方法拿到一个校验码,然后再判断sizeCtl的值是否小于0,因为sizeCtl负数表示正在扩容。
停止扩容的四个条件(这部分我还没理解,等后面理解了回来补充):
1、(sc >>> RESIZE_STAMP_SHIFT) != rs
2、sc == rs + 1
3、 sc == rs + MAX_RESIZERS
4、transferIndex <= 0
如果符合扩容条件,就使用U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)在sc上加一,然后调用transfer()方法进行数据迁移,这个方法比较复杂,到时候拿出来单独讲.
参考链接:
https://blog.csdn.net/u011392897/article/details/60479937
计数方法addCount()
private final void addCount(long x, int check) {
CounterCell[] as; long b, s;
//计数部分
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
CounterCell a; long v; int m;
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
fullAddCount(x, uncontended);
return;
}
if (check <= 1)
return;
s = sumCount();
}
//扩容部分
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
int rs = resizeStamp(n);
if (sc < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
s = sumCount();
}
}
}
这个方法分为两部分,第一部分是计数,第二部分是扩容
第一部分计数,
当counterCells为空并且U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)返回true,则直接到下一部分是否扩容的判断,否则 CAS 失败后转入更新 counterCells ,防止CAS 自旋,使用的方法是fullAndCount(),这个方法取自striped64的LongerAdd的方法。
关于striped64的解析详情可以看以下链接:
http://brokendreams.iteye.com/blog/2259857
说到这个,我就不得不提一下这个类CounterCell,这个类代码如下:
@sun.misc.Contended static final class CounterCell {
volatile long value;
CounterCell(long x) { value = x; }
}
我们看到这个类用@sun.misc.Contended进行了注释,这个注释是自动填充缓存行用的,为什么要自动填充缓存行?
因为这样可以防止伪共享的情况,那么什么是伪共享呢?
我解释一下,就是当多线程修改互相独立的变量时,如果这些变量共享同一个缓存行,就会无意中影响彼此的性能,这就是伪共享,也就是避免头结点和为节点加载到同一个缓存行,使头尾节点在修改时不会互相锁定,基于此,我们需要自动填充缓存行。
这个类中我们可以看到只有一个volatile修饰符的变量value,也就是单个counterCell对象累加值
第二部分扩容,
满足当前容量大于sizeCtl值并且数组不为空,数组长度小于最大容量值的时候,就开始扩容,跟helpTransfer()方法中一样,这里也进行了同样的判定,当线程刚进来的时候,sc是正的,所以执行else if的语句,U.compareAndSwapInt(this, SIZECTL, sc,(rs << RESIZE_STAMP_SHIFT) + 2)
这条语句将sizeCtl直接赋值成了一个负数,如果赋值成功,则调用数据迁移方法transfer().
在代码的最后有个s = sumCount()的语句,这个是ConcurrentHashmap内部计数完反馈给我们的值,在size()中也有相关调用:
public int size() {
long n = sumCount();
return ((n < 0L) ? 0 :
(n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
(int)n);
}
我们来看下sumCount()方法的内部实现:
final long sumCount() {
CounterCell[] as = counterCells; CounterCell a;
long sum = baseCount;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
可以看出这个计算是分为两部分,基础部分是baseCount部分,然后还要加上countCells数组的所有值,通过之前的分析我们知道,baseCount是CAS成功后会自动累加的值,而countCells数组是在CAS失败,也就是出现并发的情况下,进行累加的数组,这个数组类似segmenet一样,采用分段锁,最后将二者的值相加从而得到一个近似值。
网友评论