深入理解ArrayBlockingQueue

作者: 一只小哈 | 来源:发表于2016-07-31 12:30 被阅读2187次

因为后面要学习线程池,所以在在这里先分析下ArrayBlockingQueue,为以后做准备。

什么是ArrayBlockingQueue?#####

首先通过名字我们可以理解到这是一个用Array实现的,操作是会产生阻塞的队列。其实大概意思差不多。
ArrayBlockingQueue是一个由数组支持的有界阻塞队列。此队列按 FIFO(先进先出)原则对元素进行排序。队列的头部 是在队列中存在时间最长的元素。

Queue家族的成员:#####
Paste_Image.png

上面这些是比较全的Queue体系,但是我们常用的其实可能不多,主要有:
1.ArrayDeque, (数组双端队列)
2.PriorityQueue, (优先级队列)
3.ConcurrentLinkedQueue, (基于链表的并发队列)
4.DelayQueue, (延期阻塞队列)(阻塞队列实现了BlockingQueue接口)
5.ArrayBlockingQueue, (基于数组的并发阻塞队列)
6.LinkedBlockingQueue, (基于链表的FIFO阻塞队列)
7.LinkedBlockingDeque, (基于链表的FIFO双端阻塞队列)
8.PriorityBlockingQueue, (带优先级的无界阻塞队列)
9.SynchronousQueue (并发同步阻塞队列)
这里主要看一下ArrayBlockingQueue的实现。

初始化:#####
/**
     * Creates an {@code ArrayBlockingQueue} with the given (fixed)
     * capacity and the specified access policy.
     *
     * @param capacity the capacity of this queue
     * @param fair if {@code true} then queue accesses for threads blocked
     *        on insertion or removal, are processed in FIFO order;
     *        if {@code false} the access order is unspecified.
     * @throws IllegalArgumentException if {@code capacity < 1}
     */
    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }

这里可以看出来,ArrayBlockingQueue内部是通过一个Object数组和一个ReentrantLock实现的。同时ReentrantLock在使用时也提供了公平和非公平两种。因为数组是有界的,所以在数组为空和数组已满两种情况下需要阻塞线程,所以使用了Condition来实现线程的阻塞。

添加元素的方法:#####
 /**
     * Inserts the specified element at the tail of this queue if it is
     * possible to do so immediately without exceeding the queue's capacity,
     * returning {@code true} upon success and {@code false} if this queue
     * is full.  This method is generally preferable to method {@link #add},
     * which can fail to insert an element only by throwing an exception.
     *
     * @throws NullPointerException if the specified element is null
     */
    public boolean offer(E e) {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (count == items.length)
                return false;
            else {
                insert(e);
                return true;
            }
        } finally {
            lock.unlock();
        }
    }

首先offer方法是加锁的,在当前的Array的count等于数组的容量时,也就是数组满的时候返回false,如果没满,那么插入数据到Array中,最后解锁返回true。
之后是add:

    public boolean add(E e) {
        return super.add(e);
    }

    public boolean add(E e) {
        if (offer(e))
            return true;
        else
            throw new IllegalStateException("Queue full");
    }

add方法其实也是使用了offer方法,但是不同的是,如果数组是满的那么add会抛出IllegalStateException异常,add成功后会返回true。
最后一个添加元素的方法是put:

    /**
     * Inserts the specified element at the tail of this queue, waiting
     * for space to become available if the queue is full.
     *
     * @throws InterruptedException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                notFull.await();
            insert(e);
        } finally {
            lock.unlock();
        }
    }

   /**
     * Inserts element at current put position, advances, and signals.
     * Call only when holding lock.
     */
    private void insert(E x) {
        items[putIndex] = x;
        putIndex = inc(putIndex);
        ++count;
        notEmpty.signal();
    }

在put的实现里用到了Condition—notFull,在put的时候,如果数组已经满了,那么添加元素是不成功的(offer的实现),但是此时如果希望能等待数组有空间添加元素,那么可以使用put,如果数据已满,那么在notFull上等待。如果有数组的元素移除的操作就会唤醒这个put,让元素能添加到数组中。同时在调用insert方法是会调用notEmpty.signal() 唤醒在notEmpty上等待的线程。最后解锁返回。

移除元素方法:#####

首先看poll方法:

    public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return (count == 0) ? null : extract();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Extracts element at current take position, advances, and signals.
     * Call only when holding lock.
     */
    private E extract() {
        final Object[] items = this.items;
        E x = this.<E>cast(items[takeIndex]);
        items[takeIndex] = null;
        takeIndex = inc(takeIndex);
        --count;
        notFull.signal();
        return x;
    }

poll方法是从数组中弹出一个元素,但是如果当前的数组内没有元素则直接返回null,如果有元素,那么返回指定的索引位置的元素,并删除原来的元素。同时在弹出元素后唤醒等待在notFull上的线程。
之后看下take方法:

    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await();
            return extract();
        } finally {
            lock.unlock();
        }
    }

如果我们希望在获取元素的时候,如果没有元素,我们希望线程阻塞指导有元素可取。那么这个实现就是take方法了。在take时,如果当前的数组内没有元素的话,那么线程等待在notEmpty上,直到insert是唤醒在notEmpty上等待的线程。
最后看一下peek方法:

    public E peek() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return (count == 0) ? null : itemAt(takeIndex);
        } finally {
            lock.unlock();
        }
    }

peek方法是比较简单的,只是在数组存在元素的时候,获取指定的索引的元素,但是不会移除这个元素。

这里的takeIndex是指定的数组索引,代码如下:

    /**
     * Circularly increment i.
     */
    final int inc(int i) {
        return (++i == items.length) ? 0 : i;
    }

从上面的代码可以看出来,数组内的元素在添加元素的时候并没有进行元素移动的,移动的是takeIndex,如果takeIndex==items.length,那么回到0.
而putInde的移动策略和takeIndex是一致的。所以整个的添加和移除操作是一个环状的操作过程:
以put和take为例:
在put的时候利用putIndex为索引进行元素添加,每一次put都会检测count和数组容量,如果count小于数组容量,证明可以添加元素,而添加以putindex为准,如果putindex增长到数组容量,putindex回滚到0.此时添加元素又从头开始。而takeIndex也是一样,在takeIndex等于数组容量的时候,会回滚到0,从头开始take。可以说ArrayBlockingQueue利用数组的index的有序性和ReentrantLock实现的。保证了ArrayBlockingQueue线程安全性,但是ArrayBlockingQueue是有局限性的,容量需要在初始化的时候指定,所以必须在初始化的时候就考虑好容量,否则会对使用的性能有很大的影响。

相关文章

网友评论

    本文标题:深入理解ArrayBlockingQueue

    本文链接:https://www.haomeiwen.com/subject/biitsttx.html