前言
生产者和消费者问题是线程模型中的经典问题:生产者和消费者在同一时间段内共用同一个存储空间,生产者往存储空间中添加产品,消费者从存储空间中取走产品,当存储空间为空时,消费者阻塞,当存储空间满时,生产者阻塞。
imagejava中最基本的生产者消费者模型是用notify跟wait实现的。
加锁机制的缺陷
在Java程序中,synchronized解决了多线程竞争的问题。当一个加锁函数执行完成后会自动释放锁,例如,对于一个任务管理器,多个线程同时往队列中添加任务,可以用synchronized加锁,但是synchronized并没有解决多线程协调的问题。
class TaskQueue {
Queue<String> queue = new LinkedList<>();
public synchronized void addTask(String s) {
this.queue.add(s);
}
public synchronized String getTask() {
while (queue.isEmpty()) {
}
return queue.remove();
}
}
上面这段代码对两个方法进行了加锁,但是由于getTask()方法加锁后,如果queue对象中一直没有对象就会形成死循环,锁得不到释放!其他线程也不行拿到锁进行下一步操作,这样的问题在多线程中经常发生,java为了解决这样的问题所以进行了线程的唤醒沉睡操作!
可以将方法进行稍作修改变为:
public synchronized String getTask() {
while (queue.isEmpty()) {
this.wait();
}
return queue.remove();
}
其中 this.wait() 方法的意思是:
wait()让当前线程进入等待状态,同时,wait()也会让当前线程释放它所持有的锁。“直到其他线程调用此对象的 notify() 方法或 notifyAll() 方法”,当前线程被唤醒(进入“就绪状态”)
所以这里调用方法后,如果queue中是空的会将此线程进行沉睡,释放他们的锁!但是释放后谁怎么去通知这个线程又可以继续进行了呢?这个时候就需要notify或者notifyAll方法了!
所以我们再将添加方法进行修改得到:
public synchronized void addTask(String s) {
this.queue.add(s);
this.notify(); // 唤醒在this锁等待的线程
}
其中 this.notify() 方法的意思是:
notify()和notifyAll()的作用,则是唤醒当前对象上的等待线程;notify()是唤醒单个线程,而notifyAll()是唤醒所有的线程。
这段代码的意思就是我将一个值放入到queue后,我马上调用notify方法去通知那些进入wait方法的线程可以继续执行了!
最简单的生产消费
根据上面的解释,我们完整的代码可以像下面这样:
public class Java_21 {
public static void main(String[] args) throws InterruptedException {
TaskQueue q = new TaskQueue();
List<Thread> ts = new ArrayList<>();
for (int i = 0; i < 5; i++) {
Thread t = new Thread(() -> {
// 执行task:
while (true) {
try {
String s = q.getTask();
System.out.println("execute task: " + s);
} catch (InterruptedException e) {
return;
}
}
});
t.start();
ts.add(t);
}
Thread add = new Thread(() -> {
for (int i = 0; i < 10; i++) {
// 放入task:
String s = "t-" + Math.random();
System.out.println("add task: " + s);
q.addTask(s);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
add.start();
add.join();
Thread.sleep(100);
for (Thread thread : ts) {
thread.interrupt();
}
}
}
class TaskQueue {
private Queue<String> stringQueue = new LinkedList<>();
public synchronized void addTask(String s) {
stringQueue.add(s);
// notify()和notifyAll()的作用,则是唤醒当前对象上的等待线程;notify()是唤醒单个线程,而notifyAll()是唤醒所有的线程。
this.notifyAll(); // 唤醒在this锁等待的所有线程
}
public synchronized String getTask() throws InterruptedException {
while (stringQueue.isEmpty()) {
//wait()的作用是让当前线程进入等待状态,同时,wait()也会让当前线程释放它所持有的锁。“直到其他线程调用此对象的 notify() 方法或 notifyAll() 方法”,当前线程被唤醒(进入“就绪状态”)
this.wait();
}
return stringQueue.remove();
}
}
运行的结果就是:
add task: t-0.8499205383861345
execute task: t-0.8499205383861345
add task: t-0.7942800777561965
execute task: t-0.7942800777561965
add task: t-0.6906469783761674
execute task: t-0.6906469783761674
add task: t-0.8532616076721191
execute task: t-0.8532616076721191
add task: t-0.7939063475258298
execute task: t-0.7939063475258298
add task: t-0.658439645359411
execute task: t-0.658439645359411
add task: t-0.1836403135968735
execute task: t-0.1836403135968735
add task: t-0.868308480446476
execute task: t-0.868308480446476
add task: t-0.5658234674527566
execute task: t-0.5658234674527566
add task: t-0.8699112304857072
execute task: t-0.8699112304857072
一眼就能看出是添加一个元素后马上取出一个元素。
对上面的代码进行解释:
我们有一个TaskQueue类里面有个添加与取出的方法,两个方法都进行了加锁的机制。
首先我们看取出的getTask()机制:
我们判断stringQueue是不是空如果是空就调用wait() 方法释放当前线程的锁,然后进入到睡眠状态
再来看addTask()方法:
我们向stringQueue中添加一个值,添加好了后我们马上调用notifyAll()方法唤醒签名所有调用了wait()方法的线程进行消费!
最后再来看main函数的执行:
我们开启了五个线程进行getTask()的执行,表示取出操作,我们再开启一个线程进行了addTask()操作,我们调用join方法让add线程进行完成后,我们再遍历集合调用interrupt()方法进行所有线程的中断操作
执行过程:
add线程中我调用addTask()后添加一个值马上就去调用notifyAll()通知其他五个等待线程去取,至于是哪个线程取到这就是cpu自己的调度了,等取空了后五个线程中又进入了休眠状态,我这边又去添加值,这样重复进行就可以达到一放一取的操作
生产者消费者扩充
一般的生产者消费者模型都是给了一个缓冲区的概念,都是添加到缓冲区,等缓冲区到达一个值后生产者就不生产了,然后让消费者去消费,等消费者消费完后又暂停等生产者去生产!跟前言里面的图是一个道理,将我们的代码简单的变化就能达到:
修改生产者:
public synchronized void addTask(String s) {
stringQueue.add(s);
if (stringQueue.size()==100){
this.notifyAll();
}
}
中间的缓冲区大小就是100.
最后
生产者消费者模式我们不是经常用到,但是我们开发中用到的框架很多都是基于这样的线程模型,所以掌握原理后有助于我们开发中解决实际的线程安全交流问题
补充
加锁的写法有很多种,例如我将上面的方法不用this加锁,用一个中间常量效果一样,完整代码如下:
public class Java_21 {
static final String LOCK = "lock";
public static void main(String[] args) throws InterruptedException {
TaskQueue q = new TaskQueue();
List<Thread> ts = new ArrayList<>();
for (int i = 0; i < 5; i++) {
Thread t = new Thread(() -> {
// 执行task:
while (true) {
try {
String s = q.getTask();
System.out.println("execute task: " + s);
} catch (InterruptedException e) {
return;
}
}
});
t.start();
ts.add(t);
}
Thread add = new Thread(() -> {
for (int i = 0; i < 10; i++) {
// 放入task:
String s = "t-" + Math.random();
System.out.println("add task: " + s);
q.addTask(s);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
add.start();
add.join();
Thread.sleep(100);
for (Thread thread : ts) {
thread.interrupt();
}
}
}
class TaskQueue {
private Queue<String> stringQueue = new LinkedList<>();
public void addTask(String s) {
synchronized(LOCK){
stringQueue.add(s);
// if (stringQueue.size()==100){
// LOCK.notifyAll(); // 唤醒在this锁等待的所有线程
//
// }
LOCK.notifyAll();
}
}
public String getTask() throws InterruptedException {
synchronized (LOCK){
while (stringQueue.isEmpty()) {
LOCK.wait();
}
return stringQueue.remove();
}
}
}
网友评论