前言
大家对生产者和消费者模式很熟悉吧,这个模式很好理解,也在工程实践中经常被使用到。学习Java的同学有很大一部分使用Java语言实现过生产者和消费者模式,我就是其中之一。大概思路:有个盛装数据的容器(list)即缓冲区
,一个往容器里放数据即生产者
,一个从容器中取数据即消费者
。但当容器满的时候,生产者
就不能往里放东西了,此时需要等待缓冲区
不满,即有消费者
从容器中取出数据了,这就需要一个等待
和通知
的功能了。这个功能我当时是直接使用Java 中 Object
定义的wait
、notify
和notifyAll
来实现的。现在回忆起来也只能记得这么多。今天的这篇文章会对通知与等待有个比较深入的探索,以便写出更安全的代码。
有问题却一眼看不出问题的消费者和生产者模式
下文将使用Java 实现一个简单的消费者和生产者模式,代码如下:
public class Producer {
//缓冲区
private CircleQueue<String> cache;
public Producer(CircleQueue<String> cache) {
this.cache = cache;
}
public synchronized void produce(String e) throws InterruptedException {
if (cache.isFull()) {
cache.wait();
}
cache.put(e);
cache.notifyAll();
}
}
public class Consumer {
private CircleQueue<String> cache;
public Consumer(CircleQueue<String> container) {
this.cache = container;
}
public synchronized void consume() throws InterruptedException {
if (cache .isEmpty()) {
lock.wait();
}
System.out.println(
String.format("thread:%s,consume a element:%s",
Thread.currentThread().getName(),
cache .take()));
cache.notifyAll();
}
}
上面的代码很短,我就没有注释。大家看了一遍,有没有看出来上面代码运行会报错?运行一下测试代码(见最下面)大家就明白了:
结果:
生产者执行
produce
函数的时候,执行到cache.notifyAll()
会抛出这个异常,这个异常代表什么意思呢?我们来看看IllegalMonitorStateException 的注释:
Thrown to indicate that a thread has attempted to wait on an object's monitor or to notify other threads waiting on an object's monitor without owning the specified monitor.
上面这个注释, 它想表达两个意思
1> 在不拥有当前对象监视器的情况下调用当前对象的wait方法
2> 在不拥有当前对象监视器的情况下调用当前对象的notify和notifyAll
上面报错的原因就是因为满足了第2条,其实同样
cache.wait()
这里也有问题。下面我们来改下代码:
public void produce(String e) throws InterruptedException {
synchronized (cache) {
if (cache.isFull()) {
cache .wait();
}
cache.put(e);
cache.notifyAll();
}
}
public void consume() throws InterruptedException {
synchronized (cache) {
if (container.isEmpty()) {
cache.wait();
}
System.out.println(
String.format("thread:%s,consume a element:%s",
Thread.currentThread().getName(),
cache.take()));
cache.notifyAll();
}
}
上面只粘贴了主要的代码,synchronized
关键字修饰的是cache
,因此当线程进来的时候获取的是cache
对象的监视器,因此到下面无论执行cache.wait()
还是cache.notifyAll()
都不会抛出异常。当然上面的代码还是有问题的,不知道大家有没有看出来?我们来执行下测试代码,发现报错了,但是也有可能你执行的时候并没有报错,因为这是多线程,存在很多偶然性,多运行几遍你就会发现你中奖了,报错如下:
上面报错的是Consumer
中的cache.take()
处抛的异常,cache对象的类型CircleQueue
是我自己实现的一个环形队列,抛出该异常是因为队列已空,如果这样大家就奇怪了,能执行到cache.take()
不是因为队列不为空才唤醒当前线程的吗?
我先介绍下当某个线程调用wait
的时候发生了什么?
它会释放调用对象上的监视器即锁,然后进入一个条件等待队列中等待被唤醒,此时有别的线程改变了状态(eg: 队列为空,队列已满),然后调用notifyAll(),条件等待队列中的线程再次获取调用对象上的监视器,然后继续向下执行
。画一个草图给大家理解下:
在一个线程被唤醒到获取锁的这个时间里,可能有另外一个线程改变了状态 ( 往队列插入元素或者从队列中取出元素 ), 然后该线程往下执行的时候发现缓冲区为空或者已满这种现象。大家估计也想到了解决办法,就是被唤醒再次获取锁之后再判断一次状态,加一个循环就搞定了,代码如下:
public void consume() throws InterruptedException {
synchronized (cache) {
// 这里把if改为while 就可以了
while (cache.isEmpty()) {
cache.wait();
}
System.out.println(
String.format("thread:%s,consume a element:%s",
Thread.currentThread().getName(),
cache.take()));
cache.notifyAll();
}
}
这里限于篇幅,我只粘贴了消费者
的代码,生产者
同样的地方改一下就可以。多运行几次测试代码,都正常运行,并无线程安全的问题。
后记
有可能大家一开始学习的版本就是我最终实现的版本,可是大家知道是怎么来的吗?知其所以然很重要,你不需要去硬记很多东西。谢谢大家观看,麻烦点个赞和关注一下。
附录
测试代码
public static void main(String[] args) {
CircleQueue<String> cache = new CircleQueue<>(10);
Object lock = new Object();
for (int j = 0; j < 2; j++) {
new Thread(() -> {
Producer producer = new Producer(cache);
for (int i = 0; i < 10; i++) {
try {
producer.produce("数据" + i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
for (int i = 0; i < 2; i++) {
new Thread(() -> {
Consumer consumer = new Consumer(cache);
while (true) {
try {
consumer.consume();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
}
网友评论