*实例一
线程通过实现接口Runnable实现
通过wait和notify来实现生产者和消费者模式,通过synchronized同步代码块实现线程的同步操作,从而保证数据的一致性。
//生产者类
public class Producer implements Runnable {
private PublicBox box;
public Producer(PublicBox box) {
this.box = box;
}
@Override
public void run() {
int i=0;
while(true){
try {
System.out.println("生产者序号:" + i);
i++;
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
box.increace();
}
}
}
//消费者类
public class Consumer implements Runnable {
private PublicBox box;
public Consumer(PublicBox box) {
this.box = box;
}
@Override
public void run() {
int i=0;
while(true){
try {
System.out.println("消费者序号" + i);
i++;
Thread.sleep(3000);
} catch (InterruptedException e) {
// TODO: handle exception
e.printStackTrace();
}
box.decreace();
}
}
}
//仓库类
public class PublicBox {
private int product = 0;
public synchronized void increace() {
while (product == 5) {
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
product++;
System.out.println("产品生产成功!目前产品的存储量:"+product);
notify();
}
public synchronized void decreace() {
while (product == 0) {
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
product--;
System.out.println("产品消费成功!目前产品的存储量:"+product);
notify();
}
public static void main(String[] args) {
PublicBox box = new PublicBox();
Consumer con = new Consumer(box);
Producer pro = new Producer(box);
Thread t1 = new Thread(con);
Thread t2 = new Thread(pro);
t1.start();
t2.start();
}
}
在这里因为生产者所休眠的时间比消费者短,所以生产者出现的频率会比消费者高一些。
- 首先是生产者和消费者都新建了各自的序号并打印出来。
- 因为是消费者先启动的,所以首先访问decreace同步块,可是因为条件不符合所以被wait了。
- 消费者被wait之后,生产者就开始启动increace同步块生产了。生产者一生产就会调用notify方法,这个时候第二步已经被wait的线程就会被唤醒,接着执行wait之后的代码。但是这里需要注意的是并不是生产者调用notify方法,消费者就会马上被唤醒执行接下来的代码。因为唤醒和执行都需要时间,这个过程可能生产者又生成新的产品了吗,也有可能是消费者马上被执行。
- 之后的过程就是按照前面三步骤进行循环输出的。
这个模式下的生产者消费者主要是通过synchronized 同步代码块来保证product这个变量的一致性。保证product变量在多个线程的调用的过程中,线程之间不会发生互相干扰,按正确的顺序执行这些过程。
- 实例二
线程通过继承Thread实现
import java.util.LinkedList;
import java.util.Queue;
import java.util.Random;
/**
* 生产者消费者模式:使用Object.wait() / notify()方法实现
*/
public class ProducerConsumer {
private static final int CAPACITY = 5;
//申请一个容量最大的仓库
public static void main(String args[]){
Queue<Integer> queue = new LinkedList<Integer>();
Thread producer1 = new Producer("P1", queue, CAPACITY);
Thread producer2 = new Producer("P2", queue, CAPACITY);
Thread consumer1 = new Consumer("C1", queue, CAPACITY);
Thread consumer2 = new Consumer("C2", queue, CAPACITY);
Thread consumer3 = new Consumer("C3", queue, CAPACITY);
producer1.start();
producer2.start();
consumer1.start();
consumer2.start();
consumer3.start();
}
/**
* 生产者
*/
public static class Producer extends Thread{
private Queue<Integer> queue;
//队列作为仓库
String name;
int maxSize;
int i = 0;
public Producer(String name, Queue<Integer> queue, int maxSize){
super(name);
this.name = name;
this.queue = queue;
this.maxSize = maxSize;
}
@Override
public void run(){
while(true){
//while(condition)为自旋锁,为防止该线程没有收到notify()调用也从wait()中返回
//(也称作虚假唤醒),这个线程会重新去检查condition条件以决定当前是否可以安全
//地继续执行还是需要重新保持等待,而不是认为线程被唤醒了就可以安全地继续执行
//了,自旋锁当终止条件满足时,才会停止自旋,这里设置了一直执行,直到程序手动停
//止。
synchronized(queue){
//给队列加锁,保证线程安全
while(queue.size() == maxSize){
//当队列是满的时候,生产者线程等待,由消费者线程进行操作
try {
System.out .println("Queue is full, Producer[" + name + "] thread waiting for " + "consumer to take something from queue.");
queue.wait();
} catch (Exception ex) {
ex.printStackTrace();
}
}
//队列不为空的时候,生产者被唤醒进行操作
System.out.println("[" + name + "] Producing value : +" + i);
queue.offer(i++);
//因此如果想在一个满的队列中加入一个新项,调用 add() 方法就会抛出一
//个 unchecked 异常,而调用 offer() 方法会返回 false
queue.notifyAll();
try {
Thread.sleep(new Random().nextInt(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
/**
* 消费者
*/
public static class Consumer extends Thread{
private Queue<Integer> queue;
String name;
int maxSize;
public Consumer(String name, Queue<Integer> queue, int maxSize){
super(name);
this.name = name;
this.queue = queue;
this.maxSize = maxSize;
}
@Override
public void run(){
while(true){
synchronized(queue){
while(queue.isEmpty()){
try {
//队列为空,说明没有生产者生产的商品,消费者进行等待
System.out.println("Queue is empty, Consumer[" + name + "] thread is waiting for Producer");
queue.wait();
} catch (Exception ex) {
ex.printStackTrace();
}
}
int x = queue.poll();
//如果队列元素为空,调用remove() 的行为与 Collection 接口的版本相似会抛出异常,这里是模拟消费者取走商品的过程
// 但是新的 poll() 方法在用空集合调用时只是返回 null。因此新的方法更适合容易出现异常条件的情况。
System.out.println("[" + name + "] Consuming value : " + x);
queue.notifyAll();
//唤醒所有队列,消费者和生产者根据队列情况进行操作
try {
Thread.sleep(new Random().nextInt(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
}
网友评论