美文网首页
用wait/notifyall来实现多个生产者和消费者

用wait/notifyall来实现多个生产者和消费者

作者: 就这些吗 | 来源:发表于2019-12-27 00:49 被阅读0次

    在这里先放一些知识点:
    1.wait()、notify()和notifyAll()都是Object的方法,都会释放锁。

    为什么这些操作线程的方法要定义在object类中呢?
    简单说:因为synchronized中的这把锁可以是任意对象,所以任意对象都可以调用wait()和notify();所以wait和notify属于Object。

    2.如果生产者和消费者都是用同一个锁对象,当某个消费线程消费完了,使用 notify 或者 notifyAll 唤醒其他线程。其实我们的本意唤醒生产线程就可以了,但是 notifyAll 会去唤醒所有线程,包括消费线程,这样的话将会导致很多的无用功,浪费系统资源。如果不用 notifyAll,而把上面的代码改成 notify 的话,那会使用生产/消费速度变得奇慢无比,因为notify是随机唤醒的
    解决方案:

    一般的做法是修改一个特定标识,然后notifyAll,被唤醒的线程查看该标识是否指定自己处理,是就运行下去,不是就继续wait
    还有一种做法是针对每一个线程做一个wait object,要唤醒哪一个就notify哪一个object就行了,但是能不能这样做去取决于你的业务需求

    生产者:

    
    import java.util.Random;
    import java.util.Vector;
    import java.util.concurrent.atomic.AtomicInteger;
    
    public class Producer implements Runnable {
    
        // true--->生产者一直执行,false--->停掉生产者
        private volatile boolean isRunning = true;
    
        // 公共资源
        private final Vector sharedQueue;
    
        // 公共资源的最大数量
        private final int SIZE;
    
        // 生产数据
        private static AtomicInteger count = new AtomicInteger();
    
        public Producer(Vector sharedQueue, int SIZE) {
            this.sharedQueue = sharedQueue;
            this.SIZE = SIZE;
        }
    
        @Override
        public void run() {
            int data;
            Random r = new Random();
    
            System.out.println("start producer id = " + Thread.currentThread().getId());
            try {
                while (isRunning) {
                    // 模拟延迟
                    Thread.sleep(r.nextInt(1000));
    
                    // 当队列满时阻塞等待
                    while (sharedQueue.size() == SIZE) {
                        synchronized (sharedQueue) {
                            System.out.println("Queue is full, producer " + Thread.currentThread().getId()
                                    + " is waiting, size:" + sharedQueue.size());
                            sharedQueue.wait();
                        }
                    }
    
                    // 队列不满时持续创造新元素
                    synchronized (sharedQueue) {
                        // 生产数据
                        data = count.incrementAndGet();
                        sharedQueue.add(data);
    
                        System.out.println("producer create data:" + data + ", size:" + sharedQueue.size());
                        sharedQueue.notifyAll();
                    }
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
                Thread.currentThread().interrupted();
            }
        }
    
        public void stop() {
            isRunning = false;
        }
    }
    

    消费者:

    
    import java.util.Random;
    import java.util.Vector;
    
    public class Consumer implements Runnable {
    
        // 公共资源
        private final Vector sharedQueue;
    
        public Consumer(Vector sharedQueue) {
            this.sharedQueue = sharedQueue;
        }
    
        @Override
        public void run() {
    
            Random r = new Random();
    
            System.out.println("start consumer id = " + Thread.currentThread().getId());
            try {
                while (true) {
                    // 模拟延迟
                    Thread.sleep(r.nextInt(1000));
    
                    // 当队列空时阻塞等待
                    while (sharedQueue.isEmpty()) {
                        synchronized (sharedQueue) {
                            System.out.println("Queue is empty, consumer " + Thread.currentThread().getId()
                                    + " is waiting, size:" + sharedQueue.size());
                            sharedQueue.wait();
                        }
                    }
                    // 队列不空时持续消费元素
                    synchronized (sharedQueue) {
                        System.out.println("consumer consume data:" + sharedQueue.remove(0) + ", size:" + sharedQueue.size());
                        sharedQueue.notifyAll();
                    }
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
                Thread.currentThread().interrupt();
            }
        }
    }
    

    主方法

    
    import java.util.Vector;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    public class Test2 {
    
    
        public static void main(String[] args) throws InterruptedException {
    
            // 1.构建内存缓冲区
            Vector sharedQueue = new Vector();
            int size = 4;
    
            // 2.建立线程池和线程
            ExecutorService service = Executors.newCachedThreadPool();
            Producer prodThread1 = new Producer(sharedQueue, size);
            Producer prodThread2 = new Producer(sharedQueue, size);
            Producer prodThread3 = new Producer(sharedQueue, size);
            Consumer consThread1 = new Consumer(sharedQueue);
            Consumer consThread2 = new Consumer(sharedQueue);
            Consumer consThread3 = new Consumer(sharedQueue);
            service.execute(prodThread1);
            service.execute(prodThread2);
            service.execute(prodThread3);
            service.execute(consThread1);
            service.execute(consThread2);
            service.execute(consThread3);
    
            // 3.睡一会儿然后尝试停止生产者(结束循环)
            Thread.sleep(10 * 1000);
            prodThread1.stop();
            prodThread2.stop();
            prodThread3.stop();
    
            // 4.再睡一会儿关闭线程池
            Thread.sleep(3000);
    
            // 5.shutdown()等待任务执行完才中断线程(因为消费者一直在运行的,所以会发现程序无法结束)
            service.shutdown();
    
    
        }
    }
    

    参考:【面试必备】手撕代码,你怕不怕?

    相关文章

      网友评论

          本文标题:用wait/notifyall来实现多个生产者和消费者

          本文链接:https://www.haomeiwen.com/subject/urrroctx.html