这一篇文章开始,将重点学习 ArrayBlockingQueue 的源码,并在啃 ArrayBlockingQueue 源码中,学习API设计者的思想,对于咱们写代码的能力也会有帮助!
介绍:
ArrayBlockingQueue 从字面意思上不难看出,这是一个基于数组的阻塞队列。那么在学习其源码之前,先提出一些疑问:
1.ArrayBlockingQueue 有哪些特性?
2.数组索引是怎样得出的?
带着这些疑问往下看吧~
先从 ArrayBlockingQueue 的 Demo 示例作为看源码的入口。
public class ArrayBlockingQueueDemo {
public static void main(String[] args) throws InterruptedException {
// 初始化 ArrayBlockingQueue 对象,必须指定数组的容量,公平锁策略默认是非公平锁
ArrayBlockingQueue queue = new ArrayBlockingQueue(10);
for (int i = 1; i < 20; i++) {
// queue.add(i);// 不能继续插入元素时会抛异常
queue.offer(i);// 当不能继续插入元素时不会抛异常,而是返回false跳出offer方法的执行
}
// peek方法只会读取数组第一个元素
System.out.println(queue.peek());
System.out.println("执行peek()方法后,数组的长度为:" + queue.size());
// poll方法会读取并删除数组第一个元素
System.out.println(queue.poll());
System.out.println("执行poll()方法后,数组的长度为:" + queue.size());
// 往数组塞一个元素,当在不可执行的时候会阻塞线程,直到可以被其它线程唤醒
queue.put(100);
System.out.println("执行put()方法后,数组的长度为:" + queue.size());
// take方法会获取数组第一个元素,当在不可执行的时候会阻塞当前线程,直到可以被其它线程唤醒
System.out.println(queue.take());
System.out.println("执行take()方法后,数组的长度为:" + queue.size());
System.out.println("队列存在“5”这个元素:" + queue.contains(5));
System.out.println("队列存在“5”这个元素并删除:" + queue.remove(5));
System.out.println("队列存在“5”这个元素:" + queue.contains(5));
queue.clear();
System.out.println("执行clear()方法后,数组的长度为:" + queue.size());
}
}
运行结果:
1
执行peek()方法后,数组的长度为:10
1
执行poll()方法后,数组的长度为:9
执行put()方法后,数组的长度为:10
2
执行take()方法后,数组的长度为:9
队列存在“5”这个元素true
队列存在“5”这个元素并删除:true
队列存在“5”这个元素:false
执行clear()方法后,数组的长度为:0
示例中,在关键的方法加上了注释,如果对运行的结果感到不解,那就带着疑问看继续往下看,然后再回过头思考一下,我相信你一定会豁然开朗的!譬如源码中多次出现 takeIndex 和 putIndex 这两个变量,一开始你可能不清楚有什么作用,但你看完这篇源码剖析和总结,你就知道这两个变量是多么妙!不仅提高性能,还减少代码量!
源码剖析
在学习任何一个技术之前,先看下其官方介绍,这样会对这项技术有一个基本的了解。那么 ArrayBlockingQueue 的官方介绍在哪里呢?其实就是在 ArrayBlockingQueue 类的注释呀!(废话!hhhhh)
简单的翻译下 ArrayBlockingQueue 类的注释可以得出以下信息:
- 有界的阻塞数组,数组大小一旦创建,则不能再改变(数组不会动态扩容)
- 数组的读写顺序是先进先出(FIFO)
- 当队列满的情况下,put操作会被阻塞;当队列为空的时候,take操作会被阻塞
- 等待策略可以选择公平或非公平行为(公平行为:数组元素是顺序的新增;非公平行为:数组元素新增的顺序是随机的)
成员变量
// 使用数组保存的元素
final Object[] items;
// 下次读取元素的索引
int takeIndex;
// 下次添加元素的索引
int putIndex;
// 当前队列中元素的个数
int count;
// 可重入锁
final ReentrantLock lock;
// 等待获取元素的锁
private final Condition notEmpty;
// 等待添加元素的锁
private final Condition notFull;
// 迭代器状态
transient Itrs itrs = null;
ArrayBlockingQueue 构造方法
// 根据参数初始化数组容量,默认为非公平锁
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
// 根据参数初始化数组容量和设置是否是公平锁
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
// 使用一个 Collection 来作为队列的默认元素
public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) {
this(capacity, fair);
final ReentrantLock lock = this.lock;
// 加锁
lock.lock();
try {
int i = 0;
try {
for (E e : c) {
checkNotNull(e);
items[i++] = e;
}
} catch (ArrayIndexOutOfBoundsException ex) {
throw new IllegalArgumentException();
}
count = i;
putIndex = (i == capacity) ? 0 : i;
} finally {
lock.unlock();
}
}
ArrayBlockingQueue 只有三个构造方法,但都要在创建对象的时候指定数组容量,还可以根据需求设置 ReentrantLock 是否为公平锁。【对 ReentrantLock 不熟悉的朋友,可以参考我相关的拙作:https://www.jianshu.com/p/842aa75f77b7】
返回数组元素个数:size()
// 返回数组元素个数
public int size() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return count;
} finally {
lock.unlock();
}
}
新增元素:add(E e)、offer(E e) 和 put(E e)
// 实际上是调用父类(AbstractQueue)的add方法,AbstractQueue.add(E e)实际上是调用子类offer插入元素
public boolean add(E e) {
return super.add(e);
}
// AbstractQueue 抽象类的add方法
public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}
// 添加元素,如果队列满的话不会阻塞,直接返回false
public boolean offer(E e) {
// 判断元素是否为空
checkNotNull(e);
final ReentrantLock lock = this.lock;
//加锁
lock.lock();
try {
// 如果数组已经满了,返回false
if (count == items.length)
return false;
else {
// 调用 enqueue 插入数据并返回true
enqueue(e);
return true;
}
} finally {
// 释放锁
lock.unlock();
}
}
// 只有当加锁成功才会调用该方法
private void enqueue(E x) {
final Object[] items = this.items;
items[putIndex] = x;
// 如果数组索引达到数组的阀值,重新将 putIndex 重置为 0
if (++putIndex == items.length)
putIndex = 0;
//入列成功,当前队列元素数量自增
count++;
// 因为已经有数据了,就可以唤醒 notEmpty Condition了
notEmpty.signal();
}
// 添加元素,队列满的话会阻塞
public void put(E e) throws InterruptedException {
// 检查元素是否为null
checkNotNull(e);
final ReentrantLock lock = this.lock;
// 加锁,锁定线程,除非当前线程是interrupted
lock.lockInterruptibly();
try {
// 判断数组是否已满
while (count == items.length)
notFull.await();// 数组已满,线程等待
// 将指定元素添加到队列尾部
enqueue(e);
} finally {
lock.unlock();
}
}
从源码中可以知道,使用 add 方法的时候实际上是调用 offer 方法,如果队列满的情况下,add 方法直接抛出Queue full
异常;而使用 offer 方法通过可重入锁(ReentrantLock)保证线程安全,当队列满的情况下会直接返回 false;put 方法则会在队列满的情景下阻塞线程,直到队列数组有位置。
值得一说的是,enqueue 方法中,当队列数组满的时候,会将 putIndex 设置为 0。这是什么意思呢?这就是上面提到的 takeIndex 和 putIndex 妙处。实际上,你可以认为 ArrayBlockingQueue 的数组是一个环形数组(这不禁让我想起高性能并发框架 Disruptor 的核心 RingBuffer,可以参考我另外一篇文章对 Disruptor 的介绍:https://www.jianshu.com/p/0f35c3866780)。
参考上图,假设队列数组的长度为 8,当队列数组索引 0~7 位置都塞满数据的情景下,设置 putIndex 为 0,意味着下一次添加数组索引位置是 0,当执行 take 方法的时候会从数组头部,也就是索引为 0 的位置开始读取删除数据,那么现在索引位置 1~7 都是数据,而索引为 0 的位置是不是空啦?所以当执行 offer 方法或 put 方法的时候就会将数据新增到索引为 0 的位置上。
读取元素:peek() 和 poll() 和 take()
// 读取队列元素,但不删除
public E peek() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 获取索引位置为 takeIndex 的元素
return itemAt(takeIndex);
} finally {
lock.unlock();
}
}
@SuppressWarnings("unchecked")
final E itemAt(int i) {
// 根据索引从队列数组中获取元素
return (E) items[i];
}
// 读取队列元素并删除
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 如果队列中元素的个数为0,返回null;否则执行dequeue方法
return (count == 0) ? null : dequeue();
} finally {
lock.unlock();
}
}
private E dequeue() {
final Object[] items = this.items;
@SuppressWarnings("unchecked")
// 获取队列数组索引为 takeIndex 的元素
E x = (E) items[takeIndex];
// 把数组索引为 takeIndex 的元素设置为 null
items[takeIndex] = null;
// 下一次取元素的索引值累加1
if (++takeIndex == items.length)
takeIndex = 0;
// 出列成功,数组元素数量 -1
count--;
if (itrs != null)
itrs.elementDequeued();
notFull.signal();
return x;
}
// 读取队列元素并删除,如果队列空的话会阻塞
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
// 判断当前数组元素个数是否为 0
while (count == 0)
notEmpty.await();// 如果数组为空则当前线程阻塞,直到其它线程唤醒才继续执行
// 读取数组元素
return dequeue();
} finally {
lock.unlock();
}
}
读取队列数组元素是根据 takeIndex 作为索引获取的,与新增方法类似,当读取数组元素的索引值到了数组的末端,则会从头开始读取,也就是 dequeue 方法的 takeIndex = 0
删除数组元素:remove(Object o)
public boolean remove(Object o) {
if (o == null) return false;
final Object[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count > 0) {
final int putIndex = this.putIndex;
int i = takeIndex;
// 循环数组元素,找到要移除的元素调用removeAt()方法并返回true,如果循环到最后没找到元素则跳出循环返回false
do {
if (o.equals(items[i])) {
removeAt(i);
return true;
}
if (++i == items.length)
i = 0;
} while (i != putIndex);
}
return false;
} finally {
lock.unlock();
}
}
void removeAt(final int removeIndex) {
final Object[] items = this.items;
// 如果移除的元素就是索引位置为 takeIndex 的元素
if (removeIndex == takeIndex) {
// removing front item; just advance
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
} else {
final int putIndex = this.putIndex;
// 循环队列,查找到元素,再移除元素
for (int i = removeIndex;;) {
int next = i + 1;
if (next == items.length)
next = 0;
if (next != putIndex) {
items[i] = items[next];
i = next;
} else {
items[i] = null;
this.putIndex = i;
break;
}
}
// 数组数量减1
count--;
if (itrs != null)
itrs.removedAt(removeIndex);
}
// 唤醒 notFull
notFull.signal();
}
查找包含指定的元素:contains(Object o)
public boolean contains(Object o) {
if (o == null) return false;
final Object[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 判断数组是否有元素,如果没有直接返回false
if (count > 0) {
final int putIndex = this.putIndex;
int i = takeIndex;
// 循环数组元素,如果找到对应的元素返回true,若一直循环到插入索引的位置都没找到元素就跳出循环返回false
do {
if (o.equals(items[i]))
return true;
if (++i == items.length)
i = 0;
} while (i != putIndex);
}
return false;
} finally {
lock.unlock();
}
}
清空链表所有的节点:clear()
public void clear() {
final Object[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();
try {
int k = count;
if (k > 0) {
final int putIndex = this.putIndex;
int i = takeIndex;
// 遍历数组元素,都设置为null,便于GC回收
do {
items[i] = null;
if (++i == items.length)
i = 0;
} while (i != putIndex);
takeIndex = putIndex;
count = 0;
if (itrs != null)
itrs.queueIsEmpty();
// 唤醒等待线程,防止其它线程一直阻塞,损耗CPU的资源
for (; k > 0 && lock.hasWaiters(notFull); k--)
notFull.signal();
}
} finally {
lock.unlock();
}
}
通过以上 ArrayBlockingQueue 源码的剖析,相信你应该不难回答文章开头的问题啦。
总结:
通过上面的源码不难看出,ArrayBlockingQueue 的核心就是使用 ReentrantLock
控制队列的访问,通过两个 Condition
控制线程之间的通信;takeIndex
和 putIndex
两个变量分别指定队列入队和出队的位置,当 takeIndex 或 putIndex 达到边界则重置为0,相当于 ArrayBlockingQueue 数组是一个环形数组
。环形数组比常见的顺序数组有什么好处?通过上面的增删元素的源码分析,你有发现需要移动数组的元素吗?如果是顺序数组的话,还要维护数组索引的位置,当增删元素的时候要移动大量的元素,性能差!而环形数组在增删元素的时候只需要改变入队和出队的位置就行,大大提高性能!
下一篇为:J.U.C 阻塞队列源码剖析系列(三)之 LinkedBlockingQueue
如果觉得源码剖析不错的话,麻烦点个赞哈!对于文章有哪里不清楚或者有误的地方,欢迎在评论区留言~
网友评论