Java - Queue

作者: 齐晋 | 来源:发表于2019-02-28 18:11 被阅读0次

    简介

    Queue,翻译成队列,是一种先进先出(FIFO, First In First Out)的数据结构。最先放进去的,取的时候也就最先取出来。最形象的比喻就是我们常见的排队就是一个队列。排队时,新来的人进入队尾,先到的人首先接收服务。

    Queue大多数是单向队列,即只能从一端取数据,另一端放入数据。就像把羽毛球放入球筒,从一端放入,从另一端取出。

    image.png

    Queue体系

    public interface Queue<E> extends Collection<E> {
        boolean add(E e);
        boolean offer(E e);
        E remove();
        E poll();
        E element();
        E peek();
    }
    
    • boolean add(E e):向queue中添加element。成功返回true。不会返回false。当添加失败(比如queue有大小限制),则抛出异常IllegalStateException
    • boolean offer(E e):跟add类似。区别在于add插入失败会抛出异常,offer会返回false。
    • E remove(): 返回第一个element,并从queue中删除。queue为空则抛出异常NoSuchElementException
    • E pool(): 返回第一个element,并从queue中删除。queue为空则返回null
    • E element(): 返回第一个element,但是不从queue中删除。queue为空则抛出异常NoSuchElementException
    • E peak():返回第一个element,但是不从queue中删除。queue为空则抛出异常

    上面的6个方法分别表示了3种操作,每一种操作都有两种类型,抛出异常的类型有特定返回值的类型。下面用表格来描述上述方法的异同:

    方法 作用 返回值 成功 失败
    add(E e) 插入元素 boolean true IllegalStateException
    offer(E e) 同add() boolean true false
    remove(E e) 返回第一个元素,并删除 E E NoSuchElementException
    pool(E e) 同remove boolean E null
    element() 返回第一个元素,不删除 E E NoSuchElementException
    peak() 同element E E null

    Queue接口常见的扩展和实现有:

    image.png
    • AbstractQueue: 实现部分方法的抽象类。做为大多数具体实现类的基类。
    • BlockingQueue: 阻塞队列。在Queue的基础上增加了阻塞接口。
    • Deque: 双向队列,double ended queue的缩写。队列两端都可以操作队列。

    AbstractQueue

    AbstractQueue类似于很多JDK源码中的Abstract*类,实现了部分通用方法,做为具体实现类的基类。这里主要讲一下PriorityQueueConcurrentLinkedQueue

    PriorityQueue

    基于堆实现的优先队列。获取数据时是有序的。默认是升序。
    主要特点:

    • 无界队列。队列是用数组实现,需要指定大小,数组大小可以动态增加,容量无限。
    • 要实现插入的元素有序,有两种方法:
      • 插入元素实现了Comparable接口
      • 在构造函数中传入Comparator实现,如下
    public class PriorityQueue<E> extends AbstractQueue<E>{
        public PriorityQueue(int initialCapacity, Comparator<? super E> comparator);
    }
    
    • 非线程安全。如需线程安全的实现,请使用PriorityBlockingQueue
    • 插入的元素不能为null
    • 使用for-eachiterator来遍历优先队列,得到的结果并不保证有序。只有通过不断调用poll()/remove()方法得到的结果才是有序的。如果需要按顺序遍历,请考虑使用 Arrays.sort(pq.toArray())。如下:
    PriorityQueue<Integer> test = new PriorityQueue<>();
    test.add(10);
    test.add(4);
    test.add(7);
    test.add(2);
    test.add(9);
    
    //可保证有序输出,输出 2 4 7 9 10
    while (!test.isEmpty()) {
        System.out.println(test.poll());
    }
    
    //不保证有序输出,输出 2 4 7 10 9
    for(Integer e : test){
        System.out.println(e);
    }
    
    //不保证有序输出,输出 2 4 7 10 9
    Iterator it = test.iterator();
    while (it.hasNext()) {
        System.out.println(it.next());
    }
    

    ConcurrentLinkedQueue

    ConcurrentLinkedQueue是一个无锁线程安全队列。
    常见的线程安全实现都是通过加锁(在“线程安全的实现”这一节就能看到),而加锁的成本是很高的。如果能找到一种方法,既不用加锁,又能保证线程安全,很大可能能极大提升系统性能。ConcurrentLinkedQueue就是使用了这种无锁方法的一种队列。
    其实现思想借鉴了很通用,也很重要的CAS操作。CAS简介
    详情请见无锁队列 - ConcurrentLinkedQueue

    BlockingQueue

    BlockingQueue名为阻塞队列
    何为阻塞?以从队列中取数据为例,当队列为空时,Queue提供方法的表现为:

    • remove(): 抛出异常NoSuchElementException
    • poll(): 返回null
      这种情况下,如果想在队列不空时获取数据,只能通过循环不断调用remove()poll()。那么,如果能在队列为空的时候,方法不返回,而是等待数据,直到队列中有了数据,才继续进行,就方便很多了。这就是阻塞方法。

    BlockingQueueQueue的基础上主要扩展了以下几个阻塞方法:

    public interface BlockingQueue<E> extends Queue<E> {
        void put(E e) throws InterruptedException;
        E take() throws InterruptedException;
        boolean offer(E e, long timeout, TimeUnit unit)
           throws InterruptedException;
        E poll(long timeout, TimeUnit unit)
            throws InterruptedException;
    }
    
    • void put(E e): 作用同add(E e),当队列满时阻塞,直到队列不满。
    • boolean take(): 作用同remove(),当队列为空时阻塞,直到队列不空。
    • boolean offer(E e, long timeout, TimeUnit unit): 作用同offer(E e),只是增加了超时,允许等一段时间。如果过了超时时间还不能插入成功,则返回false。
    • E poll(long timeout, TimeUnit unit): 作用同poll(),也是增加了超时。如果过了超时时间还不能获取数据,则返回null。
    异常行为 插入数据 取数据
    抛出异常 add(E e) remove()
    返回null offer(E e) poll()
    阻塞 put(E e)/offer(E e, long timeout) take()/poll(long timeout)

    BlockingQueue的具体实现

    BlockingQueue只是个接口,其常用的具体实现有哪些呢?主要有以下这些:

    image.png
    • ArrayBlockingQueue: 基于数组的队列,有界队列,必须指定大小。
    • LinkedBlockingQueue: 基于链表的队列,无界队列,大小可指定,可不指定。
    • PriorityBlockingQueue: 优先队列,或者说有序队列。队列中的元素按照指定规则排好序。
    • SynchronousQueue: 没有任何容量(capacity)的队列,是一个比较特殊的队列。
    • DelayQueue: 延迟队列。放入的元素必须过了超时时间才能取出。放入的元素必须实现Delayed接口

    ArrayBlockingQueue

    主要特点:

    • 有界队列
    • 基于数组存储,数组长度固定,需要在构造函数中指定
    public class ArrayBlockingQueue<E> extends AbstractQueue<E> 
        implements BlockingQueue<E>, java.io.Serializable {
        final Object[] items;  //基于数组
        final ReentrantLock lock;  //线程同步锁
        private final Condition notEmpty;  //条件变量,用于取数据同时队列为空时阻塞线程
        private final Condition notFull;  //条件变量,用户插入数据同时队列满时阻塞线程
    }
    

    LinkedBlockingQueue

    主要特点:

    • 可以是无界队列,也可以是有界队列。区别在于是否设定队列大小。
    • 基于链表存储。
    public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
        private transient Node<E> head;  //队列头
        private final ReentrantLock takeLock = new ReentrantLock();  //取数据线程同步锁
        private final Condition notEmpty = takeLock.newCondition(); //条件变量,用于取数据同时队列为空时阻塞线程
        private final ReentrantLock putLock = new ReentrantLock(); //插入数据线程同步锁
        private final Condition notFull = putLock.newCondition(); //条件变量,用户插入数据同时队列满时阻塞线程
    }
    

    注:put(E e)方法只有在队列满时才组设,因此,如果是无界队列,put(E e)永远不会阻塞。

    PriorityBlockingQueue

    优先队列PriorityQueue的线程安全版本,同时提供阻塞方法。
    主要特点同PriorityQueue

    public class PriorityBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
        private transient Object[] queue;
        
    }
    

    SynchronousQueue

    比较特殊的队列,没有任何存储空间。
    举个简单的例子:
    A是个快递员,要送快递给用户B,如果使用ArrayBlockingQueue或者LinkedBlockingQueue,会是这样:

    1. A把快递放到快递柜的箱子里(假设快递柜有20个箱子)
    2. 如果有空箱子,可以直接把快递放到空箱子中。
    3. 如果所有的箱子都满了,那么A等着,直到B取了任意快递,空出了箱子,A再把快递放到空箱子中。
    4. 如果所有的箱子都空了,取快递的人B会一直等待,直到有快递投递到箱子中。

    那么如果使用SynchronousQueue,情况就不同了

    1. 首先没有能临时存放快递的柜子和箱子
    2. A要送快递,会一直拿着快递等,直到B来取。
    3. B要取快递,也会一直等,直到A来送。
      SynchronousQueue就是类似这种“手到手”的交付方式,不经过任何媒介缓存。

    主要特点:

    • 没有任何数据结构可保存数据,不能调用peek()方法来看队列中是否有数据元素
    • 不能遍历队列,应为根本没有存储任何数据
    • 调用put()方法会等待,直到有其他线程调用了take()方法

    DelayQueue

    DelayQueue的继承结构如下:

    image.png
    public class DelayQueue<E extends Delayed> extends AbstractQueue<E> implements BlockingQueue<E> {
        private final PriorityQueue<E> q = new PriorityQueue<E>();  //存储数据
    }
    

    由类的声明可知,插入到DelayQueue中的元素必须实现Delayed接口。Delayed接口比较简单,只有一个getDelay(TimeUnit unit)方法。

    public interface Delayed extends Comparable<Delayed> {
        long getDelay(TimeUnit unit);  //剩余时间
    }
    

    同时,DelayQueue存储数据既不像ArrayBlockingQueue使用数组,也不像LinkedBlockingQueue使用链表,而是使用现成的PriorityQueue。因此DelayQueue取数据的规则跟PriorityQueue类似。

    何谓延迟?
    延迟主要体现在取数据的时候。通过查看poll()的源码可知:

    public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            E first = q.peek();
            if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
                return null;
            else
                return q.poll();
        } finally {
            lock.unlock();
        }
    }
    

    在取数的时候,getDelay()必须小于等于0才能把数取出来。通过实现getDelay()方法,就能实现过多长时间以后才能取出数据这种延迟效果。

    主要特点:

    • 无界队列,因此put(E e)不会阻塞
    • 调用取元素的方法:remove()poll()take()时,只有当元素超时以后才能取到。

    BlockingQueue的应用

    实现生产者-消费者
    生产者-消费者模式在工程之中应用广泛。BlockingQueue可极大简化实现生产者-消费者的难度。
    伪代码:

    //生产者
    public class Producer{
        private BlockingQueue queue;
        Producer(BlockingQueue queue){this.queue = queue}
        public void produce(E e){
            //生产者调用put()方法,队列满时阻塞等待。
            queue.put(e);
        }
    }
    
    //消费者
    public class Consumer{
        private BlockingQueue queue;
        Consumer(BlockingQueue queue){this.queue = queue}
        public E consume(){
            //消费者调用take()方法,队列空时阻塞等待
            queue.take(e);
        }
    }
    

    做为线程池的等待队列

    • 线程池ThreadPoolExecutor的构造函数接收BlockingQueue做为等待队列
    • Executors.newSingleThreadPool()/Executors.newFixedThreadPool()默认使用LinkedBlockingQueue做为等待队列
    • Executors.newCachedThreadPool()默认使用SynchronousQueue做为等待队列。可以做到一有请求,就创建新线程。

    求Top K大/小的元素
    比如有1亿个随机数字,找出最大的10个数。这种类似的求Top K的问题很常见。
    由于PriorityQueue/PriorityBlockingQueue底层结构是堆(大顶堆/小顶堆),而解决Top K问题的最好办法就是使用堆,因此PriorityQueue/PriorityBlockingQueue是解决该问题的不二选择。
    代码摘要:

    public class FixSizedPriorityQueue<E extends Comparable> {
        private PriorityQueue<E> queue;
        private int maxSize; // 堆的最大容量
    
        public FixSizedPriorityQueue(int maxSize) {
            if (maxSize <= 0)
                throw new IllegalArgumentException();
            this.maxSize = maxSize;
            this.queue = new PriorityQueue(maxSize, new Comparator<E>() {
                public int compare(E o1, E o2) {
                    // 生成最大堆使用o2-o1,生成最小堆使用o1-o2, 并修改 e.compareTo(peek) 比较规则
                    return (o2.compareTo(o1));
                }
            });
        }
    
        public void add(E e) {
            if (queue.size() < maxSize) { // 未达到最大容量,直接添加
                queue.add(e);
            } else { // 队列已满
                E peek = queue.peek();
                if (e.compareTo(peek) < 0) { // 将新元素与当前堆顶元素比较,保留较小的元素
                    queue.poll();
                    queue.add(e);
                }
            }
        }
    }
    

    延时需求
    如:考试时间为120分钟,30分钟后才可交卷。这种情况下,就需要使用到DelayQueue了。
    案例参考

    Deque

    Deque发音为'deck',为双向队列。
    Queue默认是单向队列,数据只能从一头放入,从另一头取出,这跟羽毛球的放取原理一样。双向队列就像放取乒乓球,可以从两端放入,也可以从两端取出。

    image.png

    Deque接口的部分组成

    public interface Deque<E> extends Queue<E> {
        //新增方法
        void addFirst(E e);
        void addLast(E e);
        boolean offerFirst(E e);
        boolean offerLast(E e);
        E removeFirst();
        E removeLast();
        E pollFirst();
        E pollLast();
        E getFirst();
        E getLast();
        E peekFirst();
        E peekLast();
        void push(E e);
        E pop();
    
        //Queue接口方法
        boolean add(E e);
        boolean offer(E e);
        E remove();
        E poll();
        E element();
        E peek();
    }
    

    从扩展的函数名字就能看出来,Deque分别针对插入和取出接口提供了对队列头和尾的操作。
    同时,从Queue继承过来方法的与扩展方法有如下对应关系:

    add(E e) == addLast(E e)
    offer(E e) == offerLast(E e)
    remove() == removeFirst()
    poll() == pollFirst()
    element() == getFirst()
    peek() == peekFirst()

    同时,Deque还扩展出了push(E e)pop()方法。其中:

    push(E e)等同于addFirst(E e)
    pop()等同于removeFirst()

    因此,当Deque只通过push(E e)pop()操作队列头时,Deque就演化成了一个栈Stack

    [图片上传失败...(image-f152e6-1551348671199)]

    扩展接口或实现类

    • ArrayDeque: 基于数组存储的双向队列
    • LinkedList: 基于链表存储的双向队列
    • ConcurrentLinkedDeque: 基于链表的线程安全版本
    • BlockingDeque: 扩展了Deque接口的阻塞接口,同时继承了BlockingQueue接口。
    • LinkedBlockingDeque: 实现了BlockingDeque的双向队列。

    线程安全的实现

    引申
    Java线程安全...

    BlockingQueue中的方法都是线程安全的,都使用了ReentrantLock做为锁。如:
    ArrayBlockingQueue
    ArrayBlockingQueue中有公共变量count来计数元素个数,因此需要一个全局的锁来保护。

    public ArrayBlockingQueue(int capacity, boolean fair) {
        lock = new ReentrantLock(fair);
    }
    public boolean offer(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            //do something
        } finally {
            lock.unlock();
        }
    }
    
    public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            //do something
        } finally {
            lock.unlock();
        }
    }
    

    LinkedBlockingQueue
    LinkedBlockingQueue中计数元素个数使用的是AtomicInteger类型,本身是线程安全的。因此没有使用全局锁,而是针对插入和获取分别创建了两个锁。

    private final ReentrantLock takeLock = new ReentrantLock();
    private final ReentrantLock putLock = new ReentrantLock();
    
    public boolean offer(E e) {
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            //do something
        } finally {
            putLock.unlock();
        }
    }
    
    public E poll() {
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            //do something
        } finally {
            takeLock.unlock();
        }
    }
    

    引申
    上面实现线程安全的方式都是通过锁。而我们知道,锁是一个比较重的操作,在高并发系统中,能少用就少用。因此,在此介绍一个不用锁就能实现线程安全的队列:
    无锁队列 - ConcurrentLinkedQueue...
    无锁并且保证线程安全的思想是使用CAS...

    阻塞算法的实现

    引申
    Java线程通信...

    BlockingQueue的功能是个标准的生产者-消费者模式。线程间通信使用的是条件变量Condition。伪代码如下:

    ReentrantLock lock = new ReentrantLock(fair);
    private final Condition notEmpty = lock.newCondition();
    private final Condition notFull = lock.newCondition();
    
    public boolean put(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            //如果队列已经满了,那么等待队列notFull
            while(count == container.size){
                notFull.await()
            }
            notEmpty.signal();
            //do other things
        } finally {
            lock.unlock();
        }
    }
    
    public E take() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            //如果队列为空,那么等待队列notEmpty
            while (count == 0){
                notEmpty.await();
            }
            notFull.signal();
        } finally {
            lock.unlock();
        }
    }
    

    非阻塞型
    ConcurrentLinkedQueue: 无锁线程安全队列

    参考

    相关文章

      网友评论

        本文标题:Java - Queue

        本文链接:https://www.haomeiwen.com/subject/lhukuqtx.html