美文网首页java基础学习
生产者消费者demo

生产者消费者demo

作者: 迷糊银儿 | 来源:发表于2020-02-17 21:09 被阅读0次

    *实例一
    线程通过实现接口Runnable实现
    通过wait和notify来实现生产者和消费者模式,通过synchronized同步代码块实现线程的同步操作,从而保证数据的一致性。

    //生产者类
    public class Producer implements Runnable {
        private PublicBox box;
    
        public Producer(PublicBox box) {
            this.box = box;
        }
    
        @Override
        public void run() {
            int i=0;
            while(true){
                try {
                    System.out.println("生产者序号:" + i);
                    i++;
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                box.increace();
            }
        }
    }
    
    //消费者类
    public class Consumer implements Runnable {
        private PublicBox box;
    
        public Consumer(PublicBox box) {
            this.box = box;
        }
    
        @Override
        public void run() {
            int i=0;
            while(true){
                try {
                    System.out.println("消费者序号" + i);
                    i++;
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    // TODO: handle exception
                    e.printStackTrace();
                }
    
                box.decreace();
            }
        }
    }
    
    //仓库类
    public class PublicBox {
        private int product = 0;
    
        public synchronized void increace() {
            while (product == 5) {
                try {
                    wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
    
            }
            product++;
            System.out.println("产品生产成功!目前产品的存储量:"+product);
            notify();
        }
    
        public synchronized void decreace() {
            while (product == 0) {
                try {
                    wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            product--;
            System.out.println("产品消费成功!目前产品的存储量:"+product);
            notify();
        }
    
        public static void main(String[] args) {
            PublicBox box = new PublicBox();
            Consumer con = new Consumer(box);
            Producer pro = new Producer(box);
            Thread t1 = new Thread(con);
            Thread t2 = new Thread(pro);
            t1.start();
            t2.start();
        }
    }
    

    在这里因为生产者所休眠的时间比消费者短,所以生产者出现的频率会比消费者高一些。

    1. 首先是生产者和消费者都新建了各自的序号并打印出来。
    2. 因为是消费者先启动的,所以首先访问decreace同步块,可是因为条件不符合所以被wait了。
    3. 消费者被wait之后,生产者就开始启动increace同步块生产了。生产者一生产就会调用notify方法,这个时候第二步已经被wait的线程就会被唤醒,接着执行wait之后的代码。但是这里需要注意的是并不是生产者调用notify方法,消费者就会马上被唤醒执行接下来的代码。因为唤醒和执行都需要时间,这个过程可能生产者又生成新的产品了吗,也有可能是消费者马上被执行。
    4. 之后的过程就是按照前面三步骤进行循环输出的。
      这个模式下的生产者消费者主要是通过synchronized 同步代码块来保证product这个变量的一致性。保证product变量在多个线程的调用的过程中,线程之间不会发生互相干扰,按正确的顺序执行这些过程。
    • 实例二
      线程通过继承Thread实现
    import java.util.LinkedList;
    import java.util.Queue;
    import java.util.Random;
     
    /**
     * 生产者消费者模式:使用Object.wait() / notify()方法实现
     */
    public class ProducerConsumer {
        private static final int CAPACITY = 5;
    //申请一个容量最大的仓库
        public static void main(String args[]){
            Queue<Integer> queue = new LinkedList<Integer>();
     
            Thread producer1 = new Producer("P1", queue, CAPACITY);
            Thread producer2 = new Producer("P2", queue, CAPACITY);
            Thread consumer1 = new Consumer("C1", queue, CAPACITY);
            Thread consumer2 = new Consumer("C2", queue, CAPACITY);
            Thread consumer3 = new Consumer("C3", queue, CAPACITY);
     
            producer1.start();
            producer2.start();
            consumer1.start();
            consumer2.start();
            consumer3.start();
        }
     
        /**
         * 生产者
         */
        public static class Producer extends Thread{
            private Queue<Integer> queue;
           //队列作为仓库
            String name;
            int maxSize;
            int i = 0;
     
            public Producer(String name, Queue<Integer> queue, int maxSize){
                super(name);
                this.name = name;
                this.queue = queue;
                this.maxSize = maxSize;
            }
     
            @Override
            public void run(){
                while(true){
    //while(condition)为自旋锁,为防止该线程没有收到notify()调用也从wait()中返回
    //(也称作虚假唤醒),这个线程会重新去检查condition条件以决定当前是否可以安全
    //地继续执行还是需要重新保持等待,而不是认为线程被唤醒了就可以安全地继续执行
    //了,自旋锁当终止条件满足时,才会停止自旋,这里设置了一直执行,直到程序手动停
    //止。
                    synchronized(queue){
                        //给队列加锁,保证线程安全
                        while(queue.size() == maxSize){
                            //当队列是满的时候,生产者线程等待,由消费者线程进行操作
                            try {
                                System.out .println("Queue is full, Producer[" + name + "] thread waiting for " + "consumer to take something from queue.");
                                queue.wait();
                            } catch (Exception ex) {
                                ex.printStackTrace();
                            }
                        }
                        //队列不为空的时候,生产者被唤醒进行操作
                        System.out.println("[" + name + "] Producing value : +" + i);
                        queue.offer(i++);
                        //因此如果想在一个满的队列中加入一个新项,调用 add() 方法就会抛出一
                       //个 unchecked 异常,而调用 offer() 方法会返回 false
                        queue.notifyAll();
     
                        try {
                            Thread.sleep(new Random().nextInt(1000));
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
     
            }
        }
     
        /**
         * 消费者
         */
        public static class Consumer extends Thread{
            private Queue<Integer> queue;
            String name;
            int maxSize;
     
            public Consumer(String name, Queue<Integer> queue, int maxSize){
                super(name);
                this.name = name;
                this.queue = queue;
                this.maxSize = maxSize;
            }
     
            @Override
            public void run(){
                while(true){
                    synchronized(queue){
                        while(queue.isEmpty()){
                            try {
                                //队列为空,说明没有生产者生产的商品,消费者进行等待
                                System.out.println("Queue is empty, Consumer[" + name + "] thread is waiting for Producer");
                                queue.wait();
                            } catch (Exception ex) {
                                ex.printStackTrace();
                            }
                        }
                        int x = queue.poll();
                        //如果队列元素为空,调用remove() 的行为与 Collection 接口的版本相似会抛出异常,这里是模拟消费者取走商品的过程
                        // 但是新的 poll() 方法在用空集合调用时只是返回 null。因此新的方法更适合容易出现异常条件的情况。
                        System.out.println("[" + name + "] Consuming value : " + x);
                        queue.notifyAll();
                         //唤醒所有队列,消费者和生产者根据队列情况进行操作
     
                        try {
                            Thread.sleep(new Random().nextInt(1000));
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            }
        }
    }
    

    相关文章

      网友评论

        本文标题:生产者消费者demo

        本文链接:https://www.haomeiwen.com/subject/qkiifhtx.html