基本概念
ArrayBlockingQueue 具有以下特点:数组实现、有界阻塞队列、元素按照先进先出(FIFO)原则排序。
该队列实现的关键在其内部有两个位置标识,putIndex 和 takeIndex,分别表示元素的入/出队位置。
多线程同时操作该队列时,通过可重入锁+锁条件来控制,竞争包括公平竞争和非公平竞争。
实例说明
1.代码
// 初始化
ArrayBlockingQueue queue = new ArrayBlockingQueue(8)
// 入队
queue.add(0)
queue.add(1)
queue.add(2)
// 出队
queue.remove()
queue.remove()
// 再次入队
queue.add(3)
queue.add(4)
queue.add(5)
queue.add(6)
queue.add(7)
2.步骤
- 新建一个容量为 8 的 ArrayBlockingQueue,初始队列元素为 0,此时 putIndex=takeIndex=0
-
往队列放置 3 个元素,此时 putIndex=3,takeIndex=0
入队
- 从队列中取出 2 个元素,此时 putIndex=3,takeIndex=2
- 再往队列中存放 5 个元素,此时 putIndex =0, takeIndex=2 (标识到达队列末尾时自动置 0)
源码分析
1.签名
public abstract class AbstractQueue<E> extends AbstractCollection<E> implements Queue<E>
2.参数
// 队列元素,通过数组表示
final Object[] items;
// 可重入锁,可以是公平锁或非公平锁
final ReentrantLock lock;
// 锁条件
private final Condition notEmpty;
private final Condition notFull;
// 队列元素数量
int count;
// 标识
int putIndex;
int takeIndex;
3.构造函数
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0){
throw new IllegalArgumentException();
}
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
public ArrayBlockingQueue(int capacity, boolean fair,Collection<? extends E> c) {
this(capacity, fair);
// 需要加锁
final ReentrantLock lock = this.lock;
lock.lock();
// 元素转移
try {
int i = 0;
try {
for (E e : c) {
checkNotNull(e);
items[i++] = e;
}
} catch (ArrayIndexOutOfBoundsException ex) {
throw new IllegalArgumentException();
}
count = i;
// 指定元素入队位置,到达队尾重新置 0
putIndex = (i == capacity) ? 0 : i;
} finally {
lock.unlock();
}
}
4.入队操作
// 1.add,失败抛出异常
public boolean add(E e) {
return super.add(e);
}
public boolean add(E e) {
if (offer(e)){
return true;
}else{
throw new IllegalStateException("Queue full");
}
}
// 2.offer,失败返回 false
public boolean offer(E e) {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count == items.length){
return false;
}else {
// 关键 -> 元素入队
enqueue(e);
return true;
}
} finally {
lock.unlock();
}
}
// 3.put,阻塞等待
public void put(E e) throws InterruptedException {
checkNotNull(e);
// 尝试获取锁,竞争锁失败则阻塞等待
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length){
// 满队列进入 notFull 条件队列等待
notFull.await();
}
enqueue(e);
} finally {
lock.unlock();
}
}
// enqueue
private void enqueue(E x) {
final Object[] items = this.items;
items[putIndex] = x;
// 到达队尾重新置 0
if (++putIndex == items.length){
putIndex = 0;
}
count++;
// 唤醒 notEmpty 条件队列中等待的线程
notEmpty.signal();
}
5.出队操作
// 1.remove,失败抛出异常
public E remove() {
E x = poll();
if (x != null){
return x;
}else{
throw new NoSuchElementException();
}
}
// 2.poll,失败返回 null
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 关键->出队操作
return (count == 0) ? null : dequeue();
} finally {
lock.unlock();
}
}
// 3.take,等待阻塞
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0){
notEmpty.await();
}
return dequeue();
} finally {
lock.unlock();
}
}
// 出队操作,dequeue
private E dequeue() {
final Object[] items = this.items;
// 出队操作,置 null
E x = (E) items[takeIndex];
items[takeIndex] = null;
// 到达队尾重新置 0
if (++takeIndex == items.length){
takeIndex = 0;
}
count--;
// 存在遍历,同样执行出队
if (itrs != null){
itrs.elementDequeued();
}
notFull.signal();
return x;
}
5.查询操作
// 1.element,失败抛出异常
public E element() {
E x = peek();
if (x != null){
return x;
}else{
throw new NoSuchElementException();
}
}
// 2.peek,失败则返回 null
public E peek() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return itemAt(takeIndex);
} finally {
lock.unlock();
}
}
final E itemAt(int i) {
return (E) items[i];
}
6.转移操作
public int drainTo(Collection<? super E> c) {
return drainTo(c, Integer.MAX_VALUE);
}
public int drainTo(Collection<? super E> c, int maxElements) {
// 入参校验
checkNotNull(c);
if (c == this){
throw new IllegalArgumentException();
}
if (maxElements <= 0){
return 0;
}
// 加锁
final Object[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 确定转移的元素数量
int n = Math.min(maxElements, count);
int take = takeIndex;
int i = 0;
try {
while (i < n) {
// 元素出队,装进集合 c
E x = (E) items[take];
c.add(x);
items[take] = null;
if (++take == items.length){
take = 0;
}
i++;
}
return n;
} finally {
if (i > 0) {
count -= i;
takeIndex = take;
if (itrs != null) {
if (count == 0){
itrs.queueIsEmpty();
}else if (i > take){
itrs.takeIndexWrapped();
}
}
for (; i > 0 && lock.hasWaiters(notFull); i--){
notFull.signal();
}
}
}
} finally {
lock.unlock();
}
}
网友评论