美文网首页Java集合
阻塞队列ArrayBlockingQueue原理简析

阻塞队列ArrayBlockingQueue原理简析

作者: Mars_M | 来源:发表于2017-06-25 12:27 被阅读73次

ArrayBlockingQueue属性与构造方法

public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    final Object[] items;

    int takeIndex;

    int putIndex;

    int count;

    final ReentrantLock lock;

    private final Condition notEmpty;

    private final Condition notFull;
    ...
}

ArrayBlockingQueue内部是由Object[]数组实现的。

takeIndex为队列取出位置指针,putIndex为队列插入位置指针。

同步操作依赖于ReentrantLock实现,notEmpty为非空条件,notFull为非满条件。

    public ArrayBlockingQueue(int capacity) {
        this(capacity, false);
    }

    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }

可以通过构造方法的参数capacity指定数组长度,参数fair指定ReentrantLock的公平还是非公平模式,即是否允许后来的操作插队与刚唤醒的线程竞争锁。

    public ArrayBlockingQueue(int capacity, boolean fair,
                              Collection<? extends E> c) {
        this(capacity, fair);

        final ReentrantLock lock = this.lock;
        lock.lock(); // Lock only for visibility, not mutual exclusion
        try {
            int i = 0;
            try {
                for (E e : c) {
                    checkNotNull(e);
                    items[i++] = e;
                }
            } catch (ArrayIndexOutOfBoundsException ex) {
                throw new IllegalArgumentException();
            }
            count = i;
            putIndex = (i == capacity) ? 0 : i;
        } finally {
            lock.unlock();
        }
    }

第三个参数 Collection<? extends E> c可以将现有集合插入ArrayBlockingQueue,如果集合的长度大于参数capacity,会抛出ArrayIndexOutOfBoundsException。

阻塞插入 put

    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock; //锁
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                notFull.await();
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }

与LinkedBlockingQueue不同,ArrayBlockingQueue的插入与取出都是用同一个锁,所以无法在插入的同时取出元素,吞吐量弱于LinkedBlockingQueue。

当队列长度达到上限,触发notFull条件让当前线程挂起等待。当notFull条件满足,执行enqueue方法插入元素:

    private void enqueue(E x) {
        final Object[] items = this.items;
        items[putIndex] = x;
        if (++putIndex == items.length)
            putIndex = 0;
        count++;
        notEmpty.signal();
    }

putIndex即元素插入队列的位置,当元素插入后 putIndex自增如果等于队列长度上限,表示putIndex指针已越界,将putIndex置于0,这样就能保证重复利用数组,而不是通过创建新数组来扩容。

然后将count自增1,表示队列元素增加一个,最后通过notEmpty条件唤醒挂起等待消费元素的线程。

阻塞取出 take

    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await();
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

取出操作使用与插入操作相同的ReentrantLock保证同步块的安全,当队列长度为0,触发notEmpty条件让线程挂起等待。当队列非空,执行dequeue方法取出元素:

    private E dequeue() {
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
        E x = (E) items[takeIndex];
        items[takeIndex] = null;
        if (++takeIndex == items.length)
            takeIndex = 0;
        count--;
        if (itrs != null)
            itrs.elementDequeued();
        notFull.signal();
        return x;
    }

取出元素的原理同插入是相同的,首先从takeIndex标记的位置取出元素,接着将takeIndex自增,如果takeIndex此时等于队列长度上限,就将其置0从头开始。

接着将count自减1,表示队列的元素减少一个。

itrs 变量表示队列的iterator,如果队列元素减少一个通过elementDequeued方法同步更新iterator遍历器保证线程安全。

最后通过notFull条件唤醒一个等待非满条件的线程执行插入。

非阻塞插入

    public boolean offer(E e) {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (count == items.length)
                return false;
            else {
                enqueue(e);
                return true;
            }
        } finally {
            lock.unlock();
        }
    }

与阻塞插入的区别是,当队列元素数量已满,直接返回false。

非阻塞取出

    public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return (count == 0) ? null : dequeue();
        } finally {
            lock.unlock();
        }
    }

与阻塞取出的区别是,当队列没有元素,直接返回null。

不超时阻塞插入

    public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {
        checkNotNull(e);
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length) {
                if (nanos <= 0)
                    return false;
                nanos = notFull.awaitNanos(nanos);
            }
            enqueue(e);
            return true;
        } finally {
            lock.unlock();
        }
    }

与阻塞插入的区别是,等待notFull条件并非永久的,当过了timeout长度的时间后如果队列还是满的,直接返回false。

不超时阻塞取出

    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0) {
                if (nanos <= 0)
                    return null;
                nanos = notEmpty.awaitNanos(nanos);
            }
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

与阻塞取出的区别是,等待notEmpty条件并非永久的,当过了timeout长度的时间后如果队列还是空的,直接返回null。

相关文章

网友评论

    本文标题:阻塞队列ArrayBlockingQueue原理简析

    本文链接:https://www.haomeiwen.com/subject/ksolcxtx.html