美文网首页
GO基础学习(10)GMP模型-调度器初始化

GO基础学习(10)GMP模型-调度器初始化

作者: 温岭夹糕 | 来源:发表于2023-04-29 19:57 被阅读0次

写在开头

非原创,知识搬运工/整合工,仅用于自己学习,实验环境为阿里云Centos7,go1.20

GMP往期回顾

  1. 程序到进程
  2. 进程到线程再到协程
  3. GO调度器
  4. GMP介绍

知识总览

image.png
image.png

带着问题去阅读

  1. g0和m0是什么?创建和初始化时机?
  2. p和m创建和初始化的时机?
  3. 为什么在堆上创建G
  4. 调度器初始化截断g0发生了栈切换吗
  5. 从代码角度解释为什么新创建的G有更多的机会被调度
  6. 为什么协程有内存泄露的风险

1.Schedt结构体

在调度器那一章节我们提到了GMP的三天王还有第四个就是schedt负责总览全局
schedt结构体

type schedt struct {
    lock mutex
// 由空闲的工作线程组成的链表
    midle        muintptr // idle m's waiting for work
    // 空闲的工作线程数量
    nmidle       int32    // number of idle m's waiting for work
    // 空闲的且被 lock 的 m 计数
    nmidlelocked int32    // number of locked m's waiting for work
    // 已经创建的工作线程数量
    mcount       int32    // number of m's that have been created
    // 表示最多所能创建的工作线程数量
    maxmcount    int32    // maximum number of m's allowed (or die)

    // goroutine 的数量,自动更新
    ngsys uint32 // number of system goroutines; updated atomically

    // 由空闲的 p 结构体对象组成的链表
    pidle      puintptr // idle p's
    // 空闲的 p 结构体对象的数量
    npidle     uint32
    nmspinning uint32 // See "Worker thread parking/unparking" comment in proc.go.
    // 由空闲的 p 结构体对象组成的链表
    pidle      puintptr // idle p's
    // 空闲的 p 结构体对象的数量
    npidle     uint32
    nmspinning uint32 // See "Worker thread parking/unparking" comment in proc.go.

    // Global runnable queue.
    // 全局可运行的 G队列
    runqhead guintptr // 队列头
    runqtail guintptr // 队列尾
    runqsize int32 // 元素数量

    // Global cache of dead G's.
    // dead G 的全局缓存
    // 已退出的 goroutine 对象,缓存下来
    // 避免每次创建 goroutine 时都重新分配内存
    gflock       mutex
    gfreeStack   *g
    gfreeNoStack *g
    // 空闲 g 的数量
    ngfree       int32

    // Central cache of sudog structs.
    // sudog 结构的集中缓存
    sudoglock  mutex
    sudogcache *sudog

包含了线程、协程和P的数量于信息,还有一个全局队列runq,将已经用完的G(本质是结构体)缓存下来避免每次创建G重新分配内存
在程序的运行过程中,schedt对象只存在一份实体

2.实验观察初始化

demo

package main

import "fmt"

func main() {
    fmt.Println("Hello World")
}

编译,-gcflags是为了关闭编译器优化和函数内联,防止后面设置断点找不到相关代码

go mod init my
go build -gcflags  "-N -l" -o main main.go

查看文件入口

readelf -h main
image.png

entry point address为程序入口地址
反编译查看汇编代码

objdump -S ./main > out.txt
vim out.txt
/45ed60
image.png

这个是rt0_amd64_linux.s文件,该地址存放的指令是一条jmp指令

jump  45b340 rt0_amd64
跳转到0x4b340地址执行代码(存放于asm_amd64.s中) image.png
jump runtime.rt0_go

跳转到runtime的rt0_go执行代码,这个函数位于asm_amd64.s,函数很长,完成go启动时所有的初始化工作,这段代码完成之后,整个GO程序就可以跑起来了,是非常核心代码

2.1 runtime.rt0_go

汇编看着头疼,挑部分代码分析
1.g0在该函数完成初始化

    MOVQ    $runtime·g0(SB), DI
    LEAQ    (-64*1024+104)(SP), BX
    MOVQ    BX, g_stackguard0(DI)
    MOVQ    BX, g_stackguard1(DI)
    MOVQ    BX, (g_stack+stack_lo)(DI)
    MOVQ    SP, (g_stack+stack_hi)(DI)

这段汇编给g0分配栈空间(g是一个特殊的函数,它的栈就是特殊的函数栈,函数栈特性是高地址向低地址开辟空间),把g0地址存入di寄存器
BX = SP-64*1024+104 (实际g0栈空间大小为64K-104B,这里是将SP下移这么多空间来划分给g0)

之后将g0.stackguard0/stackguard1/stack.lo放到 bx低地址处(栈顶),g0.stack.hi放到高地址SP处(栈底),g0的栈又依赖于线程栈,那么这张图就能理解了吧 image.png

2.初始化m的tls字段,这里的m是第一个m,后续m照着它产生

    LEAQ    runtime·m0+m_tls(SB), DI
    CALL    runtime·settls(SB)

3.将g0的地址保存到线程m0的tls字段(线程私有数据,这是个数组,单字段指向数组第一个元素),即m0.tls[0] = &g0,
m0的信息也被保存在g0
m0.g0 = &g0
g0.m = &m0

 LEAQ    runtime·g0(SB), CX
MOVQ    CX, g(BX)
LEAQ    runtime·m0(SB), AX
MOVQ    CX, m_g0(AX)
MOVQ    AX, g_m(CX)
image.png

这里m0不是栈就是单纯的内存,我画快了忽略掉

4.系统核心初始化和调度器初始化,基本上schedinit执行完后,调度器相关参数就初始化好了

CALL    runtime·osinit(SB)
CALL    runtime·schedinit(SB)

5.创建一个新的协程来执行用户的代码

MOVQ    $runtime·mainPC(SB), AX
CALL    runtime·newproc(SB)

6.主线程进入循环调度,运行刚创建的新协程

CALL    runtime·mstart(SB)

TLS(THread Local Storage)线程本地私有全局变量

小结下:m0和g0的关系通过m0.tls串联起来,我们可以通过m0.tls或m0.g0找到g0,也能通过g0.m找到m0,osinit初始化的是系统的核心数于ncpu中,schedinit才是调度器启动的核心

2.2 schedinit

源码位置
前面省略一堆锁的初始化,涉及到全局变量,肯定要加锁
getg由编译器实现,获取函数当前运行的G结构体对象的地址,并告知系统线程的最多数量为1万个

gp:=getg()
sched.maxmcount = 10000

初始化栈空间

    stackinit()
    mallocinit()

继续初始化m0和gc等,在程序启动时,m0的初始化是在mcommoninit中完成的

mcommoninit(gp.m, -1)
gcinit()

mcommoninit源码地址,因为sched是全局唯一对象,多线程操作需要加锁,刚开始m0.id=0后续递增,在m的初始化过程中m会被挂到allm中(对象池,复用线程栈,避免频繁创建线程,allm是一个环状的链表),mp.alllink = allm即链表最后一个位置

func mcommoninit(mp *m, id int64) {
    lock(&sched.lock)
    if id >= 0 {
        mp.id = id
    } else {
        mp.id = mReserveID()
    }
    mp.alllink = allm
    atomicstorep(unsafe.Pointer(&allm), unsafe.Pointer(mp))
    unlock(&sched.lock)
}
image.png image.png

回到schedule初始化p个数,且初始化所有p

    procs := ncpu
    if procresize(procs) != nil {
        throw("unknown runnable goroutine during bootstrap")
    }

procresize是调度器启动的最后一步,之后调度器就启动相应数量的m,等待运行G

2.2.1 procresize

源码地址
procresize不仅在初始化的时候会执行。中途改变procs值也会执行
初始化所有P,allp是保存所有p的全局变量,最大数量为1024,该段代码为每个p申请(new)新对象,将p存放在allp中

    for i := old; i < nprocs; i++ {
        pp := allp[i]
        if pp == nil {
            pp = new(p)
        }
        pp.init(i)
        atomicstorep(unsafe.Pointer(&allp[i]), unsafe.Pointer(pp))
    }

利用acquirep将p和m关联

        gp.m.p = 0
        pp := allp[0]
        pp.m = 0
        pp.status = _Pidle
        acquirep(pp)

把所有空闲的p放入空闲链表,runqempty用于判断p是否空闲

for i := nprocs - 1; i >= 0; i-- {
        if runqempty(pp) {
            pidleput(pp, now)
        }
}

返回有本地任务的p链表

return runnablePs

小结一下,该函数从堆上创建了nproc个p,新的p被放入allp中

2.2.2 acquirep

acquirep将p0和m0关联起来,就是一些字段的相互设置

func acquirep(pp *p) {
    wirep(pp)
    pp.mcache.prepareForSweep()
}

func wirep(pp *p) {
    gp.m.p.set(pp)
    pp.m.set(gp.m)
    pp.status = _Prunning
}

g0.m.p=p0
p0.m = m0


image.png

小结

小结一下初始化涉及到一下全局变量

  • allglen 所有g的长度
  • allgs []*g 保存所有g
  • allm *m 保存所有m
  • allp []*p 保存所有p
  • ncpu 核数
  • sched schedt类型,唯一全局对象,记录调度器工作状态和一些数据
  • m0 主线程
  • g0 负责调度的g

调度器初始化还干了什么?

  • 创建g0和m0,绑定关系m0.tls=&g0,m0.g0=&g,g0.m=&m
  • 根据核心数创建P,绑定P0和m0 ,g0.m.p=p0,p0.m = m0
    schedinit的procresize执行完意味着调度器初始化完成

m0是启动程序后的id=0的主线程,m0负责执行初始化操作和启动第一个g,之后就和其他m一样了
g0是每启动一个m就会创建的第一个g(后续线程都是拷贝m0,因此也会有g0)
p的创建是在schedinit的procresize完成的,也就是初始化阶段,与CPU核数挂钩(进程可获取到的)

2.3 newproc

回到2.1中的汇编代码

    MOVQ    $runtime·mainPC(SB), AX     // entry
    PUSHQ   AX
    CALL    runtime·newproc(SB)

AX = &funcval{runtime·main},将用户代码main函数作为newproc的参数压入栈(回忆一下函数栈的压栈过程),注意1.20newproc只有一个参数

func newproc(fn *funcval) {
    gp := getg()
    pc := getcallerpc()
    systemstack(func() {
        newg := newproc1(fn, gp, pc)

        pp := getg().m.p.ptr()
        runqput(pp, newg, true)

        if mainStarted {
            wakep()
        }
    })
}

type funcval struct {
    fn uintptr
    // variable-size, fn-specific data here
}

systemstack(是汇编实现的)是在系统栈上运行函数fn,fn之后调用newproc1,newproc1会获取或创建新的G协程结构体

    mp := acquirem() // disable preemption because we hold M and P in local vars.
    pp := mp.p.ptr()
    newg := gfget(pp) //寻找空闲的G不存在则创建
    if newg == nil {
        newg = malg(_StackMin)
        casgstatus(newg, _Gidle, _Gdead)
        allgadd(newg) // publishes with a g->status of Gdead so GC scanner doesn't look at uninitialized stack.
    }

注意这个 malg函数,malg调用stackalloc,这个是在堆上申请栈空间,因为如果最开始的2k空间不够或太多就会存在扩容和缩容的问题,系统线程栈上会产生栈空间碎片。那么在堆上申请最后就需要销毁(还给堆),要进行垃圾回收(释放)

g0栈不需要扩容和缩容,空间更大,固定不变,因此在系统线程栈上申请

回到newproc1代码,确定参数的入栈位置,参数的入参位置是sp处开始

sp := newg.stack.hi - totalSize
spArg := sp

将参数从newproc的函数栈拷贝到新g的栈

memclrNoHeapPointers(unsafe.Pointer(&newg.sched), unsafe.Sizeof(newg.sched))

在新的g上执行指令就要用新g的栈,执行函数需要参数因此拷贝,
因为是初始化,newproc在g0栈上执行,这里我们将参数fn从g0栈拷贝到新g栈上
初始化newg的各种字段

    newg.sched.sp = sp
    newg.stktopsp = sp
    newg.sched.pc = abi.FuncPCABI0(goexit) + sys.PCQuantum
    newg.sched.g = guintptr(unsafe.Pointer(newg))

sched是g的一个字段,也是一个结构体这个之前学习过是gobuf类型,是G运行时的现场数据,切换G的时候依赖这个数据

type gobuf struct {
    sp   uintptr
    pc   uintptr
    g    guintptr
    ctxt unsafe.Pointer
    ret  uintptr
    lr   uintptr
    bp   uintptr // for framepointer-enabled architectures
}

newg.sched.pc表示当前新g被调度起来运行时,从这个地址开始执行指令,pc字段为goexit地址+1,即goexit函数的第二条指令,goexit函数是协程退出后的一些清理工作
newg.sched.g为新g的地址

gostartcallfn(&newg.sched, fn)

gostartcallfn解释了为什么要设置pc为goexit地址的第二行,它会将sp减小一个指针,这是给返回地址预留空间(学习函数栈时我们知道会把调用函数的调用方地址也给压入栈到栈顶),也就是说将goexit压入栈顶后,伪造了sp位置(类似函数栈的return address,让cpu看起来是从goexit函数中执行了协程任务,协程执行后返回goexit函数),当协程被执行时,从pc开始执行,之后协程执行完毕执行goexit

func gostartcallfn(gobuf *gobuf, fv *funcval) {
    var fn unsafe.Pointer
    gostartcall(gobuf, fn, unsafe.Pointer(fv))
}

func gostartcall(buf *gobuf, fn, ctxt unsafe.Pointer) {
    sp := buf.sp
    sp -= goarch.PtrSize
    *(*uintptr)(unsafe.Pointer(sp)) = buf.pc
    buf.sp = sp
    buf.pc = uintptr(fn)
    buf.ctxt = ctxt
}
image.png

回到newproc1
修改newg状态为runable

newg.gopc = callerpc

回到newproc代码,设置好了newg,该让他调用了吧

 pp := getg().m.p.ptr()
runqput(pp, newg, true)

初始化时是从g0获取p

2.4runqput

源码
先尝试将新g放入到可执行队列中
next=false 放p私有队列LRQ尾部
next=true 放p的runnext字段中,故名思意,下一个运行,也可以理解为LRQ首部
LRQ满了塞不下怎么办?放到全局队列(优先级最低)

    if next {
    retryNext:
        oldnext := pp.runnext
//满员的情况
        if !pp.runnext.cas(oldnext, guintptr(unsafe.Pointer(gp))) {
            goto retryNext
        }
        if oldnext == 0 {
            return
        }
        // Kick the old runnext out to the regular run queue.
        gp = oldnext.ptr()
    }

retry:
    h := atomic.LoadAcq(&pp.runqhead) // load-acquire, synchronize with consumers
    t := pp.runqtail
//放到队列尾部
    if t-h < uint32(len(pp.runq)) {
        pp.runq[t%uint32(len(pp.runq))].set(gp)
        atomic.StoreRel(&pp.runqtail, t+1) // store-release, makes the item available for consumption
        return
    }
//全局队列slowqueue
    if runqputslow(pp, gp, h, t) {
        return
    }
    // the queue is not full, now the put above must succeed
    goto retry

这里我们可以总结出新G的放入P的顺序,先队首p.runnnext,再队尾p.LRQ,再全局slowqueue,这里放如全局队列也有门路,它不是直接放一个,而是会带过去一堆g(p满了,给p腾空间)

func runqputslow(pp *p, gp *g, h, t uint32) bool {
    var batch [len(pp.runq)/2 + 1]*g


    n := t - h
    n = n / 2

    for i := uint32(0); i < n; i++ {
        batch[i] = pp.runq[(h+i)%uint32(len(pp.runq))].ptr()
    }
    if !atomic.CasRel(&pp.runqhead, h, h+n) { // cas-release, commits consume
        return false
    }
    batch[n] = gp
先将p的所有g都加入一个数组中,数组长度为LRQ的一半加上newg,然后以链表的形式将这些g(一部分g)添加到GRQ上 image.png

实际上p的LRQ长度为256,是一个数组组成的环形链表

到这里g0切换到了newg的栈上来执行fn函数

小结

newproc1是获取g的函数,可以从缓存allgs缓存中获取G,也可以用malg函数在堆栈上创建G,之后根据传参规则塞入队列,一般新创建的g更有机会被调度,因此放到p队首

总结

image.png

相关文章

  • go 调度器实现

    GO 语言的调度器 目录 GMP 模型简介 调度器实现机制 GMP 模型简介 先来一张经典的GMP 关系图 G 是...

  • 调度器——GMP 调度模型

    调度器——GMP 调度模型 Goroutine 调度器,它是负责在工作线程上分发准备运行的 goroutines。...

  • go并发的那些事

    思考:go为什么那么擅长并发? 答:从设计理解上来讲我觉着golang的CSP并发模型与GMP调度器是基石。你看虽...

  • [视频版]-Golang深入理解GMP

    HELLO GOPHER! 相信越来越多的Go浪小伙伴,都对Golang的GMP调度器流连忘返,GMP很多书籍都有...

  • Go语言中调度器之GMP模型

    前言 随着服务器硬件迭代升级,配置也越来越高。为充分利用服务器资源,并发编程也变的越来越重要。在开始之前,需要了解...

  • 一文入门 Go 的性能分析

    Go 为了实现更高的并发,自己实现了用户态的调度器,称之为 GMP 模型,在上一篇文章中,我们已经简单分析了它的实...

  • golang协程调度面试总结

    1.go的GMP模型 goroutine运行在用户态,是由runtime来控制调度的,调度过程中主要涉及到三个对象...

  • Golang后端面试汇总-001

    基础面试 go的调度 为什么在内核的线程调度器之外Go还需要一个自己的调度器? go struct能不能比较 go...

  • 【go笔记】goroutine调度器的GMP模型简介

    说起go语言,离不开goroutine。之前使用go语言开发的时候,也没多少机会用到goroutine。趁这些天了...

  • Golang调度器和GMP模型

    一、调度器的由来 调度本身是指操作系统中为每个任务分配其所需资源的方法。 在操作系充中,线程是任务执行的最小单位,...

网友评论

      本文标题:GO基础学习(10)GMP模型-调度器初始化

      本文链接:https://www.haomeiwen.com/subject/tjrgjdtx.html