美文网首页
ConcurrentHashMap源码分析(一)

ConcurrentHashMap源码分析(一)

作者: 非典型_程序员 | 来源:发表于2019-02-17 14:18 被阅读0次

    ConcurrentHashMap在面试中也是会被经常问道的问题,所以今天就来学习一下,看看ConcurrentHashMap的线程安全是如何实现的。需要注意的是JDK1.8和1.7版本的实现是不同的,我使用的是1.8版本。其实以前自己也看过一部分源码,但是其实理解的不是很透彻,我觉得有必要再仔细的看一看。另外看源码的时候看代码注释是很重要的,可以帮你更直观的了解作者的想法和意图,甚至一些很小的需要注意事项。不过就是英文看起来比较吃力,但是这个困难必须克服才行。

    一、基本常量

    private static final int MAXIMUM_CAPACITY = 1 << 30; //最大容量,必须为2的次幂
    private static final intDEFAULT_CAPACITY = 16;//table的默认大小
    static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8; // 最大数组大小,不需要是2的次幂,在toArray和相关方法时需要
    private static finalint DEFAULT_CONCURRENCY_LEVEL = 16;//默认并发级别,好像没什么用
    private static final float LOAD_FACTOR = 0.75f;//负载因子
    static final int TREEIFY_THRESHOLD = 8; // 链表转树的阀值,即如果这个值大等于8会从链表转成树,必须大于2
    static final int UNTREEIFY_THRESHOLD = 6; //树转链表阀值,小于等于6,且小于TREEIFY_THRESHOLD。仅在扩容tranfer时才可能树转链表。
    static final int MIN_TREEIFY_CAPACITY = 64;//树化的最小容量,为避免冲突最小应为TREEIFY_THRESHOLD的4倍。
    private static final int MIN_TRANSFER_STRIDE = 16;
    private static int RESIZE_STAMP_BITS = 16;//sizeCtl中用于生成戳记的位数。在32个bit数组中最小为6
    private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1; // 帮助resize的最大线程数
    private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS; //sizeCtl中记录size大小标记的偏移量
    static final int MOVED     = -1; //forwarding节点的哈希值
    static final int TREEBIN   = -2; //树根节点的哈希值
    static final int RESERVED  = -3; // hash for transient reservations(ReservationNode的hash值)
    static final int HASH_BITS = 0x7fffffff; //正常节点哈希值可用位数
    static final int NCPU = Runtime.getRuntime().availableProcessors(); // 可用CPU数量
    

    有些常量看了注释也还是不太懂具体说明意思,等看源码时遇到再说吧,下面看看具体属性有哪些。

    2、相关属性

    transient volatile Node<K,V>[] table;//bins数组,大小为2次幂,第一次插入时才会初始化
    
    private transient volatile Node<K,V>[] nextTable;//下一个table,仅在resize过程non-null
    
    private transient volatile long baseCount;//基本计数器值,主要在没有争用时使用。通过CAS更新。
    
    private transient volatile int sizeCtl;//table大小控制值,为负数时表示正在初始化或resize。-1表示初始化,-(N+1)表示有N个线程在执行resize。当table为null时保留初始表的大小,或者默认为0。初始化之后,保存下一个元素计数值,在该值上调整表的大小。这个值很重要。
    
    private transient volatile int transferIndex;//resize时要分割的下一个表索引(加1)。
    
    private transient volatile int cellsBusy;//resize以及创建CounterCell时使用的自旋锁(通过CAS)。
    
    private transient volatile CounterCell[] counterCells;//counter cell表,non-null时,大小是2的次幂
    // view
    private transient KeySetView<K,V> keySet;
    private transient ValuesView<K,V> values;
    private transient EntrySetView<K,V> entrySet;
    

    三、构造方法

    // 空构造函数,创建新的、空的map,默认初始化table大小为16
    public ConcurrentHashMap() {  }
    
    // 构造指定大小容量的map,实际大小并不是指定的容量,因为容量必须为2的次幂,因此需要执行tableSizeFor()方法计算容量。然后将计算出的容量(2的次幂)赋值给sizeCtl。
    public ConcurrentHashMap(int initialCapacity) {
         if (initialCapacity < 0)
            throw new IllegalArgumentException();
         int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
                   tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
         this.sizeCtl = cap;
    }
    // 根据给定的map构造新的ConcurrentHashMap
    public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
         this.sizeCtl = DEFAULT_CAPACITY;
         putAll(m);
    }
    //根据制定的初始化容量、负载因子构建map,
    public ConcurrentHashMap(int initialCapacity, float loadFactor) {
          this(initialCapacity, loadFactor, 1);
    }
    //根据制定的初始化容量、负载因子和并发级别构建map,
    public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) {
            if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
                throw new IllegalArgumentException();
            if (initialCapacity < concurrencyLevel)   // Use at least as many bins
                initialCapacity = concurrencyLevel;   // as estimated threads
            long size = (long)(1.0 + (long)initialCapacity / loadFactor);
            int cap = (size >= (long)MAXIMUM_CAPACITY) ?
                MAXIMUM_CAPACITY : tableSizeFor((int)size);
            this.sizeCtl = cap;
    }
    

    可能会有人觉得好奇,感觉这些构造方法好像什么都没做,最多就是将容量赋值给sizeCtl。因为上面解释过,ConcurrentHashMap的初始化是在第一次插入时才会执行的,所以构造函数执行并不执行初始化操作,并且初始化时会使用保存在sizeCtl中的值。下面我们专门看一下初始化方法。

    四、table初始化

        private final Node<K,V>[] initTable() {
            Node<K,V>[] tab; int sc;
            while ((tab = table) == null || tab.length == 0) {
                if ((sc = sizeCtl) < 0)
                    Thread.yield(); // lost initialization race; just spin
                else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                    try {
                        if ((tab = table) == null || tab.length == 0) {
                            int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                            @SuppressWarnings("unchecked")
                            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                            table = tab = nt;
                            sc = n - (n >>> 2);
                        }
                    } finally {
                        sizeCtl = sc;
                    }
                    break;
                }
            }
            return tab;
        }
    

    根据上面代码可以发现,初始化时如果table为null或者长度为0时表示table还没有初始化,然后根据sizeCtl判断有没有其他线程对其进行操作,上面说过如果sizeCtl为负数,表明table正在初始化或resize,那么线程让步,将CPU资源让给其他的线程。关于Thread.yield()方法建议看下相关资料,这里不细述。代码注释也说了,失去初始化竞争条件,仅仅自旋。
    如果没有其他线程对table进行操作,那么调用unsafe的compareAndSwapInt方法,将内存中的sizeCtl更新为-1,表明正在对table进行初始化。然后再判断table是否为null或为空,是则根据原sizeCtl的值构建一个Node数组,并将其赋值给table,终止while循环,最后返回初始化table。
    到这里大概了解ConcurrentHashMap的初始化是怎么样一个过程了,那么它到底如何保证线程安全的呢?接着往下看,我们主要看下它的增删该这个几个操作。

    五、增加、修改操作

    1、PUT操作

    先上代码吧

        public V put(K key, V value) {
            return putVal(key, value, false);
        }
        /** Implementation for put and putIfAbsent */
        final V putVal(K key, V value, boolean onlyIfAbsent) {
            if (key == null || value == null) throw new NullPointerException();
            int hash = spread(key.hashCode());
            int binCount = 0;
            for (Node<K,V>[] tab = table;;) {
                Node<K,V> f; int n, i, fh;
                if (tab == null || (n = tab.length) == 0)
                    tab = initTable();
                else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                    if (casTabAt(tab, i, null,new Node<K,V>(hash, key, value, null)))
                        break;                   // no lock when adding to empty bin
                }
                else if ((fh = f.hash) == MOVED)
                    tab = helpTransfer(tab, f);
                else {
                    V oldVal = null;
                    synchronized (f) {//
                        if (tabAt(tab, i) == f) {
                            if (fh >= 0) {
                                binCount = 1;
                                for (Node<K,V> e = f;; ++binCount) {
                                    K ek;
                                    if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) {
                                        oldVal = e.val;
                                        if (!onlyIfAbsent)
                                            e.val = value;
                                        break;
                                    }
                                    Node<K,V> pred = e;
                                    if ((e = e.next) == null) {//没有后续节点,新建节点,并赋值给next
                                        pred.next = new Node<K,V>(hash, key,value, null);
                                        break;
                                    }
                                }
                            }
                            else if (f instanceof TreeBin) {
                                Node<K,V> p;
                                binCount = 2;
                                if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,value)) != null) {
                                    oldVal = p.val;
                                    if (!onlyIfAbsent)
                                        p.val = value;
                                }
                            }
                        }
                    }
                    if (binCount != 0) {// binCount == 0,表示的是链表的头结点
                        if (binCount >= TREEIFY_THRESHOLD)//链表转成树
                            treeifyBin(tab, i);
                        if (oldVal != null)
                            return oldVal;
                        break;
                    }
                }
            }
            addCount(1L, binCount);
            return null;
        }
    

    put方法实际上调用的putVal方法,这个方法if-else分支挺多的,一点一点看吧。
    进入方法之后先进行参数校验,有空参数则抛出异常信息。接下来是计算需要添加key的hash值,关于spread()方法可以看一下注释,知道计算的是hash值就可以了。
    如果table为null或为空,则执行table的初始化方法。如果根据table的下标和当前key的hash值进行与运算,计算出当前key在table中的位置,从而获取存放的Node,如果Node为null,表示当前位置为空,那么根据key value以及key的hash值,构建一个Node,并存放到相应位置,结束循环。
    这里说下tabAt和casTabAt这两个方法,这两个方法也都是unsafe类的方法,看下代码:

    static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
        return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
    }
    
    static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,Node<K,V> c, Node<K,V> v) {
        return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
    }
    
    static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
        U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
    }
    

    tabAt方法实际调用的是getObjectVolatile方法,这个方法获取的是内存中的最新值,volatile关键字保证每次读取都是最新的数值,所以这个方法返回的是table某个位置上最新的Node值。
    casTabAt方法调用的是compareAndSwapObject方法,其实就是CAS,即传入当前的table,以及偏移位置,期望值和更新值,这个用的地方也挺多的。
    如果根据当前要添加的key拿到对应的Node以后,该Node的hash值等于MOVED(值为-1),表明这个节点是一个ForwardingNode节点,然后调用helpTransfer方法,即帮助进行扩容。ForwardingNode节点内部保存了一 nextTable 引用,它指向一张 hash 表。在进行扩容操作时,我们需要对每个桶中的结点进行分离和转移,如果某个桶结点中所有节点都已经迁移完成了(即已经被转移到新表 nextTable 中了),那么会在原 table 表的该位置挂上一个 ForwardingNode 结点,说明此桶已经完成迁移。关于helpTransfer方法后面有时间再细述吧,现在先知道可以了。
    以上条件不满足的时候,即表明要插入key value的位置是一个普通节点时,使用Java内置的synchronized关键字锁定该节点。如果返回该位置头节点的hash值大于等于0,表明这是一个普通节点。接下来就是一些比较,如果获取到节点的hash值等于要添加key的hash值,并且当前节点的key等于要添加的key,说明添加的位置就是当前节点的位置,直接更新节点的value值就行,然后结束循环。否则以添加的key、value、hash为参数新建Node节点,并将其赋值给其前一个节点的next属性,结束循环。如果返回该位置头节点的hash值小于0,如果该节点是一个treeBin实例,那么调用TreeBin的putTreeVal方法,向该树添加相应的值。binCount为添加key的链表长度,如果binCount大于TREEIFY_THRESHOLD,则将链表转成树,如果该节点的旧值不为空,说明这次的put操作是更新操作,直接返回旧值即可。否则需要继续向下执行addCount方法,这个方法通过CAS更新baseCount的值,并判断是否需要扩容,具体方法还是需要看下具体实现,这里暂时略过。

    2、remove操作

    remove操作实际调用的是replaceNode方法(replace也是调用的replaceNode),所以ConcurrentHashMap的删除和修改操作,只需要看replaceNode就行了。replaceNode方法需要三个参数,分别是原有的key、value以及要更新的value值。下面看下代码:

        final V replaceNode(Object key, V value, Object cv) {
            int hash = spread(key.hashCode());
            for (Node<K,V>[] tab = table;;) {
                Node<K,V> f; int n, i, fh;
                if (tab == null || (n = tab.length) == 0 || (f = tabAt(tab, i = (n - 1) & hash)) == null)
                    break;
                else if ((fh = f.hash) == MOVED)
                    tab = helpTransfer(tab, f);
                else {
                    V oldVal = null;
                    boolean validated = false;
                    synchronized (f) {
                        if (tabAt(tab, i) == f) {
                            if (fh >= 0) {
                                validated = true;
                                for (Node<K,V> e = f, pred = null;;) {
                                    K ek;
                                    if (e.hash == hash &&  ((ek = e.key) == key || (ek != null && key.equals(ek)))) {
                                        V ev = e.val;
                                        if (cv == null || cv == ev || (ev != null && cv.equals(ev))) {
                                            oldVal = ev;
                                            if (value != null)//修改操作
                                                e.val = value;
                                            else if (pred != null)//表示不是链表头结点
                                                pred.next = e.next;
                                            else
                                                setTabAt(tab, i, e.next);//说明是头节点,用下一个节点替代头结点
                                        }
                                        break;
                                    }
                                    pred = e;
                                    if ((e = e.next) == null)
                                        break;
                                }
                            }
                            else if (f instanceof TreeBin) {
                                validated = true;
                                TreeBin<K,V> t = (TreeBin<K,V>)f;
                                TreeNode<K,V> r, p;
                                if ((r = t.root) != null && (p = r.findTreeNode(hash, key, null)) != null) {
                                    V pv = p.val;
                                    if (cv == null || cv == pv || (pv != null && cv.equals(pv))) {
                                        oldVal = pv;
                                        if (value != null)
                                            p.val = value;
                                        else if (t.removeTreeNode(p))
                                            setTabAt(tab, i, untreeify(t.first));
                                    }
                                }
                            }
                        }
                    }
                    if (validated) {
                        if (oldVal != null) {
                            if (value == null)
                                addCount(-1L, -1);
                            return oldVal;
                        }
                        break;
                    }
                }
            }
            return null;
        }
    

    同样第一步通过spread方法先计算key的hash值。如果当前map的table数组为空(null或者长度为0)或者根据当前key未找到相应的Node,返回null。反之如果找到了节点,但是该节点的hash属性等于MOVED(-1),表明这个节点是一个ForwardingNode,因此执行helpTransfer方法,基本和putVal一样。接下来同样是给节点加上锁(我看有的说是桶,我觉得一个意思吧),然后定位到链表上具体的节点,如果是更新,将更新值赋给原值即可,如果是删除则判断该结点是不是链表的头结点,不是头结点,则将当前节点的next赋值给前节点的next;是头结点则将调用setTabAt方法,用头结点的下个节点替换当前头结点。
    如果根据key查到的链表为树,则通过findTreeNode根据key以及其hash定位到对应的节点,进行更新或者删除。
    最后如果更新值为null,即说明书删除操作,执行addCount,修改baseCount的值,且不需要扩容。
    到这里关于ConcurrentHashMap的添加、修改(删除算是一种特殊情况)就看完了,但是实际上这两个方法里面又涉及到了其他一些方法,比如helpTransfer,下面看看代码:

        final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
            Node<K,V>[] nextTab; int sc;
            if (tab != null && (f instanceof ForwardingNode) &&
                (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
                int rs = resizeStamp(tab.length);//返回扩容大小为tab.length的table的标记位
                while (nextTab == nextTable && table == tab &&
                       (sc = sizeCtl) < 0) {
                    if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                        sc == rs + MAX_RESIZERS || transferIndex <= 0)
                        break;
                    if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
                        transfer(tab, nextTab);
                        break;
                    }
                }
                return nextTab;
            }
            return table;
        }
    

    这个方法先执行必须满足,当前ConcurrentHashMap不为空,且节点是一个ForwardingNode,并且当前节点的nextTable属性值也不为空,即表明这个节点所在的桶中的所有节点已经迁移到nextTable指向的节点数组。resizeStamp方法返回数组扩容的标记位。当table处于扩容状态时,校验数组扩容标记位的值,如果没通过校验或者table扩容已经完成了(关于这几个比较条件自己有点懵,还需要好好想想),直接退出循环。否则调用unsafe将扩容标记的值加1,然后调用transfer进行扩容操作,最后返回迁移后的Node数组。
    这里又涉及到transfer方法,这个方法有点点复杂,只能下次再看了,而且自己看源码看得还是有点头晕(感觉画个结构图,然后看的话感觉效果会更好点)。
    关于ConcurrentHashMap源码先到这里吧,感觉自己写的挺乱的。首先从它的一些属性和一些常量值开始,了解它们各自的含义,接着是构造方法和初始化方法,后面主要看了它的增加和修改节点元素的方法,了解它实现程程安全也是通过synchronized实现的,不过同步代码块需要的是节点对象锁,这样锁的粒度更细,从而有更好的并发性能。自己网上也看了一些别人写的文章,感觉不管思路也好,表达能力也好比自己好多了,推荐:
    为并发而生的 ConcurrentHashMap(Java 8)
    ConcurrentHashMap源码分析(JDK8版本)
    我觉得学习源码,首先一定要对相关数据结构有很清晰明了的认识,比方说链表、树等,再一个我觉得最好可以自己动手画一个示意图出来,看源码时候结合示意图,这样能帮助我们更直观的学习。

    相关文章

      网友评论

          本文标题:ConcurrentHashMap源码分析(一)

          本文链接:https://www.haomeiwen.com/subject/wbvbeqtx.html