0. 前言
生产者消费者是考察多线程的常见问题。最近尝试手写生产者和消费者时,发现这个问题并不止是考察多线程,还可以考察泛型、设计模式等。这里总结下如何手写生产者消费者。
1. 预备知识
- Java多线程基本知识
- 泛型
- 简单的设计模式
2. 设计与整体架构
生产者消费者模式,需要生产者生产产品,消费者消耗产品。因此围绕着产品,我们需要:
- 产品
-
容器:盛放产品,既生产者生产后放入容器中,消费者到容器中取出产品。本文使用数据结构中队列(
Queue
)——尾部插入,头部取出 - 生产者:生产产品
- 消费者:消费产品
- 线程池:生产者和消费者跑在不同线程中,用过同步机制实现生产和消费。
分析完,上代码。
↓面向抽象(接口)的编程,无论如何逼格先起来~~
//产品接口
interface Product<T> {
T get();
}
//生产者接口
interface Producer<T> {
Product<T> produce();
}
//消费者接口
interface Consumer<T> {
void consume(Product<T> product);
}
然后是各个实现类,以产生long为例:↓
//产品类
static class ProductImpl<T> implements Product<T> {
private T data;
ProductImpl(T data) {
this.data = data;
}
@Override
public T get() {
return data;
}
}
//生产者类
static class LongProducer implements Producer<Long> {
@Override
public Product<Long> produce() {
costTime(3000);
long data = System.currentTimeMillis();
System.out.println("produce:" + data);
return new ProductImpl<>(data);
}
}
//消费者类
static class LongConsumer implements Consumer<Long> {
@Override
public void consume(Product<Long> product) {
costTime(2000);
System.out.println("consume:" + product.get());
}
}
//模拟生产或消费时花费的时间
private static void costTime(int sleepTime) {
try {
Thread.sleep(sleepTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
↓实际上生产者和消费者需要跑在Thread
中,因此我们还需要用Runnable
包裹一下,并循环生产/消费。利用代理模式,我们达到一个类只做一件事情
的目的。其中produceOnce()
和consumeOnce()
待实现
//利用修饰模式or代理模式包裹的生产者,并实现Runnable接口达到循环生产的目的
static class RunnableProducer<T> implements Producer<T>, Runnable {
Producer<T> delegate;
final Queue<Product<T>> queue;
volatile boolean canceled = false;
public void cancel() {
canceled = true;
}
public RunnableProducer(Producer<T> producer, Queue<Product<T>> queue) {
this.delegate = producer;
this.queue = queue;
}
@Override
public Product<T> produce() {
return delegate.produce();
}
@Override
public void run() {
while (!canceled) {
produceOnce();
}
}
private void produceOnce() {
// TODO: 19/3/18
}
}
//利用修饰模式or代理模式包裹的消费者,并实现Runnable接口达到循环消费的目的
static class RunnableConsumer<T> implements Consumer<T>, Runnable {
Consumer<T> delegate;
final Queue<Product<T>> queue;
volatile boolean canceled = false;
public void cancel() {
canceled = true;
}
public RunnableConsumer(Consumer<T> consumer, Queue<Product<T>> queue) {
this.delegate = consumer;
this.queue = queue;
}
@Override
public void consume(Product<T> product) {
delegate.consume(product);
}
@Override
public void run() {
while (!canceled) {
consumeOnce();
}
}
private void consumeOnce() {
// TODO: 19/3/18
}
}
↓最后写下测试代码,使用LinkedList
作为Queue
,跑在ThreadPool
中。至此整体架构就完成了。
public static void main(String[] args) {
Consumer<Long> consumer = new LongConsumer();
Producer<Long> producer = new LongProducer();
testSimpleProducerConsumer(producer, consumer);
}
private static <T> void testSimpleProducerConsumer(Producer<T> producer, Consumer<T> consumer) {
Queue<Product<T>> queue = new LinkedList<>();
RunnableConsumer<T> realConsumer = new RunnableConsumer<>(consumer, queue);
RunnableProducer<T> realProducer = new RunnableProducer<>(producer, queue);
ExecutorService executorService = Executors.newCachedThreadPool();
executorService.execute(realConsumer);
executorService.execute(realProducer);
costTime(30 * 1000);
executorService.shutdown();
realConsumer.cancel();
realProducer.cancel();
}
3. 实现
3.1 PlanA: Object的 wait()notify()
常用的同步/互斥就是Object
的wait()
和notify()
系列方法了。我们先用它来实现生产和消费的同步。
private void produceOnce() {
synchronized (queue) {
try {
//先查询,如果有未消费的就wait(),直到消费者消费完成后发送notify()唤醒
while (queue.peek() != null) {
queue.wait();
}
//唤醒后执行生产
queue.add(produce());
//生产后发送notify()唤醒消费者
queue.notify();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
private void consumeOnce() {
synchronized (queue) {
try {
//先查询,如果没有产品就wait()。直到生产者生产后发送notify()唤醒
while (queue.peek() == null) {
queue.wait();
}
//唤醒后执行消费
consume(queue.poll());
//消费完后发送notify()唤醒生产者
queue.notify();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
注释都写得很明白啦。跑一波看看结果。

总结:使用
Object
的wait()
和notify()
需要自己实现生产者和消费者的配合,要小心各种逻辑的处理。下面使用可阻塞的BlockingQueue
代替Queue
,实现起来更简单,也更不容易出错。
3.2 PlanB:使用BlockingQueue
//使用BlockingQueue代替Queue,实现可阻塞的队列
BlockingQueue<Product<T>> queue;
private void produceOnce() {
try {
Product<T> produce = produce();
queue.put(produce);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private void ConsumeOnce() {
try {
Product<T> product = queue.take();
consume(product);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private static <T> void testBetterProducerConsumer(Producer<T> producer, Consumer<T> consumer) {
//使用SynchronousQueue实现同步队列,当然也可以选择其他BlockingQueue的实现,但有各种不同的特点
BlockingQueue<Product<T>> queue = new SynchronousQueue<>();
BetterRunnableProducer<T> realProducer = new BetterRunnableProducer<>(producer, queue);
BetterRunnableConsumer<T> realConsumer = new BetterRunnableConsumer<>(consumer, queue);
ExecutorService executorService = Executors.newCachedThreadPool();
executorService.execute(realProducer);
executorService.execute(realConsumer);
costTime(30 * 1000);
executorService.shutdown();
realConsumer.cancel();
realProducer.cancel();
}
public static void main(String[] args) {
Consumer<Long> consumer = new LongConsumer();
Producer<Long> producer = new LongProducer();
// testSimpleProducerConsumer(producer, consumer);
testBetterProducerConsumer(producer, consumer);
}
↓跑一把

4. 总结
手写生产者消费者看似考察多线程知识,但这既是挑战也是机遇,如果用心思考,可以展示你更多的能力。本文不止是多线程知识,还有设计模式(面向接口的抽象思想,接口隔离原则,单一职责原则,代理模式,装饰器模式等
)、泛型
的使用等。另外,我们看到RunnableConsumer
和BetterRunnableConsumer
中有大量重复代码,因此可以再抽象一层出来。而且他们都依赖了Queue
,并且是构造方法中传入的依赖。这其实并不好,可以使用依赖倒置原则(依赖注入)
进一步优化。
5. 其他
参考:
https://www.jianshu.com/p/e29632593057
这边文章还提到了了错过notify信号
,wait条件变化
,唤醒同类导致“假死”状态
等。
6. 代码
package com.zz.multithreaddemo;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.SynchronousQueue;
public class ProducerConsumerDemo {
public static void main(String[] args) {
Consumer<Long> consumer = new LongConsumer();
Producer<Long> producer = new LongProducer();
testSimpleProducerConsumer(producer, consumer);
// testBetterProducerConsumer(producer, consumer);
}
private static <T> void testSimpleProducerConsumer(Producer<T> producer, Consumer<T> consumer) {
Queue<Product<T>> queue = new LinkedList<>();
RunnableConsumer<T> realConsumer = new RunnableConsumer<>(consumer, queue);
RunnableProducer<T> realProducer = new RunnableProducer<>(producer, queue);
ExecutorService executorService = Executors.newCachedThreadPool();
executorService.execute(realConsumer);
executorService.execute(realProducer);
costTime(10 * 1000);
executorService.shutdown();
realConsumer.cancel();
realProducer.cancel();
}
private static <T> void testBetterProducerConsumer(Producer<T> producer, Consumer<T> consumer) {
//使用SynchronousQueue实现同步队列,当然也可以选择其他BlockingQueue的实现,但有各种不同的特点
BlockingQueue<Product<T>> queue = new SynchronousQueue<>();
BetterRunnableProducer<T> realProducer = new BetterRunnableProducer<>(producer, queue);
BetterRunnableConsumer<T> realConsumer = new BetterRunnableConsumer<>(consumer, queue);
ExecutorService executorService = Executors.newCachedThreadPool();
executorService.execute(realProducer);
executorService.execute(realConsumer);
costTime(10 * 1000);
executorService.shutdown();
realConsumer.cancel();
realProducer.cancel();
}
//产品接口
interface Product<T> {
T get();
}
//生产者接口
interface Producer<T> {
Product<T> produce();
}
//消费者接口
interface Consumer<T> {
void consume(Product<T> product);
}
//产品类
static class ProductImpl<T> implements Product<T> {
private T data;
ProductImpl(T data) {
this.data = data;
}
@Override
public T get() {
return data;
}
}
//生产者类
static class LongProducer implements Producer<Long> {
@Override
public Product<Long> produce() {
costTime(3000);
long data = System.currentTimeMillis();
System.out.println("produce:" + data);
return new ProductImpl<>(data);
}
}
//消费者类
static class LongConsumer implements Consumer<Long> {
@Override
public void consume(Product<Long> product) {
costTime(2000);
System.out.println("consume:" + product.get());
}
}
//模拟生产或消费时花费的时间
private static void costTime(int sleepTime) {
try {
Thread.sleep(sleepTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//利用修饰模式or代理模式包裹的生产者,并实现Runnable接口达到循环生产的目的
static class RunnableProducer<T> implements Producer<T>, Runnable {
Producer<T> delegate;
final Queue<Product<T>> queue;
volatile boolean canceled = false;
public void cancel() {
canceled = true;
}
public RunnableProducer(Producer<T> producer, Queue<Product<T>> queue) {
this.delegate = producer;
this.queue = queue;
}
@Override
public Product<T> produce() {
return delegate.produce();
}
@Override
public void run() {
while (!canceled) {
produceOnce();
}
}
private void produceOnce() {
synchronized (queue) {
try {
//先查询,如果有未消费的就wait(),直到消费者消费完成后发送notify()唤醒
while (queue.peek() != null) {
queue.wait();
}
//唤醒后执行生产
queue.add(produce());
//生产后发送notify()唤醒消费者
queue.notify();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
//利用修饰模式or代理模式包裹的消费者,并实现Runnable接口达到循环消费的目的
static class RunnableConsumer<T> implements Consumer<T>, Runnable {
Consumer<T> delegate;
final Queue<Product<T>> queue;
volatile boolean canceled = false;
public void cancel() {
canceled = true;
}
public RunnableConsumer(Consumer<T> consumer, Queue<Product<T>> queue) {
this.delegate = consumer;
this.queue = queue;
}
@Override
public void consume(Product<T> product) {
delegate.consume(product);
}
@Override
public void run() {
while (!canceled) {
consumeOnce();
}
}
private void consumeOnce() {
synchronized (queue) {
try {
//先查询,如果没有产品就wait()。直到生产者生产后发送notify()唤醒
while (queue.peek() == null) {
queue.wait();
}
//唤醒后执行消费
consume(queue.poll());
//消费完后发送notify()唤醒生产者
queue.notify();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
static class BetterRunnableProducer<T> implements Producer<T>, Runnable {
//使用BlockingQueue代替Queue,实现可阻塞的队列
BlockingQueue<Product<T>> queue;
Producer<T> delegate;
volatile boolean canceled = false;
public void cancel() {
canceled = true;
}
BetterRunnableProducer(Producer<T> producer, BlockingQueue<Product<T>> queue) {
this.delegate = producer;
this.queue = queue;
}
@Override
public Product<T> produce() {
return delegate.produce();
}
@Override
public void run() {
while (!canceled) {
produceOnce();
}
}
private void produceOnce() {
try {
Product<T> produce = produce();
queue.put(produce);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
static class BetterRunnableConsumer<T> implements Consumer<T>, Runnable {
BlockingQueue<Product<T>> queue;
Consumer<T> delegate;
volatile boolean canceled = false;
public void cancel() {
canceled = true;
}
BetterRunnableConsumer(Consumer<T> consumer, BlockingQueue<Product<T>> queue) {
this.delegate = consumer;
this.queue = queue;
}
@Override
public void consume(Product<T> product) {
delegate.consume(product);
}
@Override
public void run() {
while (!canceled) {
ConsumeOnce();
}
}
private void ConsumeOnce() {
try {
Product<T> product = queue.take();
consume(product);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
网友评论