美文网首页Android开发Java学习笔记技术干货
Java并发集合_BlockingQueue原理分析

Java并发集合_BlockingQueue原理分析

作者: wo883721 | 来源:发表于2018-04-03 09:54 被阅读56次

    假如我们现在有这样的需求,有一个仓库,我们可以存东西和取东西,仓库有存储上限。当仓库已满的时候,存东西的人就必须等待,直到有人取走东西。当仓库为空的时候,取东西的人必须等待,直到有人存入东西。

    这是一个典型的生产者消费者问题。这里有两个条件,仓库已满的条件和仓库为空的条件,用条件让线程等待,这个让我们想到了并发框架下的Condition。

    自己实现这个功能也不难,但是java中给我们提供了很好的实现,就是并发集合中的阻塞队列BlockingQueue。

    // 阻塞队列的接口
    public interface BlockingQueue<E> extends Queue<E> {}
    

    阻塞队列BlockingQueue继承自Queue队列接口。而Queue接口我们在LinkedList(源码解析)文章中详细介绍过了,这里先回顾一下。

    一. Queue接口

    队列是一种FIFO(先入先出)的数据结构,也就是说每次插入元素都是插入在队列尾,每次取出元素都是在队列头取出。
    所以队列应该有三个重要方法:

    1. boolean offer(E e); 向队列尾添加元素。
    2. E poll(); 移除队列头元素,并返回它。
    3. E peek(); 查看队列头元素。

    下面我们来看一下Queue接口的代码

      public interface Queue<E> extends Collection<E> {
    
        // 向队列末尾新添加元素,返回true表示添加成功
       // 不会返回false,因为添加失败直接抛出IllegalStateException异常。
       // 一般调用offer方法实现。
        boolean add(E e);
    
        // 向队列末尾新添加元素,返回true表示添加成功,返回false,添加失败
        boolean offer(E e);
    
        // 这个与Collection中的remove方法不一样,因为Collection中的remove方法都要提供一个元素或者集合,用于删除。
        // 这里不穿任何参数,就是代表删除队列第一个元素(即队列头),并返回它
        // 还需要注意的时,如果队列是空的,即队列头是null,这个方法会抛出NoSuchElementException异常。
        E remove();
    
        // 这个方法也是删除队列第一个元素(即队列头),并返回它
        // 但是它和remove()方法不同的时,如果队列是空的,即队列头是null,它不会抛出异常,而是会返回null。
        E poll();
    
        // 查看队列头的元素,如果队列是空的,就抛出异常
        E element();
    
        // 查看队列头的元素。如果队列是空的,不会抛出异常,而是返回null
        E peek();
    }
    

    可以看出继承自Collection接口,但是Queue接口还提供了是三个好像重复的方法。

    1. 向队列尾添加元素的方法:add(E e)与offer(E e)。区别就是队列是满的,添加失败时,add方法会抛出异常,而offer方法只会返回false。
    2. 移除队列头元素的方法:remove()与poll()。区别就是队列为空的时候,remove方法会抛出异常,poll方法只会返回null。
    3. 查看队列头元素的方法:element()与peek()。区别就是队列为空的时候,element方法会抛出异常,peek方法只会返回null。

    二. BlockingQueue接口

    阻塞队列BlockingQueue与普通队列Queue相比较,不同有两点:

    1. 当队列已满的时候,当前线程会阻塞等待,直到队列有空余位置,当前线程会被唤醒,将元素插入队列尾。
    2. 当队列为空的时候,当前线程会阻塞等待,直到队列中有元素,当前线程会被唤醒,从队列头取出元素。

    在BlockingQueue中有这样两个方法:

        // 向队列末尾新添加元素,如果队列已满,当前线程就等待。响应中断异常
        void put(E e) throws InterruptedException;
    
        /**
         * 返回并移除队列第一个元素,如果队列是空的,就前线程就等待。响应中断异常
         */
        E take() throws InterruptedException;
    

    但是有这样一个问题,当队列已满的时候,当前线程会阻塞等待,它只能祈祷有别的线程取出队列中的元素,如果一直没有线程取走元素,那么当前线程就会一直阻塞等待,整个过程对当前线程是不可控的。
    如果我们希望对等待时间有一定的控制,不能出现一直等待的现象,那么就要增加超时时间,所以要添加有超时时间的对应方法。

         /**
         * 向队列末尾新添加元素,如果队列中没有可用空间,当前线程就等待,
         * 如果等待时间超过timeout了,那么返回false,表示添加失败
         */
        boolean offer(E e, long timeout, TimeUnit unit)
            throws InterruptedException;
    
        /**
         * 返回并移除队列第一个元素,如果队列是空的,就前线程就等待。
         * 如果等待时间超过timeout了,那么返回false,表示获取元素失败
         */
        E poll(long timeout, TimeUnit unit)
            throws InterruptedException;
    

    所以与Queue接口进行对比,BlockingQueue阻塞队列的插入、移除和查看的方法不同:

    1. 向队列尾添加元素的方法:
      1). add(E e)与offer(E e)方法,和Queue接口作用一样,只不过加了多线程同步安全的操作。
      2). put(E e)方法,添加元素时,当前线程可能会阻塞等待。
      3). offer(E e, long timeout, TimeUnit unit)方法,与put(E e)方法作用相同,只不过加了超时时间。(其实这个方法名用put更好,用offer容易让人和offer(E e)方法产生混淆)

    2. 移除队列头元素的方法:
      1). remove()与poll()方法,和Queue接口作用一样,只不过加了多线程同步安全的操作。(注:这个没有在BlockingQueue接口中重新声明)
      2). take()方法,移除元素时,当前线程可能会阻塞等待。
      3). poll(long timeout, TimeUnit unit) 方法,与take()方法作用相同,只不过加了超时时间。(其实这个方法名用take更好,用poll容易让人和poll()方法产生混淆)

    3. 查看队列头元素的方法:BlockingQueue接口没有添加新的关于查看队列头的新方法,因为查看元素不会改变队列,所以不需要增加阻塞的方法。还是element()与peek()这两个方法。

    package java.util.concurrent;
    
    import java.util.Collection;
    import java.util.Queue;
    
    // 阻塞队列的接口
    public interface BlockingQueue<E> extends Queue<E> {
        /**
         * 向队列尾添加元素。如果队列已满,添加失败,抛出IllegalStateException异常
         */
        boolean add(E e);
    
        // 向队列末尾新添加元素。返回true表示添加成功,false表示添加失败,不会抛出异常
        boolean offer(E e);
    
        // 向队列末尾新添加元素,如果队列已满,当前线程就等待。响应中断异常
        void put(E e) throws InterruptedException;
    
        /**
         * 向队列末尾新添加元素,如果队列中没有可用空间,当前线程就等待,
         * 如果等待时间超过timeout了,那么返回false,表示添加失败
         */
        boolean offer(E e, long timeout, TimeUnit unit)
            throws InterruptedException;
    
        /**
         * 返回并移除队列第一个元素,如果队列是空的,就前线程就等待。响应中断异常
         */
        E take() throws InterruptedException;
    
        /**
         * 返回并移除队列第一个元素,如果队列是空的,就前线程就等待。
         * 如果等待时间超过timeout了,那么返回false,表示获取元素失败
         */
        E poll(long timeout, TimeUnit unit)
            throws InterruptedException;
    
        // 返回队列剩余空间的个数。(即还能添加多少个元素)
        int remainingCapacity();
    
    
        // 移除队列中包含的所有o对象。返回true表示队列有变化,即删除了元素,返回false表示没有删除任何元素。
        boolean remove(Object o);
    
    
        // 队列中是否包含对象o
        public boolean contains(Object o);
    
    
        /**
         * 移除阻塞队列中所有有效元素,把它们添加到另一个集合c中。返回值转移的元素个数。
         * @param c
         * @return 实际转移的元素个数。
         */
        int drainTo(Collection<? super E> c);
    
        /**
         * 最多移除阻塞队列中maxElements个数的有效元素,把它们添加到另一个集合c中。
         * 返回值实际转移的元素个数。
         * @param c
         * @param maxElements 最多添加的元素个数
         * @return 实际转移的元素个数。
         */
        int drainTo(Collection<? super E> c, int maxElements);
    }
    

    相关文章

      网友评论

        本文标题:Java并发集合_BlockingQueue原理分析

        本文链接:https://www.haomeiwen.com/subject/jetqhftx.html