美文网首页golang
GO实验(2)查看GMP调度

GO实验(2)查看GMP调度

作者: 温岭夹糕 | 来源:发表于2022-07-28 16:54 被阅读0次

前文

实验1.go程序执行流程

导读

源码分析

  • runqput
  • schedule
  • globrunqget
  • runqget
  • findrunable
  • execute
  • gogo
  • goexit
  • newm

环境

阿里云CentOS7
go1.18

实验

还是上个实验demo

package main

import "fmt"

func main(){
    var a int = 1
    fmt.Println(a)
}

runqput

image.png

runqput打断点

dlv exec main
b runqput
c
从之前的学习我们知道了runqput的作用是给G入队 调用关系链

源码

        if next {
//优先进入runnext队列(一个指针可以理解为容量为1的队列)
//如果没有老的g被踢出
        retryNext:
                oldnext := _p_.runnext
                if !_p_.runnext.cas(oldnext, guintptr(unsafe.Pointer(gp))) {
                        goto retryNext
                }
                if oldnext == 0 {
                        return
                }

//把老的G踢出到队列
                // Kick the old runnext out to the regular run queue.
                gp = oldnext.ptr()
        }


retry:
     //老的尝试进入本地队列,这里的atomic是runtime/atomic
        h := atomic.LoadAcq(&_p_.runqhead) // load-acquire, synchronize with consumers
        t := _p_.runqtail
        if t-h < uint32(len(_p_.runq)) {
                _p_.runq[t%uint32(len(_p_.runq))].set(gp)
                atomic.StoreRel(&_p_.runqtail, t+1) // store-release, makes the item available for consumption
                return
        }
//本地队列满了就尝试进入全局队列
        if runqputslow(_p_, gp, h, t) {
                return
        }
        // the queue is not full, now the put above must succeed
        goto retry
}

runqputslow,看注释就知道老g被踢入全局队列时还会携带一部分本地队列的g一起进入

// Put g and a batch of work from local runnable queue on global queue.
// Executed only by the owner P.
func runqputslow(_p_ *p, gp *g, h, t uint32) bool {
        //取一半的g
        var batch [len(_p_.runq)/2 + 1]*g

        // First, grab a batch from local queue.
        n := t - h
        n = n / 2
    //检查,没满就抛异常
        if n != uint32(len(_p_.runq)/2) {
                throw("runqputslow: queue is not full")
        }
        for i := uint32(0); i < n; i++ {
                batch[i] = _p_.runq[(h+i)%uint32(len(_p_.runq))].ptr()
        }
        if !atomic.CasRel(&_p_.runqhead, h, h+n) { // cas-release, commits consume
                return false
        }
     //构造一个新队列存放本地被移除的g
        batch[n] = gp

        if randomizeScheduler {
                for i := uint32(1); i <= n; i++ {
                        j := fastrandn(i + 1)
                        batch[i], batch[j] = batch[j], batch[i]
                }
        }
        // Link the goroutines.
        for i := uint32(0); i < n; i++ {
                batch[i].schedlink.set(batch[i+1])
        }
         //实际上是以链表的形式直接接在后面
        var q gQueue
        q.head.set(batch[0])
        q.tail.set(batch[n])

        // Now put the batch on global queue.
       //操作全局队列还要加锁,
        lock(&sched.lock)
        globrunqputbatch(&q, int32(n+1))
        unlock(&sched.lock)
        return true

runnext,本地队列和全局队列的长度!
在dlv工具下(代码先运行到那一行)

print _p_
image.png

runq是本地队列长度为256,runnext就一个地址指针值,全局是无限的有一个tail和标志

他们存放在runtime.p结构体中 runtime2.go

schedule

schedule的第一次调度是通过mstart

func schedule() {
        _g_ := getg()
       #一系列的验证代码...
        if gp == nil {
                // Check the global runnable queue once in a while to ensure fairness.
                // Otherwise two goroutines can completely occupy the local runqueue
                // by constantly respawning each other.
                if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 {
                        lock(&sched.lock)
                        gp = globrunqget(_g_.m.p.ptr(), 1)
                        unlock(&sched.lock)
                }
        }
        if gp == nil {
                gp, inheritTime = runqget(_g_.m.p.ptr())
                // We can see gp != nil here even if the M is spinning,
                // if checkTimers added a local goroutine via goready.
        }
        if gp == nil {
                gp, inheritTime = findrunnable() // blocks until work is available
        }
       .....
      execute()

这个函数在将gmp的时候也分析过,主要工作是:
1.每61次就要去全局队列找G (加锁执行globrunqget)
2.执行runqget从本地队列找
3.还找不到执行findrunnable
4.execute执行G中的任务

globrunqget

很短的函数

func globrunqget(_p_ *p, max int32) *g {
        //关于锁的操作,全局队列需要加锁访问
        assertLockHeld(&sched.lock)
        //全局队列没G直接返回,runqsize记录长度
        if sched.runqsize == 0 {
                return nil
        }
        //计算n的个数,n的数量的G之后要被放入local queue
        n := sched.runqsize/gomaxprocs + 1
        if n > sched.runqsize {
                n = sched.runqsize
        }
        if max > 0 && n > max {
                n = max
        }
        if n > int32(len(_p_.runq))/2 {
                n = int32(len(_p_.runq)) / 2
        }

        sched.runqsize -= n
         //弹出队头,剩余n-1个放入本地队列
        gp := sched.runq.pop()
        n--
        for ; n > 0; n-- {
                gp1 := sched.runq.pop()
                runqput(_p_, gp1, false)
        }
        return gp
}

globrunqget 会从全局 runq 队列中获取 n 个 G ,其中第一个 G 用于执行,n-1 个 G 从全局队列放入本地队列。

runqget

func runqget(_p_ *p) (gp *g, inheritTime bool) {
        // If there's a runnext, it's the next G to run.
     //先从runnext找G,找到直接返回
        next := _p_.runnext
        // If the runnext is non-0 and the CAS fails, it could only have been stolen by another P,
        // because other Ps can race to set runnext to 0, but only the current P can set it to non-0.
        // Hence, there's no need to retry this CAS if it falls.
        if next != 0 && _p_.runnext.cas(next, 0) {
                return next.ptr(), true
        }
    //runnext没找到就去本地找,找不到返回nil
        for {
                h := atomic.LoadAcq(&_p_.runqhead) // load-acquire, synchronize with other consumers
                t := _p_.runqtail
                if t == h {
                        return nil, false
                }
                gp := _p_.runq[h%uint32(len(_p_.runq))].ptr()
                if atomic.CasRel(&_p_.runqhead, h, h+1) { // cas-release, commits consume
                        return gp, false
                }
        }
}

本地队列的获取会先从 P 的 runnext 字段中获取,如果不为空则直接返回。如果 runnext 为空,那么从本地队列头指针遍历本地队列,本地队列是一个环形队列,方便复用。

findrunnable

好长300行,直接截取部分代码

    // 从本地 P 的可运行队列获取 G
    if gp, inheritTime := runqget(_p_); gp != nil {
        return gp, inheritTime
    }

    // 从全局的可运行队列获取 G
    if sched.runqsize != 0 {
        lock(&sched.lock)
        gp := globrunqget(_p_, 0)
        unlock(&sched.lock)
        if gp != nil {
            return gp, false
        }
    } 

    // 从I/O轮询器获取 G
    if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Load64(&sched.lastpoll) != 0 {
        // 尝试从netpoller获取Glist
        if list := netpoll(0); !list.empty() { // non-blocking
            gp := list.pop()
            //将其余队列放入 P 的可运行G队列
            injectglist(&list)
            casgstatus(gp, _Gwaiting, _Grunnable)
            if trace.enabled {
                traceGoUnpark(gp, 0)
            }
            return gp, false
        }
    }
//准备开始从其他本地队列偷G
    if !_g_.m.spinning {
        // 设置 spinning ,表示正在窃取 G
        _g_.m.spinning = true
        atomic.Xadd(&sched.nmspinning, 1)
    }
#之后休眠前还要再全部队列走一遍找G
#再找不到就休眠

注意这里的m.spining称为自旋,工作线程在从其它工作线程的本地运行队列中盗取 G 时的状态称为自旋状态。
findrunnable从本地和全局找G,找不到去网络轮询找G,将状态设为sping循环遍历所有p的G进行窃取,休眠前再检查一遍,还是没有就休眠M

execute

历经千辛,终于可以进行任务执行
func execute(gp *g, inheritTime bool) {
g := getg()

    // Assign gp.m before entering _Grunning so running Gs have an
    // M.
    //G和M绑定,将G状态切换为Gruning
    _g_.m.curg = gp
    gp.m = _g_.m
    casgstatus(gp, _Grunnable, _Grunning)
    gp.waitsince = 0
    gp.preempt = false
    gp.stackguard0 = gp.stack.lo + _StackGuard
    if !inheritTime {
           //调度器次数+1
            _g_.m.p.ptr().schedtick++
    }

    // Check whether the profiler needs to be turned on or off.
    hz := sched.profilehz
    if _g_.m.profilehz != hz {
            setThreadCPUProfiler(hz)
    }

    if trace.enabled {
            // GoSysExit has to happen when we have a P, but before GoStart.
            // So we emit it here.
            if gp.syscallsp != 0 && gp.sysblocktraced {
                    traceGoSysExit(gp.sysexitticks)
            }
            traceGoStart()
    }
     //gogo完成g0到gp的真正切换
   //sched.pc存放着任务,初始化时就是runtime.main
    gogo(&gp.sched)

}

gogo和goexit

// func gogo(buf *gobuf)
// restore state from Gobuf; longjmp
....
TEXT gogo<>(SB), NOSPLIT, $0
        get_tls(CX)
        MOVQ    DX, g(CX)
        MOVQ    DX, R14         // set the g register
        MOVQ    gobuf_sp(BX), SP        // restore SP
        MOVQ    gobuf_ret(BX), AX
        MOVQ    gobuf_ctxt(BX), DX
        MOVQ    gobuf_bp(BX), BP
        MOVQ    $0, gobuf_sp(BX)        // clear to help garbage collector
        MOVQ    $0, gobuf_ret(BX)
        MOVQ    $0, gobuf_ctxt(BX)
        MOVQ    $0, gobuf_bp(BX)
        //重点来了,sched.pc存放这runtime.main
        // bx = sched.pc = &runtime.main
        MOVQ    gobuf_pc(BX), BX
        JMP     BX

JMP BX g0切换到执行任务的g的栈开始执行

非runtime.main返回时会直接返回goexit(runtime.main执行完直接exit(0)退出)

newg.sched.pc = abi.FuncPCABI0(goexit) + sys.PCQuantum

这行代码在newproc1中,把sched.pc与goexit关联

TEXT runtime·goexit(SB),NOSPLIT,$0-0
    CALL    runtime·goexit1(SB)

func goexit1() {
    // 调用goexit0函数 
    mcall(goexit0)
}

goexit通过mcall完成goexit0的调用

func goexit0(gp *g) {
    _g_ := getg()
    // 设置当前 G 状态为 _Gdead
    casgstatus(gp, _Grunning, _Gdead) 
    // 清理 G
    gp.m = nil
    ...
    gp.writebuf = nil
    gp.waitreason = 0
    gp.param = nil
    gp.labels = nil
    gp.timer = nil

    // 解绑 M 和 G
    dropg() 
    ...
    // 将 G 扔进 gfree 链表中等待复用
    gfput(_g_.m.p.ptr(), gp)
    // 再次进行调度
    schedule()
}

goexit0 会对 G 进行复位操作,解绑 M 和 G 的关联关系,将其 放入 gfree 链表中等待其他的 go 语句创建新的 g。在最后,goexit0 会重新调用 schedule触发新一轮的调度。

newm

这个在GMP中讲过


newm

最后是通过clone调用系统调用克隆出M

小结

至此通过查阅学习和实验阅读代码基本了解了GMP的调度流程

  • g0,m0的创建:
    rt0_amd64->rt0_go->[check、args、osinit、schedinit、newproc、mstart、about],rt0_go创建了g0,m0,这两者的关系之前通过指针互相关联过,g0负责调度g,newproc创建main gorutine
  • g如何进入P--即G如何入队
    runqput先进入runnext,老G丢入local queue,再丢入glob queue,runqputslow是丢入全局队列时加锁并携带一半本地队列
  • M如何调度G--P如何传G给M
    schedule指定要每61次检查全局队列(globrunqget指出再获取全局G的时候还要携带n个进入本地队列),runqget是先从runnext找再从本地和全局找,(findrunable)找不到就去网络找和窃取(spining),execute是执行G中的任务,具体调用为execute->gogo->goexit->goexit0->schedule,goexit(非main gorutine函数)会对G进行复位,即回到g0,再触发新一轮的调度
  • m0是runtime创建的第一个系统线程,一个GO进程只有一个m0其他M都是克隆产生的,是主线程
  • g0是什么?
    用户执行任务的叫G,执行runtime.main的叫main gorutine,g0负责执行调度任务。main gorutine不是第一个G,g0 栈用于执行调度器的代码,执行完之后,要跳转到执行用户代码的地方,如何跳转?这中间涉及到栈和寄存器的切换。要知道,函数调用和返回主要靠的也是 CPU 寄存器的切换。goroutine 的切换和此类似。(具体看GMP那章或g0的切换

参考

1.GO语言调度机制
2.调度器
3.go夜读schedule分析

相关文章

  • GO实验(2)查看GMP调度

    前文 实验1.go程序执行流程[https://www.jianshu.com/p/baa775b400b3] 导...

  • go 调度器实现

    GO 语言的调度器 目录 GMP 模型简介 调度器实现机制 GMP 模型简介 先来一张经典的GMP 关系图 G 是...

  • [视频版]-Golang深入理解GMP

    HELLO GOPHER! 相信越来越多的Go浪小伙伴,都对Golang的GMP调度器流连忘返,GMP很多书籍都有...

  • golang协程调度面试总结

    1.go的GMP模型 goroutine运行在用户态,是由runtime来控制调度的,调度过程中主要涉及到三个对象...

  • go并发的那些事

    思考:go为什么那么擅长并发? 答:从设计理解上来讲我觉着golang的CSP并发模型与GMP调度器是基石。你看虽...

  • go 的并发调度(一) GMP 模型

    协程和线程的历史关系? 抢占式和协同式抢占式就是线程无法决定自己执行多久,由操作系统(或其他分配系统)来分配一个线...

  • 调度器——GMP 调度模型

    调度器——GMP 调度模型 Goroutine 调度器,它是负责在工作线程上分发准备运行的 goroutines。...

  • golang内存管理之内存分配

    一、知识准备 GMP运行时调度模型 go原生支持并发,不需要像Java那样需要显示地开启一个线程,也不像Pytho...

  • Go GMP

    一文彻底弄懂go中的调度GMP先说,协程的本质是用户态的线程,用户对其有控制权限,内存占用少,切换代价低。 再来解...

  • Go语言中调度器之GMP模型

    前言 随着服务器硬件迭代升级,配置也越来越高。为充分利用服务器资源,并发编程也变的越来越重要。在开始之前,需要了解...

网友评论

    本文标题:GO实验(2)查看GMP调度

    本文链接:https://www.haomeiwen.com/subject/bwjqwrtx.html