美文网首页
七大阻塞队列的使用

七大阻塞队列的使用

作者: 永远的太阳0123 | 来源:发表于2019-06-17 22:28 被阅读0次

    1 Queue<E>接口

    public interface Queue<E> extends Collection<E> {
        // 插入指定元素
        // 插入成功返回true;如果队列没有可用空间,抛出异常
        boolean add(E e);
    
        // 插入指定元素
        // 插入成功返回true;如果队列没有可用空间,返回false
        boolean offer(E e);
    
        // 获取和删除头部元素
        // 如果队列中没有元素,抛出异常
        E remove();
    
        // 获取和删除头部元素
        // 如果队列中没有元素,返回null
        E poll();
    
        // 获取头部元素
        // 如果队列中没有元素,抛出异常
        E element();
    
        // 获取头部元素
        // 如果队列中没有元素,返回null
        E peek();
    }
    

    2 BlockingQueue<E>接口

    public interface BlockingQueue<E> extends Queue<E> {
        // 插入指定元素
        // 插入成功返回true;如果队列没有可用空间,抛出异常
        boolean add(E e);
    
        // 插入指定元素
        // 插入成功返回true;如果队列没有可用空间,返回false
        boolean offer(E e);
    
        // 插入指定元素
        // 插入成功直接返回;如果队列没有可用空间,阻塞当前线程
        void put(E e) throws InterruptedException;
    
        // 插入指定元素
        // 插入成功返回true;如果队列没有可用空间,阻塞当前线程;如果在指定时间之内队列一直没有可用空间,返回false
        boolean offer(E e, long timeout, TimeUnit unit)
            throws InterruptedException;
    
        // 获取和删除头部元素
        // 如果队列中没有元素,阻塞当前线程
        E take() throws InterruptedException;
    
        // 获取和删除头部元素
        // 如果队列中没有元素,阻塞当前线程;如果在指定时间之内队列中一直没有元素,返回null
        E poll(long timeout, TimeUnit unit)
            throws InterruptedException;
    
        // 获取队列的剩余容量
        int remainingCapacity();
    
        // 删除指定元素
        // 如果队列发生改变,返回true
        boolean remove(Object o);
    
        // 判断队列中是否包含指定元素
        public boolean contains(Object o);
    
        // 删除队列中的全部元素,并将这些元素添加到指定集合中
        int drainTo(Collection<? super E> c);
    
        // 删除队列中指定数量的元素,并将这些元素添加到指定集合中
        int drainTo(Collection<? super E> c, int maxElements);
    }
    

    3 ArrayBlockingQueue

    3.1 ArrayBlockingQueue的特点

    (1)数据结构:数组。
    (2)有界无界:有界。
    (3)出队入队:FIFO。

    3.2 ArrayBlockingQueue的构造方法

        public ArrayBlockingQueue(int capacity) {
            this(capacity, false);
        }
    
        // 传入的fair用于创建ReentrantLock
        public ArrayBlockingQueue(int capacity, boolean fair) {
            if (capacity <= 0)
                throw new IllegalArgumentException();
            this.items = new Object[capacity];
            lock = new ReentrantLock(fair);
            notEmpty = lock.newCondition();
            notFull =  lock.newCondition();
        }
    
        // 传入的fair用于创建ReentrantLock
        public ArrayBlockingQueue(int capacity, boolean fair,
                                  Collection<? extends E> c) {
            this(capacity, fair);
    
            final ReentrantLock lock = this.lock;
            lock.lock(); // 获取锁,保证可见性
            try {
                int i = 0;
                try {
                    for (E e : c) {
                        checkNotNull(e);
                        items[i++] = e;
                    }
                } catch (ArrayIndexOutOfBoundsException ex) {
                    throw new IllegalArgumentException();
                }
                count = i;
                putIndex = (i == capacity) ? 0 : i;
            } finally {
                lock.unlock();
            }
        }
    

    4 LinkedBlockingQueue

    4.1 LinkedBlockingQueue的特点

    (1)数据结构:单向链表。
    (2)有界无界:可选。
    (3)出队入队:FIFO。

    4.2 LinkedBlockingQueue的构造方法

        public LinkedBlockingQueue() {
            this(Integer.MAX_VALUE);
        }
    
        public LinkedBlockingQueue(int capacity) {
            if (capacity <= 0) throw new IllegalArgumentException();
            this.capacity = capacity;
            last = head = new Node<E>(null);
        }
    
        public LinkedBlockingQueue(Collection<? extends E> c) {
            this(Integer.MAX_VALUE);
            final ReentrantLock putLock = this.putLock;
            putLock.lock(); // 获取锁,保证可见性
            try {
                int n = 0;
                for (E e : c) {
                    if (e == null)
                        throw new NullPointerException();
                    if (n == capacity)
                        throw new IllegalStateException("Queue full");
                    enqueue(new Node<E>(e));
                    ++n;
                }
                count.set(n);
            } finally {
                putLock.unlock();
            }
        }
    

    5 SynchronousQueue

    5.1 SynchronousQueue的特点

    (1)SynchronousQueue是一个特殊的队列,它不存储任何元素。
    (2)一个线程向队列中添加元素,不会立即返回,直至另一个线程从队列中获取元素;一个线程从队列中获取元素,不会立即返回,直至另一个线程向队列中添加元素。
    (3)这里的Synchronous是指读线程和写线程同步。

    5.2 SynchronousQueue中的构造方法

        public SynchronousQueue() {
            this(false);
        }
    
        public SynchronousQueue(boolean fair) {
            transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
        }
    

    6 PriorityBlockingQueue

    6.1 PriorityBlockingQueue的特点

    (1)数据结构:数组。
    (2)有界无界:无界。支持自动扩容,最大容量为Integer.MAX_VALUE - 8。
    (3)出队入队:优先级大小。

    6.2 PriorityBlockingQueue中的构造方法

        // DEFAULT_INITIAL_CAPACITY为11
        public PriorityBlockingQueue() {
            this(DEFAULT_INITIAL_CAPACITY, null);
        }
    
        public PriorityBlockingQueue(int initialCapacity) {
            this(initialCapacity, null);
        }
    
        public PriorityBlockingQueue(int initialCapacity,
                                     Comparator<? super E> comparator) {
            if (initialCapacity < 1)
                throw new IllegalArgumentException();
            this.lock = new ReentrantLock();
            this.notEmpty = lock.newCondition();
            this.comparator = comparator;
            this.queue = new Object[initialCapacity];
        }
    
        public PriorityBlockingQueue(Collection<? extends E> c) {
            this.lock = new ReentrantLock();
            this.notEmpty = lock.newCondition();
            // heapify表示是否进行堆排序
            boolean heapify = true;
            // screen表示是否检查集合中的元素
            boolean screen = true;
            if (c instanceof SortedSet<?>) {
                SortedSet<? extends E> ss = (SortedSet<? extends E>) c;
                this.comparator = (Comparator<? super E>) ss.comparator();
                heapify = false;
            }
            else if (c instanceof PriorityBlockingQueue<?>) {
                PriorityBlockingQueue<? extends E> pq =
                    (PriorityBlockingQueue<? extends E>) c;
                this.comparator = (Comparator<? super E>) pq.comparator();
                screen = false;
                // 如果类型匹配
                if (pq.getClass() == PriorityBlockingQueue.class)
                    heapify = false;
            }
            Object[] a = c.toArray();
            int n = a.length;
            // If c.toArray incorrectly doesn't return Object[], copy it.
            if (a.getClass() != Object[].class)
                a = Arrays.copyOf(a, n, Object[].class);
    
    
    
    
    
            // 如果集合是PriorityBlockingQueue,不检查集合中的元素
            // 如果集合是SortedSet并且集合中只有一个元素,检查集合中的元素
            // 如果集合是SortedSet并且集合中含有多个元素并且SortedSet中的comparator不等于null,检查集合中的元素
            // 如果集合是SortedSet并且集合中含有多个元素并且SortedSet中的comparator等于null,不检查集合中的元素
            // 如果集合既不是PriorityBlockingQueue又不是SortedSet并且集合中只有一个元素,检查集合中的元素
            // 如果集合既不是PriorityBlockingQueue又不是SortedSet并且集合中含有多个元素,不检查集合中的元素
    
    
    
            if (screen && (n == 1 || this.comparator != null)) {
                for (int i = 0; i < n; ++i)
                    if (a[i] == null)
                        throw new NullPointerException();
            }
            this.queue = a;
            this.size = n;
            if (heapify)
                heapify();
        }
    

    7 DelayQueue

    一个使用优先级队列实现的无界阻塞队列。

        public DelayQueue() {}
    
        public DelayQueue(Collection<? extends E> c) {
            this.addAll(c);
        }
    

    8 LinkedTransferQueue

    一个由链表结构组成的无界阻塞队列。

    9 LinkedBlockingDeque

    一个由链表结构组成的双向阻塞队列。

        public LinkedBlockingDeque() {
            this(Integer.MAX_VALUE);
        }
    
        public LinkedBlockingDeque(int capacity) {
            if (capacity <= 0) throw new IllegalArgumentException();
            this.capacity = capacity;
        }
    
        public LinkedBlockingDeque(Collection<? extends E> c) {
            this(Integer.MAX_VALUE);
            final ReentrantLock lock = this.lock;
            lock.lock(); // 获取锁,保证可见性
            try {
                for (E e : c) {
                    if (e == null)
                        throw new NullPointerException();
                    if (!linkLast(new Node<E>(e)))
                        throw new IllegalStateException("Deque full");
                }
            } finally {
                lock.unlock();
            }
        }
    

    ArrayBlockingQueue 底层是数组,有界队列,如果我们要使用生产者-消费者模式,这是非常好的选择。

    LinkedBlockingQueue 底层是链表,可以当做无界和有界队列来使用,所以大家不要以为它就是无界队列。

    SynchronousQueue 本身不带有空间来存储任何元素,使用上可以选择公平模式和非公平模式。

    PriorityBlockingQueue 是无界队列,基于数组,数据结构为二叉堆,数组第一个也是树的根节点总是最小值。

    相关文章

      网友评论

          本文标题:七大阻塞队列的使用

          本文链接:https://www.haomeiwen.com/subject/usltqctx.html