美文网首页Java 并发编程
Java并发编程(五):生产者和消费者

Java并发编程(五):生产者和消费者

作者: yeonon | 来源:发表于2018-12-09 16:51 被阅读0次

    1 概述

    维基百科上有对“生产者和消费者模型”的名词解释:

    生产者消费者问题(英语:Producer-consumer problem),也称有限缓冲问题(英语:Bounded-buffer problem),是一个多进程同步问题的经典案例。该问题描述了共享固定大小缓冲区的两个进程——即所谓的“生产者”和“消费者”——在实际运行时会发生的问题。生产者的主要作用是生成一定量的数据放到缓冲区中,然后重复此过程。与此同时,消费者也在缓冲区消耗这些数据。该问题的关键就是要保证生产者不会在缓冲区满时加入数据,消费者也不会在缓冲区中空时消耗数据。

    那为什么要使用生产者消费者模型呢?考虑Web场景(尤其是秒杀活动),如果同一时间请求量很大,服务端处理速率远远比不上请求速率,这样请求方(即用户)就会明显感受到很大延迟,影响用户体验,甚至可能会导致数据库被“击穿”,整个系统崩溃等。而生产者消费者模型就可以解决这个问题,加入一个中间层(即缓冲区),请求先打到中间层,服务端根据自身的处理能力从中间层“取”出请求,并处理。可能你要问了,这样干是能防止数据库被“击穿”等问题,但服务端处理能力没变,请求时间还是会比较长是吧?对,确实是这样,但对于用户来说,关键是请求开始到获得响应这段时间的大小。所以,只要请求发送到中间层成功,那么就可以给用户响应了(例如“您已成功参加活动,请到个人中心参看详情”),至于什么时候“真正”处理请求,这就要看服务端的处理能力了。

    可见,生产者消费者模型能解决并发环境下,请求方和处理方处理速率不匹配的问题,同时还将请求发和处理方“解耦”了,即请求方(生产者)不需要关心处理方(消费者)如何处理数据,只需要将数据“扔”到中间层缓冲区即可。

    下面我将介绍几种生产者和消费者模型的实现方案:

    • 使用wait()和notify()或者notifyAll()来实现
    • 使用阻塞队列BlockingQueue来实现
    • 使用信号量Semaphore来实现

    2 使用wait()和notify()或者notifyAll()来实现

    这是最基础的实现方案,当缓冲区慢或者空的时候,线程调用wait()方法进入阻塞等待状态,当生产者生产了产品(数据)或者消费者消费了产品(数据)之后,调用notify()或者notifyAll()唤醒线程,然后继续进行生产或者消费。

    下面是一个示例:

    //生产者
    public class Producer {
    
        //缓冲区
        private final int[] values;
    
        //索引
        private final AtomicInteger index;
    
        //锁对象
        private final Object lock;
    
        //产品
        private final AtomicInteger count;
    
        public Producer(int[] values, AtomicInteger index, Object lock) {
            this.values = values;
            this.index = index;
            this.lock = lock;
            count = new AtomicInteger(0);
        }
    
        public void produce() {
            //需要先加锁,否则在多线程竞争的环境下,会出现线程安全问题
            synchronized (lock) {
                //判断当前索引是否等于缓冲区大小
                if (index.get() == values.length) {
                    //如果是,就说明缓冲区满了,应该调用wait()方法进入等待阻塞状态
                    try {
                        lock.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                //否则,就将产品放入到缓冲区中
                values[index.getAndIncrement()] = count.incrementAndGet();
                System.out.println("produce produce value : " + count.get());
                //如果之前的状态是空的状态,就说明可能会有线程被阻塞了,那么就调用notifyAll()唤醒线程
                if (index.get() == 1)
                    lock.notifyAll();
            }
        }
    }
    
    //消费者
    public class Consumer {
    
    
        private final int[] values;
    
        private final AtomicInteger index;
    
        private final Object lock;
    
        public Consumer(int[] values, AtomicInteger index, Object lock) {
            this.values = values;
            this.index = index;
            this.lock = lock;
        }
    
        public void consume() {
            //和生产者一样,需要加锁
            synchronized (lock) {
                //判断缓冲区是否为空
                if (index.get() == 0) {
                    try {
                        //如果是,那么就应该进入阻塞状态
                        lock.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                //否则消费该产品
                System.out.println("consumer consume : " + values[index.decrementAndGet()]);
                //如果之前缓冲器是满的状态,也许会有生产者被阻塞,那么应该尝试唤醒生产者
                if (index.get() == values.length-1)
                    lock.notifyAll();
            }
        }
    }
    
    //测试类
    public class Main {
    
        public static void main(String[] args) throws InterruptedException {
            int[] value = new int[3]; //缓冲区容量为10
            AtomicInteger index = new AtomicInteger(0); //索引从0开始
            final Object lock = new Object();
            Producer producer = new Producer(value, index, lock);
            Consumer consumer = new Consumer(value, index, lock);
            ExecutorService service = Executors.newFixedThreadPool(4);
    
            service.execute(() -> {
                for (int i = 0; i < 10; i++) {
                    producer.produce();
                }
            });
    
            service.execute(() -> {
                for (int i = 0; i < 10; i++) {
                    consumer.consume();
                }
            });
    
            service.shutdown();
            service.awaitTermination(1000, TimeUnit.SECONDS);
        }
    }
    
    

    输出如下所示(由于线程执行顺序问题,可能会不太一样):

    produce produce value : 1
    produce produce value : 2
    produce produce value : 3
    consumer consume : 3
    consumer consume : 2
    consumer consume : 1
    produce produce value : 4
    produce produce value : 5
    produce produce value : 6
    consumer consume : 6
    consumer consume : 5
    consumer consume : 4
    produce produce value : 7
    produce produce value : 8
    produce produce value : 9
    consumer consume : 9
    consumer consume : 8
    consumer consume : 7
    produce produce value : 10
    consumer consume : 10
    

    可以修改几个参数多试几次,只要不出现“程序永远不会停止”的情况就表示这样的实现应该没什么问题了。

    3 使用阻塞队列BlockingQueue来实现

    Java5添加了阻塞队列BlockingQueue接口以及几个实现,下图是该接口中几个操作:

    i5opQK.png

    每一列是一组配对的方法。例如add和remove配对使用,offer()和poll()配对使用,在使用的时候最好不用弄错了,否则会导致一些难以发现的问题。

    • 插入操作,当阻塞队列满的时候,执行插入操作,线程会被阻塞(put()方法)或者返回false(offer()方法)或抛出异常(add()方法)。
    • 移除操作,当阻塞队列为空的时候,执行移除操作,线程会被阻塞(take()方法)或者返回false(poll()方法)或抛出异常(remove()方法)。

    主要就是这两个操作,JDK中默认有几个BlockingQueue的实现,例如基于数组的ArrayBlockingQueue,基于链表的LinkedBlockingQueue,优先队列PriorityQueue,同步队列SynchronousQueue。

    下面是一个使用LinkedBlockingQueue实现生产者消费者模型的示例:

    //生产者
    public class Producer {
    
        private final BlockingQueue<Integer> blockingQueue;
    
        private final AtomicInteger count = new AtomicInteger(0);
    
        public Producer(BlockingQueue<Integer> blockingQueue) {
            this.blockingQueue = blockingQueue;
        }
    
        public void produce() {
            try {
                //直接调用put方法把产品放入队列里即可,同样不需要外部加锁
                blockingQueue.put(count.incrementAndGet());
                System.out.println("producer produce value : " + count.get());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    
    //生产者
    public class Consumer {
    
        //阻塞队列
        private final BlockingQueue<Integer> blockingQueue;
    
        public Consumer(BlockingQueue<Integer> blockingQueue) {
            this.blockingQueue = blockingQueue;
        }
    
        public void consume() {
            try {
                //消费就直接调用take方法即可,不再需要在外部加锁了
                System.out.println("consumer consume value : " + blockingQueue.take());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    

    输出结果和上面没什么差别,不多说了。

    4 使用信号量Semaphore来实现

    先简单说一下信号量Semaphore的简单使用。Semaphore是众多并发工具类之中的一员,有两个不同形式的构造函数,如下所示:

    public Semaphore(int permits) {
        sync = new NonfairSync(permits);
    }
    
    public Semaphore(int permits, boolean fair) {
        sync = fair ? new FairSync(permits) : new NonfairSync(permits);
    }
    

    参数permits表示可用许可证的数量,可是是负数,fair表示公平性。sync是AQS的实现类的一个实例,在此先不讨论AQS。除此之外,Semaphore还有两个重要且常用的操作:acquire(),release()。在Semaphore实例内部的许可证数量大于0的时候,acquire()才会成功(即不会阻塞),否则线程就会进入阻塞状态,成功之后会把许可证的数量减1。release()与其相反,调用release()没有什么要求,无论许可证数量是否大于0,都会成功调用(发生异常除外)并且把许可证数量加1(即相当于把许可证归还)。

    下面是使用Semaphore来实现生产者消费者模型:

    public class Producer {
    
        //缓冲区
        private final int[] values;
    
        //索引
        private final AtomicInteger index;
    
        //产品
        private final AtomicInteger count;
    
        //表示缓冲区未满
        private final Semaphore notFull;
    
        //表示缓冲区未空
        private final Semaphore notEmpty;
    
        //当做锁来用
        private final Semaphore mutex;
    
        public Producer(int[] values, AtomicInteger index, Semaphore notFull, Semaphore notEmpty, Semaphore mutex) {
            this.values = values;
            this.index = index;
            this.notFull = notFull;
            this.notEmpty = notEmpty;
            this.mutex = mutex;
            this.count = new AtomicInteger(0);
        }
    
        public void produce()  {
            try {
                //缓冲区不满的话,会成功,否则会阻塞
                notFull.acquire();
                //加锁
                mutex.acquire();
                //操作
                values[index.getAndIncrement()] = count.incrementAndGet();
                System.out.println("produce produce value : " + count.get());
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                //释放锁,顺序不要弄错了!!
                mutex.release();
                //调用notEmpty的release,表示已经非空了
                notEmpty.release();
            }
        }
    }
    
    public class Consumer {
    
        //缓冲区
        private final int[] values;
    
        //索引
        private final AtomicInteger index;
    
        private final Semaphore notFull;
    
        private final Semaphore notEmpty;
    
        private final Semaphore mutex;
    
        public Consumer(int[] values, AtomicInteger index, Semaphore notFull, Semaphore notEmpty, Semaphore mutex) {
            this.values = values;
            this.index = index;
            this.notFull = notFull;
            this.notEmpty = notEmpty;
            this.mutex = mutex;
        }
    
        public void consume() {
            try {
                //缓冲区非空的话,会成功,否则会阻塞
                notEmpty.acquire();
                mutex.acquire();
                System.out.println("consumer consume : " + values[index.decrementAndGet()]);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                mutex.release();
                //调用notFull的release,表示已经不满了
                notFull.release();
            }
        }
    }
    
    public class Main {
        public static void main(String[] args) throws InterruptedException {
            Semaphore notFull = new Semaphore(3);
            Semaphore notEmpty = new Semaphore(0);
            Semaphore mutex = new Semaphore(1);
            int[] value = new int[3];
            AtomicInteger index = new AtomicInteger(0);
    
            Producer producer = new Producer(value, index, notFull, notEmpty, mutex);
            Consumer consumer = new Consumer(value, index, notFull, notEmpty, mutex);
            ExecutorService service = Executors.newFixedThreadPool(4);
    
            service.execute(() -> {
                for (int i = 0; i < 10; i++) {
                    producer.produce();
                }
            });
    
            service.execute(() -> {
                for (int i = 0; i < 10; i++) {
                    consumer.consume();
                }
            });
    
            service.shutdown();
            service.awaitTermination(1000, TimeUnit.SECONDS);
        }
    }
    

    解释已经在代码中写清楚了,现在可能会让人不明白的是:如何保证线程安全?Semaphore的实现是基于AQS的,本身就已经具备了同步功能,只有一个许可的信号量其实就可以当做一个锁来使用(前提是控制好release()的使用),在代码中的mutex就具备这样的功能。

    5 小结

    本文介绍了什么是“生产者消费者模型”以及为什么需要该模型,之后还介绍了在Java中实现这三种模型的方法。实际上,生产者消费者模型应用非常广,例如现在流行的消息中间件都实现了这种模型(但并不是只有这种模型),例如Kafka,RabbitMQ,ActiveMQ等,理解生产者消费者模型对理解这些消息中间件非常有帮助。

    用Java实现还算是比较简单了,如果接触过操作系统,应该都感受过被C语言支配的恐惧!

    6 参考资料

    《Java并发编程实战》

    Java实现生产者和消费者的5种方式

    相关文章

      网友评论

        本文标题:Java并发编程(五):生产者和消费者

        本文链接:https://www.haomeiwen.com/subject/agkkhqtx.html