美文网首页
Java线程的中断与休眠

Java线程的中断与休眠

作者: buzzerrookie | 来源:发表于2019-03-03 10:24 被阅读0次

    本文分析Thread类分别用于中断与休眠的interrupt和sleep方法,在深入它们之前先看辅助类PlatformEvent和ParkEvent。

    PlatformEvent和ParkEvent

    Linux下的PlatformEvent类定义在文件hotspot/src/os/linux/vm/os_linux.hpp中,ParkEvent类继承了PlatformEvent类。PlatformEvent类定义如下:

    class PlatformEvent : public CHeapObj<mtInternal> {
      private:
        double CachePad [4] ;   // increase odds that _mutex is sole occupant of cache line
        volatile int _Event ;
        volatile int _nParked ;
        pthread_mutex_t _mutex  [1] ;
        pthread_cond_t  _cond   [1] ;
        double PostPad  [2] ;
        Thread * _Assoc ;
    
      public:       // TODO-FIXME: make dtor private
        ~PlatformEvent() { guarantee (0, "invariant") ; }
    
      public:
        PlatformEvent() {
          int status;
          status = pthread_cond_init (_cond, os::Linux::condAttr());
          assert_status(status == 0, status, "cond_init");
          status = pthread_mutex_init (_mutex, NULL);
          assert_status(status == 0, status, "mutex_init");
          _Event   = 0 ;
          _nParked = 0 ;
          _Assoc   = NULL ;
        }
    
        // Use caution with reset() and fired() -- they may require MEMBARs
        void reset() { _Event = 0 ; }
        int  fired() { return _Event; }
        void park () ;
        void unpark () ;
        int  TryPark () ;
        int  park (jlong millis) ; // relative timed-wait only
        void SetAssociation (Thread * a) { _Assoc = a ; }
    } ;
    
    • _mutex和_cond都是只有一个元素的数组,之所以这么写而不是定义一个变量我推测是因为引用它们时使用标识符即可作为指针而不用再使用取地址运算符;
    • _Event用于循环中调用pthread_cond_timedwait函数的条件,防止假的唤醒;
    • _nParked表示在该事件上停泊的线程数量。

    park函数

    park函数的实现在文件hotspot/src/os/linux/vm/os_linux.cpp中,代码如下:

    int os::PlatformEvent::park(jlong millis) {
      guarantee (_nParked == 0, "invariant") ;
    
      int v ;
      for (;;) {
          v = _Event ;
          if (Atomic::cmpxchg (v-1, &_Event, v) == v) break ;
      }
      guarantee (v >= 0, "invariant") ;
      if (v != 0) return OS_OK ;
    
      // We do this the hard way, by blocking the thread.
      // Consider enforcing a minimum timeout value.
      struct timespec abst;
      compute_abstime(&abst, millis);
    
      int ret = OS_TIMEOUT;
      int status = pthread_mutex_lock(_mutex);
      assert_status(status == 0, status, "mutex_lock");
      guarantee (_nParked == 0, "invariant") ;
      ++_nParked ;
    
      // Object.wait(timo) will return because of
      // (a) notification
      // (b) timeout
      // (c) thread.interrupt
      //
      // Thread.interrupt and object.notify{All} both call Event::set.
      // That is, we treat thread.interrupt as a special case of notification.
      // The underlying Solaris implementation, cond_timedwait, admits
      // spurious/premature wakeups, but the JLS/JVM spec prevents the
      // JVM from making those visible to Java code.  As such, we must
      // filter out spurious wakeups.  We assume all ETIME returns are valid.
      //
      // TODO: properly differentiate simultaneous notify+interrupt.
      // In that case, we should propagate the notify to another waiter.
    
      while (_Event < 0) {
        status = os::Linux::safe_cond_timedwait(_cond, _mutex, &abst);
        if (status != 0 && WorkAroundNPTLTimedWaitHang) {
          pthread_cond_destroy (_cond);
          pthread_cond_init (_cond, os::Linux::condAttr()) ;
        }
        assert_status(status == 0 || status == EINTR ||
                      status == ETIME || status == ETIMEDOUT,
                      status, "cond_timedwait");
        if (!FilterSpuriousWakeups) break ;                 // previous semantics
        if (status == ETIME || status == ETIMEDOUT) break ;
        // We consume and ignore EINTR and spurious wakeups.
      }
      --_nParked ;
      if (_Event >= 0) {
         ret = OS_OK;
      }
      _Event = 0 ;
      status = pthread_mutex_unlock(_mutex);
      assert_status(status == 0, status, "mutex_unlock");
      assert (_nParked == 0, "invariant") ;
      // Paranoia to ensure our locked and lock-free paths interact
      // correctly with each other.
      OrderAccess::fence();
      return ret;
    }
    
    • guarantee是一个类似Assert的宏,可以忽略;
    • compute_abstime函数将当前时间向后millis毫秒数的值保存到abst,用于后面的pthread_cond_timedwait函数;
    • 先调用pthread_mutex_lock函数获取_mutex成员变量表示的互斥锁,接着使用pthread_cond_timedwait函数在_cond成员变量表示的条件变量上等待abst时间,最后调用pthread_mutex_unlock函数释放锁。
    int os::Linux::safe_cond_timedwait(pthread_cond_t *_cond, pthread_mutex_t *_mutex, const struct timespec *_abstime)
    {
       if (is_NPTL()) {
          return pthread_cond_timedwait(_cond, _mutex, _abstime);
       } else {
          // 6292965: LinuxThreads pthread_cond_timedwait() resets FPU control
          // word back to default 64bit precision if condvar is signaled. Java
          // wants 53bit precision.  Save and restore current value.
          int fpu = get_fpu_control_word();
          int status = pthread_cond_timedwait(_cond, _mutex, _abstime);
          set_fpu_control_word(fpu);
          return status;
       }
    }
    

    unpark函数

    unpark函数实现也在文件hotspot/src/os/linux/vm/os_linux.cpp中,代码如下:

    void os::PlatformEvent::unpark() {
      // Transitions for _Event:
      //    0 :=> 1
      //    1 :=> 1
      //   -1 :=> either 0 or 1; must signal target thread
      //          That is, we can safely transition _Event from -1 to either
      //          0 or 1. Forcing 1 is slightly more efficient for back-to-back
      //          unpark() calls.
      // See also: "Semaphores in Plan 9" by Mullender & Cox
      //
      // Note: Forcing a transition from "-1" to "1" on an unpark() means
      // that it will take two back-to-back park() calls for the owning
      // thread to block. This has the benefit of forcing a spurious return
      // from the first park() call after an unpark() call which will help
      // shake out uses of park() and unpark() without condition variables.
    
      if (Atomic::xchg(1, &_Event) >= 0) return;
    
      // Wait for the thread associated with the event to vacate
      int status = pthread_mutex_lock(_mutex);
      assert_status(status == 0, status, "mutex_lock");
      int AnyWaiters = _nParked;
      assert(AnyWaiters == 0 || AnyWaiters == 1, "invariant");
      if (AnyWaiters != 0 && WorkAroundNPTLTimedWaitHang) {
        AnyWaiters = 0;
        pthread_cond_signal(_cond);
      }
      status = pthread_mutex_unlock(_mutex);
      assert_status(status == 0, status, "mutex_unlock");
      if (AnyWaiters != 0) {
        status = pthread_cond_signal(_cond);
        assert_status(status == 0, status, "cond_signal");
      }
    
      // Note that we signal() _after dropping the lock for "immortal" Events.
      // This is safe and avoids a common class of  futile wakeups.  In rare
      // circumstances this can cause a thread to return prematurely from
      // cond_{timed}wait() but the spurious wakeup is benign and the victim will
      // simply re-test the condition and re-park itself.
    }
    
    • 使用pthread_cond_signal函数唤醒条件变量上等待的线程;
    • 这里学习到了pthread_cond_signal调用既可以放在pthread_mutex_lock和pthread_mutex_unlock之间,也可以放在pthread_mutex_lock和pthread_mutex_unlock之后,参见解释

    中断线程

    Thread类实例可以使用interrupt成员方法中断线程,其内部调用了JNI方法interrupt0。

    public void interrupt() {
        if (this != Thread.currentThread())
            checkAccess();
    
        synchronized (blockerLock) {
            Interruptible b = blocker;
            if (b != null) {
                interrupt0();           // Just to set the interrupt flag
                b.interrupt(this);
                return;
            }
        }
        interrupt0();
    }
    
    private native void interrupt0();
    

    由RegisterNatives函数可知,在JRE调用interrupt0方法后会在JVM调用JVM_Interrupt函数。JVM_Interrupt函数代码如下:

    extern "C" {
        void JNICALL JVM_Interrupt(JNIEnv* env, jobject jthread) {
            JavaThread* thread=JavaThread::thread_from_jni_environment(env);
            ThreadInVMfromNative __tiv(thread); HandleMarkCleaner __hm(thread);
            Thread* __the_thread__ = thread;
            os::verify_stack_alignment();
            // Ensure that the C++ Thread and OSThread structures aren't freed before we operate
            oop java_thread = JNIHandles::resolve_non_null(jthread);
            MutexLockerEx ml(thread->threadObj() == java_thread ? NULL : Threads_lock);
            // We need to re-resolve the java_thread, since a GC might have happened during the
            // acquire of the lock
            JavaThread* thr = java_lang_Thread::thread(JNIHandles::resolve_non_null(jthread));
            if (thr != NULL) {
                Thread::interrupt(thr);
            }
        }
    }
    

    C++ Thread类的interrupt函数和os::interrupt函数实现分别如下:

    void Thread::interrupt(Thread* thread) {
        trace("interrupt", thread);
        debug_only(check_for_dangling_thread_pointer(thread);)
        os::interrupt(thread);
    }
    
    void os::interrupt(Thread* thread) {
        assert(Thread::current() == thread || Threads_lock->owned_by_self(),
            "possibility of dangling Thread pointer");
    
        OSThread* osthread = thread->osthread();
        if (!osthread->interrupted()) {
            osthread->set_interrupted(true);
            // More than one thread can get here with the same value of osthread,
            // resulting in multiple notifications.  We do, however, want the store
            // to interrupted() to be visible to other threads before we execute unpark().
            OrderAccess::fence();
            ParkEvent * const slp = thread->_SleepEvent ;
            if (slp != NULL) slp->unpark() ;
        }
    
        // For JSR166. Unpark even if interrupt status already was set
        if (thread->is_Java_thread())
            ((JavaThread*)thread)->parker()->unpark();
    
        ParkEvent * ev = thread->_ParkEvent ;
        if (ev != NULL) ev->unpark() ;
    }
    
    • 首先取得thread参数关联的OSThread,然后将其中断状态设成true;
    • Thread类的_SleepEvent成员变量是用于休眠的ParkEvent,在其上调用unpark函数;
    • Thread类的_ParkEvent成员变量是用于对象锁的ParkEvent,如果不为NULL,那么当前线程持有某个对象的锁,调用unpark函数唤醒了在该对象上阻塞的其他线程;
    • 对JavaThread,还需要在其_parker成员变量(Parker类型,与ParkEvent功能相似)上调用unpark函数,这个还不太知道原因,有时间看看JSR166。

    线程休眠

    sleep方法是一个native方法,由RegisterNatives函数可知,在JRE调用sleep方法后会在JVM调用JVM_Sleep函数。JVM_Sleep函数代码如下:

    extern "C" {
        void JNICALL JVM_Sleep(JNIEnv* env, jclass threadClass, jlong millis) {
            JavaThread* thread=JavaThread::thread_from_jni_environment(env);
            ThreadInVMfromNative __tiv(thread); HandleMarkCleaner __hm(thread);
            Thread* __the_thread__ = thread;
            os::verify_stack_alignment();
            if (millis < 0) {
                THROW_MSG(vmSymbols::java_lang_IllegalArgumentException(), "timeout value is negative");
            }
    
            if (Thread::is_interrupted (__the_thread__, true) && !HAS_PENDING_EXCEPTION) {
                THROW_MSG(vmSymbols::java_lang_InterruptedException(), "sleep interrupted");
            }
            // Save current thread state and restore it at the end of this block.
            // And set new thread state to SLEEPING.
            JavaThreadSleepState jtss(thread);
            EventThreadSleep event;
            if (millis == 0) {
                // When ConvertSleepToYield is on, this matches the classic VM implementation of
                // JVM_Sleep. Critical for similar threading behaviour (Win32)
                // It appears that in certain GUI contexts, it may be beneficial to do a short sleep
                // for SOLARIS
                if (ConvertSleepToYield) {
                    os::yield();
                } else {
                    ThreadState old_state = thread->osthread()->get_state();
                    thread->osthread()->set_state(SLEEPING);
                    os::sleep(thread, MinSleepInterval, false);
                    thread->osthread()->set_state(old_state);
                }
            } else {
                ThreadState old_state = thread->osthread()->get_state();
                thread->osthread()->set_state(SLEEPING);
                if (os::sleep(thread, millis, true) == OS_INTRPT) {
                    // An asynchronous exception (e.g., ThreadDeathException) could have been thrown on
                    // us while we were sleeping. We do not overwrite those.
                    if (!HAS_PENDING_EXCEPTION) {
                        if (event.should_commit()) {
                        event.set_time(millis);
                        event.commit();
                        }
                        // TODO-FIXME: THROW_MSG returns which means we will not call set_state()
                        // to properly restore the thread state.  That's likely wrong.
                        THROW_MSG(vmSymbols::java_lang_InterruptedException(), "sleep interrupted");
                    }
                }
                thread->osthread()->set_state(old_state);
            }
            if (event.should_commit()) {
                event.set_time(millis);
                event.commit();
            }
        }
    }
    

    JVM_Sleep函数首先保存当前线程的状态,接着调用os::sleep函数执行休眠操作,休眠结束后再恢复之前保存的状态。

    os::sleep函数

    os::sleep函数定义在文件hotspot/src/os/linux/vm/os_linux.cpp中,其代码如下所示:

    int os::sleep(Thread* thread, jlong millis, bool interruptible) {
      assert(thread == Thread::current(),  "thread consistency check");
    
      ParkEvent * const slp = thread->_SleepEvent ;
      slp->reset() ;
      OrderAccess::fence() ;
    
      if (interruptible) {
        jlong prevtime = javaTimeNanos();
    
        for (;;) {
          if (os::is_interrupted(thread, true)) {
            return OS_INTRPT;
          }
    
          jlong newtime = javaTimeNanos();
    
          if (newtime - prevtime < 0) {
            // time moving backwards, should only happen if no monotonic clock
            // not a guarantee() because JVM should not abort on kernel/glibc bugs
            assert(!Linux::supports_monotonic_clock(), "time moving backwards");
          } else {
            millis -= (newtime - prevtime) / NANOSECS_PER_MILLISEC;
          }
    
          if(millis <= 0) {
            return OS_OK;
          }
    
          prevtime = newtime;
    
          {
            assert(thread->is_Java_thread(), "sanity check");
            JavaThread *jt = (JavaThread *) thread;
            ThreadBlockInVM tbivm(jt);
            OSThreadWaitState osts(jt->osthread(), false /* not Object.wait() */);
    
            jt->set_suspend_equivalent();
            // cleared by handle_special_suspend_equivalent_condition() or
            // java_suspend_self() via check_and_wait_while_suspended()
    
            slp->park(millis);
    
            // were we externally suspended while we were waiting?
            jt->check_and_wait_while_suspended();
          }
        }
      } else {
        OSThreadWaitState osts(thread->osthread(), false /* not Object.wait() */);
        jlong prevtime = javaTimeNanos();
    
        for (;;) {
          // It'd be nice to avoid the back-to-back javaTimeNanos() calls on
          // the 1st iteration ...
          jlong newtime = javaTimeNanos();
    
          if (newtime - prevtime < 0) {
            // time moving backwards, should only happen if no monotonic clock
            // not a guarantee() because JVM should not abort on kernel/glibc bugs
            assert(!Linux::supports_monotonic_clock(), "time moving backwards");
          } else {
            millis -= (newtime - prevtime) / NANOSECS_PER_MILLISEC;
          }
    
          if(millis <= 0) break ;
    
          prevtime = newtime;
          slp->park(millis);
        }
        return OS_OK ;
      }
    }
    
    • 参数thread是要休眠的线程,millis是休眠时间,interruptible表示是否可中断;
    • 可中断与不可中断的休眠代码差异不大,都是用无限循环加休眠时间构成主体。可中断休眠在每次循环后都调用os::is_interrupted函数检查了中断状态,第二个参数传true意味着会清除中断状态,若线程确实被中断则返回OS_INTRPT,这样JVM_Sleep函数就会抛出异常。
    bool os::is_interrupted(Thread* thread, bool clear_interrupted) {
      assert(Thread::current() == thread || Threads_lock->owned_by_self(),
        "possibility of dangling Thread pointer");
    
      OSThread* osthread = thread->osthread();
    
      bool interrupted = osthread->interrupted();
    
      if (interrupted && clear_interrupted) {
        osthread->set_interrupted(false);
        // consider thread->_SleepEvent->reset() ... optional optimization
      }
    
      return interrupted;
    }
    

    到此,可知Thread类interrupt方法的Javadoc所说的清除中断状态的原因就在于os::is_interrupted(thread, true)调用。

    If this thread is blocked in an invocation of the wait(), wait(long), or wait(long, int) methods of the Object class, or of the join(), join(long), join(long, int), sleep(long), or sleep(long, int), methods of this class, then its interrupt status will be cleared and it will receive an InterruptedException.

    OrderAccess::fence()涉及到内存屏障,这是将来要分析的工作。

    相关文章

      网友评论

          本文标题:Java线程的中断与休眠

          本文链接:https://www.haomeiwen.com/subject/piwysqtx.html