美文网首页java基础学习
生产者消费者demo

生产者消费者demo

作者: 迷糊银儿 | 来源:发表于2020-02-17 21:09 被阅读0次

*实例一
线程通过实现接口Runnable实现
通过wait和notify来实现生产者和消费者模式,通过synchronized同步代码块实现线程的同步操作,从而保证数据的一致性。

//生产者类
public class Producer implements Runnable {
    private PublicBox box;

    public Producer(PublicBox box) {
        this.box = box;
    }

    @Override
    public void run() {
        int i=0;
        while(true){
            try {
                System.out.println("生产者序号:" + i);
                i++;
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            box.increace();
        }
    }
}

//消费者类
public class Consumer implements Runnable {
    private PublicBox box;

    public Consumer(PublicBox box) {
        this.box = box;
    }

    @Override
    public void run() {
        int i=0;
        while(true){
            try {
                System.out.println("消费者序号" + i);
                i++;
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                // TODO: handle exception
                e.printStackTrace();
            }

            box.decreace();
        }
    }
}

//仓库类
public class PublicBox {
    private int product = 0;

    public synchronized void increace() {
        while (product == 5) {
            try {
                wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        }
        product++;
        System.out.println("产品生产成功!目前产品的存储量:"+product);
        notify();
    }

    public synchronized void decreace() {
        while (product == 0) {
            try {
                wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        product--;
        System.out.println("产品消费成功!目前产品的存储量:"+product);
        notify();
    }

    public static void main(String[] args) {
        PublicBox box = new PublicBox();
        Consumer con = new Consumer(box);
        Producer pro = new Producer(box);
        Thread t1 = new Thread(con);
        Thread t2 = new Thread(pro);
        t1.start();
        t2.start();
    }
}

在这里因为生产者所休眠的时间比消费者短,所以生产者出现的频率会比消费者高一些。

  1. 首先是生产者和消费者都新建了各自的序号并打印出来。
  2. 因为是消费者先启动的,所以首先访问decreace同步块,可是因为条件不符合所以被wait了。
  3. 消费者被wait之后,生产者就开始启动increace同步块生产了。生产者一生产就会调用notify方法,这个时候第二步已经被wait的线程就会被唤醒,接着执行wait之后的代码。但是这里需要注意的是并不是生产者调用notify方法,消费者就会马上被唤醒执行接下来的代码。因为唤醒和执行都需要时间,这个过程可能生产者又生成新的产品了吗,也有可能是消费者马上被执行。
  4. 之后的过程就是按照前面三步骤进行循环输出的。
    这个模式下的生产者消费者主要是通过synchronized 同步代码块来保证product这个变量的一致性。保证product变量在多个线程的调用的过程中,线程之间不会发生互相干扰,按正确的顺序执行这些过程。
  • 实例二
    线程通过继承Thread实现
import java.util.LinkedList;
import java.util.Queue;
import java.util.Random;
 
/**
 * 生产者消费者模式:使用Object.wait() / notify()方法实现
 */
public class ProducerConsumer {
    private static final int CAPACITY = 5;
//申请一个容量最大的仓库
    public static void main(String args[]){
        Queue<Integer> queue = new LinkedList<Integer>();
 
        Thread producer1 = new Producer("P1", queue, CAPACITY);
        Thread producer2 = new Producer("P2", queue, CAPACITY);
        Thread consumer1 = new Consumer("C1", queue, CAPACITY);
        Thread consumer2 = new Consumer("C2", queue, CAPACITY);
        Thread consumer3 = new Consumer("C3", queue, CAPACITY);
 
        producer1.start();
        producer2.start();
        consumer1.start();
        consumer2.start();
        consumer3.start();
    }
 
    /**
     * 生产者
     */
    public static class Producer extends Thread{
        private Queue<Integer> queue;
       //队列作为仓库
        String name;
        int maxSize;
        int i = 0;
 
        public Producer(String name, Queue<Integer> queue, int maxSize){
            super(name);
            this.name = name;
            this.queue = queue;
            this.maxSize = maxSize;
        }
 
        @Override
        public void run(){
            while(true){
//while(condition)为自旋锁,为防止该线程没有收到notify()调用也从wait()中返回
//(也称作虚假唤醒),这个线程会重新去检查condition条件以决定当前是否可以安全
//地继续执行还是需要重新保持等待,而不是认为线程被唤醒了就可以安全地继续执行
//了,自旋锁当终止条件满足时,才会停止自旋,这里设置了一直执行,直到程序手动停
//止。
                synchronized(queue){
                    //给队列加锁,保证线程安全
                    while(queue.size() == maxSize){
                        //当队列是满的时候,生产者线程等待,由消费者线程进行操作
                        try {
                            System.out .println("Queue is full, Producer[" + name + "] thread waiting for " + "consumer to take something from queue.");
                            queue.wait();
                        } catch (Exception ex) {
                            ex.printStackTrace();
                        }
                    }
                    //队列不为空的时候,生产者被唤醒进行操作
                    System.out.println("[" + name + "] Producing value : +" + i);
                    queue.offer(i++);
                    //因此如果想在一个满的队列中加入一个新项,调用 add() 方法就会抛出一
                   //个 unchecked 异常,而调用 offer() 方法会返回 false
                    queue.notifyAll();
 
                    try {
                        Thread.sleep(new Random().nextInt(1000));
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
 
        }
    }
 
    /**
     * 消费者
     */
    public static class Consumer extends Thread{
        private Queue<Integer> queue;
        String name;
        int maxSize;
 
        public Consumer(String name, Queue<Integer> queue, int maxSize){
            super(name);
            this.name = name;
            this.queue = queue;
            this.maxSize = maxSize;
        }
 
        @Override
        public void run(){
            while(true){
                synchronized(queue){
                    while(queue.isEmpty()){
                        try {
                            //队列为空,说明没有生产者生产的商品,消费者进行等待
                            System.out.println("Queue is empty, Consumer[" + name + "] thread is waiting for Producer");
                            queue.wait();
                        } catch (Exception ex) {
                            ex.printStackTrace();
                        }
                    }
                    int x = queue.poll();
                    //如果队列元素为空,调用remove() 的行为与 Collection 接口的版本相似会抛出异常,这里是模拟消费者取走商品的过程
                    // 但是新的 poll() 方法在用空集合调用时只是返回 null。因此新的方法更适合容易出现异常条件的情况。
                    System.out.println("[" + name + "] Consuming value : " + x);
                    queue.notifyAll();
                     //唤醒所有队列,消费者和生产者根据队列情况进行操作
 
                    try {
                        Thread.sleep(new Random().nextInt(1000));
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }
}

相关文章

网友评论

    本文标题:生产者消费者demo

    本文链接:https://www.haomeiwen.com/subject/qkiifhtx.html