美文网首页java
ConcurrentHashMap

ConcurrentHashMap

作者: n油炸小朋友 | 来源:发表于2018-04-29 16:09 被阅读1次

    HashTable的线程安全是通过synchronized锁住整张表实现的,ConcurrenthashMap实现了锁分离,允许有多把锁对表的不同部分进行修改,ConcurrentHashMap内部使用段(Segment)来表示这些不同的部分,每个段其实就是一个小的Hashtable,它们有自己的锁。只要多个修改操作发生在不同的段上,它们就可以并发进行。

    结构.png

    Segment

    从图中可以看到,ConcurrentHashMap内部分为很多个Segment,
    static final class Segment<K,V> extends ReentrantLock implements Serializable
    Segment继承了ReentrantLock,表明每个segment都可以当做一个锁。这样对每个segment中的数据需要同步操作的话都是使用每个segment容器对象自身的锁来实现。只有对全局需要改变时锁定的是所有的segment。

    HashEntry

    Segment下面包含很多个HashEntry列表数组。对于一个key,需要经过三次hash操作,才能最终定位这个元素的位置,这三次hash分别为:

    • 对于一个key,先进行一次hash操作,得到hash值h1,也即h1 = hash1(key);

    • 将得到的h1的高几位进行第二次hash,得到hash值h2,也即h2 = hash2(h1高几位),通过h2能够确定该元素的放在哪个Segment;

    • 将得到的h1进行第三次hash,得到hash值h3,也即h3 = hash3(h1),通过h3能够确定该元素放置在哪个HashEntry。

    HashEntry代表每个hash链中的一个节点,其结构如下所示:

    static final class HashEntry<K,V> {  
         final K key;  
         final int hash;  
         volatile V value;  
         volatile HashEntry<K,V> next;  
     }
    

    HashEntry中的next指针也定义为final,并且每次插入将新添加节点作为链的头节点;
    每次删除一个节点时,会将删除节点之前的所有节点 拷贝一份组成一个新的链,而将当前节点的上一个节点的next指向当前节点的下一个节点,从而在删除以后有两条链存在,因而可以保证即使在同一条链中,有一个线程在删除,而另一个线程在遍历,它们都能工作良好,因为遍历的线程能继续使用原有的链。因而这种实现是一种更加细粒度的happens-before关系,即如果遍历线程在删除线程结束后开始,则它能看到删除后的变化,如果它发生在删除线程正在执行中间,则它会使用原有的链,而不会等到删除线程结束后再执行,即看不到删除线程的影响。

    初始化

    构造函数的源码如下:

    public ConcurrentHashMap(int initialCapacity,
                                 float loadFactor, int concurrencyLevel) {
    
    //验证参数的合法性,如果不合法,直接抛出异常
            if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
                throw new IllegalArgumentException();
    
    //concurrencyLevel也就是并发的个数不能超过规定的最大Segment的个数,默认值为static final int MAX_SEGMENTS = 1 << 16;,如果超过这个值,设置为这个值。
            if (concurrencyLevel > MAX_SEGMENTS)
                concurrencyLevel = MAX_SEGMENTS;
            // Find power-of-two sizes best matching arguments
    
    //使用循环找到大于等于concurrencyLevel的最小2次幂数ssize,ssize就是Segment数组的大小,并记录一共向左按位移动的次数sshift。
            int sshift = 0;
            int ssize = 1;
            while (ssize < concurrencyLevel) {
                ++sshift;
                ssize <<= 1;
            }
    
    //令segmentShift = 32 - sshift,segmentMask=ssize - 1。由于ssize是二次幂数,所以segmentMask的各个二进制位都为1,目的是之后可以通过key的hash值与这个值做&运算确定Segment的索引。
            this.segmentShift = 32 - sshift;
            this.segmentMask = ssize - 1;
    
    //检查给的容量值是否大于允许的最大容量值,如果大于该值,设置为该值。最大容量值为static final int MAXIMUM_CAPACITY = 1 << 30;。
            if (initialCapacity > MAXIMUM_CAPACITY)
                initialCapacity = MAXIMUM_CAPACITY;
            
    
    //然后计算每个Segment平均应该放置多少个元素,这个值c是向上取整的值。比如初始容量为15,Segment个数为6,则每个Segment平均需要放置3个元素。
            int c = initialCapacity / ssize;
            if (c * ssize < initialCapacity)
                ++c;
            int cap = MIN_SEGMENT_TABLE_CAPACITY;
            while (cap < c)
                cap <<= 1;
            // create segments and segments[0]
    //创建一个Segment实例,当做Segment数组的第一个元素。
            Segment<K,V> s0 =
                new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
                                 (HashEntry<K,V>[])new HashEntry[cap]);
            Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
            UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
            this.segments = ss;
        }
    

    传入的参数有initialCapacity,loadFactor,concurrencyLevel这三个:

    initialCapacity表示新创建的这个ConcurrentHashMap的初始容量,也就是上面的结构图中的Entry数量。默认值为static final int DEFAULT_INITIAL_CAPACITY = 16;

    loadFactor表示负载因子,就是当ConcurrentHashMap中的元素个数大于loadFactor * 最大容量时就需要rehash,扩容。默认值为static final float DEFAULT_LOAD_FACTOR = 0.75f;

    concurrencyLevel表示并发级别,这个值用来确定Segment的个数,Segment的个数是大于等于concurrencyLevel的第一个2的n次方的数。默认值为static final int DEFAULT_CONCURRENCY_LEVEL = 16;。

    put操作

    put操作的源码如下:

    public V put(K key, V value) {
          Segment<K,V> s;
    //判断value是否为null,如果为null,直接抛出异常。
          if (value == null)
              throw new NullPointerException();
    //key通过一次hash运算得到一个hash值
          int hash = hash(key);
    
    //将得到hash值向右按位移动segmentShift位,然后再与segmentMask做&运算得到segment的索引j
          int j = (hash >>> segmentShift) & segmentMask;
          if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
               (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
              s = ensureSegment(j);
          return s.put(key, hash, value, false);
      }
    
    final V put(K key, int hash, V value, boolean onlyIfAbsent) {
                HashEntry<K,V> node = tryLock() ? null :
                    scanAndLockForPut(key, hash, value);
                V oldValue;
                try {
                    HashEntry<K,V>[] tab = table;
                    int index = (tab.length - 1) & hash;
                    HashEntry<K,V> first = entryAt(tab, index);
                    for (HashEntry<K,V> e = first;;) {
                        if (e != null) {
                            K k;
                            if ((k = e.key) == key ||
                                (e.hash == hash && key.equals(k))) {
                                oldValue = e.value;
                                if (!onlyIfAbsent) {
                                    e.value = value;
                                    ++modCount;
                                }
                                break;
                            }
                            e = e.next;
                        }
                        else {
                            if (node != null)
                                node.setNext(first);
                            else
                                node = new HashEntry<K,V>(hash, key, value, first);
                            int c = count + 1;
                            if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                                rehash(node);
                            else
                                setEntryAt(tab, index, node);
                            ++modCount;
                            count = c;
                            oldValue = null;
                            break;
                        }
                    }
                } finally {
                    unlock();
                }
                return oldValue;
            }
    

    获得segment位置:取key的hash值高位和(容量-1)&运算获取segment索引

    获得hashentry位置:hash和(table.length - 1)&运算获取HashEntry数组索引

    put:如果有相同key就覆盖,否则就加入到链表第一个位置

    get操作

    get操作的源码如下:

    public V get(Object key) {
            Segment<K,V> s; // manually integrate access methods to reduce overhead
            HashEntry<K,V>[] tab;
    
    //key进行hash确定应该去哪个Segment中取数据。使用Unsafe获取对应的Segment
            int h = hash(key);
            long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
            if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
                (tab = s.table) != null) {
    
    //hash值进行一次&运算得到HashEntry链表的位置,然后从链表头开始遍历整个链表(因为Hash可能会有碰撞,所以用一个链表保存),如果找到对应的key,则返回对应的value值,如果链表遍历完都没有找到对应的key,则说明Map中不包含该key,返回null。
                for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
                         (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
                     e != null; e = e.next) {
                    K k;
                    if ((k = e.key) == key || (e.hash == h && key.equals(k)))
                        return e.value;
                }
            }
            return null;
        }
    

    get操作通过volatile和final来确保数据安全,是不需要加锁的。

    有些方法需要跨段(比如size()和containsValue()方法),它们需要锁定整个表而不仅仅是某个段,具体的实现方式是先不锁住查询,要是期间没有修改过就返回值,否则按顺序锁定所有段,然后进行操作,完毕后又按顺序释放所有段的锁。按顺序是很重要的,否则就有可能出现死锁。

    size操作

    size操作需要遍历所有的Segment才能算出整个Map的大小,方法是这样的:

    不加大锁遍历3次,累加各个Segment的大小得到整个Map的大小,如果某相邻的两次计算获取的所有Segment的更新的次数(每个Segment都有一个modCount变量,这个变量在Segment中的Entry被修改时会加一,通过这个值可以得到每个Segment的更新操作的次数)是一样的,说明计算过程中没有更新操作,则直接返回这个值。

    如果这三次不加锁的计算过程中Map的更新次数有变化,则之后的计算先对所有的Segment加锁,再遍历所有Segment计算Map大小,最后再解锁所有Segment。

    源代码如下:

    public int size() {
            // Try a few times to get accurate count. On failure due to
            // continuous async changes in table, resort to locking.
            final Segment<K,V>[] segments = this.segments;
            int size;
            boolean overflow; // true if size overflows 32 bits
            long sum;         // sum of modCounts
            long last = 0L;   // previous sum
            int retries = -1; // first iteration isn't retry
            try {
                for (;;) {
                    if (retries++ == RETRIES_BEFORE_LOCK) {
                        for (int j = 0; j < segments.length; ++j)
                            ensureSegment(j).lock(); // force creation
                    }
                    sum = 0L;
                    size = 0;
                    overflow = false;
                    for (int j = 0; j < segments.length; ++j) {
                        Segment<K,V> seg = segmentAt(segments, j);
                        if (seg != null) {
                            sum += seg.modCount;
                            int c = seg.count;
                            if (c < 0 || (size += c) < 0)
                                overflow = true;
                        }
                    }
                    if (sum == last)
                        break;
                    last = sum;
                }
            } finally {
                if (retries > RETRIES_BEFORE_LOCK) {
                    for (int j = 0; j < segments.length; ++j)
                        segmentAt(segments, j).unlock();
                }
            }
            return overflow ? Integer.MAX_VALUE : size;
        }
    

    containsValue操作

    containsValue操作采用了和size操作一样的想法:

    public boolean containsValue(Object value) {
            // Same idea as size()
            if (value == null)
                throw new NullPointerException();
            final Segment<K,V>[] segments = this.segments;
            boolean found = false;
            long last = 0;
            int retries = -1;
            try {
                outer: for (;;) {
                    if (retries++ == RETRIES_BEFORE_LOCK) {
                        for (int j = 0; j < segments.length; ++j)
                            ensureSegment(j).lock(); // force creation
                    }
                    long hashSum = 0L;
                    int sum = 0;
                    for (int j = 0; j < segments.length; ++j) {
                        HashEntry<K,V>[] tab;
                        Segment<K,V> seg = segmentAt(segments, j);
                        if (seg != null && (tab = seg.table) != null) {
                            for (int i = 0 ; i < tab.length; i++) {
                                HashEntry<K,V> e;
                                for (e = entryAt(tab, i); e != null; e = e.next) {
                                    V v = e.value;
                                    if (v != null && value.equals(v)) {
                                        found = true;
                                        break outer;
                                    }
                                }
                            }
                            sum += seg.modCount;
                        }
                    }
                    if (retries > 0 && sum == last)
                        break;
                    last = sum;
                }
            } finally {
                if (retries > RETRIES_BEFORE_LOCK) {
                    for (int j = 0; j < segments.length; ++j)
                        segmentAt(segments, j).unlock();
                }
            }
            return found;
        }
    

    注意

    • ConcurrentHashMap中的key和value值都不能为null,HashMap中key和value可以为null,HashTable中key不能为null。
    • ConcurrentHashMap是线程安全的类并不能保证使用了ConcurrentHashMap的操作都是线程安全的
    • ConcurrentHashMap的get操作不需要加锁,put操作需要加锁

    相关文章

      网友评论

        本文标题:ConcurrentHashMap

        本文链接:https://www.haomeiwen.com/subject/hhpglftx.html