美文网首页
生产者消费模式三种

生产者消费模式三种

作者: jiahzhon | 来源:发表于2021-03-23 17:15 被阅读0次
  • 最外层的while是为了执行完一轮不终止

wait()和notifyall()

ublic class ShareDataV1 {

    public static AtomicInteger atomicInteger = new AtomicInteger();
    public volatile boolean flag = true;
    public static final int MAX_COUNT = 10;
    public static final List<Integer> pool = new ArrayList<>();

    public void produce() {
        // 判断,干活,通知
        while (flag) {
            // 每隔 1000 毫秒生产一个商品
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
            }
            synchronized (pool) {
                //池子满了,生产者停止生产
                //TODO 判断
                while (pool.size() == MAX_COUNT) {
                    try {
                        System.out.println("pool is full, wating...");
                        pool.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                //干活
                pool.add(atomicInteger.incrementAndGet());
                System.out.println("produce number:" + atomicInteger.get() + "\t" + "current size:" + pool.size());
                //通知
                pool.notifyAll();
            }
        }
    }

    public void consumue() {
        // 判断,干活,通知
        while (flag) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
            }
            synchronized (pool) {
                //池子空了,消费者停止消费
                while (pool.size() == 0) {
                    try {
                        System.out.println("pool is empty, wating...");
                        pool.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                //干活
                int temp = pool.get(0);
                pool.remove(0);
                System.out.println("cousume number:" + temp + "\t" + "current size:" + pool.size());
                //通知
                pool.notifyAll();
            }
        }
    }

    public void stop() {
        flag = false;
    }
}

lock,condition,signal

public class ShareDataV2 {


    public static AtomicInteger atomicInteger = new AtomicInteger();
    public volatile boolean flag = true;
    public static final int MAX_COUNT = 10;
    public static final List<Integer> pool = new ArrayList<>();

    private Lock lock = new ReentrantLock();
    //也可以一个condition然后signalall
    private Condition produce_condition = lock.newCondition();
    private Condition consumue_condition = lock.newCondition();

    public void produce() {
        // 判断,干活,通知
        while (flag){

            lock.lock();
            try {
                Thread.sleep(100);
                //池子满了,生产者停止生产
                while (pool.size() == MAX_COUNT) {
                    //等待,不能生产
                    produce_condition.await();
                }
                //干活
                pool.add(atomicInteger.incrementAndGet());
                System.out.println("produce number:" + atomicInteger.get() + "\t" + "current size:" + pool.size());

                consumue_condition.signal();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }

    }

    public void consumue() {
        // 判断,干活,通知
        while (flag) {
            lock.lock();
            try {
                Thread.sleep(1000);
                //池子空了,消费者停止消费
                while (pool.size() == 0) {
                    //等待,不能消费
                    System.out.println("pool is empty, wating...");
                    consumue_condition.await();
                }
                //干活
                int temp = pool.get(0);
                pool.remove(0);
                System.out.println("cousume number:" + temp + "\t" + "current size:" + pool.size());

                produce_condition.signalAll();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }

    }

    public void stop() {
        flag = false;
    }
}

阻塞队列

public class ShareDataV3 {

    private static final int MAX_CAPACITY = 10; //阻塞队列容量
    private static BlockingQueue<Integer> blockingQueue= new ArrayBlockingQueue<>(MAX_CAPACITY); //阻塞队列
    private  volatile boolean FLAG = true;
    private AtomicInteger atomicInteger = new AtomicInteger();

    public void produce() throws InterruptedException {
        while (FLAG){
            boolean retvalue = blockingQueue.offer(atomicInteger.incrementAndGet(), 2, TimeUnit.SECONDS);
            if (retvalue==true){
                System.out.println(Thread.currentThread().getName()+"\t 插入队列"+ atomicInteger.get()+"成功"+"资源队列大小= " + blockingQueue.size());
            }else {
                System.out.println(Thread.currentThread().getName()+"\t 插入队列"+ atomicInteger.get()+"失败"+"资源队列大小= " + blockingQueue.size());

            }
            TimeUnit.SECONDS.sleep(1);
        }
        System.out.println(Thread.currentThread().getName()+"FLAG变为flase,生产停止");
    }

    public void consume() throws InterruptedException {
        Integer result = null;
        while (true){
            result = blockingQueue.poll(2, TimeUnit.SECONDS);
            if (null==result){
                System.out.println("超过两秒没有取道数据,消费者即将退出");
                return;
            }
            System.out.println(Thread.currentThread().getName()+"\t 消费"+ result+"成功"+"\t\t"+"资源队列大小= " + blockingQueue.size());
            Thread.sleep(1500);
        }

    }

    public void stop() {
        this.FLAG = false;
    }
}

三个的统一调用

public class ProducerConsumer_V1 {
    public static void main(String[] args) {
        ShareDataV1 shareDataV1 = new ShareDataV1();
        new Thread(() -> {
            shareDataV1.produce();
        }, "AAA").start();

        new Thread(() -> {
            shareDataV1.consumue();
        }, "BBB").start();

        new Thread(() -> {
            shareDataV1.produce();
        }, "CCC").start();

        new Thread(() -> {
            shareDataV1.consumue();
        }, "DDD").start();
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        shareDataV1.stop();
    }
}

相关文章

网友评论

      本文标题:生产者消费模式三种

      本文链接:https://www.haomeiwen.com/subject/bxifhltx.html