美文网首页
Java并发编程基础-线程间通信

Java并发编程基础-线程间通信

作者: 菠萝丶丶 | 来源:发表于2020-07-01 22:05 被阅读0次

    线程开始运行,拥有自己的栈空间,就如同一个脚本一样,按照既定的代码一步一步地执行,直到终止。但是,每个运行中的线程,如果仅仅是孤立地运行,那么没有一点儿价值,或者说价值很少,如果多个线程能够相互配合完成工作,这将会带来巨大的价值。

    volatile和synchronized关键字

    Java支持多个线程同时访问一个对象或者对象的成员变量,由于每个线程可以拥有这个变量的拷贝(虽然对象以及成员变量分配的内存是在共享内存中的,但是每个执行的线程还是可以拥有一份拷贝,这样做的目的是加速程序的执行,这是现代多核处理器的一个显著特性),所以程序在执行过程中,一个线程看到的变量并不一定是最新的。

    以上引用自《Java并发编程的艺术》,同时也说明了我们在 Java并发机制的底层实现原理-volatile 中所提到的那个问题,为什么另一个线程会无法跳出循环。而volatile关键字用来修饰字段或成员变量,就是告知程序该变量在任何时候都需要从共享内存中获取,并且对他的改变必须同步刷新回共享内存,所以能保证对所有线程访问的可见性。

    关键字synchronized可以修饰方法或者以同步块的形式来进行使用,它主要确保多个线程在同一个时刻,只能有一个线程处于方法或者同步块中,它保证了线程对变量访问的可见性和排他性。

    详情参考之前写的另一篇文章 Java并发机制的底层实现原理-synchronized

    等待/通知机制

    一个线程修改了一个对象的值,而另一个线程感知到了变化,然后进行相应的操作,整个 过程开始于一个线程,而最终执行又是另一个线程。前者是生产者,后者就是消费者,这种模 式隔离了“做什么”(what)和“怎么做”(How),在功能层面上实现了解耦,体系结构上具备了良 好的伸缩性,但是在Java语言中如何实现类似的功能呢?

    最简单的办法是写一个循环,检查是否当前需要执行对应操作,直到满足条件执行,退出循环。

    while (value != desire) {
         Thread.sleep(1000);
    }
    doSomeThing();
    

    上面这段伪代码在条件不满足的时候睡眠一段时间,这样做的目的是防止过快的“无效”尝试,这种方式看似可以实现所需的功能,但是会存在如下问题:

    • 难以确保及时性,比如无法及时发现条件已经变化,也就是及时性难以保证
    • 难以降低开销,如果将睡眠时间缩短,比如睡眠1毫秒,这样能更迅速的发现条件变化,但是如果条件长时间没有变化,则又会加剧消耗更多的处理器资源,造成无端的浪费

    如何解决以上两个问题?Java通过内置的等待/通知机制能够很好的解决这个矛盾并且实现上述需求。
    下表为等待/通知机制的相关方法:

    方法名称 方法描述
    notify() 通知一个在对象上等待的线程,使其从wait()方法返回,而返回的前提
    是该线程获取到了对象的锁
    notifyAll() 通知所有等待在该对象上的线程
    wait() 调用该方法的线程进入WAITING状态,只有等待另外线程的通知或被
    中断才会返回,需要注意,调用wait()方法后,会释放对象的锁
    wait(long) 超时等待一段时间,这里的参数时间是毫秒也就是等待长达n毫秒,
    如果没有通知就超时返回
    wait(long, int) 对于超时时间更细力度的控制,可以控制到纳秒

    等待/通知机制,是指一个线程A调用了对象O的wait()方法进入等待状态,而另一个线程B调用了对象O的notify()或者notifyAll()方法,线程A收到通知后从对象O的wait()方法返回,进而执行后续操作。上述两个线程通过对象O来完成交互,而对象上的wait()notify/notifyAll()的关系就如同开关信号一样,用来完成等待方和通知方之间的交互工作。

    如下所示代码中,首先启动了waitThread线程,该线程获取到了lock对象的锁,随后调用了wait方法释放锁,等待其他线程通知唤醒自身,随后等待一毫秒启动notifyThread线程,因为在一定情况下两个线程同时启动是有可能后启动的线程先抢到CPU资源而先执行。由于waitThread释放了锁,所以在notifyThread内一定可以获取到锁,随后notifyThread调用notifyAll方法唤醒等待的waitThread,此时waitThread不能立即获取到锁,需要等待notifyThread释放锁时候才可以获取到锁,即在调用notifyAll方法5毫秒之后waitThread重新获取到锁并且结束循环,执行最后的代码,程序结束。

    public class WaitNotify {
    
        static boolean flag = true;
        static Object lock = new Object();
    
        public static void main(String[] args) throws InterruptedException {
            Thread waitThread = new Thread(new Wait(), "WaitThread");
            waitThread.start();
            Thread.sleep(1);
            Thread notifyThread = new Thread(new Notify(), "NotifyThread");
            notifyThread.start();
        }
    
        static class Wait implements Runnable {
            @Override
            public void run() {
                // 加锁,拥有lock对象的锁
                synchronized (lock) {
                    // 当条件不满足时,继续wait,同时释放了lock的锁
                    while (flag) {
                        try {
                            System.out.println(Thread.currentThread().getName() + " flag is true. wait at " + System.currentTimeMillis());
                            lock.wait();
                        } catch (InterruptedException ex) {
                        }
                    }
                    // 条件满足时,跳出循环,执行最后的代码
                    System.out.println(Thread.currentThread().getName() + " flag is false. running at " + System.currentTimeMillis());
                }
            }
        }
    
        static class Notify implements Runnable {
            @Override
            public void run() {
                // 加锁,拥有lock对象的锁
                synchronized (lock) {
                    // 获取lock对象的锁,然后进行通知,通知时不会释放lock的锁,
                    // 知道当前线程释放了lock后,WaitThread才能从wait方法中返回
                    System.out.println(Thread.currentThread().getName() + " hold lock. notify at " + System.currentTimeMillis());
                    lock.notifyAll();
                    flag = false;
                    try {
                        Thread.sleep(5);
                    } catch (InterruptedException e) {
                    }
                }
            }
        }
        
    }
    

    细节如下:

    • 使用wait()notify()notifyAll()时需要先对调用对象加锁
    • 调用wait()方法后,线程状态由RUNNING变为WAITING,并将当前线程放置到对象的 等待队列
    • notify()notifyAll()方法调用后,等待线程依旧不会从wait()返回,需要调用notify()notifAll()的线程释放锁之后,等待线程才有机会从wait()返回
    • notify()方法将等待队列中的一个等待线程从等待队列中移到同步队列中,而notifyAll()方法则是将等待队列中所有的线程全部移到同步队列,被移动的线程状态由WAITING变为BLOCKED
    • wait()方法返回的前提是获得了调用对象的锁

    总结

    从上述WaitNotify实例中可以提炼出等待/通知的经典范式,该范式分为两部分,分别针对等待方(消费者)和通知方(生产者)
    等待方遵循如下原则:

    • 获取对象的锁
    • 如果条件不满足,那么调用wait()方法等待,被通知后仍要检查条件
    • 条件满足则执行对应的逻辑

    对应伪代码如下:

    synchronized(对象) {
        while(条件不满足) {
            对象.wait();
        }
    }
    

    通知方遵循如下原则:

    • 获得对象的锁
    • 改变条件
    • 通知所有等待在对象上的线程

    对应的伪代码如下:

    synchronized(对象) {
        改变条件
        对象.notifyAll();
    }
    

    管道输入/输出流

    管道输入/输出流和普通的文件输入/输出流或者网络输入/输出流不同之处在于,它主要用于线程之间的数据传输,而传输的媒介为内存。
    管道输入/输出流主要包括了如下4种具体实现:PipedOutputStreamPipedInputStreamPipedReaderPipedWriter,前两种面向字节,而后两种面向字符。

    以下代码使用PipedReaderPipedWriter为例,从控制台接收用户输入再通过输入流传递到另一线程中的输出流,从而实现了线程间以流的方式的数据传输:

    public class Piped {
    
        public static void main(String[] args) throws Exception {
            PipedWriter out = new PipedWriter();
            PipedReader in = new PipedReader(); // 将输出流和输入流进行连接,否则在使用时会抛出IOException
            out.connect(in);
            Thread printThread = new Thread(new Print(in), "PrintThread");
            printThread.start();
            int receive = 0;
            try {
                // 从控制台接收输入
                while ((receive = System.in.read()) != -1) {
                    out.write(receive);
                }
            } finally {
                out.close();
            }
        }
    
        static class Print implements Runnable {
            private PipedReader in;
    
            public Print(PipedReader in) {
                this.in = in;
            }
    
            public void run() {
                int receive = 0;
                try {
                    while ((receive = in.read()) != -1) {
                        System.out.print((char) receive);
                    }
                } catch (IOException ex) {
                }
            }
        }
    }
    

    注意对于Piped类型的流,必须先要进行绑定,也就是调用connect()方法,如果没有将输入/输 出流绑定起来,对于该流的访问将会抛出异常。

    Thread.join()的使用

    如果A线程执行了Thread.join(),那么表示当前线程需要等待A线程执行完毕返回之后才会继续执行其后的代码,除了该方法以外,Thread还提供了了join(long millis)join(long millis,int nanos)方法,这两个方法具有超时性质,如果在规定等待时间内A线程未能执行结束,那么程序将从该方法中返回,继续执行其后的代码。

    以下代码为Thread.join()方法的示例:

    public class Join {
    
        public static void main(String[] args) throws InterruptedException {
            Runnable runnableA = new Runnable() {
                @Override
                public void run() {
                    try {
                        System.out.println(Thread.currentThread().getName() + " run at " + System.currentTimeMillis());
                        Thread.sleep(5);
                    } catch (InterruptedException e) {
                    }
                }
            };
            Runnable runnableB = new Runnable() {
                @Override
                public void run() {
                    try {
                        System.out.println(Thread.currentThread().getName() + " run at " + System.currentTimeMillis());
                        Thread.sleep(10);
                        System.out.println(Thread.currentThread().getName() + " end at " + System.currentTimeMillis());
                    } catch (InterruptedException e) {
                    }
                }
            };
    
            Thread threadA = new Thread(runnableA, "Thread-A");
            Thread threadB = new Thread(runnableB, "Thread-B");
            threadA.start();
            threadA.join();
            System.out.println(threadA.getName() + " end at " + System.currentTimeMillis());
            threadB.start();
            threadB.join(5);
            System.out.println(threadB.getName() + " returned at " + System.currentTimeMillis());
        }
    
    }
    

    该方法执行结果如下:

    Thread-A run at 1593665701058
    Thread-A end at 1593665701064
    Thread-B run at 1593665701064
    Thread-B returned at 1593665701070
    Thread-B end at 1593665701075
    

    可以看到A线程从执行到结束经过了6毫秒左右,而从线程B执行到线程B返回也只经过了6毫秒左右,然而实际B线程执行到结束经过了11毫秒左右。这就说明B线程由于等待超时已经从join()方法中返回。

    上面的例子实际执行时间会比我们程序设置的等待时间多出1-2毫秒,这是由于线程等待和唤醒之间的切换所消耗的时间。

    相关文章

      网友评论

          本文标题:Java并发编程基础-线程间通信

          本文链接:https://www.haomeiwen.com/subject/isahqktx.html