美文网首页
转:ConcurrentHashMap JDK 7 源码分析

转:ConcurrentHashMap JDK 7 源码分析

作者: Java旅行者 | 来源:发表于2017-08-27 20:29 被阅读0次

    参考文章:http://www.jianshu.com/p/bd972088a494

    数据结构

    image

    构造函数

    put 的实现

    public V put(K key, V value) {
        Segment<K,V> s;
        if (value == null)
            throw new NullPointerException();
        int hash = hash(key);
        //定位Segment,让Hash右移动segmentShift位,默认情况下就是28位(总长32位),之后和segmentMask(0XFF)做与操作,得到段的索引
        int j = (hash >>> segmentShift) & segmentMask;
        //利用UNSAFE.getObject中的方法获取到目标的Segment。
        if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
             (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
            //如果没有取到目标Segment,所以需要保证能取到这个Segment,没有的话创建一个Segment
            s = ensureSegment(j);
        //代理到Segment的put方法
        return s.put(key, hash, value, false);
    }
    

    首先是(Segment<K,V>)UNSAFE.getObject(segments, (j << SSHIFT) + SBASE)), UNSAFE这种用法是在JDK1.6中没有的,主要是利用Native方法来快速的定位元素。

    还有一个点是:ensureSegment()

    private Segment<K,V> ensureSegment(int k) {
        final Segment<K,V>[] ss = this.segments;
        long u = (k << SSHIFT) + SBASE; // raw offset
        Segment<K,V> seg;
        //getObjectVolatile是以Volatile的方式获得目标的Segment,Volatile是为了保证可见性。
        if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
            //如果没有取到,那么证明指定的Segment不存在,那么需要新建Segment,方式是以ss[0]为镜像创建。
            Segment<K,V> proto = ss[0]; // use segment 0 as prototype
            int cap = proto.table.length;
            float lf = proto.loadFactor;
            int threshold = (int)(cap * lf);
            HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
            if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
                == null) { // 再次检查
                Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);//创建新Segment
                //以CAS的方式,将新建的Segment,set到指定的位置。
                while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
                       == null) {
                    if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
                        break;
                }
            }
        }
        return seg;
    }
    

    上面的代码就是保证,在put之前,要保证目标的Segment是存在的,不存在需要创建一个Segment。
    put方法代理到了Segment的put方法,Segment extends 了ReentrantLock,以至于它能当做一个Lock使用。那么我们看一下Segment的put的实现:

    进入Segment的put操作时先进行加锁保护。如果加锁没有成功,调用scanAndLockForPut方法(详细步骤见下面scanAndLockForPut()源码分析)进入自旋状态,该方法持续查找key对应的节点链中是已存在该机节点,如果没有找到,则预创建一个新节点,并且尝试n次,直到尝试次数操作限制,才真正进入加锁等待状态,自旋结束并返回节点(如果返回了一个非空节点,则表示在链表中没有找到相应的节点)。对最大尝试次数,目前的实现单核次数为1,多核为64。

    final V put(K key, int hash, V value, boolean onlyIfAbsent) {
        //因为put操作会改变整体的结构,所以需要保证段的线程安全性,所以首先tryLock
        HashEntry<K,V> node = tryLock() ? null :
            scanAndLockForPut(key, hash, value);
        V oldValue;
        try {
            //新建tab引用,避免直接引用Volatile导致性能损耗,
            HashEntry<K,V>[] tab = table;
            int index = (tab.length - 1) & hash;
            //Volatile读,保证可见性
            HashEntry<K,V> first = entryAt(tab, index);
            for (HashEntry<K,V> e = first;;) {
                if (e != null) {
                    K k;
                    //遍历HashEntry数组,寻找可替换的HashEntry
                    if ((k = e.key) == key ||
                        (e.hash == hash && key.equals(k))) {
                        oldValue = e.value;
                        if (!onlyIfAbsent) {
                            e.value = value;
                            ++modCount;
                        }
                        break;
                    }
                    e = e.next;
                }
                else {
                    //如果不存在可替换的HashEntry,如果在scanAndLockForPut中建立了此Node直接SetNext,追加到链表头
                    if (node != null)
                        node.setNext(first);
                    else
                        //如果没有则新建一个Node,添加到链表头
                        node = new HashEntry<K,V>(hash, key, value, first);
                    //容量计数+1
                    int c = count + 1;
                    //如果容量不足,那么扩容
                    if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                        rehash(node);
                    else
                        //以Volatile写的方式,替换tab[index]的引用
                        setEntryAt(tab, index, node);
                    ++modCount;
                    count = c;
                    oldValue = null;
                    break;
                }
            }
        } finally {
            unlock();
        }
        return oldValue;
    }
    

    put方法是做了加锁操作的,所以不用过多的考虑线程安全的问题,但是get操作为了保证性能是没有加锁的,所以需要尽量的保证数据的可见性,能让get得到最新的数据。

    让我们看看 scanAndLockForPut(key, hash, value) 在做什么:

    private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
        HashEntry<K,V> first = entryForHash(this, hash);
        HashEntry<K,V> e = first;
        HashEntry<K,V> node = null;
        int retries = -1; // negative while locating node
        while (!tryLock()) {
            HashEntry<K,V> f; // to recheck first below
            if (retries < 0) {
                if (e == null) {
                    if (node == null) // speculatively create node
                        node = new HashEntry<K,V>(hash, key, value, null);
                    retries = 0;
                }
                else if (key.equals(e.key))
                    retries = 0;
                else
                    e = e.next;
            }
            else if (++retries > MAX_SCAN_RETRIES) {
                lock();
                break;
            }
            else if ((retries & 1) == 0 &&
                     (f = entryForHash(this, hash)) != first) {
                e = first = f; // re-traverse if entry changed
                retries = -1;
            }
        }
        return node;
    }
    

    当put操作尝试加锁没成功时,它不是直接进入等待状态,而是调用了scanAndLockForPut()操作,该操作持续查找key对应的节点链中是已存在该机节点,如果没有找到已存在的节点,则预创建一个新节点,并且尝试n次,直到尝试次数操作限制,才真正进入等待状态,计所谓的自旋等待。对最大尝试次数,目前的实现单核次数为1,多核为64。

    在这段逻辑中,它先获取key对应的节点链的头,然后持续遍历该链,如果节点链中不存在要插入的节点,则预创建一个节点,否则retries值资增,直到操作最大尝试次数而进入等待状态。这里需要注意最后一个else if中的逻辑:当在自旋过程中发现节点链的链头发生了变化,则更新节点链的链头,并重置retries值为-1,重新为尝试获取锁而自旋遍历。

    为什么要这么做呢?为了事先做数据的缓存,让这些数据缓存在CPU的cache中,这样后续在使用时能避免Cache missing。ps:scanAndLockForPut有个孪生兄弟scanAndLock,作用都差不多。

    和 JDK 1.6 的实现的不同:

    V put(K key, int hash, V value, boolean onlyIfAbsent) {  
        lock();  
        try {  
            int c = count;  
            if (c++ > threshold) // ensure capacity  
                rehash();  
            HashEntry<K,V>[] tab = table;  
            int index = hash & (tab.length - 1);  
            HashEntry<K,V> first = tab[index];  
             HashEntry<K,V> e = first;  
             while (e != null && (e.hash != hash || !key.equals(e.key)))  
                 e = e.next;  
       
             V oldValue;  
             if (e != null) {  
                oldValue = e.value;  
                 if (!onlyIfAbsent)  
                    e.value = value;  
             }  
             else {  
                oldValue = null;  
                 ++modCount;  
                tab[index] = new HashEntry<K,V>(key, hash, first, value);  
                count = c; // write-volatile  
            }  
            return oldValue;  
        } finally {  
             unlock();  
        }  
    }
    

    JDK 1.6 的实现和 JDK 1.7 的实现比较相似,但是主要区别是,没有使用一些 UNSAFE 的方法去保证内存的可见性,而是通过一个Volatile变量——count去实现。在开始的时候读count保证lock的内存语意,最后写count实现unlock的内存语意。
    但是这里存在一个问题,new HashEntry操作存在重排序问题,导致在getValue的时候tab[index]不为null,但是value为null。所以在 get 的时候会有一步重新检查的步骤,如果 value 为 null 的话就把 segment 段加锁在重新 get 一次。具体原因后面详细说明。

    get 的实现

    public V get(Object key) {
        Segment<K,V> s; // manually integrate access methods to reduce overhead
        HashEntry<K,V>[] tab;
        int h = hash(key);
        long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
        if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
            (tab = s.table) != null) {
            for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
                     (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
                 e != null; e = e.next) {
                K k;
                if ((k = e.key) == key || (e.hash == h && key.equals(k)))
                    return e.value;
            }
        }
        return null;
    }
    

    可以看出来,get方法很简单,同时get是没有加锁的,那么get是如何保证可见性的呢?首先获取指定index的Segment,利用getObjectVolatile获取指定index的first HashEntry,之后遍历HashEntry链表,这里比较关键的是HashEntry的数据结构:

    static final class HashEntry<K,V> {
        final int hash;
        final K key;
        volatile V value;
        volatile HashEntry<K,V> next;
    }
    

    两个变量是volatile的,也就是说,两个变量的读写能保证数据的可见性。
    所以在变量HashEntry时,总能保证得到最新的值。

    JKD1.6的get方法的实现:

    V get(Object key, int hash) {  
        if (count != 0) { // read-volatile 当前桶的数据个数是否为0 
            HashEntry<K,V> e = getFirst(hash);  得到头节点
            while (e != null) {  
                if (e.hash == hash && key.equals(e.key)) {  
                    V v = e.value;  
                    if (v != null)  
                       return v;  
                    return readValueUnderLock(e); // recheck  
                }  
                e = e.next;  
            }  
        }  
        return null;  
    }
    

    首先是读取count变量,因为内存的可见性,总是能返回最新的结构,但是对于getFirst可能得到的是过时的HashEntry。接下来获取到HashEntry之后getValue。但是这里为什么要做一个value的判空,原因就是上一步put的重排序问题,如果为null,那么只能加锁,加锁之后进行重新读取。但是这样确实会带来一些开销。

    为什么 JDK 1.6 的实现是弱一致性的?

    这里比较重要的一点就是,为什么JDK1.6的是弱一致性的?因为JDK1.6的所有可见性都是以count实现的,当put和get并发时,get可能获取不到最新的结果,这就是JDK1.6中ConcurrentHashMap弱一致性问题,主要问题是 tab[index] = new HashEntry<K,V>(key, hash, first, value); 不一定 happened before getFirst(hash);

    如下图可以说明:

    执行put的线程 执行get的线程
    ⑧tab[index] = new HashEntry<K,V>(key, hash, first, value)
    ③if (count != 0)
    ②count = c
    ⑨HashEntry e = getFirst(hash);

    对volatile的count的读时间上发生在对count的写之前,我们无法得出② hb ⑨这层关系了。因此,通过count变量,在这个轨迹上是无法得出⑧ hb ⑨的。

    而JDK1.7的实现,对于每一个操作都是Volatile变量的操作,能保证线程之间的可见性,所以不存在弱一致性的问题。(对这句话持有保留观点)

    remove 的实现(具体实现以及和 1.6 对比要补上)

    final V remove(Object key, int hash, Object value) {
        if (!tryLock())
            scanAndLock(key, hash);
        V oldValue = null;
        try {
            HashEntry<K,V>[] tab = table;
            int index = (tab.length - 1) & hash;
            HashEntry<K,V> e = entryAt(tab, index);
            HashEntry<K,V> pred = null;
            while (e != null) {
                K k;
                HashEntry<K,V> next = e.next;
                if ((k = e.key) == key ||
                    (e.hash == hash && key.equals(k))) {
                    V v = e.value;
                    if (value == null || value == v || value.equals(v)) {
                        if (pred == null)
                            setEntryAt(tab, index, next);
                        else
                            pred.setNext(next);
                        ++modCount;
                        --count;
                        oldValue = v;
                    }
                    break;
                }
                pred = e;
                e = next;
            }
        } finally {
            unlock();
        }
        return oldValue;
    }
    

    这里的 remove 实现我在公司里看到一篇文章,但是记不清在哪里看到的了。大致上说的是,remove 操作的时候不需要向 JDK 1.6 的那样,将要删除的节点赋值一遍,然后删除对应的元素, 再将最后一个元素的 next 元素连接上去。 而是用了 CAS 的操作实现的删除。

    size 的实现

    public int size() {
        // 主要的方法是先遍历 2 次 Segment,如果两次遍历的结果得到的 size 相同
        //那么就认为 size 的结果是准确的,否则就要对每一个 Segment 加锁重新计算 size 的大小
        final Segment<K,V>[] segments = this.segments;
        int size;
        boolean overflow; // true if size overflows 32 bits
        long sum;         // sum of modCounts
        long last = 0L;   // previous sum
        int retries = -1; // first iteration isn't retry
        try {
            for (;;) {
                if (retries++ == RETRIES_BEFORE_LOCK) {
                    for (int j = 0; j < segments.length; ++j)
                        ensureSegment(j).lock(); // force creation
                }
                sum = 0L;
                size = 0;
                overflow = false;
                for (int j = 0; j < segments.length; ++j) {
                    Segment<K,V> seg = segmentAt(segments, j);
                    if (seg != null) {
                        sum += seg.modCount;
                        int c = seg.count;
                        if (c < 0 || (size += c) < 0)
                            overflow = true;
                    }
                }
                if (sum == last)
                    break;
                last = sum;
            }
        } finally {
            if (retries > RETRIES_BEFORE_LOCK) {
                for (int j = 0; j < segments.length; ++j)
                    segmentAt(segments, j).unlock();
            }
        }
        return overflow ? Integer.MAX_VALUE : size;
    }
    

    主要的方法是先遍历 2 次 Segment,如果两次遍历的结果得到的 size 相同,那么就认为 size 的结果是准确的,否则就要对每一个 Segment 加锁重新计算 size 的大小。获取锁的操作在这里

    if (retries++ == RETRIES_BEFORE_LOCK) {
        for (int j = 0; j < segments.length; ++j)
            ensureSegment(j).lock(); // force creation
    }
    

    JDK 6 和 7 的区别

    总体来说变化挺多,不过总体的结构并没有发生改变,还是采用了 segment 分断锁的实现。 1.6 主要用 count 的 volatile 语义来实现put、get。1.7 主要使用 unsafe 的 CAS 操作来实现。

    相关文章

      网友评论

          本文标题:转:ConcurrentHashMap JDK 7 源码分析

          本文链接:https://www.haomeiwen.com/subject/yveidxtx.html