所谓生产者-消费者问题,实际上主要是包含了两类线程,一种是生产者线程用于生产数据,另一种是消费者线程用于消费数据,为了解耦生产者和消费者的关系,通常会采用共享的数据区域,就像是一个仓库,生产者生产数据之后直接放置在共享数据区中,并不需要关心消费者的行为;而消费者只需要从共享数据区中去获取数据,就不再需要关心生产者的行为。
在实现生产者消费者问题时,可以采用三种方式:
- 使用 Object 的
wait()/notify()
的消息通知机制; - 使用 Lock 的 Condition 的
await()/signal()
的消息通知机制; - 使用 BlockingQueue 实现。
1、基于 Object 的 wait()/notifyAll()
实现方式
1.1 理论基础
关于 wait()
、notify()
和 notifyAll()
的理论基础,可以参考笔者的文章,
《并发编程专题 2:使用多线程编程》 的 《线程控制 wait()、notify() 和 notifyAll()》 一节。
1.2 基于 Object 的 wait()/notifyAll() 生产者消费者模式
// 生产者
private static class Consumer implements Runnable {
private List<Object> products;
// 传入的对象是产品,也就是说,生产者和消费者通过产品建立联系
public Consumer(List<Object> products) {
this.products = products;
}
public void run() {
while (true) { // 使用循环来不断消费
synchronized (products) { // 对产品加锁 products
while (products.isEmpty()) { // 1. 没有产品了
try { // 调用 wait() 的时候使用 tru...catch 防止线程终端
products.wait(); // 已经没有产品可以消费了
} catch (InterruptedException e) {
e.printStackTrace();
}
}
products.remove(0); // 消费一个
products.notifyAll(); // 通知其他线程
System.out.println("Eat one. Left : " + products.size());
}
}
}
}
// 消费者
private static class Producer implements Runnable {
private final int max; // 产品的上限
private List<Object> products;
// 参数是产品的上限和产品列表(理解成仓库和仓库的最大容量亦可)
public Producer(int max, List<Object> products) {
this.max = max;
this.products = products;
}
public void run() {
while (true) {
synchronized (products) {
while (products.size() > max) { // 2. 大于上限就停止生产
try {
products.wait(); // 暂停生成
} catch (InterruptedException e) {
e.printStackTrace();
}
}
products.add(new Object()); // 生成一个
products.notifyAll(); // 唤醒
System.out.println("Made one. Total : " + products.size());
}
}
}
}
// 模拟 & 验证
public static void main(String...args) {
final int MAX_PRODUCTS = 20;
List<Object> products = new LinkedList<>();
Executor executor = Executors.newCachedThreadPool(); // 使用线程池
executor.execute(new Producer(MAX_PRODUCTS, products));
executor.execute(new Consumer(products));
executor.execute(new Consumer(products));
}
上面的生产者和消费者模式示例代码的注释已经非常详尽了,这里有几个问题需要说明一下。
首先是上述注释中的 1 和 2,这里我们使用的是 while 循环而不是 if 语句。这是因为假如存在多个消费者并且都因为产品不足而发生阻塞,当消费者调用了 notifyAll()
将它们唤醒的时候,其中的一个被唤醒并且消费了产品,释放了锁,而此时生产者还没有生成出新的产品,另一个生产者获得了锁,并进行消费,那么此时会因为没有可以消费的产品而抛出异常。所以,在消费者和生产者中,我们都使用 while 循环而不是 if. 这样,按照上述情形,另一个生产者会继续判断是否有可以消费的产品,如果没有的话,将会继续 wait.
另外就是使用 notifyAll()
而不是 notify()
唤醒。假如我们存在多个消费者在等待,此时一个消费者使用 notify()
唤醒的依然是消费者,那么线程将进入假死状态。
2、Lock 的 Condition 的 await()/signal()
的实现方式
使用 Lock 的实现方式与使用 Object 的 wait()
和 notifyAll()
的实现方式原理是一致的。
// 定义锁、仓库已满的条件和仓库为空的条件
private static ReentrantLock lock = new ReentrantLock();
private static final Condition full = lock.newCondition();
private static final Condition empty = lock.newCondition();
// 消费者
private static class Consumer implements Runnable {
private List<Object> products;
public Consumer(List<Object> products) {
this.products = products;
}
public void run() {
while (true) {
lock.lock(); // 加锁
try {
while (products.isEmpty()) { // 没有可消费的产品
try {
empty.await(); // 没有可用的产品了,等待
} catch (InterruptedException e) {
e.printStackTrace();
}
}
products.remove(0); // 消费一个
full.signalAll(); // 唤醒所有生产者
empty.signalAll(); // 唤醒所有消费者
System.out.println("Eat one. Left : " + products.size());
} finally {
lock.unlock(); // 释放锁
}
}
}
}
// 生产者
private static class Producer implements Runnable {
private final int max;
private List<Object> products;
public Producer(int max, List<Object> products) {
this.max = max;
this.products = products;
}
public void run() {
while (true) {
lock.lock(); // 加锁
try {
while (products.size() > max) { // 已经达到了最大的产量
try {
full.await(); // 已达最大产量,等待
} catch (InterruptedException e) {
e.printStackTrace();
}
}
products.add(new Object()); // 生产一个
full.signalAll();
empty.signalAll();
System.out.println("Made one. Total : " + products.size());
} finally {
lock.unlock(); // 释放锁
}
}
}
}
public static void main(String...args) {
final int MAX_PRODUCTS = 20;
List<Object> products = new LinkedList<>();
Executor executor = Executors.newCachedThreadPool();
executor.execute(new Producer(MAX_PRODUCTS, products));
executor.execute(new Consumer(products));
executor.execute(new Consumer(products));
}
与第一种情形类似,在 wait()/notifyAll()
的实现中,加锁是对 products 进行的,也就是说,生产者和消费者通过产品(或者 “仓库”)联系到了一起。而上面的情形中,生产者和消费者通过 lock
, full
和 empty
三个条件关联。
3、基于 BlockingQueue 实现
从上面的两种实现方式中,我们也可以看出,实现生产和消费协调的关键,就是当生产达到最大数量或者可以消费的达到了最小值的时候,使生产者和消费者线程进行阻塞。既然是阻塞,那我们为什么不能直接通过阻塞队列来实现呢?
private static class Consumer implements Runnable {
private BlockingQueue<Object> products;
public Consumer(BlockingQueue<Object> products) {
this.products = products;
}
@Override
public void run() {
while (true) {
try {
products.take(); // 取不到数据的时候自动阻塞
System.out.println("Consumed one, Total " + products.size());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
// 生产者
private static class Producer implements Runnable {
private BlockingQueue<Object> products;
public Producer(BlockingQueue<Object> products) {
this.products = products;
}
@Override
public void run() {
while (true) {
try {
products.put(new Object()); // 当达到了最大数量的时候会阻塞
System.out.println("Produced one, Total " + products.size());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public static void main(String...args) {
final int MAX_PRODUCTS = 20;
BlockingQueue<Object> products = new LinkedBlockingDeque<>(MAX_PRODUCTS);
Executor executor = Executors.newCachedThreadPool();
executor.execute(new Producer(products));
executor.execute(new Consumer(products));
executor.execute(new Consumer(products));
}
从上面我们可以看出,这种实现方式的思想非常接近与我们第一种实现方式,即都是通过产品来在生产者和消费者之间建立联系。使用阻塞队列的实现方式优势明显,没有线程控制,代码更加简洁。
网友评论