美文网首页
7. BlockingQueue

7. BlockingQueue

作者: shallowinggg | 来源:发表于2019-03-20 17:14 被阅读0次

    java.util.concurrent包中的Java BlockingQueue接口表示一个线程可以安全放入以及从中获取实例的队列。在本文中,我将向你展示如何使用BlockingQueue

    BlockingQueue 使用

    一个BlockingQueue通常用于在线程上生成对象,另一个线程消耗对象。这是一个说明这个原则的图表:

    生产线程将一直生成新对象并将它们插入队列,直到达到队列的容量上限。如果阻塞队列达到其上限,则在尝试插入新对象时会阻止生产线程。它将一直被阻塞,直到消费线程将一个对象从队列中取出。

    消费线程不断将对象从阻塞队列中取出并处理它们。如果消费线程试图将对象从空队列中取出实例,那么消费线程将被阻塞,直到生产线程向队列放入一个对象。

    BlockingQueue 方法

    BlockingQueue有4组不同的方法用于插入,删除和检查队列中的元素。当不能立即执行所请求的操作时,每组方法的行为会不同。这是一个方法表:

    抛出异常 返回特殊值 阻塞 超时
    插入 add(o) offer(o) put(o) offer(o, timeout, timeunit)
    删除 remove(o) poll() take() poll(timeout, timeunit)
    访问 element() peek()

    这4种不同的行为意味着:

    1. 抛出异常:
      如果请求的操作现在无法完成,则抛出异常。
    2. 特殊值:
      如果请求的操作现在无法完成,则返回特殊值(一般为 true / false).
    3. 阻塞:
      如果请求的操作现在无法完成,则方法调用将阻塞,直到操作能够进行。
    4. 超时:
      如果请求的操作现在无法完成,则方法调用将阻塞直到它能够进行,但等待不超过给定的超时。返回一个特殊值,告知操作是否成功(通常为true / false)

    无法插入nullBlockingQueue中。如果你尝试插入nullBlockingQueue则会抛出一个NullPointerException异常。

    你可以访问BlockingQueue内的所有元素,而不仅仅是开头和结尾的元素。例如,假设你已将一个对象入队等待处理,但你的应用程序决定取消它。你可以调用remove(o)这样的操作来删除队列中的特定对象。但是,这是个效率很低的操作,所以除非你真的需要,否则你不应该使用Collection中的这些方法。

    BlockingQueue 实现

    由于BlockingQueue是一个接口,你需要使用它的一个实现来使用它。java.util.concurrent包中具有以下实现BlockingQueue接口的类:

    Java BlockingQueue 示例

    这是一个Java BlockingQueue示例。该示例使用实现BlockingQueue接口的ArrayBlockingQueue类。

    首先, BlockingQueueExample类在不同的线程中启动ProducerConsumerProducer将一个字符串插入共享的BlockingQueue,而Consumer使用它们。

    public class BlockingQueueExample {
    
        public static void main(String[] args) throws Exception {
    
            BlockingQueue queue = new ArrayBlockingQueue(1024);
    
            Producer producer = new Producer(queue);
            Consumer consumer = new Consumer(queue);
    
            new Thread(producer).start();
            new Thread(consumer).start();
    
            Thread.sleep(4000);
        }
    }
    

    这是Producer类。注意每次put()调用之间它都会睡一秒钟。这将导致Consumer阻塞,为了等待获取队列中的对象。

    public class Producer implements Runnable{
    
        protected BlockingQueue queue = null;
    
        public Producer(BlockingQueue queue) {
            this.queue = queue;
        }
    
        public void run() {
            try {
                queue.put("1");
                Thread.sleep(1000);
                queue.put("2");
                Thread.sleep(1000);
                queue.put("3");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    

    这是Consumer类。它从队列中取出对象,然后将它们打印到System.out

    public class Consumer implements Runnable{
    
        protected BlockingQueue queue = null;
    
        public Consumer(BlockingQueue queue) {
            this.queue = queue;
        }
    
        public void run() {
            try {
                System.out.println(queue.take());
                System.out.println(queue.take());
                System.out.println(queue.take());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    

    下面是一个测试:

    import org.junit.Test;
    
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    
    public class BlockingExample {
        @Test
        public void test() throws Exception {
            BlockingQueue<String> queue = new ArrayBlockingQueue<>(1024);
            Producer producer = new Producer(queue);
            Consumer consumer = new Consumer(queue);
    
            Thread thread1 = new Thread(producer);
            Thread thread2 = new Thread(consumer);
    
            thread1.start();
            thread2.start();
            
            thread1.join();
            thread2.join();
        }
    
        private static class Producer implements Runnable {
            private BlockingQueue<String> queue;
    
            Producer(BlockingQueue<String> queue) {
                this.queue = queue;
            }
    
            @Override
            public void run() {
                try {
                    queue.put("1");
                    Thread.sleep(1000);
                    queue.put("2");
                    Thread.sleep(1000);
                    queue.put("3");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    
        private static class Consumer implements Runnable {
            private BlockingQueue<String> queue = null;
    
            Consumer(BlockingQueue<String> queue) {
                this.queue = queue;
            }
    
            @Override
            public void run() {
                try {
                    System.out.println(queue.take());
                    System.out.println(queue.take());
                    System.out.println(queue.take());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    

    结果如下:


    可以看出执行了2s 15ms才执行完

    下面是BlockingQueue的源码。注意每个方法文档中提及的异常,同时此接口还提供了两个方法int drainTo(Collection<? super E> c)int drainTo(Collection<? super E> c, int maxElements),这两个方式的作用是将队列中的元素增加到参数指定的集合中。

    public interface BlockingQueue<E> extends Queue<E> {
        /**
         * 将指定值插入到队列中,如果没有超出容量限制那么立刻完成并返回
         * {@code true} 成功
         * {@code IllegalStateException} 如果现在没有足够容量
         * 当使用一个容量限制的队列,使用{@link #offer(Object) offer}更好
         *
         * @param e the element to add
         * @return {@code true} (as specified by {@link Collection#add})
         * @throws IllegalStateException if the element cannot be added at this
         *         time due to capacity restrictions
         * @throws ClassCastException if the class of the specified element
         *         prevents it from being added to this queue
         * @throws NullPointerException if the specified element is null
         * @throws IllegalArgumentException if some property of the specified
         *         element prevents it from being added to this queue
         */
        boolean add(E e);
    
        /**
         * Inserts the specified element into this queue if it is possible to do
         * so immediately without violating capacity restrictions, returning
         * {@code true} upon success and {@code false} if no space is currently
         * available.  When using a capacity-restricted queue, this method is
         * generally preferable to {@link #add}, which can fail to insert an
         * element only by throwing an exception.
         *
         * @param e the element to add
         * @return {@code true} if the element was added to this queue, else
         *         {@code false}
         * @throws ClassCastException if the class of the specified element
         *         prevents it from being added to this queue
         * @throws NullPointerException if the specified element is null
         * @throws IllegalArgumentException if some property of the specified
         *         element prevents it from being added to this queue
         */
        boolean offer(E e);
    
        /**
         * Inserts the specified element into this queue, waiting if necessary
         * for space to become available.
         *
         * @param e the element to add
         * @throws InterruptedException if interrupted while waiting
         * @throws ClassCastException if the class of the specified element
         *         prevents it from being added to this queue
         * @throws NullPointerException if the specified element is null
         * @throws IllegalArgumentException if some property of the specified
         *         element prevents it from being added to this queue
         */
        void put(E e) throws InterruptedException;
    
        /**
         * Inserts the specified element into this queue, waiting up to the
         * specified wait time if necessary for space to become available.
         *
         * @param e the element to add
         * @param timeout how long to wait before giving up, in units of
         *        {@code unit}
         * @param unit a {@code TimeUnit} determining how to interpret the
         *        {@code timeout} parameter
         * @return {@code true} if successful, or {@code false} if
         *         the specified waiting time elapses before space is available
         * @throws InterruptedException if interrupted while waiting
         * @throws ClassCastException if the class of the specified element
         *         prevents it from being added to this queue
         * @throws NullPointerException if the specified element is null
         * @throws IllegalArgumentException if some property of the specified
         *         element prevents it from being added to this queue
         */
        boolean offer(E e, long timeout, TimeUnit unit)
            throws InterruptedException;
    
        /**
         * Retrieves and removes the head of this queue, waiting if necessary
         * until an element becomes available.
         *
         * @return the head of this queue
         * @throws InterruptedException if interrupted while waiting
         */
        E take() throws InterruptedException;
    
        /**
         * Retrieves and removes the head of this queue, waiting up to the
         * specified wait time if necessary for an element to become available.
         *
         * @param timeout how long to wait before giving up, in units of
         *        {@code unit}
         * @param unit a {@code TimeUnit} determining how to interpret the
         *        {@code timeout} parameter
         * @return the head of this queue, or {@code null} if the
         *         specified waiting time elapses before an element is available
         * @throws InterruptedException if interrupted while waiting
         */
        E poll(long timeout, TimeUnit unit)
            throws InterruptedException;
    
        /**
         * Returns the number of additional elements that this queue can ideally
         * (in the absence of memory or resource constraints) accept without
         * blocking, or {@code Integer.MAX_VALUE} if there is no intrinsic
         * limit.
         *
         * <p>Note that you <em>cannot</em> always tell if an attempt to insert
         * an element will succeed by inspecting {@code remainingCapacity}
         * because it may be the case that another thread is about to
         * insert or remove an element.
         *
         * @return the remaining capacity
         */
        int remainingCapacity();
    
        /**
         * Removes a single instance of the specified element from this queue,
         * if it is present.  More formally, removes an element {@code e} such
         * that {@code o.equals(e)}, if this queue contains one or more such
         * elements.
         * Returns {@code true} if this queue contained the specified element
         * (or equivalently, if this queue changed as a result of the call).
         *
         * @param o element to be removed from this queue, if present
         * @return {@code true} if this queue changed as a result of the call
         * @throws ClassCastException if the class of the specified element
         *         is incompatible with this queue
         * (<a href="{@docRoot}/java/util/Collection.html#optional-restrictions">optional</a>)
         * @throws NullPointerException if the specified element is null
         * (<a href="{@docRoot}/java/util/Collection.html#optional-restrictions">optional</a>)
         */
        boolean remove(Object o);
    
        /**
         * Returns {@code true} if this queue contains the specified element.
         * More formally, returns {@code true} if and only if this queue contains
         * at least one element {@code e} such that {@code o.equals(e)}.
         *
         * @param o object to be checked for containment in this queue
         * @return {@code true} if this queue contains the specified element
         * @throws ClassCastException if the class of the specified element
         *         is incompatible with this queue
         * (<a href="{@docRoot}/java/util/Collection.html#optional-restrictions">optional</a>)
         * @throws NullPointerException if the specified element is null
         * (<a href="{@docRoot}/java/util/Collection.html#optional-restrictions">optional</a>)
         */
        boolean contains(Object o);
    
        /**
         * 从队列中删除所有能访问的元素并且将他们增加到指定的集合中。
         * 这个操作可能比重复的调用poll方法效率更高。当尝试将元素增加
         * 到集合中时如果失败啊,那么可能队列和集合中都包含或都不包含
         * 此元素。如果指定的集合是它自己,那么抛出一个IllegalArgumentException。
         * 当此操作进行时,如果指定的集合被修改,那么这个操作将是未定义的。
         *
         * @param c the collection to transfer elements into
         * @return the number of elements transferred
         * @throws UnsupportedOperationException if addition of elements
         *         is not supported by the specified collection
         * @throws ClassCastException if the class of an element of this queue
         *         prevents it from being added to the specified collection
         * @throws NullPointerException if the specified collection is null
         * @throws IllegalArgumentException if the specified collection is this
         *         queue, or some property of an element of this queue prevents
         *         it from being added to the specified collection
         */
        int drainTo(Collection<? super E> c);
    
        /**
         * Removes at most the given number of available elements from
         * this queue and adds them to the given collection.  A failure
         * encountered while attempting to add elements to
         * collection {@code c} may result in elements being in neither,
         * either or both collections when the associated exception is
         * thrown.  Attempts to drain a queue to itself result in
         * {@code IllegalArgumentException}. Further, the behavior of
         * this operation is undefined if the specified collection is
         * modified while the operation is in progress.
         *
         * @param c the collection to transfer elements into
         * @param maxElements the maximum number of elements to transfer
         * @return the number of elements transferred
         * @throws UnsupportedOperationException if addition of elements
         *         is not supported by the specified collection
         * @throws ClassCastException if the class of an element of this queue
         *         prevents it from being added to the specified collection
         * @throws NullPointerException if the specified collection is null
         * @throws IllegalArgumentException if the specified collection is this
         *         queue, or some property of an element of this queue prevents
         *         it from being added to the specified collection
         */
        int drainTo(Collection<? super E> c, int maxElements);
    }
    

    相关文章

      网友评论

          本文标题:7. BlockingQueue

          本文链接:https://www.haomeiwen.com/subject/gzbrmqtx.html