介绍
ArrayBlockingQueue是采用数组实现的有界阻塞线程安全队列。如果向已满的队列继续塞入元素,将导致当前的线程阻塞。如果向空队列获取元素,那么将导致当前线程阻塞。
例子
public class ArrayBlockingQueueTest {
private static BlockingQueue<Food> queue = new ArrayBlockingQueue<Food>(2);
class Producer implements Runnable {
@Override
public void run() {
Food food = new Food();
food.setName("banana");
try {
queue.put(food);
System.out.println(Thread.currentThread().getName() + "provider : " + food);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
class Consumer implements Runnable {
@Override
public void run() {
try {
Food food = queue.take();
System.out.println(Thread.currentThread().getName() + "consumer : " + food);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
for (int i = 0 ; i < 5 ; i++) {
new Thread(new ArrayBlockingQueueTest().new Producer()).start();
}
new Thread(new ArrayBlockingQueueTest().new Consumer()).start();
new Thread(new ArrayBlockingQueueTest().new Consumer()).start();
}
}
结果:
data:image/s3,"s3://crabby-images/2a004/2a004a46981d9a19dfff526593c1f1ff39cc56e9" alt=""
源码分析
继承与实现关系
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable
类属性源码
/** 存放元素的数组 */
final Object[] items;
/** 取元素的下标索引 */
int takeIndex;
/** 存元素的下标索引 */
int putIndex;
/** 队列中元素的数量 */
int count;
/** 数据访问的重入锁 */
final ReentrantLock lock;
/** 取元素的等待队列 */
private final Condition notEmpty;
/** 存放元素的等待队列 */
private final Condition notFull;
构造器源码
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
解释:初始化指定容量的数组,初始化非公平的重入锁,初始化读等待队列,初始化写等待队列。
public ArrayBlockingQueue(int capacity, boolean fair,
Collection<? extends E> c) {
// 初始化构造器
this(capacity, fair);
// 获取重入锁
final ReentrantLock lock = this.lock;
// 将当前线程锁定
lock.lock(); // Lock only for visibility, not mutual exclusion
try {
int i = 0;
try {
// 遍历集合
for (E e : c) {
// 检查元素是否为空,为空抛出空指针异常
checkNotNull(e);
// 存放元素
items[i++] = e;
}
} catch (ArrayIndexOutOfBoundsException ex) {
throw new IllegalArgumentException();
}
// 初始化数组中元素的个数为计算后的i
count = i;
// 插入元素的下标索引,如果i的容量达到了指定的容量
// 插入元素的下标为0,否则为i
putIndex = (i == capacity) ? 0 : i;
} finally {
// 释放当前线程的锁
lock.unlock();
}
}
put方法源码
public void put(E e) throws InterruptedException {
// 检查值是否为空,为空抛出空指针异常
checkNotNull(e);
// 获取当前的重入锁
final ReentrantLock lock = this.lock;
// 如果当前的线程没有发生中断,那么就将当前线程锁定
lock.lockInterruptibly();
try {
//队列满了,一直阻塞在这里
while (count == items.length)
notFull.await();
// 插入元素到当前存放的位置
insert(e);
} finally {
// 操作完毕,释放当前线程的锁
lock.unlock();
}
}
// 插入元素到当前存放的位置
private void insert(E x) {
// 将当前值插入到数组中
items[putIndex] = x;
// 计算新的存放元素的下标
putIndex = inc(putIndex);
// 将存放元素的数组的数量加1
++count;
// 唤醒等待的读线程
notEmpty.signal();
}
take方法源码
public E take() throws InterruptedException {
// 获取重入锁
final ReentrantLock lock = this.lock;
// 将当前线程锁住
lock.lockInterruptibly();
try {
// 如果数组中的元素数量为0,那么取元素的线程阻塞在此处
while (count == 0)
// 将当前线程进行等待
notEmpty.await();
// 返回取出元素
return extract();
} finally {
// 释放当前线程的锁
lock.unlock();
}
}
// 取出元素
private E extract() {
// 获取存放元素的数组
final Object[] items = this.items;
// 获取当前值
E x = this.<E>cast(items[takeIndex]);
// 将当前数组中取出的元素的位置设置为空
items[takeIndex] = null;
// 计算新的取元素的下标
takeIndex = inc(takeIndex);
// 将数组存放元素的数量减1
--count;
// 唤醒等待队列中的一个等待线程
notFull.signal();
// 返回取出的值+
return x;
}
网友评论