美文网首页工作生活
生产者与消费者封装

生产者与消费者封装

作者: 吕檀溪 | 来源:发表于2019-07-01 14:18 被阅读0次

    概要

    生产者Producer 生产某个对象(共享资源),放在缓冲池中,然后消费者从缓冲池中取出这个对象。也就是生产者生产一个,消费者取出一个。这样进行循环。

    封装理念

    单纯的说生产者与消费我也不能说得很清楚,就拿生活中的事情来举个例子。某东上销售IPhone,首先商品有一定的库存,而库存怎么来的呢?就是IPhone的代理工厂生产的,也许一个,也许多个,这样就形成了我们的生产者。很多用户都去购买IPhone,这个肯定是我们的消费者了。如果没有货,消费者就需要等待代理工厂为某东供货。如果下架了该商品,那么代理工厂就停止生产,消费者买完了库存中的IPhone也就不能再购买了。

    实战

    1. 生产线,管理生产者和消费者。包括了注册和移除等相关操作
    import java.util.Map;
    import java.util.concurrent.*;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;
    
    /**
     * 生产线,管理生产者和消费者
     *
     * @author melon
     * @version 1.0
     * @since JDK1.8
     */
    public class PipeLine<T> {
        /**
         * 存储生产者与生产者线程
         */
        private Map<Producer<T>, Thread> producers;
    
        /**
         * 存储消费者与消费者线程
         */
        private Map<Consumer<T>, Thread> consumers;
    
        /**
         * 重入锁
         */
        private Lock lock;
    
        /**
         * 等待计数器
         */
        private CountDownLatch latch;
    
        /**
         * 标志产线是否正在运行了
         */
        private volatile boolean started;
    
        /**
         * 阻塞队列容量
         */
        private int capacity;
    
        /**
         * 阻塞队列
         */
        private BlockingQueue<T> queue;
    
    
        /**
         * @param capacity 阻塞队列容量
         */
        public PipeLine(int capacity) {
            this.queue = new ArrayBlockingQueue<T>(capacity);
            this.producers = new ConcurrentHashMap<>();
            this.consumers = new ConcurrentHashMap<>();
            this.lock = new ReentrantLock();
            this.capacity = capacity;
        }
    
        /**
         * 注册生产者{@link com.melon.other.pipeline.Producer}
         *
         * @param producer 生产者
         * @return 生产线
         */
        public PipeLine<T> registProducer(Producer<T> producer) {
            this.lock.lock();
            try {
                producer.bindPipeLine(this);
                this.producers.put(producer, new Thread(producer));
            } finally {
                this.lock.unlock();
            }
            System.out.println(String.format("Regist producer[%s],count %s", producer.getId(), this.producers.size()));
            return this;
        }
    
        /**
         * 移除生产者
         *
         * @param producer 生产者
         */
        void unregistProducer(Producer<T> producer) {
            lock.lock();
            try {
                producers.remove(producer);
                System.out.println(String.format("Unregist producer[%s], count %s", producer.getId(), producers.size()));
            } finally {
                latch.countDown();
                lock.unlock();
            }
        }
    
    
        /**
         * 注册消费者{@link com.melon.other.pipeline.Consumer}
         *
         * @param consumer 消费者
         * @return 生产线
         */
        public PipeLine<T> registConsumer(Consumer<T> consumer) {
            this.lock.lock();
            try {
                consumer.bindPipeLine(this);
                this.consumers.put(consumer, new Thread(consumer));
            } finally {
                this.lock.unlock();
            }
            System.out.println(String.format("Regist consumer[%s],count %s", consumer.getId(), this.consumers.size()));
            return this;
        }
    
        /**
         * 移除消费者
         *
         * @param consumer 消费者
         */
        void unregistConsumer(Consumer<T> consumer) {
            lock.lock();
            try {
                consumers.remove(consumer);
                System.out.println(String.format("Unregist consumer[%s], count %s", consumer.getId(), consumers.size()));
            } finally {
                latch.countDown();
                lock.unlock();
            }
        }
    
        /**
         * 生产者向产线添加产品
         *
         * @param t 产品
         * @throws InterruptedException 异常
         */
        void push(T t) throws InterruptedException {
            if (queue.size() == capacity)
                System.out.println("【PipeLine full】");
            queue.put(t);
        }
    
        /**
         * 消费者从队列中拿出一个产品
         *
         * @return 产品
         * @throws InterruptedException 异常
         */
        T poll() throws InterruptedException {
            return queue.poll(1, TimeUnit.SECONDS);
        }
    
        /**
         * 获取队列容量
         *
         * @return int
         */
        public int size() {
            return queue.size();
        }
    
        /**
         * 获取生产者数量
         *
         * @return int
         */
        public int getProducerCount() {
            lock.lock();
            try {
                if (!queue.isEmpty()) //如果还有产品就表示还有生产者
                    return 1;
                return producers.size();
            } finally {
                lock.unlock();
            }
        }
    
        /**
         * 启动产线
         *
         * @return 产线
         */
        public PipeLine<T> start() {
            lock.lock();
            try {
                if (started) {
                    System.out.println("PipeLine has been started!");
                    return this;
                }
                //计数器总共的个数是生产者和消费者的综合
                latch = new CountDownLatch(producers.size() + consumers.size());
                //循环启动生产者
                for(Thread producer:producers.values()){
                    producer.start();
                }
                //循环启动消费者
                for(Thread consumer:consumers.values()){
                    consumer.start();
                }
                //切换标识
                started = true;
                return this;
            } finally {
                lock.unlock();
            }
        }
    
        public void await() throws InterruptedException {
            latch.await();
        }
    
    }
    
    
    1. 生产者
    /**
     * 生产者
     *
     * @author melon
     * @version 1.0
     * @since JDK1.8
     */
    public abstract class Producer<T> implements Runnable {
    
        /**
         * 生产线{@link com.melon.other.pipeline.PipeLine}
         */
        private PipeLine<T> pipeLine;
    
    
        @Override
        public void run() {
            try {
                for (; ; ) {
                    T t = this.produce();
                    if (t == null) { //生产者返回空,就表示这个线程停止生产,需要在finally移除当前的生产者
                        break;
                    }
                    pipeLine.push(t);
                    System.out.println(String.format("Producer[%s] produced,PipeLine size: %s", this.getId(), this.pipeLine.size()));
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                pipeLine.unregistProducer(this);
            }
        }
    
        /**
         * 绑定生产线
         *
         * @param pipeLine 生产线
         */
        void bindPipeLine(PipeLine<T> pipeLine) {
            this.pipeLine = pipeLine;
        }
    
        /**
         * 获取当前生产者的线程id
         *
         * @return id - long
         */
        protected long getId() {
            return Thread.currentThread().getId();
        }
    
        /**
         * 生产产品
         *
         * @return T
         * @throws Exception 异常
         */
        protected abstract T produce() throws Exception;
    
    }
    
    
    1. 消费者
    /**
     * 消费者
     *
     * @author melon
     * @version 1.0
     * @since JDK1.8
     */
    public abstract class Consumer<T> implements Runnable {
    
        /**
         * 生产线{@link com.melon.other.pipeline.PipeLine}
         */
        private PipeLine<T> pipeLine;
    
        @Override
        public void run() {
            try {
                for (; ; ) {
                    T t = pipeLine.poll(); //拿一个产品
                    if (t == null && pipeLine.getProducerCount() == 0) { //如果产品没有拿到,并且没有生产者了,那么就要移除消费者
                        System.out.println(String.format("【Consumer[%s] hungry】", getId()));
                        break;
                    }
                    consume(t);
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                pipeLine.unregistConsumer(this);
            }
        }
    
        /**
         * 绑定生产线
         *
         * @param pipeLine 生产线
         */
        void bindPipeLine(PipeLine<T> pipeLine) {
            this.pipeLine = pipeLine;
        }
    
        /**
         * 获取当前消费者的线程id
         *
         * @return id - long
         */
        protected long getId() {
            return Thread.currentThread().getId();
        }
    
        /**
         * 消费
         *
         * @param t 产品
         * @throws Exception 异常
         */
        protected abstract void consume(T t) throws Exception;
    }
    
    
    1. 测试
      
    import java.util.concurrent.atomic.AtomicInteger;
    
    /**
     * @author melon
     * @version 1.0
     * @since JDK1.8
     */
    public class PipeT {
        private static AtomicInteger num = new AtomicInteger();
    
        public static void main(String[] args) {
            new PipeLine<String>(10)
                    .registProducer(new P())
                    .registProducer(new P())
                    .registConsumer(new C())
                    .registConsumer(new C())
                    .registConsumer(new C())
                    .start();
        }
    
        public static class P extends Producer<String> {
    
            @Override
            protected String produce() {
                if (num.get() >= 50) {
                    return null;
                }
                return getId() + " this " + num.getAndIncrement();
            }
        }
    
        public static class C extends Consumer<String> {
    
            @Override
            protected void consume(String s) {
                System.out.println(s + ">>" + getId());
            }
        }
    }
    
    1. 运行结果
    Regist producer[1],count 1
    Regist producer[1],count 2
    Regist consumer[1],count 1
    Regist consumer[1],count 2
    Regist consumer[1],count 3
    Producer[10] produced,PipeLine size: 1
    Producer[10] produced,PipeLine size: 2
    Producer[10] produced,PipeLine size: 3
    Producer[10] produced,PipeLine size: 4
    Producer[10] produced,PipeLine size: 5
    Producer[10] produced,PipeLine size: 6
    Producer[10] produced,PipeLine size: 7
    Producer[10] produced,PipeLine size: 8
    Producer[10] produced,PipeLine size: 9
    Producer[10] produced,PipeLine size: 10
    【PipeLine full】
    10 this 0>>14
    10 this 2>>14
    10 this 3>>14
    10 this 4>>14
    10 this 5>>14
    10 this 6>>14
    10 this 7>>14
    10 this 8>>14
    10 this 9>>14
    10 this 10>>14
    Producer[10] produced,PipeLine size: 9
    10 this 11>>14
    10 this 1>>12
    Producer[10] produced,PipeLine size: 1
    10 this 12>>14
    Producer[10] produced,PipeLine size: 1
    10 this 13>>13
    Producer[10] produced,PipeLine size: 1
    10 this 14>>12
    Producer[10] produced,PipeLine size: 1
    10 this 15>>14
    Producer[10] produced,PipeLine size: 1
    10 this 16>>13
    Producer[10] produced,PipeLine size: 1
    10 this 17>>12
    Producer[10] produced,PipeLine size: 1
    10 this 18>>14
    Producer[10] produced,PipeLine size: 1
    10 this 19>>13
    11 this 20>>12
    Producer[11] produced,PipeLine size: 1
    11 this 21>>14
    Producer[11] produced,PipeLine size: 1
    11 this 22>>13
    Producer[11] produced,PipeLine size: 1
    11 this 23>>12
    Producer[11] produced,PipeLine size: 1
    11 this 24>>14
    Producer[11] produced,PipeLine size: 0
    11 this 25>>13
    Producer[11] produced,PipeLine size: 1
    11 this 26>>12
    Producer[11] produced,PipeLine size: 1
    11 this 27>>14
    Producer[11] produced,PipeLine size: 1
    Producer[10] produced,PipeLine size: 1
    11 this 28>>13
    10 this 29>>13
    Producer[10] produced,PipeLine size: 1
    Producer[11] produced,PipeLine size: 1
    Producer[10] produced,PipeLine size: 1
    Producer[11] produced,PipeLine size: 2
    Producer[10] produced,PipeLine size: 3
    Producer[11] produced,PipeLine size: 4
    10 this 30>>14
    11 this 31>>12
    10 this 32>>12
    Producer[10] produced,PipeLine size: 5
    11 this 35>>14
    10 this 36>>14
    10 this 34>>13
    Producer[10] produced,PipeLine size: 1
    11 this 33>>12
    10 this 37>>14
    Producer[11] produced,PipeLine size: 0
    Producer[10] produced,PipeLine size: 1
    11 this 38>>13
    Producer[11] produced,PipeLine size: 1
    10 this 39>>14
    11 this 40>>12
    Producer[10] produced,PipeLine size: 1
    Producer[11] produced,PipeLine size: 1
    Producer[10] produced,PipeLine size: 1
    Producer[11] produced,PipeLine size: 2
    Producer[10] produced,PipeLine size: 3
    Producer[11] produced,PipeLine size: 4
    Producer[10] produced,PipeLine size: 5
    Producer[11] produced,PipeLine size: 6
    Producer[10] produced,PipeLine size: 7
    Producer[11] produced,PipeLine size: 8
    Producer[10] produced,PipeLine size: 9
    10 this 41>>13
    11 this 44>>13
    10 this 45>>13
    11 this 46>>13
    10 this 47>>13
    11 this 48>>13
    10 this 49>>13
    10 this 43>>12
    11 this 42>>14
    Unregist producer[11], count 1
    Unregist producer[10], count 0
    【Consumer[13] hungry】
    【Consumer[12] hungry】
    Unregist consumer[13], count 2
    【Consumer[14] hungry】
    Unregist consumer[12], count 1
    Unregist consumer[14], count 0
    

    相关文章

      网友评论

        本文标题:生产者与消费者封装

        本文链接:https://www.haomeiwen.com/subject/bqegcctx.html