一、概述
ArrayBlockingQueue是一个有界的阻塞队列,底层通过数组来实现,会按照常规的先进先出(FIFO)的原则来操作队列,元素从队尾入队,从队头出队。同样,如果在获取队列元素的时候队列为空,则会阻塞;在往队列添加元素的时候,如果队列已满,则会阻塞;
- 该队列不是寻常意义上的队列,而是一个循环队列;
- 队列的容量固定,一旦创建,队列的容量就无法修改;
- 该队列支持可选的访问策略,默认是非公平的访问策略;
接下来我们来看一下该类的实现源码。
二、源码
1. 继承结构及构造方法
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
可以看到,继承结构都比较常规,继承自AbstractQueue,实现了BlockingQueue,并且实现了序列化接口。
然后来看一下它的构造方法:
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
先来看一下它前两个构造方法。因为是有界队列,所以要制定队列的容量大小;并且由于是通过ReentrantLock来实现锁,而ReentrantLock有公平锁和非公平锁之分,因此要制定对应的访问策略,默认是非公平锁;对于公平锁而言,先阻塞的会先获取到锁;而对于非公平锁,则是进行抢占式获取。
public ArrayBlockingQueue(int capacity, boolean fair,
Collection<? extends E> c) {
// 构造方法
this(capacity, fair);
final ReentrantLock lock = this.lock;
lock.lock(); // Lock only for visibility, not mutual exclusion
try {
int i = 0;
try {
// 遍历集合
for (E e : c) {
// 校验集合中元素不能是null
checkNotNull(e);
// 往数组中添加元素
items[i++] = e;
}
} catch (ArrayIndexOutOfBoundsException ex) {
// 如果初始化容量小于传入集合容量,异常
throw new IllegalArgumentException();
}
// 设置队列数量
count = i;
// 初始化入队的索引
putIndex = (i == capacity) ? 0 : i;
} finally {
lock.unlock();
}
}
最后一个构造方法是通过一个给定的集合来创建队列,这里会把集合中的元素挨个添加到队列中,同时初始化的容量不能小于所给定集合的容量。
2. 属性
接下来,我们来看一下该类的一些属性:
/** 保存元素的数组 */
final Object[] items;
/** 获取元素(出队)的索引 */
int takeIndex;
/** 添加元素(入队)的索引 */
int putIndex;
/** 队列中元素的数量 */
int count;
/** 可重入锁 */
final ReentrantLock lock;
/** 获取元素(出队)的Condition条件 */
private final Condition notEmpty;
/** 添加元素(入队)的Condition条件 */
private final Condition notFull;
/**
* 队列的迭代器
*/
transient Itrs itrs = null;
可以看到,ArrayBlockingQueue定义了一个可重入锁,并且定义了两个Condition条件,分别用于出队和入队的时候进行阻塞。
3. 方法
3.1 add方法
add方法表示入队,将数据插入到队尾,入队成功返回true;如果队列已满,抛出异常。最终还会间接通过offer方法来实现:
public boolean add(E e) {
return super.add(e);
}
// 继承类AbstractQueue
public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}
3.2 offer方法
而对offer方法而言,如果队列已满,返回false,不会抛出异常:
public boolean offer(E e) {
// 非空校验
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 判断队列是否已满
if (count == items.length)
return false;
else {
// 没满,入队
enqueue(e);
return true;
}
} finally {
lock.unlock();
}
}
可以看到,ArrayBlockingQueue不允许存储null元素,这里会调用enqueue方法进行入队操作:
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
// 把元素放入到 数组的入队索引处
items[putIndex] = x;
// 元素放进去后,如果入队索引等于队列长度,表示已满
// 重置入队索引为0
if (++putIndex == items.length)
putIndex = 0;
// 数组元素加1
count++;
// 唤醒条件notEmpty上等待的线程
notEmpty.signal();
}
可以看到该方法用于将元素保存到数组对应的入队索引处,并且唤醒条件notEmpty对应的线程,提醒线程该队列已经不为空了(not empty)。
这里重置入队索引为0,表示队列其实是一种循环队列,也就是环形队列,队尾不一定要是物理上的队列末尾,而是逻辑上的队尾,通过这种环形队列的用法,可以减少不必要的元素拷贝(元素出队以后,不用把元素整体往前移动)。
3.3 put方法
put方法是一个阻塞方法,在入队的时候,如果队列已满,会一直阻塞,直到队列可用,并且该方法在当前线程被中断时会抛出异常。
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
// 可中断线程
lock.lockInterruptibly();
try {
while (count == items.length)
// 阻塞notFull条件对应的线程
notFull.await();
// 入队方法
enqueue(e);
} finally {
lock.unlock();
}
}
3.4 offer(E, long, TimeUnit)方法
支持超时的offer方法,表示入队的时候,如果队列已满,则等待指定的超时时间,如果超时时间结束,队列仍然已满,则返回false;
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
checkNotNull(e);
// 获取超时时间
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
// 如果队列已满,无限循环
while (count == items.length) {
// 如果超时(超时时间小于等于0),返回false
if (nanos <= 0)
return false;
// 没有超时,等待
nanos = notFull.awaitNanos(nanos);
}
enqueue(e);
return true;
} finally {
lock.unlock();
}
}
3.5 poll方法
出队方法poll比较简单,表示获取并移除队头元素,如果队列为空,则返回null:
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 获取队头元素
return (count == 0) ? null : dequeue();
} finally {
lock.unlock();
}
}
这里,如果队列不为空的话,会调用出队方法dequeue方法,该方法和入队方法enqueue恰好相反:
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
// 获取出队元素索引
E x = (E) items[takeIndex];
// 原位置设置为空
items[takeIndex] = null;
// 如果队列为空(如果新的出队索引==数组长度),重置出队索引
if (++takeIndex == items.length)
takeIndex = 0;
// 数组容量减1
count--;
// 迭代器维护
if (itrs != null)
itrs.elementDequeued();
// 唤醒notFull条件上的线程
notFull.signal();
return x;
}
首先获取出队元素索引处的值,然后判断队列是否为空,如果队列为空,重置出队索引为0,然后队列容量减1,出队后,唤醒notFull条件上的线程,提示线程表示队列已经not full了。
这里通过
++takeIndex == items.length
来判断队列是否为空,和入队中的++putIndex == items.length
来判断队列是否已满,可以看出ArrayBlockingQueue其实是一种环形队列。
3.6 take方法
出队方法take是一个阻塞方法,用于获取并移除队列中的队头元素,如果队列为空,则会一直阻塞,直到队列中元素可用;
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
// 如果队列为空,阻塞notEmpty条件对应的线程
while (count == 0)
notEmpty.await();
// 否则,出队
return dequeue();
} finally {
lock.unlock();
}
}
3.7 poll(long, TimeUnit)
支持超时的poll方法,表示获取并移除对头的元素,如果队列为空,则等待指定的超时时间,如果超时时间结束,队列仍然为空,返回false;
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
// 获取超时时间
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
// 如果队列为空
while (count == 0) {
// 超时,返回null
if (nanos <= 0)
return null;
// 没有超时,等待
nanos = notEmpty.awaitNanos(nanos);
}
return dequeue();
} finally {
lock.unlock();
}
}
3.8 peek方法
peek方法表示获取队列的对头元素,但不是个出队方法,也就是不移除元素:
public E peek() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 如果队列为空,返回null
return itemAt(takeIndex); // null when queue is empty
} finally {
lock.unlock();
}
}
final E itemAt(int i) {
return (E) items[i];
}
从源码可以看出,peek方法直接获取数组出队索引处对应的元素。
3.9 remove(Object)方法
该方法表示从队列中删除指定的元素:
public boolean remove(Object o) {
// 如果元素是null,直接返回
if (o == null) return false;
final Object[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 如果队列中有元素
if (count > 0) {
// 入队索引
final int putIndex = this.putIndex;
// 出队索引
int i = takeIndex;
// 循环判断,从队头直到队尾
do {
// 如果出队索引处(这里的出队索引不是原来的takeIndex,而是自定义的i,一直自增)元素
// 与要移除的元素相等,进行移除操作
if (o.equals(items[i])) {
// 移除元素
removeAt(i);
return true;
}
if (++i == items.length)
i = 0;
} while (i != putIndex);
}
return false;
} finally {
lock.unlock();
}
}
这里会调用removeAt方法来删除对应索引处的元素:
void removeAt(final int removeIndex) {
// assert lock.getHoldCount() == 1;
// assert items[removeIndex] != null;
// assert removeIndex >= 0 && removeIndex < items.length;
final Object[] items = this.items;
// 如果要移除元素就是出队索引处元素,按照一般出队方法移除即可
if (removeIndex == takeIndex) {
// removing front item; just advance
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
} else {
// an "interior" remove
// slide over all others up through putIndex.
final int putIndex = this.putIndex;
for (int i = removeIndex;;) {
// 循环中,会从要移除的下标处,向后递增,把数据依次前移,直到队尾
int next = i + 1;
if (next == items.length)
next = 0;
// 判断是否到了队尾的入队索引处
if (next != putIndex) {
// 往前移动元素
items[i] = items[next];
i = next;
} else {
// 到了队尾索引处,表示元素移动完成,再重新设置下对尾索引
items[i] = null;
this.putIndex = i;
break;
}
}
count--;
if (itrs != null)
itrs.removedAt(removeIndex);
}
// 最后唤醒notFull对应线程
notFull.signal();
}
这里进行移除操作的时候,会先从要移除的下标处开始,后面的元素依次前移,最后直到队尾的入队索引,然后将队尾的入队索引前的最后一个元素设置为null,这样就移动完成,最后的时候唤醒notFull条件对应的线程,告诉该线程,队列已经not full了。
需要注意的是,这种删除元素的方式,和数组中删除非两端的元素一样,会移动数组中许多元素,从本质上来讲是一种缓慢且有破坏性的操作,因此官方建议我们只有在特殊情况下才进行这个操作。理想情况下,只有当已知队列不被其他线程访问时才应该这样做。
3.10 contains方法
contains方法用来判断队列中是否包含某元素,步骤和remove操作差不多,这里就不多说了。
public boolean contains(Object o) {
// 非空判断
if (o == null) return false;
final Object[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 如果队列中有元素
if (count > 0) {
// 入队索引
final int putIndex = this.putIndex;
// 出队索引
int i = takeIndex;
do {
// 循环判断,从队头到队尾
if (o.equals(items[i]))
return true;
if (++i == items.length)
i = 0;
} while (i != putIndex);
}
return false;
} finally {
lock.unlock();
}
}
3.11 clear方法
clear方法表示清空队列中所有的元素,
public void clear() {
final Object[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 获取队列长度,如果队列中有元素
int k = count;
if (k > 0) {
final int putIndex = this.putIndex;
int i = takeIndex;
// 从队头开始遍历,直到队尾
do {
// 将对应索引处元素设置为null
items[i] = null;
if (++i == items.length)
i = 0;
} while (i != putIndex);
takeIndex = putIndex;
count = 0;
// 迭代器处理
if (itrs != null)
itrs.queueIsEmpty();
// 唤醒所有等待notFull条件的线程
for (; k > 0 && lock.hasWaiters(notFull); k--)
notFull.signal();
}
} finally {
lock.unlock();
}
}
该方法会从队列出队索引开始遍历,直到入队索引(也就是从队头到队尾),然后将对应索引处的值都设置为null,并且最后唤醒所有等待notFull条件处理的线程。
4. 代码示例
接下来简单看一个简单的生产者与消费者例子,例子参考自官方API文档及本文底部的链接。首先来看下生产者代码:
/**
* 生产者
*/
static class Producer implements Runnable {
private final BlockingQueue<Integer> queue;
public Producer(BlockingQueue queue) {
this.queue = queue;
}
@Override
public void run() {
try {
for (int i = 0; i < 10; i++) {
queue.put(produce(i));
TimeUnit.MILLISECONDS.sleep(500);
}
} catch (InterruptedException ex) {
ex.printStackTrace();
}
}
public int produce(int i) {
System.out.println("=== put: " + i + ", thread: " + Thread.currentThread().getName());
return i;
}
}
然后再看下消费者代码:
/**
* 消费者
*/
static class Consumer implements Runnable {
private final BlockingQueue<Integer> queue;
Consumer(BlockingQueue queue) {
this.queue = queue;
}
@Override
public void run() {
try {
while (true) {
TimeUnit.MILLISECONDS.sleep(500);
consume(queue.take());
}
} catch (InterruptedException ex) {
ex.printStackTrace();
}
}
public void consume(int x) {
System.out.println("--- take: " + x + ", thread: " + Thread.currentThread().getName());
}
}
最后,进行简单测试:
private static ExecutorService executorService = Executors.newFixedThreadPool(3);
public static void main(String[] args) {
BlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<>(10);
Producer producer = new Producer(blockingQueue);
Consumer consumer1 = new Consumer(blockingQueue);
Consumer consumer2 = new Consumer(blockingQueue);
executorService.submit(producer);
executorService.submit(consumer1);
executorService.submit(consumer2);
}
目的很简单,就是为了测试阻塞队列,先打印的肯定是put线程的语句,然后才会打印take线程的语句,来看下运行结果:
=== put: 0, thread: pool-1-thread-1
=== put: 1, thread: pool-1-thread-1
--- take: 0, thread: pool-1-thread-2
--- take: 1, thread: pool-1-thread-3
=== put: 2, thread: pool-1-thread-1
--- take: 2, thread: pool-1-thread-2
=== put: 3, thread: pool-1-thread-1
--- take: 3, thread: pool-1-thread-3
=== put: 4, thread: pool-1-thread-1
--- take: 4, thread: pool-1-thread-2
=== put: 5, thread: pool-1-thread-1
--- take: 5, thread: pool-1-thread-3
=== put: 6, thread: pool-1-thread-1
--- take: 6, thread: pool-1-thread-2
=== put: 7, thread: pool-1-thread-1
--- take: 7, thread: pool-1-thread-3
=== put: 8, thread: pool-1-thread-1
--- take: 8, thread: pool-1-thread-2
=== put: 9, thread: pool-1-thread-1
--- take: 9, thread: pool-1-thread-3
四、总结
到这,ArrayBlockingQueue中大部分的方法源码都学习过了,剩余一些源码,等用到的时候再来了解下,接下来来简单总结下ArrayBlockingQueue的一些特性:
- ArrayBlockingQueue底层通过数组来实现,容量一旦确定,无法修改;
- ArrayBlockingQueue是通过ReentrantLock和Condition条件来保证多线程的安全访问;
- ArrayBlockingQueue中的ReentrantLock锁有公平和非公平策略,默认是非公平的;
- ArrayBlockingQueue不是一般的常规队列,而是一种环形队列。
本文参考自:
《Java并发编程实战》
Java 并发 --- 阻塞队列之ArrayBlockingQueue源码分析 - csdn.net
【JUC】JDK1.8源码分析之ArrayBlockingQueue(三)
网友评论