简介
众所周知,在 HashMap 作为共享变量的场景下,是线程不安全的。在 Java8 下,我们可以使用Collections.synchronizedMap(Map<K,V> m)
将 HashMap 对象包装为线程安全,其实就是在 HashMap 每个方法都加上 synchronized 加锁保证线程安全。但是也有一些问题,读写操作都会加锁,在读并发量大的情况下性能并不高,所以这篇文章介绍另外一种保证 Map 在多线程环境下线程安全的并发类 — — ConcurrentHashMap
以下内容只显示部分常用方法的源码和部分代码逻辑,因为没有必要每一行每一个方法都要细读,用到的时候再看,否则一开始就精读每个方法的源码,会给你学习 ConcurrentHashMap 带来不少阻碍。By the way,理解 ConcurrentHashMap 的源码,建议首先看 HashMap 的源码,因为很多代码逻辑都是相似,只不过就是在 HashMap 基础上添加支持并发的逻辑,如果熟悉 HashMap 的源码之后,阅读 ConcurrentHashMap 的源码难度就不大啦 ~ 若不熟悉 HashMap 源码的朋友,可以查看我的拙作 图解 Java8 HashMap 常用方法源码(附常见面试题和答案),或者参考其它优秀的博客。
ConcurrentHashMap 和 HashMap 两者的异同:
相似之处:
- 底层数据结构相同,都是
数组 + 链表 + 红黑树
结构 - 都实现了 Map 接口,继承了 AbstractMap 抽象类,所以两者大部分使用都是相同的,只不过就是 ConcurrentHashMap 保证了线程安全
不同之处:
- HashMap 键值对允许 null 值,而 ConcurrentHashMap 键值对都不允许 null 值
- 红黑树的结构不一样。HashMap 红黑树节点为 TreeNode,而 ConcurrentHashMap 红黑树节点为 TreeNode + TreeBin
- ConcurrentHashMap 扩容是通过 ForwardingNode 节点保证扩容时线程安全。
首先,先从 ConcurrentHashMap 的 Demo 入手吧!
示例:
public static void main(String[] args) {
ConcurrentHashMap<Integer,Integer> concurrentHashMap = new ConcurrentHashMap(16);
for (int i = 0; i < 10; i++) {
concurrentHashMap.put(i,i);
}
System.out.println(concurrentHashMap.get(1));
System.out.println(concurrentHashMap.remove(2));
System.out.println(concurrentHashMap.size());
System.out.println(concurrentHashMap.mappingCount());
}
输出结果:
1
2
9
9
ConcurrentHashMap 源码剖析
数据结构
ConcurrentHashMap 基本数据结构为数组,当有碰撞的时候会在同一个 hash 位置上形成一个链表,当链表长度大于 8 且数组的容量大于 64 则将链表转化为红黑树。
链表节点、树节点和转移节点
首先了解 ConcurrentHashMap 几个节点的声明和作用
// 链表节点
static class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
volatile V val;
volatile Node<K,V> next;
// 下面还有一坨,先省略
}
// 红黑树节点,维护节点属性和查找功能
static final class TreeNode<K,V> extends Node<K,V> {
TreeNode<K,V> parent;
TreeNode<K,V> left;
TreeNode<K,V> right;
TreeNode<K,V> prev;
boolean red;
TreeNode(int hash, K key, V val, Node<K,V> next,TreeNode<K,V> parent) {
super(hash, key, val, next);
this.parent = parent;
}
// 下面还有一坨,先省略
}
// 维护红黑树结构,负责根节点的加锁和解锁
static final class TreeBin<K,V> extends Node<K,V> {
TreeNode<K,V> root;
volatile TreeNode<K,V> first;
volatile Thread waiter;
volatile int lockState;
static final int WRITER = 1; // 已经获得写锁状态
static final int WAITER = 2; // 等待写锁状态
static final int READER = 4; // 增加数据时读锁的状态
// 下面还有一坨,先省略
}
// 转移节点,扩容时保证线程安全
static final class ForwardingNode<K,V> extends Node<K,V> {
final Node<K,V>[] nextTable;
ForwardingNode(Node<K,V>[] tab) {
super(MOVED, null, null, null);
this.nextTable = tab;
}
// 下面还有一坨,先省略
}
构造函数
public ConcurrentHashMap() {
}
public ConcurrentHashMap(int initialCapacity) {
if (initialCapacity < 0)
throw new IllegalArgumentException();
int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY : tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
this.sizeCtl = cap;
}
public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
this.sizeCtl = DEFAULT_CAPACITY;
putAll(m);
}
public ConcurrentHashMap(int initialCapacity, float loadFactor) {
this(initialCapacity, loadFactor, 1);
}
/**
* 在JDK1.8之前本质是 ConcurrentHashMap 分段锁总数,表示同时更新 ConcurrentHashMap 且不产生锁竞争的最大线程数;
* 在JDK1.8中,仅在构造器中确保初始容量>=concurrentLevel,为兼容旧版本而保留;
*/
public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (initialCapacity < concurrencyLevel)
initialCapacity = concurrencyLevel;
long size = (long)(1.0 + (long)initialCapacity / loadFactor);
int cap = (size >= (long)MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : tableSizeFor((int)size);
this.sizeCtl = cap;
}
我们可以根据构造方法自定义 ConcurrentHashMap 数组的容量、阀值、加载因子。建议在开发过程中不要更改加载因子,最好能够根据业务情况给 ConcurrentHashMap 数组适当的容量,避免频繁扩容,影响性能。
初始化 ConcurrentHashMap 数组:initTable()
通过自旋 + CAS + 双重 check
保证了数组初始化时的线程安全
// 初始化 table
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
// 自旋保证初始化成功
while ((tab = table) == null || tab.length == 0) {
// 小于 0 代表有线程正在初始化,释放当前 CPU 的调度权,重新发起锁的竞争
if ((sc = sizeCtl) < 0)
Thread.yield();
// CAS 赋值保证当前只有一个线程在初始化,-1 代表当前只有一个线程能初始化
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
// 再次判断 table 是否被初始化,因为其他线程可能已经抢先初始化
if ((tab = table) == null || tab.length == 0) {
// 初始化数组大小
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
break;
}
}
return tab;
}
添加数组节点 put(K key, V value)
// 计算key的hash值,与HashMap相似,hashCode值的高16位和低16位进行异或运算,这样可以尽量避免一些hash值出现冲突
// 同时和 HASH_BITS 与运算是为了保证结果是正数(因为hashCode的结果有可能是负数)
static final int spread(int h) {
return (h ^ (h >>> 16)) & HASH_BITS;
}
// key 和 value 都不可以为null
public V put(K key, V value) {
return putVal(key, value, false);
}
final V putVal(K key, V value, boolean onlyIfAbsent) {
// key 或 value 为null则抛出异常
if (key == null || value == null) throw new NullPointerException();
// 取得key的hash值
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
// 如果数组未被初始化则初始化数组
if (tab == null || (n = tab.length) == 0)
tab = initTable();
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { // 当前数组索引位置节点为null,则可以直接插入新增节点
// 无锁的CAS操作插入节点
if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null)))
break;
}
else if ((fh = f.hash) == MOVED) // 当前Map在扩容,先协助扩容,再更新值。
tab = helpTransfer(tab, f); // 协助扩容
else { // 有 hash 冲撞
V oldVal = null;
synchronized (f) {
if (tabAt(tab, i) == f) { // 再次判断 f 节点是否为第一个节点,防止其他线程已修改f节点(可能扩容后索引位置的节点不是 f 了)
if (fh >= 0) { // 节点结构为链表
binCount = 1;
// 循环链表,将节点放入链表中,与 HashMap 的逻辑相似
// 如果节点的hash和key与参数的hash和key一样则进行覆盖操作,否则就在链表尾部插入新节点
// 和 HashMap.put 源码不一样的是,还有一个binCount变量累加,这是判断链表的节点数是否超出8,超出的话则转为红黑树
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key, value, null);
break;
}
}
}
else if (f instanceof TreeBin) { // 节点的结构为树
Node<K,V> p;
binCount = 2;
// 以红黑树的算法插入节点
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
// 链表长度超出阀值转为红黑树结构
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
// 如果是覆盖操作,则返回被覆盖的值
if (oldVal != null)
return oldVal;
break;
}
}
}
// 检查当前容量是否需要进行扩容
addCount(1L, binCount);
return null;
}
put 方法源码比较长,以下结合源码画出一张流程图协助理解。
简单来说,执行添加节点方法包括几个核心操作:
1.检查数组是否初始化
2.根据索引位置的节点判断是否为空节点,如果是空节点则直接添加节点,否则则判断该节点是链表节点还是红黑树的节点;在迭代链表节点或红黑树的节点过程中,如果找到 key 相同的节点则执行覆盖操作,否则添加到链表尾部或红黑树子节点
3.如果当前线程执行添加操作过程中,遇到其它线程正在扩容则会停下来协助扩容,扩容结束后再继续执行插入操作
4.如果链表长度超过 8 且数组节点数量超过 64 则会转化为红黑树结构
5.新增完成后,检查是否需要扩容,如果需要扩容的话执行扩容方法
获取节点 get(Object key)
// 根据参数 key 获取节点
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
int h = spread(key.hashCode());
// 判断数组是否被初始化、数组长度是否大于0,并且根据计算出来的 hashCode 寻址找到不为 null 的节点
if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1) & h)) != null) {
// 判断节点的hash与根据参数计算出来的hash是否一样,因为节点的hash值在扩容的时候,hash值会变成小于0
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
else if (eh < 0) // 如果当前hash值<0,说明正在扩容,调用ForwardingNode的find方法来定位到nextTable
return (p = e.find(h, key)) != null ? p.val : null;
// 循环链表或者红黑树
while ((e = e.next) != null) {
if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
相比于 put 方法,get 方法就简洁多了。核心逻辑就是判断数组中是否存在 key 、数组是否正在扩容、节点是否为链表或红黑树结构。
获取 ConcurrentHashMap 的元素个数 —— size() 和 mappingCount()
注意:由于可能多个线程操作,所以返回的元素个数只是估计值,不一定准确!
// 实际上调用 sumCount() 获取元素个数
public int size() {
long n = sumCount();
return ((n < 0L) ? 0 :
(n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int)n);
}
@sun.misc.Contended static final class CounterCell {
volatile long value;
CounterCell(long x) { value = x; }
}
// 如果 counterCells 对象不是 null,则遍历 counterCells 数组,baseCount 累加对象的值
// 大致可以这样理解,baseCount 是统计当前的节点总数,但是当多个线程同时执行 CAS 修改 baseCount值,
// 失败的线程会将值放到 CounterCell 中,所以为了尽量返回接近实际值,就将 baseCount 和 CounterCell 的所有元素加上
final long sumCount() {
CounterCell[] as = counterCells; CounterCell a;
long sum = baseCount;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
/**
* Returns the number of mappings. This method should be used
* instead of {@link #size} because a ConcurrentHashMap may
* contain more mappings than can be represented as an int. The
* value returned is an estimate; the actual count may differ if
* there are concurrent insertions or removals.
* 大意就是建议使用 mappingCount() 代替 size() 方法;
* 因为 ConcurrentHashMap 包含的映射数量可能超出 int 类型的最大范围(size方法的返回值是int,mappingCount方法的返回值是long)
* 由于存在并发,所以返回的值不一定准确
*/
public long mappingCount() {
long n = sumCount();
return (n < 0L) ? 0L : n; // ignore transient negative values
}
不难发现,无论是使用 size() 还是 mappingCount() 方法其核心都是调用 sumCount() 方法。通过计算 bashCount 和 CounterCell 的元素总和返回 ConcurrentHashMap 的节点个数(不一定准确)。
删除节点 remove(Object key) 和 clear()
// 删除数组节点
public V remove(Object key) {
return replaceNode(key, null, null);
}
// 删除节点的核心逻辑
final V replaceNode(Object key, V value, Object cv) {
// 计算key的hashCode
int hash = spread(key.hashCode());
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
// 如果数组还没初始化或者数组不存在key直接跳出循环返回null
if (tab == null || (n = tab.length) == 0 || (f = tabAt(tab, i = (n - 1) & hash)) == null)
break;
else if ((fh = f.hash) == MOVED) // 如果数组正在扩容,会先协助扩容
tab = helpTransfer(tab, f);
else {
V oldVal = null;
boolean validated = false;
// 节点加锁,保证节点的删除是原子性。过程与 HashMap 的逻辑差不多。
// 设置 f 的前驱节点 pred 的后继节点为 f 的后继节点
synchronized (f) {
if (tabAt(tab, i) == f) {
if (fh >= 0) {
validated = true;
for (Node<K,V> e = f, pred = null;;) {
K ek;
if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) {
V ev = e.val;
if (cv == null || cv == ev || (ev != null && cv.equals(ev))) {
oldVal = ev;
if (value != null)
e.val = value;
else if (pred != null)
pred.next = e.next;
else
setTabAt(tab, i, e.next);
}
break;
}
pred = e;
if ((e = e.next) == null)
break;
}
}
else if (f instanceof TreeBin) { // 节点结构为树
validated = true;
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> r, p;
if ((r = t.root) != null && (p = r.findTreeNode(hash, key, null)) != null) {
V pv = p.val;
if (cv == null || cv == pv || (pv != null && cv.equals(pv))) {
oldVal = pv;
if (value != null)
p.val = value;
else if (t.removeTreeNode(p))
setTabAt(tab, i, untreeify(t.first));
}
}
}
}
}
if (validated) {
if (oldVal != null) {
if (value == null)
// 节点数量减1
addCount(-1L, -1);
return oldVal;
}
break;
}
}
}
return null;
}
// 清除所有的数组节点
public void clear() {
long delta = 0L;
int i = 0;
Node<K,V>[] tab = table;
while (tab != null && i < tab.length) {
int fh;
Node<K,V> f = tabAt(tab, i);
if (f == null)
++i;
else if ((fh = f.hash) == MOVED) {// 先协助扩容,再删除所有元素。这是为了避免删除元素之后,又有新的节点添加进来
tab = helpTransfer(tab, f);
i = 0;// 重新遍历数组
}
else {
// 节点加锁,保证节点的删除具有原子性,无论是链表还是红黑树,都是将数组对应的索引值设置为null
synchronized (f) {
if (tabAt(tab, i) == f) {
Node<K,V> p = (fh >= 0 ? f : (f instanceof TreeBin) ? ((TreeBin<K,V>)f).first : null);
while (p != null) {
--delta;
p = p.next;
}
setTabAt(tab, i++, null);
}
}
}
}
// 更新数组节点数量
if (delta != 0L)
addCount(delta, -1);
}
remove方法和clear方法逻辑并不难,看源码注释结合流程图应该就能够理解。
好了,接下来就是ConcurrentHashMap的核心 —— 扩容啦!
粤语,大意是事情的进展非常顺利ConcurrentHashMap 的核心 —— 扩容(addCount、helpTransfer、tryPresize、transfer)
// 计算数组中的节点个数是否大于阈值,如果超出阀值则协助扩容
private final void addCount(long x, int check) {
CounterCell[] as; long b, s;
// CAS 更新 baseCount
if ((as = counterCells) != null || !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
CounterCell a; long v; int m;
boolean uncontended = true;
// 多线程 CAS 发生失败时执行
if (as == null
|| (m = as.length - 1) < 0
|| (a = as[ThreadLocalRandom.getProbe() & m]) == null
|| !(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
// 循环更新 basecount
fullAddCount(x, uncontended);
return;
}
if (check <= 1)
return;
s = sumCount();
}
// 检查是否需要扩容,默认 check 是 1
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
// 数组的节点数量大于等于阈值 sizeCtl,并且数组不为空,数组长度小于最大容量则扩容
while (s >= (long)(sc = sizeCtl) && (tab = table) != null && (n = tab.length) < MAXIMUM_CAPACITY) {
// 根据数组长度得到一个标识
int rs = resizeStamp(n);
// sc < 0 表示其他线程已经在扩容,尝试帮助扩容
if (sc < 0) {
// 判断扩容是否结束,如果结束则跳出循环
// (sc >>> RESIZE_STAMP_SHIFT) != rs 代表 sizeCtl 发送了变化
// sc == rs + 1 代表扩容结束,不再有线程进行扩容
// sc == rs + MAX_RESIZERS 协助线程数已经达到最大
// (nt = nextTable) == null 代表扩容结束
// transferIndex <= 0 代表转移状态发生了变化
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
// 其他线程正在扩容,sc + 1 表示多了一个线程在帮助扩容
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
} else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)) // 执行到这一步,代表当前对象没有正在扩容,CAS 更新 sizectl,向左移16+2位,变成负数
// 仅当前线程在扩容
transfer(tab, null);
s = sumCount();
}
}
}
// 协助扩容
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
Node<K,V>[] nextTab; int sc;
// 节点数组不为空,而且 f 节点的类型是 ForwardingNode 表明数组正在扩容
// (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null 表示迁移完成了,详见transfer()
if (tab != null && (f instanceof ForwardingNode) && (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
int rs = resizeStamp(tab.length);
// 表示在扩容
while (nextTab == nextTable && table == tab && (sc = sizeCtl) < 0) {
// sc >>> RESIZE_STAMP_SHIFT) != rs 说明扩容完毕或者有其它协助扩容者
// sc == rs + 1 表示只剩下最后一个扩容线程了,其它都扩容完毕了
// transferIndex <= 0 扩容结束了
// sc == rs + MAX_RESIZERS 到达最大值
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || transferIndex <= 0)
break;
// CAS 更新 sizeCtl=sizeCtl+1,表示新增加一个线程辅助扩容
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
transfer(tab, nextTab);
break;
}
}
return nextTab;
}
return table;
}
// 扩容
private final void tryPresize(int size) {
// 如果给定的容量>=MAXIMUM_CAPACITY的一半,直接扩容到允许的最大值,否则调用函数计算合适的容量值
int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
tableSizeFor(size + (size >>> 1) + 1);
int sc;
// 如果 sizeCtl 小于0,说明数组还没有被初始化
while ((sc = sizeCtl) >= 0) {
Node<K,V>[] tab = table; int n;
if (tab == null || (n = tab.length) == 0) {
n = (sc > c) ? sc : c;
// CAS将SIZECTL状态置为-1,表示正在进行初始化
if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
// 确认其他线程没有对table修改
if (table == tab) {
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = nt;
sc = n - (n >>> 2);// 无符号右移2位,此即0.75*n
}
} finally {
sizeCtl = sc; // 更新扩容阀值
}
}
}
else if (c <= sc || n >= MAXIMUM_CAPACITY) // 若欲扩容值不大于原阀值,或现有容量>=最值,什么都不用做了
break;
else if (tab == table) { // table不为空,且在此期间其他线程未修改table
int rs = resizeStamp(n);
if (sc < 0) {
Node<K,V>[] nt;
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
}
}
}
// 扩容
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
// 变量 stride 代表每个线程负责的迁移的数量
int n = tab.length, stride;
// 计算每个线程需要负责的迁移数量。
// 如果 cpu 数量大于 1,迁移数量为 n >>> 3(也就是除以8) / cpu 个数,否则就数组长度。
// 如果小于 MIN_TRANSFER_STRIDE 就直接设置线程需要负责的迁移数量为MIN_TRANSFER_STRIDE
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE;
// 初始化新数组
if (nextTab == null) {
try {
// 新数组的容量为原数组的两倍
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) {
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
transferIndex = n;
}
// 新数组的长度
int nextn = nextTab.length;
// 创建一个 ForwardingNode 节点,用于标记
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
//并发扩容的关键属性 如果等于true 说明这个节点已经处理过
boolean advance = true;
// 完成状态,如果为true,就结束方法
boolean finishing = false;
//通过for自循环处理每个槽位中的链表元素,默认advace为真,通过CAS设置transferIndex属性值,并初始化i和bound值,i指当前处理的槽位序号,bound指需要处理的槽位边界,先处理槽位15的节点;
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
while (advance) {
int nextIndex, nextBound;
if (--i >= bound || finishing)
advance = false;
else if ((nextIndex = transferIndex) <= 0) { //transferIndex<=0表示已经没有需要迁移的hash桶,将i置为-1,线程准备退出
i = -1;
advance = false;
}
// CAS 操作,为当前线程分配数组索引区间
else if (U.compareAndSwapInt(this, TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ? nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
// i < 0 ,表示数据迁移已经完成
// i >= n 和 i + n >= nextn 表示最后一个线程也执行完成了,扩容完成了
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
// 如果 finishing=true,说明扩容所有工作完成,然后跳出循环
if (finishing) {
// 删除成员变量,方便GC
nextTable = null;
// 更新table
table = nextTab;
// 更新阈值
sizeCtl = (n << 1) - (n >>> 1);
return;
}
// 表示一个线程退出扩容
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
// 说明还有其他线程正在扩容
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
// 当前线程为最后一个线程,负责再检查一个整个队列
finishing = advance = true;
i = n;
}
}
else if ((f = tabAt(tab, i)) == null)// 第一个线程获取i处的数据为null,
advance = casTabAt(tab, i, null, fwd);// 设置当前节点为 fwd 节点
else if ((fh = f.hash) == MOVED)// 如果当前节点为 MOVED,说明已经处理过了,直接跳过
advance = true;
else {
// 节点不为空,锁住i位置的头结点
synchronized (f) {
// 再次核对,防止其他线程对该hash值进行修改
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
// 说明该位置存放的是普通节点,以下的操作类似 HashMap,节点的hash和数组长度与运算
// 如果为0则直接将原先的索引位置插入到新数组,否则重新计算数组索引位置并插入
if (fh >= 0) {
int runBit = fh & n;
Node<K,V> lastRun = f;
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
// CAS 操作保证节点插入是原子性的
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
else if (f instanceof TreeBin) { // 节点是红黑树结构
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}
扩容部分的代码真的好长好长,如果你能够看到这里,我敬你是一条汉子!说实话,我看着都脑壳疼
扩容是 ConcurrentHashMap 的核心源码,其复杂度也大于 ConcurrentHashMap 其他的方法,所以理解难度也提高不少。如有疑问或者错误,麻烦在评论区告知我一下~
总结
HashMap 与 ConcurrentHashMap 在实际开发和面试都是高频的出现的,所以需要我们对其有深入的了解,这样才能避免入坑。同时还需要知道,ConcurrentHashMap 在JDK1.8之后作了优化,采用了 Synchronized + CAS + Node + Unsafe 取代了 JDK1.8 之前的 Segment + HashEntry + Unsafe 实现。用 Synchronized + CAS 代替 Segment ,这样锁的粒度更小,降低实现的复杂度。
参考资料:
https://www.jianshu.com/p/5dbaa6707017
https://blog.51cto.com/14220760/2395683?source=dra
网友评论