美文网首页
生产者消费者模式

生产者消费者模式

作者: Ethan_Walker | 来源:发表于2018-05-11 18:54 被阅读8次

    1. 使用 Object 对象作为锁(synchronized + wait/notify),缓冲区容量大于 1

    package lock91_reentrant_procon;
    
    import java.io.ObjectInputStream;
    
    /**
     * Bounded producer/consumer buffer guarded by a single intrinsic monitor.
     *
     * <p>Products are kept in a fixed-size circular array. {@link #producer(Product)}
     * blocks while the array is full and {@link #consumer()} blocks while it is empty.
     * Both roles wait on the same monitor object, so every state change wakes all
     * waiters with {@code notifyAll()}: a plain {@code notify()} could wake a thread of
     * the same role, which would go straight back to waiting and lose the wakeup.
     *
     * Created by lenovo on 2018/5/11.
     */
    public class CommonService {
        /** Circular storage; its length is the buffer capacity. */
        private final Product[] buffer;

        /**
         * Creates a buffer able to hold {@code length} products.
         *
         * @param length capacity of the buffer
         */
        public CommonService(int length) {
            buffer = new Product[length];
        }

        /** Number of products currently stored. */
        private int nowCount = 0;
        /** Index of the oldest stored product (next one to be consumed). */
        private int dataBeginIndex = 0;
        /** Index at which the next product will be stored. */
        private int dataInsertIndex = 0;
        /** Monitor protecting all of the mutable state above. */
        private final Object lock = new Object();

        /**
         * Stores a product, blocking while the buffer is full.
         *
         * <p>If the calling thread is interrupted while waiting, the product is not
         * stored, the thread's interrupt flag is restored and the method returns.
         *
         * @param product the product to store
         */
        public void producer(Product product) {
            synchronized (lock) {
                try {
                    // Loop guards against spurious wakeups and against losing the race
                    // for a freed slot to another producer.
                    while (nowCount == buffer.length) {
                        System.out.println("producer: " + Thread.currentThread().getName() + "等待...");
                        lock.wait();
                    }
                } catch (InterruptedException e) {
                    // Preserve the interruption for the caller instead of swallowing it.
                    Thread.currentThread().interrupt();
                    return;
                }

                buffer[dataInsertIndex] = product;
                System.out.println("producer: " + Thread.currentThread().getName() + "插入的索引为:" + dataInsertIndex + ",剩余产品:" + (nowCount + 1));

                dataInsertIndex = (dataInsertIndex + 1) % buffer.length;
                nowCount++;

                // Producers and consumers share this monitor, so wake everyone and let
                // each waiter re-check its own condition.
                lock.notifyAll();
            }
        }

        /**
         * Removes and returns the oldest product, blocking while the buffer is empty.
         *
         * @return the consumed product, or {@code null} if the calling thread was
         *         interrupted while waiting (the interrupt flag is restored)
         */
        public Product consumer() {
            synchronized (lock) {
                try {
                    while (nowCount == 0) {
                        System.out.println("consumer: " + Thread.currentThread().getName() + "等待...");
                        lock.wait();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return null;
                }

                Product product = buffer[dataBeginIndex];
                // Drop the reference so a consumed product is not retained by the array.
                buffer[dataBeginIndex] = null;

                System.out.println("consumer: " + Thread.currentThread().getName() + "消费的索引为:" + dataBeginIndex + ",剩余产品数量:" + (nowCount - 1));
                dataBeginIndex = (dataBeginIndex + 1) % buffer.length;
                nowCount--;

                lock.notifyAll();
                return product;
            }
        }

    }
    package lock91_reentrant_procon;
    
    import java.io.Serializable;
    import java.util.Date;
    
    /**
     * Demo driver for {@code CommonService}: starts 50 one-shot producer threads and
     * 50 one-shot consumer threads against a buffer of capacity 15.
     *
     * Created by lenovo on 2018/5/11.
     */
    public class CommonMain {
        public static void main(String[] args) throws InterruptedException {
            CommonService commonService = new CommonService(15);

            // Each consumer thread takes exactly one product and discards it.
            Runnable consumerRunnable = commonService::consumer;

            // Each producer thread creates one product named after itself and stores it.
            Runnable producerRunnable = () -> {
                Product product = new Product();
                product.setName("产品名(" + Thread.currentThread().getName() + ")");
                commonService.producer(product);
            };

            Thread[] consumer = new Thread[50];
            Thread[] producer = new Thread[50];
            for (int idx = 0; idx < consumer.length; idx++) {
                consumer[idx] = new Thread(consumerRunnable);
                producer[idx] = new Thread(producerRunnable);
                consumer[idx].setName("consumer" + idx);
                producer[idx].setName("producer" + idx);
            }

            // Start in four waves: first half of the producers, first half of the
            // consumers, then the second half of each.
            startRange(producer, 0, 25);
            startRange(consumer, 0, 25);
            startRange(producer, 25, 50);
            startRange(consumer, 25, 50);

            Thread.sleep(5000);
        }

        /** Starts {@code threads[from]} through {@code threads[to - 1]} in index order. */
        private static void startRange(Thread[] threads, int from, int to) {
            for (int idx = from; idx < to; idx++) {
                threads[idx].start();
            }
        }

    }
    
    
    

    输出(节选)

    producer: producer0插入的索引为:0,剩余产品:1
    producer: producer33插入的索引为:1,剩余产品:2
    producer: producer2插入的索引为:2,剩余产品:3
    producer: producer3插入的索引为:3,剩余产品:4
    producer: producer4插入的索引为:4,剩余产品:5
    producer: producer5插入的索引为:5,剩余产品:6
    producer: producer6插入的索引为:6,剩余产品:7
    producer: producer7插入的索引为:7,剩余产品:8
    producer: producer8插入的索引为:8,剩余产品:9
    producer: producer9插入的索引为:9,剩余产品:10
    producer: producer10插入的索引为:10,剩余产品:11
    producer: producer11插入的索引为:11,剩余产品:12
    producer: producer12插入的索引为:12,剩余产品:13
    producer: producer13插入的索引为:13,剩余产品:14
    producer: producer14插入的索引为:14,剩余产品:15
    producer: producer15等待...
    producer: producer16等待...
    producer: producer17等待...
    

    2. ReentrantLock 和 Condition 实现

    package lock91_reentrant_procon;
    
    import java.io.Serializable;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.ReentrantLock;
    
    /**
     * Bounded producer/consumer buffer built on {@link ReentrantLock} with one
     * {@link Condition} per role.
     *
     * <p>Products are kept in a fixed-size circular array. Consumers wait on
     * {@code consumerCondition} while the buffer is empty and producers wait on
     * {@code producerCondition} while it is full, so a single {@code signal()} always
     * reaches a thread of the opposite role.
     *
     * Created by lenovo on 2018/5/11.
     */
    public class Service {

        private final ReentrantLock lock = new ReentrantLock();
        /** Consumers wait here while the buffer is empty. */
        private final Condition consumerCondition = lock.newCondition();
        /** Producers wait here while the buffer is full. */
        private final Condition producerCondition = lock.newCondition();

        /** Circular storage; its length is the buffer capacity. */
        private final Product[] buffer;
        /** Number of products currently stored. */
        private int nowCount = 0;
        /** Index of the oldest stored product; starts at 0 and is only read once something has been produced. */
        private int dataBeginIndex = 0;
        /** Index at which the next product will be stored. */
        private int dataInsertIndex = 0;

        /**
         * Creates a buffer able to hold {@code bufferLength} products.
         *
         * @param bufferLength capacity of the buffer
         */
        public Service(int bufferLength) {
            buffer = new Product[bufferLength];
        }

        /**
         * Removes and returns the oldest product, blocking while the buffer is empty.
         *
         * @return the consumed product, or {@code null} if the calling thread was
         *         interrupted while waiting (the interrupt flag is restored)
         */
        public Product consumer() {
            // Acquire before entering try: if lock() itself failed, the finally block
            // must not attempt to unlock a lock this thread does not hold.
            lock.lock();
            try {
                // Loop guards against spurious wakeups and against another consumer
                // taking the product first.
                while (nowCount == 0) {
                    System.out.println(Thread.currentThread().getName() + " 等待...");
                    consumerCondition.await();
                }

                Product product = buffer[dataBeginIndex];
                // Drop the reference so a consumed product is not retained by the array.
                buffer[dataBeginIndex] = null;
                System.out.println(Thread.currentThread().getName() + "消费的索引为 : " + dataBeginIndex + ",剩余" + (nowCount - 1));

                dataBeginIndex = (dataBeginIndex + 1) % buffer.length;
                nowCount--;

                // A slot was freed: let one waiting producer proceed.
                producerCondition.signal();
                return product;
            } catch (InterruptedException e) {
                // Preserve the interruption for the caller instead of swallowing it.
                Thread.currentThread().interrupt();
                return null;
            } finally {
                lock.unlock();
            }
        }

        /**
         * Stores a product, blocking while the buffer is full.
         *
         * <p>If the calling thread is interrupted while waiting, the product is not
         * stored and the thread's interrupt flag is restored.
         *
         * @param product the product to store
         */
        public void producer(Product product) {
            lock.lock();
            try {
                while (nowCount == buffer.length) {
                    System.out.println(Thread.currentThread().getName() + " 等待...");
                    producerCondition.await();
                }

                System.out.println(Thread.currentThread().getName() + " 插入的索引为 : " + dataInsertIndex + ", 剩余 " + (nowCount + 1));

                buffer[dataInsertIndex] = product;
                dataInsertIndex = (dataInsertIndex + 1) % buffer.length;
                nowCount++;

                // A product is available: let one waiting consumer proceed.
                consumerCondition.signal();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                lock.unlock();
            }
        }

    }
    
    package lock91_reentrant_procon;
    
    /**
     * Simple mutable value object carrying the name of a produced item.
     *
     * Created by lenovo on 2018/5/11.
     */
    public class Product {
        /** Display name of the product; {@code null} until {@link #setName(String)} is called. */
        private String name;

        /** @return the current name, possibly {@code null} */
        public String getName() {
            return this.name;
        }

        /** @param name the new name for this product */
        public void setName(String name) {
            this.name = name;
        }

        /** Renders the product as {@code Product{name='...'}}. */
        @Override
        public String toString() {
            return String.format("Product{name='%s'}", name);
        }
    }
    package lock91_reentrant_procon;
    
    import reentrant.Run;
    
    import java.util.Date;
    
    /**
     * Demo driver for {@code Service}: starts 50 one-shot producer threads and 50
     * one-shot consumer threads against a buffer of capacity 20.
     *
     * Created by lenovo on 2018/5/11.
     */
    public class Main {
        public static void main(String[] args) {
            Service service = new Service(20);
           /*
           Alternative setup: a single producer thread and a single consumer thread,
           each of which produces/consumes repeatedly.
           Thread consumer = new Thread(() -> {
                while (true) {
                    Product product = service.consumer();
                    System.out.println("消费的产品为: "+product.getName());
                }
            });
            Thread producer = new Thread(() -> {
                int count = 1;
                while (true) {
                    Product product = new Product();
                    product.setName("产品" + ++count);
                    service.producer(product);
                }
            });

            consumer.start();
            producer.start();
            */

            // Each consumer thread takes one product and reports which one it got.
            Runnable consumerRunnable = () -> {
                Product taken = service.consumer();
                System.out.println(Thread.currentThread().getName()+" 消费产品->"+taken.getName());
            };
            // Each producer thread stores one product named after itself.
            Runnable producerRunnable = () -> {
                Product made = new Product();
                made.setName("product of " + Thread.currentThread().getName());
                service.producer(made);
            };

            Thread[] consumers = new Thread[50];
            Thread[] producers = new Thread[50];
            for (int idx = 0; idx < consumers.length; idx++) {
                consumers[idx] = named(consumerRunnable, "consumer" + idx);
                producers[idx] = named(producerRunnable, "producer" + idx);
            }

            // Interleave the starts: producer i, then consumer i.
            for (int idx = 0; idx < producers.length; idx++) {
                producers[idx].start();
                consumers[idx].start();
            }

            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        }

        /** Creates an unstarted thread for {@code task} and assigns it {@code name}. */
        private static Thread named(Runnable task, String name) {
            Thread thread = new Thread(task);
            thread.setName(name);
            return thread;
        }
    }
    
    

    输出(节选)

    consumer10消费的索引为 : 8,剩余0
    consumer10 消费产品->product of producer9
    producer11 插入的索引为 : 9, 剩余 1
    consumer11消费的索引为 : 9,剩余0
    consumer11 消费产品->product of producer11
    producer12 插入的索引为 : 10, 剩余 1
    consumer12消费的索引为 : 10,剩余0
    consumer12 消费产品->product of producer12
    consumer13 等待...
    producer14 插入的索引为 : 11, 剩余 1
    consumer13消费的索引为 : 11,剩余0
    consumer13 消费产品->product of producer14
    consumer14 等待...
    consumer15 等待...
    producer16 插入的索引为 : 12, 剩余 1
    consumer17消费的索引为 : 12,剩余0
    consumer14 等待...
    consumer17 消费产品->product of producer16
    producer17 插入的索引为 : 13, 剩余 1
    

    相关文章

      网友评论

          本文标题:生产者消费者模式

          本文链接:https://www.haomeiwen.com/subject/ajxfdftx.html