Java源码分析-ArrayBlockingQueue

作者: gatsby_dhn | 来源:发表于2016-10-05 21:44 被阅读376次

ArrayBlockingQueue是JDK1.5开始concurrent包中提供的并发工具类,是一个基于数组的有界的先进先出队列,如果加入元素时,数组已满,或数组为空时,取元素都会阻塞当前线程,是一个典型的生产者消费者模式。类似的类还有LinkedBlockingQueue,PriorityBlockingQueue。

继承关系

public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable

public interface BlockingQueue<E> extends Queue<E>

核心成员变量

    final Object[] items;    //保存元素

    /** items index for next take, poll, peek or remove */
    int takeIndex;    //指向队头

    /** items index for next put, offer, or add */
    int putIndex;     //指向队尾

    /** Number of elements in the queue */
    int count;        //队中元素计数

    /*
     * Concurrency control uses the classic two-condition algorithm
     * found in any textbook.
     */

    /** Main lock guarding all access */
    final ReentrantLock lock;          //可重入锁

    /** Condition for waiting takes */
    private final Condition notEmpty;     //条件变量

    /** Condition for waiting puts */
    private final Condition notFull;        //条件变量

构造方法

    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];      //初始化指定大小的有界队列
        lock = new ReentrantLock(fair);  //fair指示该锁是否是公平的
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }

添加元素:put方法

   public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                notFull.await();     //添加元素时,如果已满,则阻塞当前线程,等待在notFull这个条件变量上,等待notFull调用signal唤醒
            enqueue(e);    //此时队列肯定未满
        } finally {
            lock.unlock();
        }
    }

    private void enqueue(E x) {
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        final Object[] items = this.items;
        items[putIndex] = x;              //将元素加入队尾
        if (++putIndex == items.length) //将putIndex加1,如果到达数组尾部,则从0开始
            putIndex = 0;
        count++;                //增加计数
        notEmpty.signal();    //增加一个元素后,如果有线程等待在noEmpty条件变量上,通知并唤醒一种一个线程。
    }

put方法首先判断队列是否已满,如果已满,则阻塞当前线程,等待在notFull这个条件变量上,等待notFull调用signal唤醒。当队列非空时,将该元素加入队尾,增加元素计数,唤醒因为进行去操作时数组为空而等待在notEmpty上的线程,通知它此时队列已经有元素了,你可以进行取操作了。

取元素:take方法

    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await(); //取元素但是队列为空时,阻塞线程,等待在notEmpty上
            return dequeue();   //此时,队列肯定非空,进行出队操作
        } finally {
            lock.unlock();
        }
    }

    private E dequeue() {
        // assert lock.getHoldCount() == 1;
        // assert items[takeIndex] != null;
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
        E x = (E) items[takeIndex];         //获取队头元素
        items[takeIndex] = null;             
        if (++takeIndex == items.length)
            takeIndex = 0;                    
        count--;                                    //计数减1
        if (itrs != null)
            itrs.elementDequeued();
        notFull.signal();     //通知因为添加元素但是队列已满时而阻塞的线程,队列此时可插入了
        return x;   //返回队头元素
    }

take方法和put方法类似,先检查队列是否为空,队列为空时,阻塞线程,等待在notEmpty上。当队列非空时,进行出队操作。同时还要通知因为添加元素但是队列已满时而阻塞的线程,队列此时可插入了。

支持原创,转载请注明出处。
github:https://github.com/gatsbydhn

相关文章

网友评论

    本文标题:Java源码分析-ArrayBlockingQueue

    本文链接:https://www.haomeiwen.com/subject/yjfeyttx.html