美文网首页Java学习笔记Java 杂谈Spring-Boot
【并发编程系列8】阻塞队列之ArrayBlockingQueue

【并发编程系列8】阻塞队列之ArrayBlockingQueue

作者: 刀哥说Java | 来源:发表于2020-10-10 13:39 被阅读0次

    什么是阻塞队列

    阻塞队列有两个特点:

    • 当队列中没有元素时,从队列中获取元素会被阻塞
    • 当队列满了时,添加元素会被阻塞

    阻塞队列常用于生产者和消费者的场景,生产者是向队列里添加元素,消费者则从队列里取元素。

    队列Queue接口核心方法

    阻塞队列,本质上来说还是属于队列,也就是说阻塞队列继承了队列的功能,这里我们先来看看Queue接口中的几个核心方法:

    方法 功能
    add(e) 添加一个元素,成功返回true,如果空间满了则抛出异常
    offer(e) 添加一个元素,成功返回true,如果空间满了则返回false,处理有界队列时优于add方法
    remove() 检索并移除队列头元素,成功则返回移除的元素,如果队列为空则抛出异常
    poll() 检索并移除队列头元素,成功则返回移除的元素,如果队列为空则返回null
    element() 检索并返回队列头元素,如果队列为空则抛出异常
    peek() 检索并返回队列头元素,如果位列为空则返回null

    这几个方法是队列接口所提供的,然而这些方法并不会阻塞,所以需要重新定义阻塞队列的接口,下面我们看看阻塞队列中的核心方法。

    阻塞队列BlockigQueue接口核心方法

    方法 功能
    put(e) 添加一个元素,成功返回true,如果空间满了则阻塞等待
    offer(e,time,unit) 添加一个元素,成功返回true,如果空间满了则阻塞指定时间,到达指定时间还没空间则返回null
    take() 检索并移除队列头元素,成功则返回移除的元素,如果队列为空则阻塞
    poll(time,unit) 检索并移除队列头元素,成功则返回移除的元素,如果队列为空则阻塞指定时间,到达指定时间后队列还是空则返回null
    drainTo(Collection) 一次性获取队列所有元素放到指定的集合中,并返回转移个数
    drainTo(c,n) 一次性获取队列中指定个数的元素放到指定的集合中,并返回转移个数
    remainingCapacity() 返回队列中理想情况下可添加元素个数

    在Java中,提供了7种常用的阻塞队列。

    • ArrayBlockingQueue:由数组结构组成的有界阻塞队列
    • LinkedBlockingQueue:由链表结构组成的有界阻塞队列
    • PriorityBlockingQueue:支持优先级排序的无界阻塞队列
    • DelayQueue:使用优先级队列实现的无界阻塞队列
    • SynchronousQueue:不存储元素的阻塞队列
    • LinkedTransferQueue:由链表结构组成的无界阻塞队列
    • LinkedBlockingDeque:由链表结构组成的双向阻塞队列

    ArrayBlockingQueue

    ArrayBlockingQueue是一个用数组实现的有界阻塞队列。此队列按照先进先出(FIFO)的原则对元素进行排序。默认情况下采用非公平锁的方式实现,可以通过构造器传参控制是采用公平锁还是非公平锁实现。先看看ArrayBlockingQueue类图关系:


    在这里插入图片描述

    可以看到有3个构造器,其实最终都是会调用上图中第二个构造器进行初始化,第三个构造器在初始化之后会再进行赋值(如果传入的Collection不为空)。

    ArrayBlockingQueue nonFairQueue = new ArrayBlockingQueue(10);//默认非公平锁实现
    ArrayBlockingQueue fairQueue = new ArrayBlockingQueue(10,true);//true表示公平锁
    
    

    模拟实现生产者消费者

    package com.zwx.concurrent.queue.block;
    
    import java.util.concurrent.ArrayBlockingQueue;
    
    public class ArrayBlockingQueueDemo {
        public static void main(String[] args) throws InterruptedException {
            ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue(100);//默认非公平锁实现
            new Thread(new ConsumerThread(queue)).start();
            Thread.sleep(2000);
            new Thread(new ProcuctThread(queue)).start();
        }
    }
        class ProcuctThread extends Thread{
            private ArrayBlockingQueue queue;
            public ProcuctThread(ArrayBlockingQueue queue) {
                this.queue = queue;
            }
            @Override
            public void run() {
               for (int i=0;i<100;i++){
                   try {
                       queue.put(i);
                   } catch (InterruptedException e) {
                       e.printStackTrace();
                   }
               }
            }
        }
    
        class ConsumerThread extends Thread{
            private ArrayBlockingQueue queue;
            public ConsumerThread(ArrayBlockingQueue queue) {
                this.queue = queue;
            }
            @Override
            public void run() {
                while (true){
                    try {
                        int a = (int)queue.take();
                        System.out.println("消费:" + a);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    
    

    上面的示例中,我们先启动了消费者模式,运行之后可以发现,因为队列是空的,所以会阻塞等待生产者添加元素之后输出。

    初始化队列

    首先是初始化两个Condition队列分别用于阻塞生产者和消费者线程,关于Condition队列,想详细了解的可以点击这里

    在这里插入图片描述

    添加元素(生产者)

    添加元素的时候,如果队列满了,就阻塞,不满则调用enque方法添加元素


    在这里插入图片描述

    获取元素时通过内部维护的putIndex逐个往后添加元素,满了则重新从0开始


    在这里插入图片描述

    获取元素(消费者)

    首先会判断队列是不是空了,空了就阻塞,否则调用dequeue方法获取元素


    在这里插入图片描述

    调用dequeue方法移除元素,并唤醒添加元素线程。


    在这里插入图片描述

    LinkedBlockingQueue

    一个由链表结构组成的有界阻塞队列,此队列按照先进先出(FIFO)的原则对元素进行排序,和ArrayBlockingQueue的区别是ArrayBlockingQueue内部维护的是一个数组,通过数组下标来维护队列,而LinkedBlockingQueue维护的是一个链表,通过Node来维护队列。还是先来看看类图关系:


    在这里插入图片描述

    LinkedBlockingQueue依然有3个构造函数,第一个和第三个构造器最终也是会调用第二个构造器,而且默认会初始化一个Integer.MAX_VALUE大小队列,第三个构造器在初始化队列之后会进行赋值(如果传入的Collection不为空)。

    初始化队列

    在这里插入图片描述

    Node是LinkedBlockingQueue中的一个静态内部类:


    在这里插入图片描述

    所以第一次初始化之后Node节点中的item是默认为null值的。初始化之后得到如下队列:


    在这里插入图片描述

    这个头节点也是一个哨兵,和AQS同步队列一样,设置了一个空信息的节点作为哨兵。

    添加元素(生产者)

    在这里插入图片描述

    添加元素获取的是putLock,后面可以看到,获取元素获取的的takeLock,采用了读写双锁分离方式实现性能上的提升。
    再看下添加元素的enqueue方法:


    在这里插入图片描述

    添加元素后得到如下队列:


    在这里插入图片描述

    获取元素(消费者)

    在这里插入图片描述

    这里消费者获取的是另一把锁takeLock,这里的逻辑也应该是比较容易看懂,我们进入真正获取元素的dequeue方法:


    在这里插入图片描述

    1、我们先看219行,执行之后得到如下队列:


    在这里插入图片描述

    2、继续看221行代码,执行之后得到如下结果:


    在这里插入图片描述

    经过这两步,其实原先的E1节点已经被移除了,那么从上图可以看到,原先旧的head节点中next还持有了当前head节点的引用,这时候根据GC的可达性算法,是无法回收的,所以需要将next的引用去掉(也就是上面218行代码的作用),这样Node节点就没有持有其他对象的引用了,GC就可以将其当做垃圾回收掉,这个和之前在AQS同步队列以及Condition队列中做法是一个意思,都是为了取消其引用,方便GC。

    LinkedBlockingDeque

    LinkedBlockingDeque是和LinkedBlockingQeque一样均是由链表结构组成,也就是说内部都是通过一个Node内部类来实现聊表,而LinkedBlockingDeque是双向的阻塞队列,故而Node中肯定会比单向的多了一个prev指向前一个节点。

    双向队列因为多了一个操作队列的入口,所以相比较于LinkedBlockingQeque单向队列中多了addFirst、 addLast、offerFirst、offerLast、peekFirst和peekLast等方法。另外,插入方法add等同于addLast,移除方法remove等效于removeFirst,而take方法却等同于takeFirst,这些事实需要注意的,为了避免混乱,建议使用的时候还是带上First和Last关键字。
    首先还是来看一看类图:


    在这里插入图片描述

    可以看出相比较于单项队列,多了一个Deque(双向)接口,构造器和单向列表一样,也是提供了3个。

    初始化队列

    在这里插入图片描述

    这里可以看到,初始化的时候没有设置任何节点,仅仅只是设置了一个容量。

    添加元素(生产者)

    从First添加

    在这里插入图片描述

    这里基本没有逻辑,主要看linkFirst(Node)方法:


    在这里插入图片描述

    从Last添加

    在这里插入图片描述

    这个方法也是一样没有逻辑,主要看linkLast(Node)方法:


    在这里插入图片描述

    获取元素(消费者)

    从First获取

    在这里插入图片描述

    继续看unlinkFirst这个方法:


    在这里插入图片描述

    这些逻辑和上面的单向队列是差不多的,只是多了一个prev指向

    从Last获取

    在这里插入图片描述

    继续看unlinkLast()方法:


    在这里插入图片描述

    总结

    本文主要讲述了Java提供的7种队列中的其中三种队列,这三种队列实现方式比较接近,而且源码比较容易理解。

    相关文章

      网友评论

        本文标题:【并发编程系列8】阻塞队列之ArrayBlockingQueue

        本文链接:https://www.haomeiwen.com/subject/tuadpktx.html