美文网首页
sysmon 工作线程

sysmon 工作线程

作者: freedom117 | 来源:发表于2021-05-20 11:42 被阅读0次

    sysmon 不需要绑定 P 即可运行:它通过 newm 函数创建一个专门的工作线程,该线程一开始每 20 微秒扫描一次所有的 P(空闲时扫描间隔会逐渐拉长,最长 10 毫秒)。
    这个工作线程是系统线程,不受 Go 调度器控制。sysmon 负责监控整个调度器的运行状态,并会对调度产生影响(例如抢占长时间运行的 goroutine、从系统调用中收回 P)。
    // newm creates a new M that will run fn (fn may be nil) and records _p_
    // in mp.nextp as the P the new M should bind to. mp itself comes from
    // allocm(_p_, fn).
    //
    // If the calling thread is locked (gp.m.lockedExt != 0) or is flagged as
    // incgo, and GOOS is not plan9, the OS thread is NOT created from this
    // thread. Instead mp is queued on newmHandoff.newm and the template thread
    // is woken to create it. Otherwise newm1 creates the thread directly.
    func newm(fn func(), _p_ *p) {
        mp := allocm(_p_, fn)
        mp.nextp.set(_p_)
        // The new M starts with the initial signal mask.
        mp.sigmask = initSigmask
        if gp := getg(); gp != nil && gp.m != nil && (gp.m.lockedExt != 0 || gp.m.incgo) && GOOS != "plan9" {
            // We're on a locked M or a thread that may have been
            // started by C. The kernel state of this thread may
            // be strange (the user may have locked it for that
            // purpose). We don't want to clone that into another
            // thread. Instead, ask a known-good thread to create
            // the thread for us.
            //
            // This is disabled on Plan 9. See golang.org/issue/22227.
            //
            // TODO: This may be unnecessary on Windows, which
            // doesn't model thread creation off fork.
            lock(&newmHandoff.lock)
            if newmHandoff.haveTemplateThread == 0 {
                throw("on a locked thread with no template thread")
            }
            // Push mp onto the newmHandoff.newm list (linked through schedlink).
            mp.schedlink = newmHandoff.newm
            newmHandoff.newm.set(mp)
            // If the template thread is marked as waiting, clear the flag and
            // wake it through newmHandoff.wake so it picks up the request.
            if newmHandoff.waiting {
                newmHandoff.waiting = false
                notewakeup(&newmHandoff.wake)
            }
            unlock(&newmHandoff.lock)
            return
        }
        // Normal path: create the OS thread for mp from the current thread.
        newm1(mp)
    }
    
    
    // newm1 creates the OS thread that backs mp.
    // With cgo (iscgo) the thread is started through _cgo_thread_start, with
    // mstart as its entry function. Otherwise newosproc is called directly.
    // In both paths execLock is held for reading around the thread creation
    // ("Prevent process clone" in the inline comments).
    func newm1(mp *m) {
        if iscgo {
            var ts cgothreadstart
            if _cgo_thread_start == nil {
                throw("_cgo_thread_start missing")
            }
            // Describe the new thread to the cgo helper: its g0, the address
            // of its TLS slot, and the function it starts executing (mstart).
            ts.g.set(mp.g0)
            ts.tls = (*uint64)(unsafe.Pointer(&mp.tls[0]))
            ts.fn = unsafe.Pointer(funcPC(mstart))
            if msanenabled {
                // NOTE(review): presumably marks ts as written for the memory
                // sanitizer before it is handed to C — confirm.
                msanwrite(unsafe.Pointer(&ts), unsafe.Sizeof(ts))
            }
            execLock.rlock() // Prevent process clone.
            asmcgocall(_cgo_thread_start, unsafe.Pointer(&ts))
            execLock.runlock()
            return
        }
        execLock.rlock() // Prevent process clone.
        // Create the new worker thread through an OS-level call.
        newosproc(mp)
        execLock.runlock()
    }
    

    接下来,我们就来看 sysmon 函数到底做了什么?
    sysmon 执行一个无限循环:一开始每次循环休眠 20us;如果连续空闲约 1ms 后,每次休眠时间翻倍,最终每一轮最多休眠 10ms。
    sysmon 中会进行 netpoll(获取 fd 事件)、retake(抢占)、forcegc(按时间强制执行 GC)、scavenge heap(释放堆中多余的空闲内存,减少内存占用)等处理。

    
    // retake is called by sysmon with the current time now. NOTE(review):
    // now is presumably in nanoseconds; the 10*1000*1000 threshold below is
    // described in this article as 10ms.
    //
    // It walks every P in allp and:
    //   - for a P in _Prunning or _Psyscall whose schedtick has not changed
    //     for at least forcePreemptNS, requests preemption of its current
    //     goroutine with preemptone;
    //   - for a P in _Psyscall, takes the P away from the M that is in the
    //     syscall (CAS _Psyscall -> _Pidle, then handoffp), unless the P can be
    //     left alone (see the condition below).
    //
    // It returns the number of Ps successfully moved out of _Psyscall.
    func retake(now int64) uint32 {
        n := 0
        // Prevent allp slice changes. This lock will be completely
        // uncontended unless we're already stopping the world.
        lock(&allpLock)
        // We can't use a range loop over allp because we may
        // temporarily drop the allpLock. Hence, we need to re-fetch
        // allp each time around the loop.
        for i := 0; i < len(allp); i++ {
            _p_ := allp[i]
            if _p_ == nil {
                // This can happen if procresize has grown
                // allp but not yet created new Ps.
                continue
            }
            pd := &_p_.sysmontick
            s := _p_.status
            sysretake := false
            // Both running Ps and Ps in a syscall are checked for a goroutine
            // that has been running for too long.
            if s == _Prunning || s == _Psyscall {
                // Preempt G if it's running for too long.
                t := int64(_p_.schedtick)
                if int64(pd.schedtick) != t {
                    // The P has scheduled since sysmon last looked: record the
                    // new schedtick and the time it was observed.
                    pd.schedtick = uint32(t)
                    pd.schedwhen = now
                } else if pd.schedwhen+forcePreemptNS <= now {
                    // schedtick unchanged for at least forcePreemptNS: the same
                    // goroutine has been running too long, so request preemption.
                    preemptone(_p_)
                    // In case of syscall, preemptone() doesn't
                    // work, because there is no M wired to P.
                    sysretake = true
                }
            }
            if s == _Psyscall {
                // Retake P from syscall if it's there for more than 1 sysmon tick (at least 20us).
                t := int64(_p_.syscalltick)
                // If syscalltick differs from the value sysmon recorded, this
                // is a different syscall than the one seen last time: record the
                // tick and the time, and leave the P alone for this round
                // (unless the preemption check above already set sysretake).
                if !sysretake && int64(pd.syscalltick) != t {
                    pd.syscalltick = uint32(t)
                    pd.syscallwhen = now
                    continue
                }
                // On the one hand we don't want to retake Ps if there is no other work to do,
                // but on the other hand we want to retake them eventually
                // because they can prevent the sysmon thread from deep sleep.
                //
                // Leave the P alone only if ALL of these hold:
                //   1. its local run queue is empty;
                //   2. there is at least one spinning M or idle P (nmspinning+npidle > 0);
                //   3. it entered this syscall less than 10*1000*1000 (10ms) ago.
                // If any of them fails, fall through and retake the P.
                if runqempty(_p_) && atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) > 0 && pd.syscallwhen+10*1000*1000 > now {
                    continue
                }
                // Drop allpLock so we can take sched.lock.
                unlock(&allpLock)
                // Need to decrement number of idle locked M's
                // (pretending that one more is running) before the CAS.
                // Otherwise the M from which we retake can exit the syscall,
                // increment nmidle and report deadlock.
                incidlelocked(-1)
                if atomic.Cas(&_p_.status, s, _Pidle) {
                    if trace.enabled {
                        traceGoSysBlock(_p_)
                        traceProcStop(_p_)
                    }
                    n++
                    _p_.syscalltick++
                    // Hand the P off so that another M can take it over.
                    handoffp(_p_)
                }
                incidlelocked(1)
                lock(&allpLock)
            }
        }
        unlock(&allpLock)
        return uint32(n)
    }
    

    从代码来看,主要会对处于 _Psyscall 和 _Prunning 状态的 p 进行抢占。
    对于处于系统调用中的 p(且它在本次系统调用中已停留超过一个 sysmon 周期),以下三种情况中的任意一种都会触发抢占(收回该 p):
    1、p 的本地运行队列中还有其他 goroutine 等待运行,而 p 因为正在执行系统调用无法处理这些任务。
    2、既没有自旋的 m(sched.nmspinning 为 0),也没有空闲的 p(sched.npidle 为 0),即所有 p 都在忙。
    3、从上一次监控线程观察到 p 对应的 m 处于系统调用之中到现在已经超过 10 毫秒,说明系统调用占用时间过长,需要抢占。

    
    // handoffp decides what to do with a P that its M is giving up. In this
    // excerpt it is called from retake after the P was CASed from _Psyscall
    // to _Pidle. In order:
    //   - local or global runnable work, or available GC mark work:
    //     startm(_p_, false);
    //   - no spinning M and no idle P (and this call wins the nmspinning
    //     0->1 CAS): start a spinning M with startm(_p_, true);
    //   - under sched.lock, if sched.gcwaiting is set: put the P in _Pgcstop;
    //   - run a pending safe-point function for the P, if any;
    //   - global work, or this is the last running P and sched.lastpoll != 0:
    //     startm(_p_, false);
    //   - otherwise: wake the net poller if nobarrierWakeTime reports a wake
    //     time, then put the P on the idle list (pidleput).
    func handoffp(_p_ *p) {
        // handoffp must start an M in any situation where
        // findrunnable would return a G to run on _p_.

        // if it has local work, start it straight away
        // (the global run queue is checked here too).
        if !runqempty(_p_) || sched.runqsize != 0 {
            startm(_p_, false)
            return
        }
        // if it has GC work, start it straight away
        if gcBlackenEnabled != 0 && gcMarkWorkAvailable(_p_) {
            startm(_p_, false)
            return
        }
        // no local work, check that there are no spinning/idle M's,
        // otherwise our help is not required
        //
        // Every other P is busy (no spinning M, no idle P): start a spinning M
        // to look for work. The CAS performs the nmspinning increment that
        // startm expects the caller to have done for a spinning M.
        if atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) == 0 && atomic.Cas(&sched.nmspinning, 0, 1) { // TODO: fast atomic
            startm(_p_, true)
            return
        }
        lock(&sched.lock)
        // The scheduler is waiting for Ps to stop: park this P in _Pgcstop,
        // count it towards stopwait, and wake the waiter on the last one.
        if sched.gcwaiting != 0 {
            _p_.status = _Pgcstop
            sched.stopwait--
            if sched.stopwait == 0 {
                notewakeup(&sched.stopnote)
            }
            unlock(&sched.lock)
            return
        }
        // A safe-point function is pending for this P: run it on the P's
        // behalf and signal when every P has run it.
        if _p_.runSafePointFn != 0 && atomic.Cas(&_p_.runSafePointFn, 1, 0) {
            sched.safePointFn(_p_)
            sched.safePointWait--
            if sched.safePointWait == 0 {
                notewakeup(&sched.safePointNote)
            }
        }
        // The global run queue has work (re-checked under sched.lock).
        if sched.runqsize != 0 {
            unlock(&sched.lock)
            startm(_p_, false)
            return
        }
        // If this is the last running P and nobody is polling network,
        // need to wakeup another M to poll network.
        if sched.npidle == uint32(gomaxprocs-1) && atomic.Load64(&sched.lastpoll) != 0 {
            unlock(&sched.lock)
            startm(_p_, false)
            return
        }
        // NOTE(review): nobarrierWakeTime presumably returns the P's next timer
        // deadline (0 if none) — confirm.
        if when := nobarrierWakeTime(_p_); when != 0 {
            wakeNetPoller(when)
        }
        // Nothing to do: put the P on the global idle list.
        pidleput(_p_)
        unlock(&sched.lock)
    }
    

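    handoffp 的逻辑是:如果 p 的本地队列或全局队列有工作(或者有 GC 标记工作),就立即启动一个 m 来运行它;如果没有工作,但系统中既没有自旋的 m 也没有空闲的 p,就启动一个自旋的 m 去寻找工作(中间还会处理 STW 等待、safe-point 函数和网络轮询等特殊情况)。最后,如果实在没有工作要处理,就将 p 放入全局空闲队列里。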

    // startm arranges for an M to run _p_, creating a new M (newm) if no idle
    // M is available.
    // If _p_ is nil it takes an idle P. If there is no idle P it does nothing
    // (for spinning, it first undoes the caller's nmspinning increment).
    // spinning requests that the M start out spinning. In that case the
    // caller must already have incremented sched.nmspinning.
    func startm(_p_ *p, spinning bool) {
        lock(&sched.lock)
        if _p_ == nil {
            // No P was given: take one from the global idle list.
            _p_ = pidleget()
            if _p_ == nil {
                unlock(&sched.lock)
                if spinning {
                    // No idle P: give up and undo the nmspinning increment.
                    // The caller incremented nmspinning, but there are no idle Ps,
                    // so it's okay to just undo the increment and give up.
                    if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {
                        throw("startm: negative nmspinning")
                    }
                }
                // No idle P: return without starting anything.
                return
            }
        }
        mp := mget()
        unlock(&sched.lock)
        if mp == nil {
            // No idle M was found.
            var fn func()
            if spinning {
                // The caller incremented nmspinning, so set m.spinning in the new M.
                fn = mspinning
            }
            // Create a new worker thread (M) that will take _p_.
            newm(fn, _p_)
            return
        }
        if mp.spinning {
            throw("startm: m is spinning")
        }
        if mp.nextp != 0 {
            throw("startm: m has p")
        }
        if spinning && !runqempty(_p_) {
            throw("startm: p has runnable gs")
        }
        // The caller incremented nmspinning, so set m.spinning in the new M.
        mp.spinning = spinning
        // Record _p_ as the P the idle M should bind to.
        mp.nextp.set(_p_)
        // Wake the idle M.
        notewakeup(&mp.park)
    }
    

    startm 如果没有指定 p,会先从全局空闲队列中取一个 p,取不到就直接返回。接着去找一个空闲的 m;如果没找到,就调用 newm 新建一个 m(若 spinning 为 true,新 m 启动时会被设置为自旋状态),然后返回。如果找到了空闲的 m,在检查 p 和 m 的状态都正常后,通过 mp.nextp.set(_p_) 把 p 交给它,然后唤醒该 m。

    // notewakeup wakes the M (if any) sleeping on note n.
    // It atomically replaces n.key with locked and then looks at the old value:
    //   0      - nobody was waiting, so nothing more to do;
    //   locked - the note was already woken; a double wakeup throws;
    //   other  - the old value is the waiting *m, which is woken with semawakeup.
    func notewakeup(n *note) {
        var v uintptr
        for {
            v = atomic.Loaduintptr(&n.key)
            // Set n.key = locked. NOTE(review): presumably the sleeping thread
            // checks for this value to tell a real wakeup from a spurious one.
            if atomic.Casuintptr(&n.key, v, locked) {
                break
            }
        }
    
        // Successfully set waitm to locked.
        // What was it before?
        switch {
        case v == 0:
            // Nothing was waiting. Done.
        case v == locked:
            // Two notewakeups! Not allowed.
            throw("notewakeup - double wakeup")
        default:
            // Must be the waiting m. Wake it up.
            // (semawakeup's OS mechanism is not shown here.)
            semawakeup((*m)(unsafe.Pointer(v)))
        }
    }
    

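    notewakeup 先用 CAS 把 note.key 设置为 locked,再根据原来的值判断:原来是 0 说明没有线程在等待;原来就是 locked 说明重复唤醒,直接 throw;否则原值就是正在等待的 m,通过 semawakeup 唤醒它。(上面展示的是基于信号量的实现 lock_sema.go;Linux 上使用的是基于 futex 的实现 lock_futex.go。)
    在 futex 实现中,内核完成唤醒工作之后,当前工作线程从 futex 系统调用返回,并沿函数调用链原路返回,继续执行其它代码;而被唤醒的工作线程则由内核负责在适当的时候调度到 CPU 上运行。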

    抢占长时间运行的 P

    对于处在 _Prunning 状态的 p,sysmon 会在每个 p 的 sysmontick 中记录它上一次观察到的调度次数(schedtick)和观察时间(schedwhen)。
    每轮检查时,sysmon 将自己记录的调度次数与 p 当前的调度次数进行比较:如果两者不同,就更新记录;如果相同,说明 P 在这段时间内一直在运行同一个 goroutine,接下来就计算它的运行时间是否太长。

    // sysmontick is sysmon's per-P snapshot (stored in p.sysmontick) that
    // retake uses to detect long-running goroutines and long syscalls.
    type sysmontick struct {
        schedtick   uint32 // p.schedtick as last observed by retake
        schedwhen   int64  // retake's now when schedtick was recorded
        syscalltick uint32 // p.syscalltick as last observed by retake
        syscallwhen int64  // retake's now when syscalltick was recorded
    }
    

    当同一个 goroutine 的运行时间超过 10ms(forcePreemptNS)时,sysmon 就会调用 preemptone(p) 发起抢占。

    // preemptone requests preemption of the goroutine currently running on _p_.
    // It returns false without doing anything if _p_ has no M, if that M is
    // the caller's own M, or if the M's curg is nil or g0.
    // Otherwise it:
    //   - sets gp.preempt;
    //   - sets gp.stackguard0 to stackPreempt so the next stack check sees it;
    //   - when preemptMSupported and debug.asyncpreemptoff == 0, also sets
    //     _p_.preempt and calls preemptM;
    // and then returns true.
    // The code here only sets flags and calls preemptM; it does not wait for
    // the goroutine to actually stop.
    func preemptone(_p_ *p) bool {
        mp := _p_.m.ptr()
        if mp == nil || mp == getg().m {
            return false
        }
        gp := mp.curg
        if gp == nil || gp == mp.g0 {
            return false
        }
    
        gp.preempt = true
    
        // Every call in a go routine checks for stack overflow by
        // comparing the current stack pointer to gp->stackguard0.
        // Setting gp->stackguard0 to StackPreempt folds
        // preemption into the normal stack overflow check.
        gp.stackguard0 = stackPreempt
    
        // Request an async preemption of this P.
        if preemptMSupported && debug.asyncpreemptoff == 0 {
            _p_.preempt = true
            preemptM(mp)
        }
    
        return true
    }
    

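    抢占就是让一个占用 CPU 时间很长的 goroutine 暂停运行。preemptone 本身只是设置抢占标记(gp.preempt、gp.stackguard0 = stackPreempt,支持时还会通过 preemptM 请求异步抢占)。被抢占的 goroutine 检查到标记后切换到 g0,被放入全局可运行队列,等待下次有 m 来全局队列找工作时才能继续运行;然后 g0 重新开始调度。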

    相关文章

      网友评论

          本文标题:sysmon 工作线程

          本文链接:https://www.haomeiwen.com/subject/rlxcjltx.html