生产者消费者模型
生产者消费者模型是一个典型的多线程问题,涉及生产者、消费者、产品仓库。生产者生产的产品放入仓库中、消费者从仓库中取走产品。仓库可看成是阻塞队列,有如下关系。
- 仓库放满产品后生产者停止生产
- 仓库中没有产品时消费者停止消费
- 仓库不满的时候所有生产者都进行工作,仓库有产品后通知消费者消费
- 仓库有产品的时候所有消费者都工作,仓库不满时通知生产者生产
代码实现
import java.util.concurrent.atomic.AtomicInteger;
/**
* @author HXJ
* @date 2018/7/26
*/
public class ProduceConsume {
static class Produce {
public Produce(String name) {
this.name = name;
}
private String name;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
@Override
public String toString() {
return "Produce{" +
"name='" + name + '\'' +
'}';
}
}
//单向链表,用于构建队列
static class Node {
Produce produce;
Node next;
Node(Produce produce) { this.produce = produce; }
}
static class ProduceDepot {
private int capacity;
private final AtomicInteger count = new AtomicInteger();
private Node head;
private Node tail;
private Object notFull = new Object();
private Object notEmpty = new Object();
public ProduceDepot(int capacity) {
this.capacity = capacity;
this.head = this.tail = new Node(null);
}
public void produce(Produce produce) {
Node node = new Node(produce);
int num;
synchronized (notFull) {
while (count.get() == this.capacity) {
try {
//如果仓库满了,停止生产
System.out.println(Thread.currentThread().getName() + " 仓库满了,停止生产");
notFull.wait();
} catch (InterruptedException e) {
}
}
//新生产的产品排队放入仓库,放在队列末尾
this.tail = this.tail.next = node;
num = count.getAndIncrement();
if (num + 1 < this.capacity) {
//如果队列不满了通知生产
notFull.notifyAll();
System.out.println(Thread.currentThread().getName() + " 仓库没满,通知其他生产者");
}
}
if (num == 0) {
synchronized (notEmpty) {
System.out.println(Thread.currentThread().getName() + " 仓库有产品,通知消费者消费");
notEmpty.notifyAll();
}
}
}
public Produce consume() {
int num;
Produce produce;
synchronized (notEmpty) {
while (count.get() == 0) {
try {
System.out.println(Thread.currentThread().getName() + " 仓库空了,停止消费");
notEmpty.wait();
} catch (InterruptedException e) {
}
}
//把先进仓库的产品取出
Node h = this.head;
Node first = this.head.next;
this.head = first;
h.next = null; //gc
produce = first.produce;
first.produce = null;
num = count.getAndDecrement();
if (num > 1) {
System.out.println(Thread.currentThread().getName() + " 仓库有产品,通知其他消费者");
notEmpty.notifyAll();
}
}
if (num == this.capacity) {
//队列不满了,通知生产者
synchronized (notFull) {
System.out.println(Thread.currentThread().getName() + " 仓库不满了,通知继续生产");
notFull.notifyAll();
}
}
return produce;
}
}
public static void main(String[] args) throws InterruptedException {
ProduceDepot depot = new ProduceDepot(10);
new Thread(new Runnable() {
@Override
public void run() {
int i = 0;
while (true) {
depot.produce(new Produce("produce-" + i));
i ++;
}
}
}, "produce").start();
new Thread(new Runnable() {
@Override
public void run() {
while (true) {
System.out.println("消费产品:" + depot.consume());
}
}
}, "consume").start();
Thread.currentThread().join();
}
}
运行结果:
...
消费产品:Produce{name='produce-67303'}
consume 仓库空了,停止消费
produce 仓库没满,通知其他生产者
produce 仓库没满,通知其他生产者
produce 仓库有产品,通知消费者消费
produce 仓库没满,通知其他生产者
consume 仓库有产品,通知其他消费者
消费产品:Produce{name='produce-67304'}
consume 仓库有产品,通知其他消费者
消费产品:Produce{name='produce-67305'}
消费产品:Produce{name='produce-67306'}
consume 仓库空了,停止消费
produce 仓库没满,通知其他生产者
produce 仓库没满,通知其他生产者
produce 仓库有产品,通知消费者消费
produce 仓库没满,通知其他生产者
consume 仓库有产品,通知其他消费者
消费产品:Produce{name='produce-67307'}
consume 仓库有产品,通知其他消费者
消费产品:Produce{name='produce-67308'}
消费产品:Produce{name='produce-67309'}
consume 仓库空了,停止消费
produce 仓库没满,通知其他生产者
produce 仓库没满,通知其他生产者
produce 仓库有产品,通知消费者消费
produce 仓库没满,通知其他生产者
produce 仓库没满,通知其他生产者
produce 仓库没满,通知其他生产者
produce 仓库没满,通知其他生产者
produce 仓库没满,通知其他生产者
produce 仓库没满,通知其他生产者
produce 仓库没满,通知其他生产者
produce 仓库没满,通知其他生产者
produce 仓库没满,通知其他生产者
produce 仓库满了,停止生产
consume 仓库有产品,通知其他消费者
消费产品:Produce{name='produce-67310'}
consume 仓库有产品,通知其他消费者
consume 仓库不满了,通知继续生产
...
结果说明:
仓库(阻塞队列),实现的关键有两个地方,一是仓库产品数量的变更是通过AtomicInteger进行原子操作的,即库存的增加或减少都是按顺序执行的;二是对生产者和消费者进行了分组,生产者阻塞/唤醒在notFull监视器上,消费者阻塞/唤醒在notEmpty监视器上。两组线程相互独立,通过监视器进行通信,生产者工作的条件是仓库不满,停止工作的条件是仓库已满,阻塞等待在notFull对象上,换句话说就是,仓库装满了,到notFull这个地方歇着吧!当生产者把生产的产品放入仓库后,通知消费者消费;仓库空了,消费者停止工作,仓库不满的时候通知生产者。
网友评论