写在开头
非原创,知识搬运工/整合工,仅用于自己学习,实验环境为阿里云Centos7,go1.20
GMP往期回顾
知识总览
image.pngimage.png
带着问题去阅读
- g0和m0是什么?创建和初始化时机?
- p和m创建和初始化的时机?
- 为什么在堆上创建G
- 调度器初始化截断g0发生了栈切换吗
- 从代码角度解释为什么新创建的G有更多的机会被调度
- 为什么协程有内存泄露的风险
1.Schedt结构体
在调度器那一章节我们提到了GMP的三天王还有第四个就是schedt负责总览全局
schedt结构体
type schedt struct {
lock mutex
// 由空闲的工作线程组成的链表
midle muintptr // idle m's waiting for work
// 空闲的工作线程数量
nmidle int32 // number of idle m's waiting for work
// 空闲的且被 lock 的 m 计数
nmidlelocked int32 // number of locked m's waiting for work
// 已经创建的工作线程数量
mcount int32 // number of m's that have been created
// 表示最多所能创建的工作线程数量
maxmcount int32 // maximum number of m's allowed (or die)
// goroutine 的数量,自动更新
ngsys uint32 // number of system goroutines; updated atomically
// 由空闲的 p 结构体对象组成的链表
pidle puintptr // idle p's
// 空闲的 p 结构体对象的数量
npidle uint32
nmspinning uint32 // See "Worker thread parking/unparking" comment in proc.go.
// 由空闲的 p 结构体对象组成的链表
pidle puintptr // idle p's
// 空闲的 p 结构体对象的数量
npidle uint32
nmspinning uint32 // See "Worker thread parking/unparking" comment in proc.go.
// Global runnable queue.
// 全局可运行的 G队列
runqhead guintptr // 队列头
runqtail guintptr // 队列尾
runqsize int32 // 元素数量
// Global cache of dead G's.
// dead G 的全局缓存
// 已退出的 goroutine 对象,缓存下来
// 避免每次创建 goroutine 时都重新分配内存
gflock mutex
gfreeStack *g
gfreeNoStack *g
// 空闲 g 的数量
ngfree int32
// Central cache of sudog structs.
// sudog 结构的集中缓存
sudoglock mutex
sudogcache *sudog
包含了线程、协程和P的数量于信息,还有一个全局队列runq,将已经用完的G(本质是结构体)缓存下来避免每次创建G重新分配内存
在程序的运行过程中,schedt对象只存在一份实体
2.实验观察初始化
demo
package main
import "fmt"
func main() {
fmt.Println("Hello World")
}
编译,-gcflags是为了关闭编译器优化和函数内联,防止后面设置断点找不到相关代码
go mod init my
go build -gcflags "-N -l" -o main main.go
查看文件入口
readelf -h main
image.png
entry point address为程序入口地址
反编译查看汇编代码
objdump -S ./main > out.txt
vim out.txt
/45ed60
image.png
这个是rt0_amd64_linux.s文件,该地址存放的指令是一条jmp指令
jump 45b340 rt0_amd64
跳转到0x4b340地址执行代码(存放于asm_amd64.s中)
image.png
jump runtime.rt0_go
跳转到runtime的rt0_go执行代码,这个函数位于asm_amd64.s,函数很长,完成go启动时所有的初始化工作,这段代码完成之后,整个GO程序就可以跑起来了,是非常核心代码
2.1 runtime.rt0_go
汇编看着头疼,挑部分代码分析
1.g0在该函数完成初始化
MOVQ $runtime·g0(SB), DI
LEAQ (-64*1024+104)(SP), BX
MOVQ BX, g_stackguard0(DI)
MOVQ BX, g_stackguard1(DI)
MOVQ BX, (g_stack+stack_lo)(DI)
MOVQ SP, (g_stack+stack_hi)(DI)
这段汇编给g0分配栈空间(g是一个特殊的函数,它的栈就是特殊的函数栈,函数栈特性是高地址向低地址开辟空间),把g0地址存入di寄存器
BX = SP-64*1024+104 (实际g0栈空间大小为64K-104B,这里是将SP下移这么多空间来划分给g0)
2.初始化m的tls字段,这里的m是第一个m,后续m照着它产生
LEAQ runtime·m0+m_tls(SB), DI
CALL runtime·settls(SB)
3.将g0的地址保存到线程m0的tls字段(线程私有数据,这是个数组,单字段指向数组第一个元素),即m0.tls[0] = &g0,
m0的信息也被保存在g0
m0.g0 = &g0
g0.m = &m0
LEAQ runtime·g0(SB), CX
MOVQ CX, g(BX)
LEAQ runtime·m0(SB), AX
MOVQ CX, m_g0(AX)
MOVQ AX, g_m(CX)
image.png
这里m0不是栈就是单纯的内存,我画快了忽略掉
4.系统核心初始化和调度器初始化,基本上schedinit执行完后,调度器相关参数就初始化好了
CALL runtime·osinit(SB)
CALL runtime·schedinit(SB)
5.创建一个新的协程来执行用户的代码
MOVQ $runtime·mainPC(SB), AX
CALL runtime·newproc(SB)
6.主线程进入循环调度,运行刚创建的新协程
CALL runtime·mstart(SB)
TLS(THread Local Storage)线程本地私有全局变量
小结下:m0和g0的关系通过m0.tls串联起来,我们可以通过m0.tls或m0.g0找到g0,也能通过g0.m找到m0,osinit初始化的是系统的核心数于ncpu中,schedinit才是调度器启动的核心
2.2 schedinit
源码位置
前面省略一堆锁的初始化,涉及到全局变量,肯定要加锁
getg由编译器实现,获取函数当前运行的G结构体对象的地址,并告知系统线程的最多数量为1万个
gp:=getg()
sched.maxmcount = 10000
初始化栈空间
stackinit()
mallocinit()
继续初始化m0和gc等,在程序启动时,m0的初始化是在mcommoninit中完成的
mcommoninit(gp.m, -1)
gcinit()
mcommoninit源码地址,因为sched是全局唯一对象,多线程操作需要加锁,刚开始m0.id=0后续递增,在m的初始化过程中m会被挂到allm中(对象池,复用线程栈,避免频繁创建线程,allm是一个环状的链表),mp.alllink = allm即链表最后一个位置
func mcommoninit(mp *m, id int64) {
lock(&sched.lock)
if id >= 0 {
mp.id = id
} else {
mp.id = mReserveID()
}
mp.alllink = allm
atomicstorep(unsafe.Pointer(&allm), unsafe.Pointer(mp))
unlock(&sched.lock)
}
image.png
image.png
回到schedule初始化p个数,且初始化所有p
procs := ncpu
if procresize(procs) != nil {
throw("unknown runnable goroutine during bootstrap")
}
procresize是调度器启动的最后一步,之后调度器就启动相应数量的m,等待运行G
2.2.1 procresize
源码地址
procresize不仅在初始化的时候会执行。中途改变procs值也会执行
初始化所有P,allp是保存所有p的全局变量,最大数量为1024,该段代码为每个p申请(new)新对象,将p存放在allp中
for i := old; i < nprocs; i++ {
pp := allp[i]
if pp == nil {
pp = new(p)
}
pp.init(i)
atomicstorep(unsafe.Pointer(&allp[i]), unsafe.Pointer(pp))
}
利用acquirep将p和m关联
gp.m.p = 0
pp := allp[0]
pp.m = 0
pp.status = _Pidle
acquirep(pp)
把所有空闲的p放入空闲链表,runqempty用于判断p是否空闲
for i := nprocs - 1; i >= 0; i-- {
if runqempty(pp) {
pidleput(pp, now)
}
}
返回有本地任务的p链表
return runnablePs
小结一下,该函数从堆上创建了nproc个p,新的p被放入allp中
2.2.2 acquirep
acquirep将p0和m0关联起来,就是一些字段的相互设置
func acquirep(pp *p) {
wirep(pp)
pp.mcache.prepareForSweep()
}
func wirep(pp *p) {
gp.m.p.set(pp)
pp.m.set(gp.m)
pp.status = _Prunning
}
g0.m.p=p0
p0.m = m0
image.png
小结
小结一下初始化涉及到一下全局变量
- allglen 所有g的长度
- allgs []*g 保存所有g
- allm *m 保存所有m
- allp []*p 保存所有p
- ncpu 核数
- sched schedt类型,唯一全局对象,记录调度器工作状态和一些数据
- m0 主线程
- g0 负责调度的g
调度器初始化还干了什么?
- 创建g0和m0,绑定关系m0.tls=&g0,m0.g0=&g,g0.m=&m
- 根据核心数创建P,绑定P0和m0 ,g0.m.p=p0,p0.m = m0
schedinit的procresize执行完意味着调度器初始化完成
m0是启动程序后的id=0的主线程,m0负责执行初始化操作和启动第一个g,之后就和其他m一样了
g0是每启动一个m就会创建的第一个g(后续线程都是拷贝m0,因此也会有g0)
p的创建是在schedinit的procresize完成的,也就是初始化阶段,与CPU核数挂钩(进程可获取到的)
2.3 newproc
回到2.1中的汇编代码
MOVQ $runtime·mainPC(SB), AX // entry
PUSHQ AX
CALL runtime·newproc(SB)
AX = &funcval{runtime·main},将用户代码main函数作为newproc的参数压入栈(回忆一下函数栈的压栈过程),注意1.20newproc只有一个参数
func newproc(fn *funcval) {
gp := getg()
pc := getcallerpc()
systemstack(func() {
newg := newproc1(fn, gp, pc)
pp := getg().m.p.ptr()
runqput(pp, newg, true)
if mainStarted {
wakep()
}
})
}
type funcval struct {
fn uintptr
// variable-size, fn-specific data here
}
systemstack(是汇编实现的)是在系统栈上运行函数fn,fn之后调用newproc1,newproc1会获取或创建新的G协程结构体
mp := acquirem() // disable preemption because we hold M and P in local vars.
pp := mp.p.ptr()
newg := gfget(pp) //寻找空闲的G不存在则创建
if newg == nil {
newg = malg(_StackMin)
casgstatus(newg, _Gidle, _Gdead)
allgadd(newg) // publishes with a g->status of Gdead so GC scanner doesn't look at uninitialized stack.
}
注意这个 malg函数,malg调用stackalloc,这个是在堆上申请栈空间,因为如果最开始的2k空间不够或太多就会存在扩容和缩容的问题,系统线程栈上会产生栈空间碎片。那么在堆上申请最后就需要销毁(还给堆),要进行垃圾回收(释放)
g0栈不需要扩容和缩容,空间更大,固定不变,因此在系统线程栈上申请
回到newproc1代码,确定参数的入栈位置,参数的入参位置是sp处开始
sp := newg.stack.hi - totalSize
spArg := sp
将参数从newproc的函数栈拷贝到新g的栈
memclrNoHeapPointers(unsafe.Pointer(&newg.sched), unsafe.Sizeof(newg.sched))
在新的g上执行指令就要用新g的栈,执行函数需要参数因此拷贝,
因为是初始化,newproc在g0栈上执行,这里我们将参数fn从g0栈拷贝到新g栈上
初始化newg的各种字段
newg.sched.sp = sp
newg.stktopsp = sp
newg.sched.pc = abi.FuncPCABI0(goexit) + sys.PCQuantum
newg.sched.g = guintptr(unsafe.Pointer(newg))
sched是g的一个字段,也是一个结构体这个之前学习过是gobuf类型,是G运行时的现场数据,切换G的时候依赖这个数据
type gobuf struct {
sp uintptr
pc uintptr
g guintptr
ctxt unsafe.Pointer
ret uintptr
lr uintptr
bp uintptr // for framepointer-enabled architectures
}
newg.sched.pc表示当前新g被调度起来运行时,从这个地址开始执行指令,pc字段为goexit地址+1,即goexit函数的第二条指令,goexit函数是协程退出后的一些清理工作
newg.sched.g为新g的地址
gostartcallfn(&newg.sched, fn)
gostartcallfn解释了为什么要设置pc为goexit地址的第二行,它会将sp减小一个指针,这是给返回地址预留空间(学习函数栈时我们知道会把调用函数的调用方地址也给压入栈到栈顶),也就是说将goexit压入栈顶后,伪造了sp位置(类似函数栈的return address,让cpu看起来是从goexit函数中执行了协程任务,协程执行后返回goexit函数),当协程被执行时,从pc开始执行,之后协程执行完毕执行goexit
func gostartcallfn(gobuf *gobuf, fv *funcval) {
var fn unsafe.Pointer
gostartcall(gobuf, fn, unsafe.Pointer(fv))
}
func gostartcall(buf *gobuf, fn, ctxt unsafe.Pointer) {
sp := buf.sp
sp -= goarch.PtrSize
*(*uintptr)(unsafe.Pointer(sp)) = buf.pc
buf.sp = sp
buf.pc = uintptr(fn)
buf.ctxt = ctxt
}
image.png
回到newproc1
修改newg状态为runable
newg.gopc = callerpc
回到newproc代码,设置好了newg,该让他调用了吧
pp := getg().m.p.ptr()
runqput(pp, newg, true)
初始化时是从g0获取p
2.4runqput
源码
先尝试将新g放入到可执行队列中
next=false 放p私有队列LRQ尾部
next=true 放p的runnext字段中,故名思意,下一个运行,也可以理解为LRQ首部
LRQ满了塞不下怎么办?放到全局队列(优先级最低)
if next {
retryNext:
oldnext := pp.runnext
//满员的情况
if !pp.runnext.cas(oldnext, guintptr(unsafe.Pointer(gp))) {
goto retryNext
}
if oldnext == 0 {
return
}
// Kick the old runnext out to the regular run queue.
gp = oldnext.ptr()
}
retry:
h := atomic.LoadAcq(&pp.runqhead) // load-acquire, synchronize with consumers
t := pp.runqtail
//放到队列尾部
if t-h < uint32(len(pp.runq)) {
pp.runq[t%uint32(len(pp.runq))].set(gp)
atomic.StoreRel(&pp.runqtail, t+1) // store-release, makes the item available for consumption
return
}
//全局队列slowqueue
if runqputslow(pp, gp, h, t) {
return
}
// the queue is not full, now the put above must succeed
goto retry
这里我们可以总结出新G的放入P的顺序,先队首p.runnnext,再队尾p.LRQ,再全局slowqueue,这里放如全局队列也有门路,它不是直接放一个,而是会带过去一堆g(p满了,给p腾空间)
func runqputslow(pp *p, gp *g, h, t uint32) bool {
var batch [len(pp.runq)/2 + 1]*g
n := t - h
n = n / 2
for i := uint32(0); i < n; i++ {
batch[i] = pp.runq[(h+i)%uint32(len(pp.runq))].ptr()
}
if !atomic.CasRel(&pp.runqhead, h, h+n) { // cas-release, commits consume
return false
}
batch[n] = gp
先将p的所有g都加入一个数组中,数组长度为LRQ的一半加上newg,然后以链表的形式将这些g(一部分g)添加到GRQ上
image.png
实际上p的LRQ长度为256,是一个数组组成的环形链表
到这里g0切换到了newg的栈上来执行fn函数
小结
newproc1是获取g的函数,可以从缓存allgs缓存中获取G,也可以用malg函数在堆栈上创建G,之后根据传参规则塞入队列,一般新创建的g更有机会被调度,因此放到p队首
网友评论