北京大学(中国)校训:“自由平等,民主科学。”
话说,所有不谈源码的开发讲解都是瞎扯淡。今天,阴雨绵绵,心中暑气全无。要不一起看看Thread类的源码,jdk的源码倒是很简单的,这个类中大量重要方法是native方法,在jvm中实现的,那可不是看Java代码了,有c基础的,那就没多大问题了,只要能够看到大概意思,我觉得对开发者来说足够用了,不至于有哪家公司招你来把jvm中的代码改下,哈哈。。。
今天需要查看Thread中的jvm源码,jdk中的方法与jvm源码方法是具有对应关系的,通俗理解为注册表,具体可在openjdk\jdk\src\share\native\java\lang\Thread.c,现摘出部分映射关系:
static JNINativeMethod methods[] = {
{"start0", "()V", (void *)&JVM_StartThread},
{"stop0", "(" OBJ ")V", (void *)&JVM_StopThread},
{"isAlive", "()Z", (void *)&JVM_IsThreadAlive},
{"suspend0", "()V", (void *)&JVM_SuspendThread},
{"resume0", "()V", (void *)&JVM_ResumeThread},
{"setPriority0", "(I)V", (void *)&JVM_SetThreadPriority},
{"yield", "()V", (void *)&JVM_Yield},
{"sleep", "(J)V", (void *)&JVM_Sleep},
{"currentThread", "()" THD, (void *)&JVM_CurrentThread},
{"countStackFrames", "()I", (void *)&JVM_CountStackFrames},
{"interrupt0", "()V", (void *)&JVM_Interrupt},
{"isInterrupted", "(Z)Z", (void *)&JVM_IsInterrupted},
{"holdsLock", "(" OBJ ")Z", (void *)&JVM_HoldsLock},
{"getThreads", "()[" THD, (void *)&JVM_GetAllThreads},
{"dumpThreads", "([" THD ")[[" STE, (void *)&JVM_DumpThreads},
{"setNativeName", "(" STR ")V", (void *)&JVM_SetNativeThreadName},
};
一 线程创建与初始化
下面的代码片段已经加了详细的注释,在此就不说了,直接上干货吧
//名称
private volatile String name;
//优先级
private int priority;
//守护线程标识
private boolean daemon = false;
//线程执行的目标对象
private Runnable target;
//线程组
private ThreadGroup group;
//当前线程的指定栈大小,默认值为0,设置似乎意义不大,具体栈分配由jvm决定
private long stackSize;
//线程序列号,为0
private static long threadSeqNumber;
//线程id:由threadSeqNumber++生成
private long tid;
//标识线程状态,默认是线程未启动
/**
* 线程状态有如下几种:NEW RUNNABLE WAITING TIMED_WAITING TERMINATED
* NEW时对应的threadStatus为0;
* */
private int threadStatus = 0;
//存储当前线程的局部变量
ThreadLocal.ThreadLocalMap threadLocals = null;
/**
* 在创建子线程时,子线程会接收所有可继承的线程局部变量的初始值,以获得父线程所具有的值
* 为子线程提供从父线程那里继承的值
* */
ThreadLocal.ThreadLocalMap inheritableThreadLocals = null;
//为LockSupport提供的变量,具体可查看LockSupport源码
volatile Object parkBlocker;
//阻塞器锁,主要用于处理阻塞情况
private volatile Interruptible blocker;
//阻断锁
private Object blockerLock = new Object();
//最低优先级
public final static int MIN_PRIORITY = 1;
//默认优先级
public final static int NORM_PRIORITY = 5;
//最高优先级
public final static int MAX_PRIORITY = 10;
/**
* Thread有多种构造器,这里只列出最全的构造方法
* 所有构造器均调用init方法
* */
public Thread(ThreadGroup group, Runnable target, String name, long stackSize) {
init(group, target, name, stackSize, null, true);
}
/**
* 底层init方法,用于初始化thread对象
* 参数已在上述属性中做了说明
* */
private void init(ThreadGroup g, Runnable target, String name, long stackSize,
AccessControlContext acc, boolean inheritThreadLocals) {
if (name == null) {
throw new NullPointerException("name cannot be null");
}
this.name = name;
/**
* 获取当前线程,即创建线程的线程
* 这里体现了父子线程的关系
* */
Thread parent = currentThread();
//获得系统的安全管理器
SecurityManager security = System.getSecurityManager();
if (g == null) {
if (security != null) {
g = security.getThreadGroup();
}
//设置线程组,如果子线程未指定,则取父线程的
if (g == null) {
g = parent.getThreadGroup();
}
}
g.checkAccess();
//线程组未启动线程个数++
g.addUnstarted();
//线程组
this.group = g;
//守护线程继承性,子线程的是否守护取决于父线程
this.daemon = parent.isDaemon();
//优先级继承性,子线程的优先级取决于父线程
this.priority = parent.getPriority();
//Runnable对象
this.target = target;
//设置优先级
setPriority(priority);
if (inheritThreadLocals && parent.inheritableThreadLocals != null)
//为子线程提供从父线程那里继承的值
this.inheritableThreadLocals = ThreadLocal.createInheritedMap(parent.inheritableThreadLocals);
this.stackSize = stackSize;
//生成线程ID
tid = nextThreadID();
}
//tid生成器
private static synchronized long nextThreadID() {
return ++threadSeqNumber;
}
public final ThreadGroup getThreadGroup() {
return this.group;
}
public final boolean isDaemon() {
return this.daemon;
}
public final int getPriority() {
return this.priority;
}
public final void setPriority(int priority) {
if (priority <= 10 && priority >= 1) {
ThreadGroup var2;
if ((var2 = this.getThreadGroup()) != null) {
//这里需要注意:线程的优先级上限取决于所属线程组的优先级
if (priority > var2.getMaxPriority()) {
priority = var2.getMaxPriority();
}
this.setPriority0(this.priority = priority);
}
} else {
throw new IllegalArgumentException();
}
}
/**
* 线程执行的具体任务
*/
public void run() {
if (target != null) {
target.run();
}
}
/**
* 线程真正退出前执行清理
*/
private void exit() {
if (group != null) {
group = null;
}
target = null;
threadLocals = null;
inheritableThreadLocals = null;
blocker = null;
}
//获取当前线程的native方法
public static native Thread currentThread();
//设置线程优先级的native方法
private native void setPriority0(int var1);
在上述有两个native方法,即currentThread()和setPriority0(),其中currentThread是获取父线程,setPriority0是设置线程优先级,均在jvm.cpp中,点击查看;现摘出具体片段:
currentThread方法底层调用jvm.cpp中的JVM_CurrentThread函数:
JVM_ENTRY(jobject, JVM_CurrentThread(JNIEnv* env, jclass threadClass))
JVMWrapper("JVM_CurrentThread");
oop jthread = thread->threadObj();
assert (thread != NULL, "no current thread!");
return JNIHandles::make_local(env, jthread);
JVM_END
setPriority0方法底层调用jvm.cpp中的JVM_CurrentThread函数:
JVM_ENTRY(void, JVM_SetThreadPriority(JNIEnv* env, jobject jthread, jint prio))
JVMWrapper("JVM_SetThreadPriority");
// 确保C++线程和OS线程在操作之前不释放
MutexLocker ml(Threads_lock);
oop java_thread = JNIHandles::resolve_non_null(jthread);
java_lang_Thread::set_priority(java_thread, (ThreadPriority)prio);
JavaThread* thr = java_lang_Thread::thread(java_thread);
if (thr != NULL) {
// 线程尚未启动,当设置优先级才会启动
Thread::set_priority(thr, (ThreadPriority)prio);
}
JVM_END
二 线程启动start
start代码片段如下:
/**
* 调用start()方法启动线程,执行线程的run方法
*/
public synchronized void start() {
/**
* 线程状态校验,线程必须是0即新建态才能启动
* 这也是为何一个线程连续两次调start会报错
*/
if (threadStatus != 0) throw new IllegalThreadStateException();
//通知线程组当前线程即将执行,同时线程组中未启动线程数-1
group.add(this);
boolean started = false;
try {
//使线程进入可执行(runnable)状态
start0();
started = true;
} finally {
try {
if (!started) {
//启动失败后,修改线程组未启动线程数+1
group.threadStartFailed(this);
}
} catch (Throwable ignore) { }
}
}
/**
* 设置线程启动的native方法
* 底层会新启动一个线程,新线程才会调用传递过来的Runnable对象run方法
* */
private native void start0();
start0方法底层调用jvm.cpp中的JVM_StartThread函数:
JVM_ENTRY(void, JVM_StartThread(JNIEnv* env, jobject jthread))
JVMWrapper("JVM_StartThread");
JavaThread *native_thread = NULL;
//由于排序问题,引发异常时无法持有线程锁。示例:我们可能需要在构造异常时获取堆锁。
bool throw_illegal_thread_state = false;
//在线程start中发布jvmti事件之前,必须释放线程锁。
{
//确保C++线程和OS线程在操作之前没有被释放。
MutexLocker mu(Threads_lock);
//自JDK5以来,线程的threadstatus用于防止重新启动已启动的线程,所以通常会发现javathread是空的。
//但是对于JNI附加的线程,在创建的线程对象(及其JavaThread集合)和对其ThreadStates的更新之间有一个小窗口,
//因此我们必须检查这个窗口
if (java_lang_Thread::thread(JNIHandles::resolve_non_null(jthread)) != NULL) {
throw_illegal_thread_state = true;
} else {
//我们还可以检查stillborn标志,看看这个线程是否已经停止,但是出于历史原因,我们让线程在它开始运行时检测它自己
jlong size = java_lang_Thread::stackSize(JNIHandles::resolve_non_null(jthread));
//分配C++线程结构并创建本地线程,从Java中传递过来的stack size已经被声明,
//但是构造函数采用size_t(无符号类型),因此避免传递负值
size_t sz = size > 0 ? (size_t) size : 0;
native_thread = new JavaThread(&thread_entry, sz);
//此时可能由于内存不足而没有为javathread创建Osthread。
//检查这种情况并抛出异常。最后,我们可能希望更改此项,以便仅在成功创建线程时获取锁,
//然后我们还可以执行此检查并在JavaThread构造函数中抛出异常。
if (native_thread->osthread() != NULL) {
//注意:当前线程未在“准备”阶段使用
native_thread->prepare(jthread);
}
}
}
if (throw_illegal_thread_state) {
THROW(vmSymbols::java_lang_IllegalThreadStateException());
}
assert(native_thread != NULL, "Starting null thread?");
if (native_thread->osthread() == NULL) {
// No one should hold a reference to the 'native_thread'.
delete native_thread;
if (JvmtiExport::should_post_resource_exhausted()) {
JvmtiExport::post_resource_exhausted(
JVMTI_RESOURCE_EXHAUSTED_OOM_ERROR | JVMTI_RESOURCE_EXHAUSTED_THREADS,
"unable to create new native thread");
}
THROW_MSG(vmSymbols::java_lang_OutOfMemoryError(),
"unable to create new native thread");
}
Thread::start(native_thread);
JVM_END
三 线程中断判断
/**
* 判断线程是否已经中断,同时清除中断标识
* static方法,
*/
public static boolean interrupted() {
return currentThread().isInterrupted(true);
}
/**
* 判断线程是否已经中断,不清除中断标识
* this代表当前调用此方法的线程对象
*/
public boolean isInterrupted() {
return this.isInterrupted(false);
}
/**
* native方法判断线程是否中断
*/
private native boolean isInterrupted(boolean ClearInterrupted);
isInterrupted方法底层调用jvm.cpp中的JVM_IsInterrupted函数:
JVM_QUICK_ENTRY(jboolean, JVM_IsInterrupted(JNIEnv* env, jobject jthread, jboolean clear_interrupted))
JVMWrapper("JVM_IsInterrupted");
//确保C++线程和OS线程在操作之前没有被释放
oop java_thread = JNIHandles::resolve_non_null(jthread);
MutexLockerEx ml(thread->threadObj() == java_thread ? NULL : Threads_lock);
//我们需要重新解析java_thread,因为在获取锁的过程中可能会发生GC
JavaThread* thr = java_lang_Thread::thread(JNIHandles::resolve_non_null(jthread));
if (thr == NULL) {
return JNI_FALSE;
} else {
return (jboolean) Thread::is_interrupted(thr, clear_interrupted != 0);
}
JVM_END
四 线程join
/**
* 等待调用join的线程执行结束
*/
public final synchronized void join(long var1) throws InterruptedException {
long var3 = System.currentTimeMillis();
long var5 = 0L;
if (var1 < 0L) {
throw new IllegalArgumentException("timeout value is negative");
} else {
//如果join时不设置超时,则会调用Object.wait的无超时等待
if (var1 == 0L) {
while(this.isAlive()) {
this.wait(0L);
}
} else {
//join设置超时,则会调用Object.wait的超时等待
while(this.isAlive()) {
long var7 = var1 - var5;
if (var7 <= 0L) {
break;
}
this.wait(var7);
var5 = System.currentTimeMillis() - var3;
}
}
}
}
/**
* native方法判断线程存活
*/
public final native boolean isAlive();
Object.wait在下面讲述,isAlive方法底层调用jvm.cpp中的JVM_IsThreadAlive函数:
JVM_ENTRY(jboolean, JVM_IsThreadAlive(JNIEnv* env, jobject jthread))
JVMWrapper("JVM_IsThreadAlive");
oop thread_oop = JNIHandles::resolve_non_null(jthread);
return java_lang_Thread::is_alive(thread_oop);
JVM_END
五 线程sleep
/**
* 线程休眠
* @param var0 毫秒
* @param var2 纳秒
*/
public static void sleep(long var0, int var2) throws InterruptedException {
if (var0 < 0L) {
throw new IllegalArgumentException("timeout value is negative");
} else if (var2 >= 0 && var2 <= 999999) {
//纳秒四舍五入
if (var2 >= 500000 || var2 != 0 && var0 == 0L) {
++var0;
}
sleep(var0);
} else {
throw new IllegalArgumentException("nanosecond timeout value out of range");
}
}
/**
* native方法线程休眠
*/
public static native void sleep(long var0) throws InterruptedException;
sleep方法底层调用jvm.cpp中的JVM_Sleep函数:
JVM_ENTRY(void, JVM_Sleep(JNIEnv* env, jclass threadClass, jlong millis))
JVMWrapper("JVM_Sleep");
if (millis < 0) {
THROW_MSG(vmSymbols::java_lang_IllegalArgumentException(), "timeout value is negative");
}
//线程中断则抛出异常
if (Thread::is_interrupted (THREAD, true) && !HAS_PENDING_EXCEPTION) {
THROW_MSG(vmSymbols::java_lang_InterruptedException(), "sleep interrupted");
}
//保存当前线程状态并在末尾还原它,并将新线程状态设置为SLEEPING
JavaThreadSleepState jtss(thread);
#ifndef USDT2
HS_DTRACE_PROBE1(hotspot, thread__sleep__begin, millis);
#else /* USDT2 */
HOTSPOT_THREAD_SLEEP_BEGIN(
millis);
#endif /* USDT2 */
EventThreadSleep event;
if (millis == 0) {
//当convertsleeptoyield为on时,这与JVM_Sleep的经典VM实现相匹配。
//对于类似的线程行为(win32)至关重要,即在某些GUI上下文中,对Solaris进行短时间睡眠是有益的。
if (ConvertSleepToYield) {
os::yield();
} else {
ThreadState old_state = thread->osthread()->get_state();
thread->osthread()->set_state(SLEEPING);
os::sleep(thread, MinSleepInterval, false);
thread->osthread()->set_state(old_state);
}
} else {
ThreadState old_state = thread->osthread()->get_state();
thread->osthread()->set_state(SLEEPING);
if (os::sleep(thread, millis, true) == OS_INTRPT) {
//当休眠时,一个异步异常(例如,threaddeathexception)可能抛出了,但不需要覆盖它们。
if (!HAS_PENDING_EXCEPTION) {
if (event.should_commit()) {
event.set_time(millis);
event.commit();
}
#ifndef USDT2
HS_DTRACE_PROBE1(hotspot, thread__sleep__end,1);
#else /* USDT2 */
HOTSPOT_THREAD_SLEEP_END(
1);
#endif /* USDT2 */
//THROW_MSG方法返回,意味着不能以正确地还原线程状态,因为那很可能是错的。
THROW_MSG(vmSymbols::java_lang_InterruptedException(), "sleep interrupted");
}
}
thread->osthread()->set_state(old_state);
}
if (event.should_commit()) {
event.set_time(millis);
event.commit();
}
#ifndef USDT2
HS_DTRACE_PROBE1(hotspot, thread__sleep__end,0);
#else /* USDT2 */
HOTSPOT_THREAD_SLEEP_END(
0);
#endif /* USDT2 */
JVM_END
六 线程yield
/**
* native方法线程让度CPU执行权
*/
public static native void yield();
yield方法底层调用jvm.cpp中的JVM_Yield函数:
JVM_ENTRY(void, JVM_Yield(JNIEnv *env, jclass threadClass))
JVMWrapper("JVM_Yield");
if (os::dont_yield()) return;
#ifndef USDT2
HS_DTRACE_PROBE0(hotspot, thread__yield);
#else /* USDT2 */
HOTSPOT_THREAD_YIELD();
#endif /* USDT2 */
//当ConvertYieldToSleep为off(默认)时,这与传统的VM使用yield相匹配,对于类似的线程行为至关重要
if (ConvertYieldToSleep) {//on
//系统调用sleep
os::sleep(thread, MinSleepInterval, false);
} else {//off
//系统调用yield
os::yield();
}
JVM_END
七 线程中断interrupt
/**
* 线程中断
*/
public void interrupt() {
Object var1 = this.blockerLock;
synchronized(this.blockerLock) {
Interruptible var2 = this.blocker;
if (var2 != null) {
this.interrupt0();
var2.interrupt(this);
return;
}
}
this.interrupt0();
}
/**
* native方法线程中断
*/
private native void interrupt0();
interrupt0方法底层调用jvm.cpp中的JVM_Interrupt函数:
JVM_ENTRY(void, JVM_Interrupt(JNIEnv* env, jobject jthread))
JVMWrapper("JVM_Interrupt");
//确保C++线程和OS线程在操作之前没有被释放
oop java_thread = JNIHandles::resolve_non_null(jthread);
MutexLockerEx ml(thread->threadObj() == java_thread ? NULL : Threads_lock);
//我们需要重新解析java_thread,因为在获取锁的过程中可能会发生GC
JavaThread* thr = java_lang_Thread::thread(JNIHandles::resolve_non_null(jthread));
if (thr != NULL) {
Thread::interrupt(thr);
}
JVM_END
七 Object的Wait/Notify/NotifyAll
/**
* 线程等待
* @param var1 毫秒
* @param var3 纳秒
*/
public final void wait(long var1, int var3) throws InterruptedException {
if (var1 < 0L) {
throw new IllegalArgumentException("timeout value is negative");
} else if (var3 >= 0 && var3 <= 999999) {
//纳秒>0,毫秒直接++
if (var3 > 0) {
++var1;
}
//调用native方法
this.wait(var1);
} else {
throw new IllegalArgumentException("nanosecond timeout value out of range");
}
}
/**
* native方法线程等待
*/
public final native void wait(long var1) throws InterruptedException;
/**
* native方法线程单个唤醒
*/
public final native void notify();
/**
* native方法线程唤醒等待池中所有线程
*/
public final native void notifyAll();
Wait/Notify/NotifyAll在objectMonitor.cpp中,点击查看;
Wait片段:
void ObjectMonitor::wait(jlong millis, bool interruptible, TRAPS) {
Thread * const Self = THREAD ;
assert(Self->is_Java_thread(), "Must be Java thread!");
JavaThread *jt = (JavaThread *)THREAD;
DeferredInitialize () ;
// Throw IMSX or IEX.
CHECK_OWNER();
//调用is_interrupted()判断并清除线程中断状态,如果中断状态为true,抛出中断异常并结束
if (interruptible && Thread::is_interrupted(Self, true) && !HAS_PENDING_EXCEPTION) {
//post monitor waited event
//注意这是过去式,已经等待完了
if (JvmtiExport::should_post_monitor_waited()) {
//注意:这里传递参数'false',这是因为由于线程中断,等待不会超时
JvmtiExport::post_monitor_waited(jt, this, false);
}
TEVENT (Wait - Throw IEX) ;
THROW(vmSymbols::java_lang_InterruptedException());
return ;
}
TEVENT (Wait) ;
assert (Self->_Stalled == 0, "invariant") ;
Self->_Stalled = intptr_t(this) ;
jt->set_current_waiting_monitor(this);
// create a node to be put into the queue
// Critically, after we reset() the event but prior to park(), we must check
// for a pending interrupt.
//创建一个node放入队列
//关键是,在reset()之后,但在park()之前,必须检查是否有挂起的中断
ObjectWaiter node(Self);
node.TState = ObjectWaiter::TS_WAIT ;
Self->_ParkEvent->reset() ;
OrderAccess::fence();
//在本例中等待队列是一个循环的双向链表,但它也可以是一个优先级队列或任何数据结构。
//_WaitSetLock保护着等待队列.
//通常,等待队列只能由监视器*except*的所有者访问,但在park()因中断超时而返回的情况下也是可以。
//竞争非常小,所以使用一个自旋锁而不是重量级的阻塞锁。
Thread::SpinAcquire (&_WaitSetLock, "WaitSet - add") ;
AddWaiter (&node) ;
Thread::SpinRelease (&_WaitSetLock) ;
if ((SyncFlags & 4) == 0) {
_Responsible = NULL ;
}
intptr_t save = _recursions; // 记录旧的递归次数
_waiters++; // waiters 自增
_recursions = 0; // 设置 recursion level to be 1
exit (Self) ; // 退出监视器
guarantee (_owner != Self, "invariant") ;
//一旦在上面的exit()调用中删除了ObjectMonitor的所有权,
//另一个线程就可以进入ObjectMonitor,执行notify()和exit()对象监视器。
//如果另一个线程的exit()调用选择此线程作为后继者,并且此线程在发布MONITOR_CONTENDED_EXIT时发生unpark()调用,
//则我们使用RawMonitors运行事件风险处理,并使用unpark().
//为了避免这个问题,我们重新发布事件,即使未使用原来的unpark(),
//这也不会造成任何伤害,因为已经为此监视器选好了继任者。
if (node._notified != 0 && _succ == Self) {
node._event->unpark();
}
// The thread is on the WaitSet list - now park() it.
// On MP systems it's conceivable that a brief spin before we park
// could be profitable.
//
// TODO-FIXME: change the following logic to a loop of the form
// while (!timeout && !interrupted && _notified == 0) park()
int ret = OS_OK ;
int WasNotified = 0 ;
{ // State transition wrappers
OSThread* osthread = Self->osthread();
OSThreadWaitState osts(osthread, true);
{
ThreadBlockInVM tbivm(jt);
// Thread is in thread_blocked state and oop access is unsafe.
//线程处于阻塞状态,并且oop访问是不安全的
jt->set_suspend_equivalent();
if (interruptible && (Thread::is_interrupted(THREAD, false) || HAS_PENDING_EXCEPTION)) {
// Intentionally empty 空处理
} else
if (node._notified == 0) {
if (millis <= 0) {
// 调用park()方法阻塞线程
Self->_ParkEvent->park () ;
} else {
// 调用park()方法在超时时间内阻塞线程
ret = Self->_ParkEvent->park (millis) ;
}
}
// were we externally suspended while we were waiting?
if (ExitSuspendEquivalent (jt)) {
// TODO-FIXME: add -- if succ == Self then succ = null.
jt->java_suspend_self();
}
} // Exit thread safepoint: transition _thread_blocked -> _thread_in_vm
//当线程不在等待队列时,使用双重检查锁定避免获取_WaitSetLock
if (node.TState == ObjectWaiter::TS_WAIT) {
Thread::SpinAcquire (&_WaitSetLock, "WaitSet - unlink") ;
if (node.TState == ObjectWaiter::TS_WAIT) {
DequeueSpecificWaiter (&node) ; // unlink from WaitSet
assert(node._notified == 0, "invariant");
node.TState = ObjectWaiter::TS_RUN ;
}
Thread::SpinRelease (&_WaitSetLock) ;
}
//从这个线程的角度来看,Node's TState是稳定的,
//没有其他线程能够异步修改TState
guarantee (node.TState != ObjectWaiter::TS_WAIT, "invariant") ;
OrderAccess::loadload() ;
if (_succ == Self) _succ = NULL ;
WasNotified = node._notified ;
// Reentry phase -- reacquire the monitor.
// re-enter contended(竞争) monitor after object.wait().
// retain OBJECT_WAIT state until re-enter successfully completes
// Thread state is thread_in_vm and oop access is again safe,
// although the raw address of the object may have changed.
// (Don't cache naked oops over safepoints, of course).
// post monitor waited event.
//注意这是过去式,已经等待完了
if (JvmtiExport::should_post_monitor_waited()) {
JvmtiExport::post_monitor_waited(jt, this, ret == OS_TIMEOUT);
}
OrderAccess::fence() ;
assert (Self->_Stalled != 0, "invariant") ;
Self->_Stalled = 0 ;
assert (_owner != Self, "invariant") ;
ObjectWaiter::TStates v = node.TState ;
if (v == ObjectWaiter::TS_RUN) {
enter (Self) ;
} else {
guarantee (v == ObjectWaiter::TS_ENTER || v == ObjectWaiter::TS_CXQ, "invariant") ;
ReenterI (Self, &node) ;
node.wait_reenter_end(this);
}
// Self has reacquired the lock.
// Lifecycle - the node representing Self must not appear on any queues.
// Node is about to go out-of-scope, but even if it were immortal(长久的) we wouldn't
// want residual(残留的) elements associated with this thread left on any lists.
guarantee (node.TState == ObjectWaiter::TS_RUN, "invariant") ;
assert (_owner == Self, "invariant") ;
assert (_succ != Self , "invariant") ;
} // OSThreadWaitState()
jt->set_current_waiting_monitor(NULL);
guarantee (_recursions == 0, "invariant") ;
_recursions = save; // restore the old recursion count
_waiters--; // decrement the number of waiters
// Verify a few postconditions
assert (_owner == Self , "invariant") ;
assert (_succ != Self , "invariant") ;
assert (((oop)(object()))->mark() == markOopDesc::encode(this), "invariant") ;
if (SyncFlags & 32) {
OrderAccess::fence() ;
}
//检查是否有通知notify发生
// 从park()方法返回后,判断是否是因为中断返回,再次调用
// thread::is_interrupted(Self, true)判断并清除线程中断状态
// 如果中断状态为true,抛出中断异常并结束。
if (!WasNotified) {
// no, it could be timeout or Thread.interrupt() or both
// check for interrupt event, otherwise it is timeout
if (interruptible && Thread::is_interrupted(Self, true) && !HAS_PENDING_EXCEPTION) {
TEVENT (Wait - throw IEX from epilog) ;
THROW(vmSymbols::java_lang_InterruptedException());
}
}
//注意:虚假唤醒将被视为超时;监视器通知优先于线程中断。
}
Notify片段:
void ObjectMonitor::notify(TRAPS) {
CHECK_OWNER();
if (_WaitSet == NULL) {
TEVENT (Empty-Notify) ;
return ;
}
DTRACE_MONITOR_PROBE(notify, this, object(), THREAD);
int Policy = Knob_MoveNotifyee ;
Thread::SpinAcquire (&_WaitSetLock, "WaitSet - notify") ;
ObjectWaiter * iterator = DequeueWaiter() ;
if (iterator != NULL) {
TEVENT (Notify1 - Transfer) ;
guarantee (iterator->TState == ObjectWaiter::TS_WAIT, "invariant") ;
guarantee (iterator->_notified == 0, "invariant") ;
if (Policy != 4) {
iterator->TState = ObjectWaiter::TS_ENTER ;
}
iterator->_notified = 1 ;
ObjectWaiter * List = _EntryList ;
if (List != NULL) {
assert (List->_prev == NULL, "invariant") ;
assert (List->TState == ObjectWaiter::TS_ENTER, "invariant") ;
assert (List != iterator, "invariant") ;
}
if (Policy == 0) { // prepend(预追加) to EntryList
if (List == NULL) {
iterator->_next = iterator->_prev = NULL ;
_EntryList = iterator ;
} else {
List->_prev = iterator ;
iterator->_next = List ;
iterator->_prev = NULL ;
_EntryList = iterator ;
}
} else
if (Policy == 1) { // append(真正追加) to EntryList
if (List == NULL) {
iterator->_next = iterator->_prev = NULL ;
_EntryList = iterator ;
} else {
//考虑:当前获取EntryList的tail需要遍历整个链表
//将tail访问转换为CDLL而不是使用当前的DLL,从而使访问时间固定。
ObjectWaiter * Tail ;
for (Tail = List ; Tail->_next != NULL ; Tail = Tail->_next) ;
assert (Tail != NULL && Tail->_next == NULL, "invariant") ;
Tail->_next = iterator ;
iterator->_prev = Tail ;
iterator->_next = NULL ;
}
} else
if (Policy == 2) { // prepend to cxq
// prepend(预追加) to cxq
if (List == NULL) {
iterator->_next = iterator->_prev = NULL ;
_EntryList = iterator ;
} else {
iterator->TState = ObjectWaiter::TS_CXQ ;
for (;;) {
ObjectWaiter * Front = _cxq ;
iterator->_next = Front ;
if (Atomic::cmpxchg_ptr (iterator, &_cxq, Front) == Front) {
break ;
}
}
}
} else
if (Policy == 3) { // append(真正追加) to cxq
iterator->TState = ObjectWaiter::TS_CXQ ;
for (;;) {
ObjectWaiter * Tail ;
Tail = _cxq ;
if (Tail == NULL) {
iterator->_next = NULL ;
if (Atomic::cmpxchg_ptr (iterator, &_cxq, NULL) == NULL) {
break ;
}
} else {
while (Tail->_next != NULL) Tail = Tail->_next ;
Tail->_next = iterator ;
iterator->_prev = Tail ;
iterator->_next = NULL ;
break ;
}
}
} else {
ParkEvent * ev = iterator->_event ;
iterator->TState = ObjectWaiter::TS_RUN ;
OrderAccess::fence() ;
ev->unpark() ;
}
if (Policy < 4) {
iterator->wait_reenter_begin(this);
}
// _WaitSetLock protects the wait queue, not the EntryList. We could
// move the add-to-EntryList operation, above, outside the critical section
// protected by _WaitSetLock. In practice that's not useful. With the
// exception of wait() timeouts and interrupts the monitor owner
// is the only thread that grabs _WaitSetLock. There's almost no contention
// on _WaitSetLock so it's not profitable to reduce the length of the
// critical section.
}
Thread::SpinRelease (&_WaitSetLock) ;
if (iterator != NULL && ObjectMonitor::_sync_Notifications != NULL) {
ObjectMonitor::_sync_Notifications->inc() ;
}
}
NotifyAll片段:
void ObjectMonitor::notifyAll(TRAPS) {
CHECK_OWNER();
ObjectWaiter* iterator;
if (_WaitSet == NULL) {
TEVENT (Empty-NotifyAll) ;
return ;
}
DTRACE_MONITOR_PROBE(notifyAll, this, object(), THREAD);
int Policy = Knob_MoveNotifyee ;
int Tally = 0 ;
Thread::SpinAcquire (&_WaitSetLock, "WaitSet - notifyall") ;
for (;;) {
iterator = DequeueWaiter () ;
if (iterator == NULL) break ;
TEVENT (NotifyAll - Transfer1) ;
++Tally ;
// Disposition - what might we do with iterator ?
// a. add it directly to the EntryList - either tail or head.
// b. push it onto the front of the _cxq.
// For now we use (a).
guarantee (iterator->TState == ObjectWaiter::TS_WAIT, "invariant") ;
guarantee (iterator->_notified == 0, "invariant") ;
iterator->_notified = 1 ;
if (Policy != 4) {
iterator->TState = ObjectWaiter::TS_ENTER ;
}
ObjectWaiter * List = _EntryList ;
if (List != NULL) {
assert (List->_prev == NULL, "invariant") ;
assert (List->TState == ObjectWaiter::TS_ENTER, "invariant") ;
assert (List != iterator, "invariant") ;
}
if (Policy == 0) { // prepend to EntryList
if (List == NULL) {
iterator->_next = iterator->_prev = NULL ;
_EntryList = iterator ;
} else {
List->_prev = iterator ;
iterator->_next = List ;
iterator->_prev = NULL ;
_EntryList = iterator ;
}
} else
if (Policy == 1) { // append to EntryList
if (List == NULL) {
iterator->_next = iterator->_prev = NULL ;
_EntryList = iterator ;
} else {
// CONSIDER: finding the tail currently requires a linear-time walk of
// the EntryList. We can make tail access constant-time by converting to
// a CDLL instead of using our current DLL.
ObjectWaiter * Tail ;
for (Tail = List ; Tail->_next != NULL ; Tail = Tail->_next) ;
assert (Tail != NULL && Tail->_next == NULL, "invariant") ;
Tail->_next = iterator ;
iterator->_prev = Tail ;
iterator->_next = NULL ;
}
} else
if (Policy == 2) { // prepend to cxq
// prepend to cxq
iterator->TState = ObjectWaiter::TS_CXQ ;
for (;;) {
ObjectWaiter * Front = _cxq ;
iterator->_next = Front ;
if (Atomic::cmpxchg_ptr (iterator, &_cxq, Front) == Front) {
break ;
}
}
} else
if (Policy == 3) { // append to cxq
iterator->TState = ObjectWaiter::TS_CXQ ;
for (;;) {
ObjectWaiter * Tail ;
Tail = _cxq ;
if (Tail == NULL) {
iterator->_next = NULL ;
if (Atomic::cmpxchg_ptr (iterator, &_cxq, NULL) == NULL) {
break ;
}
} else {
while (Tail->_next != NULL) Tail = Tail->_next ;
Tail->_next = iterator ;
iterator->_prev = Tail ;
iterator->_next = NULL ;
break ;
}
}
} else {
ParkEvent * ev = iterator->_event ;
iterator->TState = ObjectWaiter::TS_RUN ;
OrderAccess::fence() ;
ev->unpark() ;
}
if (Policy < 4) {
iterator->wait_reenter_begin(this);
}
// _WaitSetLock protects the wait queue, not the EntryList. We could
// move the add-to-EntryList operation, above, outside the critical section
// protected by _WaitSetLock. In practice that's not useful. With the
// exception of wait() timeouts and interrupts the monitor owner
// is the only thread that grabs _WaitSetLock. There's almost no contention
// on _WaitSetLock so it's not profitable to reduce the length of the
// critical section.
}
Thread::SpinRelease (&_WaitSetLock) ;
if (Tally != 0 && ObjectMonitor::_sync_Notifications != NULL) {
ObjectMonitor::_sync_Notifications->inc(Tally) ;
}
}
特此声明:
分享文章有完整的知识架构图,将从以下几个方面系统展开:
1 基础(Linux/Spring boot/并发)
2 性能调优(jvm/tomcat/mysql)
3 高并发分布式
4 微服务体系
如果您觉得文章不错,请关注阿伦故事,您的支持是我坚持的莫大动力,在此受小弟一拜!
每篇福利:
网友评论