美文网首页
concurrentHashMap源码分析

concurrentHashMap源码分析

作者: 无聊之园 | 来源:发表于2019-06-18 21:22 被阅读0次

初始化的是什么都没做,集合都这样,节省性能。

 public ConcurrentHashMap() {
    }

看put操作

public V put(K key, V value) {
        return putVal(key, value, false);
    }
final V putVal(K key, V value, boolean onlyIfAbsent) {
        // 注意,这里,put不允许key和value为null,这点和hashmap不一样
        if (key == null || value == null) throw new NullPointerException();
        // 计算hash值
        int hash = spread(key.hashCode());
        int binCount = 0;
        // 遍历node数组
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            // 前面都是一些局部遍历,没有并发问题
            // tab没有初始化,就初始化
            if (tab == null || (n = tab.length) == 0)
                // 初始化,看下文
                tab = initTable();
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                if (casTabAt(tab, i, null,
                             new Node<K,V>(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin
            }
            else if ((fh = f.hash) == MOVED)
                tab = helpTransfer(tab, f);
            else {
                V oldVal = null;
                synchronized (f) {
                    if (tabAt(tab, i) == f) {
                        if (fh >= 0) {
                            binCount = 1;
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node<K,V> pred = e;
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K,V>(hash, key,
                                                              value, null);
                                    break;
                                }
                            }
                        }
                        else if (f instanceof TreeBin) {
                            Node<K,V> p;
                            binCount = 2;
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                           value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                if (binCount != 0) {
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        addCount(1L, binCount);
        return null;
    }
private final Node<K,V>[] initTable() {
        Node<K,V>[] tab; int sc;
        // 因为,前面没有加锁,所以前面判定没有初始化,可能到了这里,就别别的线程初始化了,所以这里判定
        // 同时,while循环,自cas
        while ((tab = table) == null || tab.length == 0) {
            // sizeCtl这个变量,保证了可见性。这个变量的作用是,多个线程争取这
//个变量资源,挣到了就把这个变量cas变成-1,其他线程发现这个变量变成
//了-1,则自旋,循环获取这个资源。由于volatile保证了可见性,所以没有问题。
            // private transient volatile int sizeCtl;
          
          if ((sc = sizeCtl) < 0)
                // 如果sizeCtl < 0,则说明,资源被别人争抢到了,让出cpu资源
                Thread.yield(); // lost initialization race; just spin
            // cas改变成-1,SIZECTL是sizeCtl的内存地址
            // cas失败,则继续while循环
            else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                try {
                    // 这里获取到资源,因为,这里还可能其他线程初始化了table,所以这里再次判定,别人初始化了,则break跳出while
                    if ((tab = table) == null || tab.length == 0) {
                        int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                        @SuppressWarnings("unchecked")
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                        table = tab = nt;
                        // sc变成了数组扩容的临界值,n为16,sc为12
                      // 也就是说put第12个元素的时候就会扩容
                        sc = n - (n >>> 2);
                    }
                } finally {
                    // 初始化之后,释放资源,sc是扩容临界值
                    sizeCtl = sc;
                }
                break;
            }
        }
        return tab;
    }

总结:
1、initTable通过一个改变volatile变量,起到锁的作用。
2、初始化table长度为16,扩容临界值为12。

再回到putVal方法

final V putVal(K key, V value, boolean onlyIfAbsent) {
        if (key == null || value == null) throw new NullPointerException();
        // 计算一下hash值,跟hashmap计算方式有一点不同,就是最高位置为0了,不过无所谓,反正hash值是多少都为所谓
        int hash = spread(key.hashCode());
        int binCount = 0;
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            if (tab == null || (n = tab.length) == 0)
                // 前面分析到了这里,走完了这里之后,继续for循环
                tab = initTable();
            // 然后会走入这里
           // tabAt方法是,从tab数组的i这个位置的元素, (n - 1) & hash是取模运算
          // 可是,这里曾经让我很疑惑,因为,table数组是volatile,为什么不直接table[index]这样读呢
// 原因是,table是volatile,但是用了tab这个局部变量接收,而tab不是可见性的,也就是说,tab不会感知到数组的变化。
// 详情看[volatile 数组可见性问题](https://www.jianshu.com/p/0fa5451e9dd0)
// 但是其实这里,不保证可见性也没关系的,因为,下一步是cas替换,顶多替换失败,自旋重来,只是性能差了一点
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                // 没有元素则cas替换table数组内的元素
                if (casTabAt(tab, i, null,
                             new Node<K,V>(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin
            }
            // 如果数组这个的index下的元素,正在扩容,则帮助扩容
            // 这个扩容时候再说啊,move是-1,因为扩容的时候,会放一个假的node占位,表示这个node部分正在扩容。
            else if ((fh = f.hash) == MOVED)
                tab = helpTransfer(tab, f);
            else {
                // 进入这里,说明table[index]有元素,所以下一步进行链表或红黑树添加元素
                V oldVal = null;
                // 先把table[index]这个元素锁住
                synchronized (f) {
                    // 这里再判定一次,因为,可能在这锁之前,有人把table[index]元素改了。
                    if (tabAt(tab, i) == f) {
                      // fh〉0 说明这个节点是一个链表的节点 不是树的节点
                      // 因为树节点是treebin节点,这个类的hashcode固定死了,是-2
                        if (fh >= 0) {
                            binCount = 1;
                        // 遍历链表节点    
                        for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                                // 这个节点的key和put的key相同,则替换
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node<K,V> pred = e;
                                // 添加到链表尾节点
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K,V>(hash, key,
                                                              value, null);
                                    break;
                                }
                            }
                        }
                        // 是树节点
                        else if (f instanceof TreeBin) {
                            Node<K,V> p;
                            binCount = 2;
                            // 调用treeBin的putTree方法
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                           value)) != null) {
                              // 放置不成功,则替换节点。
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                // 当操纵链表的时候,binCount 是遍历的链表的长度,当操作树的受,binCount被重置为2了
                if (binCount != 0) {
                    // 链表长度大于8则,树化
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i);
                    // 如果替换了元素,则把旧元素返回回去
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        // 元素添加进去了,size需要变,还可能需要扩容,这些东西都需要复杂操作保证线程安全性
// 这里面设计到了concurrenthashmap最难的部分,比较复杂
        addCount(1L, binCount);
        return null;
    }

看了个大概的流程,总结一下:
1、put操作。锁的是table的一个node节点。所以并发很高。
2、同时,大量的cas操作进行自旋。

看addCount代码

private final void addCount(long x, int check) {
        // countercell这个东西,当cas替换basecount的时候,basecount代表集合
// 的size,比如,再极其高的并发下,有很多个线程会cas替换basecount,自然
// 就会很多线程替换失败,所以,这个大佬就设计了这个玩意。

 // 还有一个LongAdder类,思想跟这里是一样,LongAddr的作用就是跟atomicLong作用差不多,但是没有atomicLong功能丰富,但是longAddr并发性能好一些。

//大概的原理是,分段锁的思想,把cas冲突,分段,也就是分成很多个countercell,然后各个线程会在自己的countercell中cas竞争,所以就把cas的压力分担下去了。

// 详细的源代码,暂时不分析,有点复杂的
        CounterCell[] as; long b, s;
          // counterCells 初始为null,所以会直接cas加1替换baseCount这个值
     // baseCount 是volatile修饰,存储了concurrenthashmap的size
    // cas失败则起用  CounterCell统计
      if ((as = counterCells) != null ||
            !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
            CounterCell a; long v; int m;
            boolean uncontended = true;
            if (as == null || (m = as.length - 1) < 0 ||
                (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
                !(uncontended =
                  U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
                // 这个暂时不分析
                fullAddCount(x, uncontended);
              //  这里为什么直接返回,不进行检查是否扩容呢?
          // 因为,concurrenthashmap,无法非常准确的知道size的大小。即使高并
// 发下,size超高了扩容临界值,也不会影响集合的使用。所以这里留给别的线
// 程去扩容,否则fullAddCount已经做了很多事情了,再去扩容的话,这一个put
//下来,消耗就有点大了
                return;
            }
            if (check <= 1)
                return;
            // 统计size,这个方法后面再说
            s = sumCount();
        }
// check >=0 则检查是否要扩容
        if (check >= 0) {
            Node<K,V>[] tab, nt; int n, sc;
          // sizeCtl除了初始化table的时候用于争抢资源达到锁的效果之外(也就是sizeCtl为-1的时候)。
// sizeCtl还可能存储的是扩容的临界值,初始化为12(sizeCtl为正数)。
// sizeCtl还可能为非-1的负数,这个负数是在扩容的时候,sizeCtl用来统计多少
// 个线程正在扩容的,但是并非真的记载的多少个,而是各种运算的结果,比
// 如,只有1个线程正在扩容给,sizectl为-212454(这个是假设值)
   
   // 大于临界值则扩容
            while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
                   (n = tab.length) < MAXIMUM_CAPACITY) {
                // 这个rs是通过位运算之后得到的一个很大的数,不管
                int rs = resizeStamp(n);
                // sc < 0则说明别的线程正在扩容
                if (sc < 0) {
                    // transferIndex 记载的是扩容还剩多少个坐标,是从右往左一批一批
// 扩容的,<0 则说明扩容完了,则break,nextTable是扩容新建的数组,=null也
// 说明扩容完了
                    if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                        sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                        transferIndex <= 0)
                        break;
                    // 这里统计扩容线程数,注意,这里是负数,而且并非是直接的个数就代表多少个线程
                    if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                        // 一起去扩容,下面会分析
                        transfer(tab, nt);
                }
                // 这里cas成功了,则说明这是第一个扩容的线程
              // 看,sizectl并非是真正的线程个数的数,而是经过了位运算,得到的一个很大的负数
                else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                             (rs << RESIZE_STAMP_SHIFT) + 2))
                    transfer(tab, null);
                // 扩容之后统计数,因为这是在while循环里,如果这里统计的个数,while循环判断后,发现还要扩容,则还会扩容
                s = sumCount();
            }
        }
    }

总结:
1、addCount就是统计数量,以及扩容。
2、但是,因为,高并发下,concurrentHashmap并非那种直接lock住全部的那种强一致性的集合,所以,统计数量,会大量的cas和复杂的操作,而且为了减轻cas的压力还引入了CounterCell的机制。
3、concurrentHashMap,size+1的时候,如果不引入conterCell,那么只能锁住size字段,然后单线程操作size,其他线程只能等待,
这样其他线程等待是一个很浪费的操作,只要我们不获取size,就不需要实时知道size大小,也就是说,size不需要保证强一致性。
所以,可以想到的办法是:分段锁的思想,把多线程操作size,分担成多块,每块多个线程自己内部竞争+1,最后,再合并起来,
这样操作size就不需要单线程操作,其他所有线程等待了。
看源码,很大的好处就是学习这种思想。
3、扩容是多线程一起扩容的。

接下来说一下扩容。

先大概说一下扩容的原理:
1、会把table数组,分成几个stride片(不一定叫stride,反正就是分成几片)。
2、然后按片进行扩容,一个线程先领一片,执行完了再领一片,其他线程如果发现数组正在扩容,则帮助扩容,也过来领一片,知道最后,全部扩容完。
3、分片,是从数组的右边往左边分的,领片扩容也是如此。
4、扩容锁的时候,也只是会锁正在扩容的那个node。

private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
        int n = tab.length, stride;
         // 分片的每片大小是多少,ncpu是所有cpu的线程数,如果cpu是单核单线程的,则一片的大小就是整个table的大小,因为无法利用多线程增强性能。
// 如果是多核,则stride是,n / 8 / 线程数
// 如果分片之后,每个分片太小了,小于最小分片16,则最小分片就是16,分的太小没有意义
         if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
            stride = MIN_TRANSFER_STRIDE; // subdivide range
        // nextTab 是扩容新建的数组,=null则说明这是第一个扩容线程,前面没有新建扩容数组 
       if (nextTab == null) {            // initiating
            try {
                @SuppressWarnings("unchecked")
                // 扩容,大小扩容了2倍
                Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
                nextTab = nt;
            } catch (Throwable ex) {      // try to cope with OOME
                sizeCtl = Integer.MAX_VALUE;
                return;
            }
            nextTable = nextTab;
              //transferIndex 记载的是没有被线程认领的片的坐标,记录的是从右往左的index,初始时数组的长度
            transferIndex = n;
        }
        int nextn = nextTab.length;
        // 这个节点就是用于占位的,hashcode为move也就是-1,表示这个节点正在扩容
        ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
        // 控制下面while循环的,为false则跳出循环
        boolean advance = true;
        // 这个变量,使得,扩容之后,还会再从右往左检查一遍,是否所有的节点都用move节点标识了,有没有被忽略的。
        boolean finishing = false; // to ensure sweep before committing nextTab
        for (int i = 0, bound = 0;;) {
            Node<K,V> f; int fh;
            while (advance) {
                int nextIndex, nextBound;
                // 初始i=0,bound=0,所以,自然不成立
                // 等后面i变成了正在线程正在扩容的分片中的node节点的下标,则 --i >= boud,则说明这个分片的扩容还没有完成,则跳出while循环,继续扩容
                if (--i >= bound || finishing)
                    advance = false;
              // transferIndex < 0,则说明扩容完了,但是并不会马上就跳出这个方法,还会检查一遍,检查完了之后,finishing置为true,则跳出方法
                else if ((nextIndex = transferIndex) <= 0) {
                    i = -1;
                    advance = false;
                }
                // 这里是线程领分片,transferindex代表还有多少没有分片的下标
               // 不管是初始扩容线程,还是帮助扩容线程,第一次进入这个方法,会进入这里领取分片,线程扩容了一个分片之后,前面 --i >= boud不成立,transferindex < 0也不成立,还会进入这里获取下一个分片
                 else if (U.compareAndSwapInt
                         (this, TRANSFERINDEX, nextIndex,
                          nextBound = (nextIndex > stride ?
                                       nextIndex - stride : 0))) {
                    bound = nextBound;
                    i = nextIndex - 1;
                    advance = false;
                }
            }
            // 跳出前面的while循环,则这里下去会进行真正的扩容
             // i < 0则说明扩容完了,前面while循环,transferindex <=0会讲i置为-1
            if (i < 0 || i >= n || i + n >= nextn) {
                int sc;
                // 只有finishing为true,才算真正扩容完了,会return跳出方法
                if (finishing) {
                    // 扩容完了,将这些东西重置一下
                    nextTable = null;
                    table = nextTab;
                    // sizeCtl又记录了扩容的临界值:n / 2 - 0.5n = 1.5n
                    sizeCtl = (n << 1) - (n >>> 1);
                    return;
                }
                // 第一i < 0, finishing没有被置为true,会进入这里
              // 这里把i = n,然后i 又会一个一个遍历每个节点,检查一遍是否每个节点都是move
            // 并且这里会把finishing变为true。
                if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                    if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                        return;
                    finishing = advance = true;
                    i = n; // recheck before commit
                }
            }
            // 如果这个节点是null,则置为moved节点,不让别人再put了
            else if ((f = tabAt(tab, i)) == null)
                advance = casTabAt(tab, i, null, fwd);
            // 这个节点是move,则说明别的线程正在扩容这个节点,则什么都不做,找下一个节点
            else if ((fh = f.hash) == MOVED)
                advance = true; // already processed
            else {
                // 这个节点不为null,且没有其他线程扩容,则进入这里
                // 锁住这个节点
                synchronized (f) {
                    // 这里再判断一下,如果锁之前其他线程改变了这个节点,则跳出继续循环,什么都不做
                    if (tabAt(tab, i) == f) {
                        Node<K,V> ln, hn;
                        // fh >= 0 则是链表
                        if (fh >= 0) {
                            // 跟hashmap一样,计算节点hashcode的高位是否为1,然后,根据高位是否为1,梳理出两条链表,一条是不需要改变坐标的,一条是需要坐标 + length的。
// 但是,这里有一点要说明:放到新链表得都是new Node节点,而不是改变Node节点得指针。这么做得目的是,不改变原来得节点,使得get方法无锁直接能获取到值。否则,无锁get会获取到null值。
                            int runBit = fh & n;
                            Node<K,V> lastRun = f;
                            for (Node<K,V> p = f.next; p != null; p = p.next) {
                                int b = p.hash & n;
                                if (b != runBit) {
                                    runBit = b;
                                    lastRun = p;
                                }
                            }
                            if (runBit == 0) {
                                ln = lastRun;
                                hn = null;
                            }
                            else {
                                hn = lastRun;
                                ln = null;
                            }
                            for (Node<K,V> p = f; p != lastRun; p = p.next) {
                                int ph = p.hash; K pk = p.key; V pv = p.val;
                                if ((ph & n) == 0)
                                  // 每一个节点都new一个新节点
                                    ln = new Node<K,V>(ph, pk, pv, ln);
                                else
                                    hn = new Node<K,V>(ph, pk, pv, hn);
                            }
// 把这两条链表设置到新数组上去,新数组的i + n这个坐标,只有i这个节点扩容
// 之后的元素才可能放上去,所以没有并发问题
                            setTabAt(nextTab, i, ln);
                            setTabAt(nextTab, i + n, hn);
                            // 把旧数组的这个node节点,替换成move节点,表示正在扩容
                            setTabAt(tab, i, fwd);
                          // advance = true,继续循环,找下一个节点
                            advance = true;
                        }
                        // 树节点也一样
                        else if (f instanceof TreeBin) {
                            TreeBin<K,V> t = (TreeBin<K,V>)f;
                            TreeNode<K,V> lo = null, loTail = null;
                            TreeNode<K,V> hi = null, hiTail = null;
                            int lc = 0, hc = 0;
                            for (Node<K,V> e = t.first; e != null; e = e.next) {
                                int h = e.hash;
                                TreeNode<K,V> p = new TreeNode<K,V>
                                    (h, e.key, e.val, null, null);
                                if ((h & n) == 0) {
                                    if ((p.prev = loTail) == null)
                                        lo = p;
                                    else
                                        loTail.next = p;
                                    loTail = p;
                                    ++lc;
                                }
                                else {
                                    if ((p.prev = hiTail) == null)
                                        hi = p;
                                    else
                                        hiTail.next = p;
                                    hiTail = p;
                                    ++hc;
                                }
                            }
                            ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                                (hc != 0) ? new TreeBin<K,V>(lo) : t;
                            hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                                (lc != 0) ? new TreeBin<K,V>(hi) : t;
                            setTabAt(nextTab, i, ln);
                            setTabAt(nextTab, i + n, hn);
                            setTabAt(tab, i, fwd);
                            advance = true;
                        }
                    }
                }
            }
        }
    }

transfer这个方法,真的是内容比较多。
总结一下:
1、扩容是多线程的,但是不是开启多线程,而是利用cpu的多线程特性,如果其他线程操作某一个节点,发现正在扩容,则帮助扩容。
2、数组小与16不会分片多线程扩容,因为数组小,多线程没必要。
3、扩容,只会锁正在扩容的那个节点。

然后,回过头来看前面put的时候,发现这个节点是move,则帮助扩容的方法。
除了if判断控制一下并发,其实就是调用transfer帮助扩容。

final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
        Node<K,V>[] nextTab; int sc;
        if (tab != null && (f instanceof ForwardingNode) &&
            (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
            int rs = resizeStamp(tab.length);
            while (nextTab == nextTable && table == tab &&
                   (sc = sizeCtl) < 0) {
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                    sc == rs + MAX_RESIZERS || transferIndex <= 0)
                    break;
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
                      // 直接调用transfer方法
                    // nextTab是move节点类fwn的成员遍历,new fwn的时候传过来的
                     transfer(tab, nextTab);
                    break;
                }
            }
            return nextTab;
        }
        return table;
    }

再看一下树化方法,前面说了,put之后,链表大于8,则会调用这个方法。

private final void treeifyBin(Node<K,V>[] tab, int index) {
        Node<K,V> b; int n, sc;
        if (tab != null) {
            // 会发现,链表大于8,还要判断一下,table的lenght是否>64,如果小
 //于,则还不会树化,之后扩容,这个机制跟hashmap一样。看tryPresize分析
            if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
                tryPresize(n << 1);
            else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
                // 锁住这个节点,树化不分析
                synchronized (b) {
                    if (tabAt(tab, index) == b) {
                        TreeNode<K,V> hd = null, tl = null;
                        for (Node<K,V> e = b; e != null; e = e.next) {
                            TreeNode<K,V> p =
                                new TreeNode<K,V>(e.hash, e.key, e.val,
                                                  null, null);
                            if ((p.prev = tl) == null)
                                hd = p;
                            else
                                tl.next = p;
                            tl = p;
                        }
                        setTabAt(tab, index, new TreeBin<K,V>(hd));
                    }
                }
            }
        }
    }
private final void tryPresize(int size) {
      // c是准备扩容的大小,1.5size + 1,然后向上取2的n次方  
     // 但是前面看transfer方法,其实真正扩容,只会扩容成原来数组的两倍
      int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
            tableSizeFor(size + (size >>> 1) + 1);
        int sc;
        // sizeCtl记载了扩容临界值
        while ((sc = sizeCtl) >= 0) {
            Node<K,V>[] tab = table; int n;
            // 这里数组没有初始化则进行初始化。因为putAll方法,还没有初始化,会直接调用这个方法
            if (tab == null || (n = tab.length) == 0) {
                n = (sc > c) ? sc : c;
                if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                    try {
                        if (table == tab) {
                            @SuppressWarnings("unchecked")
                            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                            table = nt;
                            sc = n - (n >>> 2);
                        }
                    } finally {
                        sizeCtl = sc;
                    }
                }
            }
            else if (c <= sc || n >= MAXIMUM_CAPACITY)
                break;
            else if (tab == table) {
             // 这里以下都是if控制并发,最后调用的就是transfer方法。
                int rs = resizeStamp(n);
                if (sc < 0) {
                    Node<K,V>[] nt;
                    if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                        sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                        transferIndex <= 0)
                        break;
                    if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                        transfer(tab, nt);
                }
                else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                             (rs << RESIZE_STAMP_SHIFT) + 2))
                    transfer(tab, null);
            }
        }
    }

到这里,put相关的方法,分析的差不多了。下面分析get。

public V get(Object key) {
        Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
        int h = spread(key.hashCode());
        // table已经初始化了,并且这个hash的位置的元素不为null
        if ((tab = table) != null && (n = tab.length) > 0 &&
            (e = tabAt(tab, (n - 1) & h)) != null) {
            // 这个位置的node节点的hash值正好等于get的key的hash值
            if ((eh = e.hash) == h) {
                // 等于则返回
                if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                    return e.val;
            }
            // 这个节点的的hash值 < 0,则说明,要么eh = move(-1)正在扩容给,要么 eh = -2(treebin节点)是一个树节点
            else if (eh < 0)
                // 树节点就调用treeBin的find查找,传入hash,和key
               // 扩容则调用fwn类的find查找   
             return (p = e.find(h, key)) != null ? p.val : null;
            
            // e.next 1= null则说明链表下有元素,遍历链表
             while ((e = e.next) != null) {
                if (e.hash == h &&
                    ((ek = e.key) == key || (ek != null && key.equals(ek))))
                    return e.val;
            }
        }
        return null;
    }

总结:get方法全程没有锁。
因为:
1、put方法不扩容的时候,直接锁的node节点,然后直接往链表尾节点添加元素,并不会印象get方法。node节点的next指针是volatile的,保证了可见性,所以一添加,get就会看到。
2、扩容得时候,会new 新Node放到新数组上去,不会改变旧Node得next指针,所以,扩容得时候,也不会影响到get操作。

看一下,fwn的find方法。发现是找的是新nextTable数组

Node<K,V> find(int h, Object k) {
            // loop to avoid arbitrarily deep recursion on forwarding nodes
         // 可以发现,fwn找的是新nextTable数组    
        outer: for (Node<K,V>[] tab = nextTable;;) {
                Node<K,V> e; int n;
                if (k == null || tab == null || (n = tab.length) == 0 ||
                    (e = tabAt(tab, (n - 1) & h)) == null)
                    return null;
                for (;;) {
                    int eh; K ek;
                    if ((eh = e.hash) == h &&
                        ((ek = e.key) == k || (ek != null && k.equals(ek))))
                        return e;
                    if (eh < 0) {
                        if (e instanceof ForwardingNode) {
                            tab = ((ForwardingNode<K,V>)e).nextTable;
                            continue outer;
                        }
                        else
                            // 如果是树节点,则调用树节点的find方法
                            return e.find(h, k);
                    }
                    if ((e = e.next) == null)
                        return null;
                }
            }
        }

看一下树的find方法,一系列树的操作。不分析了。

final Node<K,V> find(int h, Object k) {
            if (k != null) {
                for (Node<K,V> e = first; e != null; ) {
                    int s; K ek;
                    if (((s = lockState) & (WAITER|WRITER)) != 0) {
                        if (e.hash == h &&
                            ((ek = e.key) == k || (ek != null && k.equals(ek))))
                            return e;
                        e = e.next;
                    }
                    else if (U.compareAndSwapInt(this, LOCKSTATE, s,
                                                 s + READER)) {
                        TreeNode<K,V> r, p;
                        try {
                            p = ((r = root) == null ? null :
                                 r.findTreeNode(h, k, null));
                        } finally {
                            Thread w;
                            if (U.getAndAddInt(this, LOCKSTATE, -READER) ==
                                (READER|WAITER) && (w = waiter) != null)
                                LockSupport.unpark(w);
                        }
                        return p;
                    }
                }
            }
            return null;
        }

再看一下size方法

public int size() {
        long n = sumCount();
        return ((n < 0L) ? 0 :
                (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
                (int)n);
    }
final long sumCount() {
         // 统计,basecount和各个countercell的value和
        CounterCell[] as = counterCells; CounterCell a;
        long sum = baseCount;
        if (as != null) {
            for (int i = 0; i < as.length; ++i) {
                if ((a = as[i]) != null)
                    sum += a.value;
            }
        }
        return sum;
    }

总结一下:
1、get操作,除了树节点,都是无锁的。树节点有cas。put操作都不会影响到get。
2、put操作,锁的是node节点。扩容操作锁的也是node节点。
3、concurrentHashMap通过move节点使得get操作无锁。
3、扩容采用的是多线程分片机制。其他和hashmap一样。
4、concurrenthashmap是弱一致性,并非强一致性。

concurrenthashmap得弱一致性体现在:
1、iterator得时候:iterator过程中,元素可以发生变动,使得跟你期望得不一样。
2、clear: clear一个数组index之后,这个数组index马上被别的线程put了元素,导致,clear之后,table还有元素。
3、get操作,get得一瞬间,把这个元素删掉了。或者 get返回null得一瞬间,添加了这个元素
4、resize:并非非常准时得扩容,比如,CounterCell哪里fullAddCount了之后。
5、size:统计得也并非实时得size,别得线程put了,但是size还没来得及+1。
总之,就是,因为get这种读操作,没有加锁,导致很多东西都是弱一致性得。

相关文章

网友评论

      本文标题:concurrentHashMap源码分析

      本文链接:https://www.haomeiwen.com/subject/mdqnfctx.html