简介:
生产者、消费者真的是老生常谈了,很多地方都会用到这种思想,可见其重要程度。所以,还是有必要了解一下它的最原始的实现方式的,也就是利用Object的wait和notifyAll方法。
源码:
/**
* Causes the current thread to wait until another thread invokes the
* {@link java.lang.Object#notify()} method or the
* {@link java.lang.Object#notifyAll()} method for this object.
* In other words, this method behaves exactly as if it simply
* performs the call {@code wait(0)}.
* <p>
* The current thread must own this object's monitor. The thread
* releases ownership of this monitor and waits until another thread
* notifies threads waiting on this object's monitor to wake up
* either through a call to the {@code notify} method or the
* {@code notifyAll} method. The thread then waits until it can
* re-obtain ownership of the monitor and resumes execution.
* <p>
* As in the one argument version, interrupts and spurious wakeups are
* possible, and this method should always be used in a loop:
* <pre>
* synchronized (obj) {
* while (<condition does not hold>)
* obj.wait();
* ... // Perform action appropriate to condition
* }
* </pre>
* This method should only be called by a thread that is the owner
* of this object's monitor. See the {@code notify} method for a
* description of the ways in which a thread can become the owner of
* a monitor.
*
* @throws IllegalMonitorStateException if the current thread is not
* the owner of the object's monitor.
* @throws InterruptedException if any thread interrupted the
* current thread before or while the current thread
* was waiting for a notification. The <i>interrupted
* status</i> of the current thread is cleared when
* this exception is thrown.
* @see java.lang.Object#notify()
* @see java.lang.Object#notifyAll()
*/
public final void wait() throws InterruptedException {
wait(0);
}
/**
* Wakes up all threads that are waiting on this object's monitor. A
* thread waits on an object's monitor by calling one of the
* {@code wait} methods.
* <p>
* The awakened threads will not be able to proceed until the current
* thread relinquishes the lock on this object. The awakened threads
* will compete in the usual manner with any other threads that might
* be actively competing to synchronize on this object; for example,
* the awakened threads enjoy no reliable privilege or disadvantage in
* being the next thread to lock this object.
* <p>
* This method should only be called by a thread that is the owner
* of this object's monitor. See the {@code notify} method for a
* description of the ways in which a thread can become the owner of
* a monitor.
*
* @throws IllegalMonitorStateException if the current thread is not
* the owner of this object's monitor.
* @see java.lang.Object#notify()
* @see java.lang.Object#wait()
*/
public final native void notifyAll();
思路:
首先要想的是生产者和消费者分别需要用java里面的什么来实现,毫无疑问,肯定要用线程了,因为它是最基本的调度单元,可以在其run方法里面处理自定义逻辑。确定了线程之后,还要找一个地方来存储生产者生产出来的产品,首先这个容器要有序的,那就用最简单的ArrayList了。有的时候为了测试方便,我们有可能要协调多个线程,可能需要借助于CountDownLatch来实现。
举例1:
这个例子展示的是,开启多个生产者生产一定量的产品放进队列,等所有的生产者都生产完之后,打印生产了多少个产品。然后,多个消费者再开始消费,消费完之后,再打印,容器里面还剩余多少个产品。以此体验生产和消费的思想,大家可以在自己的IDE上运行一下。
@Test
public void test01() throws InterruptedException {
CountDownLatch countDownLatchProducer = new CountDownLatch(15);
CountDownLatch countDownLatchConsumer = new CountDownLatch(6);
for (int i = 0; i < 3; i++) {//定义3个线程
new Thread(new Producer(i,countDownLatchProducer)).start();
}
// Thread.sleep(20000);
countDownLatchProducer.await();//当所有的生产者子线程的任务都完成之后,就会恢复继续向下执行。
System.out.println("消费前大小:"+Queue.getList().size());
for (int i = 0; i < 3; i++) {
new Thread(new Consumer(countDownLatchConsumer)).start();
}
// Thread.sleep(20000);
countDownLatchConsumer.await();//当所有的消费者子线程的任务都完成之后,就会恢复继续向下执行。
System.out.println("消费后大小:"+Queue.getList().size());
}
class Producer implements Runnable {
private int i;
private CountDownLatch countDownLatch;
public Producer(int i,CountDownLatch countDownLatch) {
this.i = i;
this.countDownLatch = countDownLatch;
}
@Override
public void run() {
try {
for (int i = 0; i < 5; i++) {//每个生产者线程每次生产5个对象
Queue.put(new Bread(i));
countDownLatch.countDown();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
static class Consumer implements Runnable {
private CountDownLatch countDownLatch;
public Consumer(CountDownLatch countDownLatch) {
this.countDownLatch = countDownLatch;
}
@Override
public void run() {
try {
for (int i = 0; i < 2; i++) {//每个消费者线程一次消费2个对象
Bread bread = Queue.get();
countDownLatch.countDown();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public class Queue {
private final static List<Bread> list = new ArrayList<>(15);
public static List getList() {
return list;
}
static void put(Bread bread) throws InterruptedException {//进队列
synchronized (list) {
while (list.size() >= 15) {//限制容器大小最大值为15
list.wait();
}
list.add(bread);
list.notifyAll();
}
}
static Bread get() throws InterruptedException {//出队列
Bread bread = null;
synchronized (list) {
if (list.size() == 0)
list.wait();
bread = list.remove(list.size() - 1);
list.notifyAll();
}
return bread;
}
}
package com.hao.hellolearn.jdk.producerconsumer;
public class Bread {
private int value;
public Bread(int value) {
this.value = value;
}
public int getValue() {
return value;
}
public void setValue(int value) {
this.value = value;
}
}
举例2:
这个例子展示的是:当容器的容量不足以装下生产者生产的任务的时候,生产者的线程就会阻塞在put方法上,直到消费者开始消费容器中的任务,生产者线程才会恢复执行,可以通过sleep方法来协助测试。
public class WaitNotifyImpl {
@Test
public void test01() throws InterruptedException {
for (int i = 0; i < 3; i++) {//定义3个线程
new Thread(new Producer(i)).start();
}
Thread.sleep(20000);
System.out.println("消费前大小:"+Queue.getList().size());
for (int i = 0; i < 3; i++) {
new Thread(new Consumer()).start();
}
Thread.sleep(20000);
System.out.println("消费后大小:"+Queue.getList().size());
}
class Producer implements Runnable {
private int i;
public Producer(int i) {
this.i = i;
}
@Override
public void run() {
try {
for (int i = 0; i < 5; i++) {//每个生产者线程每次生产5个对象
Queue.put(new Bread(i));
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
static class Consumer implements Runnable {
public Consumer() {
}
@Override
public void run() {
try {
for (int i = 0; i < 2; i++) {//每个消费者线程一次消费2个对象
Bread bread = Queue.get();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
当第一个sleep执行的时候,可以在cmd里面分别执行jps -l 查看运行中的进程的pid,然后用jstack -l pid 来打印出这个进程的堆栈信息,可以看到线程阻塞在object monitor上面。1处的是生产者线程的状态是WAITING是有object.wait 引起的,2处的是TIMED_WAITING是由sleep引起的。
生产者-消费者-jstack.png
总结:
生产者-消费者 生产出来的产品放在一个容器里面,消费者从容器里面取出来;如果生产速率慢,消费速度快,那么当容器为空的时候,消费者会阻塞在当前容器(什么事也做不了,静静的等待),直到容器里面被放进一个产品,然后争抢着。如果消费速率慢,生产速度快,那么当容器满了的时候生产者会阻塞在当前容器(什么事也做不了,静静的等待),直到容器里面的某一个产品被消费掉。它的本质其实就是线程之间的协作。
觉察即自由。
网友评论