因为后面要学习线程池,所以在在这里先分析下ArrayBlockingQueue,为以后做准备。
什么是ArrayBlockingQueue?#####
首先通过名字我们可以理解到这是一个用Array实现的,操作是会产生阻塞的队列。其实大概意思差不多。
ArrayBlockingQueue是一个由数组支持的有界阻塞队列。此队列按 FIFO(先进先出)原则对元素进行排序。队列的头部 是在队列中存在时间最长的元素。
Queue家族的成员:#####

上面这些是比较全的Queue体系,但是我们常用的其实可能不多,主要有:
1.ArrayDeque, (数组双端队列)
2.PriorityQueue, (优先级队列)
3.ConcurrentLinkedQueue, (基于链表的并发队列)
4.DelayQueue, (延期阻塞队列)(阻塞队列实现了BlockingQueue接口)
5.ArrayBlockingQueue, (基于数组的并发阻塞队列)
6.LinkedBlockingQueue, (基于链表的FIFO阻塞队列)
7.LinkedBlockingDeque, (基于链表的FIFO双端阻塞队列)
8.PriorityBlockingQueue, (带优先级的无界阻塞队列)
9.SynchronousQueue (并发同步阻塞队列)
这里主要看一下ArrayBlockingQueue的实现。
初始化:#####
/**
* Creates an {@code ArrayBlockingQueue} with the given (fixed)
* capacity and the specified access policy.
*
* @param capacity the capacity of this queue
* @param fair if {@code true} then queue accesses for threads blocked
* on insertion or removal, are processed in FIFO order;
* if {@code false} the access order is unspecified.
* @throws IllegalArgumentException if {@code capacity < 1}
*/
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
这里可以看出来,ArrayBlockingQueue内部是通过一个Object数组和一个ReentrantLock实现的。同时ReentrantLock在使用时也提供了公平和非公平两种。因为数组是有界的,所以在数组为空和数组已满两种情况下需要阻塞线程,所以使用了Condition来实现线程的阻塞。
添加元素的方法:#####
/**
* Inserts the specified element at the tail of this queue if it is
* possible to do so immediately without exceeding the queue's capacity,
* returning {@code true} upon success and {@code false} if this queue
* is full. This method is generally preferable to method {@link #add},
* which can fail to insert an element only by throwing an exception.
*
* @throws NullPointerException if the specified element is null
*/
public boolean offer(E e) {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count == items.length)
return false;
else {
insert(e);
return true;
}
} finally {
lock.unlock();
}
}
首先offer方法是加锁的,在当前的Array的count等于数组的容量时,也就是数组满的时候返回false,如果没满,那么插入数据到Array中,最后解锁返回true。
之后是add:
public boolean add(E e) {
return super.add(e);
}
public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}
add方法其实也是使用了offer方法,但是不同的是,如果数组是满的那么add会抛出IllegalStateException异常,add成功后会返回true。
最后一个添加元素的方法是put:
/**
* Inserts the specified element at the tail of this queue, waiting
* for space to become available if the queue is full.
*
* @throws InterruptedException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
insert(e);
} finally {
lock.unlock();
}
}
/**
* Inserts element at current put position, advances, and signals.
* Call only when holding lock.
*/
private void insert(E x) {
items[putIndex] = x;
putIndex = inc(putIndex);
++count;
notEmpty.signal();
}
在put的实现里用到了Condition—notFull,在put的时候,如果数组已经满了,那么添加元素是不成功的(offer的实现),但是此时如果希望能等待数组有空间添加元素,那么可以使用put,如果数据已满,那么在notFull上等待。如果有数组的元素移除的操作就会唤醒这个put,让元素能添加到数组中。同时在调用insert方法是会调用notEmpty.signal() 唤醒在notEmpty上等待的线程。最后解锁返回。
移除元素方法:#####
首先看poll方法:
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return (count == 0) ? null : extract();
} finally {
lock.unlock();
}
}
/**
* Extracts element at current take position, advances, and signals.
* Call only when holding lock.
*/
private E extract() {
final Object[] items = this.items;
E x = this.<E>cast(items[takeIndex]);
items[takeIndex] = null;
takeIndex = inc(takeIndex);
--count;
notFull.signal();
return x;
}
poll方法是从数组中弹出一个元素,但是如果当前的数组内没有元素则直接返回null,如果有元素,那么返回指定的索引位置的元素,并删除原来的元素。同时在弹出元素后唤醒等待在notFull上的线程。
之后看下take方法:
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return extract();
} finally {
lock.unlock();
}
}
如果我们希望在获取元素的时候,如果没有元素,我们希望线程阻塞指导有元素可取。那么这个实现就是take方法了。在take时,如果当前的数组内没有元素的话,那么线程等待在notEmpty上,直到insert是唤醒在notEmpty上等待的线程。
最后看一下peek方法:
public E peek() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return (count == 0) ? null : itemAt(takeIndex);
} finally {
lock.unlock();
}
}
peek方法是比较简单的,只是在数组存在元素的时候,获取指定的索引的元素,但是不会移除这个元素。
这里的takeIndex是指定的数组索引,代码如下:
/**
* Circularly increment i.
*/
final int inc(int i) {
return (++i == items.length) ? 0 : i;
}
从上面的代码可以看出来,数组内的元素在添加元素的时候并没有进行元素移动的,移动的是takeIndex,如果takeIndex==items.length,那么回到0.
而putInde的移动策略和takeIndex是一致的。所以整个的添加和移除操作是一个环状的操作过程:
以put和take为例:
在put的时候利用putIndex为索引进行元素添加,每一次put都会检测count和数组容量,如果count小于数组容量,证明可以添加元素,而添加以putindex为准,如果putindex增长到数组容量,putindex回滚到0.此时添加元素又从头开始。而takeIndex也是一样,在takeIndex等于数组容量的时候,会回滚到0,从头开始take。可以说ArrayBlockingQueue利用数组的index的有序性和ReentrantLock实现的。保证了ArrayBlockingQueue线程安全性,但是ArrayBlockingQueue是有局限性的,容量需要在初始化的时候指定,所以必须在初始化的时候就考虑好容量,否则会对使用的性能有很大的影响。
网友评论