美文网首页
java集合源码分析(五)ConcurrentHashMap

java集合源码分析(五)ConcurrentHashMap

作者: 爱情小傻蛋 | 来源:发表于2019-08-29 19:25 被阅读0次

    ConcurrentHashMap顾名思义就是同步的HashMap,也就是线程安全的HashMap,所以本篇介绍的ConcurrentHashMap和HashMap有着很重要的关系,本篇以JDK1.8版本的源码进行分析,最后在介绍完ConcurrentHashMap之后会对ConcurrentHashMap、Hashtable和HashMap做一个比较和总结。

    ConcurrentHashMap

    我们先看一下ConcurrentHashMap实现了哪些接口、继承了哪些类,对ConcurrentHashMap有一个整体认知。

    image

    ConcurrentHashMap继承AbstractMap接口,这个和HashMap一样,然后实现了ConcurrentMap接口,这个和HashMap不一样,HashMap是直接实现的Map接口。 再细看ConcurrentHashMap的结构,这里列举几个重要的成员变量tablenextTablebaseCountsizeCtltransferIndexcellsBusy

    • table:数据类型是Node数组,这里的Node和HashMap的Node一样都是内部类且实现了Map.Entry接口
    • nextTable:哈希表扩容时生成的数据,数组为扩容前的2倍
    • sizeCtl:多个线程的共享变量,是操作的控制标识符,它的作用不仅包括threshold的作用,在不同的地方有不同的值也有不同的用途
      • -1代表正在初始化
      • -N代表有N-1个线程正在进行扩容操作
      • 0代表hash表还没有被初始化
      • 正数表示下一次进行扩容的容量大小
    • ForwardingNode:一个特殊的Node节点,Hash地址为-1,存储着nextTable的引用,只有table发生扩用的时候,ForwardingNode才会发挥作用,作为一个占位符放在table中表示当前节点为null或者已被移动
    image

    ConcurrentHashMapHashMap一样都是采用拉链法处理哈希冲突,且都为了防止单链表过长影响查询效率,所以当链表长度超过某一个值时候将用红黑树代替链表进行存储,采用了数组+链表+红黑树的结构

    image

    所以从结构上看HashMapConcurrentHashMap还是很相似的,只是ConcurrentHashMap在某些操作上采用了CAS + synchronized来保证并发情况下的安全。 说到ConcurrentHashMap处理并发情况下的线程安全问题,这不得不提到Hashtable,因为Hashtable也是线程安全的,那ConcurrentHashMapHashtable有什么区别或者有什么高明之处嘛?以至于官方都推荐使用ConcurrentHashMap来代替Hashtable

    • 线程安全的实现Hashtable采用对象锁(synchronized修饰对象方法)来保证线程安全,也就是一个Hashtable对象只有一把锁,如果线程1拿了对象A的锁进行有synchronized修饰的put方法,其他线程是无法操作对象A中有synchronized修饰的方法的(如get方法、remove方法等),竞争激烈所以效率低下。而ConcurrentHashMap采用CAS + synchronized来保证并发安全性,且synchronized关键字不是用在方法上而是用在了具体的对象上,实现了更小粒度的锁,等会源码分析的时候在细说这个SUN大师们的鬼斧神工
    • 数据结构的实现:Hashtable采用的是数组 + 链表,当链表过长会影响查询效率,而ConcurrentHashMap采用数组 + 链表 + 红黑树,当链表长度超过某一个值,则将链表转成红黑树,提高查询效率。

    构造函数

    ConcurrentHashMap的构造函数有5个,从数量上看就和HashMapHashtable(4个)的不同,多出的那个构造函数是public ConcurrentHashMap(int initialCapacity,float loadFactor, int concurrencyLevel),即除了传入容量大小、负载因子之外还多传入了一个整型的concurrencyLevel,这个整型是我们预先估计的并发量,比如我们估计并发是30,那么就可以传入30

    其他的4个构造函数的参数和HashMap的一样,而具体的初始化过程却又不相同,HashMapHashtable传入的容量大小和负载因子都是为了计算出初始阈值(threshold),而ConcurrentHashMap传入的容量大小和负载因子是为了计算出sizeCtl用于初始化table,这个sizeCtl即table数组的大小,不同的构造函数计算sizeCtl方法都不一样。

    6和HashMap的一样,默认sizeCtl为0

    //无参构造函数,什么也不做,table的初始化放在了第一次插入数据时,默认容量大小是1
    public ConcurrentHashMap() {
    }
    
    //传入容量大小的构造函数。
    public ConcurrentHashMap(int initialCapacity) {
        //如果传入的容量大小小于0 则抛出异常。
        if (initialCapacity < 0)
            throw new IllegalArgumentException();
        //如果传入的容量大小大于允许的最大容量值 则cap取允许的容量最大值 否则cap =
        //((传入的容量大小 + 传入的容量大小无符号右移1位 + 1)的结果向上取最近的2幂次方),
        //即如果传入的容量大小是12 则 cap = 32(12 + (12 >>> 1) + 1=19
        //向上取2的幂次方即32),这里为啥一定要是2的幂次方,原因和HashMap的threshold一样,都是为
        //了让位运算和取模运算的结果一样。
        //MAXIMUM_CAPACITY即允许的最大容量值 为2^30。
        int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
                   MAXIMUM_CAPACITY :
                   //tableSizeFor这个函数即实现了将一个整数取2的幂次方。
                   tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
        //将上面计算出的cap 赋值给sizeCtl,注意此时sizeCtl为正数,代表进行扩容的容量大小。
        this.sizeCtl = cap;
    }
    
    //包含指定Map的构造函数。
    //置sizeCtl为默认容量大小 即16。
    public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
        this.sizeCtl = DEFAULT_CAPACITY;
        putAll(m);
    }
    
    //传入容量大小和负载因子的构造函数。
    //默认并发数大小是1。
    public ConcurrentHashMap(int initialCapacity, float loadFactor) {
        this(initialCapacity, loadFactor, 1);
    }
    
    //传入容量大小、负载因子和并发数大小的构造函数
    public ConcurrentHashMap(int initialCapacity,
                             float loadFactor, int concurrencyLevel) {
        if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
            throw new IllegalArgumentException();
        //如果传入的容量大小 小于 传入的并发数大小,
        //则容量大小取并发数大小,这样做的原因是确保每一个Node只会分配给一个线程,而一个线程则
        //可以分配到多个Node,比如当容量大小为64,并发数大
        //小为16时,则每个线程分配到4个Node。
        if (initialCapacity < concurrencyLevel)   // Use at least as many bins
            initialCapacity = concurrencyLevel;   // as estimated threads
        //size = 1.0 + (long)initialCapacity / loadFactor 这里计算方法和上面的构造函数不一样。
        long size = (long)(1.0 + (long)initialCapacity / loadFactor);
        //如果size大于允许的最大容量值则 sizeCtl = 允许的最大容量值 否则 sizeCtl =
        //size取2的幂次方。
        int cap = (size >= (long)MAXIMUM_CAPACITY) ?
            MAXIMUM_CAPACITY : tableSizeFor((int)size);
        this.sizeCtl = cap;
    }
    

    put方法

    1. 判断键值是否为null,为null抛出异常。
    2. 调用spread()方法计算key的hashCode()获得哈希地址,这个HashMap相似。
    3. 如果当前table为空,则初始化table,需要注意的是这里并没有加synchronized,也就是允许多个线程去尝试初始化table,但是在初始化函数里面使用了CAS保证只有一个线程去执行初始化过程。
    4. 使用 容量大小-1 & 哈希地址 计算出待插入键值的下标,如果该下标上的bucket为null,则直接调用实现CAS原子性操作的casTabAt()方法将节点插入到table中,如果插入成功则完成put操作,结束返回。插入失败(被别的线程抢先插入了)则继续往下执行。
    5. 如果该下标上的节点(头节点)的哈希地址为-1,代表需要扩容,该线程执行helpTransfer()方法协助扩容。
    6. 如果该下标上的bucket不为空,且又不需要扩容,则进入到bucket中,同时锁住这个bucket,注意只是锁住该下标上的bucket而已,其他的bucket并未加锁,其他线程仍然可以操作其他未上锁的bucket,这个就是ConcurrentHashMap为什么高效的原因之一。
    7. 进入到bucket里面,首先判断这个bucket存储的是红黑树(哈希地址小于0,原因后面分析)还是链表。
    8. 如果是链表,则遍历链表看看是否有哈希地址和键key相同的节点,有的话则根据传入的参数进行覆盖或者不覆盖,没有找到相同的节点的话则将新增的节点插入到链表尾部。如果是红黑树,则将节点插入。到这里结束加锁
    9. 最后判断该bucket上的链表长度是否大于链表转红黑树的阈值(8),大于则调用treeifyBin()方法将链表转成红黑树,以免链表过长影响效率。
    10. 调用addCount()方法,作用是将ConcurrentHashMap的键值对数量+1,还有另一个作用是检查ConcurrentHashMap是否需要扩容。
    public V put(K key, V value) {
        return putVal(key, value, false);
    }
    
    final V putVal(K key, V value, boolean onlyIfAbsent) {
        //不允许键值为null,这点与线程安全的Hashtable保持一致,和HashMap不同。
        if (key == null || value == null) throw new NullPointerException();
        //取键key的hashCode()和HashMap、Hashtable都一样,然后再执行spread()方法计算得到哈希地
        //址,这个spread()方法和HashMap的hash()方法一样,都是将hashCode()做无符号右移16位,只不
        //过spread()加多了 &0x7fffffff,让结果为正数。
        int hash = spread(key.hashCode());
        int binCount = 0;
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            //如果table数组为空或者长度为0(未初始化),则调用initTable()初始化table,初始化函数
            //下面介绍。
            if (tab == null || (n = tab.length) == 0)
                tab = initTable();
            //调用实现了CAS原子性操作的tabAt方法
            //tabAt方法的第一个参数是Node数组的引用,第二个参数在Node数组的下标,实现的是在Nod
            //e数组中查找指定下标的Node,如果找到则返回该Node节点(链表头节点),否则返回null,
            //这里的i = (n - 1)&hash即是计算待插入的节点在table的下标,即table容量-1的结果和哈
            //希地址做与运算,和HashMap的算法一样。
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                //如果该下标上并没有节点(即链表为空),则直接调用实现了CAS原子性操作的
                //casTable()方法,
                //casTable()方法的第一个参数是Node数组的引用,第二个参数是待操作的下标,第三
                //个参数是期望值,第四个参数是待操作的Node节点,实现的是将Node数组下标为参数二
                //的节点替换成参数四的节点,如果期望值和实际值不符返回false,否则参数四的节点成
                //功替换上去,返回ture,即插入成功。注意这里:如果插入成功了则跳出for循环,插入
                //失败的话(其他线程抢先插入了),那么会执行到下面的代码。
                if (casTabAt(tab, i, null,
                             new Node<K,V>(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin
            }
            //如果该下标上的节点的哈希地址为-1(即链表的头节点为ForwardingNode节点),则表示
            //table需要扩容,值得注意的是ConcurrentHashMap初始化和扩容不是用同一个方法,而
            //HashMap和Hashtable都是用同一个方法,当前线程会去协助扩容,扩容过程后面介绍。
            else if ((fh = f.hash) == MOVED)
                tab = helpTransfer(tab, f);
            //如果该下标上的节点既不是空也不是需要扩容,则表示这个链表可以插入值,将进入到链表
            //中,将新节点插入或者覆盖旧值。
            else {
                V oldVal = null;
                //通过关键字synchroized对该下标上的节点加锁(相当于锁住锁住
                //该下标上的链表),其他下标上的节点并没有加锁,所以其他线程
                //可以安全的获得其他下标上的链表进行操作,也正是因为这个所
                //以提高了ConcurrentHashMap的效率,提高了并发度。
                synchronized (f) {
                    if (tabAt(tab, i) == f) {
                        //如果该下标上的节点的哈希地址大于等于0,则表示这是
                        //个链表。
                        if (fh >= 0) {
                            binCount = 1;
                            //遍历链表。
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                                //如果哈希地址、键key相同 或者 键key不为空
                                //且键key相同,则表示存在键key和待插入的键
                                //key相同,则执行更新值value的操作。
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node<K,V> pred = e;
                                //如果找到了链表的最后一个节点都没有找到相
                                //同键Key的,则是插入操作,将插入的键值新建
                                //个节点并且添加到链表尾部,这个和HashMap一
                                //样都是插入到尾部。
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K,V>(hash, key,
                                                              value, null);
                                    break;
                                }
                            }
                        }
                        //如果该下标上的节点的哈希地址小于0 且为树节点
                        //则将带插入键值新增到红黑树
                        else if (f instanceof TreeBin) {
                            Node<K,V> p;
                            binCount = 2;
                            //如果插入的结果不为null,则表示为替换
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash,
                            key,value)) != null){
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                //判断链表的长度是否大于等于链表的阈值(8),大于则将链表转成
                //红黑树,提高效率。这点和HashMap一样。
                if (binCount != 0) {
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        addCount(1L, binCount);
        return null;
    }
    

    get方法

    1. 调用spread()方法计算key的hashCode()获得哈希地址。
    2. 计算出键key所在的下标,算法是(n - 1) & h,如果table不为空,且下标上的bucket不为空,则到bucket中查找。
    3. 如果bucket的头节点的哈希地址小于0,则代表这个bucket存储的是红黑树,否则是链表。
    4. 到红黑树或者链表中查找,找到则返回该键key的值,找不到则返回null。
    public V get(Object key) {
        Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
        //运用键key的hashCode()计算出哈希地址
        int h = spread(key.hashCode());
        //如果table不为空 且 table长度大于0 且 计算出的下标上bucket不为空,
        //则代表这个bucket存在,进入到bucket中查找,
        //其中(n - 1) & h为计算出键key相对应的数组下标的算法。
        if ((tab = table) != null && (n = tab.length) > 0 &&
            (e = tabAt(tab, (n - 1) & h)) != null) {
            //如果哈希地址、键key相同则表示查找到,返回value,这里查找到的是头节点。
            if ((eh = e.hash) == h) {
                if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                    return e.val;
            }
            //如果bucket头节点的哈希地址小于0,则代表bucket为红黑树,在红黑树中查找。
            else if (eh < 0)
                return (p = e.find(h, key)) != null ? p.val : null;
            //如果bucket头节点的哈希地址不小于0,则代表bucket为链表,遍历链表,在链表中查找。
            while ((e = e.next) != null) {
                if (e.hash == h &&
                    ((ek = e.key) == key || (ek != null && key.equals(ek))))
                    return e.val;
            }
        }
        return null;
    }
    

    remove方法

    1. 调用spread()方法计算出键key的哈希地址。
    2. 计算出键key所在的数组下标,如果table为空或者bucket为空,则返回null
    3. 判断当前table是否正在扩容,如果在扩容则调用helpTransfer方法协助扩容。
    4. 如果table和bucket都不为空,table也不处于在扩容状态,则锁住当前bucket,对bucket进行操作。
    5. 根据bucket的头结点判断bucket是链表还是红黑树。
    6. 在链表或者红黑树中移除哈希地址、键key相同的节点。
    7. 调用addCount方法,将当前table存储的键值对数量-1。
    public V remove(Object key) {
        return replaceNode(key, null, null);
    }
    
    final V replaceNode(Object key, V value, Object cv) {
        //计算需要移除的键key的哈希地址。
        int hash = spread(key.hashCode());
        //遍历table。
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            //table为空,或者键key所在的bucket为空,则跳出循环返回。
            if (tab == null || (n = tab.length) == 0 ||
                (f = tabAt(tab, i = (n - 1) & hash)) == null)
                break;
            //如果当前table正在扩容,则调用helpTransfer方法,去协助扩容。
            else if ((fh = f.hash) == MOVED)
                tab = helpTransfer(tab, f);
            else {
                V oldVal = null;
                boolean validated = false;
                //将键key所在的bucket加锁。
                synchronized (f) {
                    if (tabAt(tab, i) == f) {
                        //bucket头节点的哈希地址大于等于0,为链表。
                        if (fh >= 0) {
                            validated = true;
                            //遍历链表。
                            for (Node<K,V> e = f, pred = null;;) {
                                K ek;
                                //找到哈希地址、键key相同的节点,进行移除。
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    V ev = e.val;
                                    if (cv == null || cv == ev ||
                                        (ev != null && cv.equals(ev))) {
                                        oldVal = ev;
                                        if (value != null)
                                            e.val = value;
                                        else if (pred != null)
                                            pred.next = e.next;
                                        else
                                            setTabAt(tab, i, e.next);
                                    }
                                    break;
                                }
                                pred = e;
                                if ((e = e.next) == null)
                                    break;
                            }
                        }
                        //如果bucket的头节点小于0,即为红黑树。
                        else if (f instanceof TreeBin) {
                            validated = true;
                            TreeBin<K,V> t = (TreeBin<K,V>)f;
                            TreeNode<K,V> r, p;
                            //找到节点,并且移除。
                            if ((r = t.root) != null &&
                                (p = r.findTreeNode(hash, key, null)) != null) {
                                V pv = p.val;
                                if (cv == null || cv == pv ||
                                    (pv != null && cv.equals(pv))) {
                                    oldVal = pv;
                                    if (value != null)
                                        p.val = value;
                                    else if (t.removeTreeNode(p))
                                        setTabAt(tab, i, untreeify(t.first));
                                }
                            }
                        }
                    }
                }
                //调用addCount方法,将当前ConcurrentHashMap存储的键值对数量-1。
                if (validated) {
                    if (oldVal != null) {
                        if (value == null)
                            addCount(-1L, -1);
                        return oldVal;
                    }
                    break;
                }
            }
        }
        return null;
    }
    

    initTable初始化方法

    table的初始化主要由initTable()方法实现的,initTable()方法初始化一个合适大小的数组,然后设置sizeCtl。

    我们知道ConcurrentHashMap是线程安全的,即支持多线程的,那么一开始很多个线程同时执行put()方法,而table又没初始化,那么就会很多个线程会去执行initTable()方法尝试初始化table,而put方法和initTable方法都是没有加锁的(synchronize),那SUN的大师们是怎么保证线程安全的呢?

    通过源码可以看得出,table的初始化只能由一个线程完成,但是每个线程都可以争抢去初始化table。

    1. 判断table是否为null,即需不需要首次初始化,如果某个线程进到这个方法后,其他线程已经将table初始化好了,那么该线程结束该方法返回。
    2. 如果table为null,进入到while循环,如果sizeCtl小于0(其他线程正在对table初始化),那么该线程调用Thread.yield()挂起该线程,让出CPU时间,该线程也从运行态转成就绪态,等该线程从就绪态转成运行态的时候,别的线程已经table初始化好了,那么该线程结束while循环,结束初始化方法返回。如果从就绪态转成运行态后,table仍然为null,则继续while循环。
    3. 如果table为nullsizeCtl不小于0,则调用实现CAS原子性操作的compareAndSwap()方法将sizeCtl设置成-1,告诉别的线程我正在初始化table,这样别的线程无法对table进行初始化。如果设置成功,则再次判断table是否为空,不为空则初始化table,容量大小为默认的容量大小(16),或者为sizeCtl。其中sizeCtl的初始化是在构造函数中进行的,sizeCtl = ((传入的容量大小 + 传入的容量大小无符号右移1位 + 1)的结果向上取最近的2幂次方)
    private final Node<K,V>[] initTable() {
        Node<K,V>[] tab; int sc;
        //如果table为null或者长度为0, //则一直循环试图初始化table(如果某一时刻别的线程将table初始化好了,那table不为null,该//线程就结束while循环)。
        while ((tab = table) == null || tab.length == 0) {
            //如果sizeCtl小于0,
            //即有其他线程正在初始化或者扩容,执行Thread.yield()将当前线程挂起,让出CPU时间,
            //该线程从运行态转成就绪态。
            //如果该线程从就绪态转成运行态了,此时table可能已被别的线程初始化完成,table不为
            //null,该线程结束while循环。
            if ((sc = sizeCtl) < 0)
                Thread.yield(); // lost initialization race; just spin
            //如果此时sizeCtl不小于0,即没有别的线程在做table初始化和扩容操作,
            //那么该线程就会调用Unsafe的CAS操作compareAndSwapInt尝试将sizeCtl的值修改成
            //-1(sizeCtl=-1表示table正在初始化,别的线程如果也进入了initTable方法则会执行
            //Thread.yield()将它的线程挂起 让出CPU时间),
            //如果compareAndSwapInt将sizeCtl=-1设置成功 则进入if里面,否则继续while循环。
            else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                try {
                    //再次确认当前table为null即还未初始化,这个判断不能少。
                    if ((tab = table) == null || tab.length == 0) {
                        //如果sc(sizeCtl)大于0,则n=sc,否则n=默认的容量大
                        小16,
                        //这里的sc=sizeCtl=0,即如果在构造函数没有指定容量
                        大小,
                        //否则使用了有参数的构造函数,sc=sizeCtl=指定的容量大小。
                        int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                        @SuppressWarnings("unchecked")
                        //创建指定容量的Node数组(table)。
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                        table = tab = nt;
                        //计算阈值,n - (n >>> 2) = 0.75n当ConcurrentHashMap储存的键值对数量
                        //大于这个阈值,就会发生扩容。
                        //这里的0.75相当于HashMap的默认负载因子,可以发现HashMap、Hashtable如果
                        //使用传入了负载因子的构造函数初始化的话,那么每次扩容,新阈值都是=新容
                        //量 * 负载因子,而ConcurrentHashMap不管使用的哪一种构造函数初始化,
                        //新阈值都是=新容量 * 0.75。
                        sc = n - (n >>> 2);
                    }
                } finally {
                    sizeCtl = sc;
                }
                break;
            }
        }
        return tab;
    }
    

    transfer扩容方法

    transfer()方法为ConcurrentHashMap扩容操作的核心方法。由于ConcurrentHashMap支持多线程扩容,而且也没有进行加锁,所以实现会变得有点儿复杂。整个扩容操作分为两步:

    1. 构建一个nextTable,其大小为原来大小的两倍,这个步骤是在单线程环境下完成的
    2. 将原来table里面的内容复制到nextTable中,这个步骤是允许多线程操作的,所以性能得到提升,减少了扩容的时间消耗。
    //协助扩容方法
    final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
        Node<K,V>[] nextTab; int sc;
        //如果当前table不为null 且 f为ForwardingNode节点 且 //新的table即nextTable存在的情况下才能协助扩容,该方法的作用是让线程参与扩容的复制。
        if (tab != null && (f instanceof ForwardingNode) &&
            (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
            int rs = resizeStamp(tab.length);
            while (nextTab == nextTable && table == tab &&
                   (sc = sizeCtl) < 0) {
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                    sc == rs + MAX_RESIZERS || transferIndex <= 0)
                    break;
                //更新sizeCtl的值,+1,代表新增一个线程参与扩容
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
                    transfer(tab, nextTab);
                    break;
                }
            }
            return nextTab;
        }
        return table;
    }
    
    //扩容的方法
    private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
        int n = tab.length, stride;
        //根据服务器CPU数量来决定每个线程负责的bucket数量,避免因为扩容的线程过多反而影响性能。
        //如果CPU数量为1,则stride=1,否则将需要迁移的bucket数量(table大小)除以CPU数量,平分给
        //各个线程,但是如果每个线程负责的bucket数量小于限制的最小是(16)的话,则强制给每个线程
        //分配16个bucket数。
        if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
            stride = MIN_TRANSFER_STRIDE; // subdivide range
        //如果nextTable还未初始化,则初始化nextTable,这个初始化和iniTable初始化一样,只能由
        //一个线程完成。
        if (nextTab == null) {            // initiating
            try {
                @SuppressWarnings("unchecked")
                Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
                nextTab = nt;
            } catch (Throwable ex) {      // try to cope with OOME
                sizeCtl = Integer.MAX_VALUE;
                return;
            }
            nextTable = nextTab;
            transferIndex = n;
        }
        int nextn = nextTab.length;
        ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
        boolean advance = true;
        boolean finishing = false; // to ensure sweep before committing nextTab
        //分配任务和控制当前线程的任务进度,这部分是transfer()的核心逻辑,描述了如何与其他线
        //程协同工作。
        for (int i = 0, bound = 0;;) {
            Node<K,V> f; int fh;
            while (advance) {
                int nextIndex, nextBound;
                if (--i >= bound || finishing)
                    advance = false;
                else if ((nextIndex = transferIndex) <= 0) {
                    i = -1;
                    advance = false;
                }
                else if (U.compareAndSwapInt
                         (this, TRANSFERINDEX, nextIndex,
                          nextBound = (nextIndex > stride ?
                                       nextIndex - stride : 0))) {
                    bound = nextBound;
                    i = nextIndex - 1;
                    advance = false;
                }
            }
            if (i < 0 || i >= n || i + n >= nextn) {
                int sc;
                if (finishing) {
                    nextTable = null;
                    table = nextTab;
                    sizeCtl = (n << 1) - (n >>> 1);
                    return;
                }
                if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                    if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                        return;
                    finishing = advance = true;
                    i = n; // recheck before commit
                }
            }
            else if ((f = tabAt(tab, i)) == null)
                advance = casTabAt(tab, i, null, fwd);
            else if ((fh = f.hash) == MOVED)
                advance = true; // already processed
            //迁移过程(对当前指向的bucket),这部分的逻辑与HashMap类似,拿旧数组的容量当做一
            //个掩码,然后与节点的hash进行与操作,可以得出该节点的新增有效位,如果新增有效位为
            //0就放入一个链表A,如果为1就放入另一个链表B,链表A在新数组中的位置不变(跟在旧数
            //组的索引一致),链表B在新数组中的位置为原索引加上旧数组容量。
            else {
                synchronized (f) {
                    if (tabAt(tab, i) == f) {
                        Node<K,V> ln, hn;
                        if (fh >= 0) {
                            int runBit = fh & n;
                            Node<K,V> lastRun = f;
                            for (Node<K,V> p = f.next; p != null; p = p.next) {
                                int b = p.hash & n;
                                if (b != runBit) {
                                    runBit = b;
                                    lastRun = p;
                                }
                            }
                            if (runBit == 0) {
                                ln = lastRun;
                                hn = null;
                            }
                            else {
                                hn = lastRun;
                                ln = null;
                            }
                            for (Node<K,V> p = f; p != lastRun; p = p.next) {
                                int ph = p.hash; K pk = p.key; V pv = p.val;
                                if ((ph & n) == 0)
                                    ln = new Node<K,V>(ph, pk, pv, ln);
                                else
                                    hn = new Node<K,V>(ph, pk, pv, hn);
                            }
                            setTabAt(nextTab, i, ln);
                            setTabAt(nextTab, i + n, hn);
                            setTabAt(tab, i, fwd);
                            advance = true;
                        }
                        else if (f instanceof TreeBin) {
                            TreeBin<K,V> t = (TreeBin<K,V>)f;
                            TreeNode<K,V> lo = null, loTail = null;
                            TreeNode<K,V> hi = null, hiTail = null;
                            int lc = 0, hc = 0;
                            for (Node<K,V> e = t.first; e != null; e = e.next) {
                                int h = e.hash;
                                TreeNode<K,V> p = new TreeNode<K,V>
                                    (h, e.key, e.val, null, null);
                                if ((h & n) == 0) {
                                    if ((p.prev = loTail) == null)
                                        lo = p;
                                    else
                                        loTail.next = p;
                                    loTail = p;
                                    ++lc;
                                }
                                else {
                                    if ((p.prev = hiTail) == null)
                                        hi = p;
                                    else
                                        hiTail.next = p;
                                    hiTail = p;
                                    ++hc;
                                }
                            }
                            ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                                (hc != 0) ? new TreeBin<K,V>(lo) : t;
                            hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                                (lc != 0) ? new TreeBin<K,V>(hi) : t;
                            setTabAt(nextTab, i, ln);
                            setTabAt(nextTab, i + n, hn);
                            setTabAt(tab, i, fwd);
                            advance = true;
                        }
                    }
                }
            }
        }
    }
    

    addCount、sumCount方法

    addCount()做的工作是更新table的size,也就是table存储的键值对数量,在使用put()remove()方法的时候都会在执行成功之后调用addCount()来更新table的size。对于ConcurrentHashMap来说,它到底有储存有多少个键值对,谁也不知道,因为他是支持并发的,储存的数量无时无刻都在变化着,所以说ConcurrentHashMap也只是统计一个大概的值,为了统计出这个值也是大费周章才统计出来的。

    image
    private final void addCount(long x, int check) {
        CounterCell[] as; long b, s;
        //如果计算盒子不是空,或者修改baseCount的值+x失败,则放弃对baseCount的修改。
        //这里的大概意思就是首先尝试直接修改baseCount,达到计数的目的,如果修改baseCount失败(
        //多个线程同时修改,则失败)
        //则使用CounterCell数组来达到计数的目的。
        if ((as = counterCells) != null ||
            !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
            CounterCell a; long v; int m;
            boolean uncontended = true;
            //如果计数盒子是空的 或者随机取余一个数组为空 或者修改这个槽位的变量失败,
            //即表示出现了并发,则执行fullAddCount()方法进行死循环插入,同时返回,
            //否则代表修改这个槽位的变量成功了,继续往下执行,不进入if。
            //每个线程都会通过ThreadLocalRandom.getProbe() & m寻址找到属于它的CounterCell,
            //然后进行计数。ThreadLocalRandom是一个线程私有的伪随机数生成器,
            //每个线程的probe都是不同的。CounterCell数组的大小永远是一个2的n次方,初始容量
            //为2,每次扩容的新容量都是之前容量乘以二,处于性能考虑,它的最大容量上限是机器
            //的CPU数量,所以说CounterCell数组的碰撞冲突是很严重的。
            if (as == null || (m = as.length - 1) < 0 ||
                (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
                !(uncontended =
                  U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
                 //并发过大,使用CAS修改CounterCell失败时候执行fullAddCount,
                fullAddCount(x, uncontended);
                return;
            }
            //如果上面对盒子的赋值成功,且check<=1,则直接返回,否则调用sumConut()方法计算
            if (check <= 1)
                return;
            s = sumCount();
        }
        //如果check>=0,则检查是否需要扩容。
        if (check >= 0) {
            Node<K,V>[] tab, nt; int n, sc;
            while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
                   (n = tab.length) < MAXIMUM_CAPACITY) {
                int rs = resizeStamp(n);
                if (sc < 0) {
                    if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                        sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                        transferIndex <= 0)
                        break;
                    if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                        transfer(tab, nt);
                }
                else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                             (rs << RESIZE_STAMP_SHIFT) + 2))
                    transfer(tab, null);
                s = sumCount();
            }
        }
    }
    
    @sun.misc.Contended static final class CounterCell {
        volatile long value;
        CounterCell(long x) { value = x; }
    }
    
    final long sumCount() {
        CounterCell[] as = counterCells; CounterCell a;
        long sum = baseCount;
        if (as != null) {
            for (int i = 0; i < as.length; ++i) {
                if ((a = as[i]) != null)
                    sum += a.value;
            }
        }
        return sum;
    }
    

    size、mappingCount方法

    sizemappingCount方法都是用来统计table的size的,这两者不同的地方在size返回的是一个int类型,即可以表示size的范围是[-231,231-1],超过这个范围就返回int能表示的最大值,mappingCount返回的是一个long类型,即可以表示size的范围是[-263,263-1]。

    这两个方法都是调用的sumCount()方法实现统计。

    public int size() {
        long n = sumCount();
        return ((n < 0L) ? 0 :
                (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
                (int)n);
    }
    
    public long mappingCount() {
        long n = sumCount();
        return (n < 0L) ? 0L : n; // ignore transient negative values
    }
    

    HashMap、Hashtable、ConcurrentHashMap三者对比

    \ HashMap Hashtable ConcurrentHashMap
    是否线程安全
    线程安全采用的方式 采用synchronized类锁,效率低 采用CAS + synchronized,锁住的只有当前操作的bucket,不影响其他线程对其他bucket的操作,效率高
    数据结构 数组+链表+红黑树(链表长度超过8则转红黑树) 数组+链表 数组+链表+红黑树(链表长度超过8则转红黑树)
    是否允许null键值
    哈希地址算法 (key的hashCode)^(key的hashCode无符号右移16位) key的hashCode ( (key的hashCode)^(key的hashCode无符号右移16位) )&0x7fffffff
    定位算法 哈希地址&(容量大小-1) (哈希地址&0x7fffffff)%容量大小 哈希地址&(容量大小-1)
    扩容算法 当键值对数量大于阈值,则容量扩容到原来的2倍 当键值对数量大于等于阈值,则容量扩容到原来的2倍+1 当键值对数量大于等于sizeCtl,单线程创建新哈希表,多线程复制bucket到新哈希表,容量扩容到原来的2倍
    链表插入 将新节点插入到链表尾部 将新节点插入到链表头部 将新节点插入到链表尾部
    继承的类 继承abstractMap抽象类 继承Dictionary抽象类 继承abstractMap抽象类
    实现的接口 实现Map接口 实现Map接口 实现ConcurrentMap接口
    默认容量大小 16 11 16
    默认负载因子 0.75 0.75 0.75
    统计size方式 直接返回成员变量size 直接返回成员变量count 遍历CounterCell数组的值进行累加,最后加上baseCount的值即为size

    相关文章

      网友评论

          本文标题:java集合源码分析(五)ConcurrentHashMap

          本文链接:https://www.haomeiwen.com/subject/skswectx.html