生产者/消费者问题

作者: Java_老男孩 | 来源:发表于2019-04-13 16:17 被阅读90次

    生产者消费者模型具体来讲,就是在一个系统中,存在生产者和消费者两种角色,他们通过内存缓冲区进行通信(解耦),生产者将消费者需要的资源生产出来放到缓冲区,消费者把从缓冲区把资源拿走消费。

    在这个模型中,最关键就是内存缓冲区为空的时候消费者必须等待,而内存缓冲区满的时候,生产者必须等待。其他时候就是一边在生产一边在消费。值得注意的是多线程对内存缓冲区的操作时必须保证线程安全,所以需要设计锁的策略。

    使用wait和notify实现生产这消费者

    我们在详解Thread多线程一文中提到了wait和notify来实现等待通知的功能,本篇文章则继续使用它们实现一个生产者、消费者模型。

    首先我们定义一个资源的类,资源类中初始时什么都没有,最多允许存放10个资源。

    1. 当生产者调用add方法时,i+1,即代表生产出了一件资源。当生产了一个资源以后就使用notifyAll通知所有等待在此资源文件的线程。如果当资源达到10个后则所有的生产者线程进入等待状态,等待消费者线程唤醒。

    2. 当消费者调用remove方法时,i-1,即代表消费了一件资源。当消费了一个资源以后就使用notifyAll通知所有等待在此资源文件的线程。如果当资源达到0个后则所有的消费者线程进入等待状态,等待生产者线程唤醒。

    public class WaitNotifyResouce {
        private int i=0;
        private int size=10;
        public synchronized void add(){
            if(i<size){
                i++;
                System.out.println(Thread.currentThread().getName()+"号线程生产一件资源,当前资源"+i+"个");
                notifyAll();
            }else {
                try {
                    wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
        public synchronized void remove(){
            if(i>0){
                i--;
                System.out.println(Thread.currentThread().getName()+"号线程拿走了一件资源,当前资源"+i+"个");
                notifyAll();
            }else {
                try {
                    wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    

    接下来我们创建3个生产者线程、2个消费者线程持续对资源进行生产和消费。

    public class WaitNotifyProducerConsumerDemo {
        static WaitNotifyResouce waitNotifyResouce = new WaitNotifyResouce();
        static class ProducerThreadDemo extends Thread {
            @Override
            public void run() {
                while (true) {
                    try {
                        sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    waitNotifyResouce.add();
                }
            }
        }
    
        static class ConsumerThreadDemo extends Thread {
            @Override
            public void run() {
                while (true) {
                    try {
                        sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    waitNotifyResouce.remove();
                }
            }
        }
        public static void main(String[] args) throws InterruptedException {
            Thread p1 = new Thread(new ProducerThreadDemo(), "生产者p1");
            Thread p2 = new Thread(new ProducerThreadDemo(), "生产者p2");
            Thread p3 = new Thread(new ProducerThreadDemo(), "生产者p3");
            p1.start();
            p2.start();
            p3.start();
    
            Thread c1 = new Thread(new ConsumerThreadDemo(), "消费者c1");
            Thread c2 = new Thread(new ConsumerThreadDemo(), "消费者c2");
            c1.start();
            c2.start();
        }
    }
    

    接下来程序打印的结果就像预想中一样了:

    生产者p1号线程生产一件资源,当前资源1个
    生产者p2号线程生产一件资源,当前资源2个
    生产者p3号线程生产一件资源,当前资源3个
    消费者c1号线程拿走了一件资源,当前资源2个
    消费者c2号线程拿走了一件资源,当前资源1个
    生产者p1号线程生产一件资源,当前资源2个
    生产者p3号线程生产一件资源,当前资源3个
    生产者p2号线程生产一件资源,当前资源4个
    。。。
    

    使用Condition实现生产者消费者模型

    在文章:浅谈Java中的锁:Synchronized、重入锁、读写锁 中,我们了解了 Lock和Condition,现在我们使用它们配合实现一个生产者消费者模型

    首先同样创建一个资源文件,此资源文件所有的操作跟上方的资源文件是一样的,只不过使用Lock和Condition的组合代替了synchronize。

    public class LockConditionResouce {
        private int i = 0;
        private int size = 10;
        private Lock lock = new ReentrantLock();
        Condition condition = lock.newCondition();
    
        public void add() {
            lock.lock();
            try {
                if (i < size) {
                    i++;
                    System.out.println(Thread.currentThread().getName() + "号线程生产一件资源,当前资源" + i + "个");
                    condition.signalAll();
                } else {
                    try {
                        condition.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            } finally {
                lock.unlock();
            }
    
        }
    
        public void remove() {
            lock.lock();
            try {
                if (i > 0) {
                    i--;
                    System.out.println(Thread.currentThread().getName() + "号线程拿走了一件资源,当前资源" + i + "个");
                    condition.signalAll();
                } else {
                    try {
                        condition.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            } finally {
                lock.unlock();
            }
    
        }
    }
    

    接下来使用生产者消费者线程操作资源:

    public class LockConditionProducerConsumerDemo {
        static LockConditionResouce lockConditionResouce = new LockConditionResouce();
        static class ProducerThreadDemo extends Thread {
            @Override
            public void run() {
                while (true) {
                    try {
                        sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    lockConditionResouce.add();
                }
            }
        }
        static class ConsumerThreadDemo extends Thread {
            @Override
            public void run() {
                while (true) {
                    try {
                        sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    lockConditionResouce.remove();
                }
            }
        }
        public static void main(String[] args) throws InterruptedException {
            Thread p1 = new Thread(new ProducerThreadDemo(), "生产者p1");
            Thread p2 = new Thread(new ProducerThreadDemo(), "生产者p2");
            Thread p3 = new Thread(new ProducerThreadDemo(), "生产者p3");
            p1.start();
            p2.start();
            p3.start();
    
            Thread c1 = new Thread(new ConsumerThreadDemo(), "消费者c1");
            Thread c2 = new Thread(new ConsumerThreadDemo(), "消费者c2");
            c1.start();
            c2.start();
        }
    }
    

    喜欢这篇文章的朋友可以点个喜欢,也可以关注一下我的个人专题:Java成长之路

    针对于上面所涉及到的知识点我总结出了有1到5年开发经验的程序员在面试中涉及到的绝大部分架构面试题及答案做成了文档和架构视频资料免费分享给大家(包括Dubbo、Redis、Netty、zookeeper、Spring cloud、分布式、高并发等架构技术资料),希望能帮助到您面试前的复习且找到一个好的工作,也节省大家在网上搜索资料的时间来学习,也可以关注我一下以后会有更多干货分享。

    资料获取方式: QQ群搜索“708-701-457” 即可免费领取



    相关文章

      网友评论

        本文标题:生产者/消费者问题

        本文链接:https://www.haomeiwen.com/subject/twchwqtx.html