生产者消费者问题
背景
在并发编程中,生产者消费者问题(producer/consumer)是一个经典的老生常谈的问题,有时也称为有界缓冲区问题。
问题的基本背景假设是:我们有一个固定大小的缓冲区,这个缓冲区分别有两种工作性质不同的线程去操作。其中一种线程负责向缓冲区中写入数据,我们称之为生产者线程。另一种线程则负责从缓冲区中拿取数据,并称之为消费者线程。
同时两种线程的写入和拿取工作要遵循一定的规则:
- 缓冲区未写满时,生产者线程可以向缓冲区中写入数据。但是消费者线程不能从缓冲区中读取数据。
- 缓冲区写满时,生产者线程不能向缓冲区中写入数据,消费者线程可以冲缓冲区中读取数据。
- 不管是那种性质的线程,在操作缓冲区时,均不可出现并发安全问题。
分析可以得知,解决生产者消费者问题,其实就是要解决线程同步问题与共享资源互斥访问问题。互斥问题的解决可以借助锁来实现,而线程同步则需借助信号量或其他工具来实现。
Java实现
class FixedSizeBuffer
{
private static final int DEFAULT_BUFFER_SIZE = 1024;
private final ReentrantLock lock = new ReentrantLock(); // 共享资源访问锁
private final Condition isFull = lock.newCondition(); // buffer是否已满
private final Condition isEmpty = lock.newCondition(); // buffer是否还空着
private final int size; // buffer的大小
private final byte[] buffer; // buffer
private int cursor; // 写入游标
public FixedSizeBuffer()
{
this(DEFAULT_BUFFER_SIZE);
}
public FixedSizeBuffer(int size)
{
if (size <= 0) throw new IllegalArgumentException();
this.size = size;
this.buffer = new byte[size];
cursor = -1;
}
/**
* 向buffer中写入一个字节的数据
* @param content 数据内容
* @throws InterruptedException 中断异常
*/
public void putByte(byte content) throws InterruptedException
{
/*
由于要对共享资源buffer进行访问,所以要加锁。
*/
lock.lock();
try
{
/*
如果写入游标等于数组的最大下标,这时要停止写入。
(因为,使用cursor = -1代表buffer内容被清空)
*/
while (cursor == (size - 1))
{
System.out.println(Thread.currentThread().getName() + " : 缓冲区已满");
isEmpty.signalAll(); // 唤醒消费者线程,可以从buffer中拿走数据了。
/*
await() 方法会暂时挂起当前线程,并且释放当前线程所持有的锁。
当该线程被唤醒时,线程会从await代码下一处位置开始执行。
在线程被唤醒的同时,也会重新获取当前Condition所关联的锁。
所以这里要使用while循环来判断,因为线程是在此处代码被唤醒的。
这样就做到了多重检查的作用。
*/
isFull.await();
}
++cursor;
System.out.println(Thread.currentThread().getName() + " => Buffer [cursor = " + cursor + "]");
buffer[cursor] = content;
}finally
{
lock.unlock();
}
}
/**
* 取出当前buffer的所有内容
* @return buffer content
* @throws InterruptedException 中断异常
*/
public byte[] takeAll() throws InterruptedException
{
lock.lock();
try
{
while (cursor != (size - 1))
isEmpty.await(); // 当前buffer未满,需要等待写入线程唤醒。
final byte[] result = new byte[size];
System.arraycopy(buffer, 0, result, 0, size); // 将内容拷贝
cursor = -1; // 重置写入游标
System.out.println(Thread.currentThread().getName() + " <= Buffer [cursor = -1]");
isFull.signalAll(); // 唤醒生产者线程
return result;
}finally
{
lock.unlock();
}
}
}
网友评论