美文网首页
多线程设计模式解读—Producer-Consumer模式

多线程设计模式解读—Producer-Consumer模式

作者: 九九派 | 来源:发表于2018-08-27 11:26 被阅读79次

    Producer-Consumer模式可以说是设计模式的设计模式,后期我们要讲的许多模式像Thread-Pool模式,Active Object模式等都是Producer-Consumer模式的变种,同时Producer-Consumer模式中的生产者和消费者阻塞唤醒机制也可以通过Guarded Suspension模式实现。

    为什么要有Producer-Consumer模式呢?

    1、消除了生产者与消费者之间的代码依赖。

    2、实现线程间的协调运行。生产者与消费者之间的运行速率不同,直接调用,数据处理会产生延迟,导致程序响应性下降。

    这种模式我们平时应该经常接触到,小到单体应用中ThreadPoolExecutor的编码,大到架构实现中Kafka,RabbitMQ的使用。它由以下角色组成:

    Producer:负责生成Product,传入Channel。

    Product:由Producer生成,供Consumer使用。

    Channel:Producer和Consumer之间的缓冲区,用于传递Product。

    Consumer:从Channel获取Product使用。

    这里我们可以使用java.util.concurrent中的BlockingQueue阻塞队列实现Channel。它可以极大地简化编程,take操作会一直阻塞直到有可用数据,put在channel满时也会阻塞直到有数据被消费。它有如下实现类,1、LinkedBlockingQueue和ArrayBlockingQueue是FIFO队列,前者基于链表实现,如果不特别指定,元素个数没有最大限制,后者基于数组实现,元素个数有最大限制。2、PriorityBlockingQueue是按优先级排序的队列,DelayQueue是一定时间后才可以take的队列。3、SynchronousQueue,没有存储功能,直接传递,因此put和take会一直阻塞,直到另一个线程准备好参与。

    该模式的示例代码如下:

    public class PCTest {
    
      public static void main(String[] args) {
        //channel,有界阻塞队列,容量100
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(50);
        Producer producer = new Producer(queue);
        Consumer consumer = new Consumer(queue);
    
        new Thread(producer).start();
        new Thread(consumer).start();
        System.out.println("生产者,消费者开始运行");
      }
    }
    
    class Consumer implements Runnable{
    
      private BlockingQueue<String> queue;
    
      public Consumer(BlockingQueue<String> q){
        this.queue=q;
      }
    
      @Override
      public void run() {
        try{
          String data;
          //获取消息,收到exit后退出
          while((data = queue.take()) !="exit"){
            Thread.sleep(20);
            System.out.println("Consume: "+data);
          }
        }catch(InterruptedException e) {
          e.printStackTrace();
        }
      }
    }
    
    class Producer implements Runnable {
    
      private BlockingQueue<String> queue;
    
      public Producer(BlockingQueue<String> q){
        this.queue=q;
      }
      @Override
      public void run() {
        //生产消息
        for(int i=0; i<100; i++){
          String data = new String(""+i);
          try {
            Thread.sleep(i);
            queue.put(data);
            System.out.println("Produce:"+data);
          } catch (InterruptedException e) {
            e.printStackTrace();
          }
        }
        //毒丸对象
        String data = "exit";
        try {
          queue.put(data);
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
      }
    
    }
    

    1、Channel剩余空间问题:

    如果消费者消费比较慢,这就会导致Channel中的Product逐渐积压,对此,我们可以使用有界阻塞队列,当队列满时,会阻塞直到消费者消费才继续生产Product。如果使用无界阻塞队列,就要考虑使用一段时间后,内存不足的情况,可以采用Semaphore信号量来控制。

    2、只有一个共享队列时的锁的竞争

    如果多个消费者同时消费同一个队列的时候,就会导致锁的竞争,不过BlockingQueue阻塞队列已经帮我们实现了相应的机制,使用Lock,Condition等控制多线程运行,其实就是对Guarded Suspension模式的应用。我们可以通过工作密取算法降低锁的竞争,提高可伸缩性。即每个消费者都有自己的双端队列(Deque,具体实现有ArrayDeque和LinkedBlockingDeque),一个消费者处理完自己队列的Product时,可以从其他消费者双端队列的末尾秘密获取Product。它非常适用于既是生产者又是消费者的问题,比如爬虫,当处理一个页面后,发现有更多页面需要处理,把这些新任务放到自己队列的末尾,当自己的双端队列为空时,则从其他队列尾部获取新任务。

    3、线程停止

    消费者线程和生产者线程哪个先停止,一般是先停止生产者,等Channel剩余Product备份后,或者被消费者处理完后,再停止消费者。至于具体实现,我们可以采用Two-phase termination 模式,设置停止标志并且使用中断;如果你使用线程池管理,则可以调用shutdown方法,它会等队列中的所有任务完成再关闭(shuwdownNow则可能在任务执行到一半时强行关闭);如果生产者和消费者数量不大,可以采用如上面示例中的毒丸对象,来关闭服务。

    相关文章

      网友评论

          本文标题:多线程设计模式解读—Producer-Consumer模式

          本文链接:https://www.haomeiwen.com/subject/ebtsbftx.html