Producer-Consumer模式可以说是设计模式的设计模式,后期我们要讲的许多模式像Thread-Pool模式,Active Object模式等都是Producer-Consumer模式的变种,同时Producer-Consumer模式中的生产者和消费者阻塞唤醒机制也可以通过Guarded Suspension模式实现。
为什么要有Producer-Consumer模式呢?
1、消除了生产者与消费者之间的代码依赖。
2、实现线程间的协调运行。生产者与消费者之间的运行速率不同,直接调用,数据处理会产生延迟,导致程序响应性下降。
这种模式我们平时应该经常接触到,小到单体应用中ThreadPoolExecutor的编码,大到架构实现中Kafka,RabbitMQ的使用。它由以下角色组成:
Producer:负责生成Product,传入Channel。
Product:由Producer生成,供Consumer使用。
Channel:Producer和Consumer之间的缓冲区,用于传递Product。
Consumer:从Channel获取Product使用。
这里我们可以使用java.util.concurrent中的BlockingQueue阻塞队列实现Channel。它可以极大地简化编程,take操作会一直阻塞直到有可用数据,put在channel满时也会阻塞直到有数据被消费。它有如下实现类,1、LinkedBlockingQueue和ArrayBlockingQueue是FIFO队列,前者基于链表实现,如果不特别指定,元素个数没有最大限制,后者基于数组实现,元素个数有最大限制。2、PriorityBlockingQueue是按优先级排序的队列,DelayQueue是一定时间后才可以take的队列。3、SynchronousQueue,没有存储功能,直接传递,因此put和take会一直阻塞,直到另一个线程准备好参与。
该模式的示例代码如下:
public class PCTest {
public static void main(String[] args) {
//channel,有界阻塞队列,容量100
BlockingQueue<String> queue = new ArrayBlockingQueue<>(50);
Producer producer = new Producer(queue);
Consumer consumer = new Consumer(queue);
new Thread(producer).start();
new Thread(consumer).start();
System.out.println("生产者,消费者开始运行");
}
}
class Consumer implements Runnable{
private BlockingQueue<String> queue;
public Consumer(BlockingQueue<String> q){
this.queue=q;
}
@Override
public void run() {
try{
String data;
//获取消息,收到exit后退出
while((data = queue.take()) !="exit"){
Thread.sleep(20);
System.out.println("Consume: "+data);
}
}catch(InterruptedException e) {
e.printStackTrace();
}
}
}
class Producer implements Runnable {
private BlockingQueue<String> queue;
public Producer(BlockingQueue<String> q){
this.queue=q;
}
@Override
public void run() {
//生产消息
for(int i=0; i<100; i++){
String data = new String(""+i);
try {
Thread.sleep(i);
queue.put(data);
System.out.println("Produce:"+data);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//毒丸对象
String data = "exit";
try {
queue.put(data);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
1、Channel剩余空间问题:
如果消费者消费比较慢,这就会导致Channel中的Product逐渐积压,对此,我们可以使用有界阻塞队列,当队列满时,会阻塞直到消费者消费才继续生产Product。如果使用无界阻塞队列,就要考虑使用一段时间后,内存不足的情况,可以采用Semaphore信号量来控制。
2、只有一个共享队列时的锁的竞争
如果多个消费者同时消费同一个队列的时候,就会导致锁的竞争,不过BlockingQueue阻塞队列已经帮我们实现了相应的机制,使用Lock,Condition等控制多线程运行,其实就是对Guarded Suspension模式的应用。我们可以通过工作密取算法降低锁的竞争,提高可伸缩性。即每个消费者都有自己的双端队列(Deque,具体实现有ArrayDeque和LinkedBlockingDeque),一个消费者处理完自己队列的Product时,可以从其他消费者双端队列的末尾秘密获取Product。它非常适用于既是生产者又是消费者的问题,比如爬虫,当处理一个页面后,发现有更多页面需要处理,把这些新任务放到自己队列的末尾,当自己的双端队列为空时,则从其他队列尾部获取新任务。
3、线程停止
消费者线程和生产者线程哪个先停止,一般是先停止生产者,等Channel剩余Product备份后,或者被消费者处理完后,再停止消费者。至于具体实现,我们可以采用Two-phase termination 模式,设置停止标志并且使用中断;如果你使用线程池管理,则可以调用shutdown方法,它会等队列中的所有任务完成再关闭(shuwdownNow则可能在任务执行到一半时强行关闭);如果生产者和消费者数量不大,可以采用如上面示例中的毒丸对象,来关闭服务。
网友评论