ConcurrentHashMap源码分析

作者: Justlearn | 来源:发表于2017-04-13 19:07 被阅读904次

前言

JDK中的Hashtable是一个线程安全的K-V形式的容器,它实现线程安全的原理十分简单,就是在所有涉及对该哈希表操作的方法上都加上了synchronized关键字,进行加锁操作。这么做实现了线程安全,但是效率非常低。

//通过synchronized在每次进入方法时获取hashmap的锁,高并发情况下会出现争用冲突
public synchronized V get(Object key) {
        Entry tab[] = table;
        int hash = hash(key);
        int index = (hash & 0x7FFFFFFF) % tab.length;
        for (Entry<K,V> e = tab[index] ; e != null ; e = e.next) {
            if ((e.hash == hash) && e.key.equals(key)) {
                return e.value;
            }
        }
        return null;
    }

ConcurrentHashMap使用了分段锁的设计,只有在每个段上才会存在并发争用,不同的段之间不存在锁的竞争,提高了该hashmap的并发访问效率。但也是由于ConcurrentHashMap的分段设计,某些需要扫描所有段的操作如size()等方法实现上比较复杂,并且并不能保证强一致性,这个在某些情况下需要注意。由于ConcurrentHashMap在各个JDK版本下的实现是有区别的,特说明本文基于jdk1.7源码,我们下面一起通过其源码来分析它的实现原理。

ConcurrentHashMap的组成

查看源码,我们可以通过ConcurrentHashMap的成员变量,了解该数据结构的基本组成。

    final int segmentMask;//作为查找segments的掩码,前几个bit用来选择segment

    final int segmentShift;

    final Segment<K,V>[] segments;//每个segment都是一个hashtable 

ConcurrentHashMap内部有一个segments数组,每个segment都是一个hashtable ,由于Segment继承自ReentrantLock ,因此Segment本身就是锁,对每个Segment的加锁就是调用自身的acquire()方法。Segment的基本组成如下:

static final class Segment<K,V> extends ReentrantLock implements Serializable {
        transient volatile HashEntry<K,V>[] table;//存入ConcurrentHashMap的k-v数据都存在这里
        transient int count;//该segment内的HashEntry计数(如put/remove/replace/clear)
        transient int modCount;//对该segment的修改操作数量
        final float loadFactor;//hashtable的负载因子
. . . . . . . . .
}

HashEntry是最终存储每一对k-v的最底层数据结构,它的结构为:

static final class HashEntry<K,V> {
        final int hash;
        final K key;
        volatile V value;
        volatile HashEntry<K,V> next;
}

value为volatile,在多线程环境下可以保持可见性,因此可以不加锁直接读取。
ConcurrentHashMap的整体数据结构如图


ConcurrentHashMap源码分析

本文前言中,我们说了‘ConcurrentHashMap通过分段锁技术提高了该hashmap的并发访问效率’,我们接下来就通过ConcurrentHashMap的get/set/remove等方法来说明,ConcurrentHashMap在保证线程安全的情况下,是如何做到这些的。

  • ConcurrentHashMap的初始化
    concurrencyLevel 是预估会对ConcurrentHashMap进行并发修改的线程数,也可以理解为支持的最大并发访问ConcurrentHashMap且不产生锁的争用的线程数,也就是Segment的数量,也即分段锁的个数,默认为16。
//loadFactor是每个segment的负载因子,concurrencyLevel是估计的并发修改线程,默认为16,所以创建16个segment
  public ConcurrentHashMap(int initialCapacity,
                             float loadFactor, int concurrencyLevel) {
        if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
            throw new IllegalArgumentException();
        if (concurrencyLevel > MAX_SEGMENTS)
            concurrencyLevel = MAX_SEGMENTS;//默认16
        // Find power-of-two sizes best matching arguments
        int sshift = 0;
        int ssize = 1;
        while (ssize < concurrencyLevel) {
            ++sshift;
            ssize <<= 1;
        }
        this.segmentShift = 32 - sshift;
        this.segmentMask = ssize - 1;
        if (initialCapacity > MAXIMUM_CAPACITY)
            initialCapacity = MAXIMUM_CAPACITY;
        int c = initialCapacity / ssize;
        if (c * ssize < initialCapacity)
            ++c;
        int cap = MIN_SEGMENT_TABLE_CAPACITY;
        while (cap < c)
            cap <<= 1;
        // create segments and segments[0]
        Segment<K,V> s0 =
            new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
                             (HashEntry<K,V>[])new HashEntry[cap]);//初始化一个segment
        Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
        UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
        this.segments = ss;
    }
  • 创建分段锁
    如上面ConcurrentHashMap的初始化,ConcurrentHashMap采用延迟初始化的模式,首先只new一个Segment,剩下的Segment只在使用时初始化。每次put操作时,定位到key所在的Segment后,会先去判断Segment是否初始化了,没有的话由新建一个Segment并通过循环CAS设置Segment数组对应位置引用。
 private Segment<K,V> ensureSegment(int k) {
        final Segment<K,V>[] ss = this.segments;
        long u = (k << SSHIFT) + SBASE; // raw offset
        Segment<K,V> seg;
        if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
            Segment<K,V> proto = ss[0]; // use segment 0 as prototype
            int cap = proto.table.length;
            float lf = proto.loadFactor;
            int threshold = (int)(cap * lf);
            HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
            if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
                == null) { // recheck
                Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
                while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
                       == null) {
                    if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
                        break;
                }
            }
        }
        return seg;
    }
  • get()
    get方法不需要加锁操作,因为HashEntry中的value是volatile,可以保证线程间的可见性。由于segments非volatile,通过UNSAFE的getObjectVolatile方法提供volatile读语义来遍历获得对应链表上的节点。但没有锁可能会导致在遍历 的过程中几点 被其它线程修改,返回的val可能是过时数据,这部分是ConcurrentHashMap非强一致性的体现,如果对强一致性有要求,那么就只能去使用Hashtable或Collections.synchronizedMap这种通过synchronized关键字进行加锁提供强一致性的Map容器。containsKey操作与get是 一样 的。
public V get(Object key) {
        Segment<K,V> s; // manually integrate access methods to reduce overhead
        HashEntry<K,V>[] tab;
        int h = hash(key);
        long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;//u是该key在segments数组的下标,可以定位该key是在哪一个Segment
        if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
            (tab = s.table) != null) {
//(tab.length - 1) & h)定位key在Segment的HashEntry数组中的下标
//遍历HashEntry链表,找到与key和key的hash值一致的HashEntry元素
            for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
                     (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
                 e != null; e = e.next) {
                K k;
                if ((k = e.key) == key || (e.hash == h && key.equals(k)))
                    return e.value;
            }
        }
        return null;
    }
  • put/putIfAbsent/putAll操作
    数据插入操作,在这里才能看到ConcurrentHashMap基于分段锁提高高并发环境下的处理能力,增大ConcurrentHashMap吞吐量的实现技巧。
//该方法是ConcurrentHashMap所有put操作的核心
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
//首先尝试加锁(调用ReentrantLock的tryLock方法,对当前Segment实例加锁),若没有获取成功则找到与该Key相同的Node,若没有则new一个。返回时必须获取锁,
    HashEntry<K,V> node = tryLock() ? null :
        scanAndLockForPut(key, hash, value);
    V oldValue;
    try {
        HashEntry<K,V>[] tab = table;//
        int index = (tab.length - 1) & hash;//当前该Node的插入点
        HashEntry<K,V> first = entryAt(tab, index);//获取tbale中第index个元素
        //在链表上遍历对比节点有相同的key则修改对应value,没有则放在链表尾部
        for (HashEntry<K,V> e = first;;) {
            if (e != null) {
                K k;
                if ((k = e.key) == key ||
                    (e.hash == hash && key.equals(k))) {
                    oldValue = e.value;
                    if (!onlyIfAbsent) {//key不存在的情况下才会设置value
                        e.value = value;
                        ++modCount;
                    }
                    break;
                }
                e = e.next;
            }
            else {
                if (node != null)
                    node.setNext(first);
                else
                    node = new HashEntry<K,V>(hash, key, value, first);
                int c = count + 1;
                //当当前segment的元素数量超过阈值时rehash
                if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                    rehash(node);//超过阈值进行扩容
                else
                    setEntryAt(tab, index, node);
                ++modCount;
                count = c;
                oldValue = null;
                break;
            }
        }
    } finally {
        unlock();
    }
    return oldValue;
}
}
  • size()/containsValue()
    先不加锁进行循环相加计算,最少进行两次对所有Segment的modCount与count进行求和,如果前后两次modCount求和的值相等,认为这期间ConcurrentHashMap的HashEntry数量没有变化,就 直接返回count求和的值,认为这就是ConcurrentHashMap的大小。如果前后两次modCount求和的值相等一直不相等,会循环进行 这样 的尝试。默认是两次,超过两次就会认为可能现在在ConcurrentHashMap上正在 进行着密集的操作(会 影响ConcurrentHashMap的大小),于是会强制对 每个Segment进行加锁,然后在重复 前面的计数操作,最终返回 计算的ConcurrentHashMap的大小。因此多线程环境下这种实现返回 的 ConcurrentHashMap大小是近似的,不准确的,这个 需要在使用ConcurrentHashMap时做出取舍。containsValue与size操作原理类似。
public int size() {
        // Try a few times to get accurate count. On failure due to
        // continuous async changes in table, resort to locking.
        final Segment<K,V>[] segments = this.segments;
        int size;
        boolean overflow; // true if size overflows 32 bits
        long sum;         // sum of modCounts
        long last = 0L;   // previous sum
        int retries = -1; // first iteration isn't retry
        try {
            for (;;) {
                if (retries++ == RETRIES_BEFORE_LOCK) {
                    for (int j = 0; j < segments.length; ++j)
                        ensureSegment(j).lock(); // force creation
                }
                sum = 0L;
                size = 0;
                overflow = false;
                for (int j = 0; j < segments.length; ++j) {
                    Segment<K,V> seg = segmentAt(segments, j);
                    if (seg != null) {
                        sum += seg.modCount;
                        int c = seg.count;
                        if (c < 0 || (size += c) < 0)
                            overflow = true;
                    }
                }
                if (sum == last)//判断前后modCount和相等
                    break;
                last = sum;
            }
        } finally {
            if (retries > RETRIES_BEFORE_LOCK) {
                for (int j = 0; j < segments.length; ++j)
                    segmentAt(segments, j).unlock();
            }
        }
        return overflow ? Integer.MAX_VALUE : size;
    }

总结

以上就是jdk1.7中ConcurrentHashMap重要操作的源码实现分析。我们可以对 ConcurrentHashMap做出 以下判断:

  • 通过分段锁的设计,减小了各个分段之间的锁的争用冲突,各个分段本省 就是一个独立的哈希表。这样的设计提高 了ConcurrentHashMap的 吞吐量,提高了hashmap的并发访问效率。
  • HashEntry中value是volatile类型,使得ConcurrentHashMap的get方法可以无锁操作获取value,提高了get的效率。但需要注意的是,这也导致了ConcurrentHashMap的 弱一致性问题,在对get操作 有强一致性要求的场景下,必须选用其它支持map强一致性的容器。
  • size等操作在多线程环境下的返回值是不精确的,只可做近似值,建议多线程环境下不要依赖size()。

参考

相关文章

网友评论

    本文标题:ConcurrentHashMap源码分析

    本文链接:https://www.haomeiwen.com/subject/ffovottx.html