逅弈 欢迎转载,注明原创出处即可,谢谢!
HashMap是用的最多的来保存数据的数据结构,但众所周知,HashMap不是线程安全的。
当插入一个新的节点时,如果这个key不存在,则会判断当前内部元素是否已经达到阈值(默认是数组大小的0.75),如果已经达到阈值,会对数组进行扩容,也会对链表中的元素进行rehash。
但是在并发的情况下,进行扩容时,可能会产生循环链表,然后在执行get的时候,会触发死循环,导致CPU100%运转,所以一定要避免在并发环境下使用HashMap。
而为了解决线程安全的HashMap,可以通过Collections.synchronizedMap(Map map)来对一个HashMap进行包装,其内部是通过一个SynchronizedMap内部类重新生成了一个Map,并对每个方法加上了synchronized锁,所以可以实现线程安全的HashMap。但这并不是我们所需要的,这种方式是多线程不友好的,同一时刻同一个方法只有一个线程能执行。所以为了提高并发量,提高性能,Doug Lea为了我们提供了并发的HashMap,也即ConcurrentHashMap,该类在并发包juc中。
PS 本文分析的是JDK1.8的ConcurrentHashMap的实现
首先ConcurrentHashMap是通过数组+链表/红黑树的结构来存储数据的,通过CAS+synchronized保证并发的安全性的。
有以下重要的内部类:
/**
* 节点,保存key-value键值对
* Key-value entry. This class is never exported out as a
* user-mutable Map.Entry (i.e., one supporting setValue; see
* MapEntry below), but can be used for read-only traversals used
* in bulk tasks. Subclasses of Node with a negative hash field
* are special, and contain null keys and values (but are never
* exported). Otherwise, keys and vals are never null.
*/
static class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
volatile V val;
volatile Node<K,V> next;
// 省略
}
/**
* 红黑树节点
* Nodes for use in TreeBins
*/
static final class TreeNode<K,V> extends Node<K,V> {
TreeNode<K,V> parent; // red-black tree links
TreeNode<K,V> left;
TreeNode<K,V> right;
TreeNode<K,V> prev; // needed to unlink next upon deletion
boolean red;
// 省略
}
/**
* 一棵红黑树
* TreeNodes used at the heads of bins. TreeBins do not hold user
* keys or values, but instead point to list of TreeNodes and
* their root. They also maintain a parasitic read-write lock
* forcing writers (who hold bin lock) to wait for readers (who do
* not) to complete before tree restructuring operations.
*/
static final class TreeBin<K,V> extends Node<K,V> {
TreeNode<K,V> root;
volatile TreeNode<K,V> first;
volatile Thread waiter;
volatile int lockState;
// values for lockState
static final int WRITER = 1; // set while holding write lock
static final int WAITER = 2; // set when waiting for write lock
static final int READER = 4; // increment value for setting read lock
// 省略
}
/**
* 辅助节点,在扩容时使用
* A node inserted at head of bins during transfer operations.
*/
static final class ForwardingNode<K,V> extends Node<K,V> {
final Node<K,V>[] nextTable;
// 省略
}
重要的变量
/**
* 保存节点的数组
* The array of bins. Lazily initialized upon first insertion.
* Size is always a power of two. Accessed directly by iterators.
*/
transient volatile Node<K,V>[] table;
/**
* 扩容时使用的节点数组,只在扩容时不为空
* The next table to use; non-null only while resizing.
*/
private transient volatile Node<K,V>[] nextTable;
/**
* 控制标识符,在初始化和扩容时使用
* 负数表示正在初始化或扩容
* -1 表示正在初始化,此时只有一个线程可以进行初始化,其他线程通过Thread.yield()挂起
* -N 表示有N-1个线程正在进行扩容
* 正数 表示数组的大小
* 数组初始化完成后该值表示下一次扩容的元素的大小
* Table initialization and resizing control. When negative, the
* table is being initialized or resized: -1 for initialization,
* else -(1 + the number of active resizing threads). Otherwise,
* when table is null, holds the initial table size to use upon
* creation, or 0 for default. After initialization, holds the
* next element count value upon which to resize the table.
*/
private transient volatile int sizeCtl;
- table初始化
table初始化操作会延缓到第一次put的时候执行。但是put是可以并发执行的,Doug Lea是如何实现table只初始化一次的?来看看源码的实现
sizeCtl默认为0,如果ConcurrentHashMap实例化时有传参数,sizeCtl会是一个2的幂次方的值。
当执行第一次put操作的线程会执行Unsafe.compareAndSwapInt方法修改sizeCtl为-1,
有且只有一个线程能够修改成功,其它线程通过Thread.yield()让出CPU时间片等待table初始化完成。
/**
* Initializes table, using the size recorded in sizeCtl.
*/
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
// 当发现sizeCtl<0时,说明当前有线程正在进行表的初始化工作,则当前线程挂起
if ((sc = sizeCtl) < 0)
Thread.yield(); // lost initialization race; just spin
// 否则sizeCtl为初始值,则通过CAS将sizeCtl更改为-1,表示当前线程要进行表的初始化工作了
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
// 由于n总是2的幂次方,所以sc=0.75*n
sc = n - (n >>> 2);
}
} finally {
// 初始化工作完成后,sizeCtl的值变为下一次扩容时表的大小
sizeCtl = sc;
}
break;
}
}
return tab;
}
- put操作
假设table已经初始化完成,put操作采用CAS+synchronized实现并发插入或更新操作,具体实现如下:
1 hash算法
static final int spread(int h) {
return (h ^ (h >>> 16)) & HASH_BITS;
}
2 在table中定位索引位置,n是table的大小
int index = (n-1) & hash;
3 获取table中对应索引的元素f
Doug Lea采用tabAt来获取对应索引i处的节点f
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}
而tabAt中实际是调用的Unsafe.getObjectVolatile来获取,也许有人质疑,直接table[index]不可以么,为什么要这么复杂?
在java内存模型中,我们已经知道每个线程都有一个工作内存,里面存储着table的副本,虽然table是volatile修饰的,但table中的元素不是volatile的,不能保证线程每次都拿到table中的最新元素,Unsafe.getObjectVolatile可以直接获取指定内存的数据,保证了每次拿到数据都是最新的
4 如果f为null,说明table中这个位置第一次插入元素,利用Unsafe.compareAndSwapObject方法插入Node节点。
如果CAS成功,说明Node节点已经插入,随后addCount(1L, binCount)方法会检查当前容量是否需要进行扩容。
如果CAS失败,说明有其它线程提前插入了节点,自旋重新尝试在这个位置插入节点。
5 如果f的hash值为-1,说明当前f是ForwardingNode节点,意味有其它线程正在扩容,则一起进行扩容操作。
6 其余情况把新的Node节点按链表或红黑树的方式插入到合适的位置,这个过程采用同步内置锁实现并发
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
// 计算key的hash值
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
// table为空时,执行初始化
if (tab == null || (n = tab.length) == 0)
tab = initTable();
// 根据key的hash值得到对应的节点f,和该节点在表中的位置i
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
// 如果节点f为空,则通过CAS在位置i插入一个新节点
// 如果插入成功则跳出循环,并调用addCount判断是否需要进行扩容
// 如果插入失败,则通过自旋重新尝试在位置i插入节点
if (casTabAt(tab, i, null,new Node<K,V>(hash,key,value, null)))
break; // no lock when adding to empty bin
}
// 如果节点f的hash值为-1,则说明有线程正在进行扩容,则当前线程帮助一起进行扩容
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
// 省略内置同步锁部分
}
}
// 判断是否需要扩容
addCount(1L, binCount);
return null;
}
如果节点f未能通过CAS插入,则在节点f上进行同步,节点插入之前,再次利用tabAt(tab, i) == f判断,防止被其它线程修改。
如果f.hash >= 0,说明f是链表结构的头结点,遍历链表,如果找到对应的node节点,则修改value,否则在链表尾部加入节点。
如果f是TreeBin类型节点,说明f是红黑树根节点,则在树结构上遍历元素,更新或增加节点。
如果链表中节点数binCount >= TREEIFY_THRESHOLD(默认是8),则把链表转化为红黑树结构。
// 使用内置同步锁的方式,进行节点的新增或更新
synchronized (f) {
// 如果位置i上还是节点f
if (tabAt(tab, i) == f) {
// 如果节点f的hash>=0,则说明f是链表结构的头节点
if (fh >= 0) {
binCount = 1;
// 遍历链表
for (Node<K,V> e = f;; ++binCount) {
K ek;
// 如果发现该key在链表中已经存在,则将老的节点的值更新为value
if (e.hash == hash &&
((ek = e.key) == key || (ek != null && key.equals(ek)))) {
oldVal = e.val;
// 如果onlyIfAbsent==false,则用新的值覆盖旧的值,否则不添加
if (!onlyIfAbsent)
e.val = value;
break;
}
// 找到链表的尾部,将该key封装成一个新的节点插入到链表的尾部
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,value, null);
break;
}
}
}
// 如果节点f是TreeBin的节点,则说明f节点是红黑树的根节点
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
// 在树上遍历,新增或更新该节点到红黑树中去
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
// 内置锁同步结束后
// 如果链表中节点数binCount >= TREEIFY_THRESHOLD(默认是8),则把链表转化为红黑树结构
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
- get操作
1 获得key的hash值h
2 通过tabAt获取key对应的节点e,如果节点e为空,则说明没有该值,直接返回null
3 如果节点e不为空,并且该节点的hash等于key的hash值,并且节点e的key与当前key相等,则说明该节点就是要找的节点,直接返回节点的val
4 如果节点的hash小于0,则说明该节点在红黑树上,则通过find方法在树上找到key对应的节点p,如果节点p不为空,则返回p的val
5 以上都没能找到的话,最后会从该节点开始遍历链表,直到找到key对应的节点e,如果e不为空,则返回e的val
/**
* Returns the value to which the specified key is mapped,
* or {@code null} if this map contains no mapping for the key.
*
* <p>More formally, if this map contains a mapping from a key
* {@code k} to a value {@code v} such that {@code key.equals(k)},
* then this method returns {@code v}; otherwise it returns
* {@code null}. (There can be at most one such mapping.)
*
* @throws NullPointerException if the specified key is null
*/
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
// 获得key的hash值
int h = spread(key.hashCode());
// 通过tabAt获取key对应的节点e
// 如果节点e为空,则说明没有该值,直接返回null
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
// 如果节点e不为空,并且该节点的hash等于key的hash值
// 并且节点e的key与当前key相等,则说明该节点就是要找的节点,直接返回节点的val
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
// 如果节点的hash小于0,则说明该节点在红黑树上
else if (eh < 0)
// 则通过find方法在树上找到key对应的节点p
// 如果节点p不为空,则返回p的val
return (p = e.find(h, key)) != null ? p.val : null;
// 否则在链表上遍历,找到key对应的节点e
while ((e = e.next) != null) {
// 返回e的val
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
更多原创好文,请关注「逅弈逐码」
网友评论