java concurrent <概述>
java 队列 BlockingQueue 的使用案例
如上图所示:
生产线程将继续生成新对象并将其插入到队列中,直到队列达到可包含的上限。 换句话说,这是极限的。 如果阻塞队列达到其上限,则尝试插入新对象时,生成线程被阻止。 它仍然被阻塞,直到消耗的线程将对象从队列中取出。
消耗线程不断将对象从阻塞队列中取出,并进行处理。 如果消费线程尝试从空队列中取出一个对象,则消耗的线程将被阻塞,直到生成线程将对象放入队列。
接口 BlockingQueue 定义的方法
方法为四类:
- 第一种是抛出一个异常
- 第二种是返回一个特殊值(null 或 false,具体取决于操作
- 第三种是在操作可以成功前,无限期地阻塞当前线程
- 第四种是在放弃前只在给定的最大时间限制内阻塞
抛出异常 | 特殊值 | 阻塞 | 超时 | |
---|---|---|---|---|
插入 | add(e) | offer(e) | put(e) | offer(e, time, unit) |
移除 | remove(e) | poll(e) | take(e) | offer(e, time, unit) |
检查 | element(e) | peek(e) | 不可用 | 不可用 |
BlockingQueue 不接受null 否则跑出NullPointException
BlockingQueue 不接受 null 元素。试图 add、put 或 offer 一个 null 元素时,某些实现会抛出 NullPointerException。null 被用作指示 poll 操作失败的警戒值。
BlockingQueue 可以是限定容量的。它在任意给定时间都可以有一个 remainingCapacity,超出此容量,便无法无阻塞地 put 附加元素。没有任何内部容量约束的 BlockingQueue 总是报告 Integer.MAX_VALUE 的剩余容量。
BlockingQueue 实现主要用于生产者-使用者队列,但它另外还支持 Collection 接口。因此,举例来说,使用 remove(x) 从队列中移除任意一个元素是有可能的。然而,这种操作通常不 会有效执行,只能有计划地偶尔使用,比如在取消排队信息时。
BlockingQueue 实现是线程安全的。所有排队方法都可以使用内部锁或其他形式的并发控制来自动达到它们的目的。然而,大量的 Collection 操作(addAll、containsAll、retainAll 和 removeAll)没有 必要自动执行,除非在实现中特别说明。因此,举例来说,在只添加了 c 中的一些元素后,addAll(c) 有可能失败(抛出一个异常)。
BlockingQueue 实质上不 支持使用任何一种“close”或“shutdown”操作来指示不再添加任何项。这种功能的需求和使用有依赖于实现的倾向。例如,一种常用的策略是:对于生产者,插入特殊的 end-of-stream 或 poison 对象,并根据使用者获取这些对象的时间来对它们进行解释
BlockingQueue 的相关实现
- ArrayBlockingQueue
- DelayQueue
- LinkedBlockingQueue
- PriorityBlockingQueue
- SynchronousQueue
队列的demo
public class BlockingQueueExample {
public static void main(String[] args) throws Exception {
BlockingQueue queue = new ArrayBlockingQueue(1024);
Producer producer = new Producer(queue);
Consumer consumer = new Consumer(queue);
new Thread(producer).start();
new Thread(consumer).start();
Thread.sleep(4000);
}
}
public class Producer implements Runnable{
protected BlockingQueue queue = null;
public Producer(BlockingQueue queue) {
this.queue = queue;
}
public void run() {
try {
queue.put("1");
Thread.sleep(1000);
queue.put("2");
Thread.sleep(1000);
queue.put("3");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public class Consumer implements Runnable{
protected BlockingQueue queue = null;
public Consumer(BlockingQueue queue) {
this.queue = queue;
}
public void run() {
try {
while(true){
System.out.println(queue.take());
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
网友评论