ConcurrentHashMap和CopyOnWriteArrayList都是java.util.concurrent
包下的类,分别解决了HashMap和ArrayList的线程安全问题。
本文以对这两个类进行简析,并不作详尽分析。
相关文章
【Java】HashMap原理及相关面试题
【Java】ArrayList、LinkedList原理及相关面试题
本文源码版本为Java14。
一、ConcurrentHashMap
ConcurrentHashMap的数据结构大部分与HashMap相同,依旧是数组+链表/红黑树的结构,每一个单元称为一个桶。不同点在于,ConcurrentHashMap使用了TreeBin
类来表示红黑树,而不是树的根节点。
1. put
这里大略的看一下put
方法,见微知著;重要的地方我都标了注释。
public V put(K key, V value) {
return putVal(key, value, false);
}
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode()); // 计算哈希值
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh; K fk; V fv;
// 变量:f => 桶的头节点;n => 数组长度
// i => 节点所在桶序号;fh => 桶头节点的哈希值
// fk => 桶头节点的key;fv => 桶头节点的value
if (tab == null || (n = tab.length) == 0)
tab = initTable(); // 懒加载,使用CAS保证线程安全
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
// 当前桶为空,通过CAS创建根节点
if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value)))
break;
}
else if ((fh = f.hash) == MOVED)
// 当前resize过程正在进行,则本线程进行辅助,多线程同时执行
tab = helpTransfer(tab, f);
else if (onlyIfAbsent // check first node without acquiring lock
&& fh == hash
&& ((fk = f.key) == key || (fk != null && key.equals(fk)))
&& (fv = f.val) != null)
// 如果设置了`onlyIfAbsent`,且这个键已经存在,就什么也不做
return fv;
else {
V oldVal = null;
// 当前桶不为空,对头节点加锁执行
synchronized (f) {
// tabAt(tab, i):利用Unsafe.getReferenceAcquire实现一致性
if (tabAt(tab, i) == f) { // 再次确认,保证元素没有被别的线程改变
if (fh >= 0) { // 哈希值大于0,说明这是个链表
binCount = 1;
// 遍历链表,找到对应节点
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent) // putIfAbsent()方法
e.val = value; // 修改值
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
// 没有找到对应节点,添加在末尾
pred.next = new Node<K,V>(hash, key, value);
break;
}
}
}
else if (f instanceof TreeBin) {
// 这是一棵树
Node<K,V> p;
binCount = 2;
// 在树中寻找节点,如果没找到就添加,找到就修改值
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
// 保留节点,占位用,只在compute和computIfAbsent方法中使用
else if (f instanceof ReservationNode)
throw new IllegalStateException("Recursive update");
}
}
if (binCount != 0) {
// 长度大于阈值,转树
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount); // 哈希表节点计数+1,使用CAS保证线程安全
return null;
}
首先要明确的是,线程不安全的根本原因是线程共享变量的使用。
下面来一一分析put
方法的步骤。
0. 计算哈希值
首先计算put传入k-v对的哈希值。
1. 接下来将开始无限循环,每次循环将进入以下分支之一
1.1 如果table
数组为空
如果table
数组为空,则使用CAS初始化table
数组。
与HashMap不同,ConcurrentHashMap中的table
数组不会随着对象的初始化而初始化,而是在第一次使用的时候才进行懒加载。
执行完这一步之后,继续下一轮循环。
1.2 如果对应的桶为空
如果对应的桶为空,创建对应的节点作为链表的头节点,放入这个桶中。
这里的f = tabAt(tab, i)
方法使用了Unsafe的getReferenceAcquire
确保了读取数组时的一致性。
节点入桶的操作不是通过tab[i] = node
进行赋值,而是通过CAS来完成,以确保线程安全;如果CAS失败,则继续下一轮循环。
1.3 如果当前链表正在扩容
如果当前链表正在进行扩容(其他线程干的),那么本线程将会协助一起进行扩容。
1.4 对应的桶不为空(else分支)
这个步骤基本和HashMap一致,如果是链表那就遍历链表查找节点,找到了就修改、没找到就添加在尾部,长度超过阈值就转树;如果是树的话就按照红黑树的逻辑添加节点。如果修改或者添加成功了,就跳出循环。
本步骤基本靠synchronized加锁完成线程安全,不过有两点需要注意。
第一点,在进入同步块后,又进行了if (tabAt(tab, i) == f)
判断,确保对应桶的头节点没有被其他线程更改;
第二点,synchronized锁定的是f
,也就是这个桶的头节点。也就是说,当本线程运行这个同步块代码时,其他线程如果要修改其他桶中的节点,是不受影响的,这样有助于提高效率。用网上的话说,就是降低了锁的粒度。
最后,节点计数+1,并检查扩容。扩容的机制特别复杂,而且是多线程协同操作(每个线程在一个时刻负责一个桶)。不过扩容的基本逻辑和HashMap是一致的,参见【Java】HashMap原理及相关面试题 ,其中线程安全是使用CAS与synchronized保证的。
2. get
读数据和HashMap基本一致,除了获取头节点的时候使用了tabAt()
来保证一致性。
首先找到对应哈希值的头节点,然后遍历链表找到对应节点;或者按照红黑树的方式查找节点。
而之所以get
方法不需要加锁就能保证线程安全,是因为Node的val
和next
都是用volatile
修饰的,这样就保证了本线程对其修改的可见性,在遍历链表的时候,不会因为其他线程修改变了节点而导致误读。
3. remove
remove基本就是put的逆过程,这里就不再赘述。
4. 小结
- ConcurrentHashMap保证线程安全的方式在Java7及之前版本使用的分段锁Segment,在Java8之后使用Unsafe类+synchronized。
- ConcurrentHashMap避免对
table
数组直接引用和修改,因为数组中的元素无法使用volatile来保证一致性。所以,采用了读使用tabAt
方法,写使用casTabAt
、setTabAt
方法,以此保证线程安全。
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
return (Node<K,V>)U.getReferenceAcquire(tab, ((long)i << ASHIFT) + ABASE);
}
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
Node<K,V> c, Node<K,V> v) {
return U.compareAndSetReference(tab, ((long)i << ASHIFT) + ABASE, c, v);
}
static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
U.putReferenceRelease(tab, ((long)i << ASHIFT) + ABASE, v);
}
// —————— Unsafe ——————
public final Object getReferenceAcquire(Object o, long offset) {
return getReferenceVolatile(o, offset);
}
public final void putReferenceRelease(Object o, long offset, Object x) {
putReferenceVolatile(o, offset, x);
}
- 事实上,使用
tabAt
+casTabAt
可以在不加锁的情况下确保对数组修改的线程安全。而setTabAt
总是在同步代码块中调用,仅仅起到防重排的作用,这一点在ConcurrentHashMap内部的注释中也提到了。 - ConcurrentHashMap使用synchronized锁定桶的头节点来保证修改数据时的线程安全,这一点既保证了线程安全,又提高了效率,因为其他桶是不受影响的。
- ConcurrentHashMap的扩容可以多线程并发执行,一个线程同一时间负责一个桶的转移,锁定桶的头节点保证线程安全。
二、CopyOnWriteArrayList
CopyOnWriteArrayList相对来说比较简单,直接看它的get
和add
方法:
private transient volatile Object[] array;
final Object[] getArray() { return array; }
final void setArray(Object[] a) { array = a; }
public E get(int index) { return elementAt(getArray(), index); }
static <E> E elementAt(Object[] a, int index) { return (E) a[index]; }
public boolean add(E e) {
synchronized (lock) {
Object[] es = getArray();
int len = es.length;
es = Arrays.copyOf(es, len + 1);
es[len] = e;
setArray(es);
return true;
}
}
public void add(int index, E element) {
synchronized (lock) {
Object[] es = getArray();
int len = es.length;
if (index > len || index < 0)
throw new IndexOutOfBoundsException(outOfBounds(index, len));
Object[] newElements;
int numMoved = len - index;
if (numMoved == 0)
newElements = Arrays.copyOf(es, len + 1);
else {
newElements = new Object[len + 1];
System.arraycopy(es, 0, newElements, 0, index);
System.arraycopy(es, index, newElements, index + 1,
numMoved);
}
newElements[index] = element;
setArray(newElements);
}
}
CopyOnWriteArrayList在读数据的时候,直接返回数组中的元素,没有做任何处理;
在写数据的时候,会复制当前数组,在对应位置添加元素,然后将内部数组array
设置为复制后的新数组,整个写的过程是加锁的。
可以看出,CopyOnWriteArrayList添加元素的效率是极低的,但是获取元素的效率很高。同时,由于读取数据的时候是直接返回的array
数组中的数据,所以ConcurrentModificationException不能保证读数据时的一致性。
所以,CopyOnWriteArrayList适用范围是:并发线程数高,读数据的次数高,且极少进行修改的情况。如果这个List中的内容经常修改的情况,CopyOnWriteArrayList就不适用了。
再有一点就是,CopyOnWriteArrayList在迭代遍历的过程中是可以进行修改的,不会抛出ConcurrentModificationException异常。
网友评论