美文网首页多线程并发编程系列教程
5. Communication of Thread(线程通信)

5. Communication of Thread(线程通信)

作者: Vander1991 | 来源:发表于2019-11-02 14:22 被阅读0次

    5.1 线程的通信方式

    有时候我们需要进行线程间通信,如简单的生产者消费者模式中,生产者生产完产品,需要通知消费者去消费产品,这就是一个最简单的线程通信的模型。想实现多个线程之间的协同,一个线程需要获取另一个线程的执行结果,线程执行的先后顺序这样的需求都需要用到线程通信。


    线程通信图示

    线程间通信的常用方式有以下几种:
    1)文件共享
    2)网络共享
    3)共享变量
    4)通过JDK提供的线程协调API:wait/notify、 park/unpark、suspend/resume(已废弃)
    首先是通过文件共享的例子,一个线程往文件里面写消息,另一个线程从文件中读取内容,以文件作为介质来进行线程间通信。

    5.2 线程的通信实战演练

    5.2.1 文件为中介的线程通信

    public class FileCommunicate {
    
        public static void main(String args[]) {
            createWriteThread();
            createReadThread();
        }
    
        /**
         * 创建写文件线程
         */
        public static void createWriteThread() {
            new Thread(() -> {
                int runTime = 10;
                while(runTime-- > 0){
                    File file = new File("05-communication-of-thread/src/main/resources/fileCommunicate.log");
                    SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");//设置日期格式
                    String now = String.format("当前时间:%s", df.format(new Date()));
                    try {
                        FileUtils.writeStringToFile(file, now, "UTF-8");
                        Thread.sleep(1000);
                    } catch (IOException | InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }).start();
        }
    
        /**
         * 创建读文件线程
         */
        public static void createReadThread() {
            new Thread(() -> {
                int runTime = 10;
                while(runTime-- > 0){
                    File file = new File("01-high-performance-program/05-communication-of-thread/src/main/resources/fileCommunicate.log");
                    String now = String.format("当前时间:%s", String.valueOf(System.currentTimeMillis()));
                    try {
                        String content = null;
                        while((content = FileUtils.readFileToString(file)) != null){
                            System.out.println(content);
                            try {
                                Thread.sleep(1000);
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        }
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }).start();
        }
    
    }
    

    5.2.2 变量为中介的线程通信

    public class VariableCommunicate {
    
        private static String content = "";
    
        private static final int runTimes = 10;
    
        public static void main(String args[]){
            writeTread();
            readTread();
        }
    
        /**
         * 写线程
         */
        public static void writeTread(){
            new Thread(()->{
                try {
                    for(int i =0; i<runTimes; i++){
                        SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");//设置日期格式
                        content = String.format("当前时间:%s", df.format(new Date()));
                        Thread.sleep(1000);
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            ).start();
        }
        /**
         * 读线程
         */
        public static void readTread(){
            new Thread(()->{
                try {
                    for(int i =0; i<runTimes; i++){
                        Thread.sleep(1000);
                        System.out.println(content);
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }).start();
        }
    
    }
    

    5.2.3 JDK API 作为线程通信

    5.2.3.1 suspend-resume(已废弃)

    1)正常的Demo

    public class SuspendDemo {
    
        private static BlockingDeque<Bread> queue = new LinkedBlockingDeque<Bread>();
    
        public static void main(String args[]) {
            try {
                Thread consumeTread = consumeTread();
                Thread.sleep(1000);
                producerTread(consumeTread);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    
        /**
         * 正常的生产者线程
         */
        public static void producerTread(Thread consumerThread) {
            System.out.println("启动生产者线程:");
            new Thread(() -> {
                Bread bread = new Bread("bread", 1.0f);
                System.out.println("\t生产面包:" + bread);
                queue.add(bread);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("恢复消费者线程:");
                consumerThread.resume();
            }).start();
        }
    
        /**
         * 正常的消费者线程
         */
        public static Thread consumeTread() {
            Thread consumer = new Thread(() -> {
                // take不到东西说明面包还没生产好
                Bread bread = null;
                System.out.println("消费者线程被挂起!");
                Thread.currentThread().suspend();
                if ((bread = queue.poll()) != null) {
                    System.out.println("\t获取到了面包:" + bread);
                } else {
                    System.out.println("\t依旧获取不到面包");
                }
            });
            System.out.println("启动消费者线程:");
            consumer.start();
            return consumer;
        }
    
        @Getter
        @Setter
        @AllArgsConstructor
        @ToString
        protected  static class Bread {
            private String name;
            private float price;
        }
    
    }
    

    运行结果:

    运行结果

    2)会产生死锁的Demo

    public class SuspendDeadLockDemo {
        private static BlockingDeque<SuspendDemo.Bread> queue = new LinkedBlockingDeque<SuspendDemo.Bread>();
    
        public static void main(String args[]) throws Exception {
            Thread consumeTread = consumerThreadDeadLock();
            Thread.sleep(1000);
            producerThreadDeadLock(consumeTread);
        }
    
        /**
         * 死锁的suspend/resume。 suspend并不会像wait一样释放锁,故此容易写出死锁代码
         */
        public static Thread consumerThreadDeadLock() throws Exception {
            // 启动线程
            Thread consumerThread = new Thread(() -> {
                System.out.println("消费者线程被挂起!");
                // 当前线程拿到锁,然后挂起
                synchronized (queue) {
                    Thread.currentThread().suspend();
                }
                SuspendDemo.Bread bread = null;
                if ((bread = queue.poll()) != null) {
                    System.out.println("\t获取到了面包:" + bread);
                } else {
                    System.out.println("\t依旧获取不到面包");
                }
            });
            consumerThread.start();
            return consumerThread;
        }
    
        public static void producerThreadDeadLock(Thread consumerThread) throws Exception {
            System.out.println("启动生产者线程:");
            new Thread(() -> {
                SuspendDemo.Bread bread = new SuspendDemo.Bread("bread", 1.0f);
                System.out.println("\t生产面包:" + bread);
                queue.add(bread);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("恢复消费者线程:");
                synchronized (queue) {
                    consumerThread.resume();
                }
            }).start();
        }
    
    }
    

    运行结果:同样是可以看到程序一直挂在那里没有运行完

    运行结果
    suspend/resume会被废弃掉,就是因为有两种情况下会造成死锁:
    1、像下面的代码那样,如果消费者线程挂起前已经持有了锁,然后消费者线程挂起,后面的生产者线程需要在获取到该锁的情况下才能让消费者线程resume,这样生产者线程就会一直获取不到这个锁(因为此时锁已经被消费者线程持有了,而且消费者线程被挂起且它不会释放掉当前被它占用着的锁,叫醒它的条件就是别的线程要拿到这把锁)
    造成死锁的图示
    2、造成死锁的第二种情况就是,假设生产者先把消费者线程先唤醒了,然后消费者线程去获取面包,但是此时生产者线程还没生产完面包,这种情况就会导致后面消费者线程没有人唤醒了,这样造成了resume没有起到它应有的作用(因为resume的时候消费者线程还没开始suspend,编程思想中称为“错失的信号”
    产生死锁的示例代码:
    T1:
        synchronized(sharedMonitor) {
            // 执行一段使得someCondition为false的代码
            consumerThread.resume();
        }
    T2:
        while(someCondition){
            // 在此处进行线程切换,切换到上面的线程
             synchronized(sharedMonitor){
                consumerThread.suspend();
            }
        }
    

    首先T2是消费者线程先运行起来,此时的someCondition为true,T2进入了while块中,然后调度器又调回给T1执行,T1已经将someCondition置为false,也就是说此时消费者线程已经不能被挂起了,但是由于此时已经判断过了someCondition,所以消费者线程会被挂起。(一般的解决方式是保证someCondition的判断和线程挂起是在一个同步块中就能防止由于后期判断条件已经变化导致不正确的结果,但是由于消费者线程挂起是不释放锁的,所以将标黄部分移到while外面也无法解决此问题,所以这种方式容易产生死锁)

    3)会产生死锁的Demo2

    public class SuspendDeadLockDemo2 {
        private static BlockingDeque<SuspendDemo.Bread> queue = new LinkedBlockingDeque<SuspendDemo.Bread>();
    
        public static void main(String args[]) throws Exception {
            Thread consumeTread = consumerThreadDeadLock2();
            producerThreadDeadLock2(consumeTread);
        }
    
        /**
         * 生产者线程死锁
         *
         * @param consumerThread
         * @throws Exception
         */
        public static void producerThreadDeadLock2(Thread consumerThread) throws Exception {
            System.out.println("生产者线程启动:");
            new Thread(() -> {
                synchronized (queue) {
                    SuspendDemo.Bread bread = new SuspendDemo.Bread("bread", 1.0f);
                    System.out.println("\t生产面包:" + bread);
                    queue.add(bread);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("恢复消费者线程:");
                    consumerThread.resume();
                }
            }).start();
        }
    
        /**
         * 消费者线程死锁
         *
         * @return
         * @throws Exception
         */
        public static Thread consumerThreadDeadLock2() throws Exception {
            // 启动线程
            Thread consumerThread = new Thread(() -> {
                System.out.println("消费者线程启动!");
                // 当前线程拿到锁,然后挂起
                SuspendDemo.Bread bread = null;
                while ((bread = queue.poll()) == null) {
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    synchronized (queue) {
                        Thread.currentThread().suspend();
                        System.out.println("\t获取到了面包:" + bread);
                    }
                }
            });
            consumerThread.start();
            return consumerThread;
        }
    
    }
    

    运行结果:可以看到程序一直挂在那运行不完

    image.png

    5.2.3.2 wait-notify

    wait-notify跟上述5.2.3.1中描述的方式最大的区别在于wait的时候,会释放掉当前所占有的锁,并且wait-notify需要在同步代码块中执行(也就是要在synchronized块中执行)

    1)正常的Demo

    public class WaitDemo {
    
        private static BlockingDeque<Bread> queue = new LinkedBlockingDeque<Bread>();
    
        public static void main(String args[]) {
            try {
                consumeTread();
                Thread.sleep(1000);
                producerTread();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    
        /**
         * 正常的生产者线程
         */
        public static void producerTread() {
            System.out.println("启动生产者线程:");
            new Thread(() -> {
                synchronized (queue) {
                    Bread bread = new Bread("bread", 1.0f);
                    System.out.println("\t生产面包:" + bread);
                    queue.add(bread);
                    System.out.println("恢复消费者线程:");
                    queue.notifyAll();
                }
            }).start();
        }
    
        /**
         * 正常的消费者线程
         */
        public static Thread consumeTread() {
            Thread consumer = new Thread(() -> {
                synchronized (queue) {
                    // take不到东西说明面包还没生产好
                    Bread bread = null;
                    System.out.println("消费者线程被挂起!");
                    try {
                        queue.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    if ((bread = queue.poll()) != null) {
                        System.out.println("\t获取到了面包:" + bread);
                    } else {
                        System.out.println("\t依旧获取不到面包");
                    }
                }
            });
            System.out.println("启动消费者线程:");
            consumer.start();
            return consumer;
        }
    
        @Getter
        @Setter
        @AllArgsConstructor
        @ToString
        protected static class Bread {
            private String name;
            private float price;
        }
    
    }
    

    运行结果:

    运行结果

    2)会产生死锁的Demo

    public class WaitDeadLockDemo {
        private static BlockingDeque<WaitDemo.Bread> queue = new LinkedBlockingDeque<>();
    
        public static void main(String args[]) throws Exception {
            consumerThreadDeadLock();
            producerThreadDeadLock();
        }
    
        /**
         * 生产者线程死锁
         *
         * @throws Exception
         */
        public static void producerThreadDeadLock() throws Exception {
            System.out.println("生产者线程启动:");
            new Thread(() -> {
                synchronized (queue) {
                    WaitDemo.Bread bread = new WaitDemo.Bread("bread", 1.0f);
                    System.out.println("\t生产面包:" + bread);
                    queue.add(bread);
                    System.out.println("恢复消费者线程:");
                    queue.notifyAll();
                }
            }).start();
        }
    
        /**
         * 消费者线程死锁
         *
         * @return
         * @throws Exception
         */
        public static Thread consumerThreadDeadLock() throws Exception {
            // 启动线程
            Thread consumerThread = new Thread(() -> {
                System.out.println("消费者线程启动!");
                // 当前线程拿到锁,然后挂起
                WaitDemo.Bread bread = null;
                while ((bread = queue.poll()) == null) {
                    try {
                        Thread.sleep(1000);
                        synchronized (queue) {
                            queue.wait();
                            System.out.println("\t获取到了面包:" + bread);
                        }
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
    
            });
            consumerThread.start();
            return consumerThread;
        }
    
    }
    

    运行结果:

    运行结果
    这种死锁的产生原因就是因为错失的信号,后面面包生产出来之后消费者线程已经不需要被挂起了,解决方式是保证someCondition的判断和线程挂起是在一个同步块中就能防止由于后期判断条件已经变化导致不正确的结果(简单地说就是确保消费者在面包生产前先进入阻塞等待状态)
    public static Thread consumerThreadDeadLock() throws Exception {
        // 启动线程
        Thread consumerThread = new Thread(() -> {
            System.out.println("消费者线程启动!");
            // 当前线程拿到锁,然后挂起
            WaitDemo.Bread bread = null;
            while ((bread = queue.poll()) == null) {
                try {
                    Thread.sleep(1000);
                    synchronized (queue) {//将这个移到while外面,消除条件的竞争
                        queue.wait();
                        System.out.println("\t获取到了面包:" + bread);
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
    
        });
        consumerThread.start();
        return consumerThread;
    }
    

    3)会产生死锁的Demo的解决方案

    public class WaitDeadLockResolveDemo {
        private static volatile BlockingDeque<WaitDemo.Bread> queue = new LinkedBlockingDeque<>();
    
        public static void main(String args[]) throws Exception {
            consumerThreadDeadLock();
            producerThreadDeadLock();
        }
    
        /**
         * 生产者线程死锁
         *
         * @throws Exception
         */
        public static void producerThreadDeadLock() throws Exception {
            System.out.println("生产者线程启动!");
            new Thread(() -> {
                synchronized (queue) {
                    WaitDemo.Bread bread = new WaitDemo.Bread("bread", 1.0f);
                    System.out.println("\t生产面包:" + bread);
                    queue.add(bread);
                    System.out.println("恢复消费者线程!");
                    queue.notifyAll();
                }
            }).start();
        }
    
        /**
         * 消费者线程死锁
         *
         * @return
         * @throws Exception
         */
        public static Thread consumerThreadDeadLock() throws Exception {
            // 启动线程
            Thread consumerThread = new Thread(() -> {
                System.out.println("消费者线程启动!");
                // 当前线程拿到锁,然后挂起
                WaitDemo.Bread bread = null;
                synchronized (queue) {
                    while ((bread = queue.poll()) == null) {
                        try {
                            Thread.sleep(1000);
                            // 消费者线程挂起
                            System.out.println("消费者线程挂起!");
                            queue.wait();
                            bread = queue.poll();
                            System.out.println("\t获取到了面包:" + bread);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            });
            consumerThread.start();
            return consumerThread;
        }
    
    }
    

    运行结果:(这里没有运行完是因为获取了面包之后又判断队列为null,然后又挂起了消费者线程,继续消费面包)

    运行结果

    5.2.3.3 park-unpark

    park-unpark的限制更小,它不要求在同步代码块中,并且它也不用考虑“错失的信号”的情况,你先在线程T2进行unpark,再对线程T1进行park,线程T1仍然能被唤醒。
    线程调用park等待“许可”,unpark方法为指定线程提供“许可(permit)”,多次调用unpark之后再调用park,线程会直接运行,但不会叠加,即连续多次调用park,第一次会拿到许可直接运行,后续调用会进入等待。

    1)正常的Demo

    public class ParkDemo {
    
        private static BlockingDeque<ParkDemo.Bread> queue = new LinkedBlockingDeque<>();
    
        public static void main(String args[]) {
            Thread consumerThread = consumeTread();
            producerTread(consumerThread);
            System.out.println("启动消费者线程!");
            consumerThread.start();
        }
    
        /**
         * 正常的生产者线程
         */
        public static void producerTread(Thread consumerThread) {
            System.out.println("启动生产者线程!");
            new Thread(() -> {
                ParkDemo.Bread bread = new ParkDemo.Bread("bread", 1.0f);
                System.out.println("\t生产面包:" + bread);
                queue.add(bread);
                System.out.println("生产者线程唤醒消费者线程!");
                LockSupport.unpark(consumerThread);
            }).start();
        }
    
        /**
         * 正常的消费者线程
         */
        public static Thread consumeTread() {
            Thread consumer = new Thread(() -> {
                // take不到东西说明面包还没生产好
                ParkDemo.Bread bread = null;
                System.out.println("消费者线程被挂起!");
                LockSupport.park();
                System.out.println("消费者线程被唤醒!");
                if ((bread = queue.poll()) != null) {
                    System.out.println("\t获取到了面包:" + bread);
                } else {
                    System.out.println("\t依旧获取不到面包");
                }
            });
            return consumer;
        }
    
        @Getter
        @Setter
        @AllArgsConstructor
        @ToString
        protected static class Bread {
            private String name;
            private float price;
        }
    
    }
    

    运行结果:(这里是先执行了unpark后面再执行park的操作)

    运行结果

    5.3 伪唤醒

    在编写线程通信相关的代码时,不能使用if语句来进行判断是否进入等待状态,这是错误的写法!官方建议应该在循环中检查等待条件,原因是处于等待状态的线程可能会受到错误警报和伪唤醒,如果不在循环中检查等待条件,程序就会在没有满足结束条件的情况下退出。
    相当于应该这么写:

        // wait
        synchronized(obj) {
            while(<条件判断>)
                obj.wait();
            // … …后续操作
        }
        // park
        while(<条件判断>) {
            LockSupport.park();
            // … …后续操作
        }
    
    

    相关文章

      网友评论

        本文标题:5. Communication of Thread(线程通信)

        本文链接:https://www.haomeiwen.com/subject/qpncbctx.html