前文
导读
源码分析
- runqput
- schedule
- globrunqget
- runqget
- findrunable
- execute
- gogo
- goexit
- newm
环境
阿里云CentOS7
go1.18
实验
还是上个实验demo
package main
import "fmt"
func main(){
var a int = 1
fmt.Println(a)
}
runqput
image.pngrunqput打断点
dlv exec main
b runqput
c
从之前的学习我们知道了runqput的作用是给G入队
调用关系链
源码
if next {
//优先进入runnext队列(一个指针可以理解为容量为1的队列)
//如果没有老的g被踢出
retryNext:
oldnext := _p_.runnext
if !_p_.runnext.cas(oldnext, guintptr(unsafe.Pointer(gp))) {
goto retryNext
}
if oldnext == 0 {
return
}
//把老的G踢出到队列
// Kick the old runnext out to the regular run queue.
gp = oldnext.ptr()
}
retry:
//老的尝试进入本地队列,这里的atomic是runtime/atomic
h := atomic.LoadAcq(&_p_.runqhead) // load-acquire, synchronize with consumers
t := _p_.runqtail
if t-h < uint32(len(_p_.runq)) {
_p_.runq[t%uint32(len(_p_.runq))].set(gp)
atomic.StoreRel(&_p_.runqtail, t+1) // store-release, makes the item available for consumption
return
}
//本地队列满了就尝试进入全局队列
if runqputslow(_p_, gp, h, t) {
return
}
// the queue is not full, now the put above must succeed
goto retry
}
runqputslow,看注释就知道老g被踢入全局队列时还会携带一部分本地队列的g一起进入
// Put g and a batch of work from local runnable queue on global queue.
// Executed only by the owner P.
func runqputslow(_p_ *p, gp *g, h, t uint32) bool {
//取一半的g
var batch [len(_p_.runq)/2 + 1]*g
// First, grab a batch from local queue.
n := t - h
n = n / 2
//检查,没满就抛异常
if n != uint32(len(_p_.runq)/2) {
throw("runqputslow: queue is not full")
}
for i := uint32(0); i < n; i++ {
batch[i] = _p_.runq[(h+i)%uint32(len(_p_.runq))].ptr()
}
if !atomic.CasRel(&_p_.runqhead, h, h+n) { // cas-release, commits consume
return false
}
//构造一个新队列存放本地被移除的g
batch[n] = gp
if randomizeScheduler {
for i := uint32(1); i <= n; i++ {
j := fastrandn(i + 1)
batch[i], batch[j] = batch[j], batch[i]
}
}
// Link the goroutines.
for i := uint32(0); i < n; i++ {
batch[i].schedlink.set(batch[i+1])
}
//实际上是以链表的形式直接接在后面
var q gQueue
q.head.set(batch[0])
q.tail.set(batch[n])
// Now put the batch on global queue.
//操作全局队列还要加锁,
lock(&sched.lock)
globrunqputbatch(&q, int32(n+1))
unlock(&sched.lock)
return true
runnext,本地队列和全局队列的长度!
在dlv工具下(代码先运行到那一行)
print _p_
image.png
runq是本地队列长度为256,runnext就一个地址指针值,全局是无限的有一个tail和标志
schedule
schedule的第一次调度是通过mstart
func schedule() {
_g_ := getg()
#一系列的验证代码...
if gp == nil {
// Check the global runnable queue once in a while to ensure fairness.
// Otherwise two goroutines can completely occupy the local runqueue
// by constantly respawning each other.
if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 {
lock(&sched.lock)
gp = globrunqget(_g_.m.p.ptr(), 1)
unlock(&sched.lock)
}
}
if gp == nil {
gp, inheritTime = runqget(_g_.m.p.ptr())
// We can see gp != nil here even if the M is spinning,
// if checkTimers added a local goroutine via goready.
}
if gp == nil {
gp, inheritTime = findrunnable() // blocks until work is available
}
.....
execute()
这个函数在将gmp的时候也分析过,主要工作是:
1.每61次就要去全局队列找G (加锁执行globrunqget)
2.执行runqget从本地队列找
3.还找不到执行findrunnable
4.execute执行G中的任务
globrunqget
很短的函数
func globrunqget(_p_ *p, max int32) *g {
//关于锁的操作,全局队列需要加锁访问
assertLockHeld(&sched.lock)
//全局队列没G直接返回,runqsize记录长度
if sched.runqsize == 0 {
return nil
}
//计算n的个数,n的数量的G之后要被放入local queue
n := sched.runqsize/gomaxprocs + 1
if n > sched.runqsize {
n = sched.runqsize
}
if max > 0 && n > max {
n = max
}
if n > int32(len(_p_.runq))/2 {
n = int32(len(_p_.runq)) / 2
}
sched.runqsize -= n
//弹出队头,剩余n-1个放入本地队列
gp := sched.runq.pop()
n--
for ; n > 0; n-- {
gp1 := sched.runq.pop()
runqput(_p_, gp1, false)
}
return gp
}
globrunqget 会从全局 runq 队列中获取 n 个 G ,其中第一个 G 用于执行,n-1 个 G 从全局队列放入本地队列。
runqget
func runqget(_p_ *p) (gp *g, inheritTime bool) {
// If there's a runnext, it's the next G to run.
//先从runnext找G,找到直接返回
next := _p_.runnext
// If the runnext is non-0 and the CAS fails, it could only have been stolen by another P,
// because other Ps can race to set runnext to 0, but only the current P can set it to non-0.
// Hence, there's no need to retry this CAS if it falls.
if next != 0 && _p_.runnext.cas(next, 0) {
return next.ptr(), true
}
//runnext没找到就去本地找,找不到返回nil
for {
h := atomic.LoadAcq(&_p_.runqhead) // load-acquire, synchronize with other consumers
t := _p_.runqtail
if t == h {
return nil, false
}
gp := _p_.runq[h%uint32(len(_p_.runq))].ptr()
if atomic.CasRel(&_p_.runqhead, h, h+1) { // cas-release, commits consume
return gp, false
}
}
}
本地队列的获取会先从 P 的 runnext 字段中获取,如果不为空则直接返回。如果 runnext 为空,那么从本地队列头指针遍历本地队列,本地队列是一个环形队列,方便复用。
findrunnable
好长300行,直接截取部分代码
// 从本地 P 的可运行队列获取 G
if gp, inheritTime := runqget(_p_); gp != nil {
return gp, inheritTime
}
// 从全局的可运行队列获取 G
if sched.runqsize != 0 {
lock(&sched.lock)
gp := globrunqget(_p_, 0)
unlock(&sched.lock)
if gp != nil {
return gp, false
}
}
// 从I/O轮询器获取 G
if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Load64(&sched.lastpoll) != 0 {
// 尝试从netpoller获取Glist
if list := netpoll(0); !list.empty() { // non-blocking
gp := list.pop()
//将其余队列放入 P 的可运行G队列
injectglist(&list)
casgstatus(gp, _Gwaiting, _Grunnable)
if trace.enabled {
traceGoUnpark(gp, 0)
}
return gp, false
}
}
//准备开始从其他本地队列偷G
if !_g_.m.spinning {
// 设置 spinning ,表示正在窃取 G
_g_.m.spinning = true
atomic.Xadd(&sched.nmspinning, 1)
}
#之后休眠前还要再全部队列走一遍找G
#再找不到就休眠
注意这里的m.spining称为自旋,工作线程在从其它工作线程的本地运行队列中盗取 G 时的状态称为自旋状态。
findrunnable从本地和全局找G,找不到去网络轮询找G,将状态设为sping循环遍历所有p的G进行窃取,休眠前再检查一遍,还是没有就休眠M
execute
历经千辛,终于可以进行任务执行
func execute(gp *g, inheritTime bool) {
g := getg()
// Assign gp.m before entering _Grunning so running Gs have an
// M.
//G和M绑定,将G状态切换为Gruning
_g_.m.curg = gp
gp.m = _g_.m
casgstatus(gp, _Grunnable, _Grunning)
gp.waitsince = 0
gp.preempt = false
gp.stackguard0 = gp.stack.lo + _StackGuard
if !inheritTime {
//调度器次数+1
_g_.m.p.ptr().schedtick++
}
// Check whether the profiler needs to be turned on or off.
hz := sched.profilehz
if _g_.m.profilehz != hz {
setThreadCPUProfiler(hz)
}
if trace.enabled {
// GoSysExit has to happen when we have a P, but before GoStart.
// So we emit it here.
if gp.syscallsp != 0 && gp.sysblocktraced {
traceGoSysExit(gp.sysexitticks)
}
traceGoStart()
}
//gogo完成g0到gp的真正切换
//sched.pc存放着任务,初始化时就是runtime.main
gogo(&gp.sched)
}
gogo和goexit
// func gogo(buf *gobuf)
// restore state from Gobuf; longjmp
....
TEXT gogo<>(SB), NOSPLIT, $0
get_tls(CX)
MOVQ DX, g(CX)
MOVQ DX, R14 // set the g register
MOVQ gobuf_sp(BX), SP // restore SP
MOVQ gobuf_ret(BX), AX
MOVQ gobuf_ctxt(BX), DX
MOVQ gobuf_bp(BX), BP
MOVQ $0, gobuf_sp(BX) // clear to help garbage collector
MOVQ $0, gobuf_ret(BX)
MOVQ $0, gobuf_ctxt(BX)
MOVQ $0, gobuf_bp(BX)
//重点来了,sched.pc存放这runtime.main
// bx = sched.pc = &runtime.main
MOVQ gobuf_pc(BX), BX
JMP BX
JMP BX g0切换到执行任务的g的栈开始执行
非runtime.main返回时会直接返回goexit(runtime.main执行完直接exit(0)退出)
newg.sched.pc = abi.FuncPCABI0(goexit) + sys.PCQuantum
这行代码在newproc1中,把sched.pc与goexit关联
TEXT runtime·goexit(SB),NOSPLIT,$0-0
CALL runtime·goexit1(SB)
func goexit1() {
// 调用goexit0函数
mcall(goexit0)
}
goexit通过mcall完成goexit0的调用
func goexit0(gp *g) {
_g_ := getg()
// 设置当前 G 状态为 _Gdead
casgstatus(gp, _Grunning, _Gdead)
// 清理 G
gp.m = nil
...
gp.writebuf = nil
gp.waitreason = 0
gp.param = nil
gp.labels = nil
gp.timer = nil
// 解绑 M 和 G
dropg()
...
// 将 G 扔进 gfree 链表中等待复用
gfput(_g_.m.p.ptr(), gp)
// 再次进行调度
schedule()
}
goexit0 会对 G 进行复位操作,解绑 M 和 G 的关联关系,将其 放入 gfree 链表中等待其他的 go 语句创建新的 g。在最后,goexit0 会重新调用 schedule触发新一轮的调度。
newm
这个在GMP中讲过
newm
最后是通过clone调用系统调用克隆出M
小结
至此通过查阅学习和实验阅读代码基本了解了GMP的调度流程
- g0,m0的创建:
rt0_amd64->rt0_go->[check、args、osinit、schedinit、newproc、mstart、about],rt0_go创建了g0,m0,这两者的关系之前通过指针互相关联过,g0负责调度g,newproc创建main gorutine - g如何进入P--即G如何入队
runqput先进入runnext,老G丢入local queue,再丢入glob queue,runqputslow是丢入全局队列时加锁并携带一半本地队列 - M如何调度G--P如何传G给M
schedule指定要每61次检查全局队列(globrunqget指出再获取全局G的时候还要携带n个进入本地队列),runqget是先从runnext找再从本地和全局找,(findrunable)找不到就去网络找和窃取(spining),execute是执行G中的任务,具体调用为execute->gogo->goexit->goexit0->schedule,goexit(非main gorutine函数)会对G进行复位,即回到g0,再触发新一轮的调度 - m0是runtime创建的第一个系统线程,一个GO进程只有一个m0其他M都是克隆产生的,是主线程
- g0是什么?
用户执行任务的叫G,执行runtime.main的叫main gorutine,g0负责执行调度任务。main gorutine不是第一个G,g0 栈用于执行调度器的代码,执行完之后,要跳转到执行用户代码的地方,如何跳转?这中间涉及到栈和寄存器的切换。要知道,函数调用和返回主要靠的也是 CPU 寄存器的切换。goroutine 的切换和此类似。(具体看GMP那章或g0的切换)
参考
1.GO语言调度机制
2.调度器
3.go夜读schedule分析
网友评论