初始化的是什么都没做,集合都这样,节省性能。
public ConcurrentHashMap() {
}
看put操作
public V put(K key, V value) {
return putVal(key, value, false);
}
final V putVal(K key, V value, boolean onlyIfAbsent) {
// 注意,这里,put不允许key和value为null,这点和hashmap不一样
if (key == null || value == null) throw new NullPointerException();
// 计算hash值
int hash = spread(key.hashCode());
int binCount = 0;
// 遍历node数组
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
// 前面都是一些局部遍历,没有并发问题
// tab没有初始化,就初始化
if (tab == null || (n = tab.length) == 0)
// 初始化,看下文
tab = initTable();
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
V oldVal = null;
synchronized (f) {
if (tabAt(tab, i) == f) {
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount);
return null;
}
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
// 因为,前面没有加锁,所以前面判定没有初始化,可能到了这里,就别别的线程初始化了,所以这里判定
// 同时,while循环,自cas
while ((tab = table) == null || tab.length == 0) {
// sizeCtl这个变量,保证了可见性。这个变量的作用是,多个线程争取这
//个变量资源,挣到了就把这个变量cas变成-1,其他线程发现这个变量变成
//了-1,则自旋,循环获取这个资源。由于volatile保证了可见性,所以没有问题。
// private transient volatile int sizeCtl;
if ((sc = sizeCtl) < 0)
// 如果sizeCtl < 0,则说明,资源被别人争抢到了,让出cpu资源
Thread.yield(); // lost initialization race; just spin
// cas改变成-1,SIZECTL是sizeCtl的内存地址
// cas失败,则继续while循环
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
// 这里获取到资源,因为,这里还可能其他线程初始化了table,所以这里再次判定,别人初始化了,则break跳出while
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
// sc变成了数组扩容的临界值,n为16,sc为12
// 也就是说put第12个元素的时候就会扩容
sc = n - (n >>> 2);
}
} finally {
// 初始化之后,释放资源,sc是扩容临界值
sizeCtl = sc;
}
break;
}
}
return tab;
}
总结:
1、initTable通过一个改变volatile变量,起到锁的作用。
2、初始化table长度为16,扩容临界值为12。
再回到putVal方法
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
// 计算一下hash值,跟hashmap计算方式有一点不同,就是最高位置为0了,不过无所谓,反正hash值是多少都为所谓
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
// 前面分析到了这里,走完了这里之后,继续for循环
tab = initTable();
// 然后会走入这里
// tabAt方法是,从tab数组的i这个位置的元素, (n - 1) & hash是取模运算
// 可是,这里曾经让我很疑惑,因为,table数组是volatile,为什么不直接table[index]这样读呢
// 原因是,table是volatile,但是用了tab这个局部变量接收,而tab不是可见性的,也就是说,tab不会感知到数组的变化。
// 详情看[volatile 数组可见性问题](https://www.jianshu.com/p/0fa5451e9dd0)
// 但是其实这里,不保证可见性也没关系的,因为,下一步是cas替换,顶多替换失败,自旋重来,只是性能差了一点
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
// 没有元素则cas替换table数组内的元素
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
// 如果数组这个的index下的元素,正在扩容,则帮助扩容
// 这个扩容时候再说啊,move是-1,因为扩容的时候,会放一个假的node占位,表示这个node部分正在扩容。
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
// 进入这里,说明table[index]有元素,所以下一步进行链表或红黑树添加元素
V oldVal = null;
// 先把table[index]这个元素锁住
synchronized (f) {
// 这里再判定一次,因为,可能在这锁之前,有人把table[index]元素改了。
if (tabAt(tab, i) == f) {
// fh〉0 说明这个节点是一个链表的节点 不是树的节点
// 因为树节点是treebin节点,这个类的hashcode固定死了,是-2
if (fh >= 0) {
binCount = 1;
// 遍历链表节点
for (Node<K,V> e = f;; ++binCount) {
K ek;
// 这个节点的key和put的key相同,则替换
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
// 添加到链表尾节点
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
// 是树节点
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
// 调用treeBin的putTree方法
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
// 放置不成功,则替换节点。
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
// 当操纵链表的时候,binCount 是遍历的链表的长度,当操作树的受,binCount被重置为2了
if (binCount != 0) {
// 链表长度大于8则,树化
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
// 如果替换了元素,则把旧元素返回回去
if (oldVal != null)
return oldVal;
break;
}
}
}
// 元素添加进去了,size需要变,还可能需要扩容,这些东西都需要复杂操作保证线程安全性
// 这里面设计到了concurrenthashmap最难的部分,比较复杂
addCount(1L, binCount);
return null;
}
看了个大概的流程,总结一下:
1、put操作。锁的是table的一个node节点。所以并发很高。
2、同时,大量的cas操作进行自旋。
看addCount代码
private final void addCount(long x, int check) {
// countercell这个东西,当cas替换basecount的时候,basecount代表集合
// 的size,比如,再极其高的并发下,有很多个线程会cas替换basecount,自然
// 就会很多线程替换失败,所以,这个大佬就设计了这个玩意。
// 还有一个LongAdder类,思想跟这里是一样,LongAddr的作用就是跟atomicLong作用差不多,但是没有atomicLong功能丰富,但是longAddr并发性能好一些。
//大概的原理是,分段锁的思想,把cas冲突,分段,也就是分成很多个countercell,然后各个线程会在自己的countercell中cas竞争,所以就把cas的压力分担下去了。
// 详细的源代码,暂时不分析,有点复杂的
CounterCell[] as; long b, s;
// counterCells 初始为null,所以会直接cas加1替换baseCount这个值
// baseCount 是volatile修饰,存储了concurrenthashmap的size
// cas失败则起用 CounterCell统计
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
CounterCell a; long v; int m;
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
// 这个暂时不分析
fullAddCount(x, uncontended);
// 这里为什么直接返回,不进行检查是否扩容呢?
// 因为,concurrenthashmap,无法非常准确的知道size的大小。即使高并
// 发下,size超高了扩容临界值,也不会影响集合的使用。所以这里留给别的线
// 程去扩容,否则fullAddCount已经做了很多事情了,再去扩容的话,这一个put
//下来,消耗就有点大了
return;
}
if (check <= 1)
return;
// 统计size,这个方法后面再说
s = sumCount();
}
// check >=0 则检查是否要扩容
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
// sizeCtl除了初始化table的时候用于争抢资源达到锁的效果之外(也就是sizeCtl为-1的时候)。
// sizeCtl还可能存储的是扩容的临界值,初始化为12(sizeCtl为正数)。
// sizeCtl还可能为非-1的负数,这个负数是在扩容的时候,sizeCtl用来统计多少
// 个线程正在扩容的,但是并非真的记载的多少个,而是各种运算的结果,比
// 如,只有1个线程正在扩容给,sizectl为-212454(这个是假设值)
// 大于临界值则扩容
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
// 这个rs是通过位运算之后得到的一个很大的数,不管
int rs = resizeStamp(n);
// sc < 0则说明别的线程正在扩容
if (sc < 0) {
// transferIndex 记载的是扩容还剩多少个坐标,是从右往左一批一批
// 扩容的,<0 则说明扩容完了,则break,nextTable是扩容新建的数组,=null也
// 说明扩容完了
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
// 这里统计扩容线程数,注意,这里是负数,而且并非是直接的个数就代表多少个线程
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
// 一起去扩容,下面会分析
transfer(tab, nt);
}
// 这里cas成功了,则说明这是第一个扩容的线程
// 看,sizectl并非是真正的线程个数的数,而是经过了位运算,得到的一个很大的负数
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
// 扩容之后统计数,因为这是在while循环里,如果这里统计的个数,while循环判断后,发现还要扩容,则还会扩容
s = sumCount();
}
}
}
总结:
1、addCount就是统计数量,以及扩容。
2、但是,因为,高并发下,concurrentHashmap并非那种直接lock住全部的那种强一致性的集合,所以,统计数量,会大量的cas和复杂的操作,而且为了减轻cas的压力还引入了CounterCell的机制。
3、concurrentHashMap,size+1的时候,如果不引入conterCell,那么只能锁住size字段,然后单线程操作size,其他线程只能等待,
这样其他线程等待是一个很浪费的操作,只要我们不获取size,就不需要实时知道size大小,也就是说,size不需要保证强一致性。
所以,可以想到的办法是:分段锁的思想,把多线程操作size,分担成多块,每块多个线程自己内部竞争+1,最后,再合并起来,
这样操作size就不需要单线程操作,其他所有线程等待了。
看源码,很大的好处就是学习这种思想。
3、扩容是多线程一起扩容的。
接下来说一下扩容。
先大概说一下扩容的原理:
1、会把table数组,分成几个stride片(不一定叫stride,反正就是分成几片)。
2、然后按片进行扩容,一个线程先领一片,执行完了再领一片,其他线程如果发现数组正在扩容,则帮助扩容,也过来领一片,知道最后,全部扩容完。
3、分片,是从数组的右边往左边分的,领片扩容也是如此。
4、扩容锁的时候,也只是会锁正在扩容的那个node。
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
// 分片的每片大小是多少,ncpu是所有cpu的线程数,如果cpu是单核单线程的,则一片的大小就是整个table的大小,因为无法利用多线程增强性能。
// 如果是多核,则stride是,n / 8 / 线程数
// 如果分片之后,每个分片太小了,小于最小分片16,则最小分片就是16,分的太小没有意义
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
// nextTab 是扩容新建的数组,=null则说明这是第一个扩容线程,前面没有新建扩容数组
if (nextTab == null) { // initiating
try {
@SuppressWarnings("unchecked")
// 扩容,大小扩容了2倍
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
//transferIndex 记载的是没有被线程认领的片的坐标,记录的是从右往左的index,初始时数组的长度
transferIndex = n;
}
int nextn = nextTab.length;
// 这个节点就是用于占位的,hashcode为move也就是-1,表示这个节点正在扩容
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
// 控制下面while循环的,为false则跳出循环
boolean advance = true;
// 这个变量,使得,扩容之后,还会再从右往左检查一遍,是否所有的节点都用move节点标识了,有没有被忽略的。
boolean finishing = false; // to ensure sweep before committing nextTab
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
while (advance) {
int nextIndex, nextBound;
// 初始i=0,bound=0,所以,自然不成立
// 等后面i变成了正在线程正在扩容的分片中的node节点的下标,则 --i >= boud,则说明这个分片的扩容还没有完成,则跳出while循环,继续扩容
if (--i >= bound || finishing)
advance = false;
// transferIndex < 0,则说明扩容完了,但是并不会马上就跳出这个方法,还会检查一遍,检查完了之后,finishing置为true,则跳出方法
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
// 这里是线程领分片,transferindex代表还有多少没有分片的下标
// 不管是初始扩容线程,还是帮助扩容线程,第一次进入这个方法,会进入这里领取分片,线程扩容了一个分片之后,前面 --i >= boud不成立,transferindex < 0也不成立,还会进入这里获取下一个分片
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
// 跳出前面的while循环,则这里下去会进行真正的扩容
// i < 0则说明扩容完了,前面while循环,transferindex <=0会讲i置为-1
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
// 只有finishing为true,才算真正扩容完了,会return跳出方法
if (finishing) {
// 扩容完了,将这些东西重置一下
nextTable = null;
table = nextTab;
// sizeCtl又记录了扩容的临界值:n / 2 - 0.5n = 1.5n
sizeCtl = (n << 1) - (n >>> 1);
return;
}
// 第一i < 0, finishing没有被置为true,会进入这里
// 这里把i = n,然后i 又会一个一个遍历每个节点,检查一遍是否每个节点都是move
// 并且这里会把finishing变为true。
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
i = n; // recheck before commit
}
}
// 如果这个节点是null,则置为moved节点,不让别人再put了
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
// 这个节点是move,则说明别的线程正在扩容这个节点,则什么都不做,找下一个节点
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
else {
// 这个节点不为null,且没有其他线程扩容,则进入这里
// 锁住这个节点
synchronized (f) {
// 这里再判断一下,如果锁之前其他线程改变了这个节点,则跳出继续循环,什么都不做
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
// fh >= 0 则是链表
if (fh >= 0) {
// 跟hashmap一样,计算节点hashcode的高位是否为1,然后,根据高位是否为1,梳理出两条链表,一条是不需要改变坐标的,一条是需要坐标 + length的。
// 但是,这里有一点要说明:放到新链表得都是new Node节点,而不是改变Node节点得指针。这么做得目的是,不改变原来得节点,使得get方法无锁直接能获取到值。否则,无锁get会获取到null值。
int runBit = fh & n;
Node<K,V> lastRun = f;
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
// 每一个节点都new一个新节点
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
// 把这两条链表设置到新数组上去,新数组的i + n这个坐标,只有i这个节点扩容
// 之后的元素才可能放上去,所以没有并发问题
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
// 把旧数组的这个node节点,替换成move节点,表示正在扩容
setTabAt(tab, i, fwd);
// advance = true,继续循环,找下一个节点
advance = true;
}
// 树节点也一样
else if (f instanceof TreeBin) {
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}
transfer这个方法,真的是内容比较多。
总结一下:
1、扩容是多线程的,但是不是开启多线程,而是利用cpu的多线程特性,如果其他线程操作某一个节点,发现正在扩容,则帮助扩容。
2、数组小与16不会分片多线程扩容,因为数组小,多线程没必要。
3、扩容,只会锁正在扩容的那个节点。
然后,回过头来看前面put的时候,发现这个节点是move,则帮助扩容的方法。
除了if判断控制一下并发,其实就是调用transfer帮助扩容。
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
Node<K,V>[] nextTab; int sc;
if (tab != null && (f instanceof ForwardingNode) &&
(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
int rs = resizeStamp(tab.length);
while (nextTab == nextTable && table == tab &&
(sc = sizeCtl) < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
// 直接调用transfer方法
// nextTab是move节点类fwn的成员遍历,new fwn的时候传过来的
transfer(tab, nextTab);
break;
}
}
return nextTab;
}
return table;
}
再看一下树化方法,前面说了,put之后,链表大于8,则会调用这个方法。
private final void treeifyBin(Node<K,V>[] tab, int index) {
Node<K,V> b; int n, sc;
if (tab != null) {
// 会发现,链表大于8,还要判断一下,table的lenght是否>64,如果小
//于,则还不会树化,之后扩容,这个机制跟hashmap一样。看tryPresize分析
if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
tryPresize(n << 1);
else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
// 锁住这个节点,树化不分析
synchronized (b) {
if (tabAt(tab, index) == b) {
TreeNode<K,V> hd = null, tl = null;
for (Node<K,V> e = b; e != null; e = e.next) {
TreeNode<K,V> p =
new TreeNode<K,V>(e.hash, e.key, e.val,
null, null);
if ((p.prev = tl) == null)
hd = p;
else
tl.next = p;
tl = p;
}
setTabAt(tab, index, new TreeBin<K,V>(hd));
}
}
}
}
}
private final void tryPresize(int size) {
// c是准备扩容的大小,1.5size + 1,然后向上取2的n次方
// 但是前面看transfer方法,其实真正扩容,只会扩容成原来数组的两倍
int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
tableSizeFor(size + (size >>> 1) + 1);
int sc;
// sizeCtl记载了扩容临界值
while ((sc = sizeCtl) >= 0) {
Node<K,V>[] tab = table; int n;
// 这里数组没有初始化则进行初始化。因为putAll方法,还没有初始化,会直接调用这个方法
if (tab == null || (n = tab.length) == 0) {
n = (sc > c) ? sc : c;
if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if (table == tab) {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = nt;
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
}
}
else if (c <= sc || n >= MAXIMUM_CAPACITY)
break;
else if (tab == table) {
// 这里以下都是if控制并发,最后调用的就是transfer方法。
int rs = resizeStamp(n);
if (sc < 0) {
Node<K,V>[] nt;
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
}
}
}
到这里,put相关的方法,分析的差不多了。下面分析get。
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
int h = spread(key.hashCode());
// table已经初始化了,并且这个hash的位置的元素不为null
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
// 这个位置的node节点的hash值正好等于get的key的hash值
if ((eh = e.hash) == h) {
// 等于则返回
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
// 这个节点的的hash值 < 0,则说明,要么eh = move(-1)正在扩容给,要么 eh = -2(treebin节点)是一个树节点
else if (eh < 0)
// 树节点就调用treeBin的find查找,传入hash,和key
// 扩容则调用fwn类的find查找
return (p = e.find(h, key)) != null ? p.val : null;
// e.next 1= null则说明链表下有元素,遍历链表
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
总结:get方法全程没有锁。
因为:
1、put方法不扩容的时候,直接锁的node节点,然后直接往链表尾节点添加元素,并不会印象get方法。node节点的next指针是volatile的,保证了可见性,所以一添加,get就会看到。
2、扩容得时候,会new 新Node放到新数组上去,不会改变旧Node得next指针,所以,扩容得时候,也不会影响到get操作。
看一下,fwn的find方法。发现是找的是新nextTable数组
Node<K,V> find(int h, Object k) {
// loop to avoid arbitrarily deep recursion on forwarding nodes
// 可以发现,fwn找的是新nextTable数组
outer: for (Node<K,V>[] tab = nextTable;;) {
Node<K,V> e; int n;
if (k == null || tab == null || (n = tab.length) == 0 ||
(e = tabAt(tab, (n - 1) & h)) == null)
return null;
for (;;) {
int eh; K ek;
if ((eh = e.hash) == h &&
((ek = e.key) == k || (ek != null && k.equals(ek))))
return e;
if (eh < 0) {
if (e instanceof ForwardingNode) {
tab = ((ForwardingNode<K,V>)e).nextTable;
continue outer;
}
else
// 如果是树节点,则调用树节点的find方法
return e.find(h, k);
}
if ((e = e.next) == null)
return null;
}
}
}
看一下树的find方法,一系列树的操作。不分析了。
final Node<K,V> find(int h, Object k) {
if (k != null) {
for (Node<K,V> e = first; e != null; ) {
int s; K ek;
if (((s = lockState) & (WAITER|WRITER)) != 0) {
if (e.hash == h &&
((ek = e.key) == k || (ek != null && k.equals(ek))))
return e;
e = e.next;
}
else if (U.compareAndSwapInt(this, LOCKSTATE, s,
s + READER)) {
TreeNode<K,V> r, p;
try {
p = ((r = root) == null ? null :
r.findTreeNode(h, k, null));
} finally {
Thread w;
if (U.getAndAddInt(this, LOCKSTATE, -READER) ==
(READER|WAITER) && (w = waiter) != null)
LockSupport.unpark(w);
}
return p;
}
}
}
return null;
}
再看一下size方法
public int size() {
long n = sumCount();
return ((n < 0L) ? 0 :
(n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
(int)n);
}
final long sumCount() {
// 统计,basecount和各个countercell的value和
CounterCell[] as = counterCells; CounterCell a;
long sum = baseCount;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
总结一下:
1、get操作,除了树节点,都是无锁的。树节点有cas。put操作都不会影响到get。
2、put操作,锁的是node节点。扩容操作锁的也是node节点。
3、concurrentHashMap通过move节点使得get操作无锁。
3、扩容采用的是多线程分片机制。其他和hashmap一样。
4、concurrenthashmap是弱一致性,并非强一致性。
concurrenthashmap得弱一致性体现在:
1、iterator得时候:iterator过程中,元素可以发生变动,使得跟你期望得不一样。
2、clear: clear一个数组index之后,这个数组index马上被别的线程put了元素,导致,clear之后,table还有元素。
3、get操作,get得一瞬间,把这个元素删掉了。或者 get返回null得一瞬间,添加了这个元素
4、resize:并非非常准时得扩容,比如,CounterCell哪里fullAddCount了之后。
5、size:统计得也并非实时得size,别得线程put了,但是size还没来得及+1。
总之,就是,因为get这种读操作,没有加锁,导致很多东西都是弱一致性得。
网友评论