ConcurrentHashMap 采用的是分段锁的机制来保证并发安全的,个中有着Segment、HashEntry等重要属性。
ConcurrentHashMap会有多个segment,每个segment性质都是一把锁。
Segment继承自ReentrantLock,它就是一把锁,里面有个final Segment<K,V>[] segments;这样的数组,即使用Segment数组来进行并发安全的,后续会讲,这里不展开。
而HashEntry类似于HashMap的Entry,它们都有key,value,next、hash这四个属性,其中HashEntry中的value、next都是使用volatile修饰的,为其增加内存可见性的功能。
代码结构一览

结构模型

重要属性
segment
scanAndLockForPut中自旋循环获取锁的最大自旋次数
static final int MAX_SCAN_RETRIES =
Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1;
存储着一个HashEntry类型的数组
transient volatile HashEntry<K,V>[] table;
存放元素的个数,这里没有加volatile修饰,所以只能在加锁
或者确保可见性(如Unsafe.getObjectVolatile)的情况下进行访问,不然无法保证数据的正确性
transient int count;
存放segment元素修改次数记录,由于未进行volatile修饰,所以访问规则和count类似
transient int modCount;
当table大小超过阈值时,对table进行扩容,值为(int)(capacity *loadFactor)
transient int threshold;
/负载因子
final float loadFactor;
hashEntry
static final class HashEntry<K,V> {
final int hash;
final K key;
volatile V value;
volatile HashEntry<K,V> next;
HashEntry(int hash, K key, V value, HashEntry<K,V> next) {
this.hash = hash;
this.key = key;
this.value = value;
this.next = next;
}
//只列出主要部分
源码:
初始化
concurrencyLevel并发级别 默认16,loadFactor负载因子默认0.75,initialCapacity默认初始化大小是16
public ConcurrentHashMap(int initialCapacity, float loadFactor) {
this(initialCapacity, loadFactor, DEFAULT_CONCURRENCY_LEVEL);
}
public ConcurrentHashMap(int initialCapacity) {
this(initialCapacity, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
}
public ConcurrentHashMap() {
this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
}
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (concurrencyLevel > MAX_SEGMENTS)
concurrencyLevel = MAX_SEGMENTS;
// Find power-of-two sizes best matching arguments
int sshift = 0;
int ssize = 1;
while (ssize < concurrencyLevel) {
++sshift;
ssize <<= 1;
}
this.segmentShift = 32 - sshift;
this.segmentMask = ssize - 1;
if (initialCapacity > MAXIMUM_CAPACITY)
initialCapacity = MAXIMUM_CAPACITY;
int c = initialCapacity / ssize;
if (c * ssize < initialCapacity)
++c;
int cap = MIN_SEGMENT_TABLE_CAPACITY;
while (cap < c)
cap <<= 1;
// create segments and segments[0]
//初始化segment[0],后续其他segment下标会以它的地址为基准
Segment<K,V> s0 =
new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
(HashEntry<K,V>[])new HashEntry[cap]);
Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
//SBASE,即Segment[0]的地址,内存偏移量
UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
this.segments = ss;
}
查看其初始化代码,最终还是调用 public ConcurrentHashMap(int initialCapacity,float loadFactor, int concurrencyLevel),这里ssize是决定segment数组大小的,ssize又是由concurrentLevel决定的,由上面的代码可知,ssize是等于靠近concurrentLevel的最小的2次幂数值。如果concurrent=15,那么它就是16,如果是31就是32,33就是64。
sshift记录就是ssize的左移次数。
这两参数后续有用,用于put数据
this.segmentShift = 32 - sshift;
this.segmentMask = ssize - 1;
cap就是Segment底下的数组长度,它是2次幂数,threshold也是cap * loadFactor,和Hashmap相近。
从源码最后可以看,这里会初始化Segment[0],用作参考,其初始化后还用Unsafe类,注释写明: ordered write of segments[0]。
put
public V put(K key, V value) {
Segment<K,V> s;
if (value == null)
throw new NullPointerException();
//进行hash扰动,得到hash值
int hash = hash(key);
// 位运算得到key到底是属于哪个下标
int j = (hash >>> segmentShift) & segmentMask;
//s就是所在的segment[j],可以看到根据SBASE+偏移量得到了内存地址从而拿到segment[j]
if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck
(segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
//如果segment[j]为null,就初始化它
s = ensureSegment(j);
return s.put(key, hash, value, false);
}
可见看到,value是不能为null,否则扔异常,这点和HashMap有区别,需要注意。将key的进行hash,得到其hash值,再根据这个hash值进行位运算,得到下标index,再根据index偏移内存地址得到segment[j],如果这个segment[j]==null,就创建它,执行ensureSegment(j);
private Segment<K,V> ensureSegment(int k) {
//k 就是传进来的j,下标index
final Segment<K,V>[] ss = this.segments;
long u = (k << SSHIFT) + SBASE; // raw offset
Segment<K,V> seg;
//注意这个是getObjectVolatile,保证读线程的内存可见性的,确保获取是最新,外面的只是getObject
//如果该segment[k]等于空,就初始化,它的threshold、hashEntry[]、capacity都是和segment[0]一样的,所以才说segment[0]是用于后续index创建做参考的
if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
Segment<K,V> proto = ss[0]; // use segment 0 as prototype
int cap = proto.table.length;
float lf = proto.loadFactor;
int threshold = (int)(cap * lf);
HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
//这里再一次判断,为求正确初始化,所以多次检查,下面还是自旋+cas的方式
if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
== null) { // recheck
Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
== null) {
if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
break;
}
}
}
return seg;
}
这个方法确保该segment[j]创建成功,但这里需要注意的是,seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u),这里使用的是UNSAFE.getObjectVolatile,而不是外面方法所用的UNSAFE.getObject,多了个volatile,保证多个线程之间共享数据的可见性,保证在主内存中拿数据,而UNSAFE.getObject是不保证多线程之间的可见性的。这里是防止其他线程已经初始化好而又重新初始化,覆盖掉数据。这里还是多次检查,因为是在无锁状态。
创建好之后就返回该segment[j],执行return s.put(key, hash, value, false);
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
//拿到锁,node是空,拿不到锁之后说,这里先不展开,
//总之拿不到锁,
//① 要么就是new Node等到拿到锁跳出来进行头插法;
//② 要么就是key是相等的,拿到锁出来替换
HashEntry<K,V> node = tryLock() ? null :
scanAndLockForPut(key, hash, value);
V oldValue;
try {
HashEntry<K,V>[] tab = table;
//熟悉的算法获取下标index
int index = (tab.length - 1) & hash;
//根据index计算偏移量,得到这个segment下的hashEntry[index]的头结点
HashEntry<K,V> first = entryAt(tab, index);
for (HashEntry<K,V> e = first;;) {
//当前结点不为空,看是不是相同key,
/*① 如果是,onlyIfAbsent是false就是替换返回null,如果onlyIfAbsent是true就返回旧值,
因为有个putIfAbsent方法,如果缺失才会补位。回归正题,之后结束并返回,最后解锁。*/
/*② 如果不是,接着遍历这个hashEntry[index]的单链表*/
if (e != null) {
K k;
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
oldValue = e.value;
if (!onlyIfAbsent) {
e.value = value;
++modCount;
}
break;
}
e = e.next;
}
/*当前节点是空,先说拿到锁的,node==null,新建一个hashEntry 在这个hashEntry[index]的单链表的头部挂上*/
else {
if (node != null)
node.setNext(first);
else
//目前先分析node==null的情况,这里fitst=e,e成了next,头插法
node = new HashEntry<K,V>(hash, key, value, first);
int c = count + 1;
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
//大于threshold阈值就扩容
rehash(node);
else
//该hashEntry[index]所在的内存偏移地址就是new HashEntry
setEntryAt(tab, index, node);
++modCount;
count = c;
oldValue = null;
break;
}
}
} finally {
unlock();
}
return oldValue;
}
获取HashEntry[i]的内存偏移地址
static final <K,V> HashEntry<K,V> entryAt(HashEntry<K,V>[] tab, int i) {
return (tab == null) ? null :
(HashEntry<K,V>) UNSAFE.getObjectVolatile
(tab, ((long)i << TSHIFT) + TBASE);
}
将新的hashEntry==e ,设置在HashEntry[i]的内存偏移地址上的值,即头部
static final <K,V> void setEntryAt(HashEntry<K,V>[] tab, int i,
HashEntry<K,V> e) {
UNSAFE.putOrderedObject(tab, ((long)i << TSHIFT) + TBASE, e);
}
总的流程就是:权当是首次就拿到了锁(已经确定是哪个segment了),根据key的hash计算得到hashEntry是哪一个,记hashEntry[index],
① 如果上面没有数据,要么就new HashEntry挂上去,要么相同key替换。
②如果有数据,只有一个数据相同就替换,多于一个那就是单链表,还是遍历相同就替换,不同就是头插法新增。
偏移量模型图:
segment[idx]和 HashEntry[index]的区别如下:

接着说首次拿不到锁的
进入 scanAndLockForPut(key, hash, value); 方法
HashEntry<K,V> node = tryLock() ? null :
scanAndLockForPut(key, hash, value);
//传入hash,计算index,获取这个hashEntry[index]的headNode内存偏移地址
static final <K,V> HashEntry<K,V> entryForHash(Segment<K,V> seg, int h) {
HashEntry<K,V>[] tab;
return (seg == null || (tab = seg.table) == null) ? null :
(HashEntry<K,V>) UNSAFE.getObjectVolatile
(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
}
private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
//entryHash方法上面说过,获取这个hashEntry[index]的headNode内存偏移地址
HashEntry<K,V> first = entryForHash(this, hash);
HashEntry<K,V> e = first;
HashEntry<K,V> node = null;
int retries = -1; // negative while locating node、
//拿不到锁死循环
while (!tryLock()) {
HashEntry<K,V> f; // to recheck first below
if (retries < 0) {
/* hashEntry[index]的headNode是null,就是新建一个hashEntry,继续循环,此时retries=0 ,进入else */
if (e == null) {
if (node == null) // speculatively create node
node = new HashEntry<K,V>(hash, key, value, null);
retries = 0;
}
/* headNode不是null,但key和传入的一样,retires=0,继续循环;
如果不一致,retires不动,继续下一个,无非就是挨个查下这个单链表有无相同key的。
知道找不到,就去外面的else */
else if (key.equals(e.key))
retries = 0;
else
e = e.next;
}
/* 如果重试次数 大于 最大重试次数,MAX_SCAN_RETRIES单核和1,多核为64,
如果64次都拿不到锁,就lock,入同步队列,这是很极端的情况。*/
else if (++retries > MAX_SCAN_RETRIES) {
lock();
break;
}
/* 重试次数还没有大于最大重试次数,如果retries为偶数,就重新获取HashEntry链表的头结点*/
else if ((retries & 1) == 0 &&
(f = entryForHash(this, hash)) != first) {
e = first = f; // re-traverse if entry changed
retries = -1;
}
}
//拿到锁就出去,node无非就是null,或者newNode,null是遇到相同key,否则就是newNode
return node;
}
简述一下流程,里面还是尝试获取锁,如果拿锁失败,就执行以下动作,看看这个hashEntry[index]:
① 如果上面没有数据,要么就new HashEntry,之后不断重试尝试拿锁,拿到就带着 newNode 出去。
②如果有数据,遍历是否有相同key:
- 有:啥都不干,等待拿锁;
- 没有:还是newNode,等到拿锁出去。
出去后还是还是回到原来上面的代码,这里再贴一次。记得返回的Node无非就是 null、或者 new HashEntry
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
//拿到锁,node是空,拿不到锁之后说,这里先不展开,
//总之拿不到锁,
//① 要么就是new Node等到拿到锁跳出来进行头插法;
//② 要么就是key是相等的,拿到锁出来替换
HashEntry<K,V> node = tryLock() ? null :
scanAndLockForPut(key, hash, value);
V oldValue;
try {
HashEntry<K,V>[] tab = table;
//熟悉的算法获取下标index
int index = (tab.length - 1) & hash;
//根据index计算偏移量,得到这个segment下的hashEntry[index]的头结点
HashEntry<K,V> first = entryAt(tab, index);
for (HashEntry<K,V> e = first;;) {
//当前结点不为空,看是不是相同key,
/*① 如果是,onlyIfAbsent是false就是替换返回null,如果onlyIfAbsent是true就返回旧值,
因为有个putIfAbsent方法,如果缺失才会补位。回归正题,之后结束并返回,最后解锁。*/
/*② 如果不是,接着遍历这个hashEntry[index]的单链表*/
if (e != null) {
K k;
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
oldValue = e.value;
if (!onlyIfAbsent) {
e.value = value;
++modCount;
}
break;
}
e = e.next;
}
/*当前节点是空,先说拿到锁的,node==null,新建一个hashEntry 在这个hashEntry[index]的单链表的头部挂上*/
else {
if (node != null)
//node不是null,还是头插法
node.setNext(first);
else
//node==null的情况,new HashEntry,这里fitst=e,e成了next,头插法
node = new HashEntry<K,V>(hash, key, value, first);
int c = count + 1;
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
//大于threshold阈值就扩容
rehash(node);
else
//该hashEntry[index]所在的内存偏移地址就是new HashEntry
setEntryAt(tab, index, node);
++modCount;
count = c;
oldValue = null;
break;
}
}
} finally {
unlock();
}
return oldValue;
}
出来了还是先拿到HashEntry[index]的headNode,相同key就替换,不同就直接else,headNode是不会空的,因为之前都抢锁失败了,所以早就被别的线程设置好了,所以直接到这里,执行头插法即可。
if (node != null)
//node不是null,还是头插法
node.setNext(first);
else
//node==null的情况,new HashEntry,这里fitst=e,e成了next,头插法
node = new HashEntry<K,V>(hash, key, value, first);
扩容
如果当前segment元素数量操作阈值且数组长度小于最大值的时候,就要进行扩容,记住这个操作,是已经拿到锁的了。
/**
*扩容方法
*/
private void rehash(HashEntry<K,V> node) {
// 旧的table
HashEntry<K,V>[] oldTable = table;
// 旧的table的长度
int oldCapacity = oldTable.length;
// 扩容原来capacity的一倍
int newCapacity = oldCapacity << 1;
// 新的阈值
threshold = (int)(newCapacity * loadFactor);
// 新的table
HashEntry<K,V>[] newTable =
(HashEntry<K,V>[]) new HashEntry[newCapacity];
// 新的掩码
int sizeMask = newCapacity - 1;
// 遍历旧的table
for (int i = 0; i < oldCapacity ; i++) {
// table中的每一个链表元素
HashEntry<K,V> e = oldTable[i];
if (e != null) { // e不等于null
HashEntry<K,V> next = e.next; // 下一个元素
int idx = e.hash & sizeMask; // 重新计算位置,计算在新的table的位置
if (next == null) // Single node on list 证明只有一个元素
newTable[idx] = e; // 把当前的e设置给新的table
else { // Reuse consecutive sequence at same slot
HashEntry<K,V> lastRun = e; // 当前e
int lastIdx = idx; // 在新table的位置
for (HashEntry<K,V> last = next;
last != null;
last = last.next) { // 遍历链表
int k = last.hash & sizeMask; // 确定在新table的位置
if (k != lastIdx) { // 头结点和头结点的next元素的节点发生了变化
lastIdx = k; // 记录变化位置
lastRun = last; // 记录变化节点
}
}
// 以下把链表设置到新table分为两种情况
// (1) lastRun 和 lastIdx 没有发生变化,也就是整个链表的每个元素位置和一样,都没有发生变化
// (2) lastRun 和 lastIdx 发生了变化,记录变化位置和变化节点,然后把变化的这个节点设置到新table
// ,但是整个链表的位置只有变化节点和它后面关联的节点是对的
// 下面的这个遍历就是处理这个问题,遍历当前头节点e,找出不等于变化节点(lastRun)的节点重新处理
newTable[lastIdx] = lastRun;
// Clone remaining nodes
for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
V v = p.value;
int h = p.hash;
int k = h & sizeMask;
HashEntry<K,V> n = newTable[k];
newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
}
}
}
}
// 处理扩容时那个添加的节点
// 计算位置
int nodeIndex = node.hash & sizeMask; // add the new node
// 设置next节点,此时已经扩容完成,要从新table里面去当前位置的头结点为next节点
node.setNext(newTable[nodeIndex]);
// 设置位置
newTable[nodeIndex] = node;
// 新table替换旧的table
table = newTable;
}
流程还是创建一个newTab,大小为oldTab的2倍,之后转移数据,遍历oldTab,如果oldTab的headNode为null,即它只有一个元素,计算好基于newTab的hash,挂上去newTab[index]的headNode即可。多于一个的话,迭代找到和headNode不同的index,其实这里仅仅会分为2种index,1种是oldTab的原位置,另1种是oldTab位置+oldTab.length,和hashmap1.8是何其相似,就像高低位单链表一样。,最后将newNode的key计算好index,添加进HashEntry即可。
这里展开说一下,2个问题:
-
oldTab的headNode为null,即它只有一个元素,计算好基于newTab的hash,挂上去newTab[index]的headNode即可,为什么?
-
为什么要记录lastRun、lastIdx,最后将不是lastRun的节点遍历,头插法连起来?
首先,我们需要明确一下,这里不同于hashmap1.7,扩容的时候,oldTab的Node中的key是需要重新调用用hash()方法来获得新的hash。concurrentHashMap直接就是用初始已经设置好的hash属性来进行计算index的,即hash & (newTab.length -1)。
试想一下,无论oldTab.length 还是 newTab.length ,它们都是2次幂,我们就打个比方,oldTab.length = 16 , 那么它的二进制数就是:0000 1111 ,低4位全是1,它的table就只有16个栅位,所以它低4位就是用来参与运算,计算出index的,同理newTab也是,不过它是 0001 1111 ,它的table有32个栅位,它是用低5位来参与运算的 。
我们的HashEntry的hash是不变的,比如它在oldTab的位置是9,那么它低4位用二进制表示就是1001,第5位 不是0 就是 1,而现在 (newTab.length -1) = 0001 1111 ,进行 & 运算,要么结果还是9,要么就是9 + 16 = 25。
所以啊,现在回答第1个问题:假如HashEntry[] = 2,扩容后 hashEntry[] = 4 ,原本index=0的元素,要么就是分配到index=0,要么分配到index=2,而当前index上只有1个元素, 所以直接挂就行了,根本不会出现覆盖的可能。
关于第2个问题:这时候因为不是只有一个headNode,而是单链表了,按照前面说的分为高低链,headNode的位置Idx是一类,lastIdx是另一类。lastRun是另一类的最后一个节点,这时候两类的头结点都找出来了,并赋予到各自在newTab[index]和 newTab[index+oldTab.length] 中,之后从该HashEntry的头结点开始,除去lastRun,进行头插法转移数据。这样应该效率会提高,提前找出了各自在newTab中的位置。
for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
V v = p.value;
int h = p.hash;
int k = h & sizeMask;
HashEntry<K,V> n = newTable[k];
newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
}
get
public V get(Object key) {
Segment<K,V> s; // manually integrate access methods to reduce overhead
HashEntry<K,V>[] tab;
int h = hash(key);
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
(tab = s.table) != null) {
for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
e != null; e = e.next) {
K k;
if ((k = e.key) == key || (e.hash == h && key.equals(k)))
return e.value;
}
}
return null;
}
没有加锁,效率高
注意:get方法使用了getObjectVolatile方法读取segment和hashentry,保证是最新的,具有锁的语义,可见性
分析:为什么get不加锁可以保证线程安全
(1) 首先获取value,我们要先定位到segment,使用了UNSAFE的getObjectVolatile具有读的volatile语义,也就表示在多线程情况下,我们依旧能获取最新的segment.
(2) 获取hashentry[],由于table是每个segment内部的成员变量,使用volatile修饰的,所以我们也能获取最新的table.
(3) 然后我们获取具体的hashentry,也时使用了UNSAFE的getObjectVolatile具有读的volatile语义,然后遍历查找返回.
(4) 总结我们发现怎个get过程中使用了大量的volatile关键字,其实就是保证了可见性(加锁也可以,但是降低了性能),get只是读取操作,所以我们只需要保证读取的是最新的数据即可.
size
public int size() {
// Try a few times to get accurate count. On failure due to
// continuous async changes in table, resort to locking.
final Segment<K,V>[] segments = this.segments;
int size;
boolean overflow; // true if size overflows 32 bits
long sum; // sum of modCounts
long last = 0L; // previous sum
int retries = -1; // first iteration isn't retry
try {
for (;;) {
// RETRIES_BEFORE_LOCK = 2,从-1开始,重复操作3次,进行修改次数modCount的判断
if (retries++ == RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
ensureSegment(j).lock(); // force creation
}
sum = 0L;
size = 0;
overflow = false;
for (int j = 0; j < segments.length; ++j) {
Segment<K,V> seg = segmentAt(segments, j);
if (seg != null) {
sum += seg.modCount;
int c = seg.count;
if (c < 0 || (size += c) < 0)
overflow = true;
}
}
if (sum == last)
break;
last = sum;
}
} finally {
if (retries > RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
segmentAt(segments, j).unlock();
}
}
return overflow ? Integer.MAX_VALUE : size;
}
//通过下标 j 获得 segment[j] 的内存偏移地址
static final <K,V> Segment<K,V> segmentAt(Segment<K,V>[] ss, int j) {
long u = (j << SSHIFT) + SBASE;
return ss == null ? null :
(Segment<K,V>) UNSAFE.getObjectVolatile(ss, u);
}
这里有个重要的属性modCount用于判断,modCount在replace、put、remove、clear方法中都会有(++modCount)的操作,表明对segment做出过修改。遍历segment[],尝试3次,如果modCount都没发生变化,即证明尝试阶段没有线程修改这个concurrentHashMap,就不用加锁了,能提高性能。最后能得出concurrentHashMap的总size。
执行流程
(1) 第一次,retries++=0,不满足全部加锁条件,遍历所有的segment,sum就是所有segment的容量,last等于0,第一次不相等,last=sum.
(2) 第二次,retries++=1,不满足加锁条件,计算所有的segment,sum就是所有的segment的容量,last是上一次的sum,相等结束循环,不相等下次循环.
(3) 第三次,retries++=2,先运算后赋值,所以此时还是不满足加锁条件和上面一样统计sum,判断这一次的sum和last(上一次的sum)是否相等,相等结束,不相等,下一次循环.
(4) 第四次,retries++=2,满足加锁条件,给segment全部加锁,这样所有线程就没有办法进行修改操作,统计每个segment的数量求和,然后返回size.(ps:全部加锁提高了size的准确率,但是降低了吞吐量,统计size的过程中如果其它线程进行修改操作这些线程全部自旋或者阻塞).
总结
- ConcurrentHashMap是如何保证线程安全的?
- 从读(get)的层面,利用Unsafe.getObjectVolatile保证取到的是最新的Segment&HashEntry<K,V>[i],保证写线程对读线程的可见性,从而保证线程安全
- 从写(put)的层面,利用ReentrantLock 可重入锁保证每次只有一个线程对Segment[i]做数据插入,从而保证了线程安全。
- 从扩容(rehash)的层面,因为只有put的时候才会调用到rehash,同样利用ReentrantLock 可重入锁保证线程之间的互斥性,从而保证线程安全。
- 从计数(size)层面,put的时候进行自增,统计的时候先尝试不加锁统计,当计数期间结果改变的时候再采用ReentrantLock 可重入锁保证统计期间其它线程无法更改数据,从而实现线程安全。
- ConcurrentHashMap什么情况下才会触发扩容?
- 当前Segment[i]的元素数量超过阈值(负载因子*初始数量),且数组长度小于最大长度时
- 是否支持并发扩容?
- 不支持,扩容的时候定位到相同Segment,准备做数据更改的线程会阻塞。
- 扩容的时候是否支持并发读写?
- 扩容的时候支持并发读。
- 扩容的时候不支持并发写(同一个Segment),此时其它往这个Segment写数据的线程会阻塞。
参考:
ConcurrentHashMap 源码浅析 1.7
并发编程之ConcurrentHashMap源码解读-1.7
网友评论