为什么使用ConcurrentHashMap?
首先要明白HashTable容器是通过使用关键字synchronized来保证线程安全的,但是HashTable的效率并不高,因为当线程1添加元素时,线程2不能获取和添加元素了,就是锁的细粒度不高,而ConcurrentHashMap容器是将容器进行分段,在每个段Segment上面上锁,这样细粒度就会增高,提高了并发的效率。
继承与实现关系
public class ConcurrentHashMap<K, V> extends AbstractMap<K, V>
implements ConcurrentMap<K, V>, Serializable
ConcurrentHashMap的结构
20170912112223306.jpg解释:
ConcurrentHashMap是由Segment数组组成的,而Segment对象里面是包含HashEntry数组。而Segment类似于HashMap,是数组加链表的组合结构。所以如果我们想要修改HashEntry数组中的元素时,首先必须获得与它对应的Segment的锁。可以理解为ConcurrentHashMap为多个加锁的HashMap组成的。
ConcurrentHashMap的全局变量
//默认初始化容量为16
static final int DEFAULT_INITIAL_CAPACITY = 16;
//默认的加载因子为0.75
static final float DEFAULT_LOAD_FACTOR = 0.75f;
//默认并发级别为16(锁的个数)
static final int DEFAULT_CONCURRENCY_LEVEL = 16;
//最大容量为2^30次幂
static final int MAXIMUM_CAPACITY = 1 << 30;
//最小的分段容量为2(需要为2的n次幂)
static final int MIN_SEGMENT_TABLE_CAPACITY = 2;
//最大分成2^16段
static final int MAX_SEGMENTS = 1 << 16;
//同步重试的次数
static final int RETRIES_BEFORE_LOCK = 2;
//片段偏移量
final int segmentMask;
//片段掩码
final int segmentShift;
//每个上锁的片段数组
final Segment<K,V>[] segments;
transient Set<K> keySet;
transient Set<Map.Entry<K,V>> entrySet;
transient Collection<V> values;
HashEntry数据结构
static final class HashEntry<K,V> {
final int hash;//节点对应的hash值
final K key;//节点对应的键
volatile V value;//节点对应的值
volatile HashEntry<K,V> next;//后继结点
//初始化节点的构造器
HashEntry(int hash, K key, V value, HashEntry<K,V> next) {
this.hash = hash;
this.key = key;
this.value = value;
this.next = next;
}
//设置当前节点的后继节点
final void setNext(HashEntry<K,V> n) {
UNSAFE.putOrderedObject(this, nextOffset, n);
}
}
Segment结构
static final class Segment<K,V> extends ReentrantLock implements Serializable {
private static final long serialVersionUID = 2249069246763182397L;
//在准备锁定段操作之前,在预锁之前对预存器进行最大锁定次数
static final int MAX_SCAN_RETRIES =
Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1;
//片段上的桶数组
transient volatile HashEntry<K,V>[] table;
//片段中桶的数量
transient int count;
//修改片段的次数
transient int modCount;
//临界值,如果实际大小超过了临界值,就使用容量*加载因子重新计算来扩容
transient int threshold;
//加载因子
final float loadFactor;
//通过加载因子,临界值,HashEntry节点来初始化片段
Segment(float lf, int threshold, HashEntry<K,V>[] tab) {
this.loadFactor = lf;
this.threshold = threshold;
this.table = tab;
}
}
Segment中的功能方法
rehash函数:将表的大小扩增,重新计算hash值,然后将指定的节点添加到新表中
@SuppressWarnings("unchecked")
private void rehash(HashEntry<K,V> node) {
//获取旧表
HashEntry<K,V>[] oldTable = table;
//获取旧表的容量
int oldCapacity = oldTable.length;
//计算新表的容量为旧表的2倍
int newCapacity = oldCapacity << 1;
//获取临界值,新容量乘以加载因子
threshold = (int)(newCapacity * loadFactor);
//构造新的HashEntry数组
HashEntry<K,V>[] newTable =
(HashEntry<K,V>[]) new HashEntry[newCapacity];
//计算新的长度掩码
int sizeMask = newCapacity - 1;
//遍历旧容量
for (int i = 0; i < oldCapacity ; i++) {
//获取旧表的节点HashEntry ,也可以说获取每个桶
HashEntry<K,V> e = oldTable[i];
//如果该节点不是尾节点
if (e != null) {
//获取当前遍历节点的后继节点
HashEntry<K,V> next = e.next;
//通过当前节点的hash值与长度掩码的按位与运算获取下标索引
int idx = e.hash & sizeMask;
//如果后继节点为空,就是当前节点的后继节点为空,那么就是尾节点
if (next == null)
//给新的HashEntry数组对应的下标设置值
newTable[idx] = e;
else { //如果后继节点不为空
//初始化最后运行的节点为当前节点e
HashEntry<K,V> lastRun = e;
//初始化最后运行的节点为当前下标idx
int lastIdx = idx;
//遍历当前的桶
for (HashEntry<K,V> last = next;
last != null;
last = last.next) {
//计算当前的下标索引
int k = last.hash & sizeMask;
//如果当前下标不等于旧表下标
if (k != lastIdx) {
//设置最新遍历到的下标索引
lastIdx = k;
//设置最新遍历到的HashEntry节点
lastRun = last;
}
}
//新表设置最后运行的节点
newTable[lastIdx] = lastRun;
//遍历当前的桶,构造新的HashEntry单链表(头插法)
for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
//获取HashEntry的值
V v = p.value;
//获取HashEntry的hash值
int h = p.hash;
//计算当前的下标索引
int k = h & sizeMask;
//获取下标索引对应的HashEntry节点值
HashEntry<K,V> n = newTable[k];
//构造新的HashEntry节点,作为下次的后继节点
newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
}
}
}
}
//获取节点的下标索引
int nodeIndex = node.hash & sizeMask;
//设置节点的后继结点
node.setNext(newTable[nodeIndex]);
//设置新表的下标索引nodeIndex值为node
newTable[nodeIndex] = node;
table = newTable;
}
put函数:往片段Segment存入元素的操作
//往片段Segment存入元素的操作
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
//如果片段的锁已经被其他线程持有了,那么将获取null节点
//如果片段的锁没有被其他线程持有,那么将直接获取节点链中对应的节点
HashEntry<K,V> node = tryLock() ? null :
scanAndLockForPut(key, hash, value);
V oldValue;
try {
//获取片段上的桶数组(旧)
HashEntry<K,V>[] tab = table;
//计算下标索引
int index = (tab.length - 1) & hash;
//获取给定表tab中指定下标index对应的头节点first
HashEntry<K,V> first = entryAt(tab, index);
//遍历节点链表
for (HashEntry<K,V> e = first;;) {
//如果遍历到的节点不为空
if (e != null) {
K k;
//如果键相同,或者hash值相同和键值相等
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
//获取键对应的节点值
oldValue = e.value;
//只有当值在节点链中存在的时候
if (!onlyIfAbsent) {
//新值覆盖旧值
e.value = value;
//修改片段次数加1
++modCount;
}
//跳出循环
break;
}
//遍历下个后继节点
e = e.next;
}
else {//如果遍历到的节点为空
//如果node节点不为空
if (node != null)
//设置node节点的后继结点为first
node.setNext(first);
else//如果node节点为空
//重新构建一个HashEntry节点
node = new HashEntry<K,V>(hash, key, value, first);
//将片段中桶的数量加1
int c = count + 1;
//如果片段的容量大于了临界值并且表的长度小于最大容量
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
//将表的大小扩增,重新计算hash值,然后将指定的节点添加到新表中
rehash(node);
else//为指定表的指定下标设置节点值node
setEntryAt(tab, index, node);
//片段的修改次数加1
++modCount;
//重新设置片段中的桶的数量
count = c;
//释放临时变量
oldValue = null;
//跳出循环
break;
}
}
} finally {
//将当前线程获取的锁释放
unlock();
}
//返回替换的值
return oldValue;
}
编写思路:
①首先判断当前片段是否被其他线程所持有,然后获取节点链HashEntry的头节点,准备遍历单链表,然后分为没有遍历到尾节点情况和遍历到尾节点的情况。
②没有遍历到尾节点;如果出现键相同或者键值相同并且hash值相同,那么就覆盖节点链中的旧值,全局变量修改片段次数modCount加1,跳出循环。如果出现键不同或者键值不同,那么将继续往下遍历节点链(桶)。
③遍历到尾节点;如果当前片段并没有被其他线程持有,那么将节点node插入到头节点前面(头插法)。如果当前片段为空,那么就需要重新构建一个节点链了。然后判断片段Segment中桶的数量是否大于临界值(threshold = capicity乘loadFactor),如果大于了临界值,将桶的容量扩大为原来的2倍。(值得注意的是先添加元素,再判断容量是否超出,如果达到临界值,而无元素添加,就会产生一次无效的扩容)。
ConcurrentHashMap的构造方法
//构造一个带有默认初始化容量、默认加载因子、默认并发级别的空映射
public ConcurrentHashMap() {
this(DEFAULT_INITIAL_CAPACITY,
DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
}
//构造一个带有指定容量、默认加载因子、默认并发级别的空映射
public ConcurrentHashMap(int initialCapacity) {
this(initialCapacity, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
}
//构造一个带有指定容量、指定加载因子、默认并发级别的空映射
public ConcurrentHashMap(int initialCapacity, float loadFactor) {
this(initialCapacity, loadFactor, DEFAULT_CONCURRENCY_LEVEL);
}
//通过一个指定的映射关系Map来构造一个映射
public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
this(Math.max((int) (m.size() / DEFAULT_LOAD_FACTOR) + 1,
DEFAULT_INITIAL_CAPACITY),
DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
putAll(m);
}
//构造一个指定容量、指定加载因子、指定并发级别的新映射
@SuppressWarnings("unchecked")
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
//参数校验,如果不满足条件就抛出非法参数异常
if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
//如果并发级别大于最大分段数,那么并发级别设置为最大分段数(2的16次方)
if (concurrencyLevel > MAX_SEGMENTS)
concurrencyLevel = MAX_SEGMENTS;
//变量sshift用于记录ssize增加的次数
int sshift = 0;
//变量ssize用于记录小于锁的数量的最大偶数
int ssize = 1;
//循环遍历将ssize设置为2的sshift次方
while (ssize < concurrencyLevel) {
++sshift;
ssize <<= 1;
}
//设置片段掩码
this.segmentShift = 32 - sshift;
//设置片段偏移量
this.segmentMask = ssize - 1;
//如果初始化容量大于最大容量,将最大容量设置为初始化容量
if (initialCapacity > MAXIMUM_CAPACITY)
initialCapacity = MAXIMUM_CAPACITY;
//初始化容量除以锁的数量得到分段数量
int c = initialCapacity / ssize;
//设置分段数量,如果 initialCapacity / ssize有余数,那么c将会额外加1一次
if (c * ssize < initialCapacity)
++c;
//获取最小分段数量
int cap = MIN_SEGMENT_TABLE_CAPACITY;
//如果最小分段数量小于分段数量将循环,最小分段数量将以2倍的方式递增
while (cap < c)
cap <<= 1;
//初始化数组中每个元素对应的片段Segment对象
Segment<K,V> s0 =
new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
(HashEntry<K,V>[])new HashEntry[cap]);
//有ssize个锁,那么就构建ssize个Segment片段
Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
this.segments = ss;
}
ConcurrentHashMap中常用的方法
注意:按位与运算
0&0=0; 0&1=0; 1&0=0; 1&1=1;
按位与运算只有两位同时为1时才为1
二次hash函数:
/**
* 当我们存入元素时需要定位到对应的Segment时,而一次hash的结果是通过按位与运算。
* 我们知道按位与运算的特点就是如果hash值的低位相同,无论高位是什么,结果总是一样的。
* 这个函数是为了二次hash计算,这样就会减少冲突
*/
private int hash(Object k) {
int h = hashSeed;
if ((0 != h) && (k instanceof String)) {
return sun.misc.Hashing.stringHash32((String) k);
}
h ^= k.hashCode();
// Spread bits to regularize both segment and index locations,
// using variant of single-word Wang/Jenkins hash.
h += (h << 15) ^ 0xffffcd7d;
h ^= (h >>> 10);
h += (h << 3);
h ^= (h >>> 6);
h += (h << 2) + (h << 14);
return h ^ (h >>> 16);
}
解释:在前面的Segment的rehash函数中,我们发现key的计算是通过HashEntry节点的hash值和长度掩码sizeMask(sizeMask低位都是1,高位都是0)的按位与运算获得。hash值和长度掩码的与运算最后的结果完全都依赖于低位了,散列值冲突严重,采用二次hash运算是为了避免这种冲突,保持hash冲突的结果降低。
put方法:将指定的键映射到该表中指定的值
//将指定的键映射到该表中指定的值
@SuppressWarnings("unchecked")
public V put(K key, V value) {
Segment<K,V> s;
//参数校验,如果存入的值value为空,那么将抛出空指针异常
if (value == null)
throw new NullPointerException();
//通过键key生成hash值
int hash = hash(key);
//hash值右移片段偏移量个位,然后按位与片段掩码
int j = (hash >>> segmentShift) & segmentMask;
if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck
(segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
//通过给定的索引获取片段
s = ensureSegment(j);
//将元素存入到片段
return s.put(key, hash, value, false);
}
get方法:如果映射不包含键的映射,则返回指定键被映射到的值
//如果映射不包含键的映射,则返回指定键被映射到的值
public V get(Object key) {
Segment<K,V> s; // manually integrate access methods to reduce overhead
HashEntry<K,V>[] tab;
//通过键二次hash生成hash值
int h = hash(key);
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
(tab = s.table) != null) {
//遍历片段中的节点链HashEntry
for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
e != null; e = e.next) {
K k;
//如果键存在于节点HashEntry链中,那么就返回HashEntry节点值
if ((k = e.key) == key || (e.hash == h && key.equals(k)))
return e.value;
}
}
return null;
}
阅读总结
(1)ConcurrentHashMap是一种采用分段锁的方式来实现的。
(2)ConcurrentHashMap默认初始化容量为16,默认加载因子为0.75。
(3)ConcurrentHashMap中HashEntry节点链采用单链表数据结构。
(4)ConcurrentHashMap的put方法是采用头插法,先插入值,再判断是否超出临界值。如果超出了临界值,将扩容为原来的2倍。
--------------------------------------该源码为jdk1.7版本的
网友评论