美文网首页
Java生产者消费者模型简析

Java生产者消费者模型简析

作者: imkobedroid | 来源:发表于2020-04-28 11:55 被阅读0次

    前言

    生产者和消费者问题是线程模型中的经典问题:生产者和消费者在同一时间段内共用同一个存储空间,生产者往存储空间中添加产品,消费者从存储空间中取走产品,当存储空间为空时,消费者阻塞,当存储空间满时,生产者阻塞。

    image

    java中最基本的生产者消费者模型是用notify跟wait实现的。

    加锁机制的缺陷

    在Java程序中,synchronized解决了多线程竞争的问题。当一个加锁函数执行完成后会自动释放锁,例如,对于一个任务管理器,多个线程同时往队列中添加任务,可以用synchronized加锁,但是synchronized并没有解决多线程协调的问题。

    class TaskQueue {
        Queue<String> queue = new LinkedList<>();
    
        public synchronized void addTask(String s) {
            this.queue.add(s);
        }
    
        public synchronized String getTask() {
            while (queue.isEmpty()) {
            }
            return queue.remove();
        }
    }
    

    上面这段代码对两个方法进行了加锁,但是由于getTask()方法加锁后,如果queue对象中一直没有对象就会形成死循环,锁得不到释放!其他线程也不行拿到锁进行下一步操作,这样的问题在多线程中经常发生,java为了解决这样的问题所以进行了线程的唤醒沉睡操作!

    可以将方法进行稍作修改变为:

    public synchronized String getTask() {
        while (queue.isEmpty()) {
            this.wait();
        }
        return queue.remove();
    }
    

    其中 this.wait() 方法的意思是:

            wait()让当前线程进入等待状态,同时,wait()也会让当前线程释放它所持有的锁。“直到其他线程调用此对象的 notify() 方法或 notifyAll() 方法”,当前线程被唤醒(进入“就绪状态”)
    

    所以这里调用方法后,如果queue中是空的会将此线程进行沉睡,释放他们的锁!但是释放后谁怎么去通知这个线程又可以继续进行了呢?这个时候就需要notify或者notifyAll方法了!

    所以我们再将添加方法进行修改得到:

    public synchronized void addTask(String s) {
        this.queue.add(s);
        this.notify(); // 唤醒在this锁等待的线程
    }
    

    其中 this.notify() 方法的意思是:

    notify()和notifyAll()的作用,则是唤醒当前对象上的等待线程;notify()是唤醒单个线程,而notifyAll()是唤醒所有的线程。

    这段代码的意思就是我将一个值放入到queue后,我马上调用notify方法去通知那些进入wait方法的线程可以继续执行了!

    最简单的生产消费

    根据上面的解释,我们完整的代码可以像下面这样:

    
    public class Java_21 {
    
        public static void main(String[] args) throws InterruptedException {
            TaskQueue q = new TaskQueue();
            List<Thread> ts = new ArrayList<>();
            for (int i = 0; i < 5; i++) {
                Thread t = new Thread(() -> {
                    // 执行task:
                    while (true) {
                        try {
                            String s = q.getTask();
                            System.out.println("execute task: " + s);
                        } catch (InterruptedException e) {
                            return;
                        }
                    }
                });
                t.start();
                ts.add(t);
            }
            Thread add = new Thread(() -> {
                for (int i = 0; i < 10; i++) {
                    // 放入task:
                    String s = "t-" + Math.random();
                    System.out.println("add task: " + s);
                    q.addTask(s);
                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
            add.start();
            add.join();
            Thread.sleep(100);
            for (Thread thread : ts) {
                thread.interrupt();
            }
        }
    }
    
    class TaskQueue {
        private Queue<String> stringQueue = new LinkedList<>();
    
        public synchronized void addTask(String s) {
            stringQueue.add(s);
    //        notify()和notifyAll()的作用,则是唤醒当前对象上的等待线程;notify()是唤醒单个线程,而notifyAll()是唤醒所有的线程。
            this.notifyAll();  // 唤醒在this锁等待的所有线程
        }
    
    
        public synchronized String getTask() throws InterruptedException {
            while (stringQueue.isEmpty()) {
                //wait()的作用是让当前线程进入等待状态,同时,wait()也会让当前线程释放它所持有的锁。“直到其他线程调用此对象的 notify() 方法或 notifyAll() 方法”,当前线程被唤醒(进入“就绪状态”)
                this.wait();
            }
            return stringQueue.remove();
        }
    }
    
    

    运行的结果就是:

    add task: t-0.8499205383861345
    execute task: t-0.8499205383861345
    add task: t-0.7942800777561965
    execute task: t-0.7942800777561965
    add task: t-0.6906469783761674
    execute task: t-0.6906469783761674
    add task: t-0.8532616076721191
    execute task: t-0.8532616076721191
    add task: t-0.7939063475258298
    execute task: t-0.7939063475258298
    add task: t-0.658439645359411
    execute task: t-0.658439645359411
    add task: t-0.1836403135968735
    execute task: t-0.1836403135968735
    add task: t-0.868308480446476
    execute task: t-0.868308480446476
    add task: t-0.5658234674527566
    execute task: t-0.5658234674527566
    add task: t-0.8699112304857072
    execute task: t-0.8699112304857072
    
    

    一眼就能看出是添加一个元素后马上取出一个元素。

    对上面的代码进行解释:

    我们有一个TaskQueue类里面有个添加与取出的方法,两个方法都进行了加锁的机制。

    首先我们看取出的getTask()机制:

    我们判断stringQueue是不是空如果是空就调用wait() 方法释放当前线程的锁,然后进入到睡眠状态

    再来看addTask()方法:

    我们向stringQueue中添加一个值,添加好了后我们马上调用notifyAll()方法唤醒签名所有调用了wait()方法的线程进行消费!

    最后再来看main函数的执行:

    我们开启了五个线程进行getTask()的执行,表示取出操作,我们再开启一个线程进行了addTask()操作,我们调用join方法让add线程进行完成后,我们再遍历集合调用interrupt()方法进行所有线程的中断操作

    执行过程:

    add线程中我调用addTask()后添加一个值马上就去调用notifyAll()通知其他五个等待线程去取,至于是哪个线程取到这就是cpu自己的调度了,等取空了后五个线程中又进入了休眠状态,我这边又去添加值,这样重复进行就可以达到一放一取的操作

    生产者消费者扩充

    一般的生产者消费者模型都是给了一个缓冲区的概念,都是添加到缓冲区,等缓冲区到达一个值后生产者就不生产了,然后让消费者去消费,等消费者消费完后又暂停等生产者去生产!跟前言里面的图是一个道理,将我们的代码简单的变化就能达到:

    修改生产者:

      public synchronized void addTask(String s) {
            stringQueue.add(s);
            if (stringQueue.size()==100){
                this.notifyAll();
            }
        }
    

    中间的缓冲区大小就是100.

    最后

    生产者消费者模式我们不是经常用到,但是我们开发中用到的框架很多都是基于这样的线程模型,所以掌握原理后有助于我们开发中解决实际的线程安全交流问题

    补充

    加锁的写法有很多种,例如我将上面的方法不用this加锁,用一个中间常量效果一样,完整代码如下:

    public class Java_21 {
         static final String LOCK = "lock";
    
        public static void main(String[] args) throws InterruptedException {
    
    
            TaskQueue q = new TaskQueue();
            List<Thread> ts = new ArrayList<>();
            for (int i = 0; i < 5; i++) {
                Thread t = new Thread(() -> {
                    // 执行task:
                    while (true) {
                        try {
                            String s = q.getTask();
                            System.out.println("execute task: " + s);
                        } catch (InterruptedException e) {
                            return;
                        }
                    }
                });
                t.start();
                ts.add(t);
            }
            Thread add = new Thread(() -> {
                for (int i = 0; i < 10; i++) {
                    // 放入task:
                    String s = "t-" + Math.random();
                    System.out.println("add task: " + s);
                    q.addTask(s);
                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
            add.start();
            add.join();
            Thread.sleep(100);
            for (Thread thread : ts) {
                thread.interrupt();
            }
        }
    }
    
    class TaskQueue {
        private Queue<String> stringQueue = new LinkedList<>();
    
        public  void addTask(String s) {
            synchronized(LOCK){
                stringQueue.add(s);
    //        if (stringQueue.size()==100){
    //            LOCK.notifyAll();  // 唤醒在this锁等待的所有线程
    //
    //        }
    
                LOCK.notifyAll();
            }
        }
    
    
        public  String getTask() throws InterruptedException {
            synchronized (LOCK){
                while (stringQueue.isEmpty()) {
                    LOCK.wait();
                }
                return stringQueue.remove();
            }
        }
    }
    
    

    相关文章

      网友评论

          本文标题:Java生产者消费者模型简析

          本文链接:https://www.haomeiwen.com/subject/mplvwhtx.html