美文网首页Java Concurrency面试精选Java技术升华
Java并发编程之并发容器 CopyOnWrite,Concur

Java并发编程之并发容器 CopyOnWrite,Concur

作者: 干天慈雨 | 来源:发表于2021-07-16 11:35 被阅读0次

    前言

    JUC 高并发容器是基于非阻塞算法(或者无锁编程算法)实现的容器类,无锁编程(Lock Free)算法主要通过 CAS(Compare And Swap)+volatile 组合实现,通过 CAS 保障操作的原子性,通过volatile 保障变量的内存的可见性。无锁编程(Lock Free)算法的主要优点:
    (1)开销较小:不需要在内核态和用户态之间切换进程。
    (2)读写不互斥:只有写操作需要使用基于 CAS 机制的乐观锁,读读操作之间可以不用互斥。

    1.高并发容器分类

    JUC 包中提供了 List、Set、Queue、Map 各种类型的高并发容器,如 ConcurrentHashMap、ConcurrentSkipListMap、ConcurrentSkipListSet、CopyOnWriteArrayList 和 CopyOnWriteArraySet。
    在性能上,ConcurrentHashMap 通常优于同步的 HashMap,ConcurrentSkipListMap 通常优于同步
    的 TreeMap。当读取和遍历操作远远大于列表的更新操作时,CopyOnWriteArrayList 优于同步的ArrayList。

    1.1 List

    JUC 包中高并发 List 主要有 CopyOnWriteArrayList,对应的基础容器为 ArrayList。
    CopyOnWriteArrayList 相当于线程安全的 ArrayList,它实现了 List 接口。在读多写少的场景中,其性能远远高于 ArrayList 的同步包装容器。

    1.2 Set

    JUC 包中 Set 主要有 CopyOnWriteArraySet、ConcurrentSkipListSet。

    • CopyOnWriteArraySet 继承于 AbstractSet 类,对应的基础容器为 HashSet。其内部组合了一个 CopyOnWriteArrayList 对象,它是核心操作是基于 CopyOnWriteArrayList 实现的。
    • ConcurrentSkipListSet 是线程安全的有序的集合,对应的基础容器为 TreeSet。它继承于
      AbstractSet,并实现了NavigableSet接口。ConcurrentSkipListSet是通过ConcurrentSkipListMap
      实现的。

    1.3 Map

    JUC 包中 Map 主要有 ConcurrentHashMap、ConcurrentSkipListMap。

    • ConcurrentHashMap 对应的基础容器为 HashMap。JDK7 中 ConcurrentHashMap 采用一种更加细粒度的“分段锁(Segment)”加锁机制,JDK8 中采用 CAS 无锁算法。
    • ConcurrentSkipListMap 对应的基础容器为 TreeMap。其内部的 Skip List(跳表)结构是一种可以代替平衡树的数据结构,默认是按照 Key 值升序的。

    1.4 Queue

    JUC 包中 Queue 的实现类包括三类:单向队列、双向队列、阻塞队列。

    • ConcurrentLinkedQueue 是一个基于列表实现的单向队列,按照 FIFO(先进先出)原则对元素进行排序。新元素从队列尾部插入,而获取队列元素,则需要从队列头部获取。
    • ConcurrentLinkedDeque 是基于链表的双向队列,但是该队列不允许 null 元素。作为双端队列,ConcurrentLinkedDeque 可以当作“栈”来使用,并且高效地支持并发环境。
      除了提供普通的单向、双向队列,JUC 拓展了 Queue,增加了可阻塞的插入和获取等操作,提供了一组阻塞队列,具体如下:
    • ArrayBlockingQueue:基于数组实现的可阻塞的 FIFO 队列
    • LinkedBlockingQueue:基于链表实现的可阻塞的 FIFO 队列
    • PriorityBlockingQueue:按优先级排序的队列
    • DelayQueue:按照元素的 delay 时间进行排序的队列
    • SynchronousQueue:无缓冲等待队列

    2. CopyOnWriteArrayList分析

    在很多应用场景中,读操作可能会远远大于写操作。由于读操作根本不会修改原有的数据,因此如果每次读取都进行加锁操作,其实是一种资源浪费。我们应该允许多个线程同时访问 List 的内部数据,毕竟读操作是线程安全的。
    写时复制(CopyOnWrite,简称 COW)思想是计算机程序设计领域中的一种优化策略。其核心思想是,如果有多个 Accessor(访问器)访问一个资源(如内存或者是磁盘上的数据存储)时,他们会共同获取相同的指针指向相同的资源,只要有一个(修改器)需要修改该资源,系统会复制一份专用 Private Copy(副本)给该 Mutator,而其他 Accessor 所见到的最初的资源仍然保持不变,修改的过程对其他的 Accessor 都是透明的(transparently)。COW 主要的优点是如果没有修改器(mutator)去修改资源,就不会有副本被创建,因此多个 Accessor 可以共享同一份资源。

    2.1 CopyOnWriteArrayList 的使用

    在不使用CopyOnWriteArrayList 的情况下代码如下:

    public class WithoutCopyOnWriteArrayListTest {
    
        public static class ConcurrentTarget implements Runnable {
            //并发操作的目标队列
            List<String> targetList = null;
            public ConcurrentTarget(List<String> targetList) {
                this.targetList = targetList;
            }
    
            @Override
            public void run() {
                Iterator<String> iterator = targetList.iterator();
                //迭代操作
                while (iterator.hasNext()) {
                    // 在迭代操作时,进行列表的修改
                    String threadName = Thread.currentThread().getName();
                    System.out.println("开始往同步队列加入线程名称:" + threadName);
                    targetList.add(threadName);
                }
            }
    
            //测试同步队列:在迭代操作时,进行列表的修改
            public static void main(String[] args) {
                List<String> notSafeList = Arrays.asList("a", "b", "c");
                List<String> synList = Collections.synchronizedList(notSafeList);
                //创建一个执行目标
                ConcurrentTarget synchronizedListListDemo =
                        new ConcurrentTarget(synList);
                //10 个线程并发
                for (int i = 0; i < 10; i++) {
                    new Thread(synchronizedListListDemo , "线程" + i).start();
                }
                //主线程等待
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    
    

    运行代码会报如下错误:

    java.lang.UnsupportedOperationException
        at java.util.AbstractList.add(AbstractList.java:148)
        at java.util.AbstractList.add(AbstractList.java:108)
        at java.util.Collections$SynchronizedCollection.add(Collections.java:2035)
        at com.ymj.study.code10_juc_container.CopyOnWriteArrayListTest$ConcurrentTarget.run(CopyOnWriteArrayListTest.java:33)
        at java.lang.Thread.run(Thread.java:748)
    

    这个时候可使用 CopyOnWriteArrayList 替代 Collections.synchronizedList同步包装实例,具体的代码如下:

    public class CopyOnWriteArrayListTest {
        public static class ConcurrentTarget implements Runnable {
            //并发操作的目标队列
            List<String> targetList = null;
    
            public ConcurrentTarget(List<String> targetList) {
                this.targetList = targetList;
            }
    
            @Override
            public void run() {
                Iterator<String> iterator = targetList.iterator();
                //迭代操作
                while (iterator.hasNext()) {
                    // 在迭代操作时,进行列表的修改
                    String threadName = Thread.currentThread().getName();
                    System.out.println("开始往同步队列加入线程名称:" + threadName);
                    targetList.add(threadName);
                }
            }
        }
    
        public static void main(String[] args) {
            List<String> notSafeList = Arrays.asList("a", "b", "c");
            //创建一个 CopyOnWriteArrayList 队列
            List<String> copyOnWriteArrayList = new CopyOnWriteArrayList();
            copyOnWriteArrayList.addAll(notSafeList);
    
            //并发执行目标
            ConcurrentTarget copyOnWriteArrayListDemo =
                    new ConcurrentTarget(copyOnWriteArrayList);
            for (int i = 0; i < 10; i++) {
                new Thread(copyOnWriteArrayListDemo, "线程" + i).start();
            }
            //主线程等待
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    
    

    运行之后发现UnsupportedOperationException 异常没有了。也就是说,使用CopyOnWriteArrayList 容器,可以在进行元素迭代的同时,又要进行元素添加操作。

    2.2 CopyOnWriteArrayList 原理

    所谓 CopyOnWrite(写时复制):就是在修改器(mutator)对一块内存进行修改时,不直接在原有内存块上进行写操作,而是将内存拷贝一份,在新的内存中进行写操作,写完之后,再将原来的指针(或者引用)指向新的内存,原来的内存被回收。
    CopyOnWriteArrayList 是写时复制思想的一种典型实现: 其含有一个指向操作内存的内部指针 array,而可变操作(add、set 等)是在 array 数组的副本上进行的。当元素需要被修改或者增加的时候,并不直接在 array 指向的原有数组上操作,而是首先对 array 进行一次拷贝,将修改的内容写入拷贝副本中。写完之后,再将内部指针 array 指向新的副本,这样就可以确保修改操作不会影响访问器(accessor)的读取操作了。CopyOnWriteArrayList 的原理,如图所示:


    CopyOnWriteArrayList 的原理

    CopyOnWriteArrayList 核心成员如下:

    public class CopyOnWriteArrayList<E>
            implements List<E>, RandomAccess, Cloneable, java.io.Serializable {
        private static final long serialVersionUID = 8673264195747942595L;
    
        /** The lock protecting all mutators */
        /**
         * 对所有的修改器(mutator)方法进行保护,访问器(accessor)方法并不需要保护
         */
        final transient ReentrantLock lock = new ReentrantLock();
    
        /** The array, accessed only via getArray/setArray. */
        /**
         * 内部对象数组,通过 getArray/setArray 方法去访问
         */
        private transient volatile Object[] array;
    
        /**
         * Gets the array.  Non-private so as to also be accessible
         * from CopyOnWriteArraySet class.
         */
        /**
         * 获取内部对象数组
         */
        final Object[] getArray() {
            return array;
        }
    
        /**
         * Sets the array.
         */
        /**
         * 设置内部对象数组
         */
        final void setArray(Object[] a) {
            array = a;
        }
    }
    

    2.3 CopyOnWriteArrayList 读取操作

    访问器(accessor)的读取操作没有任何同步控制和锁操作,理由就是内部数组 array 不会发生修改,只会被另外一个 array 替换,因此可以保证数据安全。

       /** 操作内存的引用*/
        private transient volatile Object[] array;
        public E get(int index) {
            return get(getArray(), index);
        }
        //获取元素
        @SuppressWarnings("unchecked")
        private E get(Object[] a, int index) {
            return (E) a[index];
        }
        //返回操作内存
        final Object[] getArray() {
            return array;
        }
    

    2.4 CopyOnWriteArrayList 写入操作

    CopyOnWriteArrayList 的写入操作 add( )方法在执行的时候加了独占锁以确保只能有一个线程进行写入操作,避免多线程写的时候会 copy 出多个副本。

       /**
         * Appends the specified element to the end of this list.
         *
         * @param e element to be appended to this list
         * @return {@code true} (as specified by {@link Collection#add})
         */
        public boolean add(E e) {
            final ReentrantLock lock = this.lock;
            // 加锁
            lock.lock();
            try {
                Object[] elements = getArray();
                int len = elements.length;
                // 拷贝新数组
                Object[] newElements = Arrays.copyOf(elements, len + 1);
                newElements[len] = e;
                setArray(newElements);
                return true;
            } finally {
                // 释放锁
                lock.unlock();
            }
        }
    
    

    从 add 操作可以看出,在每次进行添加操作的时候,CopyOnWriteArrayList 底层都是重新 copy了一份数组,再往新的数组中添加数组,待添加完了,再将新的 array 引用指向新的数组。当 add操作完成后,array 的引用就已经指向另一个存储空间了。
    那么既然每次添加元素的时候,都会重新复制一份新的数组,那就带来了一个问题,就是增加了内存的开销,如果容器的写操作比较频繁,那么其开销就比较大。所以,在实际应用的时候,CopyOnWriteArrayList 并不适合做添加操作。但是如果在并发场景下,迭代操作比较频繁,那CopyOnWriteArrayList 是个不错的选择。

    2.5 CopyOnWriteArrayList 的迭代器实现

    CopyOnWriteArray 有自己的迭代器,该迭代器不会检查修改状态,也无需检查状态。为什么呢?因为被迭代的 array 数组是可以说是只读的,不会有其他线程能够修改它。

       static final class COWIterator<E> implements ListIterator<E> {
            /** Snapshot of the array */
            /**对象数组的快照(snapshot)*/
            private final Object[] snapshot;
            /** Index of element to be returned by subsequent call to next.  */
            private int cursor;
    
            private COWIterator(Object[] elements, int initialCursor) {
                cursor = initialCursor;
                snapshot = elements;
            }
    
            public boolean hasNext() {
                return cursor < snapshot.length;
            }
    
            public boolean hasPrevious() {
                return cursor > 0;
            }
    
            @SuppressWarnings("unchecked")
            //下一个元素
            public E next() {
                if (! hasNext())
                    throw new NoSuchElementException();
                return (E) snapshot[cursor++];
            }
    
            @SuppressWarnings("unchecked")
            public E previous() {
                if (! hasPrevious())
                    throw new NoSuchElementException();
                return (E) snapshot[--cursor];
            }
    
            public int nextIndex() {
                return cursor;
            }
    
            public int previousIndex() {
                return cursor-1;
            }
    
            /**
             * Not supported. Always throws UnsupportedOperationException.
             * @throws UnsupportedOperationException always; {@code remove}
             *         is not supported by this iterator.
             */
            public void remove() {
                throw new UnsupportedOperationException();
            }
    
            /**
             * Not supported. Always throws UnsupportedOperationException.
             * @throws UnsupportedOperationException always; {@code set}
             *         is not supported by this iterator.
             */
            public void set(E e) {
                throw new UnsupportedOperationException();
            }
    
            /**
             * Not supported. Always throws UnsupportedOperationException.
             * @throws UnsupportedOperationException always; {@code add}
             *         is not supported by this iterator.
             */
            public void add(E e) {
                throw new UnsupportedOperationException();
            }
    
            @Override
            public void forEachRemaining(Consumer<? super E> action) {
                Objects.requireNonNull(action);
                Object[] elements = snapshot;
                final int size = elements.length;
                for (int i = cursor; i < size; i++) {
                    @SuppressWarnings("unchecked") E e = (E) elements[i];
                    action.accept(e);
                }
                cursor = size;
            }
        }
    

    迭代器的 snapshot(快照)成员,会在构造迭代器的时候,使用 CopyOnWriteArrayList 的 array成员去初始化,具体如下:

    //获取迭代器
     public Iterator<E> iterator() {
     return new COWIterator<E>(getArray(), 0);
     }
    
     //返回操作内存
     final Object[] getArray() {
          return array;
     }
    

    2.6 CopyOnWriteArrayList总结

    CopyOnWriteArrayList 的优点
    CopyOnWriteArrayList 有一个显著的优点,那就是读取、遍历操作不需要同步,速度会非常快。所以,CopyOnWriteArrayList 适用于读操作多、写操作相对较少的场景("读多写少"),比如可以在进行“黑名单”拦截时使用 CopyOnWriteArrayList。
    CopyOnWriteArrayList 和 ReentrantReadWriteLock 的比较
    CopyOnWriteArrayList 和 ReentrantReadWriteLock 读写锁的思想非常类似,读写锁的思想是读读共享、写写互斥、读写互斥、写读互斥。但是 CopyOnWriteArrayList 相比读写锁的又更进一步:为了将读取的性能发挥到极致,CopyOnWriteArrayList 读取是完全不用加锁的,而且写入也不会阻塞读取操作,只有写入和写入之间需要进行同步等待,读操作的性能得到大幅度提升。

    3 BlockingQueue分析

    在 Java8 中,提供了 7 个阻塞队列

    阻塞队列 介绍
    ArrayBlockingQueue 数组实现的有界阻塞队列, 此队列按照先进先出(FIFO)的原则对元素进行排序。
    LinkedBlockingQueue 链表实现的有界阻塞队列, 此队列的默认和最大长度为Integer.MAX_VALUE。此队列按照先进先出的原则对元素进行排序
    PriorityBlockingQueue 支持优先级排序的无界阻塞队列, 默认情况下元素采取自然顺序升序排列。也可以自定义类实现 compareTo()方法来指定元素排序规则,或者初始化 PriorityBlockingQueue 时,指定构造参数 Comparator 来对元素进行排序
    DelayQueue 优先级队列实现的无界阻塞队列
    SynchronousQueue 不存储元素的阻塞队列, 每一个 put 操作必须等待一个 take 操作,否则不能继续添加元素。
    LinkedTransferQueue 链表实现的无界阻塞队列
    LinkedBlockingDeque 链表实现的双向阻塞队列

    3.1 阻塞队列的操作方法

    在阻塞队列中,提供了四种处理方式:
    1. 插入操作

    • add(e) :添加元素到队列中,如果队列满了,继续插入元素会报错,IllegalStateException。
    • offer(e): 添加元素到队列,同时会返回元素是否插入成功的状态,如果成功则返回 true
    • put(e) :当阻塞队列满了以后,生产者继续通过 put添加元素,队列会一直阻塞生产者线程,知道队列可用
    • offer(e,time,unit) :当阻塞队列满了以后继续添加元素,生产者线程会被阻塞指定时间,如果超时,则线程直接退出
      2. 移除操作
    • remove():当队列为空时,调用 remove 会返回 false,如果元素移除成功,则返回 true
    • poll(): 当队列中存在元素,则从队列中取出一个元素,如果队列为空,则直接返回 null
    • take():基于阻塞的方式获取队列中的元素,如果队列为空,则 take 方法会一直阻塞,直到队列中有新的数据可以消费
    • poll(time,unit):带超时机制的获取数据,如果队列为空,则会等待指定的时间再去获取元素返回

    3.2 ArrayBlockingQueue 原理分析

    3.2.1 构造方法

    ArrayBlockingQueue 提供了三个构造方法,分别如下:
    capacity: 表示数组的长度,也就是队列的长度。
    fair:表示是否为公平的阻塞队列,默认情况下构造的是非公平的阻塞队列。

        public ArrayBlockingQueue(int capacity) {
            this(capacity, false);
        }
    
      public ArrayBlockingQueue(int capacity, boolean fair) {
            if (capacity <= 0)
                throw new IllegalArgumentException();
            this.items = new Object[capacity];
            //重入锁,出队和入队持有这一把锁
            lock = new ReentrantLock(fair);
            //初始化非空等待队列
            notEmpty = lock.newCondition();
            //初始化非满等待队列
            notFull =  lock.newCondition();
        }
    
       public ArrayBlockingQueue(int capacity, boolean fair,
                                  Collection<? extends E> c) {
            this(capacity, fair);
    
            final ReentrantLock lock = this.lock;
            lock.lock(); // Lock only for visibility, not mutual exclusion
            try {
                int i = 0;
                try {
                    for (E e : c) {
                        checkNotNull(e);
                        items[i++] = e;
                    }
                } catch (ArrayIndexOutOfBoundsException ex) {
                    throw new IllegalArgumentException();
                }
                count = i;
                putIndex = (i == capacity) ? 0 : i;
            } finally {
                lock.unlock();
            }
        }
    
    

    3.2.2 Add方法

    以 add 方法作为入口,在 add 方法中会调用父类的 add 方法,也就是 AbstractQueue.如果看源码看得比较多的话,一般这种写法都是调用父类的模版方法来解决通用性问题

    public boolean add(E e) {
     return super.add(e);
    }
       // 从父类的 add 方法可以看到,这里做了一个队列是否满了的判断,如果队列满了直接抛出一个异常
        public boolean add(E e) {
            if (offer(e))
                return true;
            else
                throw new IllegalStateException("Queue full");
        }
    
    3.2.2.1 offer 方法

    add 方法最终还是调用 offer 方法来添加数据,返回一个添加成功或者失败的布尔值反馈。
    这段代码做了几个事情:

    1. 判断添加的数据是否为空
    2. 添加重入锁
    3. 判断队列长度,如果队列长度等于数组长度,表示满了直接返回 false
    4. 否则,直接调用 enqueue 将元素添加到队列中
        public boolean offer(E e) {
            //对请求数据做判断
            checkNotNull(e);
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                if (count == items.length)
                    return false;
                else {
                    enqueue(e);
                    return true;
                }
            } finally {
                lock.unlock();
            }
        }
    
    3.2.2.2 enqueue方法

    这个是最核心的逻辑,方法内部通过 putIndex 索引直接将元素添加到数组 items

       /**
         * Inserts element at current put position, advances, and signals.
         * Call only when holding lock.
         */
        private void enqueue(E x) {
            // assert lock.getHoldCount() == 1;
            // assert items[putIndex] == null;
            final Object[] items = this.items;
            //通过 putIndex 对数据赋值
            items[putIndex] = x;
            // 当putIndex 等于数组长度时,将 putIndex 重置为 0
            if (++putIndex == items.length)
                putIndex = 0;
            count++; //记录队列元素的个数
            //唤醒处于等待状态下的线程,表示当前队列中的元素不为空,如果存在消费者线程阻塞,就可以开始取出元素
            notEmpty.signal();
        }
    

    putIndex 为什么会在等于数组长度的时候重新设置为 0?
    因为 ArrayBlockingQueue 是一个 FIFO 的队列,队列添加元素时,是从队尾获取 putIndex 来存储元素,当 putIndex等于数组长度时,下次就需要从数组头部开始添加了。
    下面这个图模拟了添加到不同长度的元素时,putIndex 的变化,当 putIndex 等于数组长度时,不可能让 putIndex 继续累加,否则会超出数组初始化的容量大小。同时还需要思考两个问题:

    1. 当元素满了以后是无法继续添加的,因为会报错
    2. 其次,队列中的元素肯定会有一个消费者线程通过 take或者其他方法来获取数据,而获取数据的同时元素也会从队列中移除


      image.png

    3.2.3 put方法

    put 方法和 add 方法功能一样,差异是 put 方法如果队列满了,会阻塞。这个在最开始的时候说过。接下来看一下
    它的实现逻辑:

       public void put(E e) throws InterruptedException {
            checkNotNull(e);
            final ReentrantLock lock = this.lock;
            /**
             * 这个也是获得锁,但是和 lock 的区别是,这个方法优先允许在等待时由其他线程调
             *  用等待线程的 interrupt 方法来中断等待直接返回。而 lock
             *  方法是尝试获得锁成功后才响应中断
             */
            lock.lockInterruptibly();
            try {
                while (count == items.length)
                    //队列满了的情况下,当前线程将会被 notFull 条件对象挂起加到等待队列中
                    notFull.await();
                enqueue(e);
            } finally {
                lock.unlock();
            }
        }
    
    put

    3.2.4 take方法

    take 方法是一种阻塞获取队列中元素的方法它的实现原理很简单,有就删除没有就阻塞,注意这个阻塞是可以中断的,如果队列没有数据那么就加入 notEmpty条件队列等待(有数据就直接取走,方法结束),如果有新的put 线程添加了数据,那么 put 操作将会唤醒 take 线程,执行 take 操作。

        public E take() throws InterruptedException {
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                while (count == 0)
                    //如果队列为空的情况下,直接通过 await 方法阻塞
                    notEmpty.await();
                return dequeue();
            } finally {
                lock.unlock();
            }
        }
    
    take

    如果队列中添加了元素,那么这个时候,会在 enqueue 中调用 notempty.signal 唤醒 take 线程来获得元素


    image.png
    3.2.4.1 dequeue 方法

    这个是出队列的方法,主要是删除队列头部的元素并发返回给客户端,takeIndex,是用来记录拿数据的索引值

      /**
         * Extracts element at current take position, advances, and signals.
         * Call only when holding lock.
         */
        private E dequeue() {
            // assert lock.getHoldCount() == 1;
            // assert items[takeIndex] != null;
            final Object[] items = this.items;
            @SuppressWarnings("unchecked")
            //默认获取 0 位置的元素
            E x = (E) items[takeIndex];
            //将该位置的元素设置为空
            items[takeIndex] = null;
            //这里的作用也是一样,如果拿到数组的最大值,那么重置为 0,继续从头部位置开始获取数据
            if (++takeIndex == items.length)
                takeIndex = 0;
            //记录 元素个数递减
            count--;
            if (itrs != null)
                //同时更新迭代器中的元素数据
                itrs.elementDequeued();
            //触发 因为队列满了以后导致的被阻塞的线程
            notFull.signal();
            return x;
        }
    
    
    3.2.4.2 itrs.elementDequeued();

    ArrayBlockingQueue 中,实现了迭代器的功能,也就是可以通过迭代器来遍历阻塞队列中的元素

          /**
             * Called whenever an element has been dequeued (at takeIndex).
             */
            void elementDequeued() {
                // assert lock.getHoldCount() == 1;
                if (count == 0)
                    queueIsEmpty();
                else if (takeIndex == 0)
                    takeIndexWrapped();
            }
        }
    

    itrs.elementDequeued() 是用来更新迭代器中的元素数据的

    3.2.4 remove方法

    remove 方法是移除一个指定元素。看看它的实现代码

       public boolean remove(Object o) {
            if (o == null) return false;
            //获取数组元素
            final Object[] items = this.items;
            final ReentrantLock lock = this.lock;
            //获得锁
            lock.lock();
            try {
                //如果队列不为空
                if (count > 0) {
                    //获取下一个要添加元素时的索引
                    final int putIndex = this.putIndex;
                    //获取当前要被移除的元素的索引
                    int i = takeIndex;
                    do {
                        //从takeIndex 下标开始,找到要被删除的元素
                        if (o.equals(items[i])) {
                            //移除指定元素
                            removeAt(i);
                            //返回执行结果
                            return true;
                        }
                        //当前删除索引执行加 1 后判断是否与数组长度相等
                        //若为 true,说明索引已到数组尽头,将 i 设置为 0
                        if (++i == items.length)
                            i = 0;
                    } while (i != putIndex); //继续查找,直到找到最后一个元素
                }
                return false;
            } finally {
                lock.unlock();
            }
        }
    

    4 BlockingDeque分析

    BlockingDeque定义了一个阻塞的双端队列接口,如下所示

    public interface BlockingDeque<E> extends BlockingQueue<E>, Deque<E> { 
        void putFirst(E e) throws InterruptedException; 
        void putLast(E e) throws InterruptedException; 
        E takeFirst() throws InterruptedException; 
        E takeLast() throws InterruptedException; 
        // ... 
    }
    

    该接口继承了BlockingQueue接口,同时增加了对应的双端队列操作接口。该接口只有一个实现,就是LinkedBlockingDeque。
    其核心数据结构如下所示,是一个双向链表。

    public class LinkedBlockingDeque<E> extends AbstractQueue<E> implements
            BlockingDeque<E>, java.io.Serializable {
        static final class Node<E> {
            E item; Node<E> prev; // 双向链表的Node Node<E> next;
            Node(E x) {
                item = x;
            }
        }
        transient Node<E> first; // 队列的头和尾
        transient Node<E> last;
        private transient int count; // 元素个数
        private final int capacity; // 容量
        // 一把锁+两个条件
        final ReentrantLock lock = new ReentrantLock();
        private final Condition notEmpty = lock.netCondition();
        private final Condition notFull = lock.newCondition();
        // ...
    }
    

    对应的实现原理,和LinkedBlockingQueue基本一样,只是LinkedBlockingQueue是单向链表,而LinkedBlockingDeque是双向链表。

    5 ConcurrentLinkedQueue/Deque

    AQS内部的阻塞队列实现原理:基于双向链表,通过对head/tail进行CAS操作,实现入队和出队。ConcurrentLinkedQueue 的实现原理和AQS 内部的阻塞队列类似:同样是基于 CAS,同样是通过head/tail指针记录队列头部和尾部,但还是有稍许差别。
    首先,它是一个单向链表,定义如下:

    public class ConcurrentLinkedQueue<E> extends AbstractQueue<E> implements
            Queue<E>, java.io.Serializable {
        private static class Node<E> {
            volatile E item;
            volatile Node<E> next;
            //...
        }
        private transient volatile Node<E> head;
        private transient volatile Node<E> tail;
        //...
    }
    

    其次,在AQS的阻塞队列中,每次入队后,tail一定后移一个位置;每次出队,head一定后移一个位置,以保证head指向队列头部,tail指向链表尾部。但在ConcurrentLinkedQueue中,head/tail的更新可能落后于节点的入队和出队,因为它不是直接对 head/tail指针进行 CAS操作的,而是对 Node中的 item进行操作。下面进行详细分析:

    5.1 初始化

    初始的时候, head 和 tail 都指向一个 null 节点。对应的代码如下。

    public ConcurrentLinkedQueue() {
        head = tail = new Node<E>(null);
    }
    
    image.png

    5.2 入队列

    代码如下所示:

       public boolean offer(E e) {
            checkNotNull(e);
            final Node<E> newNode = new Node<E>(e);
    
            for (Node<E> t = tail, p = t;;) {
                Node<E> q = p.next;
                if (q == null) {
                    // p is last node
                    // 对tail的next指针而不是对tail指针执行CAS操作
                    if (p.casNext(null, newNode)) {
                        // Successful CAS is the linearization point
                        // for e to become an element of this queue,
                        // and for newNode to become "live".
                        if (p != t) // hop two nodes at a time
                            // 每入队两个节点后移一次tail指针
                            casTail(t, newNode);  // Failure is OK.
                        return true;
                    }
                    // Lost CAS race to another thread; re-read next
                }
                else if (p == q)
                    // We have fallen off list.  If tail is unchanged, it
                    // will also be off-list, in which case we need to
                    // jump to head, from which all live nodes are always
                    // reachable.  Else the new tail is a better bet.
                    // 已经到达队列尾部
                    p = (t != (t = tail)) ? t : head;
                else
                    // Check for tail updates after two hops.
                    // 后移p指针
                    p = (p != t && t != (t = tail)) ? t : q;
            }
        }
    

    上面的入队其实是每次在队尾追加2个节点时,才移动一次tail节点。如下图所示:
    初始的时候,队列中有1个节点item1,tail指向该节点,假设线程1要入队item2节点:
    step1: p=tail,q=p.next=NULL.
    step2:对p的next执行CAS操作,追加item2,成功之后,p=tail。所以上面的casTail方法不会执
    行,直接返回。此时tail指针没有变化。


    image.png

    之后,假设线程2要入队item3节点,如下图所示:
    step3: p=tail,q=p.next.
    step4:q!=NULL,因此不会入队新节点。p,q都后移1位。
    step5:q=NULL,对p的next执行CAS操作,入队item3节点。
    step6:p!=t,满足条件,执行上面的casTail操作,tail后移2个位置,到达队列尾部。


    image.png
    总结出以下关键点:
    1. 即使tail指针没有移动,只要对p的next指针成功进行CAS操作,就算成功入队列。
    2. 只有当 p != tail的时候,才会后移tail指针。也就是说,每连续追加2个节点,才后移1次tail指针。即使CAS失败也没关系,可以由下1个线程来移动tail指针。

    5.3 出队列

    上面说了入队列之后,tail指针不变化,那是否会出现入队列之后,要出队列却没有元素可出的情况呢?

     public E poll() {
            restartFromHead:
            for (;;) {
                for (Node<E> h = head, p = h, q;;) {
                    E item = p.item;
                    // 出队列的时候,并没有移动head指针,而是把item设置为null
                    if (item != null && p.casItem(item, null)) {
                        // Successful CAS is the linearization point
                        // for item to be removed from this queue.
                        if (p != h) // hop two nodes at a time
                            // 每产生2个null节点,才把head指针后移两位
                            updateHead(h, ((q = p.next) != null) ? q : p);
                        return item;
                    }
                    else if ((q = p.next) == null) {
                        updateHead(h, p);
                        return null;
                    }
                    else if (p == q)
                        continue restartFromHead;
                    else
                        p = q;
                }
            }
        }
    
    

    出队列的代码和入队列类似,也有p、q2个指针,整个变化过程如图5-8所示。假设初始的时候head指向空节点,队列中有item1、item2、item3 三个节点。
    step1:p=head,q=p.next.p!=q.
    step2:后移p指针,使得p=q。
    step3:出队列。关键点:此处并没有直接删除item1节点,只是把该节点的item通过CAS操作置为了NULL。
    step4:p!=head,此时队列中有了2个 NULL 节点,再前移1次head指针,对其执行updateHead操作。


    image.png

    总结:

    1. 出队列的判断并非观察 tail 指针的位置,而是依赖于 head 指针后续的节点是否为NULL这一条件。
    2. 只要对节点的item执行CAS操作,置为NULL成功,则出队列成功。即使head指针没有成功移动,也可以由下1个线程继续完成。

    5.4 队列判空

    因为head/tail 并不是精确地指向队列头部和尾部,所以不能简单地通过比较 head/tail 指针来判断队列是否为空,而是需要从head指针开始遍历,找第1个不为NULL的节点。如果找到,则队列不为空;如果找不到,则队列为空。代码如下所示:

       public boolean isEmpty() {
            // 寻找第一个不是null的节点
            return first() == null;
        }
        Node<E> first() {
            restartFromHead:
            for (;;) {
                // 从head指针开始遍历,查找第一个不是null的节点
                for (Node<E> h = head, p = h, q;;) {
                    boolean hasItem = (p.item != null);
                    if (hasItem || (q = p.next) == null) {
                        updateHead(h, p);
                        return hasItem ? p : null;
                    }
                    else if (p == q)
                        continue restartFromHead;
                    else
                        p = q;
                }
            }
        }
    
    

    6. ConcurrentHashMap解析

    Java并发编程之并发容器ConcurrentHashMap详解

    7. ConcurrentSkipListMap/Set

    ConcurrentHashMap 是一种 key 无序的 HashMap,ConcurrentSkipListMap则是 key 有序的,实现了NavigableMap接口,此接口又继承了SortedMap接口。

    7.1 ConcurrentSkipListMap

    7.1.1 为什么要使用SkipList实现Map?

    在Java的util包中,有一个非线程安全的HashMap,也就是TreeMap,是key有序的,基于红黑树实现。
    而在Concurrent包中,提供的key有序的HashMap,也就是ConcurrentSkipListMap,是基于SkipList(跳查表)来实现的。这里为什么不用红黑树,而用跳查表来实现呢?
    借用Doug Lea的原话:

    The reason is that there are no known efficient lock0free insertion and deletion algorithms for search trees.

    也就是目前计算机领域还未找到一种高效的、作用在树上的、无锁的、增加和删除节点的办法。
    那为什么SkipList可以无锁地实现节点的增加、删除呢?这要从无锁链表的实现说起。

    7.1.2 无锁链表

    之前讲的无锁队列、栈,都是只在队头、队尾进行CAS操作,通常不会有问题。如果在链表的中间进行插入或删除操作,按照通常的CAS做法,就会出现问题!
    操作1:在节点10后面插入节点20。如下图所示,首先把节点20的next指针指向节点30,然后对节点10的next指针执行CAS操作,使其指向节点20即可。

    操作1
    操作2:删除节点10。如下图所示,只需把头节点的next指针,进行CAS操作到节点30即可。
    操作2
    但是,如果两个线程同时操作,一个删除节点10,一个要在节点10后面插入节点20。并且这两个操作都各自是CAS的,此时就会出现问题。如下图所示,删除节点10,会同时把新插入的节点20也删除掉!这个问题超出了CAS的解决范围。
    image.png
    为什么会出现这个问题呢
    原因: 在删除节点10的时候,实际受到操作的是节点10的前驱,也就是头节点。节点10本身没有任何变化。故而,再往节点10后插入节点20的线程,并不知道节点10已经被删除了!
    针对这个问题,在论文中提出了如下的解决办法,如下图所示,把节点 10 的删除分为两2步:
    第一步,把节点10的next指针,mark成删除,即软删除;
    第二步,找机会,物理删除。
    做标记之后,当线程再往节点10后面插入节点20的时候,便可以先进行判断,节点10是否已经被删除,从而避免在一个删除的节点10后面插入节点20。这个解决方法有一个关键点:“把节点10的next指针指向节点20(插入操作)”和“判断节点10本身是否已经删除(判断操作)”,必须是原子的,必须在1 个CAS操作里面完成!
    image.png
    具体的实现有两个办法:
    办法一:AtomicMarkableReference
    保证每个 next 是 AtomicMarkableReference 类型。但这个办法不够高效,Doug Lea 在ConcurrentSkipListMap的实现中用了另一种办法。
    办法2:Mark节点
    我们的目的是标记节点10已经删除,也就是标记它的next字段。那么可以新造一个marker节点,使节点10的next指针指向该Marker节点。这样,当向节点10的后面插入节点20的时候,就可以在插入的同时判断节点10的next指针是否指向了一个Marker节点,这两个操作可以在一个CAS操作里面完成。

    7.1.3 跳查表

    解决了无锁链表的插入或删除问题,也就解决了跳查表的一个关键问题。因为跳查表就是多层链表叠起来的
    下面先看一下跳查表的数据结构:

    static final class Node<K,V> {
            final K key;
            volatile Object value;
            volatile Node<K, V> next;
    
            /**
             * Creates a new regular node.
             */
            Node(K key, Object value, Node<K, V> next) {
                this.key = key;
                this.value = value;
                this.next = next;
            }
            //...
        }
    

    上图中的Node就是跳查表底层节点类型。所有的<K, V>对都是由这个单向链表串起来的。
    上面的Index层的节点:

       static class Index<K,V> {
            final Node<K, V> node;
            final Index<K, V> down;
            volatile Index<K, V> right;
    
            /**
             * Creates index node with given values.
             */
            Index(Node<K, V> node, Index<K, V> down, Index<K, V> right) {
                this.node = node;
                this.down = down;
                this.right = right;
            }
            //...
        }
    

    上图中的node属性不存储实际数据,指向Node节点。
    down属性:每个Index节点,必须有一个指针,指向其下一个Level对应的节点。
    right属性:Index也组成单向链表。
    整个ConcurrentSkipListMap就只需要记录顶层的head节点即可:

    public class ConcurrentSkipListMap<K,V> extends AbstractMap<K,V>
            implements ConcurrentNavigableMap<K,V>, Cloneable, Serializable {
        // ... 
        private transient Index<K,V> head; 
        // ... 
    }
    
    image.png

    下面详细分析如何从跳查表上查找、插入和删除元素。

    7.1.4 put实现分析

      private V doPut(K key, V value, boolean onlyIfAbsent) {
            Node<K,V> z;             // added node
            if (key == null)
                throw new NullPointerException();
            Comparator<? super K> cmp = comparator;
            outer: for (;;) {
                for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
                    if (n != null) {
                        Object v; int c;
                        Node<K,V> f = n.next;
                        if (n != b.next)               // inconsistent read
                            break;
                        if ((v = n.value) == null) {   // n is deleted
                            n.helpDelete(b, f);
                            break;
                        }
                        if (b.value == null || v == n) // b is deleted
                            break;
                        if ((c = cpr(cmp, key, n.key)) > 0) {
                            b = n;
                            n = f;
                            continue;
                        }
                        if (c == 0) {
                            if (onlyIfAbsent || n.casValue(v, value)) {
                                @SuppressWarnings("unchecked") V vv = (V)v;
                                return vv;
                            }
                            break; // restart if lost race to replace value
                        }
                        // else c < 0; fall through
                    }
    
                    z = new Node<K,V>(key, value, n);
                    if (!b.casNext(n, z))
                        break;         // restart if lost race to append to b
                    break outer;
                }
            }
    
            int rnd = ThreadLocalRandom.nextSecondarySeed();
            if ((rnd & 0x80000001) == 0) { // test highest and lowest bits
                int level = 1, max;
                while (((rnd >>>= 1) & 1) != 0)
                    ++level;
                Index<K,V> idx = null;
                HeadIndex<K,V> h = head;
                if (level <= (max = h.level)) {
                    for (int i = 1; i <= level; ++i)
                        idx = new Index<K,V>(z, idx, null);
                }
                else { // try to grow by one level
                    level = max + 1; // hold in array and later pick the one to use
                    @SuppressWarnings("unchecked")Index<K,V>[] idxs =
                        (Index<K,V>[])new Index<?,?>[level+1];
                    for (int i = 1; i <= level; ++i)
                        idxs[i] = idx = new Index<K,V>(z, idx, null);
                    for (;;) {
                        h = head;
                        int oldLevel = h.level;
                        if (level <= oldLevel) // lost race to add level
                            break;
                        HeadIndex<K,V> newh = h;
                        Node<K,V> oldbase = h.node;
                        for (int j = oldLevel+1; j <= level; ++j)
                            newh = new HeadIndex<K,V>(oldbase, newh, idxs[j], j);
                        if (casHead(h, newh)) {
                            h = newh;
                            idx = idxs[level = oldLevel];
                            break;
                        }
                    }
                }
                // find insertion points and splice in
                splice: for (int insertionLevel = level;;) {
                    int j = h.level;
                    for (Index<K,V> q = h, r = q.right, t = idx;;) {
                        if (q == null || t == null)
                            break splice;
                        if (r != null) {
                            Node<K,V> n = r.node;
                            // compare before deletion check avoids needing recheck
                            int c = cpr(cmp, key, n.key);
                            if (n.value == null) {
                                if (!q.unlink(r))
                                    break;
                                r = q.right;
                                continue;
                            }
                            if (c > 0) {
                                q = r;
                                r = r.right;
                                continue;
                            }
                        }
    
                        if (j == insertionLevel) {
                            if (!q.link(r, t))
                                break; // restart
                            if (t.node.value == null) {
                                findNode(key);
                                break splice;
                            }
                            if (--insertionLevel == 0)
                                break splice;
                        }
    
                        if (--j >= insertionLevel && j < level)
                            t = t.down;
                        q = q.down;
                        r = q.right;
                    }
                }
            }
            return null;
        }
    

    在底层,节点按照从小到大的顺序排列,上面的index层间隔地串在一起,因为从小到大排列。查找的时候,从顶层index开始,自左往右、自上往下,形成图示的遍历曲线。假设要查找的元素是32,遍历过程如下:
    先遍历第2层Index,发现在21的后面;
    从21下降到第1层Index,从21往后遍历,发现在21和35之间;
    从21下降到底层,从21往后遍历,最终发现在29和35之间。
    在整个的查找过程中,范围不断缩小,最终定位到底层的两个元素之间。


    image.png

    在put代码中,通过findPredecessor找到了待插入的元素在[b,n]之间之后,并不能马上插入。因为其他线程也在操作这个链表,b、n都有可能被删除,所以在插入之前执行了一系列的检查逻辑,而这也正是无锁链表的复杂之处。

    7.1.5 remove实现分析

       // 若找到了(key, value)就删除,并返回value;找不到就返回null
        final V doRemove(Object key, Object value) {
            if (key == null)
                throw new NullPointerException();
            Comparator<? super K> cmp = comparator;
            outer: for (;;) {
                for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
                    Object v; int c;
                    if (n == null)
                        break outer;
                    Node<K,V> f = n.next;
                    if (n != b.next)                    // inconsistent read
                        break;
                    if ((v = n.value) == null) {        // n is deleted
                        n.helpDelete(b, f);
                        break;
                    }
                    if (b.value == null || v == n)      // b is deleted
                        break;
                    if ((c = cpr(cmp, key, n.key)) < 0)
                        break outer;
                    if (c > 0) {
                        b = n;
                        n = f;
                        continue;
                    }
                    if (value != null && !value.equals(v))
                        break outer;
                    if (!n.casValue(v, null))
                        break;
                    if (!n.appendMarker(f) || !b.casNext(n, f))
                        findNode(key);                  // retry via findNode
                    else {
                        findPredecessor(key, cmp);      // clean index
                        if (head.right == null)
                            tryReduceLevel();
                    }
                    @SuppressWarnings("unchecked") V vv = (V)v;
                    return vv;
                }
            }
            return null;
        }
    

    上面的删除方法和插入方法的逻辑非常类似,因为无论是插入,还是删除,都要先找到元素的前驱,也就是定位到元素所在的区间[b,n]。在定位之后,执行下面几个步骤:

    1. 如果发现b、n已经被删除了,则执行对应的删除清理逻辑;
    2. 否则,如果没有找到待删除的(k, v),返回null;
    3. 如果找到了待删除的元素,也就是节点n,则把n的value置为null,同时在n的后面加上Marker节点,同时检查是否需要降低Index的层次。

    7.1.6 get实现分析

    private V doGet(Object key) {
            if (key == null)
                throw new NullPointerException();
            Comparator<? super K> cmp = comparator;
            outer: for (;;) {
                for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
                    Object v; int c;
                    if (n == null)
                        break outer;
                    Node<K,V> f = n.next;
                    if (n != b.next)                // inconsistent read
                        break;
                    if ((v = n.value) == null) {    // n is deleted
                        n.helpDelete(b, f);
                        break;
                    }
                    if (b.value == null || v == n)  // b is deleted
                        break;
                    if ((c = cpr(cmp, key, n.key)) == 0) {
                        @SuppressWarnings("unchecked") V vv = (V)v;
                        return vv;
                    }
                    if (c < 0)
                        break outer;
                    b = n;
                    n = f;
                }
            }
            return null;
        }
    

    无论是插入、删除,还是查找,都有相似的逻辑,都需要先定位到元素位置[b,n],然后判断b、n是否已经被删除,如果是,则需要执行相应的删除清理逻辑。这也正是无锁链表复杂的地方。

    7.2 ConcurrentSkipListSet

    如下面代码所示,ConcurrentSkipListSet只是对ConcurrentSkipListMap的简单封装。

    public class ConcurrentSkipListSet<E>
            extends AbstractSet<E>
            implements NavigableSet<E>, Cloneable, java.io.Serializable {
        // 封装了一个ConcurrentSkipListMap
        private final ConcurrentNavigableMap<E,Object> m;
        public ConcurrentSkipListSet() {
            m = new ConcurrentSkipListMap<E,Object>();
        }
        public boolean add(E e) {
            return m.putIfAbsent(e, Boolean.TRUE) == null;
        }
        // ... 
    }
    

    以上内容就是对Java并发编程中并发容器的一些介绍,其中阻塞队列中还有很多并没有一一赘述了。

    相关文章

      网友评论

        本文标题:Java并发编程之并发容器 CopyOnWrite,Concur

        本文链接:https://www.haomeiwen.com/subject/rwzppltx.html