有道面试题
有一家生产奶酪的厂家,每天需要生产100000份奶酪卖给超市.
通过一辆送货车发货,送货车辆每次送100份。
厂家有一个容量为1000份的冷库,用于奶酪保鲜,生产的奶酪需要先存放在冷库。
运输车辆从冷库取货。
厂家有三条生产线,分别是 牛奶供应生产线、发酵剂制作生产线、奶酪生产线。生产每份奶酪需要2份牛奶和1份发酵剂。
请设计生产系统。
1.分析
- 三条生产线加一个送货车就是四个线程
- 奶酪生产需要等待牛奶与发酵剂
- 送货车需要等待奶酪数量满足一次运送的量才开始运送
- 奶酪两种情况下需要停止生产
- 达到冷藏库数量
- 达到当天奶酪生产目标
其实这就是一个变种的生产者消费者模式,下面开始实现。
2.实现
传统的 消费者生产者模型 是用Thread的wait+notify实现,这里我想用ReentrantLock来玩,用 Condition 来细化锁,具体代码如下
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class MilkOne {
// 运送容量
final static int ALL_SIZE = 100;
// 当前容量
final static int CURR_CAPACITY = 1000;
// 总生产量
final static int ALL_CAPACITY = 10000;
// 牛奶
final static AtomicInteger milkInt = new AtomicInteger(0);
// 发酵剂 - 10000
final static AtomicInteger starterInt = new AtomicInteger(0);
// 奶酪 - 10000
final static AtomicInteger cheeseInt = new AtomicInteger(0);
static class LockCondition {
Lock lock = new ReentrantLock();
// 取货车
Condition carCondition = lock.newCondition();
// 牛奶
Condition milkCondition = lock.newCondition();
// 发酵剂 - 10000
Condition starterCondition = lock.newCondition();
// 奶酪 - 10000
Condition cheeseCondition = lock.newCondition();
void lock() { lock.lock(); }
void unlock() { lock.unlock(); }
void signalCar() { carCondition.signal(); }
void awaitCar() throws InterruptedException{ carCondition.await(); }
void signalMilk() { milkCondition.signal(); }
void awaitMilk() throws InterruptedException{ milkCondition.await(); }
void signalStarter() { starterCondition.signal(); }
void awaitStarter() throws InterruptedException{ starterCondition.await(); }
void signalCheese() { cheeseCondition.signal(); }
void awaitCheese() throws InterruptedException{ cheeseCondition.await(); }
}
/**
* 三个生产线程(总共生产10000)
*
* 没说牛奶跟发酵剂需要保存,这里不设等待
*
* 车取货的线程(每满一百取一次)
*
*/
public static void main(String[] args) throws InterruptedException{
LockCondition lock = new LockCondition();
// 1.生产牛奶
Thread milkTh = new Thread(new MilkTh(lock), "milk");
milkTh.start();
// 2.生产发酵剂
Thread starterTh = new Thread(new StarterTh(lock), "starter");
starterTh.start();
// 3.生产奶酪
Thread cheeseTh = new Thread(new CheeseTh(lock), "cheese");
cheeseTh.start();
// 4.car
Thread carTh = new Thread(new CarTh(lock), "car");
carTh.start();
}
// 4.运输车
static class CarTh implements Runnable {
LockCondition lock;
CarTh(LockCondition lock) { this.lock = lock; }
@Override
public void run(){
int p = 0;
for (;;) {
lock.lock();
System.out.println(p + " car lock...");
try {
if (p >= ALL_CAPACITY / ALL_SIZE) {
break;
}
// 1.没满运货车数量
if (cheeseInt.get() < ALL_SIZE) {
// 1.唤醒生产奶酪
lock.signalCheese();
// 2.运货车等待
lock.awaitCar();
continue;
}
// 可以运送
if (cheeseInt.compareAndSet(cheeseInt.get(), cheeseInt.get() - ALL_SIZE)) {
System.out.println(p++ + " 运送 cheeseInt: " + cheeseInt.get());
// 可以继续生产奶酪
lock.signalCheese();
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println(p + " car unlock...");
lock.unlock();
}
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("car end..."
+ ", milk:" + milkInt.get() + ", starter:" + starterInt.get() + ", cheese:" + cheeseInt.get() + ", p:" + p);
}
}
// 3.奶酪
static class CheeseTh implements Runnable {
LockCondition lock;
CheeseTh(LockCondition lock) { this.lock = lock; }
@Override
public void run(){
int z = 0;
for (;;) {
lock.lock();
System.out.println(z + " cheese lock...");
try {
if (z >= ALL_CAPACITY) {
break;
}
if (starterInt.get() < 1) lock.awaitStarter();
if (milkInt.get() < 2) lock.awaitMilk();
/* 1奶酪 = 2牛奶 + 1发酵剂 */
// 2牛奶
milkInt.compareAndSet(milkInt.get(), milkInt.get() - 2);
// 1发酵剂
starterInt.decrementAndGet();
// 1奶酪
int num = cheeseInt.incrementAndGet();
System.out.println(z++ + " 生产了一份 cheeseInt: " + cheeseInt.get());
// 唤醒运货车
lock.signalCar();
// 超过了 1000 份奶酪
if (CURR_CAPACITY <= num) {
// 停止生产
lock.awaitCheese();
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println(z + " cheese unlock...");
lock.unlock();
}
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("cheese end..."
+ ", milk:" + milkInt.get() + ", starter:" + starterInt.get() + ", cheese:" + cheeseInt.get());
}
}
// 2.发酵剂
static class StarterTh implements Runnable {
LockCondition lock;
StarterTh(LockCondition lock) { this.lock = lock; }
@Override
public void run(){
int y = 0;
for (;;) {
lock.lock();
System.out.println(y + " starter lock...");
try {
if (y >= ALL_CAPACITY) {
break;
}
// 一次生产一份
starterInt.incrementAndGet();
System.out.println(y++ + " 生产了一份 starterInt: " + starterInt.get());
lock.signalStarter();
} catch (Exception e) {
e.printStackTrace();
} finally {
System.out.println(y + " starter unlock...");
lock.unlock();
}
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("starter end..."
+ ", milk:" + milkInt.get() + ", starter:" + starterInt.get() + ", cheese:" + cheeseInt.get());
}
}
// 1.牛奶
static class MilkTh implements Runnable {
LockCondition lock;
MilkTh(LockCondition lock) { this.lock = lock; }
@Override
public void run(){
int x = 0;
for (;;) {
lock.lock();
System.out.println(x + " milk lock...");
try {
if (x >= ALL_CAPACITY) {
break;
}
// 一次生产两份
milkInt.addAndGet(2);
System.out.println(x++ + " 生产了两份 milkInt: " + milkInt.get());
lock.signalMilk();
} catch (Exception e) {
e.printStackTrace();
} finally {
System.out.println(x + " milk unlock...");
lock.unlock();
}
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("milk end..."
+ ", milk:" + milkInt.get() + ", starter:" + starterInt.get() + ", cheese:" + cheeseInt.get());
}
}
}
这么写功能看上去是实现了,但其实有一个很严重的问题,那就是四个线程共用了同一把可重入锁,而可重入锁是基于 AQS 的独占模式,也就是同一时间只有一个线程能加锁成功,其他线程只能等待。
而此处,明显四个线程应该是可以同时执行的,比如同时生产牛奶和发酵剂,一边生产奶酪一边运送奶酪等。
因此这里的 ReentrantLock 是不符合要求的。
3. 优化
这里就有两种思路,ReentrantLock 行不通那就换成 AQS 共享模式的实现,不过这里我先用了 ReentrantLock 就不想换了。另一个思路就是改造 ReentrantLock。
独占模式的 ReentrantLock 行不通,我改成共享的 ReentrantLock 可以不?
试试看。
import java.io.Serializable;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
public class HyLock implements Serializable {
private Sync sync;
HyLock(int permits) {
sync = new Sync(permits);
}
public void lock() { sync.lock(); }
public void unlock() { sync.unlock(); }
public Condition newCondition() { return sync.newCondition(); }
static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -5179523762034025860L;
Sync(int state) {
setState(state);
}
final void lock() {
acquireShared(1);
}
final void unlock() {
releaseShared(1);
}
final ConditionObject newCondition() {
return new ConditionObject();
}
protected final int tryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current)
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
protected boolean tryRelease(int arg) {
return tryReleaseShared(arg);
}
protected boolean tryAcquire(int arg) {
return tryAcquireShared(arg) >= 0;
}
protected boolean isHeldExclusively() {
return true;
}
}
}
然后替换下 LockCondition 中的 ReentrantLock
static class LockCondition {
HyLock lock;
LockCondition(int permits) {
lock = new HyLock(permits);
}
...
}
main 方法也改下
LockCondition lock = new LockCondition(4);
然后运行会发现报 lock 空指针,看下 LockCondition 的 class 文件
static class LockCondition {
HyLock lock;
Condition carCondition;
Condition milkCondition;
Condition starterCondition;
Condition cheeseCondition;
LockCondition(int permits) {
this.carCondition = this.lock.newCondition();
this.milkCondition = this.lock.newCondition();
this.starterCondition = this.lock.newCondition();
this.cheeseCondition = this.lock.newCondition();
this.lock = new HyLock(permits);
}
对比下 java 文件
static class LockCondition {
HyLock lock;
LockCondition(int permits) {
lock = new HyLock(permits);
}
Condition carCondition = lock.newCondition();
Condition milkCondition = lock.newCondition();
Condition starterCondition = lock.newCondition();
Condition cheeseCondition = lock.newCondition();
JIT 会将下面四个赋值操作放到唯一构造方法里面,且放在最上面执行,而此时 lock 还没有初始化所以会报 NPE,所以需要调整一下 LockCondition 类
static class LockCondition {
HyLock lock;
// 取货车
Condition carCondition;
// 牛奶
Condition milkCondition;
// 发酵剂 - 10000
Condition starterCondition;
// 奶酪 - 10000
Condition cheeseCondition;
LockCondition(int permits) {
lock = new HyLock(permits);
carCondition = lock.newCondition();
milkCondition = lock.newCondition();
starterCondition = lock.newCondition();
cheeseCondition = lock.newCondition();
}
...
}
这样就行了。整体上来看,ReentrantLock 从单线程变成了多线程
- 相关的数据修改(AtomicInteger)都是原子的所以不会造成数据问题
- AQS 的共享模式参照 Semaphore 实现
但遗憾的是,这种改造是有问题的。
如果你看过 ConditionObject 的实现,会发现其内部实现完全没有考虑并发风险,因为其实际场景都是先获取到了锁再执行条件队列(condition queue)的操作(入队、出队),若想真正实现一个多线程版 ReentrantLock,我们还需要重新写一个并发安全的 Condition 实现来替代 AQS 自身的 ConditionObject。
有个佐证就是,JDK 提供的共享锁实现中 newCondition 方法(继承自 java.util.concurrent.locks.Lock 接口)均为不支持,比如 ReentrantReadWriteLock 的 read 锁
public Condition newCondition() {
throw new UnsupportedOperationException();
}
另外考虑
- 是否只能通过等待时间来控制生产速度 or 运送速度?如何设计?
- 有些地方是否可以换 CountdownLatch / Semaphore / CyclicBarrier 实现?
- 冷库这块是否充分实现了题意?若没有,如何改?
剑仙三尺剑,杯中二两酒。齐活。
网友评论