美文网首页
并发系列之Thread源码解读

并发系列之Thread源码解读

作者: 阿伦故事2019 | 来源:发表于2019-07-30 15:17 被阅读0次

    北京大学(中国)校训:“自由平等,民主科学。”


    话说,所有不谈源码的开发讲解都是瞎扯淡。今天,阴雨绵绵,心中暑气全无。要不一起看看Thread类的源码,jdk的源码倒是很简单的,这个类中大量重要方法是native方法,在jvm中实现的,那可不是看Java代码了,有c基础的,那就没多大问题了,只要能够看到大概意思,我觉得对开发者来说足够用了,不至于有哪家公司招你来把jvm中的代码改下,哈哈。。。


    今天需要查看Thread中的jvm源码,jdk中的方法与jvm源码方法是具有对应关系的,通俗理解为注册表,具体可在openjdk\jdk\src\share\native\java\lang\Thread.c,现摘出部分映射关系:

    static JNINativeMethod methods[] = {
        {"start0",           "()V",        (void *)&JVM_StartThread},
        {"stop0",            "(" OBJ ")V", (void *)&JVM_StopThread},
        {"isAlive",          "()Z",        (void *)&JVM_IsThreadAlive},
        {"suspend0",         "()V",        (void *)&JVM_SuspendThread},
        {"resume0",          "()V",        (void *)&JVM_ResumeThread},
        {"setPriority0",     "(I)V",       (void *)&JVM_SetThreadPriority},
        {"yield",            "()V",        (void *)&JVM_Yield},
        {"sleep",            "(J)V",       (void *)&JVM_Sleep},
        {"currentThread",    "()" THD,     (void *)&JVM_CurrentThread},
        {"countStackFrames", "()I",        (void *)&JVM_CountStackFrames},
        {"interrupt0",       "()V",        (void *)&JVM_Interrupt},
        {"isInterrupted",    "(Z)Z",       (void *)&JVM_IsInterrupted},
        {"holdsLock",        "(" OBJ ")Z", (void *)&JVM_HoldsLock},
        {"getThreads",        "()[" THD,   (void *)&JVM_GetAllThreads},
        {"dumpThreads",      "([" THD ")[[" STE, (void *)&JVM_DumpThreads},
        {"setNativeName",    "(" STR ")V", (void *)&JVM_SetNativeThreadName},
    };
    

    一 线程创建与初始化

    下面的代码片段已经加了详细的注释,在此就不说了,直接上干货吧

    //名称
        private volatile String name;
        //优先级
        private int priority;
        //守护线程标识
        private boolean daemon = false;
        //线程执行的目标对象
        private Runnable target;
        //线程组
        private ThreadGroup group;
        //当前线程的指定栈大小,默认值为0,设置似乎意义不大,具体栈分配由jvm决定
        private long stackSize;
        //线程序列号,为0
        private static long threadSeqNumber;
        //线程id:由threadSeqNumber++生成
        private long tid;
        //标识线程状态,默认是线程未启动
        /**
         * 线程状态有如下几种:NEW RUNNABLE WAITING TIMED_WAITING TERMINATED
         * NEW时对应的threadStatus为0;
         * */
        private int threadStatus = 0;
        //存储当前线程的局部变量
        ThreadLocal.ThreadLocalMap threadLocals = null;
        /**
         * 在创建子线程时,子线程会接收所有可继承的线程局部变量的初始值,以获得父线程所具有的值
         * 为子线程提供从父线程那里继承的值
         * */
        ThreadLocal.ThreadLocalMap inheritableThreadLocals = null;
        //为LockSupport提供的变量,具体可查看LockSupport源码
        volatile Object parkBlocker;
        //阻塞器锁,主要用于处理阻塞情况
        private volatile Interruptible blocker;
        //阻断锁
        private Object blockerLock = new Object();
        //最低优先级
        public final static int MIN_PRIORITY = 1;
        //默认优先级
        public final static int NORM_PRIORITY = 5;
        //最高优先级
        public final static int MAX_PRIORITY = 10;
    
        /**
         * Thread有多种构造器,这里只列出最全的构造方法
         * 所有构造器均调用init方法
         * */
        public Thread(ThreadGroup group, Runnable target, String name, long stackSize) {
            init(group, target, name, stackSize, null, true);
        }
    
        /**
         * 底层init方法,用于初始化thread对象
         * 参数已在上述属性中做了说明
         * */
        private void init(ThreadGroup g, Runnable target, String name, long stackSize,
                          AccessControlContext acc, boolean inheritThreadLocals) {
            if (name == null) {
                throw new NullPointerException("name cannot be null");
            }
            this.name = name;
            /**
             * 获取当前线程,即创建线程的线程
             * 这里体现了父子线程的关系
             * */
            Thread parent = currentThread();
            //获得系统的安全管理器
            SecurityManager security = System.getSecurityManager();
            if (g == null) {
                if (security != null) {
                    g = security.getThreadGroup();
                }
                //设置线程组,如果子线程未指定,则取父线程的
                if (g == null) {
                    g = parent.getThreadGroup();
                }
            }
            g.checkAccess();
            //线程组未启动线程个数++
            g.addUnstarted();
            //线程组
            this.group = g;
            //守护线程继承性,子线程的是否守护取决于父线程
            this.daemon = parent.isDaemon();
            //优先级继承性,子线程的优先级取决于父线程
            this.priority = parent.getPriority();
            //Runnable对象
            this.target = target;
            //设置优先级
            setPriority(priority);
            if (inheritThreadLocals && parent.inheritableThreadLocals != null)
                //为子线程提供从父线程那里继承的值
                this.inheritableThreadLocals = ThreadLocal.createInheritedMap(parent.inheritableThreadLocals);
            this.stackSize = stackSize;
            //生成线程ID
            tid = nextThreadID();
        }
        //tid生成器
        private static synchronized long nextThreadID() {
            return ++threadSeqNumber;
        }
        public final ThreadGroup getThreadGroup() {
            return this.group;
        }
        public final boolean isDaemon() {
            return this.daemon;
        }
        public final int getPriority() {
            return this.priority;
        }
        public final void setPriority(int priority) {
            if (priority <= 10 && priority >= 1) {
                ThreadGroup var2;
                if ((var2 = this.getThreadGroup()) != null) {
                    //这里需要注意:线程的优先级上限取决于所属线程组的优先级
                    if (priority > var2.getMaxPriority()) {
                        priority = var2.getMaxPriority();
                    }
                    this.setPriority0(this.priority = priority);
                }
            } else {
                throw new IllegalArgumentException();
            }
        }
        /**
         * 线程执行的具体任务
         */
        public void run() {
            if (target != null) {
                target.run();
            }
        }
        /**
         * 线程真正退出前执行清理
         */
        private void exit() {
            if (group != null) {
                group = null;
            }
            target = null;
            threadLocals = null;
            inheritableThreadLocals = null;
            blocker = null;
        }
        //获取当前线程的native方法
        public static native Thread currentThread();
        //设置线程优先级的native方法
        private native void setPriority0(int var1);
    

    在上述有两个native方法,即currentThread()和setPriority0(),其中currentThread是获取父线程,setPriority0是设置线程优先级,均在jvm.cpp中,点击查看;现摘出具体片段:
    currentThread方法底层调用jvm.cpp中的JVM_CurrentThread函数:

    JVM_ENTRY(jobject, JVM_CurrentThread(JNIEnv* env, jclass threadClass))
      JVMWrapper("JVM_CurrentThread");
      oop jthread = thread->threadObj();
      assert (thread != NULL, "no current thread!");
      return JNIHandles::make_local(env, jthread);
    JVM_END
    

    setPriority0方法底层调用jvm.cpp中的JVM_CurrentThread函数:

    JVM_ENTRY(void, JVM_SetThreadPriority(JNIEnv* env, jobject jthread, jint prio))
      JVMWrapper("JVM_SetThreadPriority");
      // 确保C++线程和OS线程在操作之前不释放
      MutexLocker ml(Threads_lock);
      oop java_thread = JNIHandles::resolve_non_null(jthread);
      java_lang_Thread::set_priority(java_thread, (ThreadPriority)prio);
      JavaThread* thr = java_lang_Thread::thread(java_thread);
      if (thr != NULL) {                  
        // 线程尚未启动,当设置优先级才会启动
        Thread::set_priority(thr, (ThreadPriority)prio);
      }
    JVM_END
    

    二 线程启动start

    start代码片段如下:

    /**
         * 调用start()方法启动线程,执行线程的run方法
         */
        public synchronized void start() {
            /**
             * 线程状态校验,线程必须是0即新建态才能启动
             * 这也是为何一个线程连续两次调start会报错
             */
            if (threadStatus != 0) throw new IllegalThreadStateException();
            //通知线程组当前线程即将执行,同时线程组中未启动线程数-1
            group.add(this);
            boolean started = false;
            try {
                //使线程进入可执行(runnable)状态
                start0();
                started = true;
            } finally {
                try {
                    if (!started) {
                        //启动失败后,修改线程组未启动线程数+1
                        group.threadStartFailed(this);
                    }
                } catch (Throwable ignore) { }
            }
        }
        /**
         * 设置线程启动的native方法
         * 底层会新启动一个线程,新线程才会调用传递过来的Runnable对象run方法
         * */
        private native void start0();
    

    start0方法底层调用jvm.cpp中的JVM_StartThread函数:

    JVM_ENTRY(void, JVM_StartThread(JNIEnv* env, jobject jthread))
      JVMWrapper("JVM_StartThread");
      JavaThread *native_thread = NULL;
    
      //由于排序问题,引发异常时无法持有线程锁。示例:我们可能需要在构造异常时获取堆锁。
      bool throw_illegal_thread_state = false;
    
      //在线程start中发布jvmti事件之前,必须释放线程锁。
      {
        //确保C++线程和OS线程在操作之前没有被释放。
        MutexLocker mu(Threads_lock);
    
        //自JDK5以来,线程的threadstatus用于防止重新启动已启动的线程,所以通常会发现javathread是空的。
        //但是对于JNI附加的线程,在创建的线程对象(及其JavaThread集合)和对其ThreadStates的更新之间有一个小窗口,
        //因此我们必须检查这个窗口
        if (java_lang_Thread::thread(JNIHandles::resolve_non_null(jthread)) != NULL) {
          throw_illegal_thread_state = true;
        } else {
          //我们还可以检查stillborn标志,看看这个线程是否已经停止,但是出于历史原因,我们让线程在它开始运行时检测它自己
          jlong size = java_lang_Thread::stackSize(JNIHandles::resolve_non_null(jthread));
          //分配C++线程结构并创建本地线程,从Java中传递过来的stack size已经被声明,
          //但是构造函数采用size_t(无符号类型),因此避免传递负值
          size_t sz = size > 0 ? (size_t) size : 0;
          native_thread = new JavaThread(&thread_entry, sz);
          //此时可能由于内存不足而没有为javathread创建Osthread。
          //检查这种情况并抛出异常。最后,我们可能希望更改此项,以便仅在成功创建线程时获取锁,
          //然后我们还可以执行此检查并在JavaThread构造函数中抛出异常。
          if (native_thread->osthread() != NULL) {
            //注意:当前线程未在“准备”阶段使用
            native_thread->prepare(jthread);
          }
        }
      }
      if (throw_illegal_thread_state) {
        THROW(vmSymbols::java_lang_IllegalThreadStateException());
      }
      assert(native_thread != NULL, "Starting null thread?");
      if (native_thread->osthread() == NULL) {
        // No one should hold a reference to the 'native_thread'.
        delete native_thread;
        if (JvmtiExport::should_post_resource_exhausted()) {
          JvmtiExport::post_resource_exhausted(
            JVMTI_RESOURCE_EXHAUSTED_OOM_ERROR | JVMTI_RESOURCE_EXHAUSTED_THREADS,
            "unable to create new native thread");
        }
        THROW_MSG(vmSymbols::java_lang_OutOfMemoryError(),
                  "unable to create new native thread");
      }
      Thread::start(native_thread);
    
    JVM_END
    

    三 线程中断判断

    /**
         * 判断线程是否已经中断,同时清除中断标识
         * static方法,
         */
        public static boolean interrupted() {
            return currentThread().isInterrupted(true);
        }
        /**
         * 判断线程是否已经中断,不清除中断标识
         * this代表当前调用此方法的线程对象
         */
        public boolean isInterrupted() {
            return this.isInterrupted(false);
        }
        /**
         * native方法判断线程是否中断
         */
        private native boolean isInterrupted(boolean ClearInterrupted);
    

    isInterrupted方法底层调用jvm.cpp中的JVM_IsInterrupted函数:

    JVM_QUICK_ENTRY(jboolean, JVM_IsInterrupted(JNIEnv* env, jobject jthread, jboolean clear_interrupted))
      JVMWrapper("JVM_IsInterrupted");
    
      //确保C++线程和OS线程在操作之前没有被释放
      oop java_thread = JNIHandles::resolve_non_null(jthread);
      MutexLockerEx ml(thread->threadObj() == java_thread ? NULL : Threads_lock);
      //我们需要重新解析java_thread,因为在获取锁的过程中可能会发生GC
      JavaThread* thr = java_lang_Thread::thread(JNIHandles::resolve_non_null(jthread));
      if (thr == NULL) {
        return JNI_FALSE;
      } else {
        return (jboolean) Thread::is_interrupted(thr, clear_interrupted != 0);
      }
    JVM_END
    

    四 线程join

    /**
         * 等待调用join的线程执行结束
         */
        public final synchronized void join(long var1) throws InterruptedException {
            long var3 = System.currentTimeMillis();
            long var5 = 0L;
            if (var1 < 0L) {
                throw new IllegalArgumentException("timeout value is negative");
            } else {
                //如果join时不设置超时,则会调用Object.wait的无超时等待
                if (var1 == 0L) {
                    while(this.isAlive()) {
                        this.wait(0L);
                    }
                } else {
                    //join设置超时,则会调用Object.wait的超时等待
                    while(this.isAlive()) {
                        long var7 = var1 - var5;
                        if (var7 <= 0L) {
                            break;
                        }    
                        this.wait(var7);
                        var5 = System.currentTimeMillis() - var3;
                    }
                }
    
            }
        }
        /**
         * native方法判断线程存活
         */
        public final native boolean isAlive();
    

    Object.wait在下面讲述,isAlive方法底层调用jvm.cpp中的JVM_IsThreadAlive函数:

    JVM_ENTRY(jboolean, JVM_IsThreadAlive(JNIEnv* env, jobject jthread))
      JVMWrapper("JVM_IsThreadAlive");
    
      oop thread_oop = JNIHandles::resolve_non_null(jthread);
      return java_lang_Thread::is_alive(thread_oop);
    JVM_END
    

    五 线程sleep

    /**
         * 线程休眠
         * @param var0 毫秒
         * @param var2 纳秒
         */
        public static void sleep(long var0, int var2) throws InterruptedException {
            if (var0 < 0L) {
                throw new IllegalArgumentException("timeout value is negative");
            } else if (var2 >= 0 && var2 <= 999999) {
                //纳秒四舍五入
                if (var2 >= 500000 || var2 != 0 && var0 == 0L) {
                    ++var0;
                }
                sleep(var0);
            } else {
                throw new IllegalArgumentException("nanosecond timeout value out of range");
            }
        }
        /**
         * native方法线程休眠
         */
        public static native void sleep(long var0) throws InterruptedException;
    

    sleep方法底层调用jvm.cpp中的JVM_Sleep函数:

    JVM_ENTRY(void, JVM_Sleep(JNIEnv* env, jclass threadClass, jlong millis))
      JVMWrapper("JVM_Sleep");
    
      if (millis < 0) {
        THROW_MSG(vmSymbols::java_lang_IllegalArgumentException(), "timeout value is negative");
      }
      //线程中断则抛出异常
      if (Thread::is_interrupted (THREAD, true) && !HAS_PENDING_EXCEPTION) {
        THROW_MSG(vmSymbols::java_lang_InterruptedException(), "sleep interrupted");
      }
      //保存当前线程状态并在末尾还原它,并将新线程状态设置为SLEEPING
      JavaThreadSleepState jtss(thread);
    
    #ifndef USDT2
      HS_DTRACE_PROBE1(hotspot, thread__sleep__begin, millis);
    #else /* USDT2 */
      HOTSPOT_THREAD_SLEEP_BEGIN(
                                 millis);
    #endif /* USDT2 */
    
      EventThreadSleep event;
    
      if (millis == 0) {
        //当convertsleeptoyield为on时,这与JVM_Sleep的经典VM实现相匹配。
        //对于类似的线程行为(win32)至关重要,即在某些GUI上下文中,对Solaris进行短时间睡眠是有益的。
        if (ConvertSleepToYield) {
          os::yield();
        } else {
          ThreadState old_state = thread->osthread()->get_state();
          thread->osthread()->set_state(SLEEPING);
          os::sleep(thread, MinSleepInterval, false);
          thread->osthread()->set_state(old_state);
        }
      } else {
        ThreadState old_state = thread->osthread()->get_state();
        thread->osthread()->set_state(SLEEPING);
        if (os::sleep(thread, millis, true) == OS_INTRPT) {
          //当休眠时,一个异步异常(例如,threaddeathexception)可能抛出了,但不需要覆盖它们。
          if (!HAS_PENDING_EXCEPTION) {
            if (event.should_commit()) {
              event.set_time(millis);
              event.commit();
            }
    #ifndef USDT2
            HS_DTRACE_PROBE1(hotspot, thread__sleep__end,1);
    #else /* USDT2 */
            HOTSPOT_THREAD_SLEEP_END(
                                     1);
    #endif /* USDT2 */
           
            //THROW_MSG方法返回,意味着不能以正确地还原线程状态,因为那很可能是错的。
            THROW_MSG(vmSymbols::java_lang_InterruptedException(), "sleep interrupted");
          }
        }
        thread->osthread()->set_state(old_state);
      }
      if (event.should_commit()) {
        event.set_time(millis);
        event.commit();
      }
    #ifndef USDT2
      HS_DTRACE_PROBE1(hotspot, thread__sleep__end,0);
    #else /* USDT2 */
      HOTSPOT_THREAD_SLEEP_END(
                               0);
    #endif /* USDT2 */
    JVM_END
    

    六 线程yield

    /**
         * native方法线程让度CPU执行权
         */
        public static native void yield();
    

    yield方法底层调用jvm.cpp中的JVM_Yield函数:

    JVM_ENTRY(void, JVM_Yield(JNIEnv *env, jclass threadClass))
      JVMWrapper("JVM_Yield");
      if (os::dont_yield()) return;
    #ifndef USDT2
      HS_DTRACE_PROBE0(hotspot, thread__yield);
    #else /* USDT2 */
      HOTSPOT_THREAD_YIELD();
    #endif /* USDT2 */
      //当ConvertYieldToSleep为off(默认)时,这与传统的VM使用yield相匹配,对于类似的线程行为至关重要
      if (ConvertYieldToSleep) {//on
        //系统调用sleep
        os::sleep(thread, MinSleepInterval, false);
      } else {//off
        //系统调用yield
        os::yield();
      }
    JVM_END
    

    七 线程中断interrupt

    /**
         * 线程中断
         */
        public void interrupt() {
            Object var1 = this.blockerLock;
            synchronized(this.blockerLock) {
                Interruptible var2 = this.blocker;
                if (var2 != null) {
                    this.interrupt0();
                    var2.interrupt(this);
                    return;
                }
            }
            this.interrupt0();
        }
        /**
         * native方法线程中断
         */
        private native void interrupt0();
    

    interrupt0方法底层调用jvm.cpp中的JVM_Interrupt函数:

    JVM_ENTRY(void, JVM_Interrupt(JNIEnv* env, jobject jthread))
      JVMWrapper("JVM_Interrupt");
    
      //确保C++线程和OS线程在操作之前没有被释放
      oop java_thread = JNIHandles::resolve_non_null(jthread);
      MutexLockerEx ml(thread->threadObj() == java_thread ? NULL : Threads_lock);
      //我们需要重新解析java_thread,因为在获取锁的过程中可能会发生GC
      JavaThread* thr = java_lang_Thread::thread(JNIHandles::resolve_non_null(jthread));
      if (thr != NULL) {
        Thread::interrupt(thr);
      }
    JVM_END
    

    七 Object的Wait/Notify/NotifyAll

    /**
         * 线程等待
         * @param var1 毫秒
         * @param var3 纳秒
         */
        public final void wait(long var1, int var3) throws InterruptedException {
            if (var1 < 0L) {
                throw new IllegalArgumentException("timeout value is negative");
            } else if (var3 >= 0 && var3 <= 999999) {
                //纳秒>0,毫秒直接++
                if (var3 > 0) {
                    ++var1;
                }
                //调用native方法
                this.wait(var1);
            } else {
                throw new IllegalArgumentException("nanosecond timeout value out of range");
            }
        }
        /**
         * native方法线程等待
         */
        public final native void wait(long var1) throws InterruptedException;
        /**
         * native方法线程单个唤醒
         */
        public final native void notify();
        /**
         * native方法线程唤醒等待池中所有线程
         */
        public final native void notifyAll();
    

    Wait/Notify/NotifyAll在objectMonitor.cpp中,点击查看
    Wait片段:

    void ObjectMonitor::wait(jlong millis, bool interruptible, TRAPS) {
       Thread * const Self = THREAD ;
       assert(Self->is_Java_thread(), "Must be Java thread!");
       JavaThread *jt = (JavaThread *)THREAD;
    
       DeferredInitialize () ;
    
       // Throw IMSX or IEX.
       CHECK_OWNER();
    
       //调用is_interrupted()判断并清除线程中断状态,如果中断状态为true,抛出中断异常并结束
       if (interruptible && Thread::is_interrupted(Self, true) && !HAS_PENDING_EXCEPTION) {
         //post monitor waited event
         //注意这是过去式,已经等待完了
         if (JvmtiExport::should_post_monitor_waited()) {
            //注意:这里传递参数'false',这是因为由于线程中断,等待不会超时
            JvmtiExport::post_monitor_waited(jt, this, false);
         }
         TEVENT (Wait - Throw IEX) ;
         THROW(vmSymbols::java_lang_InterruptedException());
         return ;
       }
       TEVENT (Wait) ;
    
       assert (Self->_Stalled == 0, "invariant") ;
       Self->_Stalled = intptr_t(this) ;
       jt->set_current_waiting_monitor(this);
    
       // create a node to be put into the queue
       // Critically, after we reset() the event but prior to park(), we must check
       // for a pending interrupt.
    
       //创建一个node放入队列
       //关键是,在reset()之后,但在park()之前,必须检查是否有挂起的中断
       ObjectWaiter node(Self);
       node.TState = ObjectWaiter::TS_WAIT ;
       Self->_ParkEvent->reset() ;
       OrderAccess::fence();
    
       //在本例中等待队列是一个循环的双向链表,但它也可以是一个优先级队列或任何数据结构。
       //_WaitSetLock保护着等待队列.
       //通常,等待队列只能由监视器*except*的所有者访问,但在park()因中断超时而返回的情况下也是可以。
       //竞争非常小,所以使用一个自旋锁而不是重量级的阻塞锁。
       Thread::SpinAcquire (&_WaitSetLock, "WaitSet - add") ;
       AddWaiter (&node) ;
       Thread::SpinRelease (&_WaitSetLock) ;
    
       if ((SyncFlags & 4) == 0) {
          _Responsible = NULL ;
       }
       intptr_t save = _recursions; // 记录旧的递归次数
       _waiters++;                  // waiters 自增
       _recursions = 0;             // 设置 recursion level to be 1
       exit (Self) ;                // 退出监视器
       guarantee (_owner != Self, "invariant") ;
    
       //一旦在上面的exit()调用中删除了ObjectMonitor的所有权,
       //另一个线程就可以进入ObjectMonitor,执行notify()和exit()对象监视器。
       //如果另一个线程的exit()调用选择此线程作为后继者,并且此线程在发布MONITOR_CONTENDED_EXIT时发生unpark()调用,
       //则我们使用RawMonitors运行事件风险处理,并使用unpark().
       //为了避免这个问题,我们重新发布事件,即使未使用原来的unpark(),
       //这也不会造成任何伤害,因为已经为此监视器选好了继任者。
       if (node._notified != 0 && _succ == Self) {
          node._event->unpark();
       }
    
       // The thread is on the WaitSet list - now park() it.
       // On MP systems it's conceivable that a brief spin before we park
       // could be profitable.
       //
       // TODO-FIXME: change the following logic to a loop of the form
       //   while (!timeout && !interrupted && _notified == 0) park()
    
       int ret = OS_OK ;
       int WasNotified = 0 ;
       { // State transition wrappers
         OSThread* osthread = Self->osthread();
         OSThreadWaitState osts(osthread, true);
         {
           ThreadBlockInVM tbivm(jt);
           // Thread is in thread_blocked state and oop access is unsafe.
           //线程处于阻塞状态,并且oop访问是不安全的
           jt->set_suspend_equivalent();
    
           if (interruptible && (Thread::is_interrupted(THREAD, false) || HAS_PENDING_EXCEPTION)) {
               // Intentionally empty 空处理
           } else
           if (node._notified == 0) {
             if (millis <= 0) {
                // 调用park()方法阻塞线程
                Self->_ParkEvent->park () ;
             } else {
                // 调用park()方法在超时时间内阻塞线程
                ret = Self->_ParkEvent->park (millis) ;
             }
           }
    
           // were we externally suspended while we were waiting?
           if (ExitSuspendEquivalent (jt)) {
              // TODO-FIXME: add -- if succ == Self then succ = null.
              jt->java_suspend_self();
           }
    
         } // Exit thread safepoint: transition _thread_blocked -> _thread_in_vm
    
         //当线程不在等待队列时,使用双重检查锁定避免获取_WaitSetLock
         if (node.TState == ObjectWaiter::TS_WAIT) {
             Thread::SpinAcquire (&_WaitSetLock, "WaitSet - unlink") ;
             if (node.TState == ObjectWaiter::TS_WAIT) {
                DequeueSpecificWaiter (&node) ;       // unlink from WaitSet
                assert(node._notified == 0, "invariant");
                node.TState = ObjectWaiter::TS_RUN ;
             }
             Thread::SpinRelease (&_WaitSetLock) ;
         }
    
         //从这个线程的角度来看,Node's TState是稳定的,
         //没有其他线程能够异步修改TState
         guarantee (node.TState != ObjectWaiter::TS_WAIT, "invariant") ;
         OrderAccess::loadload() ;
         if (_succ == Self) _succ = NULL ;
         WasNotified = node._notified ;
    
         // Reentry phase -- reacquire the monitor.
         // re-enter contended(竞争) monitor after object.wait().
         // retain OBJECT_WAIT state until re-enter successfully completes
         // Thread state is thread_in_vm and oop access is again safe,
         // although the raw address of the object may have changed.
         // (Don't cache naked oops over safepoints, of course).
    
         // post monitor waited event.
         //注意这是过去式,已经等待完了
         if (JvmtiExport::should_post_monitor_waited()) {
           JvmtiExport::post_monitor_waited(jt, this, ret == OS_TIMEOUT);
         }
         OrderAccess::fence() ;
    
         assert (Self->_Stalled != 0, "invariant") ;
         Self->_Stalled = 0 ;
    
         assert (_owner != Self, "invariant") ;
         ObjectWaiter::TStates v = node.TState ;
         if (v == ObjectWaiter::TS_RUN) {
             enter (Self) ;
         } else {
             guarantee (v == ObjectWaiter::TS_ENTER || v == ObjectWaiter::TS_CXQ, "invariant") ;
             ReenterI (Self, &node) ;
             node.wait_reenter_end(this);
         }
    
         // Self has reacquired the lock.
         // Lifecycle - the node representing Self must not appear on any queues.
         // Node is about to go out-of-scope, but even if it were immortal(长久的) we wouldn't
         // want residual(残留的) elements associated with this thread left on any lists.
         guarantee (node.TState == ObjectWaiter::TS_RUN, "invariant") ;
         assert    (_owner == Self, "invariant") ;
         assert    (_succ != Self , "invariant") ;
       } // OSThreadWaitState()
    
       jt->set_current_waiting_monitor(NULL);
    
       guarantee (_recursions == 0, "invariant") ;
       _recursions = save;     // restore the old recursion count
       _waiters--;             // decrement the number of waiters
    
       // Verify a few postconditions
       assert (_owner == Self       , "invariant") ;
       assert (_succ  != Self       , "invariant") ;
       assert (((oop)(object()))->mark() == markOopDesc::encode(this), "invariant") ;
    
       if (SyncFlags & 32) {
          OrderAccess::fence() ;
       }
    
       //检查是否有通知notify发生
       // 从park()方法返回后,判断是否是因为中断返回,再次调用
       // thread::is_interrupted(Self, true)判断并清除线程中断状态
       // 如果中断状态为true,抛出中断异常并结束。
       if (!WasNotified) {
         // no, it could be timeout or Thread.interrupt() or both
         // check for interrupt event, otherwise it is timeout
         if (interruptible && Thread::is_interrupted(Self, true) && !HAS_PENDING_EXCEPTION) {
           TEVENT (Wait - throw IEX from epilog) ;
           THROW(vmSymbols::java_lang_InterruptedException());
         }
       }
       //注意:虚假唤醒将被视为超时;监视器通知优先于线程中断。
    }
    

    Notify片段:

    void ObjectMonitor::notify(TRAPS) {
      CHECK_OWNER();
      if (_WaitSet == NULL) {
         TEVENT (Empty-Notify) ;
         return ;
      }
      DTRACE_MONITOR_PROBE(notify, this, object(), THREAD);
    
      int Policy = Knob_MoveNotifyee ;
    
      Thread::SpinAcquire (&_WaitSetLock, "WaitSet - notify") ;
      ObjectWaiter * iterator = DequeueWaiter() ;
      if (iterator != NULL) {
         TEVENT (Notify1 - Transfer) ;
         guarantee (iterator->TState == ObjectWaiter::TS_WAIT, "invariant") ;
         guarantee (iterator->_notified == 0, "invariant") ;
         if (Policy != 4) {
            iterator->TState = ObjectWaiter::TS_ENTER ;
         }
         iterator->_notified = 1 ;
    
         ObjectWaiter * List = _EntryList ;
         if (List != NULL) {
            assert (List->_prev == NULL, "invariant") ;
            assert (List->TState == ObjectWaiter::TS_ENTER, "invariant") ;
            assert (List != iterator, "invariant") ;
         }
    
         if (Policy == 0) {       // prepend(预追加) to EntryList
             if (List == NULL) {
                 iterator->_next = iterator->_prev = NULL ;
                 _EntryList = iterator ;
             } else {
                 List->_prev = iterator ;
                 iterator->_next = List ;
                 iterator->_prev = NULL ;
                 _EntryList = iterator ;
            }
         } else
         if (Policy == 1) {      // append(真正追加) to EntryList
             if (List == NULL) {
                 iterator->_next = iterator->_prev = NULL ;
                 _EntryList = iterator ;
             } else {
                //考虑:当前获取EntryList的tail需要遍历整个链表
                //将tail访问转换为CDLL而不是使用当前的DLL,从而使访问时间固定。
                ObjectWaiter * Tail ;
                for (Tail = List ; Tail->_next != NULL ; Tail = Tail->_next) ;
                assert (Tail != NULL && Tail->_next == NULL, "invariant") ;
                Tail->_next = iterator ;
                iterator->_prev = Tail ;
                iterator->_next = NULL ;
            }
         } else
         if (Policy == 2) {      // prepend to cxq
             // prepend(预追加) to cxq
             if (List == NULL) {
                 iterator->_next = iterator->_prev = NULL ;
                 _EntryList = iterator ;
             } else {
                iterator->TState = ObjectWaiter::TS_CXQ ;
                for (;;) {
                    ObjectWaiter * Front = _cxq ;
                    iterator->_next = Front ;
                    if (Atomic::cmpxchg_ptr (iterator, &_cxq, Front) == Front) {
                        break ;
                    }
                }
             }
         } else
         if (Policy == 3) {      // append(真正追加) to cxq
            iterator->TState = ObjectWaiter::TS_CXQ ;
            for (;;) {
                ObjectWaiter * Tail ;
                Tail = _cxq ;
                if (Tail == NULL) {
                    iterator->_next = NULL ;
                    if (Atomic::cmpxchg_ptr (iterator, &_cxq, NULL) == NULL) {
                       break ;
                    }
                } else {
                    while (Tail->_next != NULL) Tail = Tail->_next ;
                    Tail->_next = iterator ;
                    iterator->_prev = Tail ;
                    iterator->_next = NULL ;
                    break ;
                }
            }
         } else {
            ParkEvent * ev = iterator->_event ;
            iterator->TState = ObjectWaiter::TS_RUN ;
            OrderAccess::fence() ;
            ev->unpark() ;
         }
    
         if (Policy < 4) {
           iterator->wait_reenter_begin(this);
         }
    
         // _WaitSetLock protects the wait queue, not the EntryList.  We could
         // move the add-to-EntryList operation, above, outside the critical section
         // protected by _WaitSetLock.  In practice that's not useful.  With the
         // exception of  wait() timeouts and interrupts the monitor owner
         // is the only thread that grabs _WaitSetLock.  There's almost no contention
         // on _WaitSetLock so it's not profitable to reduce the length of the
         // critical section.
      }
      Thread::SpinRelease (&_WaitSetLock) ;
      if (iterator != NULL && ObjectMonitor::_sync_Notifications != NULL) {
         ObjectMonitor::_sync_Notifications->inc() ;
      }
    }
    

    NotifyAll片段:

    void ObjectMonitor::notifyAll(TRAPS) {
      CHECK_OWNER();
      ObjectWaiter* iterator;
      if (_WaitSet == NULL) {
          TEVENT (Empty-NotifyAll) ;
          return ;
      }
      DTRACE_MONITOR_PROBE(notifyAll, this, object(), THREAD);
    
      int Policy = Knob_MoveNotifyee ;
      int Tally = 0 ;
      Thread::SpinAcquire (&_WaitSetLock, "WaitSet - notifyall") ;
    
      for (;;) {
         iterator = DequeueWaiter () ;
         if (iterator == NULL) break ;
         TEVENT (NotifyAll - Transfer1) ;
         ++Tally ;
    
         // Disposition - what might we do with iterator ?
         // a.  add it directly to the EntryList - either tail or head.
         // b.  push it onto the front of the _cxq.
         // For now we use (a).
    
         guarantee (iterator->TState == ObjectWaiter::TS_WAIT, "invariant") ;
         guarantee (iterator->_notified == 0, "invariant") ;
         iterator->_notified = 1 ;
         if (Policy != 4) {
            iterator->TState = ObjectWaiter::TS_ENTER ;
         }
    
         ObjectWaiter * List = _EntryList ;
         if (List != NULL) {
            assert (List->_prev == NULL, "invariant") ;
            assert (List->TState == ObjectWaiter::TS_ENTER, "invariant") ;
            assert (List != iterator, "invariant") ;
         }
    
         if (Policy == 0) {       // prepend to EntryList
             if (List == NULL) {
                 iterator->_next = iterator->_prev = NULL ;
                 _EntryList = iterator ;
             } else {
                 List->_prev = iterator ;
                 iterator->_next = List ;
                 iterator->_prev = NULL ;
                 _EntryList = iterator ;
            }
         } else
         if (Policy == 1) {      // append to EntryList
             if (List == NULL) {
                 iterator->_next = iterator->_prev = NULL ;
                 _EntryList = iterator ;
             } else {
                // CONSIDER:  finding the tail currently requires a linear-time walk of
                // the EntryList.  We can make tail access constant-time by converting to
                // a CDLL instead of using our current DLL.
                ObjectWaiter * Tail ;
                for (Tail = List ; Tail->_next != NULL ; Tail = Tail->_next) ;
                assert (Tail != NULL && Tail->_next == NULL, "invariant") ;
                Tail->_next = iterator ;
                iterator->_prev = Tail ;
                iterator->_next = NULL ;
            }
         } else
         if (Policy == 2) {      // prepend to cxq
             // prepend to cxq
             iterator->TState = ObjectWaiter::TS_CXQ ;
             for (;;) {
                 ObjectWaiter * Front = _cxq ;
                 iterator->_next = Front ;
                 if (Atomic::cmpxchg_ptr (iterator, &_cxq, Front) == Front) {
                     break ;
                 }
             }
         } else
         if (Policy == 3) {      // append to cxq
            iterator->TState = ObjectWaiter::TS_CXQ ;
            for (;;) {
                ObjectWaiter * Tail ;
                Tail = _cxq ;
                if (Tail == NULL) {
                    iterator->_next = NULL ;
                    if (Atomic::cmpxchg_ptr (iterator, &_cxq, NULL) == NULL) {
                       break ;
                    }
                } else {
                    while (Tail->_next != NULL) Tail = Tail->_next ;
                    Tail->_next = iterator ;
                    iterator->_prev = Tail ;
                    iterator->_next = NULL ;
                    break ;
                }
            }
         } else {
            ParkEvent * ev = iterator->_event ;
            iterator->TState = ObjectWaiter::TS_RUN ;
            OrderAccess::fence() ;
            ev->unpark() ;
         }
    
         if (Policy < 4) {
           iterator->wait_reenter_begin(this);
         }
    
         // _WaitSetLock protects the wait queue, not the EntryList.  We could
         // move the add-to-EntryList operation, above, outside the critical section
         // protected by _WaitSetLock.  In practice that's not useful.  With the
         // exception of  wait() timeouts and interrupts the monitor owner
         // is the only thread that grabs _WaitSetLock.  There's almost no contention
         // on _WaitSetLock so it's not profitable to reduce the length of the
         // critical section.
      }
    
      Thread::SpinRelease (&_WaitSetLock) ;
    
      if (Tally != 0 && ObjectMonitor::_sync_Notifications != NULL) {
         ObjectMonitor::_sync_Notifications->inc(Tally) ;
      }
    }
    

    特此声明:
    分享文章有完整的知识架构图,将从以下几个方面系统展开:
    1 基础(Linux/Spring boot/并发)
    2 性能调优(jvm/tomcat/mysql)
    3 高并发分布式
    4 微服务体系
    如果您觉得文章不错,请关注阿伦故事,您的支持是我坚持的莫大动力,在此受小弟一拜!


    每篇福利:

    评论区打出车型.jpg

    相关文章

      网友评论

          本文标题:并发系列之Thread源码解读

          本文链接:https://www.haomeiwen.com/subject/rxjbrctx.html