美文网首页
生产者消费者模型

生产者消费者模型

作者: arkliu | 来源:发表于2022-12-13 09:00 被阅读0次

    传统的生产者消费者模型

    使用两个线程,操作同一个变量,一个进行+1操作,另一个进行-1操作。

    /**
     * Demo driver for the classic wait/notify producer-consumer example:
     * two threads share one {@code Data} instance, one incrementing and
     * the other decrementing it.
     */
    public class ThreadTest3 {

        /** Number of operations each worker thread performs. */
        private static final int ROUNDS = 10;

        public static void main(String[] args) {
            Data data = new Data();
            startWorker("线程1", data::increment);
            startWorker("线程2", data::decrement);
        }

        /**
         * Starts a named thread that runs {@code step} {@link #ROUNDS} times.
         *
         * @param name thread name (shows up in the printed output)
         * @param step the blocking operation to repeat
         */
        private static void startWorker(String name, Step step) {
            new Thread(() -> {
                for (int i = 0; i < ROUNDS; i++) {
                    try {
                        step.run();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                        // Catching InterruptedException clears the interrupt flag;
                        // restore it and stop instead of silently looping on.
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            }, name).start();
        }

        /** A single blocking operation that may be interrupted while waiting. */
        @FunctionalInterface
        private interface Step {
            void run() throws InterruptedException;
        }
    }
    
    /**
     * Shared counter guarded by the object's intrinsic lock.
     *
     * <p>Every operation follows the same three steps: wait until the
     * precondition holds, do the work, then notify the other threads.
     */
    class Data {
        private int num = 0;

        /**
         * +1 operation: waits until {@code num} is 0, then increments it,
         * prints the new value and wakes every waiting thread.
         *
         * @throws InterruptedException if interrupted while waiting
         */
        public synchronized void increment() throws InterruptedException {
            awaitValue(0);
            num += 1;
            reportAndNotify();
        }

        /**
         * -1 operation: waits until {@code num} is 1, then decrements it,
         * prints the new value and wakes every waiting thread.
         *
         * @throws InterruptedException if interrupted while waiting
         */
        public synchronized void decrement() throws InterruptedException {
            awaitValue(1);
            num -= 1;
            reportAndNotify();
        }

        /**
         * Waits on this object's monitor until {@code num == expected}.
         * Must be called with the monitor held; the condition is re-checked
         * in a loop after every wake-up.
         */
        private void awaitValue(int expected) throws InterruptedException {
            for (;;) {
                if (num == expected) {
                    return;
                }
                wait();
            }
        }

        /** Prints "threadName==>num" and notifies all threads waiting on this object. */
        private void reportAndNotify() {
            String who = Thread.currentThread().getName();
            System.out.println(who + "==>" + num);
            notifyAll();
        }
    }
    
    image.png

    Lock版本生产者消费者

    /**
     * Demo driver for the Lock/Condition producer-consumer example:
     * four threads share one {@code Data} instance, two incrementing and
     * two decrementing it.
     */
    public class ThreadTest3 {

        /** Number of operations each worker thread performs. */
        private static final int ROUNDS = 10;

        public static void main(String[] args) {
            Data data = new Data();
            startWorker("线程1", data::increment);
            startWorker("线程2", data::decrement);
            startWorker("线程3", data::increment);
            startWorker("线程4", data::decrement);
        }

        /**
         * Starts a named thread that runs {@code step} {@link #ROUNDS} times.
         *
         * @param name thread name (shows up in the printed output)
         * @param step the blocking operation to repeat
         */
        private static void startWorker(String name, Step step) {
            new Thread(() -> {
                for (int i = 0; i < ROUNDS; i++) {
                    try {
                        step.run();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                        // Catching InterruptedException clears the interrupt flag;
                        // restore it and stop instead of silently looping on.
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            }, name).start();
        }

        /** A single blocking operation that may be interrupted while waiting. */
        @FunctionalInterface
        private interface Step {
            void run() throws InterruptedException;
        }
    }
    
    /**
     * Shared counter guarded by an explicit {@link Lock} and a single
     * {@link Condition}.
     *
     * <p>Every operation follows the same steps: lock, wait until the
     * precondition holds, do the work, signal the other threads, unlock.
     */
    class Data {
        private int num = 0;
        private final Lock mLock = new ReentrantLock();
        private final Condition mCondition = mLock.newCondition();

        /**
         * +1 operation: waits until {@code num} is 0, then increments it,
         * prints the new value and signals every thread waiting on the condition.
         *
         * @throws InterruptedException if interrupted while waiting; the
         *         counter is left unchanged in that case
         */
        public void increment() throws InterruptedException {
            // 1. Lock. Acquire outside the try so that finally never tries
            //    to unlock a lock that was not obtained.
            mLock.lock();
            try {
                while (num != 0) {
                    mCondition.await(); // 2. Wait (re-checked after every wake-up)
                }
                num++;
                System.out.println(Thread.currentThread().getName()+"==>"+num);
                mCondition.signalAll(); // 3. Notify the other threads
            } finally {
                mLock.unlock(); // 4. Unlock
            }
        }

        /**
         * -1 operation: waits until {@code num} is 1, then decrements it,
         * prints the new value and signals every thread waiting on the condition.
         *
         * @throws InterruptedException if interrupted while waiting; the
         *         counter is left unchanged in that case
         */
        public void decrement() throws InterruptedException {
            mLock.lock();
            try {
                while (num != 1) {
                    mCondition.await(); // wait (re-checked after every wake-up)
                }
                num--;
                System.out.println(Thread.currentThread().getName()+"==>"+num);
                mCondition.signalAll(); // notify the other threads
            } finally {
                mLock.unlock();
            }
        }
    }
    
    image.png

    上面一次唤醒所有线程,所有的线程都去共同争抢资源,导致线程执行顺序是无序的,而不是我们想看到的(线程1, 线程2, 线程3, 线程4 依次执行的效果)。

    使用Condition实现精确唤醒

    
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;
    
    /**
     * Demo driver for the precise-wakeup example: three threads share one
     * {@code Data2} instance and call {@code printA}, {@code printB} and
     * {@code printC} respectively.
     */
    public class ThreadTest4 {

        /** Number of operations each worker thread performs. */
        private static final int ROUNDS = 10;

        public static void main(String[] args) {
            Data2 data2 = new Data2();
            startWorker("线程1", data2::printA);
            startWorker("线程2", data2::printB);
            startWorker("线程3", data2::printC);
        }

        /**
         * Starts a named thread that runs {@code step} {@link #ROUNDS} times.
         *
         * @param name thread name (shows up in the printed output)
         * @param step the blocking operation to repeat
         */
        private static void startWorker(String name, Step step) {
            new Thread(() -> {
                for (int i = 0; i < ROUNDS; i++) {
                    try {
                        step.run();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                        // Catching InterruptedException clears the interrupt flag;
                        // restore it and stop instead of silently looping on.
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            }, name).start();
        }

        /** A single blocking operation that may be interrupted while waiting. */
        @FunctionalInterface
        private interface Step {
            void run() throws InterruptedException;
        }
    }
    
    /**
     * Turn-based printer: {@code printA}, {@code printB} and {@code printC}
     * take turns in the order A, B, C, A, ... using one {@link Lock} and a
     * dedicated {@link Condition} per turn, so each call signals only the
     * condition of the next turn.
     *
     * <p>Every operation follows the same steps: lock, wait for its turn,
     * do the work, signal the next turn, unlock.
     */
    class Data2 {
        private int num = 1; // whose turn it is: 1 = A, 2 = B, 3 = C
        private final Lock mLock = new ReentrantLock();
        private final Condition mCondition1 = mLock.newCondition();
        private final Condition mCondition2 = mLock.newCondition();
        private final Condition mCondition3 = mLock.newCondition();

        /**
         * Waits for turn 1, prints "threadName==AAAAA", then hands over to turn 2.
         *
         * @throws InterruptedException if interrupted while waiting for the turn
         */
        public void printA() throws InterruptedException {
            printInTurn(1, mCondition1, "==AAAAA", 2, mCondition2);
        }

        /**
         * Waits for turn 2, prints "threadName==BBBBB", then hands over to turn 3.
         *
         * @throws InterruptedException if interrupted while waiting for the turn
         */
        public void printB() throws InterruptedException {
            printInTurn(2, mCondition2, "==BBBBB", 3, mCondition3);
        }

        /**
         * Waits for turn 3, prints "threadName==CCCCC", then hands over to turn 1.
         *
         * @throws InterruptedException if interrupted while waiting for the turn
         */
        public void printC() throws InterruptedException {
            printInTurn(3, mCondition3, "==CCCCC", 1, mCondition1);
        }

        /**
         * Shared body of the three print methods.
         *
         * @param turn     value {@code num} must have before this call may proceed
         * @param own      condition to wait on while it is not this turn
         * @param suffix   text printed after the current thread's name
         * @param nextTurn value stored into {@code num} afterwards
         * @param next     condition signalled to wake a thread waiting for {@code nextTurn}
         * @throws InterruptedException if interrupted while waiting; {@code num}
         *         is left unchanged in that case
         */
        private void printInTurn(int turn, Condition own, String suffix,
                                 int nextTurn, Condition next) throws InterruptedException {
            // 1. Lock. Acquire outside the try so that finally never tries
            //    to unlock a lock that was not obtained.
            mLock.lock();
            try {
                while (num != turn) {
                    own.await(); // 2. Wait (re-checked after every wake-up)
                }
                System.out.println(Thread.currentThread().getName() + suffix);
                num = nextTurn;
                next.signal(); // 3. Wake one thread waiting for the next turn
            } finally {
                mLock.unlock(); // 4. Unlock
            }
        }
    }
    
    image.png

    相关文章

      网友评论

          本文标题:生产者消费者模型

          本文链接:https://www.haomeiwen.com/subject/uxbytdtx.html