我们都知道Queue是一种操作线性表,可以用数组、链表来实现。
Queue的特点是先进先出。
存在问题:
如果我们使用普通的数组来实现Queue,一定会存在一个问题,每次出对的时候,从数组的第一位开始取,那么只要出对完成,就会涉及到整个数组搬迁,会导致时间最差复杂度成为O(n) 。单纯从设计Queue来讲,我觉得是应该要避免这样的搬迁操作。
循环数组
我们都知道数组是一块连续的内存空间。但是循环数组,确实不多见。如何理解?
环形数组如上图,是一个大神的画的,看着挺复杂的,实现起来并没有那么复杂,只要抓住核心tail和head 即可,还有一个很重要的一个算法 (tail + 1) % n ,如下代码。
public class CircularQueue {
// 数组:items,数组大小:n
private String[] items;
private int n = 0;
// head表示队头下标,tail表示队尾下标
private int head = 0;
private int tail = 0;
// 申请一个大小为capacity的数组
public CircularQueue(int capacity) {
items = new String[capacity];
n = capacity;
}
// 入队
public boolean enqueue(String item) {
// 队列满了
if ((tail + 1) % n == head) return false;
items[tail] = item;
tail = (tail + 1) % n;
return true;
}
// 出队
public String dequeue() {
// 如果head == tail 表示队列为空
if (head == tail) return null;
String ret = items[head];
head = (head + 1) % n;
return ret;
}
}
这是一段使用循环数组实现简单Queue的代码,这样实现完全解决了上面的问题,出对的时候时间复杂度从O(n) 降成O(1)。
如果仔细看代码你会发现,在执行到临界点的时候,即队列满了,会存在有一个位置为空,内存如此宝贵怎么能忍。
有个简单操作,show you code。
public class CircularQueue {
// 数组:items,数组大小:n
private String[] items;
private int n = 0;
// 定义数组的长度
private int size = 0;
// head表示队头下标,tail表示队尾下标
private int head = 0;
private int tail = 0;
// 申请一个大小为capacity的数组
public CircularQueue(int capacity) {
items = new String[capacity];
n = capacity;
}
// 入队
public boolean enqueue(String item) {
if (size == n) {
return false;
}
// 队列满了
items[tail] = item;
tail = (tail + 1) % n;
size++;
return true;
}
// 出队
public String dequeue() {
// 如果head == tail 表示队列为空
if (size == 0) {
return null;
}
String ret = items[head];
head = (head + 1) % n;
size--;
return ret;
}
}
上述简单介绍了循环队列,设计确认比较巧妙,对于一些底层的编码,什么都要节省,如果有方式方法能节省就节省吧,毕竟现在我们都是天天把节省成本挂在嘴边。
重点看下ArrayBlockingQueue
之前一直用ArrayBlockingQueue,也看过几次源码,看了也忘记了。一直没有抓住核心点。这次看到了循环数组突然对ArrayBlockingQueue有些好奇。
是不是也是循环数组的方式来实现Queue
我们看下ArrayBlockingQueue源码,其他方法可以先不着急去看,先去看核心的方法enqueue和dequeue
/**
* Inserts element at current put position, advances, and signals.
* Call only when holding lock.
*/
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
notEmpty.signal();
}
/**
* Extracts element at current take position, advances, and signals.
* Call only when holding lock.
*/
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
notFull.signal();
return x;
}
从上述代码来看,涉及到几个成员变量和ArrayBlockingQueue构造函数
/** The queued items */
final Object[] items;
/** items index for next take, poll, peek or remove */
int takeIndex;
/** items index for next put, offer, or add */
int putIndex;
/** Number of elements in the queue */
int count;
/*
* Concurrency control uses the classic two-condition algorithm
* found in any textbook.
*/
/** Main lock guarding all access */
final ReentrantLock lock;
/** Condition for waiting takes */
private final Condition notEmpty;
/** Condition for waiting puts */
private final Condition notFull;
/**
* Creates an {@code ArrayBlockingQueue} with the given (fixed)
* capacity and the specified access policy.
*
* @param capacity the capacity of this queue
* @param fair if {@code true} then queue accesses for threads blocked
* on insertion or removal, are processed in FIFO order;
* if {@code false} the access order is unspecified.
* @throws IllegalArgumentException if {@code capacity < 1}
*/
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
其中takeIndex、putIndex、count 都是int 类型,默认为0。
notEmpty 表示队列为空的时候,出对操作的线程状态变成等待状态,并释放锁,入队操作的时候,会通知唤醒等待时间最长的线程。
notFull 表示队列为满的时候,入队操作线程状态变成等待状态,并释放锁,出对操作的时候,会通知唤醒等待时间最长的线程。
从ArrayBlockingQueue 中enqueue 和dequeue 源码看很容易理解使用的就是循环数组来实现的,如下代码
if (++putIndex == items.length)
putIndex = 0;
if (++takeIndex == items.length)
takeIndex = 0;
光看enqueue 和dequeue 源码发现其实整个逻辑非常不严谨,不严谨也没关系,主要看他们俩都是私有方法。更何况有个核心成员变量 count还没有涉及到。
ArrayBlockingQueue 对外提供的出对和入对操作都是成对,我们一一看来
/**
* Inserts the specified element at the tail of this queue if it is
* possible to do so immediately without exceeding the queue's capacity,
* returning {@code true} upon success and {@code false} if this queue
* is full. This method is generally preferable to method {@link #add},
* which can fail to insert an element only by throwing an exception.
*
* @throws NullPointerException if the specified element is null
*/
public boolean offer(E e) {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count == items.length)
return false;
else {
enqueue(e);
return true;
}
} finally {
lock.unlock();
}
}
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return (count == 0) ? null : dequeue();
} finally {
lock.unlock();
}
}
offer和poll方法中 通过count 来 判断队列是否满了,或者是否为空队列,前提都加了Lock,为了保证线程安全,出入对都统一锁了。
到这里可能会有疑问阻塞去哪了?
我们再看另外一对的出入对方法
/**
* Inserts the specified element at the tail of this queue, waiting
* for space to become available if the queue is full.
*
* @throws InterruptedException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
notFull.await() 和 notEmpty.await() 使用上了,表示这俩个操作是阻塞的,很容易理解。
这里面有一个细节点需要注意
while (count == items.length)
notFull.await();
while (count == 0)
notEmpty.await();
我刚开始看的时候,我也有疑问为什么使用while而非if判断,其实是做了双重保证,防止过早或者意外通知唤醒。
再看下最后一对
/**
* Inserts the specified element at the tail of this queue, waiting
* up to the specified wait time for space to become available if
* the queue is full.
*
* @throws InterruptedException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
checkNotNull(e);
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length) {
if (nanos <= 0)
return false;
nanos = notFull.awaitNanos(nanos);
}
enqueue(e);
return true;
} finally {
lock.unlock();
}
}
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0) {
if (nanos <= 0)
return null;
nanos = notEmpty.awaitNanos(nanos);
}
return dequeue();
} finally {
lock.unlock();
}
}
理解了上面俩对出入对的方法,看到这对也是比较容易理解,加了阻塞时间而已。
总结
- 在我们经常使用基于数组的队列,里面也是使用循环数组的思想
- 循环数组的核心,就是一个圆环,通过tail 和head指针来定义出入对的索引。
- 循环数组的实现,有很多种,只要理解了思想,怎么实现就都可以。
- ArrayBlockingQueue 是一种阻塞队列,也提供非阻塞的线程安全出入对方法。
- ArrayBlockingQueue 阻塞实现是通过lock Condition来实现的。
今天是辛丑年最后一天,除夕,回不了自己的家乡,只能待在杭城。
在此祝所有的亲朋好友除夕安康,来年虎虎生威!
网友评论