ArrayBlockingQueue
1、基于数组实现的有界阻塞队列(FIFO先进先出队列)
2、其数据结构为数组,是一个环形数组结构。
3、其实现了生产-消费队列功能
4、基于非公平锁的锁
一、对象属性
final Object[] items;// 存储数据载体,环形数组(首尾相连)
int takeIndex;// 下一个出队列元素的数组下标
int putIndex;// 下一个入队列元素的数组下标
int count;// 统计数组中的元素个数
final ReentrantLock lock;// 锁,保证队列数据安全线(基于AQS、CAS)
private final Condition notFull;// 条件锁
private final Condition notEmpty;// 条件锁
二、构造器
// 有界队列的"有界",即队列最大容量
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
// 默认非公平锁
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
// 默认非公平锁
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
public ArrayBlockingQueue(int capacity, boolean fair,
Collection<? extends E> c) {
this(capacity, fair);
...
// (重要) 构成了环形数组(元素等于数组长度的时候,从头开始),记录下一次入队列的下标
putIndex = (i == capacity) ? 0 : i;
...
}
三、核心方法
// 入队列
private void enqueue(E x) {
final Object[] items = this.items;
// putIndex为下一个入队列元素的数组下标
items[putIndex] = x;
// 生成下一个入队列元素的下标。队列如果满了,下标从0开始,构成了环形数组
if (++putIndex == items.length)
putIndex = 0;
count++;// 统计数组元素数量
// 队列增加数据,唤醒因队列为空而阻塞的线程
notEmpty.signal();
}
// 出队列
private E dequeue() {
final Object[] items = this.items;
// takeIndex为下一个出队列元素的数组下标
E x = (E) items[takeIndex];
items[takeIndex] = null;// help GC
// 生成下一个出队列元素的下标,队列满了,下标从0开始,构成了环形数组
if (++takeIndex == items.length)
takeIndex = 0;
count--;// 统计数组元素数量
if (itrs != null)
itrs.elementDequeued();
// 队列减少数据,唤醒因队列满了而阻塞的线程
notFull.signal();
return x;
}
// 队列中删除数据
void removeAt(final int removeIndex) {
final Object[] items = this.items;
// 移除数据的下标刚好等于出队列的下标,按照出队列方式处理
if (removeIndex == takeIndex) {
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
} else {
final int putIndex = this.putIndex;
for (int i = removeIndex;;) {
// 移动removeIndex后的数据位置,保持队列的顺序
int next = i + 1;
if (next == items.length)
next = 0;
if (next != putIndex) {
items[i] = items[next];
i = next;
} else {
items[i] = null;// 最后的数据置为null
this.putIndex = i;// putIndex - 1
break;
}
}
count--;
if (itrs != null)
itrs.removedAt(removeIndex);
}
// 唤醒线程
notFull.signal();
}
四、普通方法(都是基于核心方法之上的操作)
// 入队列
public boolean add(E e) {
return super.add(e);
}
// 抽象父类的方法,调用offer方法
public boolean add(E e) {
if (offer(e))
return true;
else// 队列满了会抛异常
throw new IllegalStateException("Queue full");
}
// 入队列,队列满了返回false
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count == items.length)
return false;
else {
enqueue(e);// 核心入队列方法,外层判断队列是否满了
return true;
}
} finally {
lock.unlock();
}
}
// 入队列
public void put(E e) throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();// 线程中途停止会抛异常
try {// 队列满了,会阻塞,等待队列消费数据后,会唤醒noFull
while (count == items.length)
notFull.await();
enqueue(e);// 核心入队列方法,外层判断队列是否满了
} finally {
lock.unlock();
}
}
// 出队列
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return (count == 0) ? null : dequeue();// 出队列核心方法
} finally {
lock.unlock();
}
}
// 出队列
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
// 队列为空,会进行阻塞,等待队列加入数据,会唤醒notEmpty
while (count == 0)
notEmpty.await();
return dequeue();// 出队列核心方法
} finally {
lock.unlock();
}
}
网友评论