J.U.C中实现Map接口的并容器有ConcurrentHashMap和ConcurrentSkipListMap。
ConcurrentHashMap
ConcurrentHashMap是并发容器中锁分拆的一个经典设计。
ConcurrentHashMap 内部布局:
public class MyConcurrentHashMap<K, V> extends AbstractMap<K, V>
implements ConcurrentMap<K, V>, Serializable {
/** 数据段初始容量为16,默认为 16 个数据段 */
static final int DEFAULT_INITIAL_CAPACITY = 16;
/** 默认装载因子为 */
static final float DEFAULT_LOAD_FACTOR = 0.75f;
/** 默认并发级别为 16 */
static final int DEFAULT_CONCURRENCY_LEVEL = 16;
/** 数据段table的最小容量,避免next使用时,需要立即扩容 */
static final int MIN_SEGMENT_TABLE_CAPACITY = 2;
/** 最数据段段数量 65536 */
static final int MAX_SEGMENTS = 1 << 16; // slightly conservative
/** hash掩码*/
final int segmentMask;
/** 偏移量,与segmentMask一起定位数据段 */
final int segmentShift;
/** 数据段 */
final Segment<K, V>[] segments;
transient Set<K> keySet;// Key集合
transient Set<Map.Entry<K, V>> entrySet;// entry集合
transient Collection<V> values;// value集合
/** 元素 k-v键值对 */
static final class HashEntry<K, V> {... ...}
/** 数据段,继承自ReentrantLock简化加锁*/
static final class Segment<K, V>
extends ReentrantLock implements Serializable {... ...}
... ...
}
ConcurrentHashMap默认是分为16个数据段,每个数据段在在添加或修改数据时会各自加锁,意味着在理想的情况下可以由16个线程同时写一个ConcurrentHashMap。
内部类HashEntry和Segment是两个最重要的基础设施。
static final class HashEntry<K, V> {
final int hash;// hash码 不可变
final K key;// 键 不可变
volatile V value;// 值 可见行
volatile HashEntry<K, V> next;// 后继实体 可见性
// 构造方法
HashEntry(int hash, K key, V value, HashEntry<K, V> next) {
this.hash = hash;
this.key = key;
this.value = value;
this.next = next;
}
// 设置后继实体next
final void setNext(HashEntry<K, V> n) {
UNSAFE.putOrderedObject(this, nextOffset, n);
}
... ...
}
static final class Segment<K, V> extends
ReentrantLock implements Serializable {
/** 元素数组*/
transient volatile HashEntry<K, V>[] table;
/** 元素个数*/
transient int count;
/** 修改次数*/
transient int modCount;
/**rehash临界值,
* 当 table 中包含的 HashEntry 元素的个数超过本变量值时,
* 触发 table 的再散列*/
transient int threshold;
/** 负载因子*/
final float loadFactor;
/** 构造Segment,负载因子,临界条件,table */
Segment(float lf, int threshold, HashEntry<K, V>[] tab)
/** put方法*/
final V put(K key, int hash, V value, boolean onlyIfAbsent)
/** 再散列*/
private void rehash(HashEntry<K, V> node)
/** 删除元素*/
final V remove(Object key, int hash, Object value)
/** 替换元素值*/
final boolean replace(K key, int hash, V oldValue, V newValue)
... ...
}
HashEntry就是Map中的元素,也就通常说的键值对。
Segment是数据段实际存放HashEntry的地方,HashEntry放在数组table中。HashEntry是单向链表的结构,由于HASH算法是将一个大集合映射到一个小集合,所以存在多个元素映射到同一个元素的情况,这种情况叫做“hash碰撞”,也就是hash值相同,将所有hash值相同的HashEntry放到这个链表中形成一个“hash桶”中。
Segment中的put、remove就是元素的实际修改方法,对ConcurrentHashMap的put、remove操作会委托到这里。
put操作:
public V put(K key, V value) {
Segment<K, V> s;// 数据段
if (value == null)// 空值检测
throw new NullPointerException();
// 取hash值,如果key为空,这里会抛异常
int hash = hash(key);
// 根据hash码 取段定位
int j = (hash >>> segmentShift) & segmentMask;
// 从segments拿到数据段,如果段为空,进入ensureSegment 会新建一个段出来
if ((s = (Segment<K, V>)
UNSAFE.getObject(segments, (j << SSHIFT) + SBASE)) == null) {
s = ensureSegment(j);
}
return s.put(key, hash, value, false);
}
首先根据key的hash码取得分段的定位,拿到分段,如果没有hash码定位的分段则新建,然后将put操作委托给分段Segment。
Segment put操作:
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
// 尝试获取锁,获取成功返回node为null,
//否则说明有其他线程对此数据段进行更新操作
// 进入scanAndLockForPut,进行重试,
//如果重试重试次数大于MAX_SCAN_RETRIES进行lock
// 阻塞加锁,否则一直自旋,获得锁后返回。
HashEntry<K, V> node = tryLock() ?
null : scanAndLockForPut(key, hash, value);
V oldValue;
try {
HashEntry<K, V>[] tab = table;
// 获取table索引
int index = (tab.length - 1) & hash;
// 获取table索引为index的第一个HashEntry
HashEntry<K, V> first = entryAt(tab, index);
// 遍历table[index]上的HashEntry链
for (HashEntry<K, V> e = first;;) {
if (e != null) { // 如果HashEntry不为null
K k;
if ((k = e.key) == key
|| (e.hash == hash && key.equals(k))) {
oldValue = e.value;
if (!onlyIfAbsent) {
// 如果存在key且hash相等,
// onlyIfAbsent为false,则更新旧值为value
e.value = value;
++modCount;// 修改数+1
}
break;
}
e = e.next;
} else {
// 节点不为null,
// 将node放在table[index]的HashEntry链的头部
if (node != null) {
node.setNext(first);
// 节点为null,
// 建新的HashEntry,放在链头,next指向链的原始头部
} else {
node = new HashEntry<K, V>(hash, key, value, first);
}
//元素数量
int c = count + 1;
//元素数量大于临界条件,且小于最大容量,
//对HashTable扩容,创建2倍原始容量的Hashable
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
rehash(node);
else
// 添加node到table
setEntryAt(tab, index, node);
++modCount;
count = c;
oldValue = null;
break;
}
}
} finally {
unlock();
}
return oldValue;
}
首先尝试获取锁,如果获取锁失败,在scanAndLockForPut()方法中线性探测哈希桶,探测不到key对应的HashEntry,则创建一个新的HashEntry,一直重试MAX_SCAN_RETRIES次,还没有获取锁则放弃自旋使用阻塞方式获取锁。
如果在桶中找到KEY相同的节点,进行值变更,否则将新节点插入当前链表头。如果元素数量大于rehash临界值,进行重新hash新建一个为当前容器容量两倍的容器,将 table 指向新容器。
get操作:
public V get(Object key) {
Segment<K,V> s;//数据段
HashEntry<K,V>[] tab;//数据段中hash表
int h = hash(key.hashCode());//获取hash码
long u = (((h >>> segmentShift) &
segmentMask) << SSHIFT) + SBASE;
if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u))
//段不为空且hash表不为空
!= null && (tab = s.table) != null) {
for (HashEntry<K,V> e = (HashEntry<K,V>)
UNSAFE.getObjectVolatile(tab, ((long)(((tab.length - 1) & h))
<< TSHIFT) + TBASE);
e != null; e = e.next) {//遍历hash桶
K k;
if ((k = e.key) == key || (e.hash == h && key.equals(k)))
return e.value;
}
}
return null;
}
先用hash定位段,再用hash定位段中的table,如果HashEntry不存在hash桶,只需要两步就能取出非常高效。
而且过程不需要加锁,put/remove都是针对HashEntry内的变量和指针进行原子性的赋值操作。getObjectVolatile方法以Volatile方式获取变量,能确保变量的可见性,取出的都是最新值。
ConcurrentHashMap的并发性主要体现在锁分拆上,分段加锁使ConcurrentHashMap具有更高的吞吐量,落在每把锁上的请求频率、持有时间会降低。
ConcurrentSkipListMap
SkipList跳表是一种随机化的数据结构,基于并联的链表,其效率可比拟于二叉查找树。
图
跳表中的节点具有右向与下向指针。从第一层开始遍历,如果右端的值比期望的大,那就往下走一层,继续往前走,所以在列表中的查找可以快速的跳过部分列表,并因此得名。
ConcurrentSkipListMap就是基于SkipList结构实现的map,在理论上能够在O(log(n))时间内完成查找、插入、删除操作。能像ConcurrentHashMap一样在并发环境下使用,又能像TreeMap一样使Key按照的自然顺序排序或者按照compareTo方法排序。
在并发环境下ConcurrentSkipListMap的性能比加锁的TreeMap高,逊于ConcurrentHashMap。调用ConcurrentSkipListMap的size时,由于多个线程可以同时对映射表进行操作,所以映射表需要遍历整个链表才能返回元素个数,这个方法慎用。
码字不易,转载请保留原文连接https://www.jianshu.com/p/a7f50d1ed33f
网友评论