美文网首页
LinkedBlockingQueue

LinkedBlockingQueue

作者: 程序员札记 | 来源:发表于2022-05-06 07:14 被阅读0次
    image.png

    LinkedBlockingDeque 与 LinkedBlockingQueue 对比

    LinkedBlockingDeque在结构上有别于之前讲解过的阻塞队列,它不是Queue而是Deque,中文翻译成双端队列,双端队列指可以从任意一端入队或者出队元素的队列,实现了在队列头和队列尾的高效插入和移除

    LinkedBlockingDeque是链表实现的线程安全的无界的同时支持FIFO、LIFO的双端阻塞队列,可以回顾下之前的LinkedBlockingQueue阻塞队列特点,本质上是类似的,但是又有些不同:

    • 内部是通过Node节点组成的链表来实现的,当然为了支持双端操作,结点结构不同
      LinkedBlockingQueue通过两个ReentrantLock锁保护竞争资源,实现了多线程对竞争资源的互斥访问,入队和出队互不影响,可同时操作,然而LinkedBlockingDeque只设置了一个全局ReentrantLock锁,两个条件对象实现互斥访问,性能上要比LinkedBlockingQueue差一些 (为什么有两把锁 和一把锁的区别?)

    • 无界,默认链表长度为Integer.MAX_VALUE,本质上还是有界

    • 阻塞队列,是指多线程访问竞争资源时,当竞争资源已被某线程获取时,其它要获取该资源的线程需要阻塞等待

    Queue和Deque的关系有点类似于单链表和双向链表,LinkedBlockingQueue和LinkedBlockingDeque的内部结点实现就是单链表和双向链表的区别,具体可参考源码。

    在第二点中可能有些人有些疑问,两个互斥锁和一个互斥锁的区别在哪里?我们可以考虑以下场景:

    A线程先进行入队操作,B线程随后进行出队操作,如果是LinkedBlockingQueue,A线程入队过程还未结束(已获得锁还未释放),B线程出队操作不会被阻塞等待(锁不同),如果是LinkedBlockingDeque则B线程会被阻塞等待(同一把锁)A线程完成操作才继续执行

    LinkedBlockingQueue一般的操作是获取一把锁就可以,但有些操作例如remove操作,则需要同时获取两把锁,之前的LinkedBlockingQueue讲解曾经说明过

    LinkedBlockingQueue 由于是单链表结构,只能一端操作,读只能在头,写只能在尾,因此两把锁效率更高。LinkedBlockingDeque 由于是双链表结构,两端头尾都能读写,因此只能用一把锁保证原子性。 当然效率也就更低

    ArrayBlockingQueue与LinkedBlockingQueue对比

    ArrayBlockingQueue

    • 一个对象数组+一把锁+两个条件
    • 入队与出队都用同一把锁
    • 在只有入队高并发或出队高并发的情况下,因为操作数组,且不需要扩容,性能很高
    • 采用了数组,必须指定大小,即容量有限。从空间利用角度,数组结构的ArrayBlockingQueue要比LinkedBlockingQueue紧凑,因为其不需要创建所谓节点,但是其初始分配阶段就需要一段连续的空间,所以初始内存需求更大。

    LinkedBlockingQueue

    • 一个单向链表+两把锁+两个条件
    • 两把锁,一把用于入队,一把用于出队,有效的避免了入队与出队时使用一把锁带来的竞争。
    • 在入队与出队都高并发的情况下,性能比ArrayBlockingQueue高很多
    • 采用了链表,最大容量为整数最大值,可看做容量无限。ArrayBlockingQueue是有明确的容量限制的,而LinkedBlockingQueue则取决于我们是否在创建时指定,

    问题,为什么ArrayBlockingQueue 不能用两把锁
    因为取出后,ArrayBlockingQueue 的元素需要向前移动。

    概述

    LinkedBlockingQueue内部由单链表实现,只能从head取元素,从tail添加元素。添加元素和获取元素都有独立的锁,也就是说LinkedBlockingQueue是读写分离的,读写操作可以并行执行。LinkedBlockingQueue采用可重入锁(ReentrantLock)来保证在并发情况下的线程安全。

    构造器

    LinkedBlockingQueue一共有三个构造器,分别是无参构造器、可以指定容量的构造器、可以穿入一个容器的构造器。如果在创建实例的时候调用的是无参构造器,LinkedBlockingQueue的默认容量是Integer.MAX_VALUE,这样做很可能会导致队列还没有满,但是内存却已经满了的情况(内存溢出)。

     public LinkedBlockingQueue();   //设置容量为Integer.MAX
    
     public LinkedBlockingQueue(int capacity);  //设置指定容量
    
    public LinkedBlockingQueue(Collection<? extends E> c);  //穿入一个容器,如果调用该构造器,容量默认也是Integer.MAX_VALUE
    
    

    LinkedBlockingQueue常用操作

    取数据

    • take():首选。当队列为空时阻塞

    • poll():弹出队顶元素,队列为空时,返回空

    • peek():和poll烈性,返回队队顶元素,但顶元素不弹出。队列为空时返回null

    • remove(Object o):移除某个元素,队列为空时抛出异常。成功移除返回true

    添加数据

    • put():首选。队满是阻塞

    • offer():队满时返回false

    判断队列是否为空

    size()方法会遍历整个队列,时间复杂度为O(n),所以最好选用isEmtpy

    put元素原理

    基本过程:

    1.判断元素是否为null,为null抛出异常

    2.加锁(可中断锁)

    3.判断队列长度是否到达容量,如果到达一直等待

    4.如果没有队满,enqueue()在队尾加入元素

    5.队列长度加1,此时如果队列还没有满,调用signal唤醒其他堵塞队列

       /**
         * 在队尾插一个元素
         * 如果队列满了,一直阻塞,直到队列不满了或者线程被中断
         */
        public void put(E e) throws InterruptedException {
            if (e == null) throw new NullPointerException();
            int c = -1;
            final ReentrantLock putLock = this.putLock;//入队锁
            final AtomicInteger count = this.count;//当前队列中的元素个数
            putLock.lockInterruptibly();//加锁
            try {
                while (count.get() == capacity) {//如果队列满了 
                    /*
                     * 加入notFull等待队列,直到队列元素不满了,
                     * 被其他线程使用notFull.signal()唤醒
                     */
                    notFull.await();
                }
                enqueue(e);//入队
                c = count.getAndIncrement();//入队数量+1
                if (c + 1 < capacity)
                    notFull.signal();
            } finally {
                putLock.unlock();
            }
            if (c == 0)
                signalNotEmpty();
        }
    

    take元素原理

    基本过程:

    1.加锁(依旧是ReentrantLock),注意这里的锁和写入是不同的两把锁

    2.判断队列是否为空,如果为空就一直等待

    3.通过dequeue方法取得数据

    3.取走元素后队列是否为空,如果不为空唤醒其他等待中的队列

    /**
       * 出队:
       * 如果队列空了,一直阻塞,直到队列不为空或者线程被中断
       */
      public E take() throws InterruptedException {
          E x;
          int c = -1;
          final AtomicInteger count = this.count;//获取队列中的元素总量
          final ReentrantLock takeLock = this.takeLock;
          takeLock.lockInterruptibly();//获取出队锁
          try {
              while (count.get() == 0) {//如果没有元素,一直阻塞
                  /*
                   * 加入等待队列, 一直等待条件notEmpty(即被其他线程唤醒)
                   * (唤醒其实就是,有线程将一个元素入队了,然后调用notEmpty.signal()唤醒其他等待这个条件的线程,同时队列也不空了)
                   */
                  notEmpty.await();
              }
              x = dequeue();//出队
              c = count.getAndDecrement();//元素数量-1
              if (c > 1)
                  notEmpty.signal();
          } finally {
              takeLock.unlock();
          }
          if (c == capacity)
              signalNotFull();
          return x;
      }
    
      /**
       * 从队列头部移除一个节点
       */
      private E dequeue() {
          Node<E> h = head;//获取头节点:x==null
          Node<E> first = h.next;//将头节点的下一个节点赋值给first
          h.next = h; // 将当前将要出队的节点置null(为了使其做head节点做准备)
          head = first;//将当前将要出队的节点作为了头节点
          E x = first.item;//获取出队节点的值
          first.item = null;//将出队节点的值置空
          return x;
      }
    
      private void signalNotFull() {
          final ReentrantLock putLock = this.putLock;
          putLock.lock();
          try {
              notFull.signal();
          } finally {
              putLock.unlock();
          }
      }
    

    public boolean offer(E e)

    原理:在队尾插入一个元素, 如果队列没满,立即返回true; 如果队列满了,立即返回false。

    
    /**
         * 在队尾插入一个元素, 容量没满,可以立即插入,返回true; 队列满了,直接返回false
         * 注:如果使用了限制了容量的队列,这个方法比add()好,因为add()插入失败就会抛出异常
         */
        public boolean offer(E e) {
            if (e == null)
                throw new NullPointerException();
            final AtomicInteger count = this.count;// 获取队列中的元素个数
            if (count.get() == capacity)// 队列满了
                return false;
            int c = -1;
            final ReentrantLock putLock = this.putLock;
            putLock.lock();// 获取入队锁
            try {
                if (count.get() < capacity) {// 容量没满
                    enqueue(e);// 入队
                    c = count.getAndIncrement();// 容量+1,返回旧值(注意)
                    if (c + 1 < capacity)// 如果添加元素后的容量,还小于指定容量(说明在插入当前元素后,至少还可以再插一个元素)
                        notFull.signal();// 唤醒等待notFull条件的其中一个线程
                }
            } finally {
                putLock.unlock();// 释放入队锁
            }
            if (c == 0)// 如果c==0,这是什么情况?一开始如果是个空队列,就会是这样的值,要注意的是,上边的c返回的是旧值
                signalNotEmpty();
            return c >= 0;
        }
    
    
        /**
         * 创建一个节点,并加入链表尾部
         * @param x
         */
        private void enqueue(E x) {
            /*
             * 封装新节点,并赋给当前的最后一个节点的下一个节点,然后在将这个节点设为最后一个节点
             */
            last = last.next = new Node<E>(x);
        }
    
        private void signalNotEmpty() {
            final ReentrantLock takeLock = this.takeLock;
            takeLock.lock();//获取出队锁
            try {
                notEmpty.signal();//唤醒等待notEmpty条件的线程中的一个
            } finally {
                takeLock.unlock();//释放出队锁
            }
        }
    
    

    public E poll()

    原理:如果没有元素,直接返回null;如果有元素,出队

        /**
         * 出队: 
         * 1、如果没有元素,直接返回null 
         * 2、如果有元素,出队
         */
        public E poll() {
            final AtomicInteger count = this.count;// 获取元素数量
            if (count.get() == 0)// 没有元素
                return null;
            E x = null;
            int c = -1;
            final ReentrantLock takeLock = this.takeLock;
            takeLock.lock();// 获取出队锁
            try {
                if (count.get() > 0) {// 有元素
                    x = dequeue();// 出队
                    // 元素个数-1(注意:该方法是一个无限循环,直到减1成功为止,且返回旧值)
                    c = count.getAndDecrement();
                    if (c > 1)// 还有元素(如果旧值c==1的话,那么通过上边的操作之后,队列就空了)
                        notEmpty.signal();// 唤醒等待在notEmpty队列中的其中一条线程
                }
            } finally {
                takeLock.unlock();// 释放出队锁
            }
            if (c == capacity)// c == capacity是怎么发生的?如果队列是一个满队列,注意:上边的c返回的是旧值
                signalNotFull();
            return x;
        }
    
    

    五、总结

    1、具体入队与出队的原理图

    图中每一个节点前半部分表示封装的数据x,后边的表示指向的下一个引用。

    1.1、初始化

    image.png

    初始化之后,初始化一个数据为null,且head和last节点都是这个节点。

    1.2、入队两个元素过后

    image.png

    1.3、出队一个元素后

    image.png

    表面上看,只是将头节点的next指针指向了要删除的x1.next,事实上这样我觉的就完全可以,但是jdk实际上是将原来的head节点删除了,而上边看到的这个head节点,正是刚刚出队的x1节点,只是其值被置空了。

    2、三种入队对比:

    • offer(E e):如果队列没满,立即返回true; 如果队列满了,立即返回false-->不阻塞
    • put(E e):如果队列满了,一直阻塞,直到队列不满了或者线程被中断-->阻塞
    • offer(E e, long timeout, TimeUnit unit):在队尾插入一个元素,,如果队列已满,则进入等待,直到出现以下三种情况:-->阻塞
      • 被唤醒
      • 等待时间超时
      • 当前线程被中断

    3、三种出队对比:

    • poll():如果没有元素,直接返回null;如果有元素,出队
    • take():如果队列空了,一直阻塞,直到队列不为空或者线程被中断-->阻塞
    • poll(long timeout, TimeUnit unit):如果队列不空,出队;如果队列已空且已经超时,返回null;如果队列已空且时间未超时,则进入等待,直到出现以下三种情况:
      • 被唤醒
      • 等待时间超时
      • 当前线程被中断

    相关文章

      网友评论

          本文标题:LinkedBlockingQueue

          本文链接:https://www.haomeiwen.com/subject/xxsayrtx.html