美文网首页
Java中的wait和notify方法

Java中的wait和notify方法

作者: buzzerrookie | 来源:发表于2019-03-03 10:24 被阅读0次

    本文分析Java中Object类的wait和notify方法,深入JVM看一下底层是如何实现的。

    wait和notify方法

    Object类的waitnotify方法用于线程间的同步和互斥,它们在Java层面的定义如下:

    public final void wait(long timeout, int nanos) throws InterruptedException {
        if (timeout < 0) {
            throw new IllegalArgumentException("timeout value is negative");
        }
        if (nanos < 0 || nanos > 999999) {
            throw new IllegalArgumentException(
                                "nanosecond timeout value out of range");
        }
        if (nanos > 0) {
            timeout++;
        }
        wait(timeout);
    }
    
    public final void wait() throws InterruptedException {
        wait(0);
    }
    
    public final native void wait(long timeout) throws InterruptedException;
    
    public final native void notify();
    public final native void notifyAll();
    

    可以看到最后调用的都是native方法,这些native方法都是由Object类的registerNatives静态方法在静态代码块注册的。registerNatives方法对应的JNI方法可在OpenJDK的源码jdk/src/share/native/java/lang/Object.c中找到定义。该方法调用RegisterNatives函数向JVM注册了其他的JNI方法如hashCode、wait、notify和notifyAll等。

    static JNINativeMethod methods[] = {
        {"hashCode",    "()I",                    (void *)&JVM_IHashCode},
        {"wait",        "(J)V",                   (void *)&JVM_MonitorWait},
        {"notify",      "()V",                    (void *)&JVM_MonitorNotify},
        {"notifyAll",   "()V",                    (void *)&JVM_MonitorNotifyAll},
        {"clone",       "()Ljava/lang/Object;",   (void *)&JVM_Clone},
    };
    
    JNIEXPORT void JNICALL
    Java_java_lang_Object_registerNatives(JNIEnv *env, jclass cls)
    {
        (*env)->RegisterNatives(env, cls,
                                methods, sizeof(methods)/sizeof(methods[0]));
    }
    

    Hotspot中的实现

    wait方法

    wait方法在Hotspot中的实现是文件openjdk8/hotspot/src/share/vm/prims/jvm.cpp中的JVM_MonitorWait函数,预处理后的代码如下所示:

    extern "C" {
        void JNICALL JVM_MonitorWait(JNIEnv* env, jobject handle, jlong ms) {
            JavaThread* thread=JavaThread::thread_from_jni_environment(env);
            ThreadInVMfromNative __tiv(thread);
            HandleMarkCleaner __hm(thread);
            Thread* __the_thread__ = thread;
            os::verify_stack_alignment();
            Handle obj(__the_thread__, JNIHandles::resolve_non_null(handle));
            JavaThreadInObjectWaitState jtiows(thread, ms != 0);
            if (JvmtiExport::should_post_monitor_wait()) {
                JvmtiExport::post_monitor_wait((JavaThread *)__the_thread__, (oop)obj(), ms);
            }
            ObjectSynchronizer::wait(obj, ms, __the_thread__);
            if ((((ThreadShadow*)__the_thread__)->has_pending_exception())) return ; (void)(0);
        }
    }
    

    重点在ObjectSynchronizer类的wait静态函数,预处理后的代码如下:

    // NOTE: must use heavy weight monitor to handle wait()
    void ObjectSynchronizer::wait(Handle obj, jlong millis, Thread* __the_thread__) {
      if (UseBiasedLocking) {
        BiasedLocking::revoke_and_rebias(obj, false, __the_thread__);
      }
      if (millis < 0) {
        THROW_MSG(vmSymbols::java_lang_IllegalArgumentException(), "timeout value is negative");
      }
      ObjectMonitor* monitor = ObjectSynchronizer::inflate(__the_thread__, obj());
      monitor->wait(millis, true, __the_thread__);
      dtrace_waited_probe(monitor, obj, __the_thread__);
    }
    
    • 注释内容说明一旦使用了wait,那么就必须使用重量级的监视器;
    • 如果启用了偏向锁,首先需要撤销偏向;
    • 然后利用ObjectSynchronizer::inflate函数取得obj对象的监视器指针,在监视器上调用wait成员函数。

    notify方法

    notify方法在Hotspot中的实现是文件openjdk8/hotspot/src/share/vm/prims/jvm.cpp中的JVM_MonitorNotify函数,预处理后的代码如下所示:

    extern "C" {
        void JNICALL JVM_MonitorNotify(JNIEnv* env, jobject handle) {
            JavaThread* thread=JavaThread::thread_from_jni_environment(env);
            ThreadInVMfromNative __tiv(thread);
            HandleMarkCleaner __hm(thread);
            Thread* __the_thread__ = thread;
            os::verify_stack_alignment();
            Handle obj(__the_thread__, JNIHandles::resolve_non_null(handle));
            ObjectSynchronizer::notify(obj, __the_thread__);
            if ((((ThreadShadow*)__the_thread__)->has_pending_exception())) return ; (void)(0);
        }
    }
    

    重点在ObjectSynchronizer类的notify静态函数,预处理后的代码如下:

    void ObjectSynchronizer::notify(Handle obj, Thread* __the_thread__) {
     if (UseBiasedLocking) {
        BiasedLocking::revoke_and_rebias(obj, false, __the_thread__);
        assert(!obj->mark()->has_bias_pattern(), "biases should be revoked by now");
      }
    
      markOop mark = obj->mark();
      if (mark->has_locker() && __the_thread__->is_lock_owned((address)mark->locker())) {
        return;
      }
      ObjectSynchronizer::inflate(__the_thread__, obj())->notify(__the_thread__);
    }
    
    • 如果启用了偏向锁,首先需要撤销偏向;
    • 如果obj对象已经被轻量级锁锁定且锁记录在参数线程的方法栈内,则意味着该对象没有与之关联的监视器,也就是说没有竞争的线程,因此直接返回即可;
    • 有竞争的情况下利用ObjectSynchronizer::inflate函数取得obj对象的监视器指针,在监视器上调用notify成员函数。

    notifyAll方法

    notifyAll方法在Hotspot中的实现是文件openjdk8/hotspot/src/share/vm/prims/jvm.cpp中的JVM_MonitorNotifyAll函数,与JVM_MonitorNotify函数相似,在此不再赘述。

    ObjectMonitor类

    前面的文章分析了ObjectMonitor类的实现以及enter和exit函数,本节分析wait、notify和notifyAll函数。

    wait函数

    wait函数代码如下:

    void ObjectMonitor::wait(jlong millis, bool interruptible, TRAPS) {
       Thread * const Self = THREAD ;
       assert(Self->is_Java_thread(), "Must be Java thread!");
       JavaThread *jt = (JavaThread *)THREAD;
    
       DeferredInitialize () ;
    
       // Throw IMSX or IEX.
       CHECK_OWNER(); // 一个宏,检查参数线程是否确实持有该监视器,如果不持有则抛出IllegalMonitorStateException;
       // 如果持有则将_owner保存锁记录指针的情况转为_owner保存线程指针,同时令_recursions = 0,OwnerIsThread = 1
       EventJavaMonitorWait event;
    
       // check for a pending interrupt
       if (interruptible && Thread::is_interrupted(Self, true) && !HAS_PENDING_EXCEPTION) { // 已经被中断时抛出InterruptedException
         // post monitor waited event.  Note that this is past-tense, we are done waiting.
         if (JvmtiExport::should_post_monitor_waited()) {
            // Note: 'false' parameter is passed here because the
            // wait was not timed out due to thread interrupt.
            JvmtiExport::post_monitor_waited(jt, this, false);
    
            // In this short circuit of the monitor wait protocol, the
            // current thread never drops ownership of the monitor and
            // never gets added to the wait queue so the current thread
            // cannot be made the successor. This means that the
            // JVMTI_EVENT_MONITOR_WAITED event handler cannot accidentally
            // consume an unpark() meant for the ParkEvent associated with
            // this ObjectMonitor.
         }
         if (event.should_commit()) {
           post_monitor_wait_event(&event, 0, millis, false);
         }
         TEVENT (Wait - Throw IEX) ;
         THROW(vmSymbols::java_lang_InterruptedException());
         return ;
       }
    
       TEVENT (Wait) ;
    
       assert (Self->_Stalled == 0, "invariant") ;
       Self->_Stalled = intptr_t(this) ;
       jt->set_current_waiting_monitor(this); // 在JavaThread中保存该监视器指针
    
       // create a node to be put into the queue
       // Critically, after we reset() the event but prior to park(), we must check
       // for a pending interrupt.
       ObjectWaiter node(Self); // 将参数线程包装成ObjectWaiter
       node.TState = ObjectWaiter::TS_WAIT ; // 需要放入WaitSet链表
       Self->_ParkEvent->reset() ;
       OrderAccess::fence();          // ST into Event; membar ; LD interrupted-flag
    
       // Enter the waiting queue, which is a circular doubly linked list in this case
       // but it could be a priority queue or any data structure.
       // _WaitSetLock protects the wait queue.  Normally the wait queue is accessed only
       // by the the owner of the monitor *except* in the case where park()
       // returns because of a timeout of interrupt.  Contention is exceptionally rare
       // so we use a simple spin-lock instead of a heavier-weight blocking lock.
    
       Thread::SpinAcquire (&_WaitSetLock, "WaitSet - add") ;
       AddWaiter (&node) ; // 将参数线程放入WaitSet链表末尾
       Thread::SpinRelease (&_WaitSetLock) ;
    
       if ((SyncFlags & 4) == 0) {
          _Responsible = NULL ;
       }
       intptr_t save = _recursions; // record the old recursion count
       _waiters++;                  // increment the number of waiters
       _recursions = 0;             // set the recursion level to be 1
       exit (true, Self) ;                    // exit the monitor 记住参数线程之前是持有该监视器的,调用wait方法会暂时放弃监视器
       guarantee (_owner != Self, "invariant") ;
    
       // The thread is on the WaitSet list - now park() it.
       // On MP systems it's conceivable that a brief spin before we park
       // could be profitable.
       //
       // TODO-FIXME: change the following logic to a loop of the form
       //   while (!timeout && !interrupted && _notified == 0) park()
    
       int ret = OS_OK ;
       int WasNotified = 0 ;
       { // State transition wrappers
         OSThread* osthread = Self->osthread();
         OSThreadWaitState osts(osthread, true);
         {
           ThreadBlockInVM tbivm(jt);
           // Thread is in thread_blocked state and oop access is unsafe.
           jt->set_suspend_equivalent();
    
           if (interruptible && (Thread::is_interrupted(THREAD, false) || HAS_PENDING_EXCEPTION)) {
               // Intentionally empty
           } else
           if (node._notified == 0) { // park参数线程,即之前持有监视器的线程
             if (millis <= 0) {
                Self->_ParkEvent->park () ;
             } else {
                ret = Self->_ParkEvent->park (millis) ;
             }
           }
    
           // were we externally suspended while we were waiting?
           if (ExitSuspendEquivalent (jt)) {
              // TODO-FIXME: add -- if succ == Self then succ = null.
              jt->java_suspend_self();
           }
    
         } // Exit thread safepoint: transition _thread_blocked -> _thread_in_vm
    
    
         // Node may be on the WaitSet, the EntryList (or cxq), or in transition
         // from the WaitSet to the EntryList.
         // See if we need to remove Node from the WaitSet.
         // We use double-checked locking to avoid grabbing _WaitSetLock
         // if the thread is not on the wait queue.
         //
         // Note that we don't need a fence before the fetch of TState.
         // In the worst case we'll fetch a old-stale value of TS_WAIT previously
         // written by the is thread. (perhaps the fetch might even be satisfied
         // by a look-aside into the processor's own store buffer, although given
         // the length of the code path between the prior ST and this load that's
         // highly unlikely).  If the following LD fetches a stale TS_WAIT value
         // then we'll acquire the lock and then re-fetch a fresh TState value.
         // That is, we fail toward safety.
         // 之前调用wait方法的线程被其他notify唤醒了,接下来重新竞争监视器,首先将参数线程从WaitSet移除
         if (node.TState == ObjectWaiter::TS_WAIT) {
             Thread::SpinAcquire (&_WaitSetLock, "WaitSet - unlink") ;
             if (node.TState == ObjectWaiter::TS_WAIT) {
                DequeueSpecificWaiter (&node) ;       // unlink from WaitSet
                assert(node._notified == 0, "invariant");
                node.TState = ObjectWaiter::TS_RUN ;
             }
             Thread::SpinRelease (&_WaitSetLock) ;
         }
         // 参数线程肯定不在WaitSet链表了,要么被直接唤醒(见notify函数),要么在EntryList,也可能在cxq
         // The thread is now either on off-list (TS_RUN),
         // on the EntryList (TS_ENTER), or on the cxq (TS_CXQ).
         // The Node's TState variable is stable from the perspective of this thread.
         // No other threads will asynchronously modify TState.
         guarantee (node.TState != ObjectWaiter::TS_WAIT, "invariant") ;
         OrderAccess::loadload() ;
         if (_succ == Self) _succ = NULL ;
         WasNotified = node._notified ;
    
         // Reentry phase -- reacquire the monitor.
         // re-enter contended monitor after object.wait().
         // retain OBJECT_WAIT state until re-enter successfully completes
         // Thread state is thread_in_vm and oop access is again safe,
         // although the raw address of the object may have changed.
         // (Don't cache naked oops over safepoints, of course).
    
         // post monitor waited event. Note that this is past-tense, we are done waiting.
         if (JvmtiExport::should_post_monitor_waited()) {
           JvmtiExport::post_monitor_waited(jt, this, ret == OS_TIMEOUT);
    
           if (node._notified != 0 && _succ == Self) {
             // In this part of the monitor wait-notify-reenter protocol it
             // is possible (and normal) for another thread to do a fastpath
             // monitor enter-exit while this thread is still trying to get
             // to the reenter portion of the protocol.
             //
             // The ObjectMonitor was notified and the current thread is
             // the successor which also means that an unpark() has already
             // been done. The JVMTI_EVENT_MONITOR_WAITED event handler can
             // consume the unpark() that was done when the successor was
             // set because the same ParkEvent is shared between Java
             // monitors and JVM/TI RawMonitors (for now).
             //
             // We redo the unpark() to ensure forward progress, i.e., we
             // don't want all pending threads hanging (parked) with none
             // entering the unlocked monitor.
             node._event->unpark();
           }
         }
    
         if (event.should_commit()) {
           post_monitor_wait_event(&event, node._notifier_tid, millis, ret == OS_TIMEOUT);
         }
    
         OrderAccess::fence() ;
    
         assert (Self->_Stalled != 0, "invariant") ;
         Self->_Stalled = 0 ;
    
         assert (_owner != Self, "invariant") ; // 被唤醒后该监视器并不由参数线程持有
         ObjectWaiter::TStates v = node.TState ;
         if (v == ObjectWaiter::TS_RUN) {
             enter (Self) ;
         } else {
             guarantee (v == ObjectWaiter::TS_ENTER || v == ObjectWaiter::TS_CXQ, "invariant") ;
             ReenterI (Self, &node) ;
             node.wait_reenter_end(this);
         }
    
         // Self has reacquired the lock.
         // Lifecycle - the node representing Self must not appear on any queues.
         // Node is about to go out-of-scope, but even if it were immortal we wouldn't
         // want residual elements associated with this thread left on any lists.
         guarantee (node.TState == ObjectWaiter::TS_RUN, "invariant") ;
         assert    (_owner == Self, "invariant") ; // 至此经过上面的enter或者ReenterI后,参数线程重新获得了该监视器
         assert    (_succ != Self , "invariant") ;
       } // OSThreadWaitState()
    
       jt->set_current_waiting_monitor(NULL); // 参数线程没有在任何监视器上wait
    
       guarantee (_recursions == 0, "invariant") ;
       _recursions = save;     // restore the old recursion count
       _waiters--;             // decrement the number of waiters
    
       // Verify a few postconditions
       assert (_owner == Self       , "invariant") ;
       assert (_succ  != Self       , "invariant") ;
       assert (((oop)(object()))->mark() == markOopDesc::encode(this), "invariant") ;
    
       if (SyncFlags & 32) {
          OrderAccess::fence() ;
       }
       // 如果不是被notify的,检查是否是由中断造成的(中断也会unpark线程),如果是则抛出InterruptedException;否则就是wait超时
       // check if the notification happened
       if (!WasNotified) { // 注意看调用Thread::is_interrupted时第二个参数是true,会清除线程的中断状态
         // no, it could be timeout or Thread.interrupt() or both
         // check for interrupt event, otherwise it is timeout 
         if (interruptible && Thread::is_interrupted(Self, true) && !HAS_PENDING_EXCEPTION) {
           TEVENT (Wait - throw IEX from epilog) ;
           THROW(vmSymbols::java_lang_InterruptedException());
         }
       }
    
       // NOTE: Spurious wake up will be consider as timeout.
       // Monitor notify has precedence over thread interrupt.
    }
    

    wait函数按以下顺序做了几件事:

    1. 检查参数线程是否确实持有该监视器,如果不持有则抛出IllegalMonitorStateException;
    2. 检查是否已经被中断,如果是则抛出InterruptedException;
    3. 参数线程把自己放入_WaitSet链表末尾,然后调用ObjectMonitor::exit函数放弃该监视器,最后使用ParkEvent的park函数阻塞自己;
    4. 参数线程被其他notify唤醒之后重新竞争监视器,它先把自己从WaitSet移除,这时它可能在EntryList,也可能在cxq,还可能两者均不在。经过enter或者ReenterI后,参数线程重新获得了该监视器;
    5. 最后判断如果不是被notify的,检查是否是由中断造成的(中断也会unpark线程),如果是则抛出InterruptedException,同时清除线程的中断状态;否则就是wait超时。

    notify函数

    notifyAll函数与notify函数相似,只看一下notify函数是如何实现的:

    void ObjectMonitor::notify(TRAPS) {
      CHECK_OWNER(); // 与wait函数一样的检查_owner
      if (_WaitSet == NULL) {
         TEVENT (Empty-Notify) ;
         return ;
      }
      DTRACE_MONITOR_PROBE(notify, this, object(), THREAD);
    
      int Policy = Knob_MoveNotifyee ; // Knob_MoveNotifyee全局变量,notify策略
    
      Thread::SpinAcquire (&_WaitSetLock, "WaitSet - notify") ;
      ObjectWaiter * iterator = DequeueWaiter() ; // 将WaitSet中的第一个线程出队
      if (iterator != NULL) {
         TEVENT (Notify1 - Transfer) ;
         guarantee (iterator->TState == ObjectWaiter::TS_WAIT, "invariant") ; // 原来一定是在WaitSet中
         guarantee (iterator->_notified == 0, "invariant") ;
         if (Policy != 4) {
            iterator->TState = ObjectWaiter::TS_ENTER ;
         }
         iterator->_notified = 1 ; // 告诉被唤醒的线程是被notify唤醒的,因为wait函数末尾还会有判断是不是中断唤醒的
         Thread * Self = THREAD;
         iterator->_notifier_tid = Self->osthread()->thread_id(); // 告诉被唤醒的线程是谁唤醒的,内核线程ID
    
         ObjectWaiter * List = _EntryList ;
         if (List != NULL) {
            assert (List->_prev == NULL, "invariant") ;
            assert (List->TState == ObjectWaiter::TS_ENTER, "invariant") ;
            assert (List != iterator, "invariant") ;
         }
    
         if (Policy == 0) {       // prepend to EntryList 如果Knob_MoveNotifyee是0,将WaitSet的第一个链接到EntyrList前面
             if (List == NULL) {
                 iterator->_next = iterator->_prev = NULL ;
                 _EntryList = iterator ;
             } else {
                 List->_prev = iterator ;
                 iterator->_next = List ;
                 iterator->_prev = NULL ;
                 _EntryList = iterator ;
            }
         } else
         if (Policy == 1) {      // append to EntryList 如果Knob_MoveNotifyee是1,将WaitSet的第一个链接到EntyrList后面
             if (List == NULL) {
                 iterator->_next = iterator->_prev = NULL ;
                 _EntryList = iterator ;
             } else {
                // CONSIDER:  finding the tail currently requires a linear-time walk of
                // the EntryList.  We can make tail access constant-time by converting to
                // a CDLL instead of using our current DLL.
                ObjectWaiter * Tail ;
                for (Tail = List ; Tail->_next != NULL ; Tail = Tail->_next) ;
                assert (Tail != NULL && Tail->_next == NULL, "invariant") ;
                Tail->_next = iterator ;
                iterator->_prev = Tail ;
                iterator->_next = NULL ;
            }
         } else
         if (Policy == 2) {      // prepend to cxq 如果Knob_MoveNotifyee是2,将WaitSet的第一个链接到cxq前面
             // prepend to cxq
             if (List == NULL) {
                 iterator->_next = iterator->_prev = NULL ;
                 _EntryList = iterator ;
             } else {
                iterator->TState = ObjectWaiter::TS_CXQ ;
                for (;;) {
                    ObjectWaiter * Front = _cxq ;
                    iterator->_next = Front ;
                    if (Atomic::cmpxchg_ptr (iterator, &_cxq, Front) == Front) {
                        break ;
                    }
                }
             }
         } else
         if (Policy == 3) {      // append to cxq 如果Knob_MoveNotifyee是3,将WaitSet的第一个链接到cxq后面
            iterator->TState = ObjectWaiter::TS_CXQ ;
            for (;;) {
                ObjectWaiter * Tail ;
                Tail = _cxq ;
                if (Tail == NULL) {
                    iterator->_next = NULL ;
                    if (Atomic::cmpxchg_ptr (iterator, &_cxq, NULL) == NULL) {
                       break ;
                    }
                } else {
                    while (Tail->_next != NULL) Tail = Tail->_next ;
                    Tail->_next = iterator ;
                    iterator->_prev = Tail ;
                    iterator->_next = NULL ;
                    break ;
                }
            }
         } else { // 其他情况,直接唤醒WaitSet的第一个线程使其运行,不加入EntryList或cxq
            ParkEvent * ev = iterator->_event ;
            iterator->TState = ObjectWaiter::TS_RUN ;
            OrderAccess::fence() ;
            ev->unpark() ;
         }
    
         if (Policy < 4) {
           iterator->wait_reenter_begin(this);
         }
    
         // _WaitSetLock protects the wait queue, not the EntryList.  We could
         // move the add-to-EntryList operation, above, outside the critical section
         // protected by _WaitSetLock.  In practice that's not useful.  With the
         // exception of  wait() timeouts and interrupts the monitor owner
         // is the only thread that grabs _WaitSetLock.  There's almost no contention
         // on _WaitSetLock so it's not profitable to reduce the length of the
         // critical section.
      }
    
      Thread::SpinRelease (&_WaitSetLock) ;
    
      if (iterator != NULL && ObjectMonitor::_sync_Notifications != NULL) {
         ObjectMonitor::_sync_Notifications->inc() ;
      }
    }
    
    1. 执行与wait函数一样的检查_owner等工作;
    2. 将WaitSet中的第一个线程出队,根据Knob_MoveNotifyee的不同值做不同的处理:
      • Knob_MoveNotifyee是0时,将WaitSet的第一个线程链接到EntyrList前面;
      • Knob_MoveNotifyee是1时,将WaitSet的第一个线程链接到EntryList后面;
      • Knob_MoveNotifyee是2时,将WaitSet的第一个线程链接到cxq前面;
      • Knob_MoveNotifyee是3时,将WaitSet的第一个线程链接到cxq后面;
      • 其他情况,直接唤醒WaitSet的第一个线程使其运行,不加入EntryList或cxq。

    监视器原理图

    目前,较为常见的监视器原理图如下,我暂时没有找到这张图的原始出处。


    monitor.png

    经过这一系列文章的分析,上图Entry Set和Wait Set的含义逐渐清晰,但不知为何没有cxq。

    • Wait Set:线程调用wait方法后即进入Wait Set,若后续被唤醒后没有获取到监视器则进入Entry Set;
    • Entry Set:竞争监视器失败的线程都在Entry Set阻塞。

    相关文章

      网友评论

          本文标题:Java中的wait和notify方法

          本文链接:https://www.haomeiwen.com/subject/wwbnsqtx.html