Java源码分析-ArrayBlockingQueue

作者: gatsby_dhn | 来源:发表于2016-10-05 21:44 被阅读376次

    ArrayBlockingQueue是JDK1.5开始concurrent包中提供的并发工具类,是一个基于数组的有界的先进先出队列,如果加入元素时,数组已满,或数组为空时,取元素都会阻塞当前线程,是一个典型的生产者消费者模式。类似的类还有LinkedBlockingQueue,PriorityBlockingQueue。

    继承关系

    public class ArrayBlockingQueue<E> extends AbstractQueue<E>
            implements BlockingQueue<E>, java.io.Serializable
    
    public interface BlockingQueue<E> extends Queue<E>
    

    核心成员变量

        final Object[] items;    //保存元素
    
        /** items index for next take, poll, peek or remove */
        int takeIndex;    //指向队头
    
        /** items index for next put, offer, or add */
        int putIndex;     //指向队尾
    
        /** Number of elements in the queue */
        int count;        //队中元素计数
    
        /*
         * Concurrency control uses the classic two-condition algorithm
         * found in any textbook.
         */
    
        /** Main lock guarding all access */
        final ReentrantLock lock;          //可重入锁
    
        /** Condition for waiting takes */
        private final Condition notEmpty;     //条件变量
    
        /** Condition for waiting puts */
        private final Condition notFull;        //条件变量
    

    构造方法

        public ArrayBlockingQueue(int capacity, boolean fair) {
            if (capacity <= 0)
                throw new IllegalArgumentException();
            this.items = new Object[capacity];      //初始化指定大小的有界队列
            lock = new ReentrantLock(fair);  //fair指示该锁是否是公平的
            notEmpty = lock.newCondition();
            notFull =  lock.newCondition();
        }
    

    添加元素:put方法

       public void put(E e) throws InterruptedException {
            checkNotNull(e);
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                while (count == items.length)
                    notFull.await();     //添加元素时,如果已满,则阻塞当前线程,等待在notFull这个条件变量上,等待notFull调用signal唤醒
                enqueue(e);    //此时队列肯定未满
            } finally {
                lock.unlock();
            }
        }
    
        private void enqueue(E x) {
            // assert lock.getHoldCount() == 1;
            // assert items[putIndex] == null;
            final Object[] items = this.items;
            items[putIndex] = x;              //将元素加入队尾
            if (++putIndex == items.length) //将putIndex加1,如果到达数组尾部,则从0开始
                putIndex = 0;
            count++;                //增加计数
            notEmpty.signal();    //增加一个元素后,如果有线程等待在noEmpty条件变量上,通知并唤醒一种一个线程。
        }
    

    put方法首先判断队列是否已满,如果已满,则阻塞当前线程,等待在notFull这个条件变量上,等待notFull调用signal唤醒。当队列非空时,将该元素加入队尾,增加元素计数,唤醒因为进行去操作时数组为空而等待在notEmpty上的线程,通知它此时队列已经有元素了,你可以进行取操作了。

    取元素:take方法

        public E take() throws InterruptedException {
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                while (count == 0)
                    notEmpty.await(); //取元素但是队列为空时,阻塞线程,等待在notEmpty上
                return dequeue();   //此时,队列肯定非空,进行出队操作
            } finally {
                lock.unlock();
            }
        }
    
        private E dequeue() {
            // assert lock.getHoldCount() == 1;
            // assert items[takeIndex] != null;
            final Object[] items = this.items;
            @SuppressWarnings("unchecked")
            E x = (E) items[takeIndex];         //获取队头元素
            items[takeIndex] = null;             
            if (++takeIndex == items.length)
                takeIndex = 0;                    
            count--;                                    //计数减1
            if (itrs != null)
                itrs.elementDequeued();
            notFull.signal();     //通知因为添加元素但是队列已满时而阻塞的线程,队列此时可插入了
            return x;   //返回队头元素
        }
    

    take方法和put方法类似,先检查队列是否为空,队列为空时,阻塞线程,等待在notEmpty上。当队列非空时,进行出队操作。同时还要通知因为添加元素但是队列已满时而阻塞的线程,队列此时可插入了。

    支持原创,转载请注明出处。
    github:https://github.com/gatsbydhn

    相关文章

      网友评论

        本文标题:Java源码分析-ArrayBlockingQueue

        本文链接:https://www.haomeiwen.com/subject/yjfeyttx.html