美文网首页
java.util.concurrent源码阅读 06 Conc

java.util.concurrent源码阅读 06 Conc

作者: _呆瓜_ | 来源:发表于2016-10-11 23:14 被阅读212次

    Java集合框架中的Map类型的数据结构是非线程安全, 在多线程环境中使用时需要手动进行线程同步. 因此在java.util.concurrent包中提供了一个线程安全版本的Map类型数据结构: ConcurrentMap. 本篇文章主要关注ConcurrentMap接口以及它的Hash版本的实现ConcurrentHashMap.

    要实现线程安全,就需要加锁, HashTable就是线程安全的, 但是HashTable对整张表加锁的做法非常消耗性能, ConcurrentMap的做法简单来说, 就是把哈希表分成若干段, 对其中某一段操作时, 只锁住这一段, 其他段可以不受影响. 如下图所示:

    ConcurrentHashMap存储结构

    整个ConcurrentMap由一个segment数组组成(即segments), 数组中每一个segment是一张哈希表, 哈希表中存放的是一张hashentry链表.

    本文主要分析以下两个过程:

    1.ConcurrentMap的构造方法
    2.put(K key, V value)
    

    ConcurrentMap的构造方法

    public ConcurrentHashMap(int initialCapacity,
                                 float loadFactor, int concurrencyLevel);
    public ConcurrentHashMap(int initialCapacity, float loadFactor);
    
    public ConcurrentHashMap(int initialCapacity);
    
    public ConcurrentHashMap();
    
    public ConcurrentHashMap(Map<? extends K, ? extends V> m);
    

    这5种构造方法, 最终都会使用第一个构造函数来初始化ConcurrentHashMap.

    第一个构造函数的代码:

    public ConcurrentHashMap(int initialCapacity,
                             float loadFactor, int concurrencyLevel) {
        if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
            throw new IllegalArgumentException();
        if (concurrencyLevel > MAX_SEGMENTS)
            concurrencyLevel = MAX_SEGMENTS;
        // Find power-of-two sizes best matching arguments
        int sshift = 0;
        int ssize = 1;
        while (ssize < concurrencyLevel) {
            ++sshift;
            ssize <<= 1;
        }
        this.segmentShift = 32 - sshift;
        this.segmentMask = ssize - 1;
        if (initialCapacity > MAXIMUM_CAPACITY)
            initialCapacity = MAXIMUM_CAPACITY;
        int c = initialCapacity / ssize;
        if (c * ssize < initialCapacity)
            ++c;
        int cap = MIN_SEGMENT_TABLE_CAPACITY;
        while (cap < c)
            cap <<= 1;
        // create segments and segments[0]
        Segment<K,V> s0 =
            new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
                             (HashEntry<K,V>[])new HashEntry[cap]);
        Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
        UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
        this.segments = ss;
    }
    

    其中concurrencyLevel是实例化以后就不可改变的, 它决定了segments数组的大小, 同时决定了这个ConcurrentHashMap的并发能力.segmentShift和segmentMask与hash算法相关.loadFactor参数决定了Segment在何时扩容.

    ConcurrentHashMap的put方法

    public V put(K key, V value) {
        Segment<K,V> s;
        if (value == null)
            throw new NullPointerException();
        int hash = hash(key);
        int j = (hash >>> segmentShift) & segmentMask;
        if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
             (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
            s = ensureSegment(j);
        return s.put(key, hash, value, false);
    }
    

    过程如下:
    1.通过第一次哈希算法,计算当前key属于哪一个segment;
    2.如果映射到的segment为空, 则执行3, 否则执行4;
    3.借助UNSAFE类, 使用线程安全的方式, 构建一个新的segment, 并且放入segments中相应的位置;
    4.调用segment中的put方法, 将key, value对放入segment中.

    从中可以看到, 最终存储key,value对的是segment, 因此只要对需要插入键值对的segment上锁就可以保证线程安全.

    segment的put方法

    首先看segment的定义, 它是ConcurrentHashMap的内部类, 该类继承了ReentrantLock.

    static final class Segment<K,V> extends ReentrantLock implements Serializable {}
    

    查看它的构造方法:

    Segment(float lf, int threshold, HashEntry<K,V>[] tab) {
        this.loadFactor = lf;
        this.threshold = threshold;
        this.table = tab;
    }
    

    Segment的主要构成是一张哈希表(存储hashentry)和threshold(当哈希表尺寸大于threshold时扩容).

    下面是它的put方法:

    final V put(K key, int hash, V value, boolean onlyIfAbsent) {
        HashEntry<K,V> node = tryLock() ? null :
            scanAndLockForPut(key, hash, value);
        V oldValue;
        try {
            HashEntry<K,V>[] tab = table;
            int index = (tab.length - 1) & hash;
            HashEntry<K,V> first = entryAt(tab, index);
            for (HashEntry<K,V> e = first;;) {
                if (e != null) {
                    K k;
                    if ((k = e.key) == key ||
                        (e.hash == hash && key.equals(k))) {
                        oldValue = e.value;
                        if (!onlyIfAbsent) {
                            e.value = value;
                            ++modCount;
                        }
                        break;
                    }
                    e = e.next;
                }
                else {
                    if (node != null)
                        node.setNext(first);
                    else
                        node = new HashEntry<K,V>(hash, key, value, first);
                    int c = count + 1;
                    if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                        rehash(node);
                    else
                        setEntryAt(tab, index, node);
                    ++modCount;
                    count = c;
                    oldValue = null;
                    break;
                }
            }
        } finally {
            unlock();
        }
        return oldValue;
    }
    

    put方法的主要流程如下:
    1.尝试获取当前segment的锁,若成功则执行3, 若失败则执行2;
    2.根据key在hashtable中搜索hashentry, 并且获取锁(此处会进行阻塞操作);
    3.根据key在hashtable中搜索hashentry,如果当前位置已经存在值, 则使用链接的方式解决冲突;
    4.如果当前尺寸超过了threshold,则执行rehash(将segment中的所有键值对重新hash).

    总结

    ConcurrentHashMap的实现,主要原理就是使用segments将原本唯一的hashtable分段, 增加并发能力. ConcurrentHashMap中还有比较重要的一点就是使用的两次哈希算法(第一次哈希算法找到segment, 第二次哈希算法找到hashentry), 这两次哈希算法要求能尽量将元素均匀的分布到不同的segment的hashtable中, ConcurrentHashMap使用的是single-word Wang/Jenkins哈希算法的一个变种, 关于哈希算法, 可以看此处(http://burtleburtle.net/bob/hash/doobs.html) 进一步了解.

    相关文章

      网友评论

          本文标题:java.util.concurrent源码阅读 06 Conc

          本文链接:https://www.haomeiwen.com/subject/qggbyttx.html