并发情况的哈希表
前文中 JDK容器三大将之--哈希表(HashMap) 提到过哈希表,Java中单线程情况时采用了HashMap这样的数据结构通过对数组的封装实现了哈希表数据结构,并且被各个软件广泛使用,也提到过HashMap的modCount字段用来对并发情况抛出快速失败。
既然HashMap不支持并发的读取和修改,那么肯定需要另外一种数据结构来提供保证线程安全的哈希表结构,笔者了解到的有三种方式。
- HashTable,通过对put/get等方法加锁实现互斥功能
- Collections.synchronizedMap(map),通过调用JDK提供的方法将已有的HashMap封装为支持并发的SynchronizedMap(装饰器模式),本质上和HashTable比较类似
- ConcurrentHashMap
其中 HashTable 和 Collections.synchronizedMap(map)方法生成的SynchronizedMap 都是简单粗暴的对put/get方法加上了synchronized锁来控制线程安全,
例如:有一个哈希表map,先后有三个线程分别执行了put(1,1)
, put(1,2)
, put(10086,666)
的请求,那么这三个操作只能够顺序执行,如下图所示
因为每次只能有一个线程可以执行,这就违背了线程并发的初衷,多线程加锁的原则之一就是减少锁的竞争,而ConcurrentHashMap使用了锁分段的技术提高了多线程哈希表的并发度,下面我们就来分析下ConcurrentHashMap锁分段的实现原理以及分段后面临的问题和解决方式。
ConcurrentHashMap
ConcurrentHashMap各个变量介绍
- table,是一个Node[]数组,同HashMap一样,通过数组+拉链法存储实际的value数据
- nextTable,一般为null,只有在ConcurrentHashMap执行resize迁移的时候有值
- baseCount,没有锁竞争时直接返回该哈希表的大小
- sizeCtl,控制table的初始化和resize状态,
- -1,初始化中
- -(1+活跃中的resize任务个数),存储扩容中的任务个数
- table为null,保存table将来进行第一次初始化时的大小
- table已经初始化成功后,保存着下一次扩容的目标大小
- transferIndex,下次扩容需要从table该下标开始分裂
- cellsBusy,扩容和计数时需要用到的自旋锁
- counterCells,CounterCell[]数组,CounterCell是基于LongAdder改造的用于多线程计数功能的实现,LongAdder也是AtomicLong的底层实现
put()方法的实现与分段锁
ConcurrentHashMap的put过程伪代码可以描述为如下
def put(key, value):
hash = hash(key) # 根据key计算hash
if table == null: # 1.table为空时初始化
table = initTable() # 初始化后重新尝试获取key
if table != null and table[hash] == null: # 2.table不空 且 key所在的桶尚没有值,则通过cas存入值
casTabAt(table, hash, key, value)
if table != null and table[hash] == MOVED: # 3.table不空 且 key所在位置标识为迁移中,则执行helpTransfer帮助迁移
table = helpTransfer(table, table[hash]) # 迁移后重新尝试获取key
if table != null and table[hash] != MOVED: # 4.table不空 且 key所在位置有哈希冲突,则对该段加锁执行传统HashMap的拉链法+红黑树处理逻辑
hashCollisionWithSynchronized(table, hash, key, value)
addCount(1, binCount) # hash节点数+1
put操作的整个流程如下
put操作过程流程图
下面依次详细介绍initTable、casTabAt、helpTransfer方法的执行过程,拉链法+红黑树的哈希冲突处理方式除了加了synchronized锁和double-check功能之外与普通HashMap区别不大,本文就不再赘述了,感兴趣的读者可以从HashMap的哈希冲突处理了解。
(和上图对比下同样put操作的流程)
initTable--并发情况下table的初始化(CAS)
当table数组为空的时候,会调用initTable方法初始化table数组,这里用到了上文提到的ConcurrentHashMap的变量sizeCtl
,方法的代码并不长,这里就不再贴了,方法执行的流程如下
Java中通过 sun.misc.Unsafe#compareAndSwapInt
的native方法实现cas的调用(系统调用保证比较和替换操作的原子性)
方法的作用是,读取传入对象o在内存中偏移量为offset位置的值与期望值expected作比较。
相等就把x值赋值给offset位置的值。方法返回true。
不相等,就取消赋值,方法返回false。
这也是CAS的思想,即比较并交换。用于保证并发时的无锁并发的安全性。
casTabAt--cas乐观锁
调用的是 sun.misc.Unsafe#compareAndSwapObject
compareAndSwapXXX接收四个参数,分别是
- 要被进行swap操作的对象
- 偏移量
- expect
- 目标value
该方法是native调用,系统调用保证了比较和替换过程的原子性。CAS是Java乐观锁的实现方式,乐观锁,顾名思义,是假设有很大概率能够获得锁而产生的逻辑,乐观锁的本质即 自旋的while循环 + CAS操作,抢不到就继续自旋。
helpTransfer--ConcurrentHashMap的扩容过程
helpTransfer的扩容过程详细逻辑涉及到ConcurrentHashMap出于对硬件性能的考虑,代码的实现比较晦涩难懂,本文会屏蔽掉这些数学计算的详细细节,旨在大体介绍下ConcurrentHashMap的扩容过程。
helpTransfer方法的伪代码如下
def helpTransfer(table, f): # table即ConcurrentHashMap的旧table[]数组,f即上文的ForwardingNode
if table != null and f instanceof ForwardingNode: # 经典double-check
while(f.nextTable == this.nextTable and this.table == table and sizeCtl < 0): # 还是double-check
if noNeedMoreHelperThread(sizeCtl): # 判断帮助迁移的线程不需要人手了,返回
return
if cas(this, sizeCtl, sizeCtl+1): # cas操作sizeCtl,准备帮忙
transfer(table, nextTable) # 具体的迁移过程
return
# 需要帮忙,但是cas没抢到,自旋
可以看到helpTransfer方法的主要作用是判断迁移工作还需不需要自己参与,这个是根据sizeCtl和table.length计算得到的,读者可以自己深追细节。接下来就是迁移过程的核心方法transfer
,伪代码如下:
def transfer(table, nextTable):
stride = calcStride(table.length) # 根据当前CPU核数和旧table大小计算【跨度】
if nextTable == null:
nextTable = new Node[n << 1] # 扩容一倍
advance = true # advance是一个状态标识,可以理解为【揽活中】
for():
while(advance):
if transfering() || finishing:
advance = false # 如果正在迁移中或者迁移结束了,不需要再揽活
else
bound, i = calcBoundByCas(transferIndex, stride) # 通过transferIndex变量和前面算出来的跨度,计算出当前要揽活多少,table[bound -> i]
advance = fasle # 揽活结束,开始干活
if noMoreTodo():
doFinish() # 通过下标判断没有更多任务了,则进行收尾工作
return # 退出,这里是出口
if hasMoreTodo() and tabAt(table, i) == null:
casTabAt(table, i, null, fwd) # 置为fwd节点
continue # 继续for循环
if hasMoreTodo() and tabAt(table, i) == MOVED: # 已经是fwd节点
advance=true # 说明别的线程在施工,循环重新揽活
continue # 继续for循环
if hasMoreTodo() and tableAt(table, i) != MOVED: # 普通节点
transferSynchronized() # synchronized加锁执行迁移,迁移的过程中也会将施工中的节点置为fwd节点,迁移细节和普通HashMap一样,大致就是根据节点的hash值算出low和high,然后分别落到新table的不同索引位置
几个概念
- stride,跨度,根据硬件性能评估出每个线程一批能处理多少范围的桶
- advance模式,揽活模式,这时候正在算当前线程要处理table[bound, i]的迁移工作
- finishing模式,收尾工作,table置为nextTable,nextTable置为null,sizeCtl赋值
整个扩容过程的执行模式如下图
节点可能的状态:
- 普通节点,没有进行迁移时的普通节点,分为拉链节点、红黑树节点
-
路由节点ForwardingNode,在迁移中状态时安放在旧table上的特殊节点,用作路由
施工中
路由节点ForwardingNode
与单线程的HashMap不同的是,多线程的ConcurrentHashMap的迁移过程必须要考虑到迁移过程中有其它线程的读操作,因此我们只能有两种选择:
- 在迁移的过程中将整个表锁住,所有读取的操作都因为等锁而挂起,这种方式性能上是不可接受的
- 在迁移的过程中,一定会存在部分桶迁完了、部分桶没迁完的中间状态,在这种状态下进行get、put操作时,为了能够保证迁移完的都走新table,就因此产生了ForwardingNode节点的概念,大致流程如下图所示(注意在某个桶的迁移过程中还是通过synchronized加锁了的,这和我们前面讨论的分段锁的用法是一致的)
forward
addCount--ConcurrentHashMap的size()方法与LongAdder
addCount
ConcurrentHashMap的addCount方法的实现借鉴了LongAdder的实现原理,大致可以粗略理解为:
- 如果没有发现竞争(通过CAS),返回 baseCount + x(很简单的
baseCount+=x
) - 如果发现竞争,采用了类似分段锁的思想,根据当前线程的hash定位到了
CounterCell[index]
,然后通过CAS给这个CounterCell累加(有ConcurrentHashMap分段锁那味了),最后获取总数时是 baseCount + cells[0-n]
以下是大致实现流程图,本文没有深入实现的细节(例如CounterCell数组初始化、扩容等),想要深入了解的读者可以参考这篇文章 深入剖析LongAdder是咋干活的,写的挺好的
ConcurrentHashMap的size()方法
基于上面第LongAdder的大致实现原理的理解,size方法的流程可以直接贴源码了
sum = baseCount + counterCells[0] + counterCells[1] + ... + counterCells[n]
final long sumCount() {
CounterCell[] as = counterCells; CounterCell a;
long sum = baseCount;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
从源码可以看到,ConcurrentHashMap的size()方法不能保证结果的完全准确,这个方法完全没有加锁,也就是baseCount、counterCells[i]都有可能被并发修改,但是如果执行sumCount的时候加锁会影响性能,这也是牺牲准确性换取性能的一种折衷吧
总结
从源码中可以看出,Java为了最大能力的优化哈希表在并发情况下的性能尽可能做了最大的努力,包括:
- 锁是避免不了的,但是锁住的数据量要尽可能的少(少锁住点) -- 分段锁
- 在一些不太可能出现竞争,只是不加锁又有可能会有线程安全风险的地方,尽量使用CAS乐观锁,不要挂起线程(能别锁就别锁)
- 在put等操作时发现正在迁移的过程中,顺便一起帮帮忙,可以,这很团结
只能说不愧是你,Doug Lea。
Reference
[1] 《Java并发编程实战》
[2] JDK容器三大将之--哈希表(HashMap)
[3] 深入剖析LongAdder是咋干活的
网友评论