美文网首页
阻塞队列--ArrayBlockingQueue

阻塞队列--ArrayBlockingQueue

作者: 毛发旺盛 | 来源:发表于2020-08-24 19:24 被阅读0次

    什么是阻塞队列----阻塞队列概述

    ArrayBlockingQueue是一个用数组实现的有界阻塞队列,按先进先出的原则对元素进行排序。put和take方法分别为添加和删除的阻塞方法。默认情况下不保证线程公平。

    ArrayBlockingQueue整体结构

    首先了解一下ArrayBlockingQueue的成员变量:
    1)ArrayBlockingQueue内部是用items数组来存储元素的;
    2)takeIndex标记下一个take、poll、peek、remove方法被调用时获取数组元素的索引。putIndex标记下一个put、offer、add方法被调用时添加到数组中的索引。
    3)ReentrantLock对象用来控制添加线程和移除线程的并发访问;
    4)两个条件对象:notEmpty条件对象是用于存放等待或唤醒调用take方法的线程,通知他们队列中已有元素,可以执行获取操作。notFull条件对象则用于等待或唤醒调用put方法的线程,通知他们队列中未满,可以执行添加元素的操作。
    5)Itrs变量是迭代器Itr和队列的共享状态,是队列同迭代器Itr中数据及状态的同步的桥梁。

        /** The queued items */
        final Object[] items;
    
        /** items index for next take, poll, peek or remove */
        int takeIndex;
    
        /** items index for next put, offer, or add */
        int putIndex;
    
        /** Number of elements in the queue */
        int count;
    
        /*
         * Concurrency control uses the classic two-condition algorithm
         * found in any textbook.
         */
    
        /** Main lock guarding all access */
        final ReentrantLock lock;
    
        /** Condition for waiting takes */
        private final Condition notEmpty;
    
        /** Condition for waiting puts */
        private final Condition notFull;
    
        /**
         * Shared state for currently active iterators, or null if there
         * are known not to be any.  Allows queue operations to update
         * iterator state.
         */
        transient Itrs itrs;
    

    ArrayBlockingQueue内部的阻塞队列是通过一个重入锁ReentrantLock和两个Condition条件队列实现的,所以ArrayBlockingQueue中的元素存在公平访问与非公平访问的区别。

    非阻塞插入元素

    add和offer方法的实现比较简单,只是在队列满时插入失败抛异常或返回false,队列非满时,则将待插入元素保存在items[putIndex],同时更新putIndex值并发出队列非空的信号即完成了元素的插入操作。
    更新putIndex值时,如果当前putIndex==数组长度时,则将其置为0,这是由于元素插入总在队尾,将其当成一个环形队列就很好理解了。

        public boolean add(E e) {
            if (offer(e))
                return true;
            else
                throw new IllegalStateException("Queue full");
        }
    
        public boolean offer(E e) {
            Objects.requireNonNull(e);
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                //当队列满了,则返回false
                if (count == items.length)
                    return false;
                else {
                    //队列非满,可执行元素插入
                    enqueue(e);
                    return true;
                }
            } finally {
                lock.unlock();
            }
        }
    
        private void enqueue(E x) {
            final Object[] items = this.items;
            items[putIndex] = x;
            if (++putIndex == items.length) putIndex = 0;
            count++;
            notEmpty.signal();
        }
    

    阻塞插入元素

    有时限阻塞和无时限阻塞的执行逻辑基本相同,我们这里就只看无时限阻塞。
    如果当前队列已满,则条件对象notFull调用await方法,将当前线程挂起加入到等待队列中,直到有线程移除了元素,调用了notFull.signal,才会将等待队列中的线程唤醒,继续执行添加操作。
    如果当前队列非满,则直接调用enqueue完成元素的插入。

        public void put(E e) throws InterruptedException {
            Objects.requireNonNull(e);
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                while (count == items.length)
                    notFull.await();
                enqueue(e);
            } finally {
                lock.unlock();
            }
        }
    

    移除元素

    poll和take的执行逻辑同前面元素插入部分的逻辑相同。poll在队列为空时则直接返回null,take则会在队列为空时,调用notEmpty.await,使得当前线程挂起进入等待队列。在队列非空时,则调用dequeue方法移除元素。
    dequeue方法也比较简单,取出takeIndex位置的元素后将该位置置空,同时更新takeIndex的值。调用notFull.signal唤醒在等待队列的因调用put方法进入阻塞状态的线程。

        public E poll() {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                return (count == 0) ? null : dequeue();
            } finally {
                lock.unlock();
            }
        }
    
        public E take() throws InterruptedException {
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                while (count == 0)
                    notEmpty.await();
                return dequeue();
            } finally {
                lock.unlock();
            }
        }
        private E dequeue() {
            final Object[] items = this.items;
            @SuppressWarnings("unchecked")
            E x = (E) items[takeIndex];
            items[takeIndex] = null;
            if (++takeIndex == items.length) takeIndex = 0;
            count--;
            //更新迭代器中的元素数据
            if (itrs != null)
                itrs.elementDequeued();
            notFull.signal();
            return x;
        }
    

    remove(Object o)方法用于删除队列中的指定元素,删除逻辑相对复杂一点。
    该方法同样需要在重入锁的保护下执行。从takeIndex开始环形遍历,查找到要删除的元素的位置,再调用removeAt方法。
    removeAt方法中,如果待删除的元素不是队列头元素,则执行循环操作,从待删除的元素之后的元素都往前移一个位置,后一个元素覆盖了待删除的元素,也就完成了删除操作,同时更新putIndex的值。

        public boolean remove(Object o) {
            if (o == null) return false;
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                if (count > 0) {
                    final Object[] items = this.items;
                    final int putIndex = this.putIndex;
                    int i = takeIndex;
                    do {
                        //环形遍历队列,查找待删除元素的位置
                        if (o.equals(items[i])) {
                            removeAt(i);
                            return true;
                        }
                        if (++i == items.length) i = 0;
                    } while (i != putIndex);
                }
                return false;
            } finally {
                lock.unlock();
            }
        }
        void removeAt(final int removeIndex) {
            final Object[] items = this.items;
            //如果待删除元素位置恰好是下一个删除元素位置,则按直接删除并更新takeIndex
            if (removeIndex == takeIndex) {
                // removing front item; just advance
                items[takeIndex] = null;
                if (++takeIndex == items.length) takeIndex = 0;
                count--;
                if (itrs != null)
                    itrs.elementDequeued();
            } else {
                //
                for (int i = removeIndex, putIndex = this.putIndex;;) {
                    int pred = i;
                    if (++i == items.length) i = 0;
                    if (i == putIndex) {
                        items[pred] = null;
                        this.putIndex = pred;
                        break;
                    }
                    items[pred] = items[i];
                }
                count--;
                if (itrs != null)
                    itrs.removedAt(removeIndex);
            }
            notFull.signal();
        }
    

    小结

    ArrayBlockingQueue内部以一个数组对象来存储元素,同时使用一个重入锁ReentrantLock来实现多个线程之间插入和删除元素的同步;两个条件对象用于实现阻塞逻辑,通过调用其await和signal方法实现等待和唤醒。
    元素的插入删除操作同普通的数组操作相同,这里也就不赘述了。

    相关文章

      网友评论

          本文标题:阻塞队列--ArrayBlockingQueue

          本文链接:https://www.haomeiwen.com/subject/ankujktx.html