ArrayBlockingQueue使用场景
我们先写个程序看下如何使用ArrayBlockingQueue:
public class CB {
public static void main(String[] args) {
int capacity = 10;
ArrayBlockingQueue<Bread> queue = new ArrayBlockingQueue<Bread>(capacity);
for (int i = 0; i < 20; i++) {
new Thread(new Producer(queue)).start();
}
new Thread(new Consumer(queue)).start();
}
static class Producer implements Runnable {
//容器
private final ArrayBlockingQueue<Bread> queue;
public Producer(ArrayBlockingQueue<Bread> queue) {
this.queue = queue;
}
/* (non-Javadoc)
* @see java.lang.Runnable#run()
*/
@Override
public void run() {
while (true) {
produce();
}
}
public void produce() {
/**
* put()方法是如果容器满了的话就会把当前线程挂起
* offer()方法是容器如果满的话就会返回false,也正是我在前一篇中实现的那种策略。
*/
try {
Bread bread = new Bread();
Thread.sleep(2000);
queue.put(bread);
System.out.println("Producer:" + bread);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
static class Consumer implements Runnable {
//容器
private final ArrayBlockingQueue<Bread> queue;
public Consumer(ArrayBlockingQueue<Bread> queue) {
this.queue = queue;
}
/* (non-Javadoc)
* @see java.lang.Runnable#run()
*/
@Override
public void run() {
while (true) {
consume();
}
}
public void consume() {
/**
* take()方法和put()方法是对应的,从中拿一个数据,如果拿不到线程挂起
* poll()方法和offer()方法是对应的,从中拿一个数据,如果没有直接返回null
*/
try {
Thread.sleep(2000);
Bread bread = queue.take();
System.out.println("consumer:" + bread);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
static class Bread {
long time;
Bread() {
this.time = System.currentTimeMillis() / 1000;
}
@Override
public String toString() {
return String.valueOf(time);
}
}
}
这是一个生产者消费者模型的例子,代码中定义了20个生产者线程,1个消费者线程,产品池中最多容纳10个面包,输出如下:
Producer:1544429143
Producer:1544429143
Producer:1544429143
Producer:1544429143
Producer:1544429143
Producer:1544429143
Producer:1544429143
Producer:1544429143
Producer:1544429143
consumer:1544429143
Producer:1544429143
Producer:1544429143
consumer:1544429143
Producer:1544429145
可以看到,生产和消费能够正确并发执行。
ArrayBlockingQueue构造器和成员变量
提供了三个构造器:
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) {
this(capacity, fair);
final ReentrantLock lock = this.lock;
lock.lock(); // Lock only for visibility, not mutual exclusion
try {
int i = 0;
try {
for (E e : c) {
checkNotNull(e);
items[i++] = e;
}
} catch (ArrayIndexOutOfBoundsException ex) {
throw new IllegalArgumentException();
}
count = i;
putIndex = (i == capacity) ? 0 : i;
} finally {
lock.unlock();
}
}
可以看出,这个类本质是一个由数组支持的有界阻塞队列,其加锁机制还是依赖ReentrantLock。
类有以下成员变量:
//真正保存数据的数组
final Object[] items;
///** take, poll, peek or remove的下一个索引 */
int takeIndex;
///** put, offer, or add的下一个索引 */
int putIndex;
//元素个数
int count;
//可重入锁
final ReentrantLock lock;
//队列不为空的条件
private final Condition notEmpty;
//队列未满的条件
private final Condition notFull;
//遍历使用的迭代器
transient Itrs itrs = null;
ArrayBlockingQueue核心方法
有以下核心方法:
- 放入数据
- add(object)队列没满的话,放入成功。否则抛出异常。
- offer(object):表示如果可能的话,将object加到BlockingQueue里,即如果BlockingQueue可以容纳,则返回true,否则返回false.(本方法不阻塞当前执行方法的线程)
- offer(E o, long timeout, TimeUnit unit)可以设定等待的时间,如果在指定的时间内,还不能往队列中加入BlockingQueue,则返回失败。
- put(object)把object加到BlockingQueue里,如果BlockQueue没有空间,则调用此方法的线程阻塞。直到BlockingQueue里面有空间再继续.
- 获取数据
- poll()取走BlockingQueue里排在首位的对象,
- poll(long timeout, TimeUnit unit)从BlockingQueue取出一个队首的对象,如果在指定时间内,队列一旦有数据可取,则立即返回队列中的数据。否则知道时间超时还没有数据可取,返回失败。
- take()取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到BlockingQueue有新的数据被加入;
- drainTo()一次性从BlockingQueue获取所有可用的数据对象(还可以指定获取数据的个数),通过该方法,可以提升获取数据效率;不需要多次分批加锁或释放锁。
存放数据方法
看下add(object)
源码:
public boolean add(E e) {
return super.add(e);
}
super.add(e)源码在父类AbstractQueue中:
public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}
所以看下offer的实现:
public boolean offer(E e) {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count == items.length)
return false;
else {
enqueue(e);
return true;
}
} finally {
lock.unlock();
}
}
还有一个具有超时功能的offer(Object, timeout, timeUnit)
方法:
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
checkNotNull(e);
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length) {
if (nanos <= 0)
return false;
nanos = notFull.awaitNanos(nanos);
}
enqueue(e);
return true;
} finally {
lock.unlock();
}
}
上面的三个方法都是非阻塞的,下面的put方法是阻塞的:
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
可以看到,数据满的话会进入notFull.await()
状态等待唤醒,然后执行enqueue(E x)
:
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
notEmpty.signal();
}
enqueue方法会给一些变量赋值,比如非空信号量notEmpty、元素个数count、下一个加入的索引putIndex。
获取数据方法
poll()
不会抛出异常,当元素个数为0是,返回null,否则,调用dequeue函数,并唤醒等待notFull条件的线程并返回,与offer相对应:
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return (count == 0) ? null : dequeue();
} finally {
lock.unlock();
}
}
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
notFull.signal();
return x;
}
take函数用于从ArrayBlockingQueue中获取一个元素,在当前线程被中断时会抛出异常,并且当队列为空时,会阻塞一直等待,其与put函数相对应。
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
ArrayBlockingQueue遍历
ArrayBlockingQueue提供了两个内部类辅助遍历:Itr
和Itrs
。
Itrs
源码:
class Itrs {
private class Node extends WeakReference<Itr> {
Node next;
Node(Itr iterator, Node next) {
super(iterator);
this.next = next;
}
}
/** Incremented whenever takeIndex wraps around to 0 */
int cycles = 0;
/** Linked list of weak iterator references */
private Node head;
/** Used to expunge stale iterators */
private Node sweeper = null;
private static final int SHORT_SWEEP_PROBES = 4;
private static final int LONG_SWEEP_PROBES = 16;
Itrs(Itr initial) {
register(initial);
}
void doSomeSweeping(boolean tryHarder) {
// assert lock.getHoldCount() == 1;
// assert head != null;
int probes = tryHarder ? LONG_SWEEP_PROBES : SHORT_SWEEP_PROBES;
Node o, p;
final Node sweeper = this.sweeper;
boolean passedGo; // to limit search to one full sweep
if (sweeper == null) {
o = null;
p = head;
passedGo = true;
} else {
o = sweeper;
p = o.next;
passedGo = false;
}
for (; probes > 0; probes--) {
if (p == null) {
if (passedGo)
break;
o = null;
p = head;
passedGo = true;
}
final Itr it = p.get();
final Node next = p.next;
if (it == null || it.isDetached()) {
// found a discarded/exhausted iterator
probes = LONG_SWEEP_PROBES; // "try harder"
// unlink p
p.clear();
p.next = null;
if (o == null) {
head = next;
if (next == null) {
// We've run out of iterators to track; retire
itrs = null;
return;
}
}
else
o.next = next;
} else {
o = p;
}
p = next;
}
this.sweeper = (p == null) ? null : o;
}
void register(Itr itr) {
// assert lock.getHoldCount() == 1;
head = new Node(itr, head);
}
void takeIndexWrapped() {
// assert lock.getHoldCount() == 1;
cycles++;
for (Node o = null, p = head; p != null;) {
final Itr it = p.get();
final Node next = p.next;
if (it == null || it.takeIndexWrapped()) {
// unlink p
// assert it == null || it.isDetached();
p.clear();
p.next = null;
if (o == null)
head = next;
else
o.next = next;
} else {
o = p;
}
p = next;
}
if (head == null) // no more iterators to track
itrs = null;
}
void removedAt(int removedIndex) {
for (Node o = null, p = head; p != null;) {
final Itr it = p.get();
final Node next = p.next;
if (it == null || it.removedAt(removedIndex)) {
// unlink p
// assert it == null || it.isDetached();
p.clear();
p.next = null;
if (o == null)
head = next;
else
o.next = next;
} else {
o = p;
}
p = next;
}
if (head == null) // no more iterators to track
itrs = null;
}
void queueIsEmpty() {
// assert lock.getHoldCount() == 1;
for (Node p = head; p != null; p = p.next) {
Itr it = p.get();
if (it != null) {
p.clear();
it.shutdown();
}
}
head = null;
itrs = null;
}
void elementDequeued() {
// assert lock.getHoldCount() == 1;
if (count == 0)
queueIsEmpty();
else if (takeIndex == 0)
takeIndexWrapped();
}
}
Itr
源码:
private class Itr implements Iterator<E> {
/** Index to look for new nextItem; NONE at end */
private int cursor;
/** Element to be returned by next call to next(); null if none */
private E nextItem;
/** Index of nextItem; NONE if none, REMOVED if removed elsewhere */
private int nextIndex;
/** Last element returned; null if none or not detached. */
private E lastItem;
/** Index of lastItem, NONE if none, REMOVED if removed elsewhere */
private int lastRet;
/** Previous value of takeIndex, or DETACHED when detached */
private int prevTakeIndex;
/** Previous value of iters.cycles */
private int prevCycles;
/** Special index value indicating "not available" or "undefined" */
private static final int NONE = -1;
private static final int REMOVED = -2;
/** Special value for prevTakeIndex indicating "detached mode" */
private static final int DETACHED = -3;
Itr() {
// assert lock.getHoldCount() == 0;
lastRet = NONE;
final ReentrantLock lock = ArrayBlockingQueue.this.lock;
lock.lock();
try {
if (count == 0) {
// assert itrs == null;
cursor = NONE;
nextIndex = NONE;
prevTakeIndex = DETACHED;
} else {
final int takeIndex = ArrayBlockingQueue.this.takeIndex;
prevTakeIndex = takeIndex;
nextItem = itemAt(nextIndex = takeIndex);
cursor = incCursor(takeIndex);
if (itrs == null) {
itrs = new Itrs(this);
} else {
itrs.register(this); // in this order
itrs.doSomeSweeping(false);
}
prevCycles = itrs.cycles;
// assert takeIndex >= 0;
// assert prevTakeIndex == takeIndex;
// assert nextIndex >= 0;
// assert nextItem != null;
}
} finally {
lock.unlock();
}
}
boolean isDetached() {
// assert lock.getHoldCount() == 1;
return prevTakeIndex < 0;
}
private int incCursor(int index) {
// assert lock.getHoldCount() == 1;
if (++index == items.length)
index = 0;
if (index == putIndex)
index = NONE;
return index;
}
private boolean invalidated(int index, int prevTakeIndex,
long dequeues, int length) {
if (index < 0)
return false;
int distance = index - prevTakeIndex;
if (distance < 0)
distance += length;
return dequeues > distance;
}
private void incorporateDequeues() {
final int cycles = itrs.cycles;
final int takeIndex = ArrayBlockingQueue.this.takeIndex;
final int prevCycles = this.prevCycles;
final int prevTakeIndex = this.prevTakeIndex;
if (cycles != prevCycles || takeIndex != prevTakeIndex) {
final int len = items.length;
// how far takeIndex has advanced since the previous
// operation of this iterator
long dequeues = (cycles - prevCycles) * len
+ (takeIndex - prevTakeIndex);
// Check indices for invalidation
if (invalidated(lastRet, prevTakeIndex, dequeues, len))
lastRet = REMOVED;
if (invalidated(nextIndex, prevTakeIndex, dequeues, len))
nextIndex = REMOVED;
if (invalidated(cursor, prevTakeIndex, dequeues, len))
cursor = takeIndex;
if (cursor < 0 && nextIndex < 0 && lastRet < 0)
detach();
else {
this.prevCycles = cycles;
this.prevTakeIndex = takeIndex;
}
}
}
private void detach() {
if (prevTakeIndex >= 0) {
// assert itrs != null;
prevTakeIndex = DETACHED;
// try to unlink from itrs (but not too hard)
itrs.doSomeSweeping(true);
}
}
public boolean hasNext() {
// assert lock.getHoldCount() == 0;
if (nextItem != null)
return true;
noNext();
return false;
}
private void noNext() {
final ReentrantLock lock = ArrayBlockingQueue.this.lock;
lock.lock();
try {
// assert cursor == NONE;
// assert nextIndex == NONE;
if (!isDetached()) {
// assert lastRet >= 0;
incorporateDequeues(); // might update lastRet
if (lastRet >= 0) {
lastItem = itemAt(lastRet);
// assert lastItem != null;
detach();
}
}
// assert isDetached();
// assert lastRet < 0 ^ lastItem != null;
} finally {
lock.unlock();
}
}
public E next() {
// assert lock.getHoldCount() == 0;
final E x = nextItem;
if (x == null)
throw new NoSuchElementException();
final ReentrantLock lock = ArrayBlockingQueue.this.lock;
lock.lock();
try {
if (!isDetached())
incorporateDequeues();
// assert nextIndex != NONE;
// assert lastItem == null;
lastRet = nextIndex;
final int cursor = this.cursor;
if (cursor >= 0) {
nextItem = itemAt(nextIndex = cursor);
// assert nextItem != null;
this.cursor = incCursor(cursor);
} else {
nextIndex = NONE;
nextItem = null;
}
} finally {
lock.unlock();
}
return x;
}
void shutdown() {
// assert lock.getHoldCount() == 1;
cursor = NONE;
if (nextIndex >= 0)
nextIndex = REMOVED;
if (lastRet >= 0) {
lastRet = REMOVED;
lastItem = null;
}
prevTakeIndex = DETACHED;
// Don't set nextItem to null because we must continue to be
// able to return it on next().
//
// Caller will unlink from itrs when convenient.
}
private int distance(int index, int prevTakeIndex, int length) {
int distance = index - prevTakeIndex;
if (distance < 0)
distance += length;
return distance;
}
boolean takeIndexWrapped() {
// assert lock.getHoldCount() == 1;
if (isDetached())
return true;
if (itrs.cycles - prevCycles > 1) {
// All the elements that existed at the time of the last
// operation are gone, so abandon further iteration.
shutdown();
return true;
}
return false;
}
}
网友评论