引言:前面多线程文章中讲线程池的时候你应该注意到了阻塞队列,就是我们在
new ThreadPoolExecutor()
传参的时候使用了new LinkedBlockingQueue<Runnable>(4)
,那篇文章中并没有去过多的介绍BlockQueue
接口相关的知识,那么这篇文章我们就来重点的介绍一下队列以及阻塞队列相关的知识。
队列
首先要说的就是 队列
这种数据结构它的特点,FIFO
(先进先出),先进队的任务先出队,就比如平常我们去排队买早餐,谁先来谁就在前面谁先走,大家时间都着急你也别插队。这是严格意义上的队列解释,也就是说先出队的一定是先进队的,但是也有不严格的,比如 PriorityQueue
,使用该队列的 pool 或 peek 方法的时候,返回的都是队列中最小的元素而不是先进入队列的元素,因为它对队列中的元素进行了排序,这个做一了解即可。Java集合框架 Collection
中给我们提供了一个接口 Queue(队列)
,即 Queue implements Collection
,同时提供了几个基本的方法来操作队列,比如
- add(E e):添加元素至队列尾部,如果队列已满 不会 抛出异常;
- offer(E e):添加元素至队列尾部,如果队列已满 会 抛出异常;
- E peek():返回队首元素,不会删除元素,如果队列为空 返回 null;
- E element():返回队首元素,不会删除元素,如果队列为空 抛出异常;
- E poll():返回并删除队首元素,如果队列为空 返回 null;
- E remove():返回并删除队首元素,如果队列为空 抛出异常。
当然,大部分该接口的实现类都重写了上面的方法(下面会提到一些)。关于队列的基本知识我们就暂且说到这里,接下来就来说说今天的重点:阻塞队列。
阻塞队列
我们还是结合线程池来讲讲阻塞队列,上一讲我们说了常用的三种线程池创建的方式,也使用 ThreadPoolExecutor 去创建了线程池,为什么一般不建议使用下面这三种呢?本节我们就来一探究竟。
- newFixedThreadPool(int nThreads):底层实现使用 new LinkedBlockingQueue<Runnable>()。
- newSingleThreadExecutor():底层实现使用 new LinkedBlockingQueue<Runnable>()。
- newCachedThreadPool():底层实现使用 new SynchronousQueue<Runnable>()。
分别解释一下这两种阻塞队列(顺带说一下 ArrayBlockingQueue):
- LinkedBlockingQueue:一个基于链表实现的“有界”阻塞队列(但是这个界限是 Integer.MAX_VALUE = 2147483647,通常也就理解为无界);
public class QueueTest {
public static void main(String[] args) {
BlockingQueue linkedBlockingDeque = new LinkedBlockingDeque(3);
try {
linkedBlockingDeque.put("1");
linkedBlockingDeque.put("2");
linkedBlockingDeque.put("3");
linkedBlockingDeque.put("4"); // 程序发生阻塞
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
- SynchronousQueue:一个不存储元素的阻塞队列,每个插入操作会阻塞直到其他线程调用出队操作;
public class QueueTest {
public static void main(String[] args) {
BlockingQueue synchronousQueue = new SynchronousQueue();
try {
synchronousQueue.put("1");
synchronousQueue.put("2"); // 程序发生阻塞
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
- ArrayBlockingQueue:一个基于数组实现的有界阻塞队列。
public class QueueTest {
public static void main(String[] args) {
BlockingQueue arrayBlockingQueue = new ArrayBlockingQueue<>(3);
System.out.println(arrayBlockingQueue.add("1")); // true
System.out.println(arrayBlockingQueue.add("2")); // true
System.out.println(arrayBlockingQueue.add("3")); // true
System.out.println(arrayBlockingQueue.add("4")); // 会抛出异常
// 虽然 ArrayBlockingQueue 重写了add 方法,但是却调用了父类的 add 方法,
// 它的父类是AbstractQueue,AbstractQueue类实现了 add 方法,代码如下,比较容易理解
System.out.println(arrayBlockingQueue.offer("4")); // false(队列已满返回 false)
}
}
public boolean add(E e) {
// offer 方法在队列满了的时候会返回false
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}
对于api的使用这里就不做过多说明,相信大家都能看懂源代码的注释(因为我都能看懂),有兴趣可以自己写代码试试,看看具体的效果是什么样的。这里对一些方法做一个分类与小结:
方法类型 | 抛出异常 | 特殊值 | 阻塞 | 超时 |
---|---|---|---|---|
插入 | add(e) | offer(e) | put(e) | offer(e, time, unit) |
移除 | remove() | poll() | take() | poll(time, unit) |
检索 | element() | peek() | 无 | 无 |
抛出异常:
当阻塞队列 满 时,再往队列里 add 元素会抛出 IllegalStateException:Queue full 异常;
当阻塞队列 空 时,再从队列里 remove 元素会抛出 NoSuchElementException 异常。
特殊值:
插入方法,成功返回 true, 失败返回 false;
移除元素,成功返回被移除的元素,失败返回 null。
阻塞:
当阻塞队列 满 时,生产者线程继续往队列里 put 元素,队列会一直阻塞生产线程直到 put 数据或响应终端退出;
当阻塞队列 空 时,消费者线程试图从往队列里 take 元素,队列会一直阻塞消费者线程直到队列可用。
超时:
当阻塞队列 满 时,队列会阻塞生产者线程一段时间,超过指定的时间后生产者线程会退出。
当阻塞队列 空 时,队列会阻塞消费者线程一段时间,超过指定的时间后消费者线程会退出。
下面就来使用阻塞队列的知识写一个简版的生产者消费者demo,结合 SynchronousQueue 的特性,模拟生产者生产一个消息,消费者消费一个消息。
简单版
public class QueueTest {
public static void main(String[] args) {
BlockingQueue synchronousQueue = new SynchronousQueue();
new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName() + " put 1");
synchronousQueue.put("1");
System.out.println(Thread.currentThread().getName() + " put 2");
synchronousQueue.put("2");
System.out.println(Thread.currentThread().getName() + " put 3");
synchronousQueue.put("3");
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "aaa").start();
new Thread(() -> {
try {
try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {e.printStackTrace();}
System.out.println(Thread.currentThread().getName() + " take " + synchronousQueue.take());
try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {e.printStackTrace();}
System.out.println(Thread.currentThread().getName() + " take " + synchronousQueue.take());
try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {e.printStackTrace();}
System.out.println(Thread.currentThread().getName() + " take " + synchronousQueue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "bbb").start();
}
}
改造升级版
public class ProducerAndConsumerTest {
public static void main(String[] args) {
ProducerAndConsumer producerAndConsumer = new ProducerAndConsumer(new ArrayBlockingQueue<String>(10));
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + " 生产线程启动");
producerAndConsumer.produce();
}, "Prod").start();
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + " 消费线程启动");
System.out.println();
System.out.println();
producerAndConsumer.consume();
}, "Consumer").start();
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println();
System.out.println();
System.out.println();
System.out.println(Thread.currentThread().getName() + " 5s后 模拟故障叫停,结束生产与消费");
producerAndConsumer.stop();
}
}
class ProducerAndConsumer {
private volatile boolean FLAG = true;
private AtomicInteger atomicInteger = new AtomicInteger();
BlockingQueue<String> blockingQueue = null;
public ProducerAndConsumer(BlockingQueue<String> blockingQueue) {
this.blockingQueue = blockingQueue;
}
// 生产消息
public void produce() {
String value;
boolean result;
while (FLAG) {
value = atomicInteger.incrementAndGet() + "";
try {
result = blockingQueue.offer(value, 2L, TimeUnit.SECONDS);
if (result) {
System.out.println(Thread.currentThread().getName() + " 生产 " + value + " 任务成功");
} else {
System.out.println(Thread.currentThread().getName() + " 生产 " + value + " 任务失败");
}
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(Thread.currentThread().getName() + " 故障终止,FALG = false");
}
// 消费消息
public void consume() {
String result;
while (FLAG) {
try {
result = blockingQueue.poll(2L, TimeUnit.SECONDS);
if (null == result || "".equals(result)) {
FLAG = false;
System.out.println(Thread.currentThread().getName() + " 超过 2s 没有取到任务");
return;
}
System.out.println(Thread.currentThread().getName() + " 消费 " + result + " 成功");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
// 模拟故障
public void stop() {
this.FLAG = false;
}
}
改造升级版的涉及到的知识还是很多的,如果有看前面几篇文章的话,相信也不难理解!
好了,本次分享到此结束!
网友评论