本文会从以下三个方面理解ConcurrentHashMap的底层实现,2020.08.06 21:22
- 红黑树(简单介绍,自平衡的实现真心看不懂)
- CAS,UnSafe类
- ConcurrentHashMap的put操作,get操作,扩容机制
红黑树
红黑树是一个拥有红黑节点的,可以自平衡的二分查找树;
特点:
- 只有红色和黑色节点
- 根节点是黑色节点
- 每一个红色节点的两个孩子一定都是黑色
- 任意一个节点到叶子节点的任意路径,经过的黑色节点数相同
- 每一个叶子节点都是黑色(JAVA表示的红黑树的叶子节点是NULL节点)
红黑树的自平衡:
红黑树的自平衡是通过左旋,右旋,变色
来实现的:
左旋:
对P节点进行左旋- P节点的右孩子变成自己的父节点
- P节点的右孩子的左节点变成自己的右孩子
右旋:
- P节点的左孩子作为自己的父节点
- P节点的左孩子的右节点变成自己的左节点
红黑树的查找:
红黑树的是一个二叉查找树,所以查找过程和二叉查找树的过程是一样的,将查找的树和当前节点进行比较,如果当前节点小就查右边的子树,如果当前节点大就查左边的子树;
红黑树的最坏情况是O(2 lgn)
红黑树的插入:
- 先当成二插搜索树插入当前节点
- 当前节点设置为红色,(保证根节点到叶子所有的黑色节点数不变,至少可以保证这一特性,减少自平衡的难度)
- 通过旋转和变色保持红黑数的五个特性;
其实只要满足(红色节点的孩子一定是黑色节点),因为其他的特性都不会被违背
红黑树的删除:
- 当成二叉搜索数删除
- 通过旋转和变色进行自平衡
CAS
Compare And Swap , 也叫自旋锁,乐观锁;
线程在对资源(未加锁)
进行操作时,会将自己线程的副本和内存的实际值进行比较,如果相等就操作,如果不相等就不操作,更新副本的值,然后下一次循环再次尝试,如果一直失败,那么当前线程就会一直循环,所以叫自选锁;
UnSafe类:
Java提供了UnSafe类可以绕过jvm对机器内存进行直接操作,我们知道,对象是存在堆内存里的,这只是在jvm层面去看,如果从操作系统的角度出发,实际上对象是存在机器内存上的,jvm只不过是一个帮我们管理java进程的伪内核而已;
使用UnSafe直接操作内存的注意点:
-
需要像C语言一样手动释放内存,绕过jvm操作内存意味着jvm不会帮我们自动GC
-
在操作内存的时候需要提供对象的原始地址和对象成员变量的偏移量,需要手动计算(UnSafe类有获取原始地址和偏移量的方法),如果操作不当会导致jvm崩溃(内核把jvm当作一个触发了
abort中断(不可修复的错误)
的进程,给他干掉) -
直接操作内存,更快
这里介绍一下ConcurrentHashMap中涉及到的UnSafe操作,关于UnSafe的详细操作可以参考这篇文章:Java中的Unsafe
Unsafe中提供了int,long和Object的CAS操作:
public final native boolean compareAndSwapObject(Object var1, long var2, Object var4, Object var5);
public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);
public final native boolean compareAndSwapLong(Object var1, long var2, long var4, long var6);
-
var1:
需要修改的那个值所属的对象 -
var2:
需要修改的那个对象的偏移量(在内存中和对象内存的相对位置,通过UnSafe的一个方法获取) -
var3:
期望值,线程副本中的值 -
var4:
修改的值
UnSafe提供的获取类属性的偏移量:
public native long staticFieldOffset(Field var1);
public native long objectFieldOffset(Field var1);
public native Object staticFieldBase(Field var1);
public native int arrayBaseOffset(Class<?> var1);
public native int arrayIndexScale(Class<?> var1);
- Field是反射中的Field
ConcurrentHashMap
本文是基于JDK1.8的源码分析的;
ConcurrentHashMap的底层数据结构:
transient volatile Node<K,V>[] table;
static class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
volatile V val;
volatile Node<K,V> next;
}
ConcurrentHashMap1.8和HashMap一样,都是采用数组+链表;1.7之前使用的Segment,每一个Segment里面是一个链表数组;
ConcurrentHashMap的put:
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
// 分析1
if (tab == null || (n = tab.length) == 0)
tab = initTable();
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
// 分析2
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
else if ((fh = f.hash) == MOVED)
// 分析3
tab = helpTransfer(tab, f);
else {
V oldVal = null;
// 分析4
synchronized (f) {
if (tabAt(tab, i) == f) {
// 分析4.1
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
// 分析4.2
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
// 分析5
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
// 分析6
addCount(1L, binCount);
return null;
}
- 分析1:如果是第一次put,需要初始化数组,为了防止数组被多次初始化,这边使用CAS去尝试修改自己的一个字段,如果当前线程可以通过CAS将这个字段修改为-1,就可以创建数组;
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
}
}
}
return tab;
}
-
分析2:通过key的hash值去找数组下标,当前下标的数组元素是没有空的,就是null的话,使用CAS向这个下标里放一个Node节点
-
分析3:ConcurrentHashMap内部有一个字段用来表示当前是否在进行扩容,如果有一个线程正在扩容,那么这个字段就是
MOVE状态
,如果另一个线程发现当前的ConcurrentHashMap处于MOVE状态,就会帮助它扩容; -
分析4: 如果之前的代码都没有走,那么就表示当前的数组元素是有值的,需要添加到链表或者红黑树中,这个操作是需要加synchronized 锁的,保证插入链表(红黑树)的线程安全,这里锁的对象是头节点
这里需要补充一下,在HashMap中,红黑树的节点是TreeNode,而在ConcurrentHashMap中红黑树是以TreeBin的形式出现在数组里的,这个TreeBin里面放的是TreeNode,这里解释一下为什么,因为红黑树在插入后会进行自平衡,会旋转,所以头结点可能会发生改变,那么就使用TreeBin这个对象对红黑树在进行一层封装,这个对象是不会发生改变的
- 分析4.1:如果数组中元素是一个Node,那么就添加到链表的最后一个
- 分析4.2:如果数组中元素的节点是一个TreeBin,就将当前节点插入到红黑树中;
-
分析5:判断插入节点后是否需要树化,树化的判断条件时链表的长度超过8
-
分析6:size++,判断是否需要扩容;
在1.7的版本中,ConcurrentHashMap锁的是Segment,分段锁,在扩容的时候各个Segment之间互不影响,而1.8的锁的粒度更小了,锁的是Node节点,这样在扩容的时候就会影响到数据的读写,为了解决1.8扩容影响性能的问题,引入了ForwardingNode
;
转发节点ForwardingNode
:fwd只有在扩容的时候起作用, 将旧数组的数据移动到新数组后,会在旧数组上放置一个fwd对象,当读线程读取到fwd时会到新数组的对应位置读,当写线程发现是fwd时就帮助扩容;
ConcurrentHashMap的transfer:
看了三天了,没看懂,我还是去复习网络吧;
网友评论