ArrayBlockingQueue是JDK1.5开始concurrent包中提供的并发工具类,是一个基于数组的有界的先进先出队列,如果加入元素时,数组已满,或数组为空时,取元素都会阻塞当前线程,是一个典型的生产者消费者模式。类似的类还有LinkedBlockingQueue,PriorityBlockingQueue。
继承关系
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable
public interface BlockingQueue<E> extends Queue<E>
核心成员变量
final Object[] items; //保存元素
/** items index for next take, poll, peek or remove */
int takeIndex; //指向队头
/** items index for next put, offer, or add */
int putIndex; //指向队尾
/** Number of elements in the queue */
int count; //队中元素计数
/*
* Concurrency control uses the classic two-condition algorithm
* found in any textbook.
*/
/** Main lock guarding all access */
final ReentrantLock lock; //可重入锁
/** Condition for waiting takes */
private final Condition notEmpty; //条件变量
/** Condition for waiting puts */
private final Condition notFull; //条件变量
构造方法
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity]; //初始化指定大小的有界队列
lock = new ReentrantLock(fair); //fair指示该锁是否是公平的
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
添加元素:put方法
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await(); //添加元素时,如果已满,则阻塞当前线程,等待在notFull这个条件变量上,等待notFull调用signal唤醒
enqueue(e); //此时队列肯定未满
} finally {
lock.unlock();
}
}
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = x; //将元素加入队尾
if (++putIndex == items.length) //将putIndex加1,如果到达数组尾部,则从0开始
putIndex = 0;
count++; //增加计数
notEmpty.signal(); //增加一个元素后,如果有线程等待在noEmpty条件变量上,通知并唤醒一种一个线程。
}
put方法首先判断队列是否已满,如果已满,则阻塞当前线程,等待在notFull这个条件变量上,等待notFull调用signal唤醒。当队列非空时,将该元素加入队尾,增加元素计数,唤醒因为进行去操作时数组为空而等待在notEmpty上的线程,通知它此时队列已经有元素了,你可以进行取操作了。
取元素:take方法
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await(); //取元素但是队列为空时,阻塞线程,等待在notEmpty上
return dequeue(); //此时,队列肯定非空,进行出队操作
} finally {
lock.unlock();
}
}
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex]; //获取队头元素
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--; //计数减1
if (itrs != null)
itrs.elementDequeued();
notFull.signal(); //通知因为添加元素但是队列已满时而阻塞的线程,队列此时可插入了
return x; //返回队头元素
}
take方法和put方法类似,先检查队列是否为空,队列为空时,阻塞线程,等待在notEmpty上。当队列非空时,进行出队操作。同时还要通知因为添加元素但是队列已满时而阻塞的线程,队列此时可插入了。
支持原创,转载请注明出处。
github:https://github.com/gatsbydhn
网友评论