本系列文章所描述的所有类或接口都是基于 JDK 1.7的源码,在其它 JDK 的实现方式中可能会有所不同。
一、实现方式
ConcurrentHashMap 是线程安全的 HashMap 的实现,其实现方式如下。
二、创建 ConcurrentHashMap
和 HashMap 一样,它同样有 initialCapacity 和 loadFactor 属性,不过还多了一个 concurrencyLevel 属性,在调用空构造器的情况下,这三个属性的值分别设置为 16、0.75及16。
在设置了以上三个属性后,基于以下方式计算 ssize 值:
int sshift = 0;
int ssize = 1;
while (ssize < concurrencyLevel) {
++sshift;
ssize <<= 1;
}
在 concurrencyLevel 为 16 的情况下,最终计算出的 ssize 为 16,并使用此 ssize 作为参数传入 Segment 的 newArray 方法,创建大小为 16 的 Segment 对象数组,接着采用如下方法计算 cap 变量的值:
int c = initialCapacity / ssize;
if (c * ssize < initialCapacity)
++c;
int cap = MIN_SEGMENT_TABLE_CAPACITY;
while (cap < c)
cap <<= 1;
根据以上参数值,计算出的 cap 为 2,最后为 Segment 对象数组创建 Segment 对象,传入的参数为 cap 和 loadFactory。Segment 对象继承 ReentrantLock ,在创建 Segment 对象时,其所做的动作为创建一个指定大小为 cap 的 HashEntry 对象数组,并基于数组的大小以及 loadFactory 计算 threshold 的值:threshold=(int)(cap * loadFactor)。
三、保存值:put(Object key, Object value)
ConcurrentHashMap 并没有在此方法上加上 synchronized,首先判断 value 是否为 null,如为 null 则抛出 NullPointerException,如不为 null,则继续下面的步骤。
和 HashMap 一样,首先对 key 进行 hash 操作,得到 key 的 hash 值。hash 操作的算法和 HashMap 也不同。
根据此 hash 值计算并获取其对应的数组中的 Segment 对象,方法如下:
public V put(K key, V value) {
Segment<K,V> s;
if (value == null)
throw new NullPointerException();
int hash = hash(key);
int j = (hash >>> segmentShift) & segmentMask;
if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck
(segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
s = ensureSegment(j);
return s.put(key, hash, value, false);
}
在找到了数组中的 Segment 对象后,接着调用 Segment 对象的 put 方法来完成当前操作。
当调用 Segment 对象的 put 方法时,首先进行 lock 操作,接着判断当前存储的对象个数加1后是否大于 threshold。如大于,则将当前的 HashEntry 对象数组大小扩大两倍,并将之前存储的对象重新 hash,转移到新的对象数组中。在确保了数组的大小足够后,继续下面的步骤。
接下去的动作则和 HashMap 基本一样,通过对 hash 值和对象数组大小减 1 的值进行按位与操作后,得到当前 key 需要放入数组的位置,接着寻找对应位置上的 HashEntry 对象链表是否有 key、hash 值和当前 key 相同的。如有,则覆盖其 value,如没有,则创建一个新的 HashEntry 对象,复制给对应位置的数组对象,并构成链表。
完成了上面的步骤后,释放锁,整个 put 动作得以完成。
根据以上分析可以看出,ConcurrentHashMap 基于 concurrencyLevel 划分出了多个 Segment 来对 key-value 进行存储,从而避免每次 put 操作都得锁住整个数组。在默认的情况下,最佳情况下可以允许 16 个线程并发无阻塞的操作集合对象,尽可能地减少并发时的阻塞现象。
四、删除操作:remove(Object key)
首先对 key 进行 hash 操作,基于得到 hash 值找到对应的 Segment 对象,调用其 remove 方法完成当前操作。
Segment 的 remove 方法进行加锁操作,然后对 hash 值和对象数组大小减 1 的值进行按位与操作,获取数组上对应位置的 HashEntry 对象,接下去遍历此 HashEntry 对象及其 next 对象,找到和传入的 hash 值相等的 hash 值,以及 key 和传入的 key equals 的 HashEntry 对象。如未找到,则返回 null。如找到,则重新创建 HashEntry 链表中位于删除元素之前的所有 HashEntry,位于其后的元素则不用处理。
最后释放锁,完成 remove 操作。
五、获取操作:get(Object key)
首先对 key 进行 hash 操作,基于其值找到对应的 Segment 对象,调用其 get 方法完成当前操作。
Segment 的 get 方法首先判断当前 HashEntry 对象数组中已存储的对象是否为 0 ,如为 0 ,则直接返回 null,如不为 0 ,则继续下面的步骤。
对 hash 值和对象数组大小减 1的值进行按位与操作,获取数组上对应位置的 HashEntry 对象,接下去遍历此 HashEntry 及其 next 对象,寻找 hash 值相等及 key equals 的 HashEntry 对象。在找到的情况下,获取其 value,如 value 不为 null,则直接返回此 value,如为 null,则调用 readValueUnderLock 方法。readValueUnderLock 方法首先进行 lock 操作,然后直接返回 HashEntry 的 value 属性,最后释放锁。
经过以上步骤,get 操作就完成了,从上面 Segment 的 get 步骤来看,仅在寻找到的 HashEntry 对象的 value 为 null 时,才进行锁操作。其它情况下并没有锁操作,也就是可以认为 ConcurrentHashMap 在读数据时大部分情况下是没有采用锁的,那么它如何保证并发场景下数据的一致性呢?
对上面的实现步骤进行分析,get 操作首先通过 hash 值和对象数组大小减 1 的值进行按位与操作来获取数组上对应位置的 HashEntry 。在这个步骤中,可能会因为对象数组大小的改变,以及数组上对应位置的 HashEntry 产生不一致性,那么 ConcurrentHashMap 是如何保证的?
对象数组大小的改变只有在 put 操作时有可能发生,由于 HashEntry 对象数组对应的变量是 voliate 类型的,因此可以保证如 HashEntry 对象数组大小发生改变,该操作时可看到最新的对象数组大小。
在 put 和 remove 操作进行时,都有可能造成 HashEntry 对象数组上对应位置的 HashEntry 发生改变。如在读操作已获取到 HashEntry 对象后,有一个 put 或 remove 操作完成,此时读操作尚未完成,那么这时会造成读的不一致性,但这种几率相对而言非常低。
在获取到了 HashEntry 对象后,怎么能保证它及其 next 属性构成的链表上的对象不会改变呢?这点 ConcurrentHashMap 采用了一个简单的方式,即 HashEntry 对象中的 hash、key 以及 next 属性都是 final 的,这也就意味着没办法插入一个 HashEntry 对象到 HashEntry 基于 next 属性构成的链表中间或末尾。这样可以保证当获取到 HashEntry 对象后,其基于 next 属性构建的链表是不会发生变化的。
至于为什么要判断获取的 HashEntry 的 value 是否为 null,原因在于 put 操作创建一个新的 HashEntry 时,并发读取时有可能 value 属性尚未完成设置,因此将读取到默认值,不过具体出现这种现象的原因还未知。
六、containsKey(Object key)
它和 get 操作一样,没有进行加锁操作,整个步骤和 get 相同,只是当寻找到有 key、hash 相等的 HashEntry 时,才返回 true,否则返回 false。
七、keySet().iterator()
它其实也是个类似的读取过程,只是要读取所有分段中的数据,ConcurrentHashMap 采用的方法即为遍历每个分段中的 HashEntry 对象数组,完成集合中所有对象的读取。这个过程也是不加锁的,因此遍历 ConcurrentHashMap 不会抛出 ConcurrentModificationException,这点和 HashMap 不同。
从上面的分析可以看出,ConcurrentHashMap 默认情况下采用将数据分为 16 个段进行存储,并且 16 个段分别持有各自的锁,锁仅用于 put 和 remove 等改变集合对象的操作,基于 volatile 及 HashEntry 链表的不变性实现读取的不加锁。这些方式使得 ConcurrentHashMap 能够保持极好的并发支持,尤其是对于读远比插入和删除频繁的 Map 而言,而它采用的这些方法也可谓是对于 Java 内存模型、并发机制深刻掌握的体现,是一个设计的非常不错的支持高并发的集合对象,不过对于 ConcurrentHashMap 而言,由于没有一个全局锁,size 这样的方法就比较复杂了。在计算 size 时,ConcurrentHashMap 采取的方法为:
在不加锁的情况下遍历所有的段,读取其 count 以及 modCount,这两个属性都是 volatile 类型的,并进行统计,完毕后,再遍历一次所有的段,比较 modCount 是否有改变。如有改变,则再尝试两次以上动作。
如执行了三次上述动作后,仍然有问题,则遍历所有段,分别进行加锁,然后进行计算,计算完毕后释放所有锁,从而完成计算动作。
以上的方法使得 size 方法在大部分情况下也可通过不加锁的方式计算出来。
网友评论