前言
JDK中的Hashtable是一个线程安全的K-V形式的容器,它实现线程安全的原理十分简单,就是在所有涉及对该哈希表操作的方法上都加上了synchronized关键字,进行加锁操作。这么做实现了线程安全,但是效率非常低。
//通过synchronized在每次进入方法时获取hashmap的锁,高并发情况下会出现争用冲突
public synchronized V get(Object key) {
Entry tab[] = table;
int hash = hash(key);
int index = (hash & 0x7FFFFFFF) % tab.length;
for (Entry<K,V> e = tab[index] ; e != null ; e = e.next) {
if ((e.hash == hash) && e.key.equals(key)) {
return e.value;
}
}
return null;
}
ConcurrentHashMap使用了分段锁的设计,只有在每个段上才会存在并发争用,不同的段之间不存在锁的竞争,提高了该hashmap的并发访问效率。但也是由于ConcurrentHashMap的分段设计,某些需要扫描所有段的操作如size()等方法实现上比较复杂,并且并不能保证强一致性,这个在某些情况下需要注意。由于ConcurrentHashMap在各个JDK版本下的实现是有区别的,特说明本文基于jdk1.7源码,我们下面一起通过其源码来分析它的实现原理。
ConcurrentHashMap的组成
查看源码,我们可以通过ConcurrentHashMap的成员变量,了解该数据结构的基本组成。
final int segmentMask;//作为查找segments的掩码,前几个bit用来选择segment
final int segmentShift;
final Segment<K,V>[] segments;//每个segment都是一个hashtable
ConcurrentHashMap内部有一个segments数组,每个segment都是一个hashtable ,由于Segment继承自ReentrantLock ,因此Segment本身就是锁,对每个Segment的加锁就是调用自身的acquire()方法。Segment的基本组成如下:
static final class Segment<K,V> extends ReentrantLock implements Serializable {
transient volatile HashEntry<K,V>[] table;//存入ConcurrentHashMap的k-v数据都存在这里
transient int count;//该segment内的HashEntry计数(如put/remove/replace/clear)
transient int modCount;//对该segment的修改操作数量
final float loadFactor;//hashtable的负载因子
. . . . . . . . .
}
HashEntry是最终存储每一对k-v的最底层数据结构,它的结构为:
static final class HashEntry<K,V> {
final int hash;
final K key;
volatile V value;
volatile HashEntry<K,V> next;
}
value为volatile,在多线程环境下可以保持可见性,因此可以不加锁直接读取。
ConcurrentHashMap的整体数据结构如图
ConcurrentHashMap源码分析
本文前言中,我们说了‘ConcurrentHashMap通过分段锁技术提高了该hashmap的并发访问效率’,我们接下来就通过ConcurrentHashMap的get/set/remove等方法来说明,ConcurrentHashMap在保证线程安全的情况下,是如何做到这些的。
- ConcurrentHashMap的初始化
concurrencyLevel 是预估会对ConcurrentHashMap进行并发修改的线程数,也可以理解为支持的最大并发访问ConcurrentHashMap且不产生锁的争用的线程数,也就是Segment的数量,也即分段锁的个数,默认为16。
//loadFactor是每个segment的负载因子,concurrencyLevel是估计的并发修改线程,默认为16,所以创建16个segment
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (concurrencyLevel > MAX_SEGMENTS)
concurrencyLevel = MAX_SEGMENTS;//默认16
// Find power-of-two sizes best matching arguments
int sshift = 0;
int ssize = 1;
while (ssize < concurrencyLevel) {
++sshift;
ssize <<= 1;
}
this.segmentShift = 32 - sshift;
this.segmentMask = ssize - 1;
if (initialCapacity > MAXIMUM_CAPACITY)
initialCapacity = MAXIMUM_CAPACITY;
int c = initialCapacity / ssize;
if (c * ssize < initialCapacity)
++c;
int cap = MIN_SEGMENT_TABLE_CAPACITY;
while (cap < c)
cap <<= 1;
// create segments and segments[0]
Segment<K,V> s0 =
new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
(HashEntry<K,V>[])new HashEntry[cap]);//初始化一个segment
Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
this.segments = ss;
}
- 创建分段锁
如上面ConcurrentHashMap的初始化,ConcurrentHashMap采用延迟初始化的模式,首先只new一个Segment,剩下的Segment只在使用时初始化。每次put操作时,定位到key所在的Segment后,会先去判断Segment是否初始化了,没有的话由新建一个Segment并通过循环CAS设置Segment数组对应位置引用。
private Segment<K,V> ensureSegment(int k) {
final Segment<K,V>[] ss = this.segments;
long u = (k << SSHIFT) + SBASE; // raw offset
Segment<K,V> seg;
if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
Segment<K,V> proto = ss[0]; // use segment 0 as prototype
int cap = proto.table.length;
float lf = proto.loadFactor;
int threshold = (int)(cap * lf);
HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
== null) { // recheck
Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
== null) {
if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
break;
}
}
}
return seg;
}
- get()
get方法不需要加锁操作,因为HashEntry中的value是volatile,可以保证线程间的可见性。由于segments非volatile,通过UNSAFE的getObjectVolatile方法提供volatile读语义来遍历获得对应链表上的节点。但没有锁可能会导致在遍历 的过程中几点 被其它线程修改,返回的val可能是过时数据,这部分是ConcurrentHashMap非强一致性的体现,如果对强一致性有要求,那么就只能去使用Hashtable或Collections.synchronizedMap这种通过synchronized关键字进行加锁提供强一致性的Map容器。containsKey操作与get是 一样 的。
public V get(Object key) {
Segment<K,V> s; // manually integrate access methods to reduce overhead
HashEntry<K,V>[] tab;
int h = hash(key);
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;//u是该key在segments数组的下标,可以定位该key是在哪一个Segment
if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
(tab = s.table) != null) {
//(tab.length - 1) & h)定位key在Segment的HashEntry数组中的下标
//遍历HashEntry链表,找到与key和key的hash值一致的HashEntry元素
for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
e != null; e = e.next) {
K k;
if ((k = e.key) == key || (e.hash == h && key.equals(k)))
return e.value;
}
}
return null;
}
- put/putIfAbsent/putAll操作
数据插入操作,在这里才能看到ConcurrentHashMap基于分段锁提高高并发环境下的处理能力,增大ConcurrentHashMap吞吐量的实现技巧。
//该方法是ConcurrentHashMap所有put操作的核心
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
//首先尝试加锁(调用ReentrantLock的tryLock方法,对当前Segment实例加锁),若没有获取成功则找到与该Key相同的Node,若没有则new一个。返回时必须获取锁,
HashEntry<K,V> node = tryLock() ? null :
scanAndLockForPut(key, hash, value);
V oldValue;
try {
HashEntry<K,V>[] tab = table;//
int index = (tab.length - 1) & hash;//当前该Node的插入点
HashEntry<K,V> first = entryAt(tab, index);//获取tbale中第index个元素
//在链表上遍历对比节点有相同的key则修改对应value,没有则放在链表尾部
for (HashEntry<K,V> e = first;;) {
if (e != null) {
K k;
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
oldValue = e.value;
if (!onlyIfAbsent) {//key不存在的情况下才会设置value
e.value = value;
++modCount;
}
break;
}
e = e.next;
}
else {
if (node != null)
node.setNext(first);
else
node = new HashEntry<K,V>(hash, key, value, first);
int c = count + 1;
//当当前segment的元素数量超过阈值时rehash
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
rehash(node);//超过阈值进行扩容
else
setEntryAt(tab, index, node);
++modCount;
count = c;
oldValue = null;
break;
}
}
} finally {
unlock();
}
return oldValue;
}
}
- size()/containsValue()
先不加锁进行循环相加计算,最少进行两次对所有Segment的modCount与count进行求和,如果前后两次modCount求和的值相等,认为这期间ConcurrentHashMap的HashEntry数量没有变化,就 直接返回count求和的值,认为这就是ConcurrentHashMap的大小。如果前后两次modCount求和的值相等一直不相等,会循环进行 这样 的尝试。默认是两次,超过两次就会认为可能现在在ConcurrentHashMap上正在 进行着密集的操作(会 影响ConcurrentHashMap的大小),于是会强制对 每个Segment进行加锁,然后在重复 前面的计数操作,最终返回 计算的ConcurrentHashMap的大小。因此多线程环境下这种实现返回 的 ConcurrentHashMap大小是近似的,不准确的,这个 需要在使用ConcurrentHashMap时做出取舍。containsValue与size操作原理类似。
public int size() {
// Try a few times to get accurate count. On failure due to
// continuous async changes in table, resort to locking.
final Segment<K,V>[] segments = this.segments;
int size;
boolean overflow; // true if size overflows 32 bits
long sum; // sum of modCounts
long last = 0L; // previous sum
int retries = -1; // first iteration isn't retry
try {
for (;;) {
if (retries++ == RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
ensureSegment(j).lock(); // force creation
}
sum = 0L;
size = 0;
overflow = false;
for (int j = 0; j < segments.length; ++j) {
Segment<K,V> seg = segmentAt(segments, j);
if (seg != null) {
sum += seg.modCount;
int c = seg.count;
if (c < 0 || (size += c) < 0)
overflow = true;
}
}
if (sum == last)//判断前后modCount和相等
break;
last = sum;
}
} finally {
if (retries > RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
segmentAt(segments, j).unlock();
}
}
return overflow ? Integer.MAX_VALUE : size;
}
总结
以上就是jdk1.7中ConcurrentHashMap重要操作的源码实现分析。我们可以对 ConcurrentHashMap做出 以下判断:
- 通过分段锁的设计,减小了各个分段之间的锁的争用冲突,各个分段本省 就是一个独立的哈希表。这样的设计提高 了ConcurrentHashMap的 吞吐量,提高了hashmap的并发访问效率。
- HashEntry中value是volatile类型,使得ConcurrentHashMap的get方法可以无锁操作获取value,提高了get的效率。但需要注意的是,这也导致了ConcurrentHashMap的 弱一致性问题,在对get操作 有强一致性要求的场景下,必须选用其它支持map强一致性的容器。
- size等操作在多线程环境下的返回值是不精确的,只可做近似值,建议多线程环境下不要依赖size()。
参考
- 《Java并发编程的艺术》
- ConcurrentHashMap总结
网友评论