美文网首页
音视频开发之旅(55)-阻塞队列与无锁并发容器

音视频开发之旅(55)-阻塞队列与无锁并发容器

作者: yabin小站 | 来源:发表于2021-08-29 13:06 被阅读0次

    目录

    1. 阻塞队列的定义和使用场景
    2. 阻塞的队列的实现原理
    3. 简单学习无锁并发容器之ConcurrentLinkedQueue和CAS
    4. 资料
    5. 收获

    一、阻塞队列的定义和使用场景

    阻塞队列(BlockingQueue)在队列Queue的基础上增加了两个场景的阻塞

    1. 当队列满时,再向队列添加数据会阻塞,直到队列不满时
    2. 当队列为空时,再向队列获取数据会阻塞,直到队列变为非空

    阻塞队列常用于生产者消费者的场景

    下面我们先来Queue和BolckingQueue接口的定义

    //java.util.Queue
    public interface Queue<E> extends Collection<E> {
    
      //添加一个元素到队列,如果队列满时会抛出异常IllegalStateException
      boolean add(E e); 
    
      //添加一个元素到队列,如果队列满时不会抛异常,而是返回false
      boolean offer(E e);
      
      //从队列中获取并移除一个元素,如果队列为空, 会抛出NoSuchElementException
      E remove();
    
      //从队列中获取并移除一个元素,如果队列为空, 不会抛异常,而是返回null
      E poll();
      
      //从队列中获取一个元素 但不移除。注意和remove的区别
      //当队列为空时,会抛出异常NoSuchElementException
      E element();
      
      //从队列中获取一个元素,也不移除。注意和poll的区别
      //当队列为空时,不会抛出异常,而是返回null
      E peek();
    }
    
    //java.util.concurrent.BlockingQueue
    public interface BlockingQueue<E> extends Queue<E> {
    
    
      //插入一个元素到队列,如果队列满了,等待直到有空间空用
      void put(E e) throws InterruptedException;
    
      //插入一个元素到队列,如果队列满了,等待一定时间返回,或者有空间空用
      boolean offer(E e, long timeout, TimeUnit unit)
            throws InterruptedException;
    
      //获取队列的头元素,如果队列为空,则等待
      E take() throws InterruptedException;
    
      //从队列中获取并移除一个元素,如果队列为空,等待一段时间
      E poll(long timeout, TimeUnit unit)
            throws InterruptedException;
    
    }
    

    我们可以看到BlockingQueue继承自Queue并且新增了几个阻塞的方法。

    Java中BlockingQueue接口有七个实现类,分别如下:

    1. ArrayBlockingQueue : 由数组结构组成的有界阻塞队列,在添加和获取时内部使用一个ReentrantLock可重入同步锁
    2. LinkedBlockingQueue:由链表结构组成的有界阻塞队列。在添加和获取时内部使用两个ReentrantLock,吞吐量高于ArrayBlockingQueue,Executors#newSingleThreadExecutor()和Executors#newFixedThreadPool(int)都使用了这个阻塞队列
      public static ExecutorService newSingleThreadExecutor() {
               return new FinalizableDelegatedExecutorService
                   (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));
           }
    
    public static ExecutorService newFixedThreadPool(int nThreads) {
            return new ThreadPoolExecutor(nThreads, nThreads,
                                          0L, TimeUnit.MILLISECONDS,
                                          new LinkedBlockingQueue<Runnable>());
        }
    
    1. SynchronousQueue:不存储元素的阻塞队列。每个插入操作必须等待另个一个线程调用的移除操作,否则一致处于阻塞状态。吞吐量一般高于LinkedBlockingQueue。Executors#newCachedThreadPool()使用了这个阻塞队列
     public static ExecutorService newCachedThreadPool() {
               return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                             60L, TimeUnit.SECONDS,
                                             new SynchronousQueue<Runnable>());
           }
    
    1. PriorityBlockingQueue:支持优先级排序的无界阻塞队列
    2. DelayQueue:使用优先级队列实现的支持延迟获取元素的无界阻塞队列
    3. TransferQueue:链表结构组成的无界阻塞队列
    4. BlockingDeque:链表结构组成的双向阻塞队列

    二、阻塞的队列的实现原理(LinkedBlockingQueue)

    我们以LinkedBlockingQueue来分析

     //节点结构体 
      static class Node<E> {
            E item;
            Node<E> next;
    
            Node(E x) { item = x; }
        }
    
    
        /** 从队列获取元素时的可重入锁 ,非公平锁*/
        private final ReentrantLock takeLock = new ReentrantLock();
    
        /** 非空condition,等待队列非空*/
        private final Condition notEmpty = takeLock.newCondition();
    
        /** 向队列中插入元素时的可重入锁 ,非公平锁*/
        private final ReentrantLock putLock = new ReentrantLock();
    
        /** 非满condition,等待队列非满 */
        private final Condition notFull = putLock.newCondition();
    
        /**
         * 当队列有元素后,发出非空信号
         */
        private void signalNotEmpty() {
            final ReentrantLock takeLock = this.takeLock;
            takeLock.lock();
            try {
                notEmpty.signal();
            } finally {
                takeLock.unlock();
            }
        }
    
        /**
         * 当队列由满到不满后,发出该非满信号
         */
        private void signalNotFull() {
            final ReentrantLock putLock = this.putLock;
            putLock.lock();
            try {
                notFull.signal();
            } finally {
                putLock.unlock();
            }
        }
    

    2.1 插入元素到队列

    offer的实现 (添加一个元素到队列,如果队列满时不会抛异常,而是返回false)

     public boolean offer(E e) {
            ...
            int c = -1;
            Node<E> node = new Node<E>(e);
            //获取写 可重入锁
            final ReentrantLock putLock = this.putLock;
            putLock.lock();
            try {
                //如果队列还未满,插入该元素节点
                if (count.get() < capacity) {
                    // enqueue 插入元素到队列,一会我们在看下其实现
                    enqueue(node);
                    c = count.getAndIncrement();
                    //如果插入后,还队列还未满,发送未满信号
                    if (c + 1 < capacity)
                        notFull.signal();
                }
            } finally {
                putLock.unlock();
            }
            // 如果成功插入后,发送非空信号
            if (c == 0)
                signalNotEmpty();
            return c >= 0;
        }
    

    put的实现 (插入一个元素到队列,如果队列满了,等待直到有空间空用)

    public void put(E e) throws InterruptedException {
            ...
            int c = -1;
            Node<E> node = new Node<E>(e);
            final ReentrantLock putLock = this.putLock;
      
            //相比较offer这是差异点1
            //采用了可中断锁,等待过程中可以接收中断
            putLock.lockInterruptibly();
            try {
               //相比较offer这是差异点2,
               //如果当前队列满了,则阻塞,等待非空的信号到来
                while (count.get() == capacity) {
                    notFull.await();
                }
                enqueue(node);
                c = count.getAndIncrement();
                if (c + 1 < capacity)
                    notFull.signal();
            } finally {
                putLock.unlock();
            }
            if (c == 0)
                signalNotEmpty();
        }
    

    enqueue的实现

     private void enqueue(Node<E> node) {
           //把当前节点作为队列先前未节点的next插入到队列中
           //然后吧last指向新插入的节点
            last = last.next = node;
        }
    

    2.2 从队列获取元素

    poll的实现(从队列中获取并移除一个元素,如果队列为空, 不会抛异常,而是返回null)

    public E poll() {
           ...
            int c = -1;
            //获取取 可重入锁
            final ReentrantLock takeLock = this.takeLock;
            takeLock.lock();
            try {
                 //如果当前队列的元素个数大于0
                if (count.get() > 0) {
                    //dequeue 从队列中获取一个元素,稍后再分析
                    x = dequeue();
                   //取出后如果队列中元素的个数还大于1
                  //(为什么不是大于0? 
                  //    这是因为getAndDecrement的实现是先获取再减1),
                  // 则发出非空信号
                    c = count.getAndDecrement();
                    if (c > 1)
                        notEmpty.signal();
                }
            } finally {
                takeLock.unlock();
            }
            //如果c的值等于容器的值(由于getAndDecrement的实现是先获取再减1,这是队列从满变为了非满状态),则发出非满信号
            if (c == capacity)
                signalNotFull();
            return x;
        }
    

    take的实现 (获取队列的头元素,如果队列为空,则等待)

     public E take() throws InterruptedException {
    
            ...
            int c = -1;
            final ReentrantLock takeLock = this.takeLock;
            //和poll的差异点1:wait时支持中断
            takeLock.lockInterruptibly();
            try {
              //和poll的差异点2:如果队列为空,则阻塞等待,知道收到非空的信号
                while (count.get() == 0) {
                    notEmpty.await();
                }
                x = dequeue();
                c = count.getAndDecrement();
                if (c > 1)
                    notEmpty.signal();
            } finally {
                takeLock.unlock();
            }
            if (c == capacity)
                signalNotFull();
            return x;
        }
    

    dequeue的实现

        private E dequeue() {
            //链表操作的通用做法,head是一个虚节点
            Node<E> h = head;
            //头节点的next赋值给定义的first节点
            Node<E> first = h.next;
           //把先前的头节点头的next指向自身节点,方便gc
            h.next = h; // help GC
            //标记新的头节点给到head指针
            head = first;
            //获取元素
            E x = first.item;
            first.item = null;
            return x;
        }
    

    为了方便dequeue的理解,画下列表的节点图如下


    我们看先LinkedBlockingQueue再线程池中的使用,前面已经提到了,Executors#newSingleThreadExecutor()和Executors#newFixedThreadPool(int)都使用了LinkedBlockingQueue,我们通过下面两张来自《java并发编程的艺术》的示意图来看下


    其他阻塞队列的实现可以自行分析下,比如ArrayBlockingQueue和SynchronousQueue的实现。

    三、简单学习无锁并发容器之ConcurrentLinkedQueue和CAS

    上面介绍的LinkedBlockingQueue通过加锁阻塞的方式保证线程安全性。还有一种非阻塞的算法实现。ConcurrentLinkedQueue就是通过后者实现的,我们一起来分析学习下。

    public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
            implements Queue<E>, java.io.Serializable {
    
    
        private static class Node<E> {
            volatile E item;
            volatile Node<E> next;
        }
    
        static <E> Node<E> newNode(E item) {
            Node<E> node = new Node<E>();
            //这里的U是sun.misc.Unsafe
            U.putObject(node, ITEM, item);
            return node;
        }
    
        static <E> boolean casNext(Node<E> node, Node<E> cmp, Node<E> val) {
            return U.compareAndSwapObject(node, NEXT, cmp, val);
        }
    
    
    public boolean offer(E e) {
            final Node<E> newNode = newNode(Objects.requireNonNull(e));
    
            for (Node<E> t = tail, p = t;;) {
                Node<E> q = p.next;
                if (q == null) {
                    // p is last node
                    if (casNext(p, null, newNode)) {
                        if (p != t) // hop two nodes at a time
                            casTail(t, newNode);  // Failure is OK.
                        return true;
                    }
                    // Lost CAS race to another thread; re-read next
                }
                else if (p == q)
                    p = (t != (t = tail)) ? t : head;
                else
                    p = (p != t && t != (t = tail)) ? t : q;
            }
        }
    
    }
    

    Unsafe类
    Unsafe类中存在直接操作内存的方法 ,Java中CAS操作的执行依赖于Unsafe类的方法,注意Unsafe类中的所有方法都是native修饰的,也就是说Unsafe类中的方法都直接调用操作系统底层资源执行相应任务

    CAS为什么能保证原子性?
    无锁策略则采用一种称为CAS的技术来保证线程执行的安全性
    CAS的全称是Compare And Swap 即比较交换,其算法核心思想如下

    CAS(V,E,N)
    
    其包含3个参数
    V表示要更新的变量
    E表示预期值
    N表示新值
    //如果V值等于E值,则将V的值设为N。若V值和E值不同,则说明已经有其他线程做了更新,则当前线程什么都不做
    

    假设存在多个线程执行CAS操作并且CAS的步骤很多,有没有可能在判断V和E相同后,正要赋值时,切换了线程,更改了值。造成了数据不一致呢?

    答案是否定的,因为CAS是一种系统原语,原语属于操作系统用语范畴,是由若干条指令组成的,用于完成某个功能的一个过程,并且原语的执行必须是连续的,在执行过程中不允许被中断,也就是说CAS是一条CPU的原子指令,不会造成所谓的数据不一致问题。

    Unsafe这块源码解析和理解还是有些不足,根据需要再去看吧,Java并发系列到这里就暂时告一段落。
    接下来进入编解码的学习时间,准备建立学习和写作打卡群,有兴趣的欢迎加我微信“yabin_yangO2O”,备注 视频编码读书写作
    ,一起学习成长。

    四、资料

    1. 图书:《Java并发编程的艺术》
    2. 深入剖析java并发之阻塞队列LinkedBlockingQueue与ArrayBlockingQueue
    3. Java并发编程-无锁CAS与Unsafe类及其并发包Atomic

    五、收获

    通过本篇的学习实践

    1. 分析了java并发阻塞队列的应用和实现
    2. 简单分析学习了CAS和无锁并发容器ConcurrentLinkedQueue

    感谢你的阅读,Java并发编程到这里就暂告一段落,接下来一段时间会进入编码的学习时间。
    主要是针对《视频编码全角度详解》这本书的阅读和实践。以21天为一个周期(不一定要读完,但是每天至少读一页,且至少输出50字),有兴趣的朋友可以一起来学习交流,加我微信“yabin_yangO2O”,备注 视频编码读书写作

    下一篇我们开始视频编码知识的学习实践,欢迎关注公众号“音视频开发之旅”,一起学习成长。

    欢迎交流

    相关文章

      网友评论

          本文标题:音视频开发之旅(55)-阻塞队列与无锁并发容器

          本文链接:https://www.haomeiwen.com/subject/zjiwiltx.html