1、ThreadSafepointState
ThreadSafepointState的定义同样在safepoint.hpp中,其包含的属性如下:
- volatile bool _at_poll_safepoint; //为true表示当前线程处于基于poll page实现的安全点上
- bool _has_called_back; //是否已经执行了block方法
- JavaThread * _thread; //关键的JavaThread实例
- volatile suspend_type _type; //具体的状态
- JavaThreadState _orig_thread_state; //获取原来的状态
其中suspend_type是一个描述线程是否停在安全点上的枚举,其定义如下:
image.png
重点关注以下方法的实现。
1.1 构造方法 / create / destroy
ThreadSafepointState::ThreadSafepointState(JavaThread *thread) {
_thread = thread;
//初始状态就是_running
_type = _running;
_has_called_back = false;
_at_poll_safepoint = false;
}
void ThreadSafepointState::create(JavaThread *thread) {
ThreadSafepointState *state = new ThreadSafepointState(thread);
thread->set_safepoint_state(state);
}
void ThreadSafepointState::destroy(JavaThread *thread) {
if (thread->safepoint_state()) {
//释放thead的ThreadSafepointState实例
delete(thread->safepoint_state());
thread->set_safepoint_state(NULL);
}
}
其中create和destroy方法的调用链如下:
image.png
image.png
1.2、examine_state_of_thread / restart
examine_state_of_thread方法根据当前线程的状态调整每个线程的ThreadSafepointState,restart用于将每个线程ThreadSafepointState的状态恢复成初始的running,_has_called_back属性恢复成初始的false,其实现如下:
void ThreadSafepointState::examine_state_of_thread() {
//校验当前状态时running
assert(is_running(), "better be running or just have hit safepoint poll");
//保存线程原来的状态
JavaThreadState state = _thread->thread_state();
_orig_thread_state = state;
//判断线程是否已经挂起
bool is_suspended = _thread->is_ext_suspended();
if (is_suspended) {
//如果已挂起,通知SafepointSynchronize当前线程已到达安全点
roll_forward(_at_safepoint);
return;
}
//有些线程的ThreadSafepointState是running,但是实际已经是处于安全点上了,这里更新状态
if (SafepointSynchronize::safepoint_safe(_thread, state)) {
//检查是否是lazy_critical_native,如果是则将当前线程标识为已进入关键区
SafepointSynchronize::check_for_lazy_critical_native(_thread, state);
//将状态置为_at_safepoint
roll_forward(_at_safepoint);
return;
}
if (state == _thread_in_vm) {
//将状态置为_call_back
roll_forward(_call_back);
return;
}
//其他状态的线程会继续执行,直到状态被流转成_thread_blocked,然后通过safepoint_safe方法被置为_at_safepoint
assert(is_running(), "examine_state_of_thread on non-running thread");
return;
}
// Returns true is thread could not be rolled forward at present position.
void ThreadSafepointState::roll_forward(suspend_type type) {
//修改type
_type = type;
switch(_type) {
case _at_safepoint:
//通知SafepointSynchronize当前线程已经停在安全点上,减少_waiting_to_block
SafepointSynchronize::signal_thread_at_safepoint();
if (_thread->in_critical()) {
//如果线程处于JNI关键区,则增加对应的线程计数
SafepointSynchronize::increment_jni_active_count();
}
break;
case _call_back:
//将_has_called_back置为false,该属性默认为false
set_has_called_back(false);
break;
case _running:
default:
ShouldNotReachHere();
}
}
void ThreadSafepointState::restart() {
switch(type()) {
case _at_safepoint:
case _call_back:
break;
case _running:
default:
tty->print_cr("restart thread " INTPTR_FORMAT " with state %d",
_thread, _type);
_thread->print();
ShouldNotReachHere();
}
//将状态置为running,has_called_back恢复成初始的false
_type = _running;
set_has_called_back(false);
}
bool is_running() const { return (_type==_running); }
static void signal_thread_at_safepoint() { _waiting_to_block--; }
inline static void increment_jni_active_count() {
assert_locked_or_safepoint(Safepoint_lock);
_current_jni_active_count++;
}
bool SafepointSynchronize::safepoint_safe(JavaThread *thread, JavaThreadState state) {
switch(state) {
case _thread_in_native:
//上一个栈帧不是Java,或者当前栈帧是walkable
return !thread->has_last_Java_frame() || thread->frame_anchor()->walkable();
case _thread_blocked:
assert(!thread->has_last_Java_frame() || thread->frame_anchor()->walkable(), "blocked and not walkable");
return true;
default:
return false;
}
}
void SafepointSynchronize::check_for_lazy_critical_native(JavaThread *thread, JavaThreadState state) {
if (state == _thread_in_native &&
thread->has_last_Java_frame() &&
thread->frame_anchor()->walkable()) {
//满足条件的线程可能是处于JNI关键区中且是编译代码
frame wrapper_frame = thread->last_frame();
CodeBlob* stub_cb = wrapper_frame.cb();
if (stub_cb != NULL &&
stub_cb->is_nmethod() &&
stub_cb->as_nmethod_or_null()->is_lazy_critical_native()) {
if (!thread->do_critical_native_unlock()) {
//表示进入了安全区
thread->enter_critical();
// Make sure the native wrapper calls back on return to
// perform the needed critical unlock.
thread->set_critical_native_unlock();
}
}
}
}
其调用链如下:
image.png
image.png
2、SafepointSynchronize::begin
SafepointSynchronize::begin就是开启安全点的方法,会通知所有JavaThread安全点开启了,然后通过循环等待的方式等待所有的JavaThread都变成阻塞状态,安全点同步完成,然后就可以执行必须在安全点下执行的任务了,其实现如下:
void SafepointSynchronize::begin() {
Thread* myThread = Thread::current();
//校验当前线程是VMThread
assert(myThread->is_VM_thread(), "Only VM thread may execute a safepoint");
if (PrintSafepointStatistics || PrintSafepointStatisticsTimeout > 0) {
_safepoint_begin_time = os::javaTimeNanos();
_ts_of_current_safepoint = tty->time_stamp().seconds();
}
#if INCLUDE_ALL_GCS
if (UseConcMarkSweepGC) {
//如果使用CMS算法,则获取CMS Token
ConcurrentMarkSweepThread::synchronize(false);
} else if (UseG1GC) {
SuspendibleThreadSet::synchronize();
}
#endif // INCLUDE_ALL_GCS
//获取Threads_lock锁,在end方法退出安全点时会释放该锁
//线程创建和退出都需要获取该锁,即在安全点时无法创建或者销毁线程
Threads_lock->lock();
//校验当前安全点的状态为未同步
assert( _state == _not_synchronized, "trying to safepoint synchronize with wrong state");
//获取当前线程的总数量
int nof_threads = Threads::number_of_threads();
if (TraceSafepoint) {
tty->print_cr("Safepoint synchronization initiated. (%d)", nof_threads);
}
//通知RuntimeService 开始进入安全点
RuntimeService::record_safepoint_begin();
//获取Safepoint_lock锁,线程状态流转时必须获取该锁
MutexLocker mu(Safepoint_lock);
//重置计数器,用来记录处于JNI关键区的线程数
_current_jni_active_count = 0;
//记录等待被阻塞的线程数
_waiting_to_block = nof_threads;
//定义的静态volatile变量,用来记录尝试去阻塞的线程数
TryingToBlock = 0 ;
int still_running = nof_threads;
// Save the starting time, so that it can be compared to see if this has taken
// too long to complete.
jlong safepoint_limit_time = 0;
timeout_error_printed = false;
if (PrintSafepointStatistics || PrintSafepointStatisticsTimeout > 0) {
//如果需要打印日志,则初始化_safepoint_stats
deferred_initialize_stat();
}
//修改_state,表示开始进入安全点了
_state = _synchronizing;
//强制高速缓存刷新,在各CPU间同步数据
OrderAccess::fence();
//刷新所有线程的状态到内存中
if (!UseMembar) {
os::serialize_thread_states();
}
//通知解释器进入安全点
Interpreter::notice_safepoints();
//UseCompilerSafepoints默认为true,表示是否让编译代码在执行过程中停在安全点上
//DeferPollingPageLoopCount默认值为-1,表示尝试改变PollingPage的状态的循环次数
if (UseCompilerSafepoints && DeferPollingPageLoopCount < 0) {
// PageArmed是一个静态变量,表示polling_page的状态
guarantee (PageArmed == 0, "invariant") ;
PageArmed = 1 ;
os::make_polling_page_unreadable();
}
//获取当前系统的CPU个数
int ncpus = os::processor_count() ;
//SafepointTimeout为false,如果为true则表示如果等待进入安全点的总耗时超过SafepointTimeoutDelay就打印失败日志
//SafepointTimeoutDelay的默认值是10000ms
if (SafepointTimeout)
//计算进入安全点的最迟时间
safepoint_limit_time = os::javaTimeNanos() + (jlong)SafepointTimeoutDelay * MICROUNITS;
// Iterate through all threads until it have been determined how to stop them all at a safepoint
unsigned int iterations = 0;
int steps = 0 ;
while(still_running > 0) {
//遍历一遍所有的JavaThread,判断线程ThreadSafepointState状态
for (JavaThread *cur = Threads::first(); cur != NULL; cur = cur->next()) {
assert(!cur->is_ConcurrentGC_thread(), "A concurrent GC thread is unexpectly being suspended");
ThreadSafepointState *cur_state = cur->safepoint_state();
if (cur_state->is_running()) {
//根据当前线程的状态检查并修改ThreadSafepointState
cur_state->examine_state_of_thread();
if (!cur_state->is_running()) {
//如果已经不是running,将计数减一
still_running--;
}
if (TraceSafepoint && Verbose) cur_state->print();
}
}
if (PrintSafepointStatistics && iterations == 0) {
begin_statistics(nof_threads, still_running);
}
if (still_running > 0) {
//所有线程遍历完了
//依然有处于running状态的线程
if (SafepointTimeout && safepoint_limit_time < os::javaTimeNanos()) {
//等待超时了
print_safepoint_timeout(_spinning_timeout);
}
if (UseCompilerSafepoints && int(iterations) == DeferPollingPageLoopCount) {
//DeferPollingPageLoopCount默认值是-1,不会走此逻辑
//如果大于0,则表示当循环次数等于该值时,才会通知编译代码进入安全点
guarantee (PageArmed == 0, "invariant") ;
PageArmed = 1 ;
os::make_polling_page_unreadable();
}
++steps ;
if (ncpus > 1 && steps < SafepointSpinBeforeYield) { //SafepointSpinBeforeYield的默认值是2000
SpinPause() ; // 利用系统调度,做短暂的自旋
} else if (steps < DeferThrSuspendLoopCount) { //DeferThrSuspendLoopCount的默认值是4000
os::NakedYield() ; //让出当前VMThread的CPU执行权
} else {
os::yield_all(steps) ; //linux下的实现和NakedYield一样,都是调用底层的sched_yield
}
iterations ++ ;
}
assert(iterations < (uint)max_jint, "We have been iterating in the safepoint loop too long");
}
//所有线程的状态都变成非running了
assert(still_running == 0, "sanity check");
if (PrintSafepointStatistics) {
update_statistics_on_spin_end();
}
//不断循环直到所有JavaThread都变成阻塞状态
while (_waiting_to_block > 0) {
if (TraceSafepoint) tty->print_cr("Waiting for %d thread(s) to block", _waiting_to_block);
if (!SafepointTimeout || timeout_error_printed) {
//如果SafepointTimeout为false,直接在Safepoint_lock上等待
Safepoint_lock->wait(true); // true, means with no safepoint checks
} else {
//SafepointTimeout为true,计算剩余的时间
jlong remaining_time = safepoint_limit_time - os::javaTimeNanos();
if (remaining_time < 0 || Safepoint_lock->wait(true, remaining_time / MICROUNITS)) {
//如果超时了,打印日志
print_safepoint_timeout(_blocking_timeout);
}
}
}
//校验所有JavaThread都被阻塞了
assert(_waiting_to_block == 0, "sanity check");
//校验_safepoint_counter必须是偶数
assert((_safepoint_counter & 0x1) == 0, "must be even");
//校验获取了Threads_lock锁
assert(Threads_lock->owned_by_self(), "must hold Threads_lock");
_safepoint_counter ++;
//修改状态
_state = _synchronized;
//刷新高速缓存
OrderAccess::fence();
//更新处于JNI关键区中的线程数
GC_locker::set_jni_lock_count(_current_jni_active_count);
if (TraceSafepoint) {
VM_Operation *op = VMThread::vm_operation();
tty->print_cr("Entering safepoint region: %s", (op != NULL) ? op->name() : "no vm operation");
}
//通知RuntimeService,安全点同步完成
RuntimeService::record_safepoint_synchronized();
if (PrintSafepointStatistics) {
//更新统计数据SafepointStats
update_statistics_on_sync_end(os::javaTimeNanos());
}
//执行清理
do_cleanup_tasks();
if (PrintSafepointStatistics) {
//更新统计数据SafepointStats
update_statistics_on_cleanup_end(os::javaTimeNanos());
}
}
//相关组件在JVM进入安全点后执行清理任务
void SafepointSynchronize::do_cleanup_tasks() {
{
TraceTime t1("deflating idle monitors", TraceSafepointCleanupTime);
ObjectSynchronizer::deflate_idle_monitors();
}
{
TraceTime t2("updating inline caches", TraceSafepointCleanupTime);
InlineCacheBuffer::update_inline_caches();
}
{
TraceTime t3("compilation policy safepoint handler", TraceSafepointCleanupTime);
CompilationPolicy::policy()->do_safepoint_work();
}
{
TraceTime t4("mark nmethods", TraceSafepointCleanupTime);
NMethodSweeper::mark_active_nmethods();
}
if (SymbolTable::needs_rehashing()) {
TraceTime t5("rehashing symbol table", TraceSafepointCleanupTime);
SymbolTable::rehash_table();
}
if (StringTable::needs_rehashing()) {
TraceTime t6("rehashing string table", TraceSafepointCleanupTime);
StringTable::rehash_table();
}
if (UseGCLogFileRotation) {
gclog_or_tty->rotate_log(false);
}
{
TraceTime t7("purging class loader data graph", TraceSafepointCleanupTime);
ClassLoaderDataGraph::purge_if_needed();
}
}
void RuntimeService::record_safepoint_begin() {
if (PrintGCApplicationConcurrentTime && _app_timer.is_updated()) {
gclog_or_tty->date_stamp(PrintGCDateStamps);
gclog_or_tty->stamp(PrintGCTimeStamps);
gclog_or_tty->print_cr("Application time: %3.7f seconds",
last_application_time_sec());
}
//更新计时器,记录开始时间
_safepoint_timer.update();
_last_safepoint_sync_time_sec = 0.0;
if (UsePerfData) {
//增加总的进入安全点的次数
_total_safepoints->inc();
if (_app_timer.is_updated()) {
_application_time_ticks->inc(_app_timer.ticks_since_update());
}
}
}
void RuntimeService::record_safepoint_synchronized() {
if (UsePerfData) {
_sync_time_ticks->inc(_safepoint_timer.ticks_since_update());
}
if (PrintGCApplicationStoppedTime) {
_last_safepoint_sync_time_sec = last_safepoint_time_sec();
}
}
void SafepointSynchronize::deferred_initialize_stat() {
//init_done是一个静态属性
if (init_done) return;
if (PrintSafepointStatisticsCount <= 0) {
fatal("Wrong PrintSafepointStatisticsCount");
}
int stats_array_size;
if (PrintSafepointStatisticsTimeout > 0) {
stats_array_size = 1;
PrintSafepointStatistics = true;
} else {
stats_array_size = PrintSafepointStatisticsCount;
}
//初始化_safepoint_stats
_safepoint_stats = (SafepointStats*)os::malloc(stats_array_size
* sizeof(SafepointStats), mtInternal);
guarantee(_safepoint_stats != NULL,
"not enough memory for safepoint instrumentation data");
if (UseCompilerSafepoints && DeferPollingPageLoopCount >= 0) {
//need_to_track_page_armed_status也是一个静态属性
need_to_track_page_armed_status = true;
}
init_done = true;
}
其调用链如下:
image.png
3、SafepointSynchronize::block
该方法用于阻塞当前线程直到VMThead从安全点退出,通常与do_call_back方法同时使用,该方法用于判断是否需要调用block方法,这两方法的实现如下:
inline static bool do_call_back() {
return (_state != _not_synchronized);
}
void SafepointSynchronize::block(JavaThread *thread) {
assert(thread != NULL, "thread must be set");
assert(thread->is_Java_thread(), "not a Java thread");
//停止打印
ttyLocker::break_tty_lock_for_safepoint(os::current_thread_id());
//如果线程正在退出中
if (thread->is_terminated()) {
//如果是因为JVM退出而终止则阻塞当前线程直到JVM进程终止
thread->block_if_vm_exited();
return;
}
JavaThreadState state = thread->thread_state();
//让当前线程变成walkable
thread->frame_anchor()->make_walkable(thread);
// Check that we have a valid thread_state at this point
switch(state) {
case _thread_in_vm_trans:
case _thread_in_Java: //只有编译代码调用block时线程状态是_thread_in_Java,其他的都是中间状态
//临时修改线程状态为_thread_in_vm
thread->set_thread_state(_thread_in_vm);
if (is_synchronizing()) {
//如果正在同步中,增加TryingToBlock计数
Atomic::inc (&TryingToBlock) ;
}
//进入begin方法会获取Safepoint_lock锁,直到所有线程的ThreadSafepointState都变成非running了,才会释放该锁
Safepoint_lock->lock_without_safepoint_check();
if (is_synchronizing()) {
//如果还在同步中
assert(_waiting_to_block > 0, "sanity check");
//等待阻塞的线程数减1
_waiting_to_block--;
//has_called_back置为true,表示已经调用了block方法
thread->safepoint_state()->set_has_called_back(true);
if (thread->in_critical()) {
//如果该线程处于JNI关键区,则增加计数
increment_jni_active_count();
}
if (_waiting_to_block == 0) {
//唤醒在Safepoint_lock上等待的VMThread
Safepoint_lock->notify_all();
}
}
//修改线程状态为_thread_blocked
thread->set_thread_state(_thread_blocked);
Safepoint_lock->unlock();
//begin方法中VMThread会占用Threads_lock,一直到end方法快执行完了才释放该锁,即整个安全点期间
//当前线程都会被阻塞,直到end方法执行完成,VMThread从安全点退出
Threads_lock->lock_without_safepoint_check();
//已经从安全点退出,恢复原来的线程状态
thread->set_thread_state(state);
//释放锁Threads_lock
Threads_lock->unlock();
break;
case _thread_in_native_trans:
case _thread_blocked_trans:
case _thread_new_trans:
//在examine_state_of_thread方法中,只有_thread_in_vm状态的线程才会将其ThreadSafepointState置为_call_back
//如果是_thread_in_vm,则进入block方法前会将其设置为_thread_in_vm_trans,因此不会出现此种情形
if (thread->safepoint_state()->type() == ThreadSafepointState::_call_back) {
thread->print_thread_state();
fatal("Deadlock in safepoint code. "
"Should have called back to the VM before blocking.");
}
//设置线程状态为_thread_blocked
thread->set_thread_state(_thread_blocked);
//让当前线程在Threads_lock上被阻塞,直到end方法被调用
Threads_lock->lock_without_safepoint_check();
//恢复成原来的状态
thread->set_thread_state(state);
//解锁Threads_lock
Threads_lock->unlock();
break;
default:
fatal(err_msg("Illegal threadstate encountered: %d", state));
}
//已经从安全点退出了
if (state != _thread_blocked_trans &&
state != _thread_in_vm_trans &&
thread->has_special_runtime_exit_condition()) {
//如果有异常或者被要求挂起
thread->handle_special_runtime_exit_condition(
!thread->is_at_poll_safepoint() && (state != _thread_in_native_trans));
}
}
void JavaThread::block_if_vm_exited() {
if (_terminated == _vm_exited) {
//只有JVM退出了才会将线程终止状态里置为_vm_exited,JVM退出是在安全点下执行的,此期间
//Threads_lock不会被释放,即当前线程会一直阻塞,直到JVM退出
Threads_lock->lock_without_safepoint_check();
ShouldNotReachHere();
}
}
inline static bool is_synchronizing() { return _state == _synchronizing; }
block方法会将当前线程的状态改成_thread_blocked,并且变成walkable,从而满足SafepointSynchronize::safepoint_safe中的条件,examine_state_of_thread再次检查该线程的运行状态时就会将该线程的ThreadSafepointState置为_at_safepoint,然后执行roll_forward的时候会将_waiting_to_block减一;然后block方法将该线程的状态置为_thread_blocked,在Threads_lock上阻塞等待VMThread从安全点退出,释放Threads_lock。需要注意,调用block方法时,如果线程的状态是_thread_in_vm_trans或者_thread_in_Java,则是走另一种逻辑,会先将线程修改成_thread_in_vm,然后examine_state_of_thread执行时会将当前线程的ThreadSafepointState置为_call_back,即非running的状态,然后等待Safepoint_lock;等所有的线程都变成非running状态了,Safepoint_lock锁被释放了,就会减少_waiting_to_block,将线程状态置为_thread_blocked,然后在Threads_lock上阻塞等待VMThread从安全点退出,释放Threads_lock。
4、SafepointSynchronize::end
end方法用于实现安全点退出,将所有JavaThread的ThreadSafepointState重置成初始状态,然后释放锁Threads_lock,所有因为安全点而等待获取Threads_lock锁的线程就会逐一恢复正常执行,其实现如下:
void SafepointSynchronize::end() {
//校验当前线程占有锁Threads_lock
assert(Threads_lock->owned_by_self(), "must hold Threads_lock");
//校验_safepoint_counter为奇数,贼begin方法和end方法中都会加1
assert((_safepoint_counter & 0x1) == 1, "must be odd");
_safepoint_counter ++;
//校验当前线程是VMThread
assert(myThread->is_VM_thread(), "Only VM thread can execute a safepoint");
if (PrintSafepointStatistics) {
end_statistics(os::javaTimeNanos());
}
if (PageArmed) {
//让polling_page变得可读,表示安全点结束
os::make_polling_page_readable();
PageArmed = 0 ;
}
//移除解释器执行字节码时的安全点检查逻辑,恢复成正常的执行逻辑
Interpreter::ignore_safepoints();
{
//获取锁Safepoint_lock
MutexLocker mu(Safepoint_lock);
//校验当前状态为_synchronized
assert(_state == _synchronized, "must be synchronized before ending safepoint synchronization");
//重置状态,并刷新高速缓存
_state = _not_synchronized;
OrderAccess::fence();
if (TraceSafepoint) {
tty->print_cr("Leaving safepoint region");
}
//遍历所有的线程,重置其ThreadSafepointState
for(JavaThread *current = Threads::first(); current; current = current->next()) {
//VMThreadHintNoPreempt默认为false
if (VMThreadHintNoPreempt) {
os::hint_no_preempt();
}
ThreadSafepointState* cur_state = current->safepoint_state();
assert(cur_state->type() != ThreadSafepointState::_running, "Thread not suspended at safepoint");
cur_state->restart();
assert(cur_state->is_running(), "safepoint state has not been reset");
}
//通知RuntimeService 安全点退出
RuntimeService::record_safepoint_end();
//释放锁Threads_lock,所有停在安全点上的JavaThread可以陆陆续续恢复正常执行
Threads_lock->unlock();
}
#if INCLUDE_ALL_GCS
if (UseConcMarkSweepGC) {
//释放CMS Token,从而允许CMS Thread正常执行
ConcurrentMarkSweepThread::desynchronize(false);
} else if (UseG1GC) {
SuspendibleThreadSet::desynchronize();
}
#endif // INCLUDE_ALL_GCS
//记录上一次安全点结束的时间
_end_of_last_safepoint = os::javaTimeMillis();
}
void RuntimeService::record_safepoint_end() {
if (PrintGCApplicationStoppedTime) {
gclog_or_tty->date_stamp(PrintGCDateStamps);
gclog_or_tty->stamp(PrintGCTimeStamps);
gclog_or_tty->print_cr("Total time for which application threads "
"were stopped: %3.7f seconds, "
"Stopping threads took: %3.7f seconds",
last_safepoint_time_sec(),
_last_safepoint_sync_time_sec);
}
//重启表示应用正常执行时间的计时器
_app_timer.update();
if (UsePerfData) {
//增加安全点的累计耗时
_safepoint_time_ticks->inc(_safepoint_timer.ticks_since_update());
}
}
其调用链如下:
image.png
上begin和end方法的调用链可知,安全点的触发和退出都是在VMThread中完成的,也主要是为VMThread服务的,VMThread相关实现可以参考VM_Operation 源码解析
网友评论