by shihang.mai
1. HashMap
1.1 jdk1.7
数据结构:数组+链表
在new的时候并没有分配内存空间,在put的时候再分配。
默认数组长度:16(1<<4),可以随便设置,会将值向上取最近的 2^n 值(函数:roundUpToPowerOf2)
为什么必须为2^n?
- 在key的hash的时候,做了各种操作,就是为了减少hash碰撞。获取到key的hash值后,做取模运算获取到数组下标。但实际上源码用的
/*
* 如21,放到长度为16的数组中,21%16=5
* 21的2进制表示:00010101
* 15的2进制表示:00001111
* 做&运算结果: 00000101
*/
int h = hash(key)
return h&(length-1)
这样做和取模效果一样,效率更高
- 扩容为2倍本来长度,并且扩容时将原来的元素重新分配到新的hashMap中,在重新分配的过程中,只需要判断2进制的最高位是0还是1即可。
- 如果是0,表示新数组和旧数组的位置不变,直接插入
- 如果是1,只需将旧的数组位置+旧的数组长度=新数组的下标
还是上面例子,16扩容到32,原本元素下标值为5
21的2进制表示:0001(最高位)0101
31的2进制表示:00011111
做&运算结果:00010101 即21,符合:当最高位为1时,原本元素下标值(5)+原本数组长度(16)=新数组下标(21)
1.1.1 死循环
插入元素时,使用头插法,导致死循环,死循环会导致CPU100%。
在扩容时,原来链表A->B->C 会变为C->B->A。
void transfer(Entry[] newTable, boolean rehash) {
int newCapacity = newTable.length;
for (Entry<K,V> e : table) {
while(null != e) {
//第一步
Entry<K,V> next = e.next;
if (rehash) {
e.hash = null == e.key ? 0 : hash(e.key);
}
int i = indexFor(e.hash, newCapacity);
e.next = newTable[I];
newTable[i] = e;
e = next;
}
}
}
出现死循环原因如下图
jdk7死循环
1.2 jdk1.8
数据结构:数组+链表+红黑树
插入采用尾插法,解决死循环问题
//大于等于这个值-1转为红黑树。取这个值的原因:泊松分布
static final int TREEIFY_THRESHOLD = 8;
//少于等于这个值-1转为链表
static final int UNTREEIFY_THRESHOLD = 6;
在获取key的hash时
return (key == null) ? 0 : (h = key.hashCode()) ^ (h >>> 16);
使用扰动函数(让高位参与运算,减少哈希碰撞)
2. ConcurrentHashMap
2.1 jdk1.7
sXyWLj.pngConcurrentHashMap由一个Segment数组组成,每个Segment里包含一个HashEntry数组,每个HashEntry又包含下一个HashEntry。即每个Segment都是一个HashMap
2.1.1 初始化:
Segment默认数组大小=16,HashEntry默认数组大小=2
2.1.2 put
static final class Segment<K,V> extends ReentrantLock implements Serializable
Segment继承了ReentrantLock,即带有锁功能,当我们put值进去时,需要两次hash定位
- 第一次hash,定位Segment数组的位置
- 第二次hash,定位HashEntry数组的位置
在定位到位置后,找到需要插入数据的位置(链表的尾端),在插入数据前,tryLock()去获取Segment的锁
- 成功获取锁,插入数据
- 获取锁失败,表明有其他线程获取到Segment的锁,那么当前线程用CAS的方式去继续的调用tryLock()方法去获取锁,超过指定次数就挂起,等待唤醒
2.1.4 get
大体过程:经过两次hash定位,然后遍历该HashEntry下的链表进行对比,成功就返回,不成功就返回null
public V get(Object key) {
int hash = hash(key); // throws NullPointerException if key null
return segmentFor(hash).get(key, hash);
}
transient volatile int count;
V get(Object key, int hash) {
if (count != 0) { // ①
HashEntry<K,V> e = getFirst(hash);
while (e != null) {
if (e.hash == hash && key.equals(e.key)) {
V v = e.value;
if (v != null) // ②
return v;
return readValueUnderLock(e); // recheck
}
e = e.next;
}
}
return null;
}
- 先判断一下 count != 0,count变量表示当前Segment中存在HashEntry的个数。如果为0直接返回null(
count用了volatile 修饰,保证了可见性
) - 找到了HashEntry数组的位置后,顺着链表一直比较下去找到该entry。当找到entry的时候,先做了一次比较:if (v != null),
- 如果不等于空,直接返回值,
- 如果等于空,那么就加锁去读
没有锁同步的话,靠什么保证同步呢?
通过HashEntry数据结构保证的,volatile value,其他全部是final的
static final class HashEntry<K,V> {
final K key;
final int hash;
volatile V value;
final HashEntry<K,V> next;
}
考虑一下,如果这个时候,另一个线程恰好add or del or update 了HashEntry,会如何?
- 在get代码的①和②之间,另一个线程修改了一个HashEntry的value
因为有volatile修饰value,保证了内存可见性,所以没关系 - 在get代码的①和②之间,另一个线程新增了一个entry
因为每个HashEntry中的next也是final的,没法对链表最后一个元素增加一个后续entry所以新增一个entry的实现方式只能通过头结点来插入了
插入值
C对象是通过 new HashEntry(K k , V v, HashEntry next) 来创建的。new 这个C对象时,当前线程来get它。因为没有同步,就可能会出现当前线程得到的C对象是一个没有完全构造好的对象引用(详情看DCL
)
所以才需要判断一下:if (v != null) 如果确实是一个不完整的对象,则使用锁的方式再次get一次
有没有可能会put进一个value为null的entry? 不会的,已经做了检查,这种情况会抛出异常 -
在get代码的①和②之间,另一个线程删除了一个entry
sXboDI.png
当我么删除C的时候,因为HashEntry中的next是final的,那么必须创建一个新的链表,遍历C之前的元素,头插法,所以B,A交换了位置,A的next指向D
如果我们get的也恰巧是e3,这时另一个线程就执行了删除e3的操作,而我们线程还会继续沿着旧的链表找到e3返回。这里没有办法实时保证了。
因为其他线程的“改”和“删”对我们的数据都不会造成影响,所以只有对“新增”操作进行了安全检查,就是②处的非null检查这样做减少了使用互斥锁对并发性能的影响
2.1.5 size
1、第一种方案他会使用不加锁的模式去尝试多次计算ConcurrentHashMap的size,最多三次,比较前后两次计算的结果,结果一致就认为当前没有元素加入,计算的结果是准确的
2、第二种方案是如果第一种方案不符合,他就会给每个Segment加上锁,然后计算ConcurrentHashMap的size返回
2.2 jdk1.8
整个看起来就像是优化过且线程安全的HashMap
-
改进一:取消Segments(虽然在JDK1.8中还能看到Segment的数据结构,但是已经简化了属性,只是为了兼容旧版本),直接采用 volatile Node数组保存数据,采用table数组元素作为锁,从而实现了对每一行数据进行加锁,进一步减少并发冲突的概率。
-
改进二:数组+单向链表------>数组+单向链表+红黑树的结构
并发控制使用Synchronized和CAS来操作,
Node:它是一个链表,并且只提供了find功能
TreeNode:是红黑树的数据的存储结构
TreeBin:TreeBin就是封装TreeNode的容器,它提供转换黑红树的一些条件和锁的控制
2.2.1 初始化
什么都没做
public ConcurrentHashMap() {
}
2.2.2 put
public V put(K key, V value) {
return putVal(key, value, false);
}
/** Implementation for put and putIfAbsent */
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
int binCount = 0;
//没条件循环
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
//1.
tab = initTable();
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
//2.
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
else if ((fh = f.hash) == MOVED)
//3.扩容
tab = helpTransfer(tab, f);
else {
//4. 插入值
V oldVal = null;
synchronized (f) {
if (tabAt(tab, i) == f) {
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
//5.转红黑树
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount);
return null;
}
这个put的过程很清晰,对当前的Node数组进行无条件自循环直到put成功
- 如果没有初始化就先调用initTable()方法来进行初始化过程
- 如果没有hash冲突就直接CAS插入
- 如果还在进行扩容操作就调用helpTransfer()
- 如果存在hash冲突,就加sync来保证线程安全,这里有两种情况,
- 一种是链表形式就直接遍历到尾端插入
- 一种是红黑树就按照红黑树结构插入
- 最后一个如果该链表的数量大于阈值8,就要先转换成黑红树的结构,break再一次进入循环
- 如果添加成功就调用addCount()方法统计size,并且检查是否需要扩容
2.2.2.1 initTable()
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
if ((sc = sizeCtl) < 0)
Thread.yield(); // lost initialization race; just spin
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
break;
}
}
return tab;
}
- 变量sizeCtl如果<0,表明有其他线程在初始化,那么挂起当前线程
- 如果没其他线程在做初始化,那么cas设置sizeCtl=-1
- sc是记录下次扩容的大小
- 默认初始化容量=16
2.2.2.2 helpTransfer()
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
Node<K,V>[] nextTab; int sc;
if (tab != null && (f instanceof ForwardingNode) &&
(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
int rs = resizeStamp(tab.length);
while (nextTab == nextTable && table == tab &&
(sc = sizeCtl) < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
//扩容
transfer(tab, nextTab);
break;
}
}
return nextTab;
}
return table;
}
其实helpTransfer()方法的目的就是调用多个工作线程一起帮助进行扩容,这样的效率就会更高
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
if (nextTab == null) { // initiating
try {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
transferIndex = n;
}
int nextn = nextTab.length;
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
boolean advance = true;
boolean finishing = false; // to ensure sweep before committing nextTab
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
while (advance) {
int nextIndex, nextBound;
if (--i >= bound || finishing)
advance = false;
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
if (finishing) {
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1);
return;
}
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
i = n; // recheck before commit
}
}
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
else {
synchronized (f) {
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
if (fh >= 0) {
int runBit = fh & n;
Node<K,V> lastRun = f;
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
else if (f instanceof TreeBin) {
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}
将Node数组按线程数量划分区域处理,每个线程倒序处理,并把处理过的节点标记为ForwardingNode,其中处理时,会分两类处理,一并迁移.
- 当高位为0时,则位置不变
- 当高位为1时,位置=原位置+旧数组长度
2.2.3 treeifyBin()
private final void treeifyBin(Node<K,V>[] tab, int index) {
Node<K,V> b; int n, sc;
if (tab != null) {
if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
tryPresize(n << 1);
else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
synchronized (b) {
if (tabAt(tab, index) == b) {
TreeNode<K,V> hd = null, tl = null;
for (Node<K,V> e = b; e != null; e = e.next) {
TreeNode<K,V> p =
new TreeNode<K,V>(e.hash, e.key, e.val,
null, null);
if ((p.prev = tl) == null)
hd = p;
else
tl.next = p;
tl = p;
}
setTabAt(tab, index, new TreeBin<K,V>(hd));
}
}
}
}
}
如果整个Node数组的长度小于64,就扩容至原来的一倍,不转红黑树了
2.2.4 get
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
int h = spread(key.hashCode());
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
- 计算hash值,定位到该Node数组位置,如果是首节点符合就返回
- 如果遇到扩容的时候,会调用标志正在扩容节点ForwardingNode的find方法,查找该节点,匹配就返回
- 以上都不符合的话,就往下遍历节点,匹配就返回,否则最后就返回null
2.2.5 size
public int size() {
long n = sumCount();
return ((n < 0L) ? 0 :
(n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
(int)n);
}
final long sumCount() {
CounterCell[] as = counterCells; CounterCell a;
long sum = baseCount;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
直接返回数量
3. HashMap的哈希函数设计
jdk1.8
static final int hash(Object key) {
int h;
return (key == null) ? 0 : (h = key.hashCode()) ^ (h >>> 16);
}
hash 函数是先拿到通过 key 的 hashCode ,是 32 位的 int 值,然后让 hashCode 的高 16 位和低 16 位进行异或操作。
jdk1.7
static int hash(int h) {
// This function ensures that hashCodes that differ only by
// constant multiples at each bit position have a bounded
// number of collisions (approximately 8 at default load factor).
h ^= (h >>> 20) ^ (h >>> 12);
return h ^ (h >>> 7) ^ (h >>> 4);
}
分析文章:https://www.iteye.com/topic/709945
无论是哪个版本,目的都是:
- 一定要尽可能降低 hash 碰撞,越分散越好;
- 算法一定要尽可能高效,因为这是高频操作, 因此采用位运算;
4. 解决hash冲突方法
- 再哈希法
顾名思义,就是对hash出来的值再hash - 开放地址法
如果冲突,那么通过算法,在算出的值的前后若干位置寻找位置。这个和上面的方法差别不大 - 建立公共溢出区
就是把冲突的值放到另外一块区域 - 拉链法
jdk就是用这种的。冲突的话,直接以链表的方式存储
5. 计算hash时,用异或运算的好处
保证了对象的 hashCode 的 32 位值只要有一位发生改变,整个 hash() 返回值就会改变。尽可能的减少hash碰撞,让链表尽量短
6. HashMap的put过程
Hashmap的put过程.png版本 | 插入方式 | 扩容后处理方式 |
---|---|---|
1.7 | 尾插法 | 按扩容后的规律计算,高位为0保留原位置,高位为1,新位置=原位置+原数组长度 |
1.8 | 头插法 | 重新计算 |
7. 当链表长度为8转换为红黑树的意义
红黑树的平均查找长度是log(n),长度为8的时候,平均查找长度为3,如果继续使用链表,平均查找长度为8/2=4,所以,当链表长度 >= 8时 ,有必要将链表转换成红黑树
8. 使用红黑树而不是用二叉树
- 二叉查找树,在极端情况下,会变成线性结构,即一边特别长,遍历查找会非常慢
- 红黑树在插入新数据后可能需要通过左旋,右旋、变色这些操作来保持平衡,引入红黑树就是为了查找数据快
- 红黑树属于平衡二叉树,但是为了保持“平衡”是需要付出代价的,但是该代价所损耗的资源要比遍历线性链表要少,所以当长度大于8的时候,会使用红黑树,如果链表长度很短的话,根本不需要引入红黑树,引入反而会慢
9. 红黑树的性质
红黑树是一种自平衡的二叉查找树,是一种高效的查找树
所以它具备二叉树的性质:
- 若任意节点的左子树不空,则左子树上所有结点的值均小于它的根结点的值;
- 若任意节点的右子树不空,则右子树上所有结点的值均大于它的根结点的值;
- 任意节点的左、右子树也分别为二叉查找树。
- 没有键值相等的节点
红黑树特有的性质:
- 每个结点要么是红的要么是黑的
- 根结点是黑的
- 所有叶子都是黑色(叶子是NIL节点)
- 每个红色节点必须有两个黑色的子节点
- 从任一节点到其每个叶子的所有简单路径都包含相同数目的黑色节点
10. 采用 hashcode 的高 16 位和低 16 位异或能降低 hash 碰撞
原因,例用反正法:
如果不这样的话,用hashcode&(length-1),而length是2的N次方,2进制低位都是1,那么
- 做&运算,只有hashcode的地位参与了运行,那么冲突的概率会增加
- 如果将高位也参与运算,混合原始哈希码的高位和低位,以此来加大低位的随机性。而且混合后的低位掺杂了高位的部分特征,这样高位的信息也被变相保留下来。
参考:
https://blog.csdn.net/xingxiupaioxue/article/details/88062163
网友评论