美文网首页
【Java】ConcurrentHashMap/CopyOnWr

【Java】ConcurrentHashMap/CopyOnWr

作者: littlefogcat | 来源:发表于2021-02-07 16:05 被阅读0次

    ConcurrentHashMap和CopyOnWriteArrayList都是java.util.concurrent包下的类,分别解决了HashMap和ArrayList的线程安全问题。

    本文以对这两个类进行简析,并不作详尽分析。

    相关文章
    【Java】HashMap原理及相关面试题
    【Java】ArrayList、LinkedList原理及相关面试题

    本文源码版本为Java14。

    一、ConcurrentHashMap

    ConcurrentHashMap的数据结构大部分与HashMap相同,依旧是数组+链表/红黑树的结构,每一个单元称为一个。不同点在于,ConcurrentHashMap使用了TreeBin类来表示红黑树,而不是树的根节点。

    1. put

    这里大略的看一下put方法,见微知著;重要的地方我都标了注释。

        public V put(K key, V value) {
            return putVal(key, value, false);
        }
        final V putVal(K key, V value, boolean onlyIfAbsent) {
            if (key == null || value == null) throw new NullPointerException();
            int hash = spread(key.hashCode()); // 计算哈希值
            int binCount = 0;
            for (Node<K,V>[] tab = table;;) {
                Node<K,V> f; int n, i, fh; K fk; V fv;
                // 变量:f => 桶的头节点;n => 数组长度
                // i => 节点所在桶序号;fh => 桶头节点的哈希值
                // fk => 桶头节点的key;fv => 桶头节点的value
                if (tab == null || (n = tab.length) == 0)
                    tab = initTable(); // 懒加载,使用CAS保证线程安全
                else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                    // 当前桶为空,通过CAS创建根节点
                    if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value)))
                        break;
                }
                else if ((fh = f.hash) == MOVED)
                    // 当前resize过程正在进行,则本线程进行辅助,多线程同时执行
                    tab = helpTransfer(tab, f);
                else if (onlyIfAbsent // check first node without acquiring lock
                         && fh == hash
                         && ((fk = f.key) == key || (fk != null && key.equals(fk)))
                         && (fv = f.val) != null)
                    // 如果设置了`onlyIfAbsent`,且这个键已经存在,就什么也不做 
                    return fv;
                else {
                    V oldVal = null;
                    // 当前桶不为空,对头节点加锁执行
                    synchronized (f) {
                        // tabAt(tab, i):利用Unsafe.getReferenceAcquire实现一致性
                        if (tabAt(tab, i) == f) { // 再次确认,保证元素没有被别的线程改变
                            if (fh >= 0) { // 哈希值大于0,说明这是个链表
                                binCount = 1;
                                // 遍历链表,找到对应节点
                                for (Node<K,V> e = f;; ++binCount) {
                                    K ek;
                                    if (e.hash == hash &&
                                        ((ek = e.key) == key ||
                                         (ek != null && key.equals(ek)))) {
                                        oldVal = e.val;
                                        if (!onlyIfAbsent) // putIfAbsent()方法
                                            e.val = value; // 修改值
                                        break;
                                    }
                                    Node<K,V> pred = e;
                                    if ((e = e.next) == null) {
                                        // 没有找到对应节点,添加在末尾
                                        pred.next = new Node<K,V>(hash, key, value);
                                        break;
                                    }
                                }
                            }
                            else if (f instanceof TreeBin) {
                                // 这是一棵树
                                Node<K,V> p;
                                binCount = 2;
                                // 在树中寻找节点,如果没找到就添加,找到就修改值
                                if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                               value)) != null) {
                                    oldVal = p.val;
                                    if (!onlyIfAbsent)
                                        p.val = value;
                                }
                            }
                            // 保留节点,占位用,只在compute和computIfAbsent方法中使用
                            else if (f instanceof ReservationNode)
                                throw new IllegalStateException("Recursive update");
                        }
                    }
                    if (binCount != 0) {
                        // 长度大于阈值,转树
                        if (binCount >= TREEIFY_THRESHOLD)
                            treeifyBin(tab, i);
                        if (oldVal != null)
                            return oldVal;
                        break;
                    }
                }
            }
            addCount(1L, binCount); // 哈希表节点计数+1,使用CAS保证线程安全
            return null;
        }
    

    首先要明确的是,线程不安全的根本原因是线程共享变量的使用。
    下面来一一分析put方法的步骤。

    0. 计算哈希值
    首先计算put传入k-v对的哈希值。

    1. 接下来将开始无限循环,每次循环将进入以下分支之一

    1.1 如果table数组为空
    如果table数组为空,则使用CAS初始化table数组。
    与HashMap不同,ConcurrentHashMap中的table数组不会随着对象的初始化而初始化,而是在第一次使用的时候才进行懒加载。
    执行完这一步之后,继续下一轮循环。

    1.2 如果对应的桶为空
    如果对应的桶为空,创建对应的节点作为链表的头节点,放入这个桶中。
    这里的f = tabAt(tab, i)方法使用了Unsafe的getReferenceAcquire确保了读取数组时的一致性。
    节点入桶的操作不是通过tab[i] = node进行赋值,而是通过CAS来完成,以确保线程安全;如果CAS失败,则继续下一轮循环。

    1.3 如果当前链表正在扩容
    如果当前链表正在进行扩容(其他线程干的),那么本线程将会协助一起进行扩容。

    1.4 对应的桶不为空(else分支)
    这个步骤基本和HashMap一致,如果是链表那就遍历链表查找节点,找到了就修改、没找到就添加在尾部,长度超过阈值就转树;如果是树的话就按照红黑树的逻辑添加节点。如果修改或者添加成功了,就跳出循环。

    本步骤基本靠synchronized加锁完成线程安全,不过有两点需要注意。
    第一点,在进入同步块后,又进行了if (tabAt(tab, i) == f)判断,确保对应桶的头节点没有被其他线程更改;
    第二点,synchronized锁定的是f,也就是这个桶的头节点。也就是说,当本线程运行这个同步块代码时,其他线程如果要修改其他桶中的节点,是不受影响的,这样有助于提高效率。用网上的话说,就是降低了锁的粒度。

    最后,节点计数+1,并检查扩容。扩容的机制特别复杂,而且是多线程协同操作(每个线程在一个时刻负责一个桶)。不过扩容的基本逻辑和HashMap是一致的,参见【Java】HashMap原理及相关面试题 ,其中线程安全是使用CAS与synchronized保证的。

    2. get

    读数据和HashMap基本一致,除了获取头节点的时候使用了tabAt()来保证一致性。
    首先找到对应哈希值的头节点,然后遍历链表找到对应节点;或者按照红黑树的方式查找节点。

    而之所以get方法不需要加锁就能保证线程安全,是因为Node的valnext都是用volatile修饰的,这样就保证了本线程对其修改的可见性,在遍历链表的时候,不会因为其他线程修改变了节点而导致误读。

    3. remove

    remove基本就是put的逆过程,这里就不再赘述。

    4. 小结

    • ConcurrentHashMap保证线程安全的方式在Java7及之前版本使用的分段锁Segment,在Java8之后使用Unsafe类+synchronized。
    • ConcurrentHashMap避免对table数组直接引用和修改,因为数组中的元素无法使用volatile来保证一致性。所以,采用了读使用tabAt方法,写使用casTabAtsetTabAt方法,以此保证线程安全。
        static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
            return (Node<K,V>)U.getReferenceAcquire(tab, ((long)i << ASHIFT) + ABASE);
        }
        static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
                                            Node<K,V> c, Node<K,V> v) {
            return U.compareAndSetReference(tab, ((long)i << ASHIFT) + ABASE, c, v);
        }
        static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
            U.putReferenceRelease(tab, ((long)i << ASHIFT) + ABASE, v);
        }
        // ——————  Unsafe  ——————
        public final Object getReferenceAcquire(Object o, long offset) {
            return getReferenceVolatile(o, offset);
        }
        public final void putReferenceRelease(Object o, long offset, Object x) {
            putReferenceVolatile(o, offset, x);
        }
    
    • 事实上,使用tabAt+casTabAt可以在不加锁的情况下确保对数组修改的线程安全。而setTabAt总是在同步代码块中调用,仅仅起到防重排的作用,这一点在ConcurrentHashMap内部的注释中也提到了。
    • ConcurrentHashMap使用synchronized锁定桶的头节点来保证修改数据时的线程安全,这一点既保证了线程安全,又提高了效率,因为其他桶是不受影响的。
    • ConcurrentHashMap的扩容可以多线程并发执行,一个线程同一时间负责一个桶的转移,锁定桶的头节点保证线程安全。

    二、CopyOnWriteArrayList

    CopyOnWriteArrayList相对来说比较简单,直接看它的getadd方法:

        private transient volatile Object[] array;
        final Object[] getArray() { return array; }
        final void setArray(Object[] a) { array = a; }
    
        public E get(int index) { return elementAt(getArray(), index); }
        static <E> E elementAt(Object[] a, int index) { return (E) a[index]; }
    
        public boolean add(E e) {
            synchronized (lock) {
                Object[] es = getArray();
                int len = es.length;
                es = Arrays.copyOf(es, len + 1);
                es[len] = e;
                setArray(es);
                return true;
            }
        }
        public void add(int index, E element) {
            synchronized (lock) {
                Object[] es = getArray();
                int len = es.length;
                if (index > len || index < 0)
                    throw new IndexOutOfBoundsException(outOfBounds(index, len));
                Object[] newElements;
                int numMoved = len - index;
                if (numMoved == 0)
                    newElements = Arrays.copyOf(es, len + 1);
                else {
                    newElements = new Object[len + 1];
                    System.arraycopy(es, 0, newElements, 0, index);
                    System.arraycopy(es, index, newElements, index + 1,
                                     numMoved);
                }
                newElements[index] = element;
                setArray(newElements);
            }
        }
    

    CopyOnWriteArrayList在读数据的时候,直接返回数组中的元素,没有做任何处理;
    在写数据的时候,会复制当前数组,在对应位置添加元素,然后将内部数组array设置为复制后的新数组,整个写的过程是加锁的。

    可以看出,CopyOnWriteArrayList添加元素的效率是极低的,但是获取元素的效率很高。同时,由于读取数据的时候是直接返回的array数组中的数据,所以ConcurrentModificationException不能保证读数据时的一致性。
    所以,CopyOnWriteArrayList适用范围是:并发线程数高,读数据的次数高,且极少进行修改的情况。如果这个List中的内容经常修改的情况,CopyOnWriteArrayList就不适用了。

    再有一点就是,CopyOnWriteArrayList在迭代遍历的过程中是可以进行修改的,不会抛出ConcurrentModificationException异常。

    相关文章

      网友评论

          本文标题:【Java】ConcurrentHashMap/CopyOnWr

          本文链接:https://www.haomeiwen.com/subject/ooxgtltx.html