ArrayBlockingQueue
类实现了BlockingQueue
接口。阅读BlockingQueue
文本以获取有关的更多信息。
ArrayBlockingQueue
是一个有界的阻塞队列,它将元素存储在数组内部。有界意味着它无法存储无限量的元素,它可以同时存储的元素数量有一个上限。你需要在实例化时设置上限,之后无法更改,所以它和ArrayList
有些区别,不要因为它们的名称相似而将它们的功能混杂。
ArrayBlockingQueue
内部是以FIFO(先入先出)次序来存储元素的。队列的头部是在队列中存活时间最长的元素,而队列的尾部是在队列中存活时间最短的元素。
以下是实例化和使用ArrayBlockingQueue
的例子:
BlockingQueue queue = new ArrayBlockingQueue(1024);
queue.put("1");
Object object = queue.take();
这是一个使用Java 泛型的BlockingQueue
例子:
BlockingQueue<String> queue = new ArrayBlockingQueue<String>(1024);
queue.put("1");
String string = queue.take();
源码
ArrayBlockingQueue
中使用了这几个成员变量来保证操作,其实内部使用了一个循环数组,其中takeIndex和putIndex其实相当于队列的头部和尾部。
/** 使用数组保存元素 */
final Object[] items;
/** 下一个take,poll,peek或remove方法调用时访问此下标的元素 */
int takeIndex;
/** 下一个put, offer, 或add方法调用时访问此下标的元素 */
int putIndex;
/**队列中的元素数量 */
int count;
/** 保护所有操作的主锁 */
final ReentrantLock lock;
/** 获取元素的等待条件 */
private final Condition notEmpty;
/** 放置元素的等待条件 */
private final Condition notFull;
构造函数如下:
/**
* 使用一个固定的数值和默认的访问规则创建,默认是使用非公平锁
*
* @param capacity the capacity of this queue
* @throws IllegalArgumentException if {@code capacity < 1}
*/
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
/**
* 使用一个固定的数值和指定的访问规则创建
*
* @param capacity the capacity of this queue
* @param fair if {@code true} then queue accesses for threads blocked
* on insertion or removal, are processed in FIFO order;
* if {@code false} the access order is unspecified.
* @throws IllegalArgumentException if {@code capacity < 1}
*/
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
/**
*使用一个固定的数值和指定的访问规则创建,并将给定集合中的元素
* 增加到队列中,增加的顺序是指定的集合迭代器的遍历顺序
*
* @param capacity the capacity of this queue
* @param fair if {@code true} then queue accesses for threads blocked
* on insertion or removal, are processed in FIFO order;
* if {@code false} the access order is unspecified.
* @param c the collection of elements to initially contain
* @throws IllegalArgumentException if {@code capacity} is less than
* {@code c.size()}, or less than 1.
* @throws NullPointerException if the specified collection or any
* of its elements are null
*/
public ArrayBlockingQueue(int capacity, boolean fair,
Collection<? extends E> c) {
this(capacity, fair);
final ReentrantLock lock = this.lock;
lock.lock(); // Lock only for visibility, not mutual exclusion
try {
final Object[] items = this.items;
int i = 0;
try {
for (E e : c)
items[i++] = Objects.requireNonNull(e);
} catch (ArrayIndexOutOfBoundsException ex) {
throw new IllegalArgumentException();
}
count = i;
putIndex = (i == capacity) ? 0 : i;
} finally {
lock.unlock();
}
}
增加操作
public boolean add(E e) {
return super.add(e);
}
public boolean add(E e) {
// 内部重用offer方法
if (offer(e))
return true;
// 如果增加失败,抛出异常指示队列已满
else
throw new IllegalStateException("Queue full");
}
-------------------------------------------------------------------------
public boolean offer(E e) {
// 检查是否是否为null,如果是抛出NPE异常
Objects.requireNonNull(e);
// 加锁。 此处使用final的原因是将成员变量赋值为局部变量,
// 然后使用此变量就不需要经过两次访问,即先访问this,再
// 访问lock,轻微提升程序性能,后面此种方法的使用也是一样。
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 如果队列满了,返回false
if (count == items.length)
return false;
// 否则,加入队列
else {
enqueue(e);
return true;
}
} finally {
lock.unlock();
}
}
public static <T> T requireNonNull(T obj) {
if (obj == null)
throw new NullPointerException();
return obj;
}
private void enqueue(E e) {
// assert lock.isHeldByCurrentThread();
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
// 插入元素
items[putIndex] = e;
if (++putIndex == items.length) putIndex = 0;
count++;
// 随机通知一个等待的线程
notEmpty.signal();
}
-------------------------------------------------------------------------
// 阻塞方法
public void put(E e) throws InterruptedException {
Objects.requireNonNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
// 如果队列已经,在notFull上阻塞自己等待通知
// 关于等待-通知机制已经说过很多次,此处不再多说
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
-------------------------------------------------------------------------
// 超时方法
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
Objects.requireNonNull(e);
// 计算超时时间,转换为纳秒
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
// 如果队列已满,超时等待,如果时间用完,返回false
while (count == items.length) {
if (nanos <= 0L)
return false;
nanos = notFull.awaitNanos(nanos);
}
enqueue(e);
return true;
} finally {
lock.unlock();
}
}
删除操作
// 删除指定元素
public boolean remove(Object o) {
if (o == null) return false;
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 如果队列中存在元素
if (count > 0) {
final Object[] items = this.items;
// 注意此处精彩的循环使用,因为内部是一个循环数组
for (int i = takeIndex, end = putIndex,
to = (i < end) ? end : items.length;
; i = 0, to = end) {
for (; i < to; i++)
if (o.equals(items[i])) {
removeAt(i);
return true;
}
if (to == end) break;
}
}
return false;
} finally {
lock.unlock();
}
}
void removeAt(final int removeIndex) {
// assert lock.isHeldByCurrentThread();
// assert lock.getHoldCount() == 1;
// assert items[removeIndex] != null;
// assert removeIndex >= 0 && removeIndex < items.length;
final Object[] items = this.items;
// 如果删除的是头元素,只需修改头元素下标即可
if (removeIndex == takeIndex) {
// removing front item; just advance
items[takeIndex] = null;
if (++takeIndex == items.length) takeIndex = 0;
count--;
// 此处是为了保持迭代器与队列的一致性
if (itrs != null)
itrs.elementDequeued();
} else {
// an "interior" remove
// slide over all others up through putIndex.
for (int i = removeIndex, putIndex = this.putIndex;;) {
int pred = i;
if (++i == items.length) i = 0;
// 如果已经移到了最后一个元素,跳出循环
if (i == putIndex) {
items[pred] = null;
this.putIndex = pred;
break;
}
// 将元素前移一位
items[pred] = items[i];
}
count--;
if (itrs != null)
itrs.removedAt(removeIndex);
}
notFull.signal();
}
-------------------------------------------------------------------------
public E remove() {
// 重用poll方法,如果队列为空,抛出异常
E x = poll();
if (x != null)
return x;
else
throw new NoSuchElementException();
}
-------------------------------------------------------------------------
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return (count == 0) ? null : dequeue();
} finally {
lock.unlock();
}
}
private E dequeue() {
// assert lock.isHeldByCurrentThread();
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
// 获取头元素,因为使用Object[]保存,所以要进行类型转换
// 因为只能增加指定类型的元素,所以可以确保类型转换一定
// 会成功,抑制此非受检警告
@SuppressWarnings("unchecked")
E e = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length) takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
notFull.signal();
return e;
}
-------------------------------------------------------------------------
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
-------------------------------------------------------------------------
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0) {
if (nanos <= 0L)
return null;
nanos = notEmpty.awaitNanos(nanos);
}
return dequeue();
} finally {
lock.unlock();
}
}
阻塞方法以及超时方法和增加操作一样,此处不多做讲解。
访问操作
// element()方法在AbstractQueue<E>类中,ArrayBlockingQueue继承自此类
public E element() {
// 重用peek方法,如果队列为空抛出异常
E x = peek();
if (x != null)
return x;
else
throw new NoSuchElementException();
}
-------------------------------------------------------------------------
public E peek() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return itemAt(takeIndex); // null when queue is empty
} finally {
lock.unlock();
}
}
final E itemAt(int i) {
return (E) items[i];
}
辅助方法
部分方法逻辑简单,有兴趣自己查看即可。
public void clear() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
int k;
// 如果队列中存在元素,清空队列
if ((k = count) > 0) {
circularClear(items, takeIndex, putIndex);
takeIndex = putIndex;
count = 0;
// 使迭代器保持一致
if (itrs != null)
itrs.queueIsEmpty();
// 如果有线程等待插入元素,唤醒
for (; k > 0 && lock.hasWaiters(notFull); k--)
notFull.signal();
}
} finally {
lock.unlock();
}
}
// 将存在的元素全部置为null即可,等待 gc回收它们,此时等于清空了队列。
private static void circularClear(Object[] items, int i, int end) {
// assert 0 <= i && i < items.length;
// assert 0 <= end && end < items.length;
for (int to = (i < end) ? end : items.length;
; i = 0, to = end) {
for (; i < to; i++) items[i] = null;
if (to == end) break;
}
}
-------------------------------------------------------------------------
public int drainTo(Collection<? super E> c) {
// 重用drainTo(Collection<? super E> c, int maxElements)方法
return drainTo(c, Integer.MAX_VALUE);
}
public int drainTo(Collection<? super E> c, int maxElements) {
Objects.requireNonNull(c);
// 如果指定的集合是自己,抛出异常,符合BlockingQueue接口文档中的定义
if (c == this)
throw new IllegalArgumentException();
if (maxElements <= 0)
return 0;
final Object[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 获取需要转移的元素数量
int n = Math.min(maxElements, count);
int take = takeIndex;
int i = 0;
try {
// 通过直接访问数组,比重复调用poll()方法再增加性能会高很多
while (i < n) {
@SuppressWarnings("unchecked")
E e = (E) items[take];
c.add(e);
items[take] = null;
if (++take == items.length) take = 0;
i++;
}
return n;
} finally {
// Restore invariants even if c.add() threw
// 做一些处理工作
if (i > 0) {
count -= i;
takeIndex = take;
if (itrs != null) {
if (count == 0)
itrs.queueIsEmpty();
else if (i > take)
itrs.takeIndexWrapped();
}
for (; i > 0 && lock.hasWaiters(notFull); i--)
notFull.signal();
}
}
} finally {
lock.unlock();
}
}
核心要点
- 内部使用了一个循环数组
- 是一个有界数组,提供了容量后无法被更改
- 可以指定锁的公平性
网友评论