JDK8中的HashMap
、LinkedHashMap
、Hashtable
和ConcurrentHashMap
的超精细源码分析。
HashMap`实现(JDK8)
内部存储使用散列表实现,根据Entry的key的hash值对table数组长度求余,来决定此Entry放到哪个table。
每个table的存储结构:在发生冲突(多个Entry落入到同一个table)的时候,table内部也有其存储的方式,
- 此table的Entry数量比较少,使用单向链表实现,table引用链表头(
HashMap.Node
)。 - 此table的Entry数量比较多,使用红黑树(RBT)实现,table引用RBT的根(
HashMap.TreeNode
)。
loadFactor
:在put结束时会检查总Entry个数是否达到table数组长度乘以loadFactor时,若是则将table数组翻倍扩容。
threshold
:虽然HashMap
有loadFactor
字段,但是浮点运算相对于比较和整型运算要慢一些,所以实际上在进行初始化时,会将threshold
设置成table.length * loadFactor
,以后进行判断时只需要判断size >= threshold
即可。在table数组翻倍时,threshold
也会乘以2来保持固定的loadFactor
。
除了HashMap(Map)
构造器之外,所有其他的构造器都不会立即创建table数组,而是在第一次put
时才会创建(JDK8中的很多集合都是采用懒加载的方式,比如ArrayList
)。
HashMap
中table数组大小必须保持为2的指数,此时决定Entry的table位置的求余运算可以用掩码计算完成,因为位运算比乘除求余运算要快,但是在Hashtable
中就没有这个限制所以其只能用%
进行计算。所在在构造器中指定初始大小时会通过tableSizeFor(int)
的方式将参数变成最近的2的指数。
// 返回1 2 4 8 16 ... 中大于等于cap但离得最近的数字
// 2**(p-1)<cap<=2**p,n=cap-1
// 则2**(p-1)<=n<2**p,n的二进制表示0..0 1????(p位)
// 只需要把上面最后p-1位全部变成1,即得到2**p-1
// n 0..0 1???????
// n>>>1 0..0 01??????
// n=n|(n>>>1) 0..0 11??????
// n>>>2 0..0 0011????
// n=n|(n>>>2) 0..0 1111????
// ...
// n 0..0 11111111
// 可以看出通过此种方式可以将低位全部变成1,因为int型只有32位所以完全能够覆盖
static final int tableSizeFor(int cap) {
int n = cap - 1;
n |= n >>> 1;
n |= n >>> 2;
n |= n >>> 4;
n |= n >>> 8;
n |= n >>> 16;
return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
}
hash值:HashMap
使用的hash值不是直接的Object.hashCode()
而是做了一定的处理,即把hashCode的高16位与低16位做运算并写入到低16位,使得hash值的低16位更加的随机。因为除非此HashMap
的table大小非常大,在定位table的求余运算中只有低位才有效。
插入逻辑:对于put
、putAll
、putIfAbsent
和HashMap(Map)
等API,这些AP主要就是去调用putVal
方法。
final V putVal(int hash, K key, V value, boolean onlyIfAbsent,
boolean evict) {
Node<K,V>[] tab; Node<K,V> p; int n, i;
if ((tab = table) == null || (n = tab.length) == 0)
// 初始化table数组
n = (tab = resize()).length;
// 计算得到具体的table引用赋值给p
if ((p = tab[i = (n - 1) & hash]) == null)
// 此table暂时还没有Entry,构造并让table引用一个链表头结点
tab[i] = newNode(hash, key, value, null);
else {
Node<K,V> e; K k;
if (p.hash == hash &&
((k = p.key) == key || (key != null && key.equals(k))))
// 链表头结点或RBT根节点就遇到了已存在的同样的key
// 从此处可以看出,判断K存在不存在最终还是根据equals
// 但是在TreeMap中,就只根据Comparable.compareTo或Comparator.compare方法去判断比较
e = p;
else if (p instanceof TreeNode)
// 在RBT中查找已存在的同样的key,如果没找到则插入进去
e = ((TreeNode<K,V>)p).putTreeVal(this, tab, hash, key, value);
else {
// 在链表中查找已存在的同样的key,如果没找到则插入进去
// 由于RBT的增删查改算法很复杂,但是链表的很简单,所以没没有再封装成方法
for (int binCount = 0; ; ++binCount) {
if ((e = p.next) == null) {
// 整个链表中没有已存在的同样的key,插入到链表尾部
p.next = newNode(hash, key, value, null);
if (binCount >= TREEIFY_THRESHOLD - 1) // -1 for 1st
// 插入后,如果链表长度已经够长,准备treeifyBin
// (不一定立马变成RBT,后面有解释)
treeifyBin(tab, hash);
break;
}
if (e.hash == hash &&
((k = e.key) == key || (key != null && key.equals(k))))
// 在链表中找到了同样的key
break;
p = e;
}
}
// 已经找到了已经存在的同样的key,决定是否替换value
if (e != null) { // existing mapping for key
V oldValue = e.value;
if (!onlyIfAbsent || oldValue == null)
e.value = value;
// afterXXX方法是LinkedHashMap扩展的,HashMap什么都没做
afterNodeAccess(e);
return oldValue;
}
}
// 自增modCount,所有增加删除Entry的操作都会导致modCount自增
// 用于遍历时,检测HashMap是否有变化,如果有则抛出ConcurrentModificationException
++modCount;
if (++size > threshold)
// 大小已经达到threshold,准备扩充table数组
resize();
afterNodeInsertion(evict);
return null;
}
初始化和扩充逻辑:这两种逻辑都是走的resize()
方法。对于初始化,就是根据参数创建table数组和threshold
值;对于扩充,就是将创建一个长度是原来2倍的table数组,然后将table数组里面的所有Entry需要重新分配到新table数组,最后丢弃原table数组使用新的。
重新分配,比如原table数组长度16,新table数组长度32,对于第n个table的Entry,会被分散到第n个和第16+n个table里面去。
table[] 0...1...15
| | |
.. 1 ..
|
33
|
17
|
49
重新分配,此时可以将原table各个Entry的hash值对着新table重新求余,重新分配。
newTa[] 0...1...17...31
| | | |
.. 1 33 ..
| |
17 49
// 请先阅读HashMap的各个构造器的源码,看一下在新建对象时设置了哪些字段,这样才好理解下面的初始化逻辑
final Node<K,V>[] resize() {
Node<K,V>[] oldTab = table;
int oldCap = (oldTab == null) ? 0 : oldTab.length;
int oldThr = threshold;
int newCap, newThr = 0;
if (oldCap > 0) {
// 说明已经初始化过table数组,此时将threshold翻倍即可
// 本源码分析中不考虑达到最大值的特殊情况
if (oldCap >= MAXIMUM_CAPACITY) {
threshold = Integer.MAX_VALUE;
return oldTab;
}
else if ((newCap = oldCap << 1) < MAXIMUM_CAPACITY &&
oldCap >= DEFAULT_INITIAL_CAPACITY)
// 翻倍
newThr = oldThr << 1; // double threshold
}
else if (oldThr > 0) // initial capacity was placed in threshold
// table未初始化,但是threshold已经有值,适用于通过HashMap(int, float)创建的初始化
newCap = oldThr;
else { // zero initial threshold signifies using defaults
// table未初始化,threshold也未设置,按照默认的来,适用于默认构造器的初始化
newCap = DEFAULT_INITIAL_CAPACITY;
newThr = (int)(DEFAULT_LOAD_FACTOR * DEFAULT_INITIAL_CAPACITY);
}
if (newThr == 0) {
// 在上面else if (oldThr > 0)分支中,
// 已经将需要初始化table数组的大小赋值给了newCap等待初始化,
// 需要threshold正式设置为newCap * loadFactor
float ft = (float)newCap * loadFactor;
newThr = (newCap < MAXIMUM_CAPACITY && ft < (float)MAXIMUM_CAPACITY ?
(int)ft : Integer.MAX_VALUE);
}
threshold = newThr;
// 创建新table数组或初始化table数组
@SuppressWarnings({"rawtypes","unchecked"})
Node<K,V>[] newTab = (Node<K,V>[])new Node[newCap];
table = newTab;
if (oldTab != null) {
// table已经初始化过,需要遍历每一个table,重新分配里面的节点
for (int j = 0; j < oldCap; ++j) {
Node<K,V> e;
// 此table有节点;对于没有节点的table,则无需处理
if ((e = oldTab[j]) != null) {
oldTab[j] = null;
if (e.next == null)
// 只有一个根节点,直接分配到对应的位置
newTab[e.hash & (newCap - 1)] = e;
else if (e instanceof TreeNode)
// 遍历拆分RBT的各个节点
((TreeNode<K,V>)e).split(this, newTab, j, oldCap);
else { // preserve order
// 重新构造两个链表,遍历原链表的各个节点,拆分到这两个链表中,
// 最后将这两个链表赋值给新table数组的第n和第oldCap+n的位置。
Node<K,V> loHead = null, loTail = null;
Node<K,V> hiHead = null, hiTail = null;
Node<K,V> next;
do {
next = e.next;
// 此table所有Entry的hash % 16都是1
// 所以Entry % 32可能是1也有可能是17
// 如何判断是1还是17,只需要通过33 & 16是不是0就可以判断
// 33 % 32 == 1 33 & 16 != 0
// 17 % 32 == 17 17 & 16 == 0
if ((e.hash & oldCap) == 0) {
if (loTail == null)
loHead = e;
else
loTail.next = e;
loTail = e;
}
else {
if (hiTail == null)
hiHead = e;
else
hiTail.next = e;
hiTail = e;
}
} while ((e = next) != null);
if (loTail != null) {
loTail.next = null;
newTab[j] = loHead;
}
if (hiTail != null) {
hiTail.next = null;
newTab[j + oldCap] = hiHead;
}
}
}
}
}
return newTab;
}
此处可看出扩充很很费资源,所以若知道准确或大概的状况时,可以通过构造器指定初始table数组大小,尽量避免扩容。
treeifyBin
操作 :在putVal
操作完成,发现链表长度比较长影响查找效率,会将此table变成RBT结构,但前提是table数组大小已经达到64,否则只会进行table数组扩充。因为扩充后由于每个table里面的节点分拆,其链表长度很可能会变短,就没有必要变成RBT了。所以HashMap
的table数组扩容,不一定非要等到总Entry个数达到threshold
。
同样在删除节点时,如果发现table里面的节点已经达到6,会重新将RBT变回链表,因为RBT的增删查效率在节点比较少时还不如链表(主要在插入),而且还很费空间。
final void treeifyBin(Node<K,V>[] tab, int hash) {
int n, index; Node<K,V> e;
if (tab == null || (n = tab.length) < MIN_TREEIFY_CAPACITY)
// table大小还没有到MIN_TREEIFY_CAPACITY(64),进行扩容而不是变成RBT
resize();
else if ((e = tab[index = (n - 1) & hash]) != null) {
TreeNode<K,V> hd = null, tl = null;
do {
TreeNode<K,V> p = replacementTreeNode(e, null);
if (tl == null)
hd = p;
else {
p.prev = tl;
tl.next = p;
}
tl = p;
} while ((e = e.next) != null);
if ((tab[index] = hd) != null)
hd.treeify(tab);
}
}
get
和remove
的相关操作不再分析,如果看懂了putVal
之后,那些逻辑都很清晰。
遍历操作:对于keySet()
、values()
、entrySet()
等,关键的操作都是由HashIterator
完成的。
// HashMap里面有成员属性entrySet(keySet|values),这些属性是懒加载的,只有调用对应方法时才会创建
transient Set<Map.Entry<K,V>> entrySet;
public Set<Map.Entry<K,V>> entrySet() {
Set<Map.Entry<K,V>> es;
return (es = entrySet) == null ? (entrySet = new EntrySet()) : es;
}
// 对于EntrySet|KeySet|Values,其iterator()方法返回一个EntryIterator|KeyIterator|ValueIterator
// 这些迭代器的核心逻辑都来自于HashIterator
final class EntryIterator extends HashIterator
implements Iterator<Map.Entry<K,V>> {
public final Map.Entry<K,V> next() { return nextNode(); }
}
abstract class HashIterator {
// 存放下一个待返回的值,hasNext()会判断next存在不存在
// 执行nextNode()时返回next,返回前去寻找next的下一个节点并赋值给next
Node<K,V> next; // next entry to return
// 用于记录刚遍历过的值,用于remove操作(Iterator接口有这个方法)
Node<K,V> current; // current entry
// 初始化时记录当前modCount,在putVal中有解释
int expectedModCount; // for fast-fail
// next所处的table位置下标,
// 当next.next==null时说明此table已经遍历完成,index自增,去下一个table里面找
int index; // current slot
HashIterator() {
expectedModCount = modCount;
Node<K,V>[] t = table;
current = next = null;
index = 0;
// 遍历table,找到第一个节点,标记next
/* 对于RBT结构,使用next遍历仍然是OK的,在插入到树中时同时会维护一个链式结构
即每个TreeNode(Node子类)同时有两种角色,链表的节点和RBT的节点
在查找时使用RBT,在遍历时使用链表 */
if (t != null && size > 0) { // advance to first entry
do {} while (index < t.length && (next = t[index++]) == null);
}
}
public final boolean hasNext() {
return next != null;
}
final Node<K,V> nextNode() {
Node<K,V>[] t;
Node<K,V> e = next;
if (modCount != expectedModCount)
throw new ConcurrentModificationException();
if (e == null)
// 使用Iterator时,在调用next之前要先调用hasNext去判断还有没有数据,
// 否则在遍历到最后时会抛出NoSuchElementException
throw new NoSuchElementException();
// 将next赋给current并返回,返回前需要找到下个节点赋值给next
if ((next = (current = e).next) == null && (t = table) != null) {
do {} while (index < t.length && (next = t[index++]) == null);
}
return e;
}
public final void remove() {
Node<K,V> p = current;
if (p == null)
throw new IllegalStateException();
if (modCount != expectedModCount)
throw new ConcurrentModificationException();
current = null;
K key = p.key;
removeNode(hash(key), key, null, false, false);
// 此时如果其他线程做了修改操作,就检测不到了,也不会抛出ConcurrentModificationException
// 这时可能会出现问题,所以在使用HashMap时,控制好线程,避免同时遍历和增删操作
// 抛出ConcurrentModificationException还算好,这种情况最可怕
expectedModCount = modCount;
}
}
更好的遍历Map的方式,forEach(BiConsumer)
(与HashIterator
的核心逻辑一样,不再分析)
// 传统方式,此时需要创建EntryIterator对象
// for (X x : c)的语法其实等同于:
// Iterator<Map.Entry<K, V>> itr = map.entrySet().iterator();
// while (itr.hasNext()) {
// Map.Entry<K, V> entry = itr.next();
// doSomething(entry.getKey(), entry.getValue());
// }
for(Map.Entry<K,V> entry : map.entrySet()) {
doSomething(entry.getKey(), entry.getValue());
}
// 使用forEach,可以使代码更加简练,而且不需要去创建EntryIterator对象
map.forEach((k, v) -> doSomething(k, v)); // 执行单个方法
map.forEach(ClassName::doSomething); // doSomething是静态方法,正好从有两个参数,类型也能匹配上
map.forEach((k, v) -> { // 执行语句块
doSomething1(k, v);
doOtherThing();
});
map.forEach(new BiConsumer<String, Object>() { // 对于不熟悉JDK8语法的,通过匿名内部类更直观一些
@Override
public void accept(String k, Object v) {
doSomething(k, v);
}
});
如果需要根据Value去判断是否更新Entry的高效简捷方式:compute
(代码与putVal
类似):
// 原本的方式,此时需要两次去定位table和定位Node,涉及到链表或RBT的搜索
V v = map.get(k);
if (judge(v)) {
map.put(k, doSomething(k, v));
}
// 使用compute的方式一步到位,
map.compute(key, (k, v) -> doSomething(k, v));
//在找到了已存在的同样的Key时,调用doSomething,将返回值替换原Value
map.computeIfPresent(key, (k, v) -> doSomething(k, v));
//在没有找到已存在的同样的Key时,调用doSomething,将<Key-返回值>插入
map.computeIfAbsent(key, (k) -> doSomething(k));
V doSomething(K k, V v) {
return xxx;
}
没有分析的部分:由于红黑树相关算法过于复杂难懂,就不再分析;对于克隆、序列化等操作由于不常用也不再分析;对于JDK8的Stream
API,HashMap
提供了供给Stream
使用的Spliterator
实现,但是本人对于这套API也不是很熟悉,暂时不做分析。但是不可否认的是,Stream
的API充分利用了多线程的技术,在处理大集合对象时会有一定好处(JDK5的Iterator
API只是提供next
和hasNext
,如果需要并行处理,需要自行去写多线程代码,或者应用Fork/Join框架,而Stream
自己会去做多线程的处理),而且Stream
提供了丰富的API去做过滤、收集、记数、Map/Reduce等操作,使得应用代码更加简洁,使用这些API以及学习内部实现逻辑还是有好处的。
LinkedHashMap
实现(JDK8)
是HashMap
的子类,增删改查基本上还是用的HashMap
的存储结构和实现逻辑。除此之外还额外维护了一个双向链表(每次插入都会将节点连接到链表尾部),在遍历时会按照插入顺序进行遍历,而不管其落入到哪个table。
// LinkedHashMap持有双向链表
transient LinkedHashMap.Entry<K,V> head;
transient LinkedHashMap.Entry<K,V> tail;
static class Entry<K,V> extends HashMap.Node<K,V> {
// after和next分别是两个链表的后继,是不一样的
// 即每个Entry同时属于两个链表,一个是table持有的,一个是LinkedHashMap持有的
Entry<K,V> before, after;
Entry(int hash, K key, V value, Node<K,V> next) {
super(hash, key, value, next);
}
}
在HashMap
的增删改查方法中,有调用一些afterXXX
方法,这些方法在HashMap
中是空实现,其实就是给LinkedHashMap
用的。
put
操作:hashMap
操作中,需要将键值对通过newNode
方法包装成Node
存入table,而LinkedHashMap
重载了此方法,创建的是LinkedHashMap.Entry
,newNode
方法主要的一点就是将Node
链到LinkedHashMap
双向链表尾部。
Node<K,V> newNode(int hash, K key, V value, Node<K,V> e) {
LinkedHashMap.Entry<K,V> p =
new LinkedHashMap.Entry<K,V>(hash, key, value, e);
linkNodeLast(p);
return p;
}
private void linkNodeLast(LinkedHashMap.Entry<K,V> p) {
LinkedHashMap.Entry<K,V> last = tail;
tail = p;
if (last == null)
head = p;
else {
p.before = last;
last.after = p;
}
}
对于afterNodeInsertion
方法,只有在一些非常不常用的LinkedHashMap
子类中才有意义,故不再分析。
在LinkedHashMap(int,float,boolean)
构造器中,可以指定accessOrder
,即如果通过get
等方式获取到了某个Entry,此Entry会被挪到链表尾部,导致在遍历时会被最后遍历到。
void afterNodeAccess(Node<K,V> e) { // move node to last
LinkedHashMap.Entry<K,V> last;
// 如果accessOrder为true,且不在尾节点,需要将其挪到尾节点,还要将他的前驱和后继连起来
if (accessOrder && (last = tail) != e) {
LinkedHashMap.Entry<K,V> p =
(LinkedHashMap.Entry<K,V>)e, b = p.before, a = p.after;
p.after = null;
if (b == null)
head = a;
else
b.after = a;
if (a != null)
a.before = b;
else
last = b;
if (last == null)
head = p;
else {
p.before = last;
last.after = p;
}
tail = p;
++modCount;
}
}
afterNodeRemoval
方法也是类似,只不过更简单一些,可以自行分析。
各种遍历方法不再用HashIterator
而是用的LinkedHashIterator
,原理类似,只不过遍历的是LinkedHashMap
的链表。
对于forEach
和containsValue
等操作也覆盖了,可以直接通过自己的链表查找遍历,逻辑都不复杂,不再分析。
Hashtable
原理与HashMap
基本一样,线程安全的方式是通过将所有入口方法通过synchronized
锁定,锁的是整个对象,相比ConcurrentHashMap
效率较低。而且没有RBT实现,在同一个table里的节点比较多时,链表长度很长(查找速度可能会变慢)。
ConcurrentHashMap实现(JDK8)
虽然不是HashMap
的子类,但是基本的操作和HashMap
是一致的,只不过加入了保证线程安全的机制。所以如果想要理解此类,在先完全理解HashMap
的基础上,会降低一些理解此类的难度。本分析只去关注跟HashMap
不一致的部分。
里面用到了大量的CAS(compareAndSet
)操作。如果某个操作CAS失败的话,说明多线程竞争失败,需要做相应的处理。所以本类中到处都是while(true)
循环,而通过某些分支的break
跳出。
同HashMap
一样,也有一个散列表数组Node[] table
,在增删查改操作中,不会直接锁table数组,而是去锁具体table里面的Node
引用,类似synchronized(table[x])
,如果table[x]
是null
时,也没法锁了,此时通过CAS设置第一个Entry。
Node
类型:
- 链表节点
Node
或RBT的非根节点TreeNode
:hash值(必须是非负数,为了与特殊节点区分开) - 转向节点
ForwardingNode
:hash=-1,在扩容时使用,当某个table里面的Entry都已经被分散到新table时,将table引用此节点。 - RBT引用节点
TreeBin
:hash=-2,HashMap
在RBT下直接引用RBT根节点,而此类使用一个特殊节点引用RBT根节点。 - 临时节点
ReservationNode
:hash=-3,在compute
时有用到,后面分析compute
时会解释,其他逻辑如果碰到了这个,则会循环等待,直到变成其他类型的节点。
sizeCtl
:值为正数时,就是threshold
;值为-1时,代表正在初始化table数组;为其他负数时,代表正在扩容table数组。
nextTable
:在HashMap
中扩容时,是遍历table数组然后一个一个去重新分配里面的节点;在此类中,允许多个线程同时执行分配(在一个线程去增删改查时发现此时其他线程正在扩容table,其自己也会进去协助)。所以有了nextTable
成员属性(而HashMap
中是方法变量,线程内可见),此时各个线程就可以同时将table
里面的节点分配到nextTable
里面去,最后收尾的线程正式将nextTable
赋值给table
并将自己指向null,扩容完成。
transferIndex
:多个线程是如何同时扩容table的?比如从16扩到32,将transferIndex
设置成16,此时某个线程先“锁定”最后4个,将transferIndex
设置成12,就负责table[12]到table[15]的处理。其他的线程看到transferIndex
是12(或者当前线程处理完了12-15,准备再次领取任务),就知道12和以上的table已经在处理或处理完了,自己也通过同样的方式再“锁定”一批、直到transferIndex
为0。
同HashMap
一样,首先分析putVal
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
// 和HashMap的hash()方法一样,只不过需要将最高位设置为0(变成非负数)
int hash = spread(key.hashCode());
int binCount = 0;
// 注意:此类源码到处都是死循环,需要观察break出口
// 里面的分支的状态机非常复杂
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
tab = initTable();
// tabAt、setTabAt和casTabAt这一套,可以使数组的某个值有volatile的语义
// volatile只能让数组对象有语义,但是里面的值却没法通过volatile指定
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
// 通过CAS方式设置链表头结点,如果失败重新进入循环
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
else if ((fh = f.hash) == MOVED)
// 说明此时正在扩容,此线程去协助扩容,完成后,重新进循环
tab = helpTransfer(tab, f);
else {
V oldVal = null;
// 锁定链表头结点、RBT引用节点或临时节点
synchronized (f) {
// 栗子:synchronized(arr[i])是锁定的arr[i]的引用,其他线程可以执行arr[i] = xxx;
// 栗子:final int[] arr = {1, 2, 3},虽然arr是final的,但是里面的内容可以随便改
// 此时此table可能已经被换成了TreeBin、ForwardingNode等各种情况
// 同样,决定好所有需要换的地方。都加上synchronized锁定,
// 那么下面的if测试通过,就是彻底安全的了
if (tabAt(tab, i) == f) {
// 里面的操作就不需要去CAS了,同样约定,table内部所有操作必须synchronized锁定
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
// else 其实还有一种可能,就是ReservationNode临时节点。只不过什么都不做
}
}
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount);
return null;
}
initTable
,初始化table数组。
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
// sizeCtl小于0说明正在初始化或者在扩容,稍微等待一下
// TODO 其实此类中有很多地方都是CAS竞争失败,但是使用yield的只有这里
if ((sc = sizeCtl) < 0)
Thread.yield(); // lost initialization race; just spin
// 通过CAS试图将sizeCtl设置为-1
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
// threshold固定为0.75,loadFactor在此类中已经基本上没有用了
sc = n - (n >>> 2);
}
} finally {
// 上面代码应该只会出现并抛出OOM,将sizeCtl重置
sizeCtl = sc;
}
break;
}
}
return tab;
}
transfer
:扩容,对于主导线程和协助线程,最终都会进入此方法
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
// 指定每次需要处理多少个table,最少是16
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
// 实际上在调用此方法时,只有第一个线程的nextTab参数是null
// 其余协助线程传过来的nextTab,就是由第一个线程创建的this.nextTable
if (nextTab == null) { // initiating
try {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
transferIndex = n;
}
int nextn = nextTab.length;
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
boolean advance = true;
boolean finishing = false; // to ensure sweep before committing nextTab
// 高能:此循环的分支状态机非常复杂
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
// 第一次进来时只可能会走下面两个分支,即首先去看transferIndex是不是大于0
// 如果已经到0说明所有的任务已经处理或者正在处理,本次不需要再请求任务,将i设置为-1
// 如果大于0则通过CAS试图锁定一批table
// 锁定失败,重进循环再次去判断,必要时锁定
// 锁定成功,i为本批需要处理的最后一个table数组下标,bound为最后一个下标
// 去下面的分支处里。完成后,再进入此循环,i自减继续处理,直到达到bound。
// 处理完bound之后仍然试图去再次请求任务
while (advance) {
int nextIndex, nextBound;
if (--i >= bound || finishing)
// 判断finishing是为了最后一个离开的线程从尾到头做一次检查用的,后面有解释
advance = false;
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
// 本线程已经不用再去申请任务,准备退出
// i < 0 上面第一个分支,自减减到了-1,说明0已经处理完成,此时transferIndex肯定是0
// 或者第二个分支将i设置成了-1
// TODO 剩余两个判断,我也不知道是什么时候出现
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
// 最后一个离开扩容的线程做收尾工作;其余线程不会进入此方法
if (finishing) {
// nextTable正式赋给table,自己引用null
nextTable = null;
table = nextTab;
// sizeCtl变成原大小的7/4倍,即新table大小的3/4倍
sizeCtl = (n << 1) - (n >>> 1);
return;
}
// 每一个线程在进入扩容或协助扩容时,会将sizeCtl加1,在准备退出时,需要减1
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
// 判断当前线程不是最后一个离开的线程,如果不是则直接退出
// 此表达式后面有解释
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
// 将finishing和advance设置为true,i设置为table.length,重进循环
// 下一次while (advance)时,由于finishing为true,导致一直进入第一个分支
// 所以最后一个线程会从尾到头再次检查一下是不是所有table都已经处理完成了
// TODO:为什么需要再检查一遍,难道有什么情形会导致中间有遗漏?
finishing = advance = true;
i = n; // recheck before commit
}
}
// 下面的分支都是去做table的处理
else if ((f = tabAt(tab, i)) == null)
// 可能与putVal或其他线程竞争失败,去看putVal的第一个else if
advance = casTabAt(tab, i, null, fwd);
else if ((fh = f.hash) == MOVED)
// 用于最后一个离开的线程执行检查,确认全部table都已经处理完成
advance = true; // already processed
else {
// 锁定table并再次判断,putVal已经解释了
synchronized (f) {
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
if (fh >= 0) {
// 相比较HashMap,此处的算法更加流弊
// a->b->c->d->e->f->g,假如发现从d到g都应该落入到同一个table时
// 则d到g的部分就不需要重新连接,而是作为一个整体,直接b和d相连即可
// 而HashMap实现中需要重新连每一个节点
int runBit = fh & n;
Node<K,V> lastRun = f;
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
// 将table设置为ForwardingNode
setTabAt(tab, i, fwd);
advance = true;
}
else if (f instanceof TreeBin) {
// 由于此类的RBT也是个链表结构,而且是双向链表,所以跟上面的操作类似
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}
在tryPreeze
、addCount
和helpTransfer
等方法中,都有线程进入transfer
的逻辑,而且基础代码几乎都是一样的。看懂一个,即可了解ConcurrentHashMap
是如何让线程进入扩容方法的。比如分析tryPreeze
。
// 此方法仍然包含了initTable相关的方法,因为new ConcurrentHashMap(Map)的存在
// 实际上你会发现ConcurrentHashMap里面有很多重复的代码
// 本次分析关注主要在扩容的地方
private final void tryPresize(int size) {
int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
tableSizeFor(size + (size >>> 1) + 1);
int sc;
while ((sc = sizeCtl) >= 0) {
Node<K,V>[] tab = table; int n;
if (tab == null || (n = tab.length) == 0) {
n = (sc > c) ? sc : c;
if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if (table == tab) {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = nt;
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
}
}
else if (c <= sc || n >= MAXIMUM_CAPACITY)
break;
// 检查一下table中途没有改变
// 因为有可能在while判断sizeCtl大于0,但是到这里,中间其他线程可能已经完成了全部的扩容操作
else if (tab == table) {
int rs = resizeStamp(n);
// 此时sizeCtl已经小于0,说明某个其他线程已经作为第一个线程进入了扩容
if (sc < 0) {
Node<K,V>[] nt;
// (sc >>> RESIZE_STAMP_SHIFT) != rs 下面解释
/* TODO sc == rs + xxx 这两个分支我也没有看懂,但是知道是为了控制最大扩容线程数量
而且,如果写成sc == (rs << RESIZE_STAMP_BIT) + MAX_RESIZERS就很好理解了
回头想办法去做个DEBUG看一下吧
可以把ConcurrentHashMap的代码拷贝出来,自己建一个类,贴进来
由于需要将非常多的线程停在不同的地方,使用IDE的debug非常难
可以使用wait/notify、lock.Condition或其他JUC包的工具去控制 */
// nextTable == null 说明扩容已经被其他线程完成了,参考transfer的收尾
// transferIndex <= 0 说明任务都已经被处理或正在处理,本线程不用再进去了
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
// 试图作为第一个线程进入扩容,此时transfer的nextTab参数为null
// 此处的加2与transfer的判断当前线程是否是最后一个线程相呼应
// if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
}
}
}
sizeCtl在扩容时如何选取:在扩容时是一个负数的戳。要理解此戳,需要理解下面的这些字段。
RESIZE_STAMP_BITS
,此字段用于生成戳,固定是16(这个字段不是final的,虽然没有任何代码修改此值)
RESIZE_STAMP_SHIFT
:固定是32 - RESIZE_STAMP_BITS
,用于生成sizeCtl
// 假设当前table.length即n是16(即10000,前面有27个0),假设RESIZE_STAMP_BITS是20
// table长度最小等于1时头部0的个数是31,table等于最大长度时头部0个数是1,所以有5位即可以标志当前table大小
// 27, 00000000 00000000 00000000 00011011
// 1<<<19 00000000 00001000 00000000 00000000
// OR 00000000 00001000 00000000 00011011
// 此处即RS的值,有两个部分,最低5位表明当前table大小
// 中间倒数第RESIZE_STAMP_BITS
// 所以RESIZE_STAMP_BITS的javadoc里提到最低需要6
static final int resizeStamp(int n) {
return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
}
// 在第一个线程扩容时会将sizeCtl设置为(rs << RESIZE_STAMP_SHIFT) + 2
// RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS,本例中是12
// RS左移12位正好是最高位为1,变成负数
// 10000000 00000001 10110000 00000000
// TODO 最后加上2,我也弄不清楚是为什么
// 10000000 00000001 10110000 00000010
// 最终sizeCtl为负数,中间有5位作为table.length标志
// 剩余RESIZE_STAMP_SHIFT位作为线程进入的标志,初始为2,每个线程进入则加1
U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2);
// 记得上面协助扩容线程有这么一个判断(sc >>> RESIZE_STAMP_SHIFT) != rs
// 即为了确认扩容的table就是当前的table,因为可能出现或取table长度是16,在准备进入时,其实16-32的扩充早已完成,现在正在进行32-64的扩容,可以根据中间5位确定。
// 最大并发数,基本原理就是最低RESIZE_STAMP_BIT位从0000 00000010涨到1111 11111111
private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;
get
方法:和HashMap
类似
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
int h = spread(key.hashCode());
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
else if (eh < 0)
// ForwardingNode、TreeBin等都对find()做了重写
// ForwardingNode会去nextTab里面找
// TreeBin会从红黑树里面找
// ReservationNode返回null
return (p = e.find(h, key)) != null ? p.val : null;
// 从链表里面寻找
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
remove
方法,跟put
很类似,不再分析。
compute
方法,computeIfPresent
和computeIfAbsent
都是类似的。
public V compute(K key,
BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
if (key == null || remappingFunction == null)
throw new NullPointerException();
int h = spread(key.hashCode());
V val = null;
int delta = 0;
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
tab = initTable();
else if ((f = tabAt(tab, i = (n - 1) & h)) == null) {
// 在table为null时,需要先创建一个临时节点ReservationNode并让table引用
// 然后执行apply(key, null)决定新的value
// 使用并锁住临时节点的目的是为了,当前table为null,
// 同时有多个线程对同样的key做compute时能保证先后顺序
// 即第二个compute一定要拿着第一个完成的结果去做方法入参,避免出现脏读
Node<K,V> r = new ReservationNode<K,V>();
// 先锁住节点,再通过cas将临时节点放入table
// 其他线程发现后会放弃,然后再次循环,直到已经不是临时节点
// 例如:看putVal或transfer或本方法,最后都有一个else if没有闭合,其实就是这种情况
// 那一个隐含的分支什么都不做所以被省略了
synchronized (r) {
if (casTabAt(tab, i, null, r)) {
binCount = 1;
Node<K,V> node = null;
try {
if ((val = remappingFunction.apply(key, null)) != null) {
delta = 1;
node = new Node<K,V>(h, key, val, null);
}
} finally { // apply可能会抛出RuntimeException
// 此时将正式节点或null设置进去
// 此时不需要CAS了,因为没有了竞争
setTabAt(tab, i, node);
}
}
}
if (binCount != 0)
break;
}
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
synchronized (f) {
if (tabAt(tab, i) == f) {
if (fh >= 0) {
binCount = 1;
// 如果链表里面找到了已经存在的节点,根据apply的结果去决定替换还是删除节点
// 如果没有找到,根据apply的结果去决定增加节点还是什么都不做
for (Node<K,V> e = f, pred = null;; ++binCount) {
K ek;
if (e.hash == h &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
val = remappingFunction.apply(key, e.val);
if (val != null)
e.val = val;
else {
delta = -1;
Node<K,V> en = e.next;
if (pred != null)
pred.next = en;
else
setTabAt(tab, i, en);
}
break;
}
pred = e;
if ((e = e.next) == null) {
val = remappingFunction.apply(key, null);
if (val != null) {
delta = 1;
pred.next =
new Node<K,V>(h, key, val, null);
}
break;
}
}
}
else if (f instanceof TreeBin) {
binCount = 1;
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> r, p;
if ((r = t.root) != null)
p = r.findTreeNode(h, key, null);
else
p = null;
V pv = (p == null) ? null : p.val;
val = remappingFunction.apply(key, pv);
if (val != null) {
if (p != null)
p.val = val;
else {
delta = 1;
t.putTreeVal(h, key, val);
}
}
else if (p != null) {
delta = -1;
if (t.removeTreeNode(p))
setTabAt(tab, i, untreeify(t.first));
}
}
// 这里也有个else,就是上面提到的情况
}
}
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
break;
}
}
}
if (delta != 0)
addCount((long)delta, binCount);
return val;
}
遍历方法:在HashMap
中,所有遍历方法的核心是HashIterator
;在ConcurrentHashMap
中使用Traverser
用来做遍历。
// 此类的逻辑比transfer扩容更加难理解,虽然是单线程的,这里先解释下大概的原理
// 创建Traverser时指定当前table数组并遍历,但是遍历过程中可能会出现扩充(有可能进行了多次扩充)
// 距离:创建迭代器时,table长度为2,但是遍历过程中已经完成了2到4的扩充,正在进行4到8的扩充
// t0 [FWD, FWD] 初始时指定的table
// t1 [NOD, FWD, FWD, FWD] this.table目前在这
// t2 [NUL, NOD, NOD, NOD, NUL, NOD, NUL, NUL] this.nextTab目前在这
// (FWD是FordwardingNode,NOD是其他Node,NUL是null)
// 这时可以以t0为基准,假如发现t0[0]是FWD,则需要去遍历t1[0]和t1[2];
// 如果再遍历t1[2]时发现又是,则需要遍历t2[2]和t2[6],t2[6]完成后需要转到t0[1]
// 所以在遍历t2[6]时,需要将t0[0]的t1[2]的进度信息保存下来,所以栈结构就非常适合
// 例如:在遍历t1[2]时,需要将table=t0,index=0入栈
static class Traverser<K,V> {
// 当前正在遍历的table
Node<K,V>[] tab; // current table; updated if resized
// 下一个待返回值
Node<K,V> next; // the next entry to use
// stack则是上面提到的栈,用链表实现,所以TableStack有个next
// sqare是另外一个栈,主要是用来对象重用
// 在遍历过程中可能需要频繁的pop和push,为了避免频繁的创建TS对象,
// 就把stack栈pop出来的TS对象push到sqare栈中,stack栈需要push时,先试图从sqare栈pop出来一个
TableStack<K,V> stack, spare; // to save/restore on ForwardingNodes
// 当前遍历的index,例如:正在遍历t2[2],index=2,下一次就需要遍历t2[6],index=6
// 如何知道要加4?直接去栈顶TS的length即可,因为栈顶TS的table是t1,t2就是直接从t1扩展过来的
int index; // index of bin to use next
// 固定是t0目前遍历到的下标,在stack为空时,index和baseIndex就是一样的
int baseIndex; // current index of initial table
// Traverser支持遍历部分table,例如:只想遍历t0[0],不管t0[1]
int baseLimit; // index bound for initial table
// TODO 初始table的length,但是不明白这个字段到底有什么用,而且还是final的
final int baseSize; // initial table size
Traverser(Node<K,V>[] tab, int size, int index, int limit) {
this.tab = tab;
this.baseSize = size;
this.baseIndex = this.index = index;
this.baseLimit = limit;
this.next = null;
}
/**
* Advances if possible, returning next valid node, or null if none.
*/
final Node<K,V> advance() {
Node<K,V> e;
// 如果next不是null,说明当前table还没遍历完成,在下面for的第一个if里面直接返回
if ((e = next) != null)
e = e.next;
for (;;) {
Node<K,V>[] t; int i, n; // must use locals in checks
if (e != null)
return next = e;
// baseIndex >= baseLimit 如果t0已经遍历完成了,就可以结束了,next设置成null
// tab == null 在Map还没有遇到第一次put进行初始化时,table为null
// tab.length <= index TODO 不清楚
// i < 0 TODO 不清楚
if (baseIndex >= baseLimit || (t = tab) == null ||
(n = t.length) <= (i = index) || i < 0)
return next = null;
if ((e = tabAt(t, i)) != null && e.hash < 0) {
// 在遍历过程中正在扩充
if (e instanceof ForwardingNode) {
// 将tab设置成下一个table,index不需要变
tab = ((ForwardingNode<K,V>)e).nextTable;
// 不能返回FWD,让下一个循环继续遍历
e = null;
// 入栈
pushState(t, i, n);
continue;
}
else if (e instanceof TreeBin)
e = ((TreeBin<K,V>)e).first;
else
// e.hash < 0 才会进来,只可能是TraverserNode了,继续循环
e = null;
}
if (stack != null)
// 说明非t0的某个table已经遍历完成,
// 判断并更新index,例如:遍历完t2[2]时,index从2变成6
// 或者出栈,例如完t2[6]时,需要回到遍历t0
recoverState(n);
else if ((index = i + baseSize) >= n) // TODO 暂且认为一直返回true
// stack为null,这时需要index和baseIndex都加1
index = ++baseIndex; // visit upper slots if present
}
}
/**
* Saves traversal state upon encountering a forwarding node.
*/
private void pushState(Node<K,V>[] t, int i, int n) {
TableStack<K,V> s = spare; // reuse if possible
// 从sqare中pop出来一个TS,如果没有则new一个
if (s != null)
spare = s.next;
else
s = new TableStack<K,V>();
// 设置当前状态
s.tab = t;
s.length = n; // 我不明白为什么还需要单独一个length,直接tab.length不就ok了?
s.index = i;
// push到stack
s.next = stack;
stack = s;
}
/**
* Possibly pops traversal state.
*
* @param n length of current table
*/
private void recoverState(int n) {
TableStack<K,V> s; int len;
// 在类的开始解释了,index根据stack栈顶length做加法,
// 如果没越界,则退出当前方法(index已经更新),继续遍历当前table
// 如果发现越界,则pop,继续判断
while ((s = stack) != null && (index += (len = s.length)) >= n) {
// len取s.length,n又取len,只能说这代码真的很难理解(当然对于某些聪明人除外)
n = len;
// 将栈顶信息恢复到Traverser
index = s.index;
tab = s.tab;
s.tab = null;
// 将stack出栈,并入栈到sqare
TableStack<K,V> next = s.next;
s.next = spare; // save for reuse
stack = next;
spare = s;
}
// 如果stack已经是null了,需要将index自增,跟advance主方法最后的逻辑一致
if (s == null && (index += baseSize) >= n)
index = ++baseIndex;
}
}
static final class TableStack<K,V> {
int length;
int index;
Node<K,V>[] tab;
TableStack<K,V> next;
}
计数:ConcurrentHashMap
使用了与LongAdder
一样的代码,LongAdder
比AtomicLong
拥有更高的并发性能,但是在多线程下会占用更高的空间。请自行分析java.util.concurrent.atomic.LongAdder
的实现。
ypji
网友评论