美文网首页
生产者消费者问题

生产者消费者问题

作者: Catcher07 | 来源:发表于2018-07-18 10:06 被阅读0次

    一、概念

    生产者-消费者模式是多线程并发协作的经典模式。所谓的生产者-消费者问题,实际上包含两类的线程, 一种是生产者线程用于生产数据,另外一种的消费者线程用于消费数据。为了解耦生产者和消费者之间的关系,通常采用共享内存的方式(共享数据区域)。生产者只需要把生产的数据放到共享数据区域,而不需要关心消费者的行为。消费者只需要到共享数据区域取数据,而不需要关心生产者的行为。共享数据区域应该要具备以下线程并发协作的功能。

    • 如果共享数据区域满了,阻塞生产者继续生产数据放入其中。
    • 如果共享数据区域空了,阻塞消费者继续消费数据。

    在JAVA中,实现生产者消费者问题,可以采用以下三种方式。

    • 使用Object的wait/notify的消息通知机制;
    • 使用Lock的Condition的await和signal的消息通知机制。
    • 使用BlockingQueue实现。

    二、wait/notify的消息通知机制

    wait/notifyAll实现生产者-消费者:

    public class ProductorConsumer {
    
    
    public static void main(String[] args) {
    
        LinkedList linkedList = new LinkedList();
        ExecutorService service = Executors.newFixedThreadPool(15);
        for (int i = 0; i < 5; i++) {
            service.submit(new Productor(linkedList, 8));
        }
    
        for (int i = 0; i < 10; i++) {
            service.submit(new Consumer(linkedList));
        }
    
    }
    
    static class Productor implements Runnable {
    
        private List<Integer> list;
        private int maxLength;
    
        public Productor(List list, int maxLength) {
            this.list = list;
            this.maxLength = maxLength;
        }
    
        @Override
        public void run() {
            while (true) {
                synchronized (list) {
                    try {
                        while (list.size() == maxLength) {
                            System.out.println("生产者" + Thread.currentThread().getName() + "  list以达到最大容量,进行wait");
                            list.wait();
                            System.out.println("生产者" + Thread.currentThread().getName() + "  退出wait");
                        }
                        Random random = new Random();
                        int i = random.nextInt();
                        System.out.println("生产者" + Thread.currentThread().getName() + " 生产数据" + i);
                        list.add(i);
                        list.notifyAll();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
    
            }
        }
    }
    
    
    static class Consumer implements Runnable {
    
        private List<Integer> list;
    
        public Consumer(List list) {
            this.list = list;
        }
    
        @Override
        public void run() {
            while (true) {
                synchronized (list) {
                    try {
                        while (list.isEmpty()) {
                            System.out.println("消费者" + Thread.currentThread().getName() + "  list为空,进行wait");
                            list.wait();
                            System.out.println("消费者" + Thread.currentThread().getName() + "  退出wait");
                        }
                        Integer element = list.remove(0);
                        System.out.println("消费者" + Thread.currentThread().getName() + "  消费数据:" + element);
                        list.notifyAll();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }
    
    }
    

    三、await/signalAll

    使用Lock中Condition的await/signalAll实现生产者-消费者

    public class ProductorConsumer {
    
    private static ReentrantLock lock = new ReentrantLock();
    private static Condition full = lock.newCondition();
    private static Condition empty = lock.newCondition();
    
    public static void main(String[] args) {
        LinkedList linkedList = new LinkedList();
        ExecutorService service = Executors.newFixedThreadPool(15);
        for (int i = 0; i < 5; i++) {
            service.submit(new Productor(linkedList, 8, lock));
        }
        for (int i = 0; i < 10; i++) {
            service.submit(new Consumer(linkedList, lock));
        }
    
    }
    
    static class Productor implements Runnable {
    
        private List<Integer> list;
        private int maxLength;
        private Lock lock;
    
        public Productor(List list, int maxLength, Lock lock) {
            this.list = list;
            this.maxLength = maxLength;
            this.lock = lock;
        }
    
        @Override
        public void run() {
            while (true) {
                lock.lock();
                try {
                    while (list.size() == maxLength) {
                        System.out.println("生产者" + Thread.currentThread().getName() + "  list以达到最大容量,进行wait");
                        full.await();
                        System.out.println("生产者" + Thread.currentThread().getName() + "  退出wait");
                    }
                    Random random = new Random();
                    int i = random.nextInt();
                    System.out.println("生产者" + Thread.currentThread().getName() + " 生产数据" + i);
                    list.add(i);
                    empty.signalAll();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    lock.unlock();
                }
            }
        }
    }
    
    
    static class Consumer implements Runnable {
    
        private List<Integer> list;
        private Lock lock;
    
        public Consumer(List list, Lock lock) {
            this.list = list;
            this.lock = lock;
        }
    
        @Override
        public void run() {
            while (true) {
                lock.lock();
                try {
                    while (list.isEmpty()) {
                        System.out.println("消费者" + Thread.currentThread().getName() + "  list为空,进行wait");
                        empty.await();
                        System.out.println("消费者" + Thread.currentThread().getName() + "  退出wait");
                    }
                    Integer element = list.remove(0);
                    System.out.println("消费者" + Thread.currentThread().getName() + "  消费数据:" + element);
                    full.signalAll();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    lock.unlock();
                }
            }
        }
    }
    
    }
    

    四、BlockingQueue

    使用BlockingQueue实现生产者-消费者

    public class ProductorConsumer {
    
        private static LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
    
        public static void main(String[] args) {
            ExecutorService service = Executors.newFixedThreadPool(15);
            for (int i = 0; i < 5; i++) {
                service.submit(new Productor(queue));
            }
            for (int i = 0; i < 10; i++) {
                service.submit(new Consumer(queue));
            }
        }
    
    
        static class Productor implements Runnable {
    
            private BlockingQueue queue;
    
            public Productor(BlockingQueue queue) {
                this.queue = queue;
            }
    
            @Override
            public void run() {
                try {
                    while (true) {
                        Random random = new Random();
                        int i = random.nextInt();
                        System.out.println("生产者" + Thread.currentThread().getName() + "生产数据" + i);
                        queue.put(i);
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    
        static class Consumer implements Runnable {
            private BlockingQueue queue;
    
            public Consumer(BlockingQueue queue) {
                this.queue = queue;
            }
    
            @Override
            public void run() {
                try {
                    while (true) {
                        Integer element = (Integer) queue.take();
                        System.out.println("消费者" + Thread.currentThread().getName() + "正在消费数据" + element);
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    
    }
    

    参考文章

    https://juejin.im/post/5aeec675f265da0b7c072c56

    相关文章

      网友评论

          本文标题:生产者消费者问题

          本文链接:https://www.haomeiwen.com/subject/hbanpftx.html