一.通过线程锁实现
package 多线程;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Producer/consumer demo built on intrinsic locks ({@code synchronized} with
 * {@code wait}/{@code notifyAll}).
 *
 * <p>One producer publishes increasing integers through a shared {@link AtomicInteger};
 * one consumer prints each value and then wakes the producer. Two separate lock objects
 * are used: one the consumer waits on, one the producer waits on.
 */
public class 生产消费者2 {

    /**
     * Consumer: waits until the shared holder contains a value it has not handled yet,
     * prints it, then notifies the producer.
     */
    static class Consumer extends Thread {
        private final AtomicInteger dataHolder;
        private final Object consumerLock;
        private final Object producerLock;
        /** Last value already handled; written only by the constructor and by this thread. */
        private int lastConsumed;

        /**
         * @param dataHolder   shared slot the producer publishes into
         * @param consumerLock monitor this consumer waits on
         * @param producerLock monitor the producer waits on
         */
        Consumer(AtomicInteger dataHolder, Object consumerLock, Object producerLock) {
            this.dataHolder = dataHolder;
            this.consumerLock = consumerLock;
            this.producerLock = producerLock;
            // Anything already in the holder at construction time counts as "seen".
            this.lastConsumed = dataHolder.get();
        }

        @Override
        public void run() {
            while (!Thread.currentThread().isInterrupted()) {
                int value;
                try {
                    synchronized (consumerLock) {
                        // Wait on a condition, not on a bare notification: if the producer
                        // published before we got here, the value already differs and we do
                        // not wait at all, so the wake-up cannot be lost. The loop also
                        // covers spurious wake-ups.
                        while ((value = dataHolder.get()) == lastConsumed) {
                            consumerLock.wait();
                        }
                    }
                } catch (InterruptedException e) {
                    // Restore the flag and stop instead of consuming without new data.
                    Thread.currentThread().interrupt();
                    return;
                }
                // Consume the data.
                System.out.println("消费->" + value);
                System.out.println();
                lastConsumed = value;
                // Done consuming: tell the producer it may produce again.
                synchronized (producerLock) {
                    producerLock.notifyAll();
                }
            }
        }
    }

    /**
     * Producer: increments the shared holder, wakes the consumer, waits for the consumer's
     * notification, then pauses one second before producing again.
     */
    static class Producer extends Thread {
        private final AtomicInteger dataHolder;
        private final Object consumeLock;
        private final Object productLock;

        /**
         * @param dataHolder  shared slot to publish into
         * @param consumeLock monitor the consumer waits on
         * @param productLock monitor this producer waits on
         */
        Producer(AtomicInteger dataHolder, Object consumeLock, Object productLock) {
            this.dataHolder = dataHolder;
            this.consumeLock = consumeLock;
            this.productLock = productLock;
        }

        @Override
        public void run() {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    // productLock is held from before publishing until wait() releases it.
                    // The consumer needs this monitor to notify, so its notification can
                    // only be delivered once this thread is actually waiting.
                    synchronized (productLock) {
                        // Produce the data.
                        System.out.println("生产->" + dataHolder.incrementAndGet());
                        // Wake the consumer so it can consume the new value.
                        synchronized (consumeLock) {
                            consumeLock.notifyAll();
                        }
                        // Block until the consumer reports back.
                        // NOTE(review): no predicate loop here because the constructor
                        // exposes no shared "consumed" state; a spurious wake-up would only
                        // send this thread into the pause below early.
                        productLock.wait();
                    }
                    // Pause so values are not produced too quickly.
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    // Restore the flag and stop producing.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }

    /**
     * Starts one consumer and one producer sharing a holder and two locks, then waits
     * for both threads.
     */
    public static void main(String[] args) throws InterruptedException {
        final AtomicInteger dataHolder = new AtomicInteger();
        final Object consumeLock = new Object();
        final Object productLock = new Object();
        Producer producer = new Producer(dataHolder, consumeLock, productLock);
        Consumer consumer = new Consumer(dataHolder, consumeLock, productLock);
        consumer.start();
        producer.start();
        consumer.join();
        producer.join();
    }
}
日志
生产->1
消费->1
生产->2
消费->2
生产->3
消费->3
生产->4
消费->4
这个方式简单易懂,关键点是加锁和唤醒的时机:如果 notify 先于对方的 wait 执行,这次通知就会丢失,等待的一方将一直阻塞。
二.通过BlockingQueue实现
package 多线程;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
/**
 * Base thread that offers a logging helper which prefixes every message with the
 * thread's name.
 */
class LogThread extends Thread {

    /**
     * Prints {@code <thread name>:<message>} on standard output.
     *
     * @param message text to print after the thread name
     */
    protected final void log(String message) {
        final String line = getName() + ":" + message;
        System.out.println(line);
    }
}
/**
 * Producer thread: puts the increasing sequence 1, 2, 3, ... onto the shared queue,
 * logging each element after it has been enqueued and sleeping one second between
 * elements.
 */
class ProductThread extends LogThread {
    private final BlockingQueue<Integer> queue;

    /**
     * @param queue queue the produced elements are put into
     */
    ProductThread(BlockingQueue<Integer> queue) {
        this.queue = queue;
    }

    /**
     * Produces until the thread is interrupted. An interrupt during {@code put} or
     * {@code sleep} ends the loop, so nothing is logged for an element that was never
     * enqueued.
     */
    @Override
    public void run() {
        int i = 0;
        try {
            while (!Thread.currentThread().isInterrupted()) {
                queue.put(++i);
                log("生产->" + i);
                // Pause so elements are not produced too quickly.
                Thread.sleep(1000);
            }
        } catch (InterruptedException e) {
            // Restore the flag for whoever inspects this thread, then stop.
            Thread.currentThread().interrupt();
        }
    }
}
/**
 * Consumer thread: repeatedly takes an element from the shared queue and logs it,
 * followed by an empty line.
 */
class ConsumerThread extends LogThread {
    private final BlockingQueue<Integer> queue;

    /**
     * @param queue queue the elements are taken from
     */
    public ConsumerThread(BlockingQueue<Integer> queue) {
        this.queue = queue;
    }

    /**
     * Consumes until the thread is interrupted. {@code take()} blocks while the queue
     * is empty; an interrupt while blocked ends the loop instead of being swallowed.
     */
    @Override
    public void run() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                Integer data = queue.take();
                log("消费->" + data);
                System.out.println();
            }
        } catch (InterruptedException e) {
            // Restore the flag for whoever inspects this thread, then stop.
            Thread.currentThread().interrupt();
        }
    }
}
/**
 * Entry point of the BlockingQueue-based demo: one producer thread and five consumer
 * threads share a single queue.
 */
public class 生产消费者 {

    /**
     * Creates the producer first, starts five consumers, then starts the producer and
     * waits for it.
     */
    public static void main(String[] args) throws InterruptedException {
        final BlockingQueue<Integer> sharedQueue = new LinkedBlockingQueue<>();
        // Constructed before the consumers, as in the sample log where the producer is Thread-0.
        final ProductThread producer = new ProductThread(sharedQueue);
        int remaining = 5;
        while (remaining-- > 0) {
            new ConsumerThread(sharedQueue).start();
        }
        producer.start();
        producer.join();
    }
}
日志如下:
Thread-0:生产->1
Thread-1:消费->1
Thread-0:生产->2
Thread-3:消费->2
Thread-0:生产->3
Thread-4:消费->3
Thread-0:生产->4
Thread-2:消费->4
Thread-0:生产->5
Thread-5:消费->5
原理如下:
BlockingQueue.put(v):如果容器没有剩余空间,也就是数据还没有被消费完,则阻塞。
BlockingQueue.take():如果队列为空,也就是数据还没有生产出来,则阻塞。
LinkedBlockingQueue 的容量可以通过构造函数传入,默认是 Integer.MAX_VALUE(因此上面示例中的 put 实际上几乎不会阻塞)。
网友评论