ConcurrentHashMap 1.7

作者: virtual灬zzZ | 来源:发表于2022-01-27 11:27 被阅读0次

ConcurrentHashMap 采用的是分段锁的机制来保证并发安全的,个中有着Segment、HashEntry等重要属性。

ConcurrentHashMap会有多个segment,每个segment性质都是一把锁。

Segment继承自ReentrantLock,它就是一把锁,里面有个final Segment<K,V>[] segments;这样的数组,即使用Segment数组来进行并发安全的,后续会讲,这里不展开。

而HashEntry类似于HashMap的Entry,它们都有key,value,next、hash这四个属性,其中HashEntry中的value、next都是使用volatile修饰的,为其增加内存可见性的功能。

代码结构一览

结构模型

重要属性

segment

    scanAndLockForPut中自旋循环获取锁的最大自旋次数
    static final int MAX_SCAN_RETRIES =
        Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1;

    存储着一个HashEntry类型的数组
    transient volatile HashEntry<K,V>[] table;

    存放元素的个数,这里没有加volatile修饰,所以只能在加锁
或者确保可见性(如Unsafe.getObjectVolatile)的情况下进行访问,不然无法保证数据的正确性
    transient int count;

    存放segment元素修改次数记录,由于未进行volatile修饰,所以访问规则和count类似
    transient int modCount;

    当table大小超过阈值时,对table进行扩容,值为(int)(capacity *loadFactor)
    transient int threshold;

    /负载因子
    final float loadFactor;

hashEntry

static final class HashEntry<K,V> {
        final int hash;
        final K key;
        volatile V value;
        volatile HashEntry<K,V> next;

        HashEntry(int hash, K key, V value, HashEntry<K,V> next) {
            this.hash = hash;
            this.key = key;
            this.value = value;
            this.next = next;
        }
        //只列出主要部分

源码:

初始化

concurrencyLevel并发级别 默认16,loadFactor负载因子默认0.75,initialCapacity默认初始化大小是16

    public ConcurrentHashMap(int initialCapacity, float loadFactor) {
        this(initialCapacity, loadFactor, DEFAULT_CONCURRENCY_LEVEL);
    }

    public ConcurrentHashMap(int initialCapacity) {
        this(initialCapacity, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
    }

    public ConcurrentHashMap() {
        this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
    }

    public ConcurrentHashMap(int initialCapacity,
                             float loadFactor, int concurrencyLevel) {
        if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
            throw new IllegalArgumentException();
        if (concurrencyLevel > MAX_SEGMENTS)
            concurrencyLevel = MAX_SEGMENTS;
        // Find power-of-two sizes best matching arguments
        int sshift = 0;
        int ssize = 1;
        while (ssize < concurrencyLevel) {
            ++sshift;
            ssize <<= 1;
        }
        this.segmentShift = 32 - sshift;
        this.segmentMask = ssize - 1;
        if (initialCapacity > MAXIMUM_CAPACITY)
            initialCapacity = MAXIMUM_CAPACITY;
        int c = initialCapacity / ssize;
        if (c * ssize < initialCapacity)
            ++c;
        int cap = MIN_SEGMENT_TABLE_CAPACITY;
        while (cap < c)
            cap <<= 1;
        // create segments and segments[0]
        //初始化segment[0],后续其他segment下标会以它的地址为基准
        Segment<K,V> s0 =
            new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
                             (HashEntry<K,V>[])new HashEntry[cap]);
        Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
        //SBASE,即Segment[0]的地址,内存偏移量
        UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
        this.segments = ss;
    }

查看其初始化代码,最终还是调用 public ConcurrentHashMap(int initialCapacity,float loadFactor, int concurrencyLevel)这里ssize是决定segment数组大小的,ssize又是由concurrentLevel决定的,由上面的代码可知,ssize是等于靠近concurrentLevel的最小的2次幂数值。如果concurrent=15,那么它就是16,如果是31就是32,33就是64。

sshift记录就是ssize的左移次数。

这两参数后续有用,用于put数据
this.segmentShift = 32 - sshift;
this.segmentMask = ssize - 1;

cap就是Segment底下的数组长度,它是2次幂数,threshold也是cap * loadFactor,和Hashmap相近。

从源码最后可以看,这里会初始化Segment[0],用作参考,其初始化后还用Unsafe类,注释写明: ordered write of segments[0]。

put

public V put(K key, V value) {
        Segment<K,V> s;
        if (value == null)
            throw new NullPointerException();
       //进行hash扰动,得到hash值
        int hash = hash(key);
       // 位运算得到key到底是属于哪个下标
        int j = (hash >>> segmentShift) & segmentMask; 
       //s就是所在的segment[j],可以看到根据SBASE+偏移量得到了内存地址从而拿到segment[j]
        if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
             (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
           //如果segment[j]为null,就初始化它
            s = ensureSegment(j);
        return s.put(key, hash, value, false);
    }

可见看到,value是不能为null,否则扔异常,这点和HashMap有区别,需要注意。将key的进行hash,得到其hash值,再根据这个hash值进行位运算,得到下标index,再根据index偏移内存地址得到segment[j],如果这个segment[j]==null,就创建它,执行ensureSegment(j);

 private Segment<K,V> ensureSegment(int k) {
       //k 就是传进来的j,下标index
        final Segment<K,V>[] ss = this.segments;
        long u = (k << SSHIFT) + SBASE; // raw offset
        Segment<K,V> seg;
       //注意这个是getObjectVolatile,保证读线程的内存可见性的,确保获取是最新,外面的只是getObject         
      //如果该segment[k]等于空,就初始化,它的threshold、hashEntry[]、capacity都是和segment[0]一样的,所以才说segment[0]是用于后续index创建做参考的
        if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
            Segment<K,V> proto = ss[0]; // use segment 0 as prototype
            int cap = proto.table.length;
            float lf = proto.loadFactor;
            int threshold = (int)(cap * lf);
            HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
          //这里再一次判断,为求正确初始化,所以多次检查,下面还是自旋+cas的方式
            if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
                == null) { // recheck
                Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
                while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
                       == null) {
                    if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
                        break;
                }
            }
        }
        return seg;
    }

这个方法确保该segment[j]创建成功,但这里需要注意的是,seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u),这里使用的是UNSAFE.getObjectVolatile,而不是外面方法所用的UNSAFE.getObject,多了个volatile,保证多个线程之间共享数据的可见性,保证在主内存中拿数据,而UNSAFE.getObject是不保证多线程之间的可见性的。这里是防止其他线程已经初始化好而又重新初始化,覆盖掉数据。这里还是多次检查,因为是在无锁状态。

创建好之后就返回该segment[j],执行return s.put(key, hash, value, false);

final V put(K key, int hash, V value, boolean onlyIfAbsent) {
            //拿到锁,node是空,拿不到锁之后说,这里先不展开,
            //总之拿不到锁,
            //① 要么就是new Node等到拿到锁跳出来进行头插法;
            //② 要么就是key是相等的,拿到锁出来替换
            HashEntry<K,V> node = tryLock() ? null :
                scanAndLockForPut(key, hash, value);
            V oldValue;
            try {
                HashEntry<K,V>[] tab = table;
               //熟悉的算法获取下标index
                int index = (tab.length - 1) & hash;
               //根据index计算偏移量,得到这个segment下的hashEntry[index]的头结点
                HashEntry<K,V> first = entryAt(tab, index);
                for (HashEntry<K,V> e = first;;) {
                    //当前结点不为空,看是不是相同key,
                    /*① 如果是,onlyIfAbsent是false就是替换返回null,如果onlyIfAbsent是true就返回旧值,
                    因为有个putIfAbsent方法,如果缺失才会补位。回归正题,之后结束并返回,最后解锁。*/
                   /*② 如果不是,接着遍历这个hashEntry[index]的单链表*/ 
                    if (e != null) {
                        K k;
                        if ((k = e.key) == key ||
                            (e.hash == hash && key.equals(k))) {
                            oldValue = e.value;
                            if (!onlyIfAbsent) {
                                e.value = value;
                                ++modCount;
                            }
                            break;
                        }
                        e = e.next;
                    }
                    /*当前节点是空,先说拿到锁的,node==null,新建一个hashEntry 在这个hashEntry[index]的单链表的头部挂上*/
                    else {
                        if (node != null)
                            node.setNext(first);
                        else
                           //目前先分析node==null的情况,这里fitst=e,e成了next,头插法
                            node = new HashEntry<K,V>(hash, key, value, first);
                        int c = count + 1;
                        if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                            //大于threshold阈值就扩容
                            rehash(node);
                        else
                           //该hashEntry[index]所在的内存偏移地址就是new HashEntry
                            setEntryAt(tab, index, node);
                        ++modCount;
                        count = c;
                        oldValue = null;
                        break;
                    }
                }
            } finally {
                unlock();
            }
            return oldValue;
        }

获取HashEntry[i]的内存偏移地址
static final <K,V> HashEntry<K,V> entryAt(HashEntry<K,V>[] tab, int i) {
        return (tab == null) ? null :
            (HashEntry<K,V>) UNSAFE.getObjectVolatile
            (tab, ((long)i << TSHIFT) + TBASE);
    }

将新的hashEntry==e ,设置在HashEntry[i]的内存偏移地址上的值,即头部
static final <K,V> void setEntryAt(HashEntry<K,V>[] tab, int i,
                                       HashEntry<K,V> e) {
        UNSAFE.putOrderedObject(tab, ((long)i << TSHIFT) + TBASE, e);
    }

总的流程就是:权当是首次就拿到了锁(已经确定是哪个segment了),根据key的hash计算得到hashEntry是哪一个,记hashEntry[index],
① 如果上面没有数据,要么就new HashEntry挂上去,要么相同key替换。
②如果有数据,只有一个数据相同就替换,多于一个那就是单链表,还是遍历相同就替换,不同就是头插法新增。

偏移量模型图:

segment[idx]和 HashEntry[index]的区别如下:


接着说首次拿不到锁的

进入 scanAndLockForPut(key, hash, value); 方法

HashEntry<K,V> node = tryLock() ? null :
                scanAndLockForPut(key, hash, value);

//传入hash,计算index,获取这个hashEntry[index]的headNode内存偏移地址
static final <K,V> HashEntry<K,V> entryForHash(Segment<K,V> seg, int h) {
        HashEntry<K,V>[] tab;
        return (seg == null || (tab = seg.table) == null) ? null :
            (HashEntry<K,V>) UNSAFE.getObjectVolatile
            (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
    }

private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
           //entryHash方法上面说过,获取这个hashEntry[index]的headNode内存偏移地址
            HashEntry<K,V> first = entryForHash(this, hash);
            HashEntry<K,V> e = first;
            HashEntry<K,V> node = null;
            int retries = -1; // negative while locating node、
            //拿不到锁死循环
            while (!tryLock()) {
                HashEntry<K,V> f; // to recheck first below
                if (retries < 0) {
                    /* hashEntry[index]的headNode是null,就是新建一个hashEntry,继续循环,此时retries=0 ,进入else */
                    if (e == null) {
                        if (node == null) // speculatively create node
                            node = new HashEntry<K,V>(hash, key, value, null);
                        retries = 0;
                    }
                   /* headNode不是null,但key和传入的一样,retires=0,继续循环;
                  如果不一致,retires不动,继续下一个,无非就是挨个查下这个单链表有无相同key的。
                 知道找不到,就去外面的else */
                    else if (key.equals(e.key))
                        retries = 0;
                    else
                        e = e.next;
                }
                /* 如果重试次数 大于 最大重试次数,MAX_SCAN_RETRIES单核和1,多核为64,
                   如果64次都拿不到锁,就lock,入同步队列,这是很极端的情况。*/
                else if (++retries > MAX_SCAN_RETRIES) {
                    lock();
                    break;
                }
               /* 重试次数还没有大于最大重试次数,如果retries为偶数,就重新获取HashEntry链表的头结点*/
                else if ((retries & 1) == 0 &&
                         (f = entryForHash(this, hash)) != first) {
                    e = first = f; // re-traverse if entry changed
                    retries = -1;
                }
            }
             //拿到锁就出去,node无非就是null,或者newNode,null是遇到相同key,否则就是newNode
            return node;
        }

简述一下流程,里面还是尝试获取锁,如果拿锁失败,就执行以下动作,看看这个hashEntry[index]:
① 如果上面没有数据,要么就new HashEntry,之后不断重试尝试拿锁,拿到就带着 newNode 出去。
②如果有数据,遍历是否有相同key:

  • 有:啥都不干,等待拿锁;
  • 没有:还是newNode,等到拿锁出去。

出去后还是还是回到原来上面的代码,这里再贴一次。记得返回的Node无非就是 null、或者 new HashEntry

final V put(K key, int hash, V value, boolean onlyIfAbsent) {
            //拿到锁,node是空,拿不到锁之后说,这里先不展开,
            //总之拿不到锁,
            //① 要么就是new Node等到拿到锁跳出来进行头插法;
            //② 要么就是key是相等的,拿到锁出来替换
            HashEntry<K,V> node = tryLock() ? null :
                scanAndLockForPut(key, hash, value);
            V oldValue;
            try {
                HashEntry<K,V>[] tab = table;
               //熟悉的算法获取下标index
                int index = (tab.length - 1) & hash;
               //根据index计算偏移量,得到这个segment下的hashEntry[index]的头结点
                HashEntry<K,V> first = entryAt(tab, index);
                for (HashEntry<K,V> e = first;;) {
                    //当前结点不为空,看是不是相同key,
                    /*① 如果是,onlyIfAbsent是false就是替换返回null,如果onlyIfAbsent是true就返回旧值,
                    因为有个putIfAbsent方法,如果缺失才会补位。回归正题,之后结束并返回,最后解锁。*/
                   /*② 如果不是,接着遍历这个hashEntry[index]的单链表*/ 
                    if (e != null) {
                        K k;
                        if ((k = e.key) == key ||
                            (e.hash == hash && key.equals(k))) {
                            oldValue = e.value;
                            if (!onlyIfAbsent) {
                                e.value = value;
                                ++modCount;
                            }
                            break;
                        }
                        e = e.next;
                    }
                    /*当前节点是空,先说拿到锁的,node==null,新建一个hashEntry 在这个hashEntry[index]的单链表的头部挂上*/
                    else {
                        if (node != null)
                            //node不是null,还是头插法
                            node.setNext(first);
                        else
                           //node==null的情况,new HashEntry,这里fitst=e,e成了next,头插法
                            node = new HashEntry<K,V>(hash, key, value, first);
                        int c = count + 1;
                        if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                            //大于threshold阈值就扩容
                            rehash(node);
                        else
                           //该hashEntry[index]所在的内存偏移地址就是new HashEntry
                            setEntryAt(tab, index, node);
                        ++modCount;
                        count = c;
                        oldValue = null;
                        break;
                    }
                }
            } finally {
                unlock();
            }
            return oldValue;
        }

出来了还是先拿到HashEntry[index]的headNode,相同key就替换,不同就直接else,headNode是不会空的,因为之前都抢锁失败了,所以早就被别的线程设置好了,所以直接到这里,执行头插法即可。

 if (node != null)
     //node不是null,还是头插法
     node.setNext(first);
else
    //node==null的情况,new HashEntry,这里fitst=e,e成了next,头插法
    node = new HashEntry<K,V>(hash, key, value, first);

扩容

如果当前segment元素数量操作阈值且数组长度小于最大值的时候,就要进行扩容,记住这个操作,是已经拿到锁的了。

/**
         *扩容方法
         */
        private void rehash(HashEntry<K,V> node) {
           
            // 旧的table
            HashEntry<K,V>[] oldTable = table;
            // 旧的table的长度
            int oldCapacity = oldTable.length;
            // 扩容原来capacity的一倍
            int newCapacity = oldCapacity << 1;
            // 新的阈值
            threshold = (int)(newCapacity * loadFactor);
            // 新的table
            HashEntry<K,V>[] newTable =
                    (HashEntry<K,V>[]) new HashEntry[newCapacity];
            // 新的掩码
            int sizeMask = newCapacity - 1;
            // 遍历旧的table
            for (int i = 0; i < oldCapacity ; i++) {
                // table中的每一个链表元素
                HashEntry<K,V> e = oldTable[i];
                if (e != null) { // e不等于null
                    HashEntry<K,V> next = e.next; // 下一个元素
                    int idx = e.hash & sizeMask;  // 重新计算位置,计算在新的table的位置
                    if (next == null)   //  Single node on list 证明只有一个元素
                        newTable[idx] = e; // 把当前的e设置给新的table
                    else { // Reuse consecutive sequence at same slot
                        HashEntry<K,V> lastRun = e; // 当前e
                        int lastIdx = idx;          // 在新table的位置
                        for (HashEntry<K,V> last = next;
                             last != null;
                             last = last.next) { // 遍历链表
                            int k = last.hash & sizeMask; // 确定在新table的位置
                            if (k != lastIdx) { // 头结点和头结点的next元素的节点发生了变化
                                lastIdx = k;    // 记录变化位置
                                lastRun = last; // 记录变化节点
                            }
                        }
                        // 以下把链表设置到新table分为两种情况
                        // (1) lastRun 和 lastIdx 没有发生变化,也就是整个链表的每个元素位置和一样,都没有发生变化
                        // (2) lastRun 和 lastIdx 发生了变化,记录变化位置和变化节点,然后把变化的这个节点设置到新table
                        //     ,但是整个链表的位置只有变化节点和它后面关联的节点是对的
                        //      下面的这个遍历就是处理这个问题,遍历当前头节点e,找出不等于变化节点(lastRun)的节点重新处理
                        newTable[lastIdx] = lastRun;
                        // Clone remaining nodes
                        for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
                            V v = p.value;
                            int h = p.hash;
                            int k = h & sizeMask;
                            HashEntry<K,V> n = newTable[k];
                            newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
                        }
                    }
                }
            }
            // 处理扩容时那个添加的节点

            // 计算位置
            int nodeIndex = node.hash & sizeMask; // add the new node
            // 设置next节点,此时已经扩容完成,要从新table里面去当前位置的头结点为next节点
            node.setNext(newTable[nodeIndex]);
            // 设置位置
            newTable[nodeIndex] = node;
            // 新table替换旧的table
            table = newTable;
        }

流程还是创建一个newTab,大小为oldTab的2倍,之后转移数据,遍历oldTab,如果oldTab的headNode为null,即它只有一个元素,计算好基于newTab的hash,挂上去newTab[index]的headNode即可。多于一个的话,迭代找到和headNode不同的index,其实这里仅仅会分为2种index,1种是oldTab的原位置,另1种是oldTab位置+oldTab.length,和hashmap1.8是何其相似,就像高低位单链表一样。,最后将newNode的key计算好index,添加进HashEntry即可。

这里展开说一下,2个问题:

  1. oldTab的headNode为null,即它只有一个元素,计算好基于newTab的hash,挂上去newTab[index]的headNode即可,为什么?

  2. 为什么要记录lastRun、lastIdx,最后将不是lastRun的节点遍历,头插法连起来?

首先,我们需要明确一下,这里不同于hashmap1.7,扩容的时候,oldTab的Node中的key是需要重新调用用hash()方法来获得新的hash。concurrentHashMap直接就是用初始已经设置好的hash属性来进行计算index的,即hash & (newTab.length -1)。

试想一下,无论oldTab.length 还是 newTab.length ,它们都是2次幂,我们就打个比方,oldTab.length = 16 , 那么它的二进制数就是:0000 1111 ,低4位全是1,它的table就只有16个栅位,所以它低4位就是用来参与运算,计算出index的,同理newTab也是,不过它是 0001 1111 ,它的table有32个栅位,它是用低5位来参与运算的 。

我们的HashEntry的hash是不变的,比如它在oldTab的位置是9,那么它低4位用二进制表示就是1001,第5位 不是0 就是 1,而现在 (newTab.length -1) = 0001 1111 ,进行 & 运算,要么结果还是9,要么就是9 + 16 = 25。

所以啊,现在回答第1个问题:假如HashEntry[] = 2,扩容后 hashEntry[] = 4 ,原本index=0的元素,要么就是分配到index=0,要么分配到index=2,而当前index上只有1个元素, 所以直接挂就行了,根本不会出现覆盖的可能。

关于第2个问题:这时候因为不是只有一个headNode,而是单链表了,按照前面说的分为高低链,headNode的位置Idx是一类,lastIdx是另一类。lastRun是另一类的最后一个节点,这时候两类的头结点都找出来了,并赋予到各自在newTab[index]newTab[index+oldTab.length] 中,之后从该HashEntry的头结点开始,除去lastRun,进行头插法转移数据。这样应该效率会提高,提前找出了各自在newTab中的位置。

for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
       V v = p.value;
       int h = p.hash;
       int k = h & sizeMask;
       HashEntry<K,V> n = newTable[k];
       newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
}

get

public V get(Object key) {
        Segment<K,V> s; // manually integrate access methods to reduce overhead
        HashEntry<K,V>[] tab;
        int h = hash(key);
        long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
        if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
            (tab = s.table) != null) {
            for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
                     (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
                 e != null; e = e.next) {
                K k;
                if ((k = e.key) == key || (e.hash == h && key.equals(k)))
                    return e.value;
            }
        }
        return null;
    }

没有加锁,效率高
注意:get方法使用了getObjectVolatile方法读取segment和hashentry,保证是最新的,具有锁的语义,可见性
分析:为什么get不加锁可以保证线程安全
(1) 首先获取value,我们要先定位到segment,使用了UNSAFE的getObjectVolatile具有读的volatile语义,也就表示在多线程情况下,我们依旧能获取最新的segment.
(2) 获取hashentry[],由于table是每个segment内部的成员变量,使用volatile修饰的,所以我们也能获取最新的table.
(3) 然后我们获取具体的hashentry,也时使用了UNSAFE的getObjectVolatile具有读的volatile语义,然后遍历查找返回.
(4) 总结我们发现怎个get过程中使用了大量的volatile关键字,其实就是保证了可见性(加锁也可以,但是降低了性能),get只是读取操作,所以我们只需要保证读取的是最新的数据即可.

size

public int size() {
        // Try a few times to get accurate count. On failure due to
        // continuous async changes in table, resort to locking.
        final Segment<K,V>[] segments = this.segments;
        int size;
        boolean overflow; // true if size overflows 32 bits
        long sum;         // sum of modCounts
        long last = 0L;   // previous sum
        int retries = -1; // first iteration isn't retry
        try {
            for (;;) {
                  // RETRIES_BEFORE_LOCK = 2,从-1开始,重复操作3次,进行修改次数modCount的判断
                if (retries++ == RETRIES_BEFORE_LOCK) {
                    for (int j = 0; j < segments.length; ++j)
                        ensureSegment(j).lock(); // force creation
                }
                sum = 0L;
                size = 0;
                overflow = false;
                for (int j = 0; j < segments.length; ++j) {
                    Segment<K,V> seg = segmentAt(segments, j);
                    if (seg != null) {
                        sum += seg.modCount;
                        int c = seg.count;
                        if (c < 0 || (size += c) < 0)
                            overflow = true;
                    }
                }
                if (sum == last)
                    break;
                last = sum;
            }
        } finally {
            if (retries > RETRIES_BEFORE_LOCK) {
                for (int j = 0; j < segments.length; ++j)
                    segmentAt(segments, j).unlock();
            }
        }
        return overflow ? Integer.MAX_VALUE : size;
    }

//通过下标 j 获得 segment[j] 的内存偏移地址
static final <K,V> Segment<K,V> segmentAt(Segment<K,V>[] ss, int j) {
        long u = (j << SSHIFT) + SBASE;
        return ss == null ? null :
            (Segment<K,V>) UNSAFE.getObjectVolatile(ss, u);
    }

这里有个重要的属性modCount用于判断,modCount在replace、put、remove、clear方法中都会有(++modCount)的操作,表明对segment做出过修改。遍历segment[],尝试3次,如果modCount都没发生变化,即证明尝试阶段没有线程修改这个concurrentHashMap,就不用加锁了,能提高性能。最后能得出concurrentHashMap的总size。

执行流程
(1) 第一次,retries++=0,不满足全部加锁条件,遍历所有的segment,sum就是所有segment的容量,last等于0,第一次不相等,last=sum.
(2) 第二次,retries++=1,不满足加锁条件,计算所有的segment,sum就是所有的segment的容量,last是上一次的sum,相等结束循环,不相等下次循环.
(3) 第三次,retries++=2,先运算后赋值,所以此时还是不满足加锁条件和上面一样统计sum,判断这一次的sum和last(上一次的sum)是否相等,相等结束,不相等,下一次循环.
(4) 第四次,retries++=2,满足加锁条件,给segment全部加锁,这样所有线程就没有办法进行修改操作,统计每个segment的数量求和,然后返回size.(ps:全部加锁提高了size的准确率,但是降低了吞吐量,统计size的过程中如果其它线程进行修改操作这些线程全部自旋或者阻塞).

总结

  1. ConcurrentHashMap是如何保证线程安全的?
  • 从读(get)的层面,利用Unsafe.getObjectVolatile保证取到的是最新的Segment&HashEntry<K,V>[i],保证写线程对读线程的可见性,从而保证线程安全
  • 从写(put)的层面,利用ReentrantLock 可重入锁保证每次只有一个线程对Segment[i]做数据插入,从而保证了线程安全。
  • 从扩容(rehash)的层面,因为只有put的时候才会调用到rehash,同样利用ReentrantLock 可重入锁保证线程之间的互斥性,从而保证线程安全。
  • 从计数(size)层面,put的时候进行自增,统计的时候先尝试不加锁统计,当计数期间结果改变的时候再采用ReentrantLock 可重入锁保证统计期间其它线程无法更改数据,从而实现线程安全。
  1. ConcurrentHashMap什么情况下才会触发扩容?
  • 当前Segment[i]的元素数量超过阈值(负载因子*初始数量),且数组长度小于最大长度时
  1. 是否支持并发扩容?
  • 不支持,扩容的时候定位到相同Segment,准备做数据更改的线程会阻塞。
  1. 扩容的时候是否支持并发读写?
  • 扩容的时候支持并发读。
  • 扩容的时候不支持并发写(同一个Segment),此时其它往这个Segment写数据的线程会阻塞。

参考:
ConcurrentHashMap 源码浅析 1.7
并发编程之ConcurrentHashMap源码解读-1.7

相关文章

网友评论

    本文标题:ConcurrentHashMap 1.7

    本文链接:https://www.haomeiwen.com/subject/nbewhrtx.html