美文网首页
【Java】ConcurrentHashMap/CopyOnWr

【Java】ConcurrentHashMap/CopyOnWr

作者: littlefogcat | 来源:发表于2021-02-07 16:05 被阅读0次

ConcurrentHashMap和CopyOnWriteArrayList都是java.util.concurrent包下的类,分别解决了HashMap和ArrayList的线程安全问题。

本文以对这两个类进行简析,并不作详尽分析。

相关文章
【Java】HashMap原理及相关面试题
【Java】ArrayList、LinkedList原理及相关面试题

本文源码版本为Java14。

一、ConcurrentHashMap

ConcurrentHashMap的数据结构大部分与HashMap相同,依旧是数组+链表/红黑树的结构,每一个单元称为一个。不同点在于,ConcurrentHashMap使用了TreeBin类来表示红黑树,而不是树的根节点。

1. put

这里大略的看一下put方法,见微知著;重要的地方我都标了注释。

    public V put(K key, V value) {
        return putVal(key, value, false);
    }
    final V putVal(K key, V value, boolean onlyIfAbsent) {
        if (key == null || value == null) throw new NullPointerException();
        int hash = spread(key.hashCode()); // 计算哈希值
        int binCount = 0;
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh; K fk; V fv;
            // 变量:f => 桶的头节点;n => 数组长度
            // i => 节点所在桶序号;fh => 桶头节点的哈希值
            // fk => 桶头节点的key;fv => 桶头节点的value
            if (tab == null || (n = tab.length) == 0)
                tab = initTable(); // 懒加载,使用CAS保证线程安全
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                // 当前桶为空,通过CAS创建根节点
                if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value)))
                    break;
            }
            else if ((fh = f.hash) == MOVED)
                // 当前resize过程正在进行,则本线程进行辅助,多线程同时执行
                tab = helpTransfer(tab, f);
            else if (onlyIfAbsent // check first node without acquiring lock
                     && fh == hash
                     && ((fk = f.key) == key || (fk != null && key.equals(fk)))
                     && (fv = f.val) != null)
                // 如果设置了`onlyIfAbsent`,且这个键已经存在,就什么也不做 
                return fv;
            else {
                V oldVal = null;
                // 当前桶不为空,对头节点加锁执行
                synchronized (f) {
                    // tabAt(tab, i):利用Unsafe.getReferenceAcquire实现一致性
                    if (tabAt(tab, i) == f) { // 再次确认,保证元素没有被别的线程改变
                        if (fh >= 0) { // 哈希值大于0,说明这是个链表
                            binCount = 1;
                            // 遍历链表,找到对应节点
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent) // putIfAbsent()方法
                                        e.val = value; // 修改值
                                    break;
                                }
                                Node<K,V> pred = e;
                                if ((e = e.next) == null) {
                                    // 没有找到对应节点,添加在末尾
                                    pred.next = new Node<K,V>(hash, key, value);
                                    break;
                                }
                            }
                        }
                        else if (f instanceof TreeBin) {
                            // 这是一棵树
                            Node<K,V> p;
                            binCount = 2;
                            // 在树中寻找节点,如果没找到就添加,找到就修改值
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                           value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                        // 保留节点,占位用,只在compute和computIfAbsent方法中使用
                        else if (f instanceof ReservationNode)
                            throw new IllegalStateException("Recursive update");
                    }
                }
                if (binCount != 0) {
                    // 长度大于阈值,转树
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        addCount(1L, binCount); // 哈希表节点计数+1,使用CAS保证线程安全
        return null;
    }

首先要明确的是,线程不安全的根本原因是线程共享变量的使用。
下面来一一分析put方法的步骤。

0. 计算哈希值
首先计算put传入k-v对的哈希值。

1. 接下来将开始无限循环,每次循环将进入以下分支之一

1.1 如果table数组为空
如果table数组为空,则使用CAS初始化table数组。
与HashMap不同,ConcurrentHashMap中的table数组不会随着对象的初始化而初始化,而是在第一次使用的时候才进行懒加载。
执行完这一步之后,继续下一轮循环。

1.2 如果对应的桶为空
如果对应的桶为空,创建对应的节点作为链表的头节点,放入这个桶中。
这里的f = tabAt(tab, i)方法使用了Unsafe的getReferenceAcquire确保了读取数组时的一致性。
节点入桶的操作不是通过tab[i] = node进行赋值,而是通过CAS来完成,以确保线程安全;如果CAS失败,则继续下一轮循环。

1.3 如果当前链表正在扩容
如果当前链表正在进行扩容(其他线程干的),那么本线程将会协助一起进行扩容。

1.4 对应的桶不为空(else分支)
这个步骤基本和HashMap一致,如果是链表那就遍历链表查找节点,找到了就修改、没找到就添加在尾部,长度超过阈值就转树;如果是树的话就按照红黑树的逻辑添加节点。如果修改或者添加成功了,就跳出循环。

本步骤基本靠synchronized加锁完成线程安全,不过有两点需要注意。
第一点,在进入同步块后,又进行了if (tabAt(tab, i) == f)判断,确保对应桶的头节点没有被其他线程更改;
第二点,synchronized锁定的是f,也就是这个桶的头节点。也就是说,当本线程运行这个同步块代码时,其他线程如果要修改其他桶中的节点,是不受影响的,这样有助于提高效率。用网上的话说,就是降低了锁的粒度。

最后,节点计数+1,并检查扩容。扩容的机制特别复杂,而且是多线程协同操作(每个线程在一个时刻负责一个桶)。不过扩容的基本逻辑和HashMap是一致的,参见【Java】HashMap原理及相关面试题 ,其中线程安全是使用CAS与synchronized保证的。

2. get

读数据和HashMap基本一致,除了获取头节点的时候使用了tabAt()来保证一致性。
首先找到对应哈希值的头节点,然后遍历链表找到对应节点;或者按照红黑树的方式查找节点。

而之所以get方法不需要加锁就能保证线程安全,是因为Node的valnext都是用volatile修饰的,这样就保证了本线程对其修改的可见性,在遍历链表的时候,不会因为其他线程修改变了节点而导致误读。

3. remove

remove基本就是put的逆过程,这里就不再赘述。

4. 小结

  • ConcurrentHashMap保证线程安全的方式在Java7及之前版本使用的分段锁Segment,在Java8之后使用Unsafe类+synchronized。
  • ConcurrentHashMap避免对table数组直接引用和修改,因为数组中的元素无法使用volatile来保证一致性。所以,采用了读使用tabAt方法,写使用casTabAtsetTabAt方法,以此保证线程安全。
    static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
        return (Node<K,V>)U.getReferenceAcquire(tab, ((long)i << ASHIFT) + ABASE);
    }
    static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
                                        Node<K,V> c, Node<K,V> v) {
        return U.compareAndSetReference(tab, ((long)i << ASHIFT) + ABASE, c, v);
    }
    static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
        U.putReferenceRelease(tab, ((long)i << ASHIFT) + ABASE, v);
    }
    // ——————  Unsafe  ——————
    public final Object getReferenceAcquire(Object o, long offset) {
        return getReferenceVolatile(o, offset);
    }
    public final void putReferenceRelease(Object o, long offset, Object x) {
        putReferenceVolatile(o, offset, x);
    }
  • 事实上,使用tabAt+casTabAt可以在不加锁的情况下确保对数组修改的线程安全。而setTabAt总是在同步代码块中调用,仅仅起到防重排的作用,这一点在ConcurrentHashMap内部的注释中也提到了。
  • ConcurrentHashMap使用synchronized锁定桶的头节点来保证修改数据时的线程安全,这一点既保证了线程安全,又提高了效率,因为其他桶是不受影响的。
  • ConcurrentHashMap的扩容可以多线程并发执行,一个线程同一时间负责一个桶的转移,锁定桶的头节点保证线程安全。

二、CopyOnWriteArrayList

CopyOnWriteArrayList相对来说比较简单,直接看它的getadd方法:

    private transient volatile Object[] array;
    final Object[] getArray() { return array; }
    final void setArray(Object[] a) { array = a; }

    public E get(int index) { return elementAt(getArray(), index); }
    static <E> E elementAt(Object[] a, int index) { return (E) a[index]; }

    public boolean add(E e) {
        synchronized (lock) {
            Object[] es = getArray();
            int len = es.length;
            es = Arrays.copyOf(es, len + 1);
            es[len] = e;
            setArray(es);
            return true;
        }
    }
    public void add(int index, E element) {
        synchronized (lock) {
            Object[] es = getArray();
            int len = es.length;
            if (index > len || index < 0)
                throw new IndexOutOfBoundsException(outOfBounds(index, len));
            Object[] newElements;
            int numMoved = len - index;
            if (numMoved == 0)
                newElements = Arrays.copyOf(es, len + 1);
            else {
                newElements = new Object[len + 1];
                System.arraycopy(es, 0, newElements, 0, index);
                System.arraycopy(es, index, newElements, index + 1,
                                 numMoved);
            }
            newElements[index] = element;
            setArray(newElements);
        }
    }

CopyOnWriteArrayList在读数据的时候,直接返回数组中的元素,没有做任何处理;
在写数据的时候,会复制当前数组,在对应位置添加元素,然后将内部数组array设置为复制后的新数组,整个写的过程是加锁的。

可以看出,CopyOnWriteArrayList添加元素的效率是极低的,但是获取元素的效率很高。同时,由于读取数据的时候是直接返回的array数组中的数据,所以ConcurrentModificationException不能保证读数据时的一致性。
所以,CopyOnWriteArrayList适用范围是:并发线程数高,读数据的次数高,且极少进行修改的情况。如果这个List中的内容经常修改的情况,CopyOnWriteArrayList就不适用了。

再有一点就是,CopyOnWriteArrayList在迭代遍历的过程中是可以进行修改的,不会抛出ConcurrentModificationException异常。

相关文章

网友评论

      本文标题:【Java】ConcurrentHashMap/CopyOnWr

      本文链接:https://www.haomeiwen.com/subject/ooxgtltx.html