jdk1.8 ConcurrentHashMap
1,去除了Segment 分段锁
2,synchronized+cas 保证node节点线程安全问题
HashMap1.8与jdk1.8 ConcurrentHashMap 数据结构模型是一样的
concurrentHashMap 数组+链表+红黑树
concurrentHashMap对我们 index 下标对应的node 节点上锁
ConcurrenthashMap锁的竞争:
多个线程同时落到put key的时候。如果 多个key都落入到同一个index node节点的时候;
如果没有落到一个index node 节点不需要做锁的竞争。
注意:
只需要计算一次index的值。
区别:
hashtable 对我们整个Table 数组上锁。
jdk1.7 ConcurrentHashmap
1,基于Segment 分段锁设计
2,lock+cas 保证node节点线程安全问题
ConcurrenthashMap1.5 synchronized性能没有做优化 tryLock方法
需要计算2次index 值: 第一次计算存放哪个Segement 对象中。
第二次计算Segement 对象中哪个hashEntry<k,v>[]table;
使用传统的hashTable 保证线程安全问题,是采用synchronized 锁将整个hashTable中的数组锁住,在多个线程中只允许一个线程访问put或者get,效率非常低,但是能保证线程安全。
[图片上传失败...(image-c18fcf-1601685672405)]
jdk官方不推荐在多线程的情况下使用hashtable 或者hashmap 建议使用Concurrenthashmap 分段锁,效率非常高。
Concurrenthashmap 将一个大 的 hashmap 集合拆成n多个不同的小的hashtable(segement)。默认的情况下分成16个不同的segment每个segment中都有独立的hashEntry<k,v>[]table;
简单实现Concurrenthashmap:
package com.taotao.hashmap001;
import java.util.Hashtable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
/**
*@author tom
*Date 2020/9/27 0027 7:19
*concurrentHashmap底层原理
*/
public class MyConcurrentHashMap<K, V> {
/**
* segments
*/
private Hashtable<K, V>[] segments;
public MyConcurrentHashMap() {
segments = new Hashtable[16];
}
public void put(K k, V v) {
//第一次计算index, 计算key存放在那个hashtable
int segmentIndex = k.hashCode() & (segments.length - 1);
Hashtable<K, V> segment = segments[segmentIndex];
if (segment == null) {
segment = new Hashtable<>();
}
segment.put(k, v);
segments[segmentIndex]=segment;
}
public V get(K k) {
//第一次计算index, 计算key存放在那个hashtable
int segmentIndex = k.hashCode() & (segments.length - 1);
Hashtable<K, V> segment = segments[segmentIndex];
if (segment != null) {
return segment.get(k);
}
return null;
}
public static void main(String[] args) {
MyConcurrentHashMap<String, String> myConcurrentHashMap = new MyConcurrentHashMap<>();
ConcurrentHashMap map=new ConcurrentHashMap();
for (int i = 0; i < 10; i++) {
myConcurrentHashMap.put(i + "", i + "");
}
for (int i = 0; i < 10; i++) {
myConcurrentHashMap.get(i + "");
System.out.println( myConcurrentHashMap.get(i + ""));
}
}
}
核心源码分析:
1,无参构造函数分析:
initialCapacity --16
loadFactor hashEntry<K,V>[]table; 加载因子 0.75
concurrnetcyLevel并发级别 默认是16
2,并发级别是能够大于2的16次方
if(concurrentlevel > MAX_SEGMENTS)
concurrentlevel =MAX_SEGMENTS;
3,sshift 左移位的次数 ssize 作用,记录segment数组大小。
int sshift=0;
int ssize=1;
while(ssize < concurrentcyLevel){
++sshift=0;
int ssize=1;
while(ssize<concurrencyLevel){
++sshift;
ssize<<=1;
}
}
###4,segmentShift segmentMask: ssize-1 做与运算的时候能够将key均匀存放:
this.segmentSHift=32-shift;
this.segmentmask=ssize-1;
###5.初始化 Segment0 赋值下标0 的位置
Segment<K,V>s0=new Segment<K,V>(loadFactory,(int)(cap *loadFactor),(hashEntry<K,V>[])new hashEntry[cap]);
####6 采用cas修改复制给Segment数组;
UNSAFE.putOrderedObject(ss,SBASE,s0);//or
put方法底层实现:
Segment<K,V> s;
if (value == null)
throw new NullPointerException();
###计算key存放那个Segment数组下标位置;
int hash = hash(key);
int j = (hash >>> segmentShift 2 8) & segmentMask 15;
###使用cas 获取Segment[10]对象 如果没有获取到的情况下,则创建一个新的segment对象
if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck
(segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
s = ensureSegment(j);
### 使用lock锁对put方法保证线程安全问题
return s.put(key, hash, value, false);
0000 0000 00000 0000 0000 0000 0000 0011
0000 0000 00000 0000 0000 0000 0000 0011
深度分析:
Segment<K,V> ensureSegment(int k)
private Segment<K,V> ensureSegment(int k) {
final Segment<K,V>[] ss = this.segments;
long u = (k << SSHIFT) + SBASE; // raw offset
Segment<K,V> seg;
### 使用UNSAFE强制从主内存中获取 Segment对象,如果没有获取到的情况=null
if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
## 使用原型模式 将下标为0的Segment设定参数信息 赋值到新的Segment对象中
Segment<K,V> proto = ss[0]; // use segment 0 as prototype
int cap = proto.table.length;
float lf = proto.loadFactor;
int threshold = (int)(cap * lf);
HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
#### 使用UNSAFE强制从主内存中获取 Segment对象,如果没有获取到的情况=null
if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
== null) { // recheck
###创建一个新的Segment对象
Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
== null) {
###使用CAS做修改
if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
break;
}
}
}
return seg;
}
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
###尝试获取锁,如果获取到的情况下则自旋
HashEntry<K,V> node = tryLock() ? null :
scanAndLockForPut(key, hash, value);
V oldValue;
try {
HashEntry<K,V>[] tab = table;
###计算该key存放的index下标位置
int index = (tab.length - 1) & hash;
HashEntry<K,V> first = entryAt(tab, index);
for (HashEntry<K,V> e = first;;) {
if (e != null) {
K k;
###查找链表如果链表中存在该key的则修改
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
oldValue = e.value;
if (!onlyIfAbsent) {
e.value = value;
++modCount;
}
break;
}
e = e.next;
}
else {
if (node != null)
node.setNext(first);
else
###创建一个新的node结点 头插入法
node = new HashEntry<K,V>(hash, key, value, first);
int c = count + 1;
###如果达到阈值提前扩容
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
rehash(node);
else
setEntryAt(tab, index, node);
++modCount;
count = c;
oldValue = null;
break;
}
}
} finally {
###释放锁
unlock();
}
return oldValue;
}
核心Api
GetObjectVolatile
此方法和上面的getObject功能类似,不过附加了'volatile'加载语义,也就是强制从主存中获取属性值。类似的方法有getIntVolatile、getDoubleVolatile等等。这个方法要求被使用的属性被volatile修饰,否则功能和getObject方法相同。
tryLock() 方法是有返回值的,他表示用来尝试获取锁,如果获取成功,则返回true,
如果获取失败(其他线程已经获取到锁),则返回false,这个方法无论如何都会立即返回,在拿不到锁的时候不会一直在哪里等待。
[图片上传失败...(image-748a07-1601689703313)]
/** Implementation for put and putIfAbsent */
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
tab = initTable();
//计算该key存放的index下标位置 查找node节点 如果没有发生index冲突的
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
//有多个线程同时修改 没有发生index冲突下标位置 cas修改。
//如果cas修改成功的话在退出自旋
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
V oldVal = null;
//synchronized 的时候:index已经发生冲突 使用node节点上锁
synchronized (f) {
if (tabAt(tab, i) == f) {
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
//转红黑树
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount);
return null;
}
binCount //记录数组长度 ,如果长度>8转为红黑树。
for (Node<K,V>[] tab = table;;) {} // 自旋
if (tab == null || (n = tab.length) == 0)
tab = initTable(); //如果table为空 则初始化
/**
* Initializes table, using the size recorded in sizeCtl.
*/
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
//如果发现有其他线程正在扩容的时候,当前线程释放cpu执行权
if ((sc = sizeCtl) < 0)
Thread.yield(); // lost initialization race; just spin
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
//使用cas 修改当前的sizeCtl 为-1 sizeCtl 默认为0,-1表示其他的线程已经在扩容,-2表示2个其他线程再扩容
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
//对table 默认长度为16
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
break;
}
}
return tab;
}
image.png
[图片上传失败...(image-572d22-1601775904418)]
网友评论