ArrayBlockingQueue是数组实现的线程安全的有界的阻塞队列。下面是主要结构。
大概思想就是使用items数组存放元素,使用ReentrantLock和Condition实现线程安全。
每次入队的时候都会通过putIndex找到新元素应该存放的下标,如果putIndex到头了,那么再从0开始,循环使用。入队完成后会通知取元素的线程拿元素。
每次出队的时候都会通过takeIndex找到这次要取的元素的下标,如果takeIndex到头了,那么也从0开始,循环使用。出队完成后会通知存放元素的线程继续存元素。
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
private static final long serialVersionUID = -817911632652898426L;
// 队列的数组
final Object[] items;
// 调用take, poll, peek or remove方法,元素的位置
int takeIndex;
// 调用 put, offer, or add方法,元素的位置
int putIndex;
/** Number of elements in the queue */
int count;
/*
* Concurrency control uses the classic two-condition algorithm
* found in any textbook.
*/
/** Main lock guarding all access */
final ReentrantLock lock;
// 等待拿元素线程的等待条件队列
private final Condition notEmpty;
// 等待存元素线程的等待条件队列
private final Condition notFull;
/**
* Shared state for currently active iterators, or null if there
* are known not to be any. Allows queue operations to update
* iterator state.
*/
transient Itrs itrs = null;
入队API:
add,如果队列满了,则会抛出IllegalStateException
public boolean add(E e) {
return super.add(e);
}
public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}
public boolean offer(E e) {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 队列满了,返回false
if (count == items.length)
return false;
else {
// 否则入队
enqueue(e);
return true;
}
} finally {
lock.unlock();
}
}
private void enqueue(E x) {
// ArrayBlockingQueue存放元素的数组
final Object[] items = this.items;
// 把新元素放到该放的index
items[putIndex] = x;
// 如果加完这个元素队列满了,那么把putIndex置为0
if (++putIndex == items.length)
putIndex = 0;
// 元素计数
count++;
// 把等待拿元素的线程,从AQS的等待条件队列拿到同步队列,也就是告诉消费线程,可以消费了
notEmpty.signal();
}
offer,如果队列满了,则返回false,入队成功返回true,不会抛IllegalStateException
源码同上
put,将指定的元素插入此队列的尾部,如果队列已满,则等待空间。
public void put(E e) throws InterruptedException {
checkNotNull(e);
// 加一个可中断的锁
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
// 如果队列满了,则等待,加入notFull等待条件队列。
while (count == items.length)
notFull.await();
// 直到有空间了,入队
enqueue(e);
} finally {
lock.unlock();
}
}
出队API:
poll,如果队列为空,直接返回null,不等待。并且删除元素
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return (count == 0) ? null : dequeue();
} finally {
lock.unlock();
}
}
private E dequeue() {
final Object[] items = this.items;
@SuppressWarnings("unchecked")
// 在可以拿元素的位置takeIndex,获取元素
E x = (E) items[takeIndex];
// 把takeIndex位置的元素置为null
items[takeIndex] = null;
// takeIndex到头了,那么再从0开始
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
// 通知存放元素的线程,可以存放元素了
notFull.signal();
return x;
}
take,如果队列为空,则加入到条件等待队列。并且删除元素
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
peek,如果队列为空,返回null。不删除元素
public E peek() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return itemAt(takeIndex); // null when queue is empty
} finally {
lock.unlock();
}
}
网友评论