ConcurrentHashMap 1.8

作者: virtual灬zzZ | 来源:发表于2022-01-26 20:20 被阅读0次

1.7版本的ConcurrentHashMap采用的是分段锁的思想,提高了锁的数量,提高了并发的特性,但是也有其局限性,例如就是并发的数量也就是锁的数量是不可改变的等;

而1.8版本的ConcurrentHashMap其实也是采用了多锁的思想,不过在1.8中没有了segments这些东西了,每次锁住的数组中的一个元素或者桶(其实也就是数组或者树的头结点),然后锁也和1.7发生变了,使用的是Synchronized锁,1.8中的锁是随着数组的长度发生变化的,锁是每个Node的headNode对象作为monitor,提升了并发的数量的灵活性,还有就是1.8的数据结构也发生了一些变化,采用的是数组+链表+红黑树(链表到达阈值会树化)。

模型

重要属性

Node(普通Node)

val 和 next 是volatile 修饰的,注意!

    static class Node<K,V> implements Map.Entry<K,V> {
        final int hash;
        final K key;
        volatile V val;
        volatile Node<K,V> next;

        Node(int hash, K key, V val, Node<K,V> next) {
            this.hash = hash;
            this.key = key;
            this.val = val;
            this.next = next;
        }

TreeNode(用来构建TreeBin,红黑树的节点)

static final class TreeNode<K,V> extends Node<K,V> {
        TreeNode<K,V> parent;  // red-black tree links
        TreeNode<K,V> left;
        TreeNode<K,V> right;
        TreeNode<K,V> prev;    // needed to unlink next upon deletion
        boolean red;

        TreeNode(int hash, K key, V val, Node<K,V> next,
                 TreeNode<K,V> parent) {
            super(hash, key, val, next);
            this.parent = parent;
        }

TreeBin(存放红黑树的首节点和根节点)

 static final class TreeBin<K,V> extends Node<K,V> {
        TreeNode<K,V> root;
        volatile TreeNode<K,V> first;
        volatile Thread waiter;
        volatile int lockState;
        // values for lockState
        static final int WRITER = 1; // set while holding write lock
        static final int WAITER = 2; // set when waiting for write lock
        static final int READER = 4; /

ForwardingNode(占位节点,插入链表头部,表示正在被转移)

static final class ForwardingNode<K,V> extends Node<K,V> {
        final Node<K,V>[] nextTable;
        ForwardingNode(Node<K,V>[] tab) {
            super(MOVED, null, null, null);
            this.nextTable = tab;
        }

重要的属性

/** node数组的最大容量 2^30 */
    private static final int MAXIMUM_CAPACITY = 1 << 30;

   /** 默认初始化值16,必须是2的冥 */
    private static final int DEFAULT_CAPACITY = 16;

   /** 虚拟机限制的最大数组长度,在ArrayList中有说过,jdk1.8新引入的,需要与toArrar()相关方法关联 */
    static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;

   /** 并发数量,1.7遗留,兼容以前版本 */
    private static final int DEFAULT_CONCURRENCY_LEVEL = 16;

    /** 负载因子,兼容以前版本,构造方法中指定的参数是不会被用作loadFactor的,为了计算方便,统一使用 n - (n >> 2) 代替浮点乘法 *0.75 */
    private static final float LOAD_FACTOR = 0.75f;

    /** 链表转红黑树,阈值>=8 */
    static final int TREEIFY_THRESHOLD = 8;

    /** 树转链表阀值,小于等于6(tranfer时,lc、hc=0两个计数器分别++记录原bin、新binTreeNode数量,
     *  <=UNTREEIFY_THRESHOLD 则untreeify(lo))
     */
    static final int UNTREEIFY_THRESHOLD = 6;

    /** 链表转红黑树的阈值,64(map容量小于64时,链表转红黑树时先进行扩容) */
    static final int MIN_TREEIFY_CAPACITY = 64;

/** 下面这三个和多线程协助扩容有关 */
    
   /** // 扩容操作中,transfer这个步骤是允许多线程的,这个常量表示一个线程执行transfer时,最少要对连续的16个hash桶进行transfer
    //     (不足16就按16算,多控制下正负号就行)
    private static final int MIN_TRANSFER_STRIDE = 16;

    /** 生成sizeCtl所使用的bit位数 */
    private static int RESIZE_STAMP_BITS = 16;

    /** 参与扩容的最大线程数 */
    private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;

    /**  移位量,把生成戳移位后保存在sizeCtl中当做扩容线程计数的基数,相反方向移位后能够反解出生成
     戳 */
    private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;

    /*
     * Encodings for Node hash fields. See above for explanation.
     */
    static final int MOVED     = -1; // 表示正在转移
    static final int TREEBIN   = -2; // 表示已经转换为树
    static final int RESERVED  = -3; // hash for transient reservations
    static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash

    /** 可用处理器数量 */
    static final int NCPU = Runtime.getRuntime().availableProcessors();
        
         /** 用于存放node数组 */
    transient volatile Node<K,V>[] table;
        
        /**
     * baseCount为并发低时,直接使用cas设置成功的值
     * 并发高,cas竞争失败,把值放在counterCells数组里面的counterCell里面
     * 所以map.size = baseCount + (每个counterCell里面的值累加)
     */
    private transient volatile long baseCount;
        
        /**
     * 控制标识符,用来控制table的初始化和扩容的操作,不同的值有不同的含义
     * 当为负数时:-1代表正在初始化,-N就代表在扩容,-N-RS-2就代表有多少个线程在协助扩容
     * 当为0时:代表当时的table还没有被初始化
     * 当为正数时:表示初始化或者下一次进行扩容的大小
     */
    private transient volatile int sizeCtl;
        
        /**
     * 通过cas实现的锁,0 无锁,1 有锁
     */
    private transient volatile int cellsBusy;


    /**
     * counterCells数组,具体的值在每个counterCell里面
     */
    private transient volatile CounterCell[] counterCells;

源码分析

初始化

   //没有设置任何属性
    public ConcurrentHashMap() {
    }

   //capacity 为2次幂
    public ConcurrentHashMap(int initialCapacity) {
        if (initialCapacity < 0)
            throw new IllegalArgumentException();
        int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
                   MAXIMUM_CAPACITY :
                   tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
        this.sizeCtl = cap;
    }

    public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
        this.sizeCtl = DEFAULT_CAPACITY;
        putAll(m);
    }

    public ConcurrentHashMap(int initialCapacity, float loadFactor) {
        this(initialCapacity, loadFactor, 1);
    }

    public ConcurrentHashMap(int initialCapacity,
                             float loadFactor, int concurrencyLevel) {
        if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
            throw new IllegalArgumentException();
        if (initialCapacity < concurrencyLevel)   // Use at least as many bins
            initialCapacity = concurrencyLevel;   // as estimated threads
        long size = (long)(1.0 + (long)initialCapacity / loadFactor);
        int cap = (size >= (long)MAXIMUM_CAPACITY) ?
            MAXIMUM_CAPACITY : tableSizeFor((int)size);
        this.sizeCtl = cap;
    }

可见,初始化时,如果没有传入任何参数,它根本不会设置capacity、threshold、concurrentLevel的,如果有传入capacity都是转化为2次幂,具体初始化在put,达到延迟 初始化的目的。

put

public V put(K key, V value) {
        return putVal(key, value, false);
    }

    /** Implementation for put and putIfAbsent */
    final V putVal(K key, V value, boolean onlyIfAbsent) {
       //key 、value都不能是null
        if (key == null || value == null) throw new NullPointerException();
        //计算hash
        int hash = spread(key.hashCode());
        int binCount = 0;
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            if (tab == null || (n = tab.length) == 0)
               //初始化table
                tab = initTable();
            //从主内存中获取tab[idx]的值,如果是空(即headNode为null),就new Node并使用cas将其挂上去,cas竞争失败,出去for循环继续自旋
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                if (casTabAt(tab, i, null,
                             new Node<K,V>(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin
            }
            //扩容,先不展开
            else if ((fh = f.hash) == MOVED)
                tab = helpTransfer(tab, f);
            //无需扩容
            else {
                V oldVal = null;
               //使用tab[index]的headNode来做monitor
                synchronized (f) {
                   /*这里需要判断,因为remove也是以headNode为锁的,如果remove先抢到锁,删除了
                     headNode,这里重新获得锁后就不是接上,而是顶替掉 */
                    if (tabAt(tab, i) == f) {
                       //普通Node的hash值为key的hash值大于零,而ForwardingNode的是-1,TreeBin是-2
                        if (fh >= 0) { 
                            binCount = 1;
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                                //遍历单链表,如果key相同就替换。
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                //都找不到,就尾插法入单链表
                                Node<K,V> pred = e;
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K,V>(hash, key,
                                                              value, null);
                                    break;
                                }
                            }
                        }
                        //如果属于红黑树节点,就替换或加入
                        else if (f instanceof TreeBin) {
                            Node<K,V> p;
                            binCount = 2;
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                           value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                /*对table[index]做出过新增或替换  非headNode ,bitCount都不等于0,
                 更重要的是,只有单链表遍历,bitCount才会累加的,操作单链表的时候,bitCount 初始值是 1,
                通过累加后,累加数bitCount就是等于遍历过的非空Node,如果超过8个,就会达到树化的第一个标准,
               treeifyBin()方法里头还有个条件,就是 总的tableSize要不少于64个 */
                if (binCount != 0) {
                    //TREEIFY_THRESHOLD = 8;  
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        addCount(1L, binCount);
        return null;
    }

//根据 i 算出的内存偏移量。从主内存中获取tab[i] 的Node值(即最新的)
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
        return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
    }

//根据 i 算出的内存偏移量,如果tab[i] == c, 使用cas将Node V 更新为tab[i] = v
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
                                        Node<K,V> c, Node<K,V> v) {
        return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
    }

//根据 i 算出的内存偏移量 ,将Node V 将强制更新到 tab[i] 的内存偏移位置,更新到主内存,令其他线程能看到
static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
        U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
    }

put的时候,首次是table还没有初始化,所以调用initTable()初始化

private final Node<K,V>[] initTable() {
        Node<K,V>[] tab; int sc;
        while ((tab = table) == null || tab.length == 0) {
            if ((sc = sizeCtl) < 0)
                Thread.yield(); // lost initialization race; just spin
            else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                try {
                    if ((tab = table) == null || tab.length == 0) {
                        //sc==sizeCtl 是2次幂,如果为空,默认16
                        int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                        @SuppressWarnings("unchecked")
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                        table = tab = nt;
                        //sc变为 0.75 * capacity
                        sc = n - (n >>> 2);
                    }
                } finally {
                    sizeCtl = sc;
                }
                break;
            }
        }
        return tab;
    }

初始化,如果table为null,就创建。如果没有传入capacity,就默认16,sizeCtl就是0.75 * capacity,它就是threshold。这里用了cas和自旋,防止多个线程同时创建。这里需要清楚的是 cas 是将SIZECTL 置为 -1,抢不到锁的继续while,判断SIZECTL < 0 ,就让出时间片了 Thread.yield()重新竞争cpu时间片,这里应该是尽量避免过于死循环while。

正常插入,通过key计算出hash,定位到对应的table[index],通过内存偏移地址得到其在主内存中的最新值,如果是null,线程通过cas将其挂上去。竞争失败的线程会跳出去继续for自旋,使用尾插法进行插入。

如果该table[index]不是null,继续执行下去,如果不是扩容(扩容后面详细说),使用synchronize关键字锁定该table[index]的headNode,让它来做monitor,查看headNode究竟是commonNode还是treeNode,根据其hash值,如果是>0,就是commonNode,-2就是TreeNode,判断好后,就能替换或尾插法增加,执行完之后就跳出synchronize范围了,根据bitCount(遍历单链表的非headNode、非空NextNode个数),再看看单链表的长度是不是大于8,Node总数是否不少于64,之后在判定是否需要树化操作,并添加计数addCount(1,bitcount);

put流程

1)我们可以通过源码判断key和value不允许为null。
2)需要判断table有没有初始化,没有调用initTable初始化,然后接着循环。
3)判断key的hash(调用spread方法)的位置有没有值,证明是第一个,使用cas设置,为什么cas,可能不止一个线程。
4)判断当前线程的hash是不是MOVED,其实就是节点是不是ForwardingNode节点,ForwardingNode代表正在扩容,至于为什么会是ForwardingNode,这个在扩容的方法里面再讲,如果是ForwardingNode节点就协助扩容,也就是当前也去扩容,然后扩容完毕,在执行循环,协助扩容执行helpTransfer方法。
5)如果不是扩容、table也初始化了和hash位置也有值了,那证明当前hash的位置是链表或者树,接下来锁住这个节点,进行链表或者树的节点的追加,如果存在相同的key,就替换,最后释放锁。
6)判断链表的节点数,有没有大于8,满足就树化,调用treeifyBin方法,这个方法会在树化前判断大于等于64,没有就扩容,调用tryPresize方法,有就树化。
7)修改节点的数量,调用addCount方法。

计数addCount(long x, int check)

每次put完之后,都会执行此方法,addCount(1L,bitcount),
check 默认值是0:

  • 等于0时,代表替换了在为null的headNode新增了节点
  • 不等于0时:check等于2代表了树,其它代表了单链表

这里需要先了解一下baseCount和counterCell[]到底有什么用,其实它们都是为了计算concurrentHashMap的size的。

concurrentHashMap的大小 = baseCount + 遍历counterCell[]中各个item的value
final long sumCount() {
        CounterCell[] as = counterCells; CounterCell a;
        long sum = baseCount;
        if (as != null) {
            for (int i = 0; i < as.length; ++i) {
                if ((a = as[i]) != null)
                    sum += a.value;
            }
        }
        return sum;
    }

CounterCell的值value是用当前线程算出来的

private final void addCount(long x, int check) {
        CounterCell[] as; long b, s;
        // counterCells[]等于null,当前线程尝试使用cas更新baseCount失败,成功那个线程已经更新了baseCount + 1
        if ((as = counterCells) != null ||
                !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
            CounterCell a; long v; int m;
            boolean uncontended = true;
            // 在counterCells没有初始化,或者counterCell[index]是null(index计算与当前线程有关),或者尝试cas更新当前线程的CounterCell失败时;cas成功的话当前线程的CounterCell的value + 1
            // 调用fullAddCount更新
            if (as == null || (m = as.length - 1) < 0 ||
                    (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
                    !(uncontended =
                            U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
                fullAddCount(x, uncontended);
                return;
            }
            if (check <= 1)
                return;
            s = sumCount();
        }

        // check >= 0,新加入一个值,是headNode
        if (check >= 0) {
            Node<K,V>[] tab, nt; int n, sc;
            // s代表了 现在map的数据量
            // sc= 12 ,证明刚刚初始化,没有进行扩容
            while (s >= (long)(sc = sizeCtl) && (tab = table) != null && // 当前容量大于sc,table已经有值,table的cap小于最大cap
                    (n = tab.length) < MAXIMUM_CAPACITY) {
                int rs = resizeStamp(n); // 这一步不好理解,
                // Integer.numberOfLeadingZeros(n) 其实就是最高位前面有多少个0,n代表table的长度
                // | (1 << (RESIZE_STAMP_BITS - 1)) 2^15 二进制16位,第16位1,其余15位0
                // 其实就是相加
                // 表示正在扩容
                if (sc < 0) {

                    // sc >>> RESIZE_STAMP_SHIFT  // 扩容结束
                    // 只有最后一个线程在扩容
                    // sc == rs + MAX_RESIZERS  达到最大数
                    if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                            sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                            transferIndex <= 0)
                        break;
                    // sc 加1,表示有一个线程,协助参加了扩容
                    if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                        transfer(tab, nt);
                }
                // 表示没有正在进行扩容,开始扩容
                else if (U.compareAndSwapInt(this, SIZECTL, sc,
                        (rs << RESIZE_STAMP_SHIFT) + 2))
                    transfer(tab, null);
                s = sumCount();
            }
        }
    }
fullAddCount(long x, boolean wasUncontended)

在counterCells没有初始化,或者counterCell[index]是null(index计算与当前线程有关),或者尝试cas更新当前线程的CounterCell失败时,就进入,wasUncontended默认是true,如果cas更新当前线程的CounterCell失败就是false。

fullAddCount,主要用来记录竞争导致的basecount修改失败的这些操作,其实主要就是把这些失败的次数记录在CounterCell[]数组里面,然后在统计size时,就是basecount+CounterCell[]里面的次数

/**
     *
     * @param x 需要更新的值
     * @param wasUncontended 是否发生竞争
     */
    private final void fullAddCount(long x, boolean wasUncontended) {
        int h;
        // 初始化一个随机值
        // ThreadLocalRandom是JDK 7之后提供并发产生随机数,能够解决多个线程发生的竞争争夺。
        if ((h = ThreadLocalRandom.getProbe()) == 0) {
            // 为当前线程初始化一个随机值
            ThreadLocalRandom.localInit();      // force initialization
            // 获取这个值
            h = ThreadLocalRandom.getProbe();
            // 由于重新生成了probe,未冲突标志位设置为true
            wasUncontended = true;
        }

       // 冲突标志位,决定了扩容还是不扩容
        boolean collide = false;                // True if last slot nonempty
        for (;;) {
            CounterCell[] as; CounterCell a; int n; long v;
            // counterCells数组已经被初始化了
            if ((as = counterCells) != null && (n = as.length) > 0) {
                // 求在counterCells中的位置,与hash一样求%,因为counterCells数组长度是2次幂
                if ((a = as[(n - 1) & h]) == null) { // 当前位置没有CounterCell
                   //cellsBusy  = 0 为无锁 ,1为有锁
                    if (cellsBusy == 0) {// Try to attach new Cell
                        // 创建新的CounterCell
                        CounterCell r = new CounterCell(x); // Optimistic create
                        if (cellsBusy == 0 &&   // cellsBusy=0还没有加锁,使用cas进行加锁,cellsBusy设置为1
                                U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
                            boolean created = false;
                            try {               // Recheck under lock
                                CounterCell[] rs; int m, j;
                                if ((rs = counterCells) != null &&
                                        (m = rs.length) > 0 &&
                                        rs[j = (m - 1) & h] == null) {
                                    rs[j] = r; // 放进数组
                                    created = true;
                                }
                            } finally {
                                cellsBusy = 0;
                            }
                            // 操作成功,退出死循环
                            if (created)
                                break;
                            continue;           // Slot is now non-empty
                        }
                    }
                    collide = false;
                }
                // 在调用fullAddCount之前就发生了竞争
                // 然后wasUncontended=true,未发生竞争,然后重新循环更新
                else if (!wasUncontended)       // CAS already known to fail
                    wasUncontended = true;      // Continue after rehash
                // 当前位置的CounterCell不为空,进行累加
                else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))
                    break;
                // 数组被扩容了
                // 数组大于了cpu数量
                // 设置冲突标志, collide = false,防止扩容
                else if (counterCells != as || n >= NCPU)
                    collide = false;
                // 设置冲突标志,重新执行循环
                // 如果下次循环执行到该分支,并且冲突标志仍然为true
                // 那么会跳过该分支,到下一个分支进行扩容// At max size or stale

                // 这个位置决定了扩容还是不扩容 false就不扩容,true就扩容
                else if (!collide)
                    collide = true;
                // 扩容,CAS设置cellsBusy值
                else if (cellsBusy == 0 &&
                        U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
                    try {
                        if (counterCells == as) {// Expand table unless stale
                            // 容量扩大一倍
                            CounterCell[] rs = new CounterCell[n << 1];
                            for (int i = 0; i < n; ++i)
                                rs[i] = as[i];
                            counterCells = rs;
                        }
                    } finally {
                        cellsBusy = 0;
                    }
                    collide = false;
                    continue;                   // Retry with expanded table
                }
                // 为当前线程重新计算probe
                h = ThreadLocalRandom.advanceProbe(h);
            }
            // 证明数组为空
            // 获取锁,初始化数组
            else if (cellsBusy == 0 && counterCells == as &&
                    U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
                boolean init = false;
                try {                           // Initialize table
                    if (counterCells == as) { // 没有被其它线程初始化
                        // 初始化,默认长度2
                        CounterCell[] rs = new CounterCell[2];
                        // 创建新的CounterCell(x),位置为rs[h&(2-1)]
                        rs[h & 1] = new CounterCell(x);
                        // 赋值给成员变量counterCells
                        counterCells = rs;
                        // 初始化成功
                        init = true;
                    }
                } finally {
                    // 释放锁
                    cellsBusy = 0;
                }
                if (init)
                    // 结束循环
                    break;
            }
            // 证明在CounterCell上也存在竞争,那么尝试对baseCount进行更新
            else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))
                break;                          // Fall back on using base
        }
    }

协助扩容helpTransfer

/**
     * 帮助扩容
     */
    final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
        Node<K,V>[] nextTab; int sc;
        // 原table不等于空,当前节点必须是fwd节点,nextTab已经初始化
        if (tab != null && (f instanceof ForwardingNode) &&
                (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
            // 其实就是去tab.length二进制最高位前面有多少个0,然后 | 1 << 15
            int rs = resizeStamp(tab.length);
            // nextTab和成员变量一样,table也一样,sizeCtl<0,表示在扩容
            while (nextTab == nextTable && table == tab &&
                    (sc = sizeCtl) < 0) {
                // sc >>> RESIZE_STAMP_SHIFT) != rs 说明扩容完毕或者有其它协助扩容者
                // sc == rs + 1 表示只剩下最后一个扩容线程了,其它都扩容完毕了
                // transferIndex <= 0 扩容结束了
                // sc == rs + MAX_RESIZERS 到达最大值
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                        sc == rs + MAX_RESIZERS || transferIndex <= 0)
                    break;
                // 当前线程参加扩容,sc+1
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
                    transfer(tab, nextTab);
                    break;
                }
            }
            return nextTab;
        }
        return table;
    }

transfer()真正扩容

1)、看stride这个参数其实就是算每个线程处理的数量,和CPU有关,最小是16.
2)、初始化一个原来二倍的新table就是 nextTable,然后这个过程可能会出错,n<<1可能为负数,设置nextTable和transferIndex,其中transferIndex就是原table的长度。
3)、初始化一个ForwardingNode节点在后面会用到。
4)、死循环for,这个循环就是为每个线程分配任务,然后每个线程处理各自的任务,倒叙分配,举个例子,加入table.length=32,现在的stride为16,第一个线程其实就是32到16(不包含32,因为是索引),第二个线程就是0-15,参考这一段代码((U.compareAndSwapInt (this, TRANSFERINDEX, nextIndex,nextBound = (nextIndex > stride ?nextIndex - stride : 0)))),然后遍历每个段,处理节点,知道处理完成,具体逻辑参考代码注释。

/**
     * 扩容方法
     */
    private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
        int n = tab.length, stride;
        // n >>> 3(也就是除以8) / cpu个数,每个cpu的每个线程负责的迁移的数量
        // 这样的目的是为了每个cpu处理的桶一样多,避免出现任务转移不均匀的现象,如果桶少的话,默认一个cpu(一个线程)处理16个桶
        if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
            stride = MIN_TRANSFER_STRIDE; // subdivide range
        // 扩容table 没有初始化
        if (nextTab == null) {            // initiating
            try {
                @SuppressWarnings("unchecked")
                Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1]; //  初始化原来的length两倍的table
                nextTab = nt;
            } catch (Throwable ex) {      // try to cope with OOME
                // 初始化失败,使用integer的最大值
                sizeCtl = Integer.MAX_VALUE;
                return; // 结束
            }
            // 更新成员变量
            nextTable = nextTab;
            // 更新转移下标,就是运来的table的length
            transferIndex = n;
        }
        // 新table的length
        int nextn = nextTab.length;
        // 创建一个fwd节点,用于占位.当别的节点发现这个槽位中有fwd节点时,则跳过这个节点
        // 它的hash为MOVED
        ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
        // 首次推进为true,如果为true说明需要再次推进一个目标(i--),反之如果是false,那么就不能推进下标,需要将当前的下标处理完毕
        boolean advance = true;
        // 完成状态,如果为true,就结束方法
        boolean finishing = false; // to ensure sweep before committing nextTab
        // 死循环,因为是倒着遍历,所以i是点前线程的最大位置(i---),bound是边界,也就是区间里面的最小值
        for (int i = 0, bound = 0;;) {
            Node<K,V> f; int fh;
            // 如果当前线程可以向后推进,这个循环就是控制i递减.同时每个线程都会进入这里取得自己需要转移的桶的下标区间
            // 1. true
            while (advance) {
                int nextIndex, nextBound;
                // 1. -1 >= 0,false
                if (--i >= bound || finishing)
                    advance = false;
                    //transferIndex <= 0 说明已经没有需要迁移的桶了
                // 1.nextIndex = 16 <= 0
                else if ((nextIndex = transferIndex) <= 0) {
                    i = -1;
                    advance = false;
                }
                //更新 transferIndex
                //为当前线程分配任务,处理的桶结点区间为(nextBound,nextIndex)
                // 1.16 > 16 ? 16 -16 : 0 区间 16 到0
                else if (U.compareAndSwapInt
                        (this, TRANSFERINDEX, nextIndex,
                                nextBound = (nextIndex > stride ?
                                        nextIndex - stride : 0))) {
                    bound = nextBound; // 0
                    i = nextIndex - 1;// 15
                    advance = false;
                }
            }
            // i = 15 nextn = 32

            // i < 0 ,表示数据迁移已经完成
            // i >= n 和 i + n >= nextn 表示最后一个线程也执行完成了,扩容完成了
            //  第二个if里面的i=n
            if (i < 0 || i >= n || i + n >= nextn) {
                int sc;
                if (finishing) { // 完成扩容
                    nextTable = null; // 删除成员变量
                    table = nextTab; // 更新table
                    sizeCtl = (n << 1) - (n >>> 1); // 更新阈值
                    return;
                }
                if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { // 表示一个线程退出扩容
                    if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) // 说明还有其他线程正在扩容
                        return; // 当前线程结束
                    // 当前线程为最后一个线程,负责在检查一个整个队列
                    finishing = advance = true; //
                    i = n; // recheck before commit
                }
            }
            // 待迁移桶为null,用cas把当前节点设置为ForwardingNode节点,表示已经处理
            else if ((f = tabAt(tab, i)) == null) //  第一个线程 获取i处的数据为null,
                advance = casTabAt(tab, i, null, fwd);// 设置当前节点为 fwd 节点
            else if ((fh = f.hash) == MOVED) // 如果当前节点为 MOVED,说明已经处理过了,直接跳过
                advance = true; // already processed
            else {
                // 节点不为空,锁住i位置的头结点
                synchronized (f) {
                    if (tabAt(tab, i) == f) {
                        Node<K,V> ln, hn;
                        if (fh >= 0) { //  表示是链表
                            int runBit = fh & n; // fn表示f.hash & n ,表示获取原来table的位置
                            Node<K,V> lastRun = f; // lastRun = f
                            for (Node<K,V> p = f.next; p != null; p = p.next) {
                                int b = p.hash & n; // 获取节点的位置
                                if (b != runBit) { // 如果这个节点和上一个节点的位置不一样,记录节点和位置
                                    runBit = b; // 当前节点的位置
                                    lastRun = p; // 当前节点
                                }
                            }
                            // 不管runBit有没有发生变化,只可能是0或者n,
                            // ln表示的不变化的节点
                            // hn表示的是变化节点的位置
                            if (runBit == 0) { // 如果是0,那么ln=lastRun就是位置没有变的这条链 hn=null变化链需要遍历重组
                                ln = lastRun;
                                hn = null;
                            }
                            else { // 如果当前节点不是0,hn=lastRun这个变化链,ln=null没有变化的链需要遍历重组
                                hn = lastRun;
                                ln = null;
                            }
                            for (Node<K,V> p = f; p != lastRun; p = p.next) {
                                int ph = p.hash; K pk = p.key; V pv = p.val;
                                if ((ph & n) == 0)
                                    ln = new Node<K,V>(ph, pk, pv, ln);
                                else
                                    hn = new Node<K,V>(ph, pk, pv, hn);
                            }
                            // 原来位置
                            setTabAt(nextTab, i, ln);
                            // 变化位置
                            setTabAt(nextTab, i + n, hn);
                           // 原来table的位置设置fwd节点,表示扩容
                            setTabAt(tab, i, fwd);
                            advance = true;
                        }
                        else if (f instanceof TreeBin) {
                            TreeBin<K,V> t = (TreeBin<K,V>)f;
                            TreeNode<K,V> lo = null, loTail = null;
                            TreeNode<K,V> hi = null, hiTail = null;
                            int lc = 0, hc = 0;
                            for (Node<K,V> e = t.first; e != null; e = e.next) {
                                int h = e.hash;
                                TreeNode<K,V> p = new TreeNode<K,V>
                                        (h, e.key, e.val, null, null);
                                if ((h & n) == 0) {
                                    if ((p.prev = loTail) == null)
                                        lo = p;
                                    else
                                        loTail.next = p;
                                    loTail = p;
                                    ++lc;
                                }
                                else {
                                    if ((p.prev = hiTail) == null)
                                        hi = p;
                                    else
                                        hiTail.next = p;
                                    hiTail = p;
                                    ++hc;
                                }
                            }
                            ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                                    (hc != 0) ? new TreeBin<K,V>(lo) : t;
                            hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                                    (lc != 0) ? new TreeBin<K,V>(hi) : t;
                            setTabAt(nextTab, i, ln);
                            setTabAt(nextTab, i + n, hn);
                            setTabAt(tab, i, fwd);
                            advance = true;
                        }
                    }
                }
            }
        }
    }

get

// get方法
    public V get(Object key) {
        Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
        // 获取hash值
        int h = spread(key.hashCode());
        // table不为null,table已经初始化,通过hash查找的node不为nul
        if ((tab = table) != null && (n = tab.length) > 0 &&
                (e = tabAt(tab, (n - 1) & h)) != null) {
            if ((eh = e.hash) == h) { // hash相等
                if ((ek = e.key) == key || (ek != null && key.equals(ek))) // 找到了相同的key
                    return e.val; // 返回当前e的value
            }
            else if (eh < 0) // hash小于0,说明是特殊节点(TreeBin或ForwardingNode)调用find
                return (p = e.find(h, key)) != null ? p.val : null;
            // 不是上面的情况,那就是链表了,遍历链表
            while ((e = e.next) != null) {
                if (e.hash == h &&
                        ((ek = e.key) == key || (ek != null && key.equals(ek))))
                    return e.val;
            }
        }
        return null;
    }

size

获取concurrentHashMap的大小,就是basecount + CounterCell[]中各个item的value。

public int size() {
        long n = sumCount();
        return ((n < 0L) ? 0 :
                (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
                        (int)n);
    }
        
        final long sumCount() {
        CounterCell[] as = counterCells; CounterCell a;
        long sum = baseCount;
        if (as != null) {
            for (int i = 0; i < as.length; ++i) {
                if ((a = as[i]) != null)
                    sum += a.value;
            }
        }
        return sum;
    }

参考:
并发编程之ConcurrentHashMap源码解读-1.8
ConcurrentHashMap 源码浅析 1.8

相关文章

网友评论

    本文标题:ConcurrentHashMap 1.8

    本文链接:https://www.haomeiwen.com/subject/pglnhrtx.html