1. 引言
生产者、消费者模式是如此的重要,是理解java 多线程并发的核心知识点,不少同学面试时,常规操作是当着面试官的面,手写一个生产者消费者模式。
2. 实现方式
通常情况下,有5种方式来实现
1. synchronized + wait() + notify() 方式
2. 可重入锁ReentrantLock (配合Condition)
3. BlockingQueue 阻塞队列方式
4. 信号量Semaphore 方式
5. 管道输入输出流PipedInputStream和PipedOutputStream 方式
这里仅介绍,面试比较常见的 第2,第3种方式
3. 可重入锁ReentrantLock 方式
3.1 可重入锁概念:
ReentrantLock类实现Lock接口,并在方法访问共享资源时为其提供同步。操作共享资源的代码会锁定当前工作线程并阻止尝试锁定共享资源的所有其他线程。
顾名思义,ReentrantLock允许线程不止一次地锁定资源。当线程首次进入锁定状态时,保持计数(hold count)设置为1。解锁之前,线程可以再次重新进入锁定状态,并且将计数增加1。
对于每个解锁请求,保持计数减1,当保持计数为0时,资源解锁。
可重入锁还提供公平参数,可以实现所谓公平锁和非公平锁。
1、公平锁能:新老线程一律需要排队使用锁。
2、非公平锁:老的线程排队使用锁;但是无法保证新线程抢占已经在排队的线程的锁。
ReentrantLock 可重入锁的方式,需要配合Condition使用,Condition 与Lock的主要区别在于是否能够响应中断。
public class ProducerAndConsumerByLock {
private static int count = 0;
private int maxNum = 3;
ReentrantLock lock = new ReentrantLock();
Condition producerCondition = lock.newCondition();
Condition consumerCondition = lock.newCondition();
public static void main(String[] args) {
ProducerAndConsumerByLock test = new ProducerAndConsumerByLock();
new Thread(test.new Producer()).start();
new Thread(test.new Producer()).start();
new Thread(test.new Consumer()).start();
new Thread(test.new Consumer()).start();
}
class Producer implements Runnable {
@Override
public void run() {
for (int i = 0; i < 10; i++) {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
//获取锁
lock.lock();
try {
// while (getCount() == maxNum) {
// producerCondition.await();
// System.out.println("生产能力达到上限,进入等待状态");
// }
count++;
System.out.println(Thread.currentThread().getName()
+ "生产者生产,目前总共有" + count);
//唤醒消费者
consumerCondition.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
//获取锁
lock.unlock();
}
}
}
}
class Consumer implements Runnable {
@Override
public void run() {
for (int i = 0; i < 10; i++) {
try {
Thread.sleep(700);
} catch (InterruptedException e) {
e.printStackTrace();
}
lock.lock();
try {
// while (count == 0) {
// consumerCondition.await();
// }
count--;
System.out.println(Thread.currentThread().getName()
+ "消费者消费,目前总共有" + count);
//唤醒生产者
producerCondition.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
}
}
输入结果:
Thread-1生产者生产,目前总共有1
Thread-0生产者生产,目前总共有2
Thread-2消费者消费,目前总共有1
Thread-3消费者消费,目前总共有0
Thread-0生产者生产,目前总共有1
Thread-1生产者生产,目前总共有2
Thread-3消费者消费,目前总共有1
Thread-2消费者消费,目前总共有0
Thread-1生产者生产,目前总共有1
Thread-0生产者生产,目前总共有2
Thread-1生产者生产,目前总共有3
Thread-0生产者生产,目前总共有4
Thread-3消费者消费,目前总共有3
Thread-2消费者消费,目前总共有2
Thread-1生产者生产,目前总共有3
Thread-0生产者生产,目前总共有4
Thread-3消费者消费,目前总共有3
Thread-2消费者消费,目前总共有2
Thread-1生产者生产,目前总共有3
Thread-0生产者生产,目前总共有4
Thread-2消费者消费,目前总共有3
Thread-1生产者生产,目前总共有4
Thread-3消费者消费,目前总共有3
Thread-0生产者生产,目前总共有4
Thread-0生产者生产,目前总共有5
Thread-1生产者生产,目前总共有6
Thread-3消费者消费,目前总共有5
Thread-2消费者消费,目前总共有4
Thread-0生产者生产,目前总共有5
Thread-1生产者生产,目前总共有6
Thread-3消费者消费,目前总共有5
Thread-2消费者消费,目前总共有4
Thread-0生产者生产,目前总共有5
Thread-1生产者生产,目前总共有6
Thread-2消费者消费,目前总共有5
Thread-3消费者消费,目前总共有4
Thread-3消费者消费,目前总共有3
Thread-2消费者消费,目前总共有2
Thread-3消费者消费,目前总共有1
Thread-2消费者消费,目前总共有0
Process finished with exit code 0
不少人使用注释掉的while 语句来判断当前生成者产能是否已满。 不过这种while循环本身也属于一个独立线程,会有冲突的可能,放在演示代码里,输出顺序可能会混乱。
打开注释后,输出如下
Thread-0生产者生产,目前总共有1
Thread-1生产者生产,目前总共有2
Thread-2消费者消费,目前总共有1
Thread-3消费者消费,目前总共有0
Thread-1生产者生产,目前总共有1
Thread-0生产者生产,目前总共有2
Thread-3消费者消费,目前总共有1
Thread-2消费者消费,目前总共有0
Thread-1生产者生产,目前总共有1
Thread-0生产者生产,目前总共有2
Thread-1生产者生产,目前总共有3
Thread-2消费者消费,目前总共有2
Thread-3消费者消费,目前总共有1
生产能力达到上限,进入等待状态
Thread-0生产者生产,目前总共有2
Thread-1生产者生产,目前总共有3
Thread-2消费者消费,目前总共有2
Thread-3消费者消费,目前总共有1
生产能力达到上限,进入等待状态
Thread-0生产者生产,目前总共有2
Thread-1生产者生产,目前总共有3
Thread-2消费者消费,目前总共有2
Thread-3消费者消费,目前总共有1
生产能力达到上限,进入等待状态
Thread-0生产者生产,目前总共有2
Thread-1生产者生产,目前总共有3
Thread-3消费者消费,目前总共有2
Thread-2消费者消费,目前总共有1
生产能力达到上限,进入等待状态
Thread-0生产者生产,目前总共有2
生产能力达到上限,进入等待状态
Thread-1生产者生产,目前总共有3
Thread-2消费者消费,目前总共有2
Thread-3消费者消费,目前总共有1
生产能力达到上限,进入等待状态
Thread-0生产者生产,目前总共有2
生产能力达到上限,进入等待状态
Thread-1生产者生产,目前总共有3
Thread-3消费者消费,目前总共有2
Thread-2消费者消费,目前总共有1
生产能力达到上限,进入等待状态
Thread-0生产者生产,目前总共有2
生产能力达到上限,进入等待状态
Thread-1生产者生产,目前总共有3
Thread-3消费者消费,目前总共有2
Thread-2消费者消费,目前总共有1
生产能力达到上限,进入等待状态
Thread-0生产者生产,目前总共有2
Thread-3消费者消费,目前总共有1
Thread-2消费者消费,目前总共有0
Process finished with exit code 0
4. 阻塞队列方式
主要是利用LinkedBlockingDeque 或者ArrayBlockingQueue 之类的阻塞队列。
其take() 与 put() API 是阻塞性质的。
public class ProducerAndConsumerByQueue {
private BlockingQueue<Toy> blockingDeque = new LinkedBlockingDeque<>(10);
public static void main(String[] args) {
ProducerAndConsumerByQueue producerAndConsumer = new ProducerAndConsumerByQueue();
new Thread(producerAndConsumer.new Producer()).start();
new Thread(producerAndConsumer.new Consumer()).start();
}
class Producer implements Runnable {
@Override
public void run() {
for (int i = 0; i < 10; i ++ ) {
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
Toy toy = new Toy(i + "");
try {
blockingDeque.put(toy);
System.out.println( "生产者" + Thread.currentThread().getName() + " 生产玩具" + toy.getName());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
class Consumer implements Runnable {
@Override
public void run() {
for (int i = 0; i < 10; i ++ ) {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
Toy toy = blockingDeque.take();
System.out.println("消费者" + Thread.currentThread().getName() + " 消费玩具" + toy.getName());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
class Toy {
private String name;
public Toy(String name) {
this.name = name;
}
public String getName() {
return "toy " + name;
}
}
}
输出结果
生产者Thread-0 生产玩具toy 0
生产者Thread-0 生产玩具toy 1
消费者Thread-1 消费玩具toy 0
生产者Thread-0 生产玩具toy 2
生产者Thread-0 生产玩具toy 3
消费者Thread-1 消费玩具toy 1
生产者Thread-0 生产玩具toy 4
生产者Thread-0 生产玩具toy 5
生产者Thread-0 生产玩具toy 6
消费者Thread-1 消费玩具toy 2
生产者Thread-0 生产玩具toy 7
生产者Thread-0 生产玩具toy 8
消费者Thread-1 消费玩具toy 3
生产者Thread-0 生产玩具toy 9
消费者Thread-1 消费玩具toy 4
消费者Thread-1 消费玩具toy 5
消费者Thread-1 消费玩具toy 6
消费者Thread-1 消费玩具toy 7
消费者Thread-1 消费玩具toy 8
消费者Thread-1 消费玩具toy 9
Process finished with exit code 0
网友评论