一、模式特点
生产者与消费者模式中,生产者和消费者各自做着自己的工作,生产者生产物品,将物品放入缓冲区。消费者从缓冲区中拿走物品来使用。该问题的关键就是要保证生产者不会在缓冲区满时加入数据,消费者也不会在缓冲区中空时消耗数据。

二、java实现
考虑到生产者消费者主要要点有:生产消费工作互不干扰,生产者生产不被消费者限制,而消费者消费仅取决于缓冲区中还有没有物品,没有物品则阻塞。
因此生产者和消费者应当在两个线程,然后公用一个缓冲区。并且在缓冲区满时,生产者线程阻塞。缓冲区为空时,消费者线程阻塞。
对缓冲队列加锁,因为缓冲队列是线程共享的,因此防止生产者和消费者同时操纵缓冲队列,出现同步问题。
Producer.java:
package com.meituan.huangdanyang.pcp;
public class Producer extends Thread {
private BufferQueue bufferQueue;
public Producer(BufferQueue bufferQueue){
this.bufferQueue = bufferQueue;
}
@Override
public void run() {
while (true){
try {
bufferQueue.produce();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
Consumer.java:
package com.meituan.huangdanyang.pcp;
public class Consumer extends Thread {
private BufferQueue bufferQueue;
public Consumer(BufferQueue bufferQueue) {
this.bufferQueue = bufferQueue;
}
@Override
public void run() {
while (true) {
try {
bufferQueue.consume();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
BufferdQueue.java:
package com.meituan.huangdanyang.pcp;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
public class BufferQueue {
final static int size = 10;
public List<Double> products = new ArrayList<>();
public Object signal = new Object();
public void produce() throws InterruptedException {
Thread.sleep(1000);
Double product = Math.random();
System.out.println("produce " + Thread.currentThread().getId() + " " + product);
synchronized (products) {
if(products.size() == size) {
System.out.println("producer waiting");
products.wait();
} else {
products.add(product);
System.out.println("load into queue " + Thread.currentThread().getId() + " " + product);
products.notify();
}
}
}
public void consume() throws InterruptedException {
synchronized (products) {
if (products.size() == 0) {
System.out.println("consumer waiting");
products.wait();
} else {
System.out.println("consume " + Thread.currentThread().getId() + " " + products.get(0));
products.remove(0);
products.notify();
}
}
}
}
测试代码:
BufferQueue bufferQueue = new BufferQueue();
Thread producerThread = new Producer(bufferQueue);
Thread consumerThread = new Consumer(bufferQueue);
producerThread.start();
consumerThread.start();
运行结果:

网友评论