美文网首页
object.wait()和notify()

object.wait()和notify()

作者: 夜月行者 | 来源:发表于2017-11-28 10:01 被阅读0次

    1,简介

    wait和notify是object的方法,也就是说所有对象都有这两个方法,这两个方法可以用来阻塞当前线程(同时放弃互斥锁)或者是 唤醒其他调用wait方法陷入阻塞的线程(不能唤醒那些因为抢占锁而阻塞的队列)

    代码样例

    
    public class NotifyAndWaitT {
    
       public static void main(String[] args) throws InterruptedException {
    
    
           Thread waitThread = new Thread(() -> {
    
               synchronized (NotifyAndWaitT.class) {
                   System.out.println("wait get the lock");
                   try {
                       NotifyAndWaitT.class.wait();
                       System.out.println("wait wake up");
                   } catch (InterruptedException e) {
                       e.printStackTrace();
                   }
               }
           });
    
           Thread notifyThread = new Thread(() -> {
               try {
                   Thread.currentThread().sleep(100);
               } catch (InterruptedException e) {
                   e.printStackTrace();
               }
    
               synchronized (NotifyAndWaitT.class) {
                   System.out.println("notify get the lock");
                   try {
                       NotifyAndWaitT.class.notify();
                       Thread.currentThread().sleep(1000);
                       System.out.println("notify end---------");
                   } catch (InterruptedException e) {
                       e.printStackTrace();
                   }
               }
           });
    
           waitThread.start();
           notifyThread.start();
           waitThread.join();
           notifyThread.join();
           System.out.println("main end*******************");
       }
    
    
    }
    
    

    2. wait 运行过程

    能够执行wait和notify的前提是代码已经进入了synchronized包含的代码块中。

    当然synchronized现在通过优化,增加了偏向锁,轻量级锁,但是在执行obj.wait()的时候都会膨胀成重量级锁ObjectMonitor (关于每个锁的关联数据可以看看),然后做一系列的操作。

    具体看一下执行过程:

    参考 object.wait的源码解析.md 可以看到代码执行的逻辑,下面会具体分析一下

    首先是每个了解当前线程持有的锁(objectMonitor)拥有的数据结构, objectMonitor在jdk8中的数据结构.md

    在这里面可以看到在objectMonitor中有一个地方存储的是 _WaitSet ,这个存放的就是因为obj.wait()进入阻塞的线程。

    3. 具体从代码看 wait() 操作

    看objectMonitor.cpp文件中的 ObjectMonitor::wait(jlong millis, bool interruptible, TRAPS)

    
    // will need to be replicated in complete_exit above    
    
    void ObjectMonitor::wait(jlong millis, bool interruptible, TRAPS) {    
    
      Thread * const Self = THREAD ;    
    
      assert(Self->is_Java_thread(), "Must be Java thread!");    
    
      JavaThread *jt = (JavaThread *)THREAD;    
    
    
    
      DeferredInitialize () ;    
    
    
    
      // Throw IMSX or IEX.    
    
      CHECK_OWNER();    
    
    
    
      // check for a pending interrupt    检查是否有中断信号,有的话就抛出异常
    
      if (interruptible && Thread::is_interrupted(Self, true) && !HAS_PENDING_EXCEPTION) {    
    
        // post monitor waited event.  Note that this is past-tense, we are done waiting.    
    
        if (JvmtiExport::should_post_monitor_waited()) {    
    
           // Note: 'false' parameter is passed here because the    
    
           // wait was not timed out due to thread interrupt.    
    
           JvmtiExport::post_monitor_waited(jt, this, false);    
    
        }    
    
        TEVENT (Wait - Throw IEX) ;    
    
        THROW(vmSymbols::java_lang_InterruptedException());    
    
        return ;    
    
      }    
    
      TEVENT (Wait) ;    
    
    
    
      assert (Self->_Stalled == 0, "invariant") ;    
    
      Self->_Stalled = intptr_t(this) ;    
    
      jt->set_current_waiting_monitor(this);    
    
    
    
      // create a node to be put into the queue    
    
      // Critically, after we reset() the event but prior to park(), we must check    
    
      // for a pending interrupt.    
    
    //把当前线程包装成一个 objectwaiter对象,
    
      ObjectWaiter node(Self);    
    
      node.TState = ObjectWaiter::TS_WAIT ;    
    
      Self->_ParkEvent->reset() ;    
    
      OrderAccess::fence();          // ST into Event; membar ; LD interrupted-flag    
    
    
    
      // Enter the waiting queue, which is a circular doubly linked list in this case    
    
      // but it could be a priority queue or any data structure.    
    
      // _WaitSetLock protects the wait queue.  Normally the wait queue is accessed only    
    
      // by the the owner of the monitor *except* in the case where park()    
    
      // returns because of a timeout of interrupt.  Contention is exceptionally rare    
    
      // so we use a simple spin-lock instead of a heavier-weight blocking lock.    
    
    //加锁,把objectwaiter 对象添加到等待的set中
    
    
    
      Thread::SpinAcquire (&_WaitSetLock, "WaitSet - add") ;    
    
      AddWaiter (&node) ;    
    
      Thread::SpinRelease (&_WaitSetLock) ;    
    
    
    
      if ((SyncFlags & 4) == 0) {    
    
         _Responsible = NULL ;    
    
      }    
    
      intptr_t save = _recursions; // record the old recursion count    
    
      _waiters++;                  // increment the number of waiters    
    
      _recursions = 0;             // set the recursion level to be 1    
    
      exit (Self) ;                    // exit the monitor    
    
      guarantee (_owner != Self, "invariant") ;    
    
    
    
      // As soon as the ObjectMonitor's ownership is dropped in the exit()    
    
      // call above, another thread can enter() the ObjectMonitor, do the    
    
      // notify(), and exit() the ObjectMonitor. If the other thread's    
    
      // exit() call chooses this thread as the successor and the unpark()    
    
      // call happens to occur while this thread is posting a    
    
      // MONITOR_CONTENDED_EXIT event, then we run the risk of the event    
    
      // handler using RawMonitors and consuming the unpark().    
    
      //    
    
      // To avoid the problem, we re-post the event. This does no harm    
    
      // even if the original unpark() was not consumed because we are the    
    
      // chosen successor for this monitor.    
    
    //在这里防止的情况是当前线程刚刚执行完monitor的释放,
    
    //有一个线程进来了,执行完后退出了,又唤醒了当前线程,
    
    //把当前线程从等待队列中给拿出来了,
    
    //但是当前线程实际上还是没有执行park操作
    
    //这样的话再执行下面的park操作的话,
    
    //阻塞了也没有机会唤醒了,不在那个等待条件的队列里面了
    
    // 那么就对当前线程再执行一次unpark,在下面执行park的时候就直接略过了,不会出现问题
    
      if (node._notified != 0 && _succ == Self) {    
    
         node._event->unpark();    
    
      }    
    
    
    
      // The thread is on the WaitSet list - now park() it.    
    
      // On MP systems it's conceivable that a brief spin before we park    
    
      // could be profitable.    
    
      //    
    
      // TODO-FIXME: change the following logic to a loop of the form    
    
      //   while (!timeout && !interrupted && _notified == 0) park()    
    
    
    
      int ret = OS_OK ;    
    
      int WasNotified = 0 ;    
    
      { // State transition wrappers    
    
        OSThread* osthread = Self->osthread();    
    
        OSThreadWaitState osts(osthread, true);    
    
        {    
    
          ThreadBlockInVM tbivm(jt);    
    
          // Thread is in thread_blocked state and oop access is unsafe.    
    
          jt->set_suspend_equivalent();    
    
    //执行park操作,把当前线程挂起
    
    
    
          if (interruptible && (Thread::is_interrupted(THREAD, false) || HAS_PENDING_EXCEPTION)) {    
    
              // Intentionally empty    
    
          } else    
    
          if (node._notified == 0) {    
    
            if (millis <= 0) {    
    
               Self->_ParkEvent->park () ;    
    
            } else {    
    
               ret = Self->_ParkEvent->park (millis) ;    
    
            }    
    
          }    
    
    
    
          // were we externally suspended while we were waiting?    
    
          if (ExitSuspendEquivalent (jt)) {    
    
             // TODO-FIXME: add -- if succ == Self then succ = null.    
    
             jt->java_suspend_self();    
    
          }    
    
    
    
        } // Exit thread safepoint: transition _thread_blocked -> _thread_in_vm    
    
    
    
    
    
        // Node may be on the WaitSet, the EntryList (or cxq), or in transition    
    
        // from the WaitSet to the EntryList.    
    
        // See if we need to remove Node from the WaitSet.    
    
        // We use double-checked locking to avoid grabbing _WaitSetLock    
    
        // if the thread is not on the wait queue.    
    
        //    
    
        // Note that we don't need a fence before the fetch of TState.    
    
        // In the worst case we'll fetch a old-stale value of TS_WAIT previously    
    
        // written by the is thread. (perhaps the fetch might even be satisfied    
    
        // by a look-aside into the processor's own store buffer, although given    
    
        // the length of the code path between the prior ST and this load that's    
    
        // highly unlikely).  If the following LD fetches a stale TS_WAIT value    
    
        // then we'll acquire the lock and then re-fetch a fresh TState value.    
    
        // That is, we fail toward safety.    
    
    
    
        if (node.TState == ObjectWaiter::TS_WAIT) {    
    
            Thread::SpinAcquire (&_WaitSetLock, "WaitSet - unlink") ;    
    
            if (node.TState == ObjectWaiter::TS_WAIT) {    
    
               DequeueSpecificWaiter (&node) ;       // unlink from WaitSet    
    
               assert(node._notified == 0, "invariant");    
    
               node.TState = ObjectWaiter::TS_RUN ;    
    
            }    
    
            Thread::SpinRelease (&_WaitSetLock) ;    
    
        }    
    
    
    
        // The thread is now either on off-list (TS_RUN),    
    
        // on the EntryList (TS_ENTER), or on the cxq (TS_CXQ).    
    
        // The Node's TState variable is stable from the perspective of this thread.    
    
        // No other threads will asynchronously modify TState.    
    
        guarantee (node.TState != ObjectWaiter::TS_WAIT, "invariant") ;    
    
        OrderAccess::loadload() ;    
    
        if (_succ == Self) _succ = NULL ;    
    
        WasNotified = node._notified ;    
    
    
    
     
    
    
    
        // post monitor waited event. Note that this is past-tense, we are done waiting.    
    
        if (JvmtiExport::should_post_monitor_waited()) {    
    
          JvmtiExport::post_monitor_waited(jt, this, ret == OS_TIMEOUT);    
    
        }    
    
        OrderAccess::fence() ;    
    
    
    
        assert (Self->_Stalled != 0, "invariant") ;    
    
        Self->_Stalled = 0 ;    
    
    
    
        assert (_owner != Self, "invariant") ;    
    
        ObjectWaiter::TStates v = node.TState ;    
    
        if (v == ObjectWaiter::TS_RUN) {    
    
            enter (Self) ;    
    
        } else {    
    
            guarantee (v == ObjectWaiter::TS_ENTER || v == ObjectWaiter::TS_CXQ, "invariant") ;    
    
            ReenterI (Self, &node) ;    
    
            node.wait_reenter_end(this);    
    
        }    
    
        // want residual elements associated with this thread left on any lists.    
    
        guarantee (node.TState == ObjectWaiter::TS_RUN, "invariant") ;    
    
        assert    (_owner == Self, "invariant") ;    
    
        assert    (_succ != Self , "invariant") ;    
    
      } // OSThreadWaitState()    
    
    
    
      jt->set_current_waiting_monitor(NULL);    
    
    
    
      guarantee (_recursions == 0, "invariant") ;    
    
      _recursions = save;     // restore the old recursion count    
    
      _waiters--;             // decrement the number of waiters    
    
    
    
      // Verify a few postconditions    
    
      assert (_owner == Self       , "invariant") ;    
    
      assert (_succ  != Self       , "invariant") ;    
    
      assert (((oop)(object()))->mark() == markOopDesc::encode(this), "invariant") ;    
    
    
    
      if (SyncFlags & 32) {    
    
         OrderAccess::fence() ;    
    
      }    
    
    
    
      // check if the notification happened    
    
    //如果线程不是通过notify唤醒的,那就只能是通过 interrupt唤醒的,就抛出异常。
    
    //这也是object.wait()方法可以抛出异常的原因
    
      if (!WasNotified) {    
    
        // no, it could be timeout or Thread.interrupt() or both    
    
        // check for interrupt event, otherwise it is timeout    
    
        if (interruptible && Thread::is_interrupted(Self, true) && !HAS_PENDING_EXCEPTION) {    
    
          TEVENT (Wait - throw IEX from epilog) ;    
    
          THROW(vmSymbols::java_lang_InterruptedException());    
    
        }    
    
      }    
    
    }    
    
    
    
    
    
    
    
    
    
    

    1.检查是否有中断,如果有中断的话就抛出中断(返回)

    2.把当前线程包装成一个 objectWaiter 对象

    3.通过自旋锁把 objectWaiter 添加到 ObjectMonitor 的 _WaitSet 当中

    4.释放 objectMonitor 对象

    5.如果当前线程处于被 notifyed 状态(针对这种情况出现的可能是下面的解释),对当前线程执行一次 unpark()操作

    
    //在这里防止的情况是当前线程刚刚执行完monitor的释放,
    
    //有一个线程进来了,执行完后退出了,又唤醒了当前线程,
    
    //把当前线程从等待队列中给拿出来了,
    
    //但是当前线程实际上还是没有执行park操作
    
    //这样的话再执行下面的park操作的话,
    
    //阻塞了也没有机会唤醒了,不在那个等待条件的队列里面了
    
    // 那么就对当前线程再执行一次unpark,在下面 第 6 步  执行 park的时候就直接略过了,不会阻塞,也就不会出现问题
    
    

    6.执行park操作把当前线程阻塞,调用的是 操作系统的wait 操作的 api

      6.1. 如果出现5的情况则当前线程并不会阻塞,而是直接走过去

    7.如果线程醒来:有两种情况

      7.1. 其他线程执行了notify()操作,对应底层有unpark()操作,则该线程恢复运行

      7.2. 其他线程执行了interrupt()也会对当前线程有unpark()操作,该线程也会恢复运行

    8.醒来好检查interrupt 标志位是否标示有异常,如果有,则会抛出 interruptedException 异常。

    最后一条是线程在执行 obj.wait()有抛出interruptException异常的原因。

    4,notify的原理

    notify()操作将处于waitSet中的一个节点移动到

    EntryList当中,也就是移动到对锁进行竞争而产生阻塞的队列当中。再等待其他锁的唤醒就ok了。

    当然notify有一些唤醒策略。

    相关文章

      网友评论

          本文标题:object.wait()和notify()

          本文链接:https://www.haomeiwen.com/subject/wirsbxtx.html