美文网首页程序员
ConcurrentHashMap-jdk1.7

ConcurrentHashMap-jdk1.7

作者: justlinzhihe | 来源:发表于2018-03-22 23:50 被阅读0次

    先上一张简单的ConcurrentHashMap的内部存储结构图


    简单结构示意图

    简单的说下流程:
    1)先根据key的哈希散列值分配切片segment,这样减少线程竞争
    2)将对应的key,value存储到分配到的segment的table数组中,这个其实和hashmap类似,只不过用了lock来保证线程安全

    接下来详细分析

    1.默认构造函数初始化

    public ConcurrentHashMap() {
            this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
     }
    
        static final int DEFAULT_INITIAL_CAPACITY = 16;
    
        /**
         * The default load factor for this table, used when not
         * otherwise specified in a constructor.
         */
        static final float DEFAULT_LOAD_FACTOR = 0.75f;
    
        /**
         * The default concurrency level for this table, used when not
         * otherwise specified in a constructor.
         */
        static final int DEFAULT_CONCURRENCY_LEVEL = 16;
    
        public ConcurrentHashMap(int initialCapacity,
                                 float loadFactor, int concurrencyLevel) {
            if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
                throw new IllegalArgumentException();
            if (concurrencyLevel > MAX_SEGMENTS)
                concurrencyLevel = MAX_SEGMENTS;
            // Find power-of-two sizes best matching arguments
            int sshift = 0;
            int ssize = 1;
            while (ssize < concurrencyLevel) {
                ++sshift;
                ssize <<= 1;
            }
            this.segmentShift = 32 - sshift;
            this.segmentMask = ssize - 1;
            if (initialCapacity > MAXIMUM_CAPACITY)
                initialCapacity = MAXIMUM_CAPACITY;
            int c = initialCapacity / ssize;
            if (c * ssize < initialCapacity)
                ++c;
            int cap = MIN_SEGMENT_TABLE_CAPACITY;
            while (cap < c)
                cap <<= 1;
            // create segments and segments[0]
            Segment<K,V> s0 =
                new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
                                 (HashEntry<K,V>[])new HashEntry[cap]);
            Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
            UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
            this.segments = ss;
        }
    

    segments 数组初始化大小为16
    Segment的table数组初始化大小为2;负载因子0.75

    2.根据hash散列值分配获取切片,如果对应的segment还没有初始化,就new出来cas放到segments数组

        public V put(K key, V value) {
            Segment<K,V> s;
            if (value == null)
                throw new NullPointerException();
            int hash = hash(key);
            int j = (hash >>> segmentShift) & segmentMask;
            if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
                 (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
                s = ensureSegment(j);
            return s.put(key, hash, value, false);
        }
    
        private Segment<K,V> ensureSegment(int k) {
            final Segment<K,V>[] ss = this.segments;
            long u = (k << SSHIFT) + SBASE; // raw offset
            Segment<K,V> seg;
            if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
                Segment<K,V> proto = ss[0]; // use segment 0 as prototype
                int cap = proto.table.length;
                float lf = proto.loadFactor;
                int threshold = (int)(cap * lf);
                HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
                if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
                    == null) { // recheck
                    Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
                    while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
                           == null) {
                        if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
                            break;
                    }
                }
            }
            return seg;
        }
    

    3.获取segment后,竞争非公平独占锁

          final V put(K key, int hash, V value, boolean onlyIfAbsent) {
                HashEntry<K,V> node = tryLock() ? null :
                    scanAndLockForPut(key, hash, value);
                V oldValue;
                try {
                    HashEntry<K,V>[] tab = table;
                    int index = (tab.length - 1) & hash;
                    HashEntry<K,V> first = entryAt(tab, index);
                    for (HashEntry<K,V> e = first;;) {
                        if (e != null) {
                            K k;
                            if ((k = e.key) == key ||
                                (e.hash == hash && key.equals(k))) {
                                oldValue = e.value;
                                if (!onlyIfAbsent) {
                                    e.value = value;
                                    ++modCount;
                                }
                                break;
                            }
                            e = e.next;
                        }
                        else {
                            if (node != null)
                                node.setNext(first);
                            else
                                node = new HashEntry<K,V>(hash, key, value, first);
                            int c = count + 1;
                            if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                                rehash(node);
                            else
                                setEntryAt(tab, index, node);
                            ++modCount;
                            count = c;
                            oldValue = null;
                            break;
                        }
                    }
                } finally {
                    unlock();
                }
                return oldValue;
            }
    

    这里加锁用的是ReentrantLock的非公平锁

       public boolean tryLock() {
            return sync.nonfairTryAcquire(1);
        }
    

    如果cas获取独占锁不成功就进行tryLock

            private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
                HashEntry<K,V> first = entryForHash(this, hash);
                HashEntry<K,V> e = first;
                HashEntry<K,V> node = null;
                int retries = -1; // negative while locating node
                while (!tryLock()) {
                    HashEntry<K,V> f; // to recheck first below
                    if (retries < 0) {
                        if (e == null) {
                            if (node == null) // speculatively create node
                                node = new HashEntry<K,V>(hash, key, value, null);
                            retries = 0;
                        }
                        else if (key.equals(e.key))
                            retries = 0;
                        else
                            e = e.next;
                    }
                    else if (++retries > MAX_SCAN_RETRIES) {
                        lock();
                        break;
                    }
                    else if ((retries & 1) == 0 &&
                             (f = entryForHash(this, hash)) != first) {
                        e = first = f; // re-traverse if entry changed
                        retries = -1;
                    }
                }
                return node;
            }
    

    tryLock过程中如果table对应下标位置的元素为null,那就会生成一个node节点,在获取锁的时候返回。
    tryLock次数为1或者64,取决于虚拟机数,如果超过次数就进行lock操作,可能挂起

            static final int MAX_SCAN_RETRIES =
                Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1;
    

    tryLock过程中如果table中对应下标位置的元素发生变化,那就重新tryLock

    4.获取独占锁之后插入操作也和hashmap类似,还有扩容操作,X2扩容,因为这里能保证是一个线程执行,所以不会出现死循环的问题

     private void rehash(HashEntry<K,V> node) {
                /*
                 * Reclassify nodes in each list to new table.  Because we
                 * are using power-of-two expansion, the elements from
                 * each bin must either stay at same index, or move with a
                 * power of two offset. We eliminate unnecessary node
                 * creation by catching cases where old nodes can be
                 * reused because their next fields won't change.
                 * Statistically, at the default threshold, only about
                 * one-sixth of them need cloning when a table
                 * doubles. The nodes they replace will be garbage
                 * collectable as soon as they are no longer referenced by
                 * any reader thread that may be in the midst of
                 * concurrently traversing table. Entry accesses use plain
                 * array indexing because they are followed by volatile
                 * table write.
                 */
                HashEntry<K,V>[] oldTable = table;
                int oldCapacity = oldTable.length;
                int newCapacity = oldCapacity << 1;
                threshold = (int)(newCapacity * loadFactor);
                HashEntry<K,V>[] newTable =
                    (HashEntry<K,V>[]) new HashEntry[newCapacity];
                int sizeMask = newCapacity - 1;
                for (int i = 0; i < oldCapacity ; i++) {
                    HashEntry<K,V> e = oldTable[i];
                    if (e != null) {
                        HashEntry<K,V> next = e.next;
                        int idx = e.hash & sizeMask;
                        if (next == null)   //  Single node on list
                            newTable[idx] = e;
                        else { // Reuse consecutive sequence at same slot
                            HashEntry<K,V> lastRun = e;
                            int lastIdx = idx;
                            for (HashEntry<K,V> last = next;
                                 last != null;
                                 last = last.next) {
                                int k = last.hash & sizeMask;
                                if (k != lastIdx) {
                                    lastIdx = k;
                                    lastRun = last;
                                }
                            }
                            newTable[lastIdx] = lastRun;
                            // Clone remaining nodes
                            for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
                                V v = p.value;
                                int h = p.hash;
                                int k = h & sizeMask;
                                HashEntry<K,V> n = newTable[k];
                                newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
                            }
                        }
                    }
                }
                int nodeIndex = node.hash & sizeMask; // add the new node
                node.setNext(newTable[nodeIndex]);
                newTable[nodeIndex] = node;
                table = newTable;
            }
    

    5.有个细节需要注意,就是对于segmants和table这两个数组的读取和存储操作都用的UNSAFE操作。
    这个是因为Java数组在元素层面的元数据设计上的缺失,无法表达元素是final、volatile等语义,所以开了后门,使用getObjectVolatile用来补上无法表达元素是volatile的坑,@Stable用来补上final的坑,数组元素就跟没有标volatile的成员字段一样,无法保证线程之间可见性。参考https://www.jianshu.com/p/5808db3e2ace

    相关文章

      网友评论

        本文标题:ConcurrentHashMap-jdk1.7

        本文链接:https://www.haomeiwen.com/subject/iwgzqftx.html