美文网首页Java 基础概念
Java 并发编程——生产者与消费者

Java 并发编程——生产者与消费者

作者: 极速24号 | 来源:发表于2018-11-03 10:51 被阅读8次

    1. 生产者与消费者

    1.1 程序基本实现(问题引出)

    生产者与消费者是线程操作的经典案例,即:生产者不断生产,消费者不断取走生产者生产的产品。接下来,我们以买早餐为例讲解如何实现生产者与消费者。

    假设,一个早餐店只卖下面两种套餐:

    • 煎饼果子,无糖豆浆
    • 锅盔辣子荚膜,豆腐脑

    接下来,用程序实现这个过程:

    //源码:
    //早餐
    public class Breakfast_201811011939 {
    
        private String food;
        private String drinking;
        
        public Breakfast_201811011939(){}
        
        public Breakfast_201811011939(String f, String d){
            this.food = f;
            this.drinking = d;
        }
    
        public String getFood() {
            return food;
        }
    
        public void setFood(String food) {
            this.food = food;
        }
    
        public String getDrinking() {
            return drinking;
        }
    
        public void setDrinking(String drinking) {
            this.drinking = drinking;
        }
    }
    
    //早餐店
    public class Restaurant_201811011951 implements Runnable {
    
        private Breakfast_201811011939 breakfast;
        
        public Restaurant_201811011951(Breakfast_201811011939 b){
            this.breakfast = b;
        }
        
        @Override
        public void run() {
            boolean isFirstSetMeal = false;
            for (int i = 0; i < 25; i++) {
                if(isFirstSetMeal){
                    this.breakfast.setFood("煎饼果子");
                    try {
                        Thread.sleep(100);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    this.breakfast.setDrinking("无糖豆浆");
                    isFirstSetMeal = false;
                }else{
                    this.breakfast.setFood("锅盔辣子荚膜");
                    try {
                        Thread.sleep(100);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    this.breakfast.setDrinking("豆腐脑");
                    isFirstSetMeal = true;
                }
            }
        }
    
    }
    
    //顾客
    public class Consumer_201811011958 implements Runnable {
    
        private Breakfast_201811011939 breakfast;
        
        public Consumer_201811011958(Breakfast_201811011939 b){
            this.breakfast = b;
        }
        
        @Override
        public void run() {
            for (int i = 0; i < 25; i++) {
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("套餐: " + breakfast.getFood() + "  " + breakfast.getDrinking() + "  " + (i+1));
            }
        }
    
    }
    
    //Main
    public class Main_201811012005 {
    
        public static void main(String[] args) {
            Breakfast_201811011939 breakfast = new Breakfast_201811011939();
            Restaurant_201811011951 restaurant = new Restaurant_201811011951(breakfast);
            Consumer_201811011958 consumer = new Consumer_201811011958(breakfast);
            new Thread(restaurant).start();
            new Thread(consumer).start();
        }
    
    }
    
    //执行结果:
    套餐: 煎饼果子  豆腐脑  1
    套餐: 锅盔辣子荚膜  无糖豆浆  2
    套餐: 煎饼果子  豆腐脑  3
    套餐: 锅盔辣子荚膜  无糖豆浆  4
    套餐: 煎饼果子  豆腐脑  5
    套餐: 锅盔辣子荚膜  无糖豆浆  6
    套餐: 煎饼果子  豆腐脑  7
    套餐: 锅盔辣子荚膜  无糖豆浆  8
    套餐: 煎饼果子  豆腐脑  9
    套餐: 锅盔辣子荚膜  无糖豆浆  10
    套餐: 煎饼果子  豆腐脑  11
    套餐: 锅盔辣子荚膜  无糖豆浆  12
    套餐: 煎饼果子  豆腐脑  13
    套餐: 锅盔辣子荚膜  无糖豆浆  14
    套餐: 煎饼果子  豆腐脑  15
    套餐: 锅盔辣子荚膜  无糖豆浆  16
    套餐: 煎饼果子  豆腐脑  17
    套餐: 锅盔辣子荚膜  无糖豆浆  18
    套餐: 煎饼果子  豆腐脑  19
    套餐: 锅盔辣子荚膜  无糖豆浆  20
    套餐: 煎饼果子  豆腐脑  21
    套餐: 锅盔辣子荚膜  无糖豆浆  22
    套餐: 煎饼果子  豆腐脑  23
    套餐: 锅盔辣子荚膜  无糖豆浆  24
    套餐: 锅盔辣子荚膜  无糖豆浆  25
    

    由执行结果可知,上面的程序有两个问题:

    • 信息错乱
      Restaurant 线程刚添加完 Food 信息还没有来得及添加 Drinking 信息,Consumer 线程就过来取 Breakfast 了,此时信息就会错乱。
    • 连续存储
      Restaurant 添加了 N 次 Breakfast 信息之后,Consumer 线程才过来取 Breakfast 或者 Consumer 线程过来取了 N 次信息之后,Restaurant 才往 Breakfast 里面添加新的信息,此时就会造成连续存取的问题。

    1.2 解决信息错乱问题——添加同步

    信息错乱明显是因为资源共享未同步造成的,所以解决方法当然是同步。

    接下来,根据分析结果对上述代码进行修改:

    //源码:
    //早餐
    public class Breakfast_201811012009 {
    
        private String food;
        private String drinking;
        
        public Breakfast_201811012009(){}
        
        public Breakfast_201811012009(String f, String d){
            this.food = f;
            this.drinking = d;
        }
        
        public synchronized void setBreakfast(String f, String d) {
            setFood(f);
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            setDrinking(d);
        }
        
        public synchronized void getBreakfast(){
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("套餐: " + getFood() + "  " + getDrinking());
        }
    
        public String getFood() {
            return food;
        }
    
        public void setFood(String food) {
            this.food = food;
        }
    
        public String getDrinking() {
            return drinking;
        }
    
        public void setDrinking(String drinking) {
            this.drinking = drinking;
        }
    }
    
    //早餐店
    public class Restaurant_201811012032 implements Runnable {
    
        private Breakfast_201811012009 breakfast;
        
        public Restaurant_201811012032(Breakfast_201811012009 b){
            this.breakfast = b;
        }
        
        @Override
        public void run() {
            boolean isFirstSetMeal = false;
            for (int i = 0; i < 25; i++) {
                if(isFirstSetMeal){
                    this.breakfast.setBreakfast("煎饼果子", "无糖豆浆" + "  " + (i+1));
                    isFirstSetMeal = false;
                }else{
                    this.breakfast.setBreakfast("锅盔辣子荚膜", "豆腐脑" + "  " + (i+1));
                    isFirstSetMeal = true;
                }
            }
        }
    
    }
    
    //顾客
    public class Consumer_201811012018 implements Runnable {
    
        private Breakfast_201811012009 breakfast;
        
        public Consumer_201811012018(Breakfast_201811012009 b){
            this.breakfast = b;
        }
        
        @Override
        public void run() {
            for (int i = 0; i < 25; i++) {
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                this.breakfast.getBreakfast();
            }
        }
    
    }
    
    //Main
    public class Main_201811012005 {
    
        public static void main(String[] args) {
            Breakfast_201811012009 breakfast = new Breakfast_201811012009();
            Consumer_201811012018 restaurant = new Consumer_201811012018(breakfast);
            Restaurant_201811012032 consumer = new Restaurant_201811012032(breakfast);
            new Thread(restaurant).start();
            new Thread(consumer).start();
        }
        
    }
    
    //执行结果:
    套餐: 锅盔辣子荚膜  豆腐脑  1
    套餐: 煎饼果子  无糖豆浆  2
    套餐: 锅盔辣子荚膜  豆腐脑  3
    套餐: 煎饼果子  无糖豆浆  4
    套餐: 锅盔辣子荚膜  豆腐脑  5
    套餐: 煎饼果子  无糖豆浆  6
    套餐: 煎饼果子  无糖豆浆  8
    套餐: 锅盔辣子荚膜  豆腐脑  9
    套餐: 煎饼果子  无糖豆浆  10
    套餐: 锅盔辣子荚膜  豆腐脑  11
    套餐: 煎饼果子  无糖豆浆  12
    套餐: 煎饼果子  无糖豆浆  16
    套餐: 锅盔辣子荚膜  豆腐脑  17
    套餐: 煎饼果子  无糖豆浆  18
    套餐: 锅盔辣子荚膜  豆腐脑  19
    套餐: 煎饼果子  无糖豆浆  20
    套餐: 锅盔辣子荚膜  豆腐脑  21
    套餐: 煎饼果子  无糖豆浆  22
    套餐: 锅盔辣子荚膜  豆腐脑  23
    套餐: 煎饼果子  无糖豆浆  24
    套餐: 锅盔辣子荚膜  豆腐脑  25
    套餐: 锅盔辣子荚膜  豆腐脑  25
    套餐: 锅盔辣子荚膜  豆腐脑  25
    套餐: 锅盔辣子荚膜  豆腐脑  25
    套餐: 锅盔辣子荚膜  豆腐脑  25
    

    由执行结果可知,经过同步处理之后,信息错乱的问题解决了,但连续存取的问题还在。

    之所以在执行结果中会漏掉某些数字,如:7,13,14,15,是因为 Restaurant 线程添加完 Breakfast 信息,Consumer 线程没有及时取走,于是信息展示的时候,漏掉了这些数字;
    之所以在执行结果中会出现连续几次数字一样的情况,如:25 连续出现了 5 次,是因为 Restaurant 线程已经执行完毕,即:此时 Breakfast 信息已经不更改,而 Consumer 线程还没有执行完毕。其实归根结底还是因为 Restaurant 线程和 Consumer 线程没有交替操作 Breakfast 对象。

    那如何才能使 Restaurant 线程和 Consumer 线程交替操作 Breakfast 对象呢?——加入等待和唤醒。

    1.3 解决连续存取问题——添加等待与唤醒

    在 Object 类中定义了线程等待(wait)与唤醒(notify)的方法:

    //Wait 线程等待
    /**
     * Causes the current thread to wait until another thread invokes the
     * {@link java.lang.Object#notify()} method or the
     * {@link java.lang.Object#notifyAll()} method for this object.
     * …
     */
    public final void wait() throws InterruptedException {
        wait(0);
    }
    
    //Notify 唤醒第一个等待的线程
    /**
     * Wakes up a single thread that is waiting on this object's
     * monitor. 
     * …
     */
    public final native void notify();
    

    那等待与唤醒如何添加呢?

    可以通过一个标志位来控制,假设标志位是一个 Boolean 变量,当标志位内容为 true 时,表示可以 Restaurant 线程可以添加 Breakfast 信息,此时如果 CPU 调度了 Consumer 线程则等待,反之亦然。

    接下来,根据分析结果对上述代码进行修改:

    //源码:
    //早餐
    public class Breakfast_201811012300 {
    
        private String food;
        private String drinking;
        private boolean canCookBreakfast = true;
        
        public Breakfast_201811012300(){}
        
        public Breakfast_201811012300(String f, String d){
            this.food = f;
            this.drinking = d;
        }
        
        public synchronized void setBreakfast(String f, String d) {
            if(!canCookBreakfast){
                try {
                    super.wait();
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
            setFood(f);
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            setDrinking(d);
            canCookBreakfast = false;
            super.notify();
        }
        
        public synchronized void getBreakfast(){
            if(canCookBreakfast){
                try {
                    super.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("套餐: " + getFood() + "  " + getDrinking());
            canCookBreakfast = true;
            super.notify();
        }
    
        public String getFood() {
            return food;
        }
    
        public void setFood(String food) {
            this.food = food;
        }
    
        public String getDrinking() {
            return drinking;
        }
    
        public void setDrinking(String drinking) {
            this.drinking = drinking;
        }
    }
    
    //早餐店
    public class Restaurant_201811012300 implements Runnable {
    
        private Breakfast_201811012300 breakfast;
        
        public Restaurant_201811012300(Breakfast_201811012300 b){
            this.breakfast = b;
        }
        
        @Override
        public void run() {
            boolean isFirstSetMeal = false;
            for (int i = 0; i < 25; i++) {
                if(isFirstSetMeal){
                    this.breakfast.setBreakfast("煎饼果子", "无糖豆浆" + "  " + (i+1));
                    isFirstSetMeal = false;
                }else{
                    this.breakfast.setBreakfast("锅盔辣子荚膜", "豆腐脑" + "  " + (i+1));
                    isFirstSetMeal = true;
                }
            }
        }
    
    }
    
    //顾客
    public class Consumer_201811012300 implements Runnable {
    
        private Breakfast_201811012300 breakfast;
        
        public Consumer_201811012300(Breakfast_201811012300 b){
            this.breakfast = b;
        }
        
        @Override
        public void run() {
            for (int i = 0; i < 25; i++) {
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                this.breakfast.getBreakfast();
            }
        }
    
    }
    
    //Main
    public class Main_201811012005 {
    
        public static void main(String[] args) {
            Breakfast_201811012300 breakfast = new Breakfast_201811012300();
            Consumer_201811012300 restaurant = new Consumer_201811012300(breakfast);
            Restaurant_201811012300 consumer = new Restaurant_201811012300(breakfast);
            new Thread(restaurant).start();
            new Thread(consumer).start();
        }
    
    }
    
    //执行结果:
    套餐: 锅盔辣子荚膜  豆腐脑  1
    套餐: 煎饼果子  无糖豆浆  2
    套餐: 锅盔辣子荚膜  豆腐脑  3
    套餐: 煎饼果子  无糖豆浆  4
    套餐: 锅盔辣子荚膜  豆腐脑  5
    套餐: 煎饼果子  无糖豆浆  6
    套餐: 锅盔辣子荚膜  豆腐脑  7
    套餐: 煎饼果子  无糖豆浆  8
    套餐: 锅盔辣子荚膜  豆腐脑  9
    套餐: 煎饼果子  无糖豆浆  10
    套餐: 锅盔辣子荚膜  豆腐脑  11
    套餐: 煎饼果子  无糖豆浆  12
    套餐: 锅盔辣子荚膜  豆腐脑  13
    套餐: 煎饼果子  无糖豆浆  14
    套餐: 锅盔辣子荚膜  豆腐脑  15
    套餐: 煎饼果子  无糖豆浆  16
    套餐: 锅盔辣子荚膜  豆腐脑  17
    套餐: 煎饼果子  无糖豆浆  18
    套餐: 锅盔辣子荚膜  豆腐脑  19
    套餐: 煎饼果子  无糖豆浆  20
    套餐: 锅盔辣子荚膜  豆腐脑  21
    套餐: 煎饼果子  无糖豆浆  22
    套餐: 锅盔辣子荚膜  豆腐脑  23
    套餐: 煎饼果子  无糖豆浆  24
    套餐: 锅盔辣子荚膜  豆腐脑  25
    

    由上面的执行结果可知,加入等待与唤醒之后,连续存取的问题得到解决,即:Restaurant 线程添加完 Breakfast 信息之后,只有当 Consumer 线程取走 Breakfast 之后才添加新的 Breakfast 信息。当 Restaurant 线程添加完 Breakfast 信息之后,如果 CPU 又调度了一次 Restaurant 线程,这个时候,Restaurant 线程就会等待(wait),直至被 Notify(切换至 Consumer 线程,Consumer 线程取走 Breakfast 信息之后,通知(Notify)第一个等待的线程,此时 Restaurant 线程才被唤醒),当 CPU 重复调用 Consumer 线程时,处理的逻辑是一样,因此不在此赘述。


    参考文档

    1)《Java 开发实战经典》
    2)《Thinking in Java》
    3)Android Developer Document
    4)Java Tutorials

    相关文章

      网友评论

        本文标题:Java 并发编程——生产者与消费者

        本文链接:https://www.haomeiwen.com/subject/azckxqtx.html