美文网首页
线程间通信之等待通知机制

线程间通信之等待通知机制

作者: Casin | 来源:发表于2018-09-16 15:32 被阅读0次

    等待/通知机制

    前面部分介绍了Java语言中多线程的使用,以及方法及变量在同步情况下的处理方式,本节将介绍多个线程之间进行通信,通过本节的学习可以了解到,线程与线程之间不是独立的个体,他们彼此之间可以互相通信和协作

    不使用等待/通知机制实现线程间通信

    创建项目,在试验中使用sleep()结合while(true)死循环法来实现多个线程间通信

    代码为

    import java.util.ArrayList;
    import java.util.List;
    class MyList{
        private List list = new ArrayList();
        public void add(){
            list.add("秦加兴");
        }
        public int size(){
            return list.size();
        }
    }
    //定义线程类ThreadA以及ThreadB
    class ThreadA extends Thread{
        private MyList list;
    
        public ThreadA(MyList list) {
            super();
            this.list = list;
        }
        @Override
        public void run() {
            try {
                for(int i=0;i<10;i++){
                    list.add();
                    System.out.println("添加了 "+(i+1)+" 个元素");
                    Thread.sleep(500);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    
    class ThreadB extends Thread{
        private MyList list;
    
        public ThreadB(MyList list) {
            super();
            this.list = list;
        }
        @Override
        public void run() {
            try {
                System.out.println("-------"+list.size());
                while(true){
                    if(list.size()==5){
                        System.out.println("==5了,线程b要退出了!");
                        throw new InterruptedException();
                    }
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }   
        }
    }
    public class Test1 {
        public static void main(String[] args) {
            MyList service = new MyList();
            ThreadA a = new ThreadA(service);
            a.setName("A");
            a.start();
            ThreadB b = new ThreadB(service);
            b.setName("B");
            b.start();
        }
    }
    

    程序云运行后出现!](https://img.haomeiwen.com/i12188537/aec0ea58f7df4bd0.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)

    解释:虽然两个线程间实现了通信,但有一个弊端就是,线程ThreadB.java不停的通过while语句轮询机制来检测某一个条件会很浪费CPU资源

    什么是等待/通知机制

    等待/通知机制在生活中比比皆是,比如在就餐时就会出现如图所示: 就餐时出现等待通知

    厨师和服务员之间的交互要在“菜品传递台”上,在这期间会有几个问题

    1. 厨师做完一道菜的时间不确定,所以厨师将菜品放在"菜品传递台"上的时间也不确定。
    2. 服务员取到菜的时间取决于厨师,所以服务员就有“等待”(wait)的状态。
    3. 服务员如何能取到菜呢?这又得取决于厨师。厨师将菜放在“菜品传递台”上,其实就相当于一种通知(notify),这是服务员才可以拿到菜并交给就餐者。
    4. 在这个过程中出现了“等待/通知”机制。

    需要说明一下,前面章节中多个线程之间也可以实现通信,原因就是多个线程共同访问同一个变量,但那种通信机制不是“等待/通知”,两个线程完全是主动式地读取一个共享变量,在花费读取时间的基础上,读到的值是不是想要的,并不能完全确定。所以现在迫切想要一种“等待/通知”机制来满足上面的需求。

    等待/通知机制的实现

    方法wait()的作用

    当前执行代码的线程进行等待,wait()方法是Object类的方法,该方法用来将当前线程置入“预执行队列”中,并且在wait()所在的代码行处停止执行,知道街道通知或被中断为止。在调用wait()之前,线程必须获得该对象级别锁,即只能在同步方法或同步代码块中调用wait()方法。 在执行wait()方法后,当前线程释放锁。 在从wait()返回前,线程与其他线程竞争重新获得锁。如果调用wait()时没有持有适当的锁,则抛出IllegalMonitorStateException ,它是RuntimeException 的一个子类,因此,不需要try-catch 语句进行捕获异常。

    方法notify作用

    也要在同步方法或同步块中调用,即在调用前,线程也必须获得该对象的对象级别锁。如果调用notify()时没有持有适当的锁,也会抛出IllegalMonitorStateException 。该方法用来通知那些可能等待该对象的对象锁的其他线程,如果有多个线程等待,则由线程规划器随机挑选其中一个呈wait状态的线程,对其发出通知notify,并使它等待获取该对象的对象锁。需要说明的是,在执行notify()方法后,当前线程不会马上释放该对象锁, 呈wait状态的线程并不能马上获取该对象锁,要等待执行notify()方法的线程将程序执行完,也就是推出synchronized代码后,当前线程才会释放锁,而成wait状态所在的线程才可以获取该对象锁。当第一个获得了该对象锁的wait线程运行完毕之后,它会释放掉该对象锁,此时如果该对象没有再次使用notify语句,则即便该对象已经空闲,其他wait状态等待的线程由于咩有得到该对象的通知,还会继续阻塞在wait状态,知道这个对象发出一个notify或notifyAll。

    • 用一句话总结一下wait和notify

    wait使线程停止运行,而notify使停止的线程继续运行。

    wait()和notify()的简单使用

    package three;
    /**
     *输出: 
    开始   wait time=1493298291380
    开始 notify time=1493298294382
    结束 notify time=1493298294383
    结束   wait  time=1493298294384
     * @author jiaxing
     *
     */
    //定义两个定义线程
    class MyThread1 extends Thread{
        private Object lock;
    
        public MyThread1(Object lock) {
            super();
            this.lock = lock;
        }
        @Override
        public void run() {
            super.run();
            //在线程块内执行wait()方法
            try {
                synchronized (lock) {
                    System.out.println("开始   wait time="+System.currentTimeMillis());
                    lock.wait();
                    System.out.println("结束   wait  time="+System.currentTimeMillis());
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    class MyThread2 extends Thread{
        private Object lock;
    
        public MyThread2(Object lock) {
            super();
            this.lock = lock;
        }
        @Override
        public void run() {
            super.run();
            //在线程块内执行notify()方法
            synchronized (lock) {
                System.out.println("开始 notify time="+System.currentTimeMillis());
                lock.notify();
                System.out.println("结束 notify time="+System.currentTimeMillis());
            }
        }
    }
    public class Test2 {
        public static void main(String[] args) {
            try {
                Object lock = new Object();
                //启动两个线程
                MyThread1 t1 = new MyThread1(lock);
                t1.start();
                Thread.sleep(3000);
                MyThread2 t2 = new MyThread2(lock);
                t2.start();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    
    

    从控制台中可以看出3秒后线程被notify通知唤醒

    如何使用wait()和notify()来实现前面size()等于5呢

    • 看下面例子:
    package three;
    
    import java.util.ArrayList;
    import java.util.List;
    /***
     * 输出:
    wait begin 1493298319090
    添加了1个元素
    添加了2个元素
    添加了3个元素
    添加了4个元素
    已发出通知!
    添加了5个元素
    添加了6个元素
    添加了7个元素
    添加了8个元素
    添加了9个元素
    添加了10个元素
    wait end 1493298329143
     *解释:日志信息中wait end 在最后输出,这也说明notify()方法执行后并不立刻释放锁。这个知识点在后面进行补充介绍。
     * @author jiaxing
     *
     */
    class MyList3{
        private static List list = new ArrayList(); 
        public static void add(){
            list.add("anything");
        }
        public static int size(){
            return list.size();
        }
    }
    class ThreadA3 extends Thread{
        private Object lock;
        
        public ThreadA3(Object lock) {
            super();
            this.lock = lock;
        }
    
        @Override
        public void run() {
            super.run();
            try {
                synchronized(lock){
                    if(MyList3.size()!=5){
                        System.out.println("wait begin "+System.currentTimeMillis());
                        lock.wait();
                        System.out.println("wait end "+System.currentTimeMillis());
                    }
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    class ThreadB3 extends Thread{
        private Object lock;
    
        public ThreadB3(Object lock) {
            super();
            this.lock = lock;
        }
        @Override
        public void run() {
            super.run();
            try {
                synchronized (lock) {
                    for(int i=0;i<10;i++){
                        MyList3.add();
                        if(MyList3.size()==5){
                            lock.notify();
                            System.out.println("已发出通知!");
                        }
                        System.out.println("添加了"+(i+1)+"个元素");
                        Thread.sleep(1000);
                    }
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    public class Test3 {
        public static void main(String[] args) {
            try {
                Object lock = new Object();
                ThreadA3 a = new ThreadA3(lock);
                a.start();
                Thread.sleep(50);
                ThreadB3 b = new ThreadB3(lock);
                b.start();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    
    
    • 关键字synchronized 可以将任何一个Obejct 对象作为同步对象看待,而java为每个Object 都实现了wait()notify() 方法,他们必须在被synchronized 同步的Obejct 的临界区内。通过调用wait() 方法可以使处于临界内的线程进入等待状态,同时释放被同步对象的锁。而notify操作可以唤醒一个因调用了wait 操作而处于阻塞状态中的线程,使其进入就绪状态。被重新唤醒的线程会视图重新获得临界区的控制权,也就是锁,并继续执行临界区内wait 之后的代码。如果发出notify 操作时没有处于阻塞状态中的线程,那么该命令会被忽略。

    • wait() 方法可以调用该方法的线程释放共享资源的锁,然后从运行状态推出,进入等待队列,知道被再次唤醒。

    • notify() 方法可以随机唤醒等待队列中等待同一共享资源的“一个”线程,并使该线程退出等待队列,进入可运行状态,也就是notify() 方法仅通知“一个”线程。

    • notifyAll() 方法可以使所有正在等待队列中等待同一共享资源的“全部”线程从等待状态退出,进入可运行状态。此时,优先级最高的那个线程最先执行,但也有可能是随机执行,因为这要取决于JVM虚拟机的实现。

    前面的章节中已经介绍了与Thread有关的大部分API,这些API可以改变线程对象的状态,如图3-7所示 线程状态切换示意图
    1. 新创建一个新的线程对象后,再调用它的start() 方法,系统会为此线程分配CPU 资
      源,使其处于Runnable(可运行) 状态,这是一个准备运行的阶段。如果线程抢占到CPU 资
      源,此线程就处于Running(运行) 状态。

    2. Runnable 状态和Running 状态可相互切换,因为有可能线程运行一段时间后,有其
      他高优先级的线程抢占了CPU 资源,这时此线程就从Running 状态变成Runnable 状态。

    线程进入Runnable状态大致分为如下5中情况:

    • 调用sleep() 方法后经过的时间超过了指定的休眠时间。
    • 线程调用的阻塞IO 已经返回,阻塞方法执行完毕。
    • 线程成功地获得了试图同步的监视器。
    • 线程正在等待某个通知,其他线程发出了通知。
    • 处于挂起状态的线程调用了resume 恢复方法。
    1. Blocked 是阻塞的意思,例如遇到了一个IO 操作,此时CPU 处于空闲状态,可能会
      转而把CPU 时间片分配给其他线程,这时也可以称为"暂停"状态。Blocked 状态结束后‘
      进入Runnable 状态, 等待系统重新分配资掘。

    出现阻塞的情况大体分为如下5种:

    • 线程调用sleep 方法, 主动放弃占用的处理器资源。
    • 线程调用了阻塞式IO 方法,在该方法返回前,该线程被阻塞。
    • 线程试图获得一个同步监视器,但该同步监视器正被其他线程所持有。
    • 线程等待某个通知。
    • 程序调用了suspend 方法将该线程挂起。此方法容易导致死锁,尽量避免使用该方法。
    1. run() 方法运行结束后进入销毁阶段, 整个线程执行完毕。

    每个对象都有两个队列,一个是就绪队列,一个是阻塞队列。就绪队列存储了将要获得锁的线程,阻塞队列存储了被阻塞的线程。一个线程被唤醒后,才会进入就绪队列,等待CPU的调度;反之,一个线程被wait后,就会进入阻塞队列,等待下一次被唤醒。

    方法wait()锁释放与notify()锁不释放

    当方法wait()被释放后,锁被自动释放,但执行完notify()方法,锁却不自动释放。

    • 看下面的例子展示:
    package three;
    /***
     * 输出结果:
    begin wait()
    begin wait()
     * @author jiaxing
     *
     */
    class Service{
        public void testMethod(Object lock){
            try {
                synchronized (lock) {
                    System.out.println("begin wait()");
                    //Thread.sleep(4000); 同步效果
                    lock.wait();
                    System.out.println("   end wait()");
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    class ThreadA4 extends Thread{
        private Object lock;
    
        public ThreadA4(Object lock) {
            super();
            this.lock = lock;
        }
        @Override
        public void run() {
            super.run();
            //实例化对象
            Service service = new Service();
            service.testMethod(lock);
        }
    }
    class ThreadB4 extends Thread{
        private Object lock;
    
        public ThreadB4(Object lock) {
            super();
            this.lock = lock;
        }
        @Override
        public void run() {
            super.run();
            Service service = new Service();
            service.testMethod(lock);
        }
    }
    public class Test4 {
        public static void main(String[] args) {
            Object lock = new Object();
            //传入一个对象
            ThreadA4 a = new ThreadA4(lock);
            a.start();
            ThreadB4 b= new ThreadB4(lock);
            b.start();
        }
    }
    
    

    还有一个实验:方法notify()被执行后,不释放锁 ,下面看代码展示:

    package three;
    /***
     * 输出
    begin wait() ThreadName=Thread-0
    begin notify() ThreadName=Thread-2
      end notify() ThreadName=Thread-2
    begin notify() ThreadName=Thread-1
      end notify() ThreadName=Thread-1
      end wait() ThreadName=Thread-0
    解释:
    结果显示:必须执行完notify()方法所在的同步synchronized代码块后才释放锁
     * @author jiaxing
     *
     */
    class Service5{
        //多个通知一个等待
        public void testMethod(Object lock){
            //等待方法
            try {
                synchronized (lock) {
                    System.out.println("begin wait() ThreadName="+Thread.currentThread().getName());
                    lock.wait();
                    System.out.println("  end wait() ThreadName="+Thread.currentThread().getName());
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        //通知notify方法
        public void synNotifyMethod(Object lock){
            try {
                synchronized (lock) {
                    System.out.println("begin notify() ThreadName="+Thread.currentThread().getName());
                    lock.notify();
                    Thread.sleep(5000);
                    System.out.println("  end notify() ThreadName="+Thread.currentThread().getName());
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    //自定义Thread方法
    class ThreadA5 extends Thread{
        private Object lock;
    
        public ThreadA5(Object lock) {
            super();
            this.lock = lock;
        }
        @Override
        public void run() {
            super.run();
            Service5 service = new Service5();
            service.testMethod(lock);
        }
    }
    //自定义Thread方法调用notify方法
    class NotifyThread extends Thread{
        private Object lock;
    
        public NotifyThread(Object lock) {
            super();
            this.lock = lock;
        }
        @Override
        public void run() {
            super.run();
            //实例化后调用notify方法
            Service5 service = new Service5();
            service.synNotifyMethod(lock);
        }
    }
    //自定义SynNotifyThread方法调用notify方法
    class SynNotifyThread extends Thread{
        private Object lock;
    
        public SynNotifyThread(Object lock) {
            super();
            this.lock = lock;
        }
        @Override
        public void run() {
            super.run();
            //实例化后调用notify方法
            Service5 service = new Service5();
            service.synNotifyMethod(lock);
        }
    }
    //测试类
    public class Test5 {
        public static void main(String[] args) {
            Object lock = new Object();
            //线程a启动  wait方法
            ThreadA5 a = new ThreadA5(lock);
            a.start();
            //线程b启动 notify方法
            NotifyThread b = new NotifyThread(lock);
            b.start();
            //线程c启动 notify方法
            SynNotifyThread c = new SynNotifyThread(lock);
            c.start();
        }
    }
    
    

    当interrupt方法遇到wait方法

    • 当线程呈wait() 状态时,调用线程对象的interrupt() 方法会出现InterruptedException 异常**。
    • 创建项目后,测试部分代码如下:则当程序运行后,停止wait状态下的线程出现异常
    public void main (String[] args){
        try {
            Obejct lock = new Object();
            ThreadA a = new ThreadA(lock);
            a.start();
            Thread.sleep(5000);
            a.interrupted();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    

    通过上面的几个实验可以总结如下3点:

    1. 执行完同步代码块就会释放对象的锁。
    2. 在执行同步代码块的过程中,遇到异常而导致线程终止,锁也会被释放。
    3. 在执行同步代码块的过程中,执行了锁所属对象的wait() 方法,这个线程会释放对象锁,而此线程对象会进入线程等待池中,等待被唤醒。

    只通知一个线程

    • 调用notify() 一次只随机 通知一个线程;
    • 当多次调用notify()方法时,会随机将等待wait 状态的线程进行唤醒。

    唤醒所有线程-->notifyAll()方法

    • 只需将之前的代码中的notify()方法改写成notifyAll()即可。

    方法wait(long)的使用

    • 带一个参数的wait(long)方法的功能是等待某一时间内是否有线程对锁进行唤醒,如果超过这个时间则自动唤醒。举例略。

    通知过早

    • 如果通知过早,则会打乱程序正常的运行逻辑。也即:notify()如果在wait()之前执行时,则会打乱正常的运行逻辑。-->可能会导致wait()不能被执行。

    相关文章

      网友评论

          本文标题:线程间通信之等待通知机制

          本文链接:https://www.haomeiwen.com/subject/pvqfnftx.html