美文网首页
AQS(三) 多线程可重入

AQS(三) 多线程可重入

作者: sunyelw | 来源:发表于2020-05-15 22:31 被阅读0次

有道面试题

有一家生产奶酪的厂家,每天需要生产100000份奶酪卖给超市.
通过一辆送货车发货,送货车辆每次送100份。
厂家有一个容量为1000份的冷库,用于奶酪保鲜,生产的奶酪需要先存放在冷库。
运输车辆从冷库取货。
厂家有三条生产线,分别是 牛奶供应生产线、发酵剂制作生产线、奶酪生产线。生产每份奶酪需要2份牛奶和1份发酵剂。
请设计生产系统。


1.分析

  1. 三条生产线加一个送货车就是四个线程
  2. 奶酪生产需要等待牛奶与发酵剂
  3. 送货车需要等待奶酪数量满足一次运送的量才开始运送
  4. 奶酪两种情况下需要停止生产
  • 达到冷藏库数量
  • 达到当天奶酪生产目标

其实这就是一个变种的生产者消费者模式,下面开始实现。

2.实现

传统的 消费者生产者模型 是用Thread的wait+notify实现,这里我想用ReentrantLock来玩,用 Condition 来细化锁,具体代码如下

import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class MilkOne {

    // 运送容量
    final static int ALL_SIZE = 100;
    // 当前容量
    final static int CURR_CAPACITY = 1000;
    // 总生产量
    final static int ALL_CAPACITY = 10000;

    // 牛奶
    final static AtomicInteger milkInt = new AtomicInteger(0);
    // 发酵剂 - 10000
    final static AtomicInteger starterInt = new AtomicInteger(0);
    // 奶酪 - 10000
    final static AtomicInteger cheeseInt = new AtomicInteger(0);

    static class LockCondition {

        Lock lock = new ReentrantLock();

        // 取货车
        Condition carCondition = lock.newCondition();
        // 牛奶
        Condition milkCondition = lock.newCondition();
        // 发酵剂 - 10000
        Condition starterCondition = lock.newCondition();
        // 奶酪 - 10000
        Condition cheeseCondition = lock.newCondition();

        void lock() { lock.lock(); }

        void unlock() { lock.unlock(); }

        void signalCar() { carCondition.signal(); }

        void awaitCar() throws InterruptedException{ carCondition.await(); }

        void signalMilk() { milkCondition.signal(); }

        void awaitMilk() throws InterruptedException{ milkCondition.await(); }

        void signalStarter() { starterCondition.signal(); }

        void awaitStarter() throws InterruptedException{ starterCondition.await(); }

        void signalCheese() { cheeseCondition.signal(); }

        void awaitCheese() throws InterruptedException{ cheeseCondition.await(); }

    }

    /**
     * 三个生产线程(总共生产10000)
     *
     * 没说牛奶跟发酵剂需要保存,这里不设等待
     *
     * 车取货的线程(每满一百取一次)
     *
     */
    public static void main(String[] args) throws InterruptedException{

        LockCondition lock = new LockCondition();

        // 1.生产牛奶
        Thread milkTh = new Thread(new MilkTh(lock), "milk");
        milkTh.start();

        // 2.生产发酵剂
        Thread starterTh = new Thread(new StarterTh(lock), "starter");
        starterTh.start();

        // 3.生产奶酪
        Thread cheeseTh = new Thread(new CheeseTh(lock), "cheese");
        cheeseTh.start();

        // 4.car
        Thread carTh = new Thread(new CarTh(lock), "car");
        carTh.start();
    }

    // 4.运输车
    static class CarTh implements Runnable {
        LockCondition lock;
        CarTh(LockCondition lock) { this.lock = lock; }

        @Override
        public void run(){
            int p = 0;
            for (;;) {
                lock.lock();
                System.out.println(p + " car lock...");
                try {
                    if (p >= ALL_CAPACITY / ALL_SIZE) {
                        break;
                    }
                    // 1.没满运货车数量
                    if (cheeseInt.get() < ALL_SIZE) {
                        // 1.唤醒生产奶酪
                        lock.signalCheese();
                        // 2.运货车等待
                        lock.awaitCar();
                        continue;
                    }
                    // 可以运送
                    if (cheeseInt.compareAndSet(cheeseInt.get(), cheeseInt.get() - ALL_SIZE)) {
                        System.out.println(p++ + " 运送 cheeseInt: " + cheeseInt.get());
                        // 可以继续生产奶酪
                        lock.signalCheese();
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    System.out.println(p + " car unlock...");
                    lock.unlock();
                }
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            System.out.println("car end..."
                    + ", milk:" + milkInt.get() + ", starter:" + starterInt.get() + ", cheese:" + cheeseInt.get() + ", p:" + p);
        }
    }

    // 3.奶酪
    static class CheeseTh implements Runnable {
        LockCondition lock;
        CheeseTh(LockCondition lock) { this.lock = lock; }

        @Override
        public void run(){

            int z = 0;
            for (;;) {
                lock.lock();
                System.out.println(z + " cheese lock...");
                try {
                    if (z >= ALL_CAPACITY) {
                        break;
                    }
                    if (starterInt.get() < 1) lock.awaitStarter();
                    if (milkInt.get() < 2) lock.awaitMilk();

                    /* 1奶酪 = 2牛奶 + 1发酵剂 */
                    // 2牛奶
                    milkInt.compareAndSet(milkInt.get(), milkInt.get() - 2);
                    // 1发酵剂
                    starterInt.decrementAndGet();
                    // 1奶酪
                    int num = cheeseInt.incrementAndGet();
                    System.out.println(z++ + " 生产了一份 cheeseInt: " + cheeseInt.get());
                    // 唤醒运货车
                    lock.signalCar();
                    // 超过了 1000 份奶酪
                    if (CURR_CAPACITY <= num) {
                        // 停止生产
                        lock.awaitCheese();
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    System.out.println(z + " cheese unlock...");
                    lock.unlock();
                }
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            System.out.println("cheese end..."
                    + ", milk:" + milkInt.get() + ", starter:" + starterInt.get() + ", cheese:" + cheeseInt.get());
        }
    }

    // 2.发酵剂
    static class StarterTh implements Runnable {
        LockCondition lock;
        StarterTh(LockCondition lock) { this.lock = lock; }

        @Override
        public void run(){
            int y = 0;
            for (;;) {
                lock.lock();
                System.out.println(y + " starter lock...");
                try {
                    if (y >= ALL_CAPACITY) {
                        break;
                    }
                    // 一次生产一份
                    starterInt.incrementAndGet();
                    System.out.println(y++ + " 生产了一份 starterInt: " + starterInt.get());
                    lock.signalStarter();
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    System.out.println(y + " starter unlock...");
                    lock.unlock();
                }
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            System.out.println("starter end..."
                    + ", milk:" + milkInt.get() + ", starter:" + starterInt.get() + ", cheese:" + cheeseInt.get());
        }
    }

    // 1.牛奶
    static class MilkTh implements Runnable {

        LockCondition lock;
        MilkTh(LockCondition lock) { this.lock = lock; }

        @Override
        public void run(){
            int x = 0;
            for (;;) {
                lock.lock();
                System.out.println(x + " milk lock...");
                try {
                    if (x >= ALL_CAPACITY) {
                        break;
                    }
                    // 一次生产两份
                    milkInt.addAndGet(2);
                    System.out.println(x++ + " 生产了两份 milkInt: " + milkInt.get());
                    lock.signalMilk();
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    System.out.println(x + " milk unlock...");
                    lock.unlock();
                }
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            System.out.println("milk end..."
                    + ", milk:" + milkInt.get() + ", starter:" + starterInt.get() + ", cheese:" + cheeseInt.get());
        }
    }
}

这么写功能看上去是实现了,但其实有一个很严重的问题,那就是四个线程共用了同一把可重入锁,而可重入锁是基于 AQS 的独占模式,也就是同一时间只有一个线程能加锁成功,其他线程只能等待。

而此处,明显四个线程应该是可以同时执行的,比如同时生产牛奶和发酵剂,一边生产奶酪一边运送奶酪等。

因此这里的 ReentrantLock 是不符合要求的。

3. 优化

这里就有两种思路,ReentrantLock 行不通那就换成 AQS 共享模式的实现,不过这里我先用了 ReentrantLock 就不想换了。另一个思路就是改造 ReentrantLock。

独占模式的 ReentrantLock 行不通,我改成共享的 ReentrantLock 可以不?

试试看。

import java.io.Serializable;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;

public class HyLock implements Serializable {

    private Sync sync;

    HyLock(int permits) {
        sync = new Sync(permits);
    }

    public void lock() { sync.lock(); }

    public void unlock() { sync.unlock(); }

    public Condition newCondition() { return sync.newCondition(); }

    static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = -5179523762034025860L;

        Sync(int state) {
            setState(state);
        }

        final void lock() {
            acquireShared(1);
        }

        final void unlock() {
            releaseShared(1);
        }

        final ConditionObject newCondition() {
            return new ConditionObject();
        }

        protected final int tryAcquireShared(int acquires) {
            for (;;) {
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                        compareAndSetState(available, remaining))
                    return remaining;
            }
        }

        protected final boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                int next = current + releases;
                if (next < current)
                    throw new Error("Maximum permit count exceeded");
                if (compareAndSetState(current, next))
                    return true;
            }
        }

        protected boolean tryRelease(int arg) {
            return tryReleaseShared(arg);
        }

        protected boolean tryAcquire(int arg) {
            return tryAcquireShared(arg) >= 0;
        }

        protected boolean isHeldExclusively() {
            return true;
        }
    }
}

然后替换下 LockCondition 中的 ReentrantLock

static class LockCondition {

    HyLock lock;
        
    LockCondition(int permits) {
      lock = new HyLock(permits);
    }
    ...
}

main 方法也改下

LockCondition lock = new LockCondition(4);

然后运行会发现报 lock 空指针,看下 LockCondition 的 class 文件

static class LockCondition {
    HyLock lock;
    Condition carCondition;
    Condition milkCondition;
    Condition starterCondition;
    Condition cheeseCondition;

    LockCondition(int permits) {
        this.carCondition = this.lock.newCondition();
        this.milkCondition = this.lock.newCondition();
        this.starterCondition = this.lock.newCondition();
        this.cheeseCondition = this.lock.newCondition();
        this.lock = new HyLock(permits);
    }

对比下 java 文件

static class LockCondition {

    HyLock lock;

    LockCondition(int permits) {
        lock = new HyLock(permits);
    }

    Condition carCondition = lock.newCondition();
    Condition milkCondition = lock.newCondition();
    Condition starterCondition = lock.newCondition();
    Condition cheeseCondition = lock.newCondition();

JIT 会将下面四个赋值操作放到唯一构造方法里面,且放在最上面执行,而此时 lock 还没有初始化所以会报 NPE,所以需要调整一下 LockCondition 类

static class LockCondition {
    HyLock lock;
    // 取货车
    Condition carCondition;
    // 牛奶
    Condition milkCondition;
    // 发酵剂 - 10000
    Condition starterCondition;
    // 奶酪 - 10000
    Condition cheeseCondition;

    LockCondition(int permits) {
        lock = new HyLock(permits);
        carCondition = lock.newCondition();
        milkCondition = lock.newCondition();
        starterCondition = lock.newCondition();
        cheeseCondition = lock.newCondition();
    }
    ...
}

这样就行了。整体上来看,ReentrantLock 从单线程变成了多线程

  1. 相关的数据修改(AtomicInteger)都是原子的所以不会造成数据问题
  2. AQS 的共享模式参照 Semaphore 实现

但遗憾的是,这种改造是有问题的。

如果你看过 ConditionObject 的实现,会发现其内部实现完全没有考虑并发风险,因为其实际场景都是先获取到了锁再执行条件队列(condition queue)的操作(入队、出队),若想真正实现一个多线程版 ReentrantLock,我们还需要重新写一个并发安全的 Condition 实现来替代 AQS 自身的 ConditionObject。

有个佐证就是,JDK 提供的共享锁实现中 newCondition 方法(继承自 java.util.concurrent.locks.Lock 接口)均为不支持,比如 ReentrantReadWriteLock 的 read 锁

public Condition newCondition() {
  throw new UnsupportedOperationException();
}

另外考虑

  1. 是否只能通过等待时间来控制生产速度 or 运送速度?如何设计?
  2. 有些地方是否可以换 CountdownLatch / Semaphore / CyclicBarrier 实现?
  3. 冷库这块是否充分实现了题意?若没有,如何改?

剑仙三尺剑,杯中二两酒。齐活。

相关文章

网友评论

      本文标题:AQS(三) 多线程可重入

      本文链接:https://www.haomeiwen.com/subject/ewghohtx.html