美文网首页
Java多线程学习之线程间通信

Java多线程学习之线程间通信

作者: Steven1997 | 来源:发表于2018-04-21 19:22 被阅读56次

    线程间通信

    线程是操作系统中独立的个体,但这些个体如果不经过特殊的处理就不能成为一个整体。线程间的通信就是成为整体的必用方案之一,可以说,使线程间进行通信后,系统之间的交互性会更强大,在大大提高CPU利用率的同时还会使程序员对个线程任务在处理的过程中进行更有效的把控与监督。

    等待 / 通知机制

    不使用等待 / 通知机制实现进程间通信

    使用sleep()和while(true)死循环可以实现多个线程之间的通信。
    MyList.java

    ThreadA.java


    ThreadB.java


    Test.java

    等待 / 通知机制的实现



    来看一个例子:
    Test1.java

    运行结果

    分析
    出现异常的原因是没有"对象监视器",即没有获得对象级别的锁,没有在同步方法中或同步块中调用wait()方法。

    Test2.java



    运行结果

    分析
    线程进入了等待阻塞状态,后面的代码无法执行了。

    MyThread1.java

    MyThread2.java


    Test.java

    运行结果

    也可以实现前面的 size() 值等于5的实验
    MyThread.java


    ThreadA.java

    ThreadB.java





    wait()方法

    wait()方法导致持有该对象同步锁的线程进入等待阻塞状态,并释放它持有的同步锁,它有三个重载的方法。线程在调用某对象的wait()之前必须持有该对象的同步锁,wait()方法必须在同步方法或同步代码块中被调用,否则会抛出IllegalMonitorStateException。注意调用objectA对象的wait()方法,必须在objectA对象的同步方法或同步代码块里,阻塞的是持有objectA对象锁的线程,而不可以在objectA对象的同步方法或同步代码块里调用objectB.wait()!
    1.wait()
    导致进入该方法的线程进入等待状态,直到它被通知或者被中断。
    2.wait(long millis) 设定一个超时间隔,如果在规定时间内没有被通知或中断,线程将被唤醒,millis是毫秒数。
    3.wait(long millis,int nanos) 设定一个超时间隔,如果在规定时间内没有被通知或中断,线程将被唤醒,millis是毫秒数,nanos是纳秒数。

    如果在等待阻塞状态线程被中断会抛出一个InterruptedException异常。

    notify()方法

    在同步方法或同步代码块调用该方法后,JVM会随机选择一个在该对象上调用wait方法阻塞的线程,将其从等待池移入锁池。

    notifyAll方法

    在同步方法或同步代码块调用该方法后,会将所有在该对象上调用wait方法阻塞的线程从等待池移入锁池。

    注意:线程被唤醒只是从等待池进入了锁池,可以参与锁的竞争,但并不代表它已经获得了锁。

    方法wait()锁释放与notify()锁不释放

    当方法wait()被执行后,锁被自动释放。但执行完notify()方法,锁不自动释放,要等到执行notify()的线程将程序执行完,也就是退出 synchronized 代码块后,当前线程才会释放锁。

    wait()方法和sleep()方法的区别

    最简单的区别是,wait方法只能用于同步方法或同步代码块(一定要注意调用同步方法或同步代码块持有的锁必须是调用wait()的对象的锁,而不是随便在一个同步代码块里执行wait()就行!调用wait()后被阻塞的是持有该锁的线程!),而sleep方法可以直接调用。而更深层次的区别在于sleep方法只是暂时让出CPU的执行权,并不释放同步锁。而wait方法则会释放锁。sleep()必须捕获异常,wait()不用捕获异常。一个调用了sleep()或wait()方法的线程如果调用interrupt()方法请求中断,都会立即抛出InterruptedException。

    Service.java


    ThreadA.java和ThreadB.java

    Test.java


    运行结果

    可以看到wait()方法释放锁
    如果把wait()方法改为sleep(),就成了同步的效果,这是因为sleep()并不释放锁

    以上实验总结得以下三点

    生产者-消费者模式

    等待 / 通知模式最经典的案例是生产者-消费者模式。该模式有多种变形,有一些需要注意的细节,但原理都基于 wait / notify。

    1.一生产与一消费:操作值

    P.java


    C.java

    ValueObject.java

    ThreadP.java 和 ThreadC.java

    Run.java

    运行结果
    set 和 get 交替打印

    2.多生产与多消费:操作值 - 死锁

    P.java


    C.java


    ThreadP.java 和 ThreadC.java

    Run.java


    运行结果

    分析
    我们发现程序出现了死锁,这是为什么呢?明明已经使用了wait / notify呀?这是因为虽然使用了wait / notify,但不能保证每次唤醒的线程是异类,也许是同类,比如“生产者”唤醒“生产者”,“消费者”唤醒“消费者”,这样的情况积少成多,则所有线程都可能进入WAITING状态,出现死锁。

    解决方法
    把notify方法换成notifyAll方法,将同类和异类一同唤醒即可。

    3.一生产与一消费:操作栈

    本示例是生产者向堆栈List对象中放入数据,消费者从堆栈中取出数据,List最大容量是1。
    MyStack.java


    P.java


    C.java

    ThreadP.java 和 ThreadC.java

    Run.java

    运行结果
    push 和 pop交替打印

    4.一生产与多消费 —— 操作栈:解决 wait 条件改变与死锁

    上面的例子其他程序不变,修改Run.java


    运行结果

    分析
    我们看到程序出现了IndexOutOfBoundsException,此问题的出现是因为 wait 条件发生了变化,而 MyStack.java 使用了 if 语句判断 wait 条件:

    试想有多个线程进入了 if 语句并 wait,当这些线程被唤醒,会相继执行list.remove(0),从而抛出异常,这是因为当第一个线程执行了remove方法之后,wait 条件发生了改变,然而之前已经进入 if 语句等待的线程被唤醒后不会再次检测是否满足 wait 条件,而是直接执行了remove方法,造成了异常。

    解决方法
    把 MyStack.java 中的 if 语句改为 while 语句,让线程不断自查是否需要 wait。同理,要把 notify 改为 notifyAll 来避免死锁。

    总结

    生产者 / 消费者模式主要可能会出现以下问题:
    1)死锁(用 notifyAll 代替 notify 唤醒异类线程)
    2)wait 条件改变(用 while 代替 if 检测 wait 条件)

    实战:等待 / 通知之交叉备份

    创建20个线程,其中10个线程是将数据备份到 A 数据库中,另外10 个线程将数据备份到 B 数据库中,并且备份 A 数据库和 备份 B 数据库的是交叉运行的。


    TaskBackup.java

    /**
     * Description: 当flag=true的时候备份 A 数据库
     *              当flag=false的时候备份 B数据库 以此实现交叉备份
     */
    public class TaskBackup
    {
        volatile private boolean flag=false;
        // 采用volatile关键字,使变量于多个线程之间可见
    
       synchronized public void backupA(){ 
      //synchronized 关键字,避免多个线程对同一对象的修改,导致“脏读”
            try {
     /*记住,这里的判断一定要用while 而不是用if,为什么呢?因为存在多个线程,不止备份B数据库
    的线程在等待,可能备份A数据库的线程也在等待,如果用 if 可能会导致同类唤醒同类的情况导致
    线程的“死锁”。*/
                while (flag==false){
                    this.wait();
                }
                System.out.println(Thread.currentThread().getName()+"正在备份 A 数据库!");
                 //模拟备份数据库
                flag=false;
                this.notifyAll();/*唤醒所有等待的线程,当然这里并不会唤醒backupA 的线程,
    原因在于,backupA的线程这个时候又做了一个while判断,导致线程继续在等待了,而只有backupB
    的线程被唤醒了*/
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    
        synchronized public void backupB(){
            try {
                while (flag==true){
                    this.wait();
                }
                System.out.println(Thread.currentThread().getName()+"正在备份 B 数据库!");
              //模拟备份数据库
                flag=true; 
                this.notifyAll(); 
               } catch (InterruptedException e) { 
                   e.printStackTrace(); 
                } 
                 } 
                    }
    

    TreadBackupA.java

    public class ThreadBackupA extends Thread
    {
        private TaskBackup taskPackup;
    
        public ThreadBackupA(TaskBackup taskPackup)
        {
            this.taskPackup = taskPackup;
        }
    
        @Override
        public void run()
        {
            super.run();
            taskPackup.backupA();
        }
    }
    

    ThreadBackupB.java

    public class ThreadBackupB extends Thread
    {
        private TaskBackup taskPackup;
    
        public ThreadBackupB(TaskBackup taskPackup)
        {
            this.taskPackup = taskPackup;
        }
    
        @Override
        public void run()
        {
            super.run();
            taskPackup.backupB();
        }
    }
    

    Run.java

    public class Run
    {
        public static void main(String[] args)
        {
            TaskBackup taskPackup=new TaskBackup();
            for (int i=0;i<20;i++){
                ThreadBackupA threadBackupA=new ThreadBackupA(taskPackup);
                ThreadBackupB threadBackupB=new ThreadBackupB(taskPackup);
                threadBackupA.start();
                threadBackupB.start();
            }
        }
    }
    

    运行结果

    方法join的使用

    join方法,重载形式如下:

    在A线程中调用B线程的join方法,则B线程抢占CPU资源,执行它run()方法中的任务,A线程被无限期阻塞,直到B线程运行结束(如果调用带参的join方法,则超出时限该进程就会让出CPU),A线程再由阻塞转为就绪状态。可以认为join方法的作用是父线程等待子线程执行完成后再执行,换句话说是将异步执行的线程合并为同步执行的线程。
    方法join具有使线程排队运行的作用,有些类似同步的运行效果。join 与 synchronized 的区别是:join 在内部使用 wait() 方法进行等待,而 sychronized 关键字使用的使“对象监视器”原理进行同步。

    下面是一个例子:
    MyThread.java


    Test.java

    运行结果

    join()方法的内部原理是什么呢? 【Java】Thread类中的join()方法原理

    注意:
    1)如果线程被 join 方法阻塞,如果调用它的 interrupt 方法,会抛出 InterruptedException。
    2)join(long) 和 sleep(long) 的区别在于被 join(long) 阻塞的线程会释放锁,因为 join(long) 内部是通过 wait(long) 实现的,而 sleep(long) 不释放锁。

    类 ThreadLocal 的使用

    示例

    Run.java


    运行结果

    分析

    验证线程变量的隔离性

    Tool.java


    ThreadA.java 和 ThreadB.java

    Run.java

    运行结果

    虽然3个线程共享一个ThreadLocal对象t1,都向t1对象set()数据值,但每个线程还是可以取出自己的值,这是因为ThreadLocal实现了线程间数据的隔离,它为每一个线程都提供了变量的副本,使得每个线程在某一时间访问到的并不是同一个对象,这样就隔离了多个线程对数据的数据共享。

    构造方法

    • ThreadLocal() 创建一个ThreadLocal对象

    常用方法

    • void set(Object value)设置当前线程的线程局部变量的值。
    • public Object get()该方法返回当前线程所对应的线程局部变量。
    • public void remove()将当前线程局部变量的值删除,目的是为了减少内存的占用。需要指出的是,当线程结束后,对应该线程的局部变量将自动被垃圾回收,所以显式调用该方法清除线程的局部变量并不是必须的操作,但它可以加快内存回收的速度。
    • protected Object initialValue()返回该线程局部变量的初始值,该方法是一个protected的方法,显然是为了让子类覆盖而设计的。这个方法是一个延迟调用方法,在线程第1次调用get()或set(Object)时才执行,并且仅执行1次。ThreadLocal中的缺省实现直接返回一个null。

    实现原理

    参考博客:ThreadLocal源码分析(JDK8)

    类 InheritableThreadLocal 的使用

    使用类 InheritableThreadLocal 可以在子线程中取得从父线程继承下来的值。

    值继承

    使用类 InheritableThreadLocal 可以让子线程从父线程中取得值。
    InheritableThreadLocalExt.java


    Tools.java

    ThreadA.java


    Run.java

    值继承再修改

    如果在继承的同时还可以对值进行进一步的处理就更好了。
    修改InheritableThreadLocalExt.java


    运行结果

    注意:如果子线程在取得值的同时,父线程将 InheritableThreadLocal 的值进行修改,子线程取得的还是旧值。

    相关文章

      网友评论

          本文标题:Java多线程学习之线程间通信

          本文链接:https://www.haomeiwen.com/subject/nbgxlftx.html