美文网首页工作生活
生产者与消费者封装

生产者与消费者封装

作者: 吕檀溪 | 来源:发表于2019-07-01 14:18 被阅读0次

概要

生产者Producer 生产某个对象(共享资源),放在缓冲池中,然后消费者从缓冲池中取出这个对象。也就是生产者生产一个,消费者取出一个。这样进行循环。

封装理念

单纯的说生产者与消费我也不能说得很清楚,就拿生活中的事情来举个例子。某东上销售IPhone,首先商品有一定的库存,而库存怎么来的呢?就是IPhone的代理工厂生产的,也许一个,也许多个,这样就形成了我们的生产者。很多用户都去购买IPhone,这个肯定是我们的消费者了。如果没有货,消费者就需要等待代理工厂为某东供货。如果下架了该商品,那么代理工厂就停止生产,消费者买完了库存中的IPhone也就不能再购买了。

实战

  1. 生产线,管理生产者和消费者。包括了注册和移除等相关操作
import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * 生产线,管理生产者和消费者
 *
 * @author melon
 * @version 1.0
 * @since JDK1.8
 */
public class PipeLine<T> {
    /**
     * 存储生产者与生产者线程
     */
    private Map<Producer<T>, Thread> producers;

    /**
     * 存储消费者与消费者线程
     */
    private Map<Consumer<T>, Thread> consumers;

    /**
     * 重入锁
     */
    private Lock lock;

    /**
     * 等待计数器
     */
    private CountDownLatch latch;

    /**
     * 标志产线是否正在运行了
     */
    private volatile boolean started;

    /**
     * 阻塞队列容量
     */
    private int capacity;

    /**
     * 阻塞队列
     */
    private BlockingQueue<T> queue;


    /**
     * @param capacity 阻塞队列容量
     */
    public PipeLine(int capacity) {
        this.queue = new ArrayBlockingQueue<T>(capacity);
        this.producers = new ConcurrentHashMap<>();
        this.consumers = new ConcurrentHashMap<>();
        this.lock = new ReentrantLock();
        this.capacity = capacity;
    }

    /**
     * 注册生产者{@link com.melon.other.pipeline.Producer}
     *
     * @param producer 生产者
     * @return 生产线
     */
    public PipeLine<T> registProducer(Producer<T> producer) {
        this.lock.lock();
        try {
            producer.bindPipeLine(this);
            this.producers.put(producer, new Thread(producer));
        } finally {
            this.lock.unlock();
        }
        System.out.println(String.format("Regist producer[%s],count %s", producer.getId(), this.producers.size()));
        return this;
    }

    /**
     * 移除生产者
     *
     * @param producer 生产者
     */
    void unregistProducer(Producer<T> producer) {
        lock.lock();
        try {
            producers.remove(producer);
            System.out.println(String.format("Unregist producer[%s], count %s", producer.getId(), producers.size()));
        } finally {
            latch.countDown();
            lock.unlock();
        }
    }


    /**
     * 注册消费者{@link com.melon.other.pipeline.Consumer}
     *
     * @param consumer 消费者
     * @return 生产线
     */
    public PipeLine<T> registConsumer(Consumer<T> consumer) {
        this.lock.lock();
        try {
            consumer.bindPipeLine(this);
            this.consumers.put(consumer, new Thread(consumer));
        } finally {
            this.lock.unlock();
        }
        System.out.println(String.format("Regist consumer[%s],count %s", consumer.getId(), this.consumers.size()));
        return this;
    }

    /**
     * 移除消费者
     *
     * @param consumer 消费者
     */
    void unregistConsumer(Consumer<T> consumer) {
        lock.lock();
        try {
            consumers.remove(consumer);
            System.out.println(String.format("Unregist consumer[%s], count %s", consumer.getId(), consumers.size()));
        } finally {
            latch.countDown();
            lock.unlock();
        }
    }

    /**
     * 生产者向产线添加产品
     *
     * @param t 产品
     * @throws InterruptedException 异常
     */
    void push(T t) throws InterruptedException {
        if (queue.size() == capacity)
            System.out.println("【PipeLine full】");
        queue.put(t);
    }

    /**
     * 消费者从队列中拿出一个产品
     *
     * @return 产品
     * @throws InterruptedException 异常
     */
    T poll() throws InterruptedException {
        return queue.poll(1, TimeUnit.SECONDS);
    }

    /**
     * 获取队列容量
     *
     * @return int
     */
    public int size() {
        return queue.size();
    }

    /**
     * 获取生产者数量
     *
     * @return int
     */
    public int getProducerCount() {
        lock.lock();
        try {
            if (!queue.isEmpty()) //如果还有产品就表示还有生产者
                return 1;
            return producers.size();
        } finally {
            lock.unlock();
        }
    }

    /**
     * 启动产线
     *
     * @return 产线
     */
    public PipeLine<T> start() {
        lock.lock();
        try {
            if (started) {
                System.out.println("PipeLine has been started!");
                return this;
            }
            //计数器总共的个数是生产者和消费者的综合
            latch = new CountDownLatch(producers.size() + consumers.size());
            //循环启动生产者
            for(Thread producer:producers.values()){
                producer.start();
            }
            //循环启动消费者
            for(Thread consumer:consumers.values()){
                consumer.start();
            }
            //切换标识
            started = true;
            return this;
        } finally {
            lock.unlock();
        }
    }

    public void await() throws InterruptedException {
        latch.await();
    }

}

  1. 生产者
/**
 * 生产者
 *
 * @author melon
 * @version 1.0
 * @since JDK1.8
 */
public abstract class Producer<T> implements Runnable {

    /**
     * 生产线{@link com.melon.other.pipeline.PipeLine}
     */
    private PipeLine<T> pipeLine;


    @Override
    public void run() {
        try {
            for (; ; ) {
                T t = this.produce();
                if (t == null) { //生产者返回空,就表示这个线程停止生产,需要在finally移除当前的生产者
                    break;
                }
                pipeLine.push(t);
                System.out.println(String.format("Producer[%s] produced,PipeLine size: %s", this.getId(), this.pipeLine.size()));
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            pipeLine.unregistProducer(this);
        }
    }

    /**
     * 绑定生产线
     *
     * @param pipeLine 生产线
     */
    void bindPipeLine(PipeLine<T> pipeLine) {
        this.pipeLine = pipeLine;
    }

    /**
     * 获取当前生产者的线程id
     *
     * @return id - long
     */
    protected long getId() {
        return Thread.currentThread().getId();
    }

    /**
     * 生产产品
     *
     * @return T
     * @throws Exception 异常
     */
    protected abstract T produce() throws Exception;

}

  1. 消费者
/**
 * 消费者
 *
 * @author melon
 * @version 1.0
 * @since JDK1.8
 */
public abstract class Consumer<T> implements Runnable {

    /**
     * 生产线{@link com.melon.other.pipeline.PipeLine}
     */
    private PipeLine<T> pipeLine;

    @Override
    public void run() {
        try {
            for (; ; ) {
                T t = pipeLine.poll(); //拿一个产品
                if (t == null && pipeLine.getProducerCount() == 0) { //如果产品没有拿到,并且没有生产者了,那么就要移除消费者
                    System.out.println(String.format("【Consumer[%s] hungry】", getId()));
                    break;
                }
                consume(t);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            pipeLine.unregistConsumer(this);
        }
    }

    /**
     * 绑定生产线
     *
     * @param pipeLine 生产线
     */
    void bindPipeLine(PipeLine<T> pipeLine) {
        this.pipeLine = pipeLine;
    }

    /**
     * 获取当前消费者的线程id
     *
     * @return id - long
     */
    protected long getId() {
        return Thread.currentThread().getId();
    }

    /**
     * 消费
     *
     * @param t 产品
     * @throws Exception 异常
     */
    protected abstract void consume(T t) throws Exception;
}

  1. 测试
  
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @author melon
 * @version 1.0
 * @since JDK1.8
 */
public class PipeT {
    private static AtomicInteger num = new AtomicInteger();

    public static void main(String[] args) {
        new PipeLine<String>(10)
                .registProducer(new P())
                .registProducer(new P())
                .registConsumer(new C())
                .registConsumer(new C())
                .registConsumer(new C())
                .start();
    }

    public static class P extends Producer<String> {

        @Override
        protected String produce() {
            if (num.get() >= 50) {
                return null;
            }
            return getId() + " this " + num.getAndIncrement();
        }
    }

    public static class C extends Consumer<String> {

        @Override
        protected void consume(String s) {
            System.out.println(s + ">>" + getId());
        }
    }
}
  1. 运行结果
Regist producer[1],count 1
Regist producer[1],count 2
Regist consumer[1],count 1
Regist consumer[1],count 2
Regist consumer[1],count 3
Producer[10] produced,PipeLine size: 1
Producer[10] produced,PipeLine size: 2
Producer[10] produced,PipeLine size: 3
Producer[10] produced,PipeLine size: 4
Producer[10] produced,PipeLine size: 5
Producer[10] produced,PipeLine size: 6
Producer[10] produced,PipeLine size: 7
Producer[10] produced,PipeLine size: 8
Producer[10] produced,PipeLine size: 9
Producer[10] produced,PipeLine size: 10
【PipeLine full】
10 this 0>>14
10 this 2>>14
10 this 3>>14
10 this 4>>14
10 this 5>>14
10 this 6>>14
10 this 7>>14
10 this 8>>14
10 this 9>>14
10 this 10>>14
Producer[10] produced,PipeLine size: 9
10 this 11>>14
10 this 1>>12
Producer[10] produced,PipeLine size: 1
10 this 12>>14
Producer[10] produced,PipeLine size: 1
10 this 13>>13
Producer[10] produced,PipeLine size: 1
10 this 14>>12
Producer[10] produced,PipeLine size: 1
10 this 15>>14
Producer[10] produced,PipeLine size: 1
10 this 16>>13
Producer[10] produced,PipeLine size: 1
10 this 17>>12
Producer[10] produced,PipeLine size: 1
10 this 18>>14
Producer[10] produced,PipeLine size: 1
10 this 19>>13
11 this 20>>12
Producer[11] produced,PipeLine size: 1
11 this 21>>14
Producer[11] produced,PipeLine size: 1
11 this 22>>13
Producer[11] produced,PipeLine size: 1
11 this 23>>12
Producer[11] produced,PipeLine size: 1
11 this 24>>14
Producer[11] produced,PipeLine size: 0
11 this 25>>13
Producer[11] produced,PipeLine size: 1
11 this 26>>12
Producer[11] produced,PipeLine size: 1
11 this 27>>14
Producer[11] produced,PipeLine size: 1
Producer[10] produced,PipeLine size: 1
11 this 28>>13
10 this 29>>13
Producer[10] produced,PipeLine size: 1
Producer[11] produced,PipeLine size: 1
Producer[10] produced,PipeLine size: 1
Producer[11] produced,PipeLine size: 2
Producer[10] produced,PipeLine size: 3
Producer[11] produced,PipeLine size: 4
10 this 30>>14
11 this 31>>12
10 this 32>>12
Producer[10] produced,PipeLine size: 5
11 this 35>>14
10 this 36>>14
10 this 34>>13
Producer[10] produced,PipeLine size: 1
11 this 33>>12
10 this 37>>14
Producer[11] produced,PipeLine size: 0
Producer[10] produced,PipeLine size: 1
11 this 38>>13
Producer[11] produced,PipeLine size: 1
10 this 39>>14
11 this 40>>12
Producer[10] produced,PipeLine size: 1
Producer[11] produced,PipeLine size: 1
Producer[10] produced,PipeLine size: 1
Producer[11] produced,PipeLine size: 2
Producer[10] produced,PipeLine size: 3
Producer[11] produced,PipeLine size: 4
Producer[10] produced,PipeLine size: 5
Producer[11] produced,PipeLine size: 6
Producer[10] produced,PipeLine size: 7
Producer[11] produced,PipeLine size: 8
Producer[10] produced,PipeLine size: 9
10 this 41>>13
11 this 44>>13
10 this 45>>13
11 this 46>>13
10 this 47>>13
11 this 48>>13
10 this 49>>13
10 this 43>>12
11 this 42>>14
Unregist producer[11], count 1
Unregist producer[10], count 0
【Consumer[13] hungry】
【Consumer[12] hungry】
Unregist consumer[13], count 2
【Consumer[14] hungry】
Unregist consumer[12], count 1
Unregist consumer[14], count 0

相关文章

  • kafka理解

    kafka生产者、消费者与分区的分配关系 生产者如何传输到分区消费者如何从分区读取生产者、消费者与分区的关系 主题...

  • 生产者与消费者封装

    概要 生产者Producer 生产某个对象(共享资源),放在缓冲池中,然后消费者从缓冲池中取出这个对象。也就是生产...

  • Java 并发编程——生产者与消费者

    1. 生产者与消费者 1.1 程序基本实现(问题引出) 生产者与消费者是线程操作的经典案例,即:生产者不断生产,消...

  • java多线程

    生产者与消费者问题

  • Node下的RabbitMQ应用

    一个最简单的生产者与消费者建立过程 创建生产者 创建消费者

  • 操作系统知识点持续更新

    生产者消费者问题 关于生产者消费者问题可以参考这篇文章:生产者消费者问题的java实现 临界区与互斥量 临界区:保...

  • 生产者与消费者模型

    生产者与消费者模型 通过使用Object的wait(),notify()方法进行生产者与消费者模型中出现的数据同步...

  • 2-1.死锁-经典同步问题

    三、经典同步问题 1.生产者-消费者问题 计算机系统中的许多问题都可归结为生产者与消费者问题,生产者与消费者可以通...

  • 生产者与消费者模式

    一、模式特点 生产者与消费者模式中,生产者和消费者各自做着自己的工作,生产者生产物品,将物品放入缓冲区。消费者从缓...

  • 生产者-消费者 模型

    生产者与消费者基本程序模型 在多线程开发过程之中最为著名的案例就是生产者与消费者操作,该操作的主要流程如下:生产者...

网友评论

    本文标题:生产者与消费者封装

    本文链接:https://www.haomeiwen.com/subject/bqegcctx.html