美文网首页Java
从生产者/消费者模式角度思考线程间通信

从生产者/消费者模式角度思考线程间通信

作者: 5IYXX | 来源:发表于2017-12-26 15:55 被阅读330次

    多线程中等待/通知模式最经典的案例就是“生产者/消费者”模式。但此模式在使用上有几种“变形”,还有一些小的注意事项,但原理是基于wait/notify的。该模式可以通过操作值来实现。

    一生产与一消费

    代码清单1

    //生产者
    public class P {
        private String lock;
    
        public P(String lock) {
            super();
            this.lock = lock;
        }
    
        public void setValue() {
            try {
                synchronized (lock) {
                    if (!ValueObject.value.equals("")) {
                        lock.wait();
                    }
                    String value = System.currentTimeMillis() + "_" + System.nanoTime();
                    System.out.println("set的值是" + value);
                    ValueObject.value = value;
                    lock.notify();
                }
    
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    
    //消费者
    public class C {
    
        private String lock;
    
        public C(String lock) {
            super();
            this.lock = lock;
        }
    
        public void getValue() {
            try {
                synchronized (lock) {
                    if (ValueObject.value.equals("")) {
                        lock.wait();
                    }
                    System.out.println("get的值是" + ValueObject.value);
                    ValueObject.value = "";
                    lock.notify();
                }
    
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    
    //存储值的对象
    public class ValueObject {
        public static String value = "";
    }
    
    //生产者线程
    public class ThreadP extends Thread {
    
        private P p;
    
        public ThreadP(P p) {
            super();
            this.p = p;
        }
    
        @Override
        public void run() {
            while (true) {
                p.setValue();
            }
        }
    }
    
    //消费者线程
    public class ThreadC extends Thread {
    
        private C r;
    
        public ThreadC(C r) {
            super();
            this.r = r;
        }
    
        @Override
        public void run() {
            while (true) {
                r.getValue();
            }
        }
    }
    
    //运行类
    public class Run {
    
        public static void main(String[] args) {
    
            String lock = new String("");
            P p = new P(lock);
            C r = new C(lock);
    
            ThreadP pThread = new ThreadP(p);
            ThreadC rThread = new ThreadC(r);
    
            pThread.start();
            rThread.start();
        }
    }
    
    

    运行结果

    set的值是1514222349229_7168218698116
    get的值是1514222349229_7168218698116
    set的值是1514222349231_7168221053134
    get的值是1514222349231_7168221053134
    ...
    

    本示例是一个生产者和一个消费者进行数据的交互,在控制台中打印的日志set与get是交替运行的。

    多生产与多消费

    代码清单2

    //生产者
    public class P {
    
        private String lock;
    
        public P(String lock) {
            super();
            this.lock = lock;
        }
    
        public void setValue() {
            try {
                synchronized (lock) {
                    while (!ValueObject.value.equals("")) {
                        System.out.println("生产者 "
                                + Thread.currentThread().getName() + " WAITING了★");
                        lock.wait();
                    }
                    System.out.println("生产者 " + Thread.currentThread().getName()
                            + " RUNNABLE了");
                    String value = System.currentTimeMillis() + "_"
                            + System.nanoTime();
                    ValueObject.value = value;
                    lock.notifyAll();
                }
    
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    //消费者
    public class C {
    
        private String lock;
    
        public C(String lock) {
            super();
            this.lock = lock;
        }
    
        public void getValue() {
            try {
                synchronized (lock) {
                    while (ValueObject.value.equals("")) {
                        System.out.println("消费者"
                                + Thread.currentThread().getName() + " WAITING了☆");
                        lock.wait();
                    }
                    System.out.println("消费者" + Thread.currentThread().getName()
                            + " RUNNABLE了");
                    ValueObject.value = "";
                    lock.notifyAll();
                }
    
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    //生产者线程
    public class ThreadP extends Thread {
    
        private P p;
    
        public ThreadP(P p) {
            super();
            this.p = p;
        }
    
        @Override
        public void run() {
            while (true) {
                p.setValue();
            }
        }
    }
    //消费者x
    public class ThreadC extends Thread {
    
        private C r;
    
        public ThreadC(C r) {
            super();
            this.r = r;
        }
    
        @Override
        public void run() {
            while (true) {
                r.getValue();
            }
        }
    }
    
    public class ValueObject {
    
        public static String value = "";
    }
    
    public class Run {
    
        public static void main(String[] args) throws InterruptedException {
    
            String lock = new String("");
            P p = new P(lock);
            C r = new C(lock);
    
            ThreadP[] pThread = new ThreadP[2];
            ThreadC[] rThread = new ThreadC[2];
    
            for (int i = 0; i < 2; i++) {
                pThread[i] = new ThreadP(p);
                pThread[i].setName("生产者" + (i + 1));
    
                rThread[i] = new ThreadC(r);
                rThread[i].setName("消费者" + (i + 1));
    
                pThread[i].start();
                rThread[i].start();
            }
    
            Thread.sleep(5000);
            Thread[] threadArray = new Thread[Thread.currentThread()
                    .getThreadGroup().activeCount()];
            Thread.currentThread().getThreadGroup().enumerate(threadArray);
    
            for (int i = 0; i < threadArray.length; i++) {
                System.out.println(threadArray[i].getName() + " "
                        + threadArray[i].getState());
            }
        }
    }
    

    运行结果

    生产者 生产者1 RUNNABLE了
    生产者 生产者1 WAITING了★
    消费者消费者1 RUNNABLE了
    消费者消费者1 WAITING了☆
    消费者消费者2 WAITING了☆
    生产者 生产者2 RUNNABLE了
    生产者 生产者2 WAITING了★
    消费者消费者2 RUNNABLE了
    消费者消费者2 WAITING了☆
    消费者消费者1 WAITING了☆
    生产者 生产者1 RUNNABLE了
    生产者 生产者1 WAITING了★
    ...
    

    注意事项

    比较上面两种模式,主要改动了两处。

    • if()判断改为while()循环,重新判断条件,避免逻辑混乱;
    • notify()改为notifyAll()避免进入“假死”状态。

    本文属5IYXX原创,转载请注明(https://www.jianshu.com/p/82f3f4f1cf00)

    相关文章

      网友评论

        本文标题:从生产者/消费者模式角度思考线程间通信

        本文链接:https://www.haomeiwen.com/subject/ancjgxtx.html