ConcurrentHashMap源码分析

作者: Justlearn | 来源:发表于2017-04-13 19:07 被阅读904次

    前言

    JDK中的Hashtable是一个线程安全的K-V形式的容器,它实现线程安全的原理十分简单,就是在所有涉及对该哈希表操作的方法上都加上了synchronized关键字,进行加锁操作。这么做实现了线程安全,但是效率非常低。

    //通过synchronized在每次进入方法时获取hashmap的锁,高并发情况下会出现争用冲突
    public synchronized V get(Object key) {
            Entry tab[] = table;
            int hash = hash(key);
            int index = (hash & 0x7FFFFFFF) % tab.length;
            for (Entry<K,V> e = tab[index] ; e != null ; e = e.next) {
                if ((e.hash == hash) && e.key.equals(key)) {
                    return e.value;
                }
            }
            return null;
        }
    

    ConcurrentHashMap使用了分段锁的设计,只有在每个段上才会存在并发争用,不同的段之间不存在锁的竞争,提高了该hashmap的并发访问效率。但也是由于ConcurrentHashMap的分段设计,某些需要扫描所有段的操作如size()等方法实现上比较复杂,并且并不能保证强一致性,这个在某些情况下需要注意。由于ConcurrentHashMap在各个JDK版本下的实现是有区别的,特说明本文基于jdk1.7源码,我们下面一起通过其源码来分析它的实现原理。

    ConcurrentHashMap的组成

    查看源码,我们可以通过ConcurrentHashMap的成员变量,了解该数据结构的基本组成。

        final int segmentMask;//作为查找segments的掩码,前几个bit用来选择segment
    
        final int segmentShift;
    
        final Segment<K,V>[] segments;//每个segment都是一个hashtable 
    

    ConcurrentHashMap内部有一个segments数组,每个segment都是一个hashtable ,由于Segment继承自ReentrantLock ,因此Segment本身就是锁,对每个Segment的加锁就是调用自身的acquire()方法。Segment的基本组成如下:

    static final class Segment<K,V> extends ReentrantLock implements Serializable {
            transient volatile HashEntry<K,V>[] table;//存入ConcurrentHashMap的k-v数据都存在这里
            transient int count;//该segment内的HashEntry计数(如put/remove/replace/clear)
            transient int modCount;//对该segment的修改操作数量
            final float loadFactor;//hashtable的负载因子
    . . . . . . . . .
    }
    

    HashEntry是最终存储每一对k-v的最底层数据结构,它的结构为:

    static final class HashEntry<K,V> {
            final int hash;
            final K key;
            volatile V value;
            volatile HashEntry<K,V> next;
    }
    

    value为volatile,在多线程环境下可以保持可见性,因此可以不加锁直接读取。
    ConcurrentHashMap的整体数据结构如图


    ConcurrentHashMap源码分析

    本文前言中,我们说了‘ConcurrentHashMap通过分段锁技术提高了该hashmap的并发访问效率’,我们接下来就通过ConcurrentHashMap的get/set/remove等方法来说明,ConcurrentHashMap在保证线程安全的情况下,是如何做到这些的。

    • ConcurrentHashMap的初始化
      concurrencyLevel 是预估会对ConcurrentHashMap进行并发修改的线程数,也可以理解为支持的最大并发访问ConcurrentHashMap且不产生锁的争用的线程数,也就是Segment的数量,也即分段锁的个数,默认为16。
    //loadFactor是每个segment的负载因子,concurrencyLevel是估计的并发修改线程,默认为16,所以创建16个segment
      public ConcurrentHashMap(int initialCapacity,
                                 float loadFactor, int concurrencyLevel) {
            if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
                throw new IllegalArgumentException();
            if (concurrencyLevel > MAX_SEGMENTS)
                concurrencyLevel = MAX_SEGMENTS;//默认16
            // Find power-of-two sizes best matching arguments
            int sshift = 0;
            int ssize = 1;
            while (ssize < concurrencyLevel) {
                ++sshift;
                ssize <<= 1;
            }
            this.segmentShift = 32 - sshift;
            this.segmentMask = ssize - 1;
            if (initialCapacity > MAXIMUM_CAPACITY)
                initialCapacity = MAXIMUM_CAPACITY;
            int c = initialCapacity / ssize;
            if (c * ssize < initialCapacity)
                ++c;
            int cap = MIN_SEGMENT_TABLE_CAPACITY;
            while (cap < c)
                cap <<= 1;
            // create segments and segments[0]
            Segment<K,V> s0 =
                new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
                                 (HashEntry<K,V>[])new HashEntry[cap]);//初始化一个segment
            Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
            UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
            this.segments = ss;
        }
    
    • 创建分段锁
      如上面ConcurrentHashMap的初始化,ConcurrentHashMap采用延迟初始化的模式,首先只new一个Segment,剩下的Segment只在使用时初始化。每次put操作时,定位到key所在的Segment后,会先去判断Segment是否初始化了,没有的话由新建一个Segment并通过循环CAS设置Segment数组对应位置引用。
     private Segment<K,V> ensureSegment(int k) {
            final Segment<K,V>[] ss = this.segments;
            long u = (k << SSHIFT) + SBASE; // raw offset
            Segment<K,V> seg;
            if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
                Segment<K,V> proto = ss[0]; // use segment 0 as prototype
                int cap = proto.table.length;
                float lf = proto.loadFactor;
                int threshold = (int)(cap * lf);
                HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
                if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
                    == null) { // recheck
                    Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
                    while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
                           == null) {
                        if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
                            break;
                    }
                }
            }
            return seg;
        }
    
    • get()
      get方法不需要加锁操作,因为HashEntry中的value是volatile,可以保证线程间的可见性。由于segments非volatile,通过UNSAFE的getObjectVolatile方法提供volatile读语义来遍历获得对应链表上的节点。但没有锁可能会导致在遍历 的过程中几点 被其它线程修改,返回的val可能是过时数据,这部分是ConcurrentHashMap非强一致性的体现,如果对强一致性有要求,那么就只能去使用Hashtable或Collections.synchronizedMap这种通过synchronized关键字进行加锁提供强一致性的Map容器。containsKey操作与get是 一样 的。
    public V get(Object key) {
            Segment<K,V> s; // manually integrate access methods to reduce overhead
            HashEntry<K,V>[] tab;
            int h = hash(key);
            long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;//u是该key在segments数组的下标,可以定位该key是在哪一个Segment
            if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
                (tab = s.table) != null) {
    //(tab.length - 1) & h)定位key在Segment的HashEntry数组中的下标
    //遍历HashEntry链表,找到与key和key的hash值一致的HashEntry元素
                for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
                         (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
                     e != null; e = e.next) {
                    K k;
                    if ((k = e.key) == key || (e.hash == h && key.equals(k)))
                        return e.value;
                }
            }
            return null;
        }
    
    • put/putIfAbsent/putAll操作
      数据插入操作,在这里才能看到ConcurrentHashMap基于分段锁提高高并发环境下的处理能力,增大ConcurrentHashMap吞吐量的实现技巧。
    //该方法是ConcurrentHashMap所有put操作的核心
    final V put(K key, int hash, V value, boolean onlyIfAbsent) {
    //首先尝试加锁(调用ReentrantLock的tryLock方法,对当前Segment实例加锁),若没有获取成功则找到与该Key相同的Node,若没有则new一个。返回时必须获取锁,
        HashEntry<K,V> node = tryLock() ? null :
            scanAndLockForPut(key, hash, value);
        V oldValue;
        try {
            HashEntry<K,V>[] tab = table;//
            int index = (tab.length - 1) & hash;//当前该Node的插入点
            HashEntry<K,V> first = entryAt(tab, index);//获取tbale中第index个元素
            //在链表上遍历对比节点有相同的key则修改对应value,没有则放在链表尾部
            for (HashEntry<K,V> e = first;;) {
                if (e != null) {
                    K k;
                    if ((k = e.key) == key ||
                        (e.hash == hash && key.equals(k))) {
                        oldValue = e.value;
                        if (!onlyIfAbsent) {//key不存在的情况下才会设置value
                            e.value = value;
                            ++modCount;
                        }
                        break;
                    }
                    e = e.next;
                }
                else {
                    if (node != null)
                        node.setNext(first);
                    else
                        node = new HashEntry<K,V>(hash, key, value, first);
                    int c = count + 1;
                    //当当前segment的元素数量超过阈值时rehash
                    if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                        rehash(node);//超过阈值进行扩容
                    else
                        setEntryAt(tab, index, node);
                    ++modCount;
                    count = c;
                    oldValue = null;
                    break;
                }
            }
        } finally {
            unlock();
        }
        return oldValue;
    }
    }
    
    • size()/containsValue()
      先不加锁进行循环相加计算,最少进行两次对所有Segment的modCount与count进行求和,如果前后两次modCount求和的值相等,认为这期间ConcurrentHashMap的HashEntry数量没有变化,就 直接返回count求和的值,认为这就是ConcurrentHashMap的大小。如果前后两次modCount求和的值相等一直不相等,会循环进行 这样 的尝试。默认是两次,超过两次就会认为可能现在在ConcurrentHashMap上正在 进行着密集的操作(会 影响ConcurrentHashMap的大小),于是会强制对 每个Segment进行加锁,然后在重复 前面的计数操作,最终返回 计算的ConcurrentHashMap的大小。因此多线程环境下这种实现返回 的 ConcurrentHashMap大小是近似的,不准确的,这个 需要在使用ConcurrentHashMap时做出取舍。containsValue与size操作原理类似。
    public int size() {
            // Try a few times to get accurate count. On failure due to
            // continuous async changes in table, resort to locking.
            final Segment<K,V>[] segments = this.segments;
            int size;
            boolean overflow; // true if size overflows 32 bits
            long sum;         // sum of modCounts
            long last = 0L;   // previous sum
            int retries = -1; // first iteration isn't retry
            try {
                for (;;) {
                    if (retries++ == RETRIES_BEFORE_LOCK) {
                        for (int j = 0; j < segments.length; ++j)
                            ensureSegment(j).lock(); // force creation
                    }
                    sum = 0L;
                    size = 0;
                    overflow = false;
                    for (int j = 0; j < segments.length; ++j) {
                        Segment<K,V> seg = segmentAt(segments, j);
                        if (seg != null) {
                            sum += seg.modCount;
                            int c = seg.count;
                            if (c < 0 || (size += c) < 0)
                                overflow = true;
                        }
                    }
                    if (sum == last)//判断前后modCount和相等
                        break;
                    last = sum;
                }
            } finally {
                if (retries > RETRIES_BEFORE_LOCK) {
                    for (int j = 0; j < segments.length; ++j)
                        segmentAt(segments, j).unlock();
                }
            }
            return overflow ? Integer.MAX_VALUE : size;
        }
    

    总结

    以上就是jdk1.7中ConcurrentHashMap重要操作的源码实现分析。我们可以对 ConcurrentHashMap做出 以下判断:

    • 通过分段锁的设计,减小了各个分段之间的锁的争用冲突,各个分段本省 就是一个独立的哈希表。这样的设计提高 了ConcurrentHashMap的 吞吐量,提高了hashmap的并发访问效率。
    • HashEntry中value是volatile类型,使得ConcurrentHashMap的get方法可以无锁操作获取value,提高了get的效率。但需要注意的是,这也导致了ConcurrentHashMap的 弱一致性问题,在对get操作 有强一致性要求的场景下,必须选用其它支持map强一致性的容器。
    • size等操作在多线程环境下的返回值是不精确的,只可做近似值,建议多线程环境下不要依赖size()。

    参考

    相关文章

      网友评论

        本文标题:ConcurrentHashMap源码分析

        本文链接:https://www.haomeiwen.com/subject/ffovottx.html