以下文章均为拜读公众号 源码游记 的笔记 http://mp.weixin.qq.com/mp/homepage?__biz=MzU1OTg5NDkzOA==&hid=1&sn=8fc2b63f53559bc0cee292ce629c4788&scene=18#wechat_redirect
1. Goroutine调度器
1. goroutine简介
首先为什么Go语言为什么要引入协程Goroutine呢。
这主要是因为操作系统的线程太重了,具体表现在
- 创建和切换太重:所有的线程切换都需要系统调用进入内核,而进入内核的性能代价比较大
- 内存使用太重:一方面,为了尽量避免极端情况下操作系统线程栈的溢出,内核在创建操作系统线程时默认会为其分配一个较大的栈内存(虚拟地址空间,内核并不会一开始就分配这么多的物理内存),然而在绝大多数情况下,系统线程远远用不了这么多内存,这导致了浪费;另一方面,栈内存空间一旦创建和初始化完成之后其大小就不能再有变化,这决定了在某些特殊场景下系统线程栈还是有溢出的风险。
可以使用
ulimit -a
命令查看本机的栈大小。如下图,栈的代销为8192kB=8MB。image-20200512220534183
所有go引进了Goroutine,相比较旧特别轻量
- Goroutine是用户态线程,创建和切换都可以再用户态完成,无需进入内核,开销比较小。
- Goroutine默认大小是2k,相比操作系统线程的8M,简直是太小了。不过虽然协程栈比较小,但是支持自动扩容和收缩,所以也不用担心内存不够用或者浪费。
Goroutine的调度简介
所谓对协程的调度,其实就是Go 按照一定的算法逻辑挑选出合适的 Goroutine并放到CPU上运行的过程。
在go中 有专门负责调度的代码,我们成为goroutine调度器。
大概逻辑如下:
for i:=0;i<N;i++{ // 创建N个操作系统执行schedule函数
create_os_thread(schedule)
}
func schedule(){
for{
g := find_a_runnable_g_from_M_gs()
run_g(g)
save_status_of_g(g)
}
}
总结:也就算程序开始创建N个线程去执行schedule
函数。
每个schedule
函数 回去寻找一个可以执行的g,然后执行g,在保存g的状态,循环往复。
调度器相关的数据结构源码概述
无他,所谓调度,本质上都是对寄存器的一些操作。
只不过线程调度是在内核态,由操作系统完成。
而协程调度是在用户态,由go自己完成。
如上,go自己要完成对协程的调度。那么go就需要引入一个数据结构来保存协程中的一些值(寄存器等)。
这就是g
。
1. 当协程被调离的时候,它的相关信息都保存在g中
2. 当协程被恢复的时候,调度器又可以通过g将其状态还原。
现在有了g
,那么对g
进行调度就需要调度器。不过调度器本身也是需要保存自身状态的,所以这就有了schedt
结构体。因为一个go程序只有一个调度器,所以schedt全局唯一。
有了调度器,有了协程,我们还需要一个队列来保存可以运行的g,这就是局部运行队列。考虑到并发,go为每个p
都配了一个局部运行队列,他保存着一系列等待执行的g
.那么什么是p
呢,它是processor的简写,是个掮客,协调着工作线程和g之间的关系。
除了这些,还有一个结构,它需要跟工作线程打交道,同时接收g
,这就是m
。每个工作线程都有唯一的一个m
结构体与之对应,m结构体记录着线程的的栈信息等。
除此,还有一个全局运行队列,它是一个保存在schedt
中的运行队列。具体后面再说。
下面是上面几种结构的对应关系:
<img src="http://picgo.vipkk.work/20200513211350.png" alt="image-20200513211350567" style="zoom:40%;" />
一个细节
前面我们提到m会跟工作线程绑定,但是我们有多个m,那么是如何做到区分的呢。记得上面的伪代码吗,我们调度的程序是一致的,那怎么区分呢。这就用到了前面提到的本地存储,threadlocal。
具体结构体代码
TODO 因为太多不在这里展示
2. 调度器初始化
本章将以一个hello world程序为例,通过跟踪其从启动到退出的这一完整的运行流程来分析Go语言调度器的初始化、goroutine的创建与退出、工作线程的调度循环以及goroutine的切换等内容。
package main import "fmt" func main(){ fmt.Println("Hello World!") }
这节我们主要来分析调度器的初始化。
任何一个编译型语言(不管是C、C++、go)所编写的程序在操作系统上运行都有以下几个阶段
- 从磁盘上把可执行程序读入内存
- 创建进程和主线程
- 为主线程分配栈空间
- 把用户在命令行输入的参数拷贝到主线程的栈上
- 把主线程放在操作系统的运行队列等待被调度执行起来。
在主线程第一次被调度起来之前,栈如下图所示:
<img src="http://picgo.vipkk.work/20200513221220.png" alt="image-20200513221220322" style="zoom:50%;" />
接下来我们从代码出发讲解下 调度器中的各个数据结构是怎么关联起来的
程序入口
首先我们需要找到梦(代码)开始的地方。
首先我们编译程序,然后通过gdb调试go build -o hello;gdb hello
,然后输入info files
查看调试信息

可以发现入口在0x454b70
,然后 在这里打断点b *0x454b70
,可以发现程序入口的地方。对应代码如下,暂时不接受。
以下代码太多,建议结合源码食用
// 梦开始的地方 runtime/rt0_linux_amd64.s:8
TEXT _rt0_amd64_linux(SB),NOSPLIT,$-8
JMP _rt0_amd64(SB)
TEXT _rt0_amd64(SB),NOSPLIT,$-8
MOVQ 0(SP), DI // argc
LEAQ 8(SP), SI // argv
JMP runtime·rt0_go(SB)
// runtime/asm_amd64.s
TEXT runtime·rt0_go(SB),NOSPLIT,$0
// copy arguments forward on an even stack
MOVQ DI, AX // argc
MOVQ SI, BX // argv
SUBQ $(4*8+7), SP // 2args 2auto 16字节对齐
ANDQ $~15, SP
MOVQ AX, 16(SP)
MOVQ BX, 24(SP)
g0初始化
在rt0_go
中将要完成了我们程序启动所有的的初始化工作。
上面代码基本上就是将SP对齐,接着往下
// create istack out of the given (operating system) stack.
// _cgo_init may update stackguard.
// g0 -> DI
MOVQ $runtime·g0(SB), DI
LEAQ (-64*1024+104)(SP), BX // BX = SP - 64 * 1024 + 104
MOVQ BX, g_stackguard0(DI) // g0.stackguard0 = SP - 64 * 1024 + 104
MOVQ BX, g_stackguard1(DI) // g0.stackguard1 = SP - 64 * 1024 + 104
MOVQ BX, (g_stack+stack_lo)(DI) // g0.stack.lo = SP - 64 * 1024 + 104
MOVQ SP, (g_stack+stack_hi)(DI) // g0.stack.l1 = SP
这里完成了对g0的初始化, 可见g0的栈大小为 64K。
此时g0与程序栈之间的关系如图
<img src="http://picgo.vipkk.work/20200515221806.png" alt="image-20200515221806358" style="zoom:40%;" />
注意,g0是全局变量,所以这时候g0的内存已经申请好了,在全局变量区(data+bss).
上面图的关系 我们可以通过gdb 打印下地址查看下

从图中可以看到 hi 指向了sp,其他亦然。
m0初始化&绑定OS线程
g0的栈好之后,我们来看下m0的初始化。
跳过CPU型号检查和cgo初始化的代码,往下看
// 初始化m的tls。DI= &m0.tls, 取 m0的tls成员地址到DI寄存器
LEAQ runtime·m0+m_tls(SB), DI
// 调用 settls设置 线程本地存储,settls 函数的参数在DI寄存器中。 之后,可以通过fs断寄存器找到 m.tls。
CALL runtime·settls(SB)
// store through it, to make sure it works
get_tls(BX) // 获取fs段基址并放入BX寄存器, 其实就是 m0.tls[1] 的地址,get_tls的代码由编译器生成
MOVQ $0x123, g(BX) // set m0.tls[0] = 0x123, 也就算 fs地址-8的内存位置。 g 代码-8
MOVQ runtime·m0+m_tls(SB), AX // AX = m0.tls[0]
CMPQ AX, $0x123 // 比较
JEQ 2(PC)
CALL runtime·abort(SB) // 如果线程本地存储不能正常工作,退出程序
这段代码调用 settls 来实例化 m0.tls, tls就是对应之前提到thread local storage,目的是为了把m0这个数据结构和线程关联在一起,从而我们可以把m0当做线程的抽象。
其中settls函数就是把fs寄存器的地址设置为了tls[1]的地址。
继续:
ok:
// set the per-goroutine and per-mach "registers"
get_tls(BX)
LEAQ runtime·g0(SB), CX // CX = g0
MOVQ CX, g(BX) // m0.tls[0]= g0
LEAQ runtime·m0(SB), AX // AX = &m0
// 以下两行代码是把 m0和g0串了起来
// save m->g0 = g0
MOVQ CX, m_g0(AX)
// save m0 to g0->m
MOVQ AX, g_m(CX)
以上,在m0于主线程绑定之后,我们又通过指针把m0和g0关联在了一起。此时内存布局如下

可以看到,内存指针一致。
另外m0.tls[0] 存储的也是g0地址,tls[1]的地址其实是fs基地址(这个可以参考之前的tls章节)。
此时内存如下图:
<img src="http://picgo.vipkk.work/20200516172239.png" alt="image-20200516172239213" style="zoom:50%;" />
继续
MOVL 16(SP), AX // copy argc
MOVL AX, 0(SP)
MOVQ 24(SP), AX // copy argv
MOVQ AX, 8(SP)
CALL runtime·args(SB)
CALL runtime·osinit(SB)
CALL runtime·schedinit(SB) // 调度器初始化
m0初始化完成之后,开始对调度器初始化
调度器初始化
在这里就是go 语言了
// The bootstrap sequence is:
//
// call osinit
// call schedinit
// make & queue new G
// call runtime·mstart
//
// The new G calls runtime·main.
func schedinit() {
// raceinit must be the first call to race detector.
// In particular, it must be done before mallocinit below calls racemapshadow.
// getg 是个什么鬼,getg 由编译器实现,大概原理就是 通过fs 找到 tls[1], 再-8找到g0,也就算当前运行的g
_g_ := getg() // _g_=&g0
// ......
//设置最多启动10000个操作系统线程,也是最多10000个M
sched.maxmcount = 10000
// ......
mcommoninit(_g_.m) //初始化m0,因为从前面的代码我们知道g0->m = &m0
// ......
sched.lastpoll = uint64(nanotime())
procs := ncpu //系统中有多少核,就创建和初始化多少个p结构体对象
if n, ok := atoi32(gogetenv("GOMAXPROCS")); ok && n > 0 {
procs = n //如果环境变量指定了GOMAXPROCS,则创建指定数量的p
}
if procresize(procs) != nil {//创建和初始化全局变量allp
throw("unknown runnable goroutine during bootstrap")
}
// ......
}
以上我们对schedinit
函数提取了下大纲。这次我们主要关注以下几点
-
getg()
getg()是汇编实现的,大概就是从tls取出当前运行的g,在这里就是g0。
-
设置最多线程个数为10000个,(对应内存也就是80000M ~= 80G)
-
mcommoninit
- 对m0
- 对p
mcommoninit
- 对m0做另外的一些初始化
- 创建和实例化p
func mcommoninit(mp *m) {
_g_ := getg()
// g0 stack won't make sense for user (and is not necessary unwindable).
if _g_ != _g_.m.g0 {
callers(1, mp.createstack[:])
}
// 因为sched是一个全局变量,多个线程同时操作 sched会有并发问题,因此要先加锁,操作结束后再解锁。
lock(&sched.lock)
if sched.mnext+1 < sched.mnext {
throw("runtime: thread ID overflow")
}
mp.id = sched.mnext // 给id赋值
sched.mnext++ // m0的 id就是0,并且之后创建的m的id是递增的。
checkmcount() // 检查是否超过数量限制
// random 初始化
mp.fastrand[0] = uint32(int64Hash(uint64(mp.id), fastrandseed))
mp.fastrand[1] = uint32(int64Hash(uint64(cputicks()), ^fastrandseed))
if mp.fastrand[0]|mp.fastrand[1] == 0 {
mp.fastrand[1] = 1
}
// 创建用于信号处理的goroutine ,gsignal,只是简单的从堆上分配一个g结构体对象,然后把栈设置好久返回了
mpreinit(mp)
if mp.gsignal != nil {
mp.gsignal.stackguard1 = mp.gsignal.stack.lo + _StackGuard
}
// Add to allm so garbage collector doesn't free g->m
// when it is just in a register or thread-local storage.
// 把m 挂入全局链表 allm之中
// 类比 m0.next = allm allm = m0
mp.alllink = allm
// NumCgoCall() iterates over allm w/o schedlock,
// so we need to publish it safely.
atomicstorep(unsafe.Pointer(&allm), unsafe.Pointer(mp))
unlock(&sched.lock)
// Allocate memory to hold a cgo traceback if the cgo call crashes.
if iscgo || GOOS == "solaris" || GOOS == "illumos" || GOOS == "windows" {
mp.cgoCallers = new(cgoCallers)
}
}
这里主要对m0做了以下几个事情
- 赋值id
- 初始化m的随机数
- 信号初始化(后续分析抢占调度的时候用得到)
- 把m0自己放到allm全局变量里。
此时 allm 指向m0

procresize
// Change number of processors. The world is stopped, sched is locked.
// gcworkbufs are not being modified by either the GC or
// the write barrier code.
// Returns list of Ps with local work, they need to be scheduled by the caller.
func procresize(nprocs int32) *p {
old := gomaxprocs
if old < 0 || nprocs <= 0 {
throw("procresize: invalid arg")
}
if trace.enabled {
traceGomaxprocs(nprocs)
}
// update statistics
now := nanotime()
if sched.procresizetime != 0 {
sched.totaltime += int64(old) * (now - sched.procresizetime)
}
sched.procresizetime = now
// Grow allp if necessary.
if nprocs > int32(len(allp)) { // 初始化的时候 len(allp) = 0,因为目前还没有P
// Synchronize with retake, which could be running
// concurrently since it doesn't run on a P.
lock(&allpLock)
if nprocs <= int32(cap(allp)) {
allp = allp[:nprocs]
} else {
nallp := make([]*p, nprocs)
// Copy everything up to allp's cap so we
// never lose old allocated Ps.
copy(nallp, allp[:cap(allp)]) // 代码中途调整p的数量?
allp = nallp
}
unlock(&allpLock)
}
// initialize new P's
// 循环创建 nproc个p,并完成基本初始化
for i := old; i < nprocs; i++ {
pp := allp[i]
if pp == nil {
pp = new(p) // 调用内存分配器从堆上分配一个 struct p
}
pp.init(i) // 设置p的id,mcache等。
atomicstorep(unsafe.Pointer(&allp[i]), unsafe.Pointer(pp))
}
_g_ := getg()
if _g_.m.p != 0 && _g_.m.p.ptr().id < nprocs {
// continue to use the current P
_g_.m.p.ptr().status = _Prunning
_g_.m.p.ptr().mcache.prepareForSweep()
} else { // 初始化的时候执行这个分支
// release the current P and acquire allp[0].
//
// We must do this before destroying our current P
// because p.destroy itself has write barriers, so we
// need to do that from a valid P.
if _g_.m.p != 0 {
if trace.enabled {
// Pretend that we were descheduled
// and then scheduled again to keep
// the trace sane.
traceGoSched()
traceProcStop(_g_.m.p.ptr())
}
_g_.m.p.ptr().m = 0
}
_g_.m.p = 0
_g_.m.mcache = nil
p := allp[0]
p.m = 0
p.status = _Pidle
// 把 p和m0关联起来
acquirep(p)
if trace.enabled {
traceGoStart()
}
}
// release resources from unused P's
// 调整P的个数
for i := nprocs; i < old; i++ {
p := allp[i]
p.destroy()
// can't free P itself because it can be referenced by an M in syscall
}
// Trim allp.
if int32(len(allp)) != nprocs {
lock(&allpLock)
allp = allp[:nprocs]
unlock(&allpLock)
}
// 把所有空闲的P放入空闲链表
// 它将除了p0的所有非空闲的P,放入P链表runnablePs,并返回procresize函数的调用者,
// 并由其来调度这些P
var runnablePs *p
for i := nprocs - 1; i >= 0; i-- {
p := allp[i]
if _g_.m.p.ptr() == p { // allp[0] 跟m0 关联了,所有不放入
continue
}
p.status = _Pidle
// 如果P 是空闲的,则放入 idle里
if runqempty(p) { // 通过 runqempty 判断p 的LRQ中是否包含G,
pidleput(p) // 初始化除了allp[0]其他P 全部执行这个分支,分入空闲链表
} else {
// ??
p.m.set(mget())
p.link.set(runnablePs)
runnablePs = p
}
}
// 初始化一个随机的分配器
stealOrder.reset(uint32(nprocs))
var int32p *int32 = &gomaxprocs // make compiler check that gomaxprocs is an int32
atomic.Store((*uint32)(unsafe.Pointer(int32p)), uint32(nprocs))
return runnablePs
}
这个代码干了以下几个事情
- 初始化全局变量allp
- 填充allp
- m0和allp[0]绑定
- 除allp[0]之外的所有p放到 等待队列中
allp是全局变量
并且会把m0和allp[0]关联起来。


这里没太搞明白内存的对应关系
至此,m0,g0和p就关联在一起了。
如图:
<img src="http://picgo.vipkk.work/20200516181939.png" alt="image-20200516181939265" style="zoom:50%;" />
网友评论