BlockingQueue
什么是BlockingQueue?
A Queue that additionally supports operations that wait for the queue to become non-empty when retrieving an element, and wait for space to become available in the queue when storing an element.
一个队列,加之支持在检索一个元素时等待直到队列非空的操作,和等待队列的空间变得可用再进行排序。
BlockingQueue的方法有四种表现形式,在处理不能立刻满足但有可能在某一个时刻满足的操作时有不同的处理方式:(BlockingQueue methods come in four forms, with different ways of handling operations that cannot be satisfied immediately, but may be satisfied at some point in the future)
- 抛出一个异常
- 返回一个特殊的值(null或false,取决于你的操作)
- 不限时的阻塞当前线程,直到这个操作能够成功
- 设置一个超时时间,超时就放弃
具体详情,请见下表
Function | ThrowsException | SpecialValue | Blocks | TimesOut |
---|---|---|---|---|
Insert | add(e) | offer(e) | put(e) | offer(e, time, unit) |
Remove | remove() | poll() | take() | poll(time, unit) |
Examine | element() | peek() | not applicable | not applicable |
BlockingQueue不接受null元素。 实现在尝试add,put或offer null时会抛出NullPointerException。null用作标记值以指示轮询操作失败。
BlockingQueue可以是容量限制的。在任何给定时间,它可能具有剩余容量,超过该容量,不能在没有阻塞的情况下放置其他元素。没有任何内在容量限制的BlockingQueue始终报告Integer.MAX_VALUE的剩余容量。
BlockingQueue实现主要用于生产者-消费者队列,但另外支持Collection接口。因此,例如,可以使用remove(x)从队列中删除任意元素。然而,这些操作通常不是非常有效地执行,并且仅用于偶尔使用,例如当排队的消息被取消时。
BlockingQueue实现是线程安全的。所有排队方法都使用内部锁或其他形式的并发控制以原子方式实现其效果。但是,除非在实现中另有说明,否则批量收集操作addAll,containsAll,retainAll和removeAll不一定以原子方式执行。 因此,例如,在仅添加c中的一些元素之后,addAll(c)可能会失败(抛出异常)。
BlockingQueue本质上不支持任何类型的“close”或“shutdown”操作,以指示不再添加任何item。这些功能的需求和使用倾向于依赖于实现。例如,一种常见的策略是生产者插入特殊的end-of-stream或poison对象,这些对象在被消费者采用时会相应地进行解释。
怎么用?
在这里,使用生产-消费者模式,举一个栗子
class Producer implements Runnable {
private final BlockingQueue queue;
Producer(BlockingQueue q) { queue = q; }
public void run() {
try {
while (true) { queue.put(produce()); }
} catch (InterruptedException ex) { ... handle ...}
}
Object produce() { ... }
}
class Consumer implements Runnable {
private final BlockingQueue queue;
Consumer(BlockingQueue q) { queue = q; }
public void run() {
try {
while (true) { consume(queue.take()); }
} catch (InterruptedException ex) { ... handle ...}
}
void consume(Object x) { ... }
}
class Setup {
void main() {
BlockingQueue q = new SomeQueueImplementation();
Producer p = new Producer(q);
Consumer c1 = new Consumer(q);
Consumer c2 = new Consumer(q);
new Thread(p).start();
new Thread(c1).start();
new Thread(c2).start();
}
}
网友评论