sysmon 不需要绑定 P 就能直接运行:运行时通过 newm 函数为它创建一个专门的工作线程(M),该线程在一个循环中反复扫描所有的 P,扫描间隔从 20 微秒起步(详见下文)。
这个工作线程是一个系统线程,不受调度器控制;sysmon 通过监控各个 P 的状态来影响整个调度过程。
// newm creates a new m object (an M) and arranges for its thread to start.
//
// fn and _p_ are passed to allocm. _p_ is also stored in mp.nextp, the P
// this M is to be associated with next. If the calling thread is a locked M
// (lockedExt != 0) or is running cgo code (incgo), and we are not on Plan 9,
// thread creation is delegated to the template thread through the
// newmHandoff list. Otherwise newm1 creates the thread directly.
func newm(fn func(), _p_ *p) {
	mp := allocm(_p_, fn)
	mp.nextp.set(_p_)
	mp.sigmask = initSigmask
	if gp := getg(); gp != nil && gp.m != nil && (gp.m.lockedExt != 0 || gp.m.incgo) && GOOS != "plan9" {
		// We're on a locked M or a thread that may have been
		// started by C. The kernel state of this thread may
		// be strange (the user may have locked it for that
		// purpose). We don't want to clone that into another
		// thread. Instead, ask a known-good thread to create
		// the thread for us.
		//
		// This is disabled on Plan 9. See golang.org/issue/22227.
		//
		// TODO: This may be unnecessary on Windows, which
		// doesn't model thread creation off fork.
		lock(&newmHandoff.lock)
		if newmHandoff.haveTemplateThread == 0 {
			throw("on a locked thread with no template thread")
		}
		// Push mp onto the front of the handoff list; schedlink chains
		// the entries together.
		mp.schedlink = newmHandoff.newm
		newmHandoff.newm.set(mp)
		// If the consumer of the handoff list has marked itself as waiting,
		// clear the flag and wake it through newmHandoff.wake.
		if newmHandoff.waiting {
			newmHandoff.waiting = false
			notewakeup(&newmHandoff.wake)
		}
		unlock(&newmHandoff.lock)
		return
	}
	newm1(mp)
}
// newm1 starts the OS thread that backs mp.
//
// Without cgo, the thread is created directly by newosproc. With cgo, the
// thread is created through _cgo_thread_start. A cgothreadstart record
// tells it which g (mp.g0), which TLS slot (mp.tls[0]) and which entry
// function (mstart) to use. In both cases, execLock is held for reading
// around the creation call to prevent a concurrent process clone.
func newm1(mp *m) {
	if !iscgo {
		// Create the worker thread with a system call.
		execLock.rlock() // Prevent process clone.
		newosproc(mp)
		execLock.runlock()
		return
	}

	if _cgo_thread_start == nil {
		throw("_cgo_thread_start missing")
	}
	var start cgothreadstart
	start.g.set(mp.g0)
	start.tls = (*uint64)(unsafe.Pointer(&mp.tls[0]))
	start.fn = unsafe.Pointer(funcPC(mstart))
	if msanenabled {
		// Mark the record as initialized for the memory sanitizer before
		// it is handed to C code.
		msanwrite(unsafe.Pointer(&start), unsafe.Sizeof(start))
	}
	execLock.rlock() // Prevent process clone.
	asmcgocall(_cgo_thread_start, unsafe.Pointer(&start))
	execLock.runlock()
}
接下来,我们就来看 sysmon 函数到底做了什么?
sysmon 执行一个无限循环:一开始每次循环休眠 20us;如果连续约 1 ms(约 50 轮)都处于空闲状态,之后每次休眠时间就倍增,直到上限 10ms,此后每一轮都休眠 10ms。
sysmon 中会进行 netpoll(获取 fd 事件)、retake(抢占)、forcegc(按时间强制执行 gc)、scavenge heap(释放空闲列表中多余的内存页,减少内存占用)等处理。
// retake is run by sysmon on each pass. now is the current time, in the
// same units as pd.schedwhen and pd.syscallwhen. The 10*1000*1000 "10 ms"
// threshold below implies nanoseconds.
//
// For every P, it maintains a sysmontick snapshot and does the following:
//   - For a P in _Prunning or _Psyscall whose schedtick has not changed
//     for forcePreemptNS, it requests preemption of the current goroutine
//     via preemptone.
//   - For a P in _Psyscall that meets the conditions documented inside the
//     loop, it moves the P to _Pidle and passes it to handoffp so another
//     M can use it.
//
// It returns the number of Ps taken away from system calls.
func retake(now int64) uint32 {
	n := 0
	// Prevent allp slice changes. This lock will be completely
	// uncontended unless we're already stopping the world.
	lock(&allpLock)
	// We can't use a range loop over allp because we may
	// temporarily drop the allpLock. Hence, we need to re-fetch
	// allp each time around the loop.
	for i := 0; i < len(allp); i++ {
		_p_ := allp[i]
		if _p_ == nil {
			// This can happen if procresize has grown
			// allp but not yet created new Ps.
			continue
		}
		pd := &_p_.sysmontick
		s := _p_.status
		sysretake := false
		// Ps that are running Go code or are in a system call are checked
		// for a goroutine that has held the P for too long.
		if s == _Prunning || s == _Psyscall {
			// Preempt G if it's running for too long.
			t := int64(_p_.schedtick)
			if int64(pd.schedtick) != t {
				// _p_.schedtick has moved since sysmon last looked, so the
				// P has scheduled since then. Start a fresh observation
				// window by recording the tick and the time it was seen.
				pd.schedtick = uint32(t)
				pd.schedwhen = now
			} else if pd.schedwhen+forcePreemptNS <= now {
				// schedtick has been unchanged for at least forcePreemptNS
				// (defined elsewhere; the surrounding text gives 10 ms).
				// The current goroutine has run too long, so request
				// preemption.
				preemptone(_p_)
				// In case of syscall, preemptone() doesn't
				// work, because there is no M wired to P.
				sysretake = true
			}
		}
		if s == _Psyscall {
			// Retake P from syscall if it's there for more than 1 sysmon tick (at least 20us).
			t := int64(_p_.syscalltick)
			// syscalltick changed since the last observation (and no
			// preemption was just requested above). This is a different
			// system call from the one seen last time, so record the new
			// tick and time, and look again on the next pass.
			if !sysretake && int64(pd.syscalltick) != t {
				pd.syscalltick = uint32(t)
				pd.syscallwhen = now
				continue
			}
			// On the one hand we don't want to retake Ps if there is no other work to do,
			// but on the other hand we want to retake them eventually
			// because they can prevent the sysmon thread from deep sleep.
			// Retake the P if ANY of the following holds; otherwise skip it:
			//  1. the P's run queue has goroutines waiting to run;
			//  2. there is no spinning M and no idle P
			//     (nmspinning+npidle == 0);
			//  3. at least 10 ms have passed since sysmon recorded this
			//     system call in pd.syscallwhen.
			if runqempty(_p_) && atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) > 0 && pd.syscallwhen+10*1000*1000 > now {
				continue
			}
			// Drop allpLock so we can take sched.lock.
			unlock(&allpLock)
			// Need to decrement number of idle locked M's
			// (pretending that one more is running) before the CAS.
			// Otherwise the M from which we retake can exit the syscall,
			// increment nmidle and report deadlock.
			incidlelocked(-1)
			if atomic.Cas(&_p_.status, s, _Pidle) {
				if trace.enabled {
					traceGoSysBlock(_p_)
					traceProcStop(_p_)
				}
				n++
				_p_.syscalltick++
				// Find a new M to run this P, or park the P if there is
				// nothing for it to do.
				handoffp(_p_)
			}
			incidlelocked(1)
			lock(&allpLock)
		}
	}
	unlock(&allpLock)
	return uint32(n)
}
从代码来看,主要会对处于 _Psyscall 和 _Prunning 状态的 p 进行抢占。
对于处于系统调用中的 p,前提是它至少在一个 sysmon 周期内都处于同一次系统调用中;在此基础上,以下三种情况中的任意一种都会触发抢占:
1、该 p 的本地运行队列中还有其他 goroutine 在等待,而 p 因为正在执行系统调用无法处理它们。
2、sched.nmspinning(自旋的 M 数量)与 sched.npidle(空闲的 P 数量)之和为 0,即既没有自旋的 M,也没有空闲的 P,系统比较忙。
3、从监控线程记录下这次系统调用(pd.syscallwhen)到现在已经超过 10 毫秒,说明系统调用占用时间过长,需要抢占。
// handoffp finds a new owner for _p_, which has been detached from its M.
// For example, retake calls it after moving a P out of _Psyscall.
//
// It checks the following cases in order:
//   - If _p_ has local work, the global run queue is non-empty, or GC mark
//     work is available, it starts an M for _p_.
//   - If there is no spinning M and no idle P, it starts a spinning M.
//   - If sched.gcwaiting is set, it parks _p_ in _Pgcstop for the pending
//     stop and wakes sched.stopnote once stopwait reaches zero.
//   - If a safe-point function is pending for _p_, it runs that function.
//   - It re-checks the global run queue under sched.lock.
//   - If this is the last running P and lastpoll != 0, it starts an M.
//   - Otherwise it calls wakeNetPoller when nobarrierWakeTime(_p_) is
//     non-zero, and puts _p_ on the idle P list.
func handoffp(_p_ *p) {
	// handoffp must start an M in any situation where
	// findrunnable would return a G to run on _p_.
	// If the P has local work, or there is global work, an M must be bound
	// to it right away.
	// if it has local work, start it straight away
	if !runqempty(_p_) || sched.runqsize != 0 {
		startm(_p_, false)
		return
	}
	// if it has GC work, start it straight away
	if gcBlackenEnabled != 0 && gcMarkWorkAvailable(_p_) {
		startm(_p_, false)
		return
	}
	// no local work, check that there are no spinning/idle M's,
	// otherwise our help is not required
	// There is no work here, and there is neither a spinning M nor an idle
	// P, so the system is busy. Start a new M.
	if atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) == 0 && atomic.Cas(&sched.nmspinning, 0, 1) { // TODO: fast atomic
		// _p_ has no local work; start a spinning M to look for some.
		startm(_p_, true)
		return
	}
	lock(&sched.lock)
	if sched.gcwaiting != 0 {
		_p_.status = _Pgcstop
		sched.stopwait--
		if sched.stopwait == 0 {
			notewakeup(&sched.stopnote)
		}
		unlock(&sched.lock)
		return
	}
	if _p_.runSafePointFn != 0 && atomic.Cas(&_p_.runSafePointFn, 1, 0) {
		sched.safePointFn(_p_)
		sched.safePointWait--
		if sched.safePointWait == 0 {
			notewakeup(&sched.safePointNote)
		}
	}
	// The global run queue has work.
	if sched.runqsize != 0 {
		unlock(&sched.lock)
		startm(_p_, false)
		return
	}
	// If this is the last running P and nobody is polling network,
	// need to wakeup another M to poll network.
	if sched.npidle == uint32(gomaxprocs-1) && atomic.Load64(&sched.lastpoll) != 0 {
		unlock(&sched.lock)
		startm(_p_, false)
		return
	}
	if when := nobarrierWakeTime(_p_); when != 0 {
		wakeNetPoller(when)
	}
	// Nothing to do: put _p_ on the global idle P list.
	pidleput(_p_)
	unlock(&sched.lock)
}
如果 p 的本地队列或全局队列中有工作(或者有可做的 GC 标记工作),就立即启动一个 m 来运行它;如果没有工作,但当前既没有自旋的 m 也没有空闲的 p,就启动一个自旋的 m 去寻找工作。最后,如果实在没有工作要处理,就将 p 放入全局空闲 P 队列里。
// startm schedules some M to run _p_. It reuses an idle M (mget) if one
// exists, and otherwise creates a new one with newm.
//
// If _p_ is nil, startm first tries to take an idle P with pidleget. If no
// idle P is found, it does nothing.
//
// spinning means the caller has already incremented sched.nmspinning. In
// that case, the increment is undone when no P is available. Otherwise the
// chosen M is started in the spinning state: m.spinning is set for an idle
// M, and mspinning is passed as the start function for a new M.
func startm(_p_ *p, spinning bool) {
	lock(&sched.lock)
	if _p_ == nil {
		// No P given: take one from the global idle P list.
		_p_ = pidleget()
		if _p_ == nil {
			unlock(&sched.lock)
			if spinning {
				// No idle P found: give up and restore the global count of
				// spinning Ms.
				// The caller incremented nmspinning, but there are no idle Ps,
				// so it's okay to just undo the increment and give up.
				if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {
					throw("startm: negative nmspinning")
				}
			}
			// No idle P is available; return without starting anything.
			return
		}
	}
	mp := mget()
	unlock(&sched.lock)
	if mp == nil {
		// No idle M was found.
		var fn func()
		if spinning {
			// The caller incremented nmspinning, so set m.spinning in the new M.
			fn = mspinning
		}
		// Create a new worker thread (M) for _p_.
		newm(fn, _p_)
		return
	}
	// Sanity checks: an idle M must not be spinning or already hold a P, and
	// a spinning M must not be handed a P that already has runnable Gs.
	if mp.spinning {
		throw("startm: m is spinning")
	}
	if mp.nextp != 0 {
		throw("startm: m has p")
	}
	if spinning && !runqempty(_p_) {
		throw("startm: p has runnable gs")
	}
	// The caller incremented nmspinning, so set m.spinning in the new M.
	mp.spinning = spinning
	// Record the P that the idle M is about to bind to.
	mp.nextp.set(_p_)
	// Wake the idle M through its park note.
	notewakeup(&mp.park)
}
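startm 首先在 _p_ 为 nil 时从全局空闲 P 队列中取一个 p;取不到就直接放弃(如果 spinning 为 true,还要把调用方增加的 nmspinning 减回去)。然后通过 mget 找一个空闲的 m:如果没找到,就调用 newm 新建一个 m(spinning 为 true 时,新 m 以 mspinning 启动,即处于自旋状态);如果找到了,就设置 mp.spinning,通过 mp.nextp.set(_p_) 记录它马上要绑定的 p,然后调用 notewakeup 唤醒这个 m。startm 本身没有返回值。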
// notewakeup wakes the thread, if any, that is sleeping on note n.
//
// n.key holds one of three values: 0 (nothing waiting), locked (already
// woken), or a pointer to the waiting m. notewakeup atomically replaces it
// with locked. If the previous value was an m pointer, that m is woken with
// semawakeup. Calling notewakeup twice on the same note throws.
func notewakeup(n *note) {
	var v uintptr
	// Retry the CAS until n.key is swapped to locked without interference;
	// v ends up holding the value that was replaced.
	for {
		v = atomic.Loaduintptr(&n.key)
		// Set n.key to locked. The woken thread presumably checks for this
		// value to tell a real wakeup from a spurious return from sleep.
		if atomic.Casuintptr(&n.key, v, locked) {
			break
		}
	}
	// Successfully set waitm to locked.
	// What was it before?
	switch {
	case v == 0:
		// Nothing was waiting. Done.
	case v == locked:
		// Two notewakeups! Not allowed.
		throw("notewakeup - double wakeup")
	default:
		// Must be the waiting m. Wake it up.
		// Wake the m through a system call (semawakeup).
		semawakeup((*m)(unsafe.Pointer(v)))
	}
}
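notewakeup 把 note.key 设置为 locked;如果之前有 m 在这个 note 上睡眠(key 中保存的是该 m 的指针),就通过系统调用(semawakeup)唤醒它。上面展示的是基于信号量的实现(lock_sema.go);Linux 上使用的是基于 futex 的实现(lock_futex.go)。以 futex 版本为例,内核完成唤醒工作之后,当前工作线程从内核返回到 futex 函数,继续执行 SYSCALL 指令之后的代码,并按函数调用链原路返回,继续执行其它代码。
而被唤醒的工作线程则由内核负责在适当的时候调度到 CPU 上运行。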
抢占长时间运行的 P
对于处在 _Prunning 状态的 p,sysmon 会检查它的 sysmontick,记录下 p 当前的调度次数(schedtick)和当前时间。
下一轮扫描时,把 sysmon 记录下的调度次数与 p 当前的调度次数进行对比:如果两者相同,说明 P 在这一段时间内没有发生过调度,一直在运行同一个 goroutine,那就计算一下运行时间是否太长了。
// sysmontick is sysmon's per-P snapshot, stored in p.sysmontick. It records
// the P's schedtick and syscalltick as last observed by retake, together
// with the time (retake's now) at which each was recorded. retake compares
// these values with the P's live counters to judge how long the current
// goroutine has been running, or how long the current system call has
// lasted.
type sysmontick struct {
	schedtick   uint32 // last observed _p_.schedtick
	schedwhen   int64  // time at which schedtick was recorded
	syscalltick uint32 // last observed _p_.syscalltick
	syscallwhen int64  // time at which syscalltick was recorded
}
当运行时间超过 10ms(forcePreemptNS)时,就会调用 preemptone(p) 发起抢占。
// preemptone asks the goroutine currently running on _p_ to stop.
//
// It returns false and does nothing in these cases:
//   - _p_ has no M, or the M is the caller's own M;
//   - the M is not running a user goroutine (curg is nil or is g0).
//
// Otherwise it marks the goroutine for preemption by setting gp.preempt
// and poisoning gp.stackguard0 so the next stack check traps. When
// preemptMSupported is true and debug.asyncpreemptoff is 0, it also sets
// _p_.preempt and sends an asynchronous preemption request via preemptM.
// It then returns true.
//
// This only issues a request; the goroutine stops when it observes the
// request. It has no effect for a P in a system call, because no M is wired
// to that P.
func preemptone(_p_ *p) bool {
	mp := _p_.m.ptr()
	if mp == nil || mp == getg().m {
		return false
	}
	gp := mp.curg
	if gp == nil || gp == mp.g0 {
		return false
	}
	gp.preempt = true
	// Every call in a go routine checks for stack overflow by
	// comparing the current stack pointer to gp->stackguard0.
	// Setting gp->stackguard0 to StackPreempt folds
	// preemption into the normal stack overflow check.
	gp.stackguard0 = stackPreempt
	// Request an async preemption of this P.
	if preemptMSupported && debug.asyncpreemptoff == 0 {
		_p_.preempt = true
		preemptM(mp)
	}
	return true
}
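抢占就是让一个占用时间很长的 goroutine 暂停运行:切换到 g0,把该 goroutine 放入全局可运行队列,等待下次有 m 到全局队列找工作时才能继续运行,然后 g0 重新开始调度。需要注意的是,preemptone 只是发出抢占请求(设置 gp.preempt 和 stackguard0,或通过 preemptM 发送异步抢占信号),真正的切换发生在被抢占的 goroutine 检测到这些标记的时候。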
网友评论