美文网首页
深入分析go调度(二)

深入分析go调度(二)

作者: lucasgao | 来源:发表于2021-04-03 22:11 被阅读0次

以下文章均为拜读公众号 源码游记 的笔记 http://mp.weixin.qq.com/mp/homepage?__biz=MzU1OTg5NDkzOA==&hid=1&sn=8fc2b63f53559bc0cee292ce629c4788&scene=18#wechat_redirect

1. Goroutine调度器

1. goroutine简介

首先为什么Go语言为什么要引入协程Goroutine呢。

这主要是因为操作系统的线程太重了,具体表现在

  1. 创建和切换太重:所有的线程切换都需要系统调用进入内核,而进入内核的性能代价比较大
  2. 内存使用太重:一方面,为了尽量避免极端情况下操作系统线程栈的溢出,内核在创建操作系统线程时默认会为其分配一个较大的栈内存(虚拟地址空间,内核并不会一开始就分配这么多的物理内存),然而在绝大多数情况下,系统线程远远用不了这么多内存,这导致了浪费;另一方面,栈内存空间一旦创建和初始化完成之后其大小就不能再有变化,这决定了在某些特殊场景下系统线程栈还是有溢出的风险。

可以使用 ulimit -a命令查看本机的栈大小。如下图,栈的代销为8192kB=8MB。

image-20200512220534183

所有go引进了Goroutine,相比较旧特别轻量

  1. Goroutine是用户态线程,创建和切换都可以再用户态完成,无需进入内核,开销比较小。
  2. Goroutine默认大小是2k,相比操作系统线程的8M,简直是太小了。不过虽然协程栈比较小,但是支持自动扩容和收缩,所以也不用担心内存不够用或者浪费。

Goroutine的调度简介

所谓对协程的调度,其实就是Go 按照一定的算法逻辑挑选出合适的 Goroutine并放到CPU上运行的过程。

在go中 有专门负责调度的代码,我们成为goroutine调度器。

大概逻辑如下:


for i:=0;i<N;i++{ // 创建N个操作系统执行schedule函数
  create_os_thread(schedule)
}

func schedule(){
  for{
    g := find_a_runnable_g_from_M_gs()
    run_g(g)
    save_status_of_g(g)
  }
}

总结:也就算程序开始创建N个线程去执行schedule函数。

每个schedule函数 回去寻找一个可以执行的g,然后执行g,在保存g的状态,循环往复。

调度器相关的数据结构源码概述

无他,所谓调度,本质上都是对寄存器的一些操作。

只不过线程调度是在内核态,由操作系统完成。

而协程调度是在用户态,由go自己完成。

如上,go自己要完成对协程的调度。那么go就需要引入一个数据结构来保存协程中的一些值(寄存器等)。

这就是g

1. 当协程被调离的时候,它的相关信息都保存在g中
2. 当协程被恢复的时候,调度器又可以通过g将其状态还原。

现在有了g,那么对g进行调度就需要调度器。不过调度器本身也是需要保存自身状态的,所以这就有了schedt结构体。因为一个go程序只有一个调度器,所以schedt全局唯一。

有了调度器,有了协程,我们还需要一个队列来保存可以运行的g,这就是局部运行队列。考虑到并发,go为每个p都配了一个局部运行队列,他保存着一系列等待执行的g.那么什么是p呢,它是processor的简写,是个掮客,协调着工作线程和g之间的关系。

除了这些,还有一个结构,它需要跟工作线程打交道,同时接收g,这就是m。每个工作线程都有唯一的一个m结构体与之对应,m结构体记录着线程的的栈信息等。

除此,还有一个全局运行队列,它是一个保存在schedt中的运行队列。具体后面再说。

下面是上面几种结构的对应关系:

<img src="http://picgo.vipkk.work/20200513211350.png" alt="image-20200513211350567" style="zoom:40%;" />

一个细节

前面我们提到m会跟工作线程绑定,但是我们有多个m,那么是如何做到区分的呢。记得上面的伪代码吗,我们调度的程序是一致的,那怎么区分呢。这就用到了前面提到的本地存储,threadlocal。

具体结构体代码

TODO 因为太多不在这里展示

2. 调度器初始化

本章将以一个hello world程序为例,通过跟踪其从启动到退出的这一完整的运行流程来分析Go语言调度器的初始化、goroutine的创建与退出、工作线程的调度循环以及goroutine的切换等内容。

package main

import "fmt"

func main(){
  fmt.Println("Hello World!")
}

这节我们主要来分析调度器的初始化。

任何一个编译型语言(不管是C、C++、go)所编写的程序在操作系统上运行都有以下几个阶段

  1. 从磁盘上把可执行程序读入内存
  2. 创建进程和主线程
  3. 为主线程分配栈空间
  4. 把用户在命令行输入的参数拷贝到主线程的栈上
  5. 把主线程放在操作系统的运行队列等待被调度执行起来。

在主线程第一次被调度起来之前,栈如下图所示:

<img src="http://picgo.vipkk.work/20200513221220.png" alt="image-20200513221220322" style="zoom:50%;" />

接下来我们从代码出发讲解下 调度器中的各个数据结构是怎么关联起来的

程序入口

首先我们需要找到梦(代码)开始的地方。

首先我们编译程序,然后通过gdb调试go build -o hello;gdb hello,然后输入info files查看调试信息

image-20200515215956835

可以发现入口在0x454b70,然后 在这里打断点b *0x454b70,可以发现程序入口的地方。对应代码如下,暂时不接受。

以下代码太多,建议结合源码食用


// 梦开始的地方 runtime/rt0_linux_amd64.s:8
TEXT _rt0_amd64_linux(SB),NOSPLIT,$-8
    JMP _rt0_amd64(SB)

TEXT _rt0_amd64(SB),NOSPLIT,$-8
    MOVQ    0(SP), DI   // argc
    LEAQ    8(SP), SI   // argv
    JMP runtime·rt0_go(SB)
    
// runtime/asm_amd64.s  
TEXT runtime·rt0_go(SB),NOSPLIT,$0
    // copy arguments forward on an even stack
    MOVQ    DI, AX      // argc
    MOVQ    SI, BX      // argv
    SUBQ    $(4*8+7), SP        // 2args 2auto 16字节对齐
    ANDQ    $~15, SP
    MOVQ    AX, 16(SP)
    MOVQ    BX, 24(SP)

g0初始化

rt0_go中将要完成了我们程序启动所有的的初始化工作。

上面代码基本上就是将SP对齐,接着往下

// create istack out of the given (operating system) stack.
    // _cgo_init may update stackguard.
    // g0 -> DI
    MOVQ    $runtime·g0(SB), DI

    LEAQ    (-64*1024+104)(SP), BX  // BX = SP - 64 * 1024 + 104
    MOVQ    BX, g_stackguard0(DI)   // g0.stackguard0 =  SP - 64 * 1024 + 104
    MOVQ    BX, g_stackguard1(DI)   // g0.stackguard1 =  SP - 64 * 1024 + 104
    MOVQ    BX, (g_stack+stack_lo)(DI)  // g0.stack.lo = SP - 64 * 1024 + 104
    MOVQ    SP, (g_stack+stack_hi)(DI)  // g0.stack.l1 = SP

这里完成了对g0的初始化, 可见g0的栈大小为 64K。

此时g0与程序栈之间的关系如图

<img src="http://picgo.vipkk.work/20200515221806.png" alt="image-20200515221806358" style="zoom:40%;" />

注意,g0是全局变量,所以这时候g0的内存已经申请好了,在全局变量区(data+bss).

上面图的关系 我们可以通过gdb 打印下地址查看下

image-20200515222801728

从图中可以看到 hi 指向了sp,其他亦然。

m0初始化&绑定OS线程

g0的栈好之后,我们来看下m0的初始化。

跳过CPU型号检查和cgo初始化的代码,往下看

    // 初始化m的tls。DI= &m0.tls, 取 m0的tls成员地址到DI寄存器
    LEAQ    runtime·m0+m_tls(SB), DI
    // 调用 settls设置 线程本地存储,settls 函数的参数在DI寄存器中。 之后,可以通过fs断寄存器找到 m.tls。
    CALL    runtime·settls(SB)

    // store through it, to make sure it works
    get_tls(BX) // 获取fs段基址并放入BX寄存器, 其实就是 m0.tls[1] 的地址,get_tls的代码由编译器生成
    MOVQ    $0x123, g(BX) // set m0.tls[0] = 0x123, 也就算 fs地址-8的内存位置。 g 代码-8
    MOVQ    runtime·m0+m_tls(SB), AX // AX = m0.tls[0]
    CMPQ    AX, $0x123 // 比较
    JEQ 2(PC)
    CALL    runtime·abort(SB) // 如果线程本地存储不能正常工作,退出程序

这段代码调用 settls 来实例化 m0.tls, tls就是对应之前提到thread local storage,目的是为了把m0这个数据结构和线程关联在一起,从而我们可以把m0当做线程的抽象。

其中settls函数就是把fs寄存器的地址设置为了tls[1]的地址。

继续:

ok:
    // set the per-goroutine and per-mach "registers"
    get_tls(BX)
    LEAQ    runtime·g0(SB), CX // CX = g0
    MOVQ    CX, g(BX) // m0.tls[0]= g0
    LEAQ    runtime·m0(SB), AX // AX = &m0

  // 以下两行代码是把 m0和g0串了起来
    // save m->g0 = g0
    MOVQ    CX, m_g0(AX)
    // save m0 to g0->m
    MOVQ    AX, g_m(CX)

以上,在m0于主线程绑定之后,我们又通过指针把m0和g0关联在了一起。此时内存布局如下

image-20200516160536516

可以看到,内存指针一致。

另外m0.tls[0] 存储的也是g0地址,tls[1]的地址其实是fs基地址(这个可以参考之前的tls章节)。

此时内存如下图:

<img src="http://picgo.vipkk.work/20200516172239.png" alt="image-20200516172239213" style="zoom:50%;" />

继续

    MOVL    16(SP), AX      // copy argc
    MOVL    AX, 0(SP)
    MOVQ    24(SP), AX      // copy argv
    MOVQ    AX, 8(SP)
    CALL    runtime·args(SB)
    CALL    runtime·osinit(SB)
    CALL    runtime·schedinit(SB) // 调度器初始化

m0初始化完成之后,开始对调度器初始化

调度器初始化

在这里就是go 语言了

// The bootstrap sequence is:
//
//  call osinit
//  call schedinit
//  make & queue new G
//  call runtime·mstart
//
// The new G calls runtime·main.
func schedinit() {
    // raceinit must be the first call to race detector.
    // In particular, it must be done before mallocinit below calls racemapshadow.
    // getg 是个什么鬼,getg 由编译器实现,大概原理就是 通过fs 找到 tls[1], 再-8找到g0,也就算当前运行的g
    _g_ := getg() // _g_=&g0
    //  ......

  //设置最多启动10000个操作系统线程,也是最多10000个M
  sched.maxmcount = 10000

  // ......

  mcommoninit(_g_.m) //初始化m0,因为从前面的代码我们知道g0->m = &m0

  // ......

  sched.lastpoll = uint64(nanotime())
  procs := ncpu  //系统中有多少核,就创建和初始化多少个p结构体对象
  if n, ok := atoi32(gogetenv("GOMAXPROCS")); ok && n > 0 {
    procs = n //如果环境变量指定了GOMAXPROCS,则创建指定数量的p
  }
  if procresize(procs) != nil {//创建和初始化全局变量allp
    throw("unknown runnable goroutine during bootstrap")
  }

  // ......

}

以上我们对schedinit函数提取了下大纲。这次我们主要关注以下几点

  1. getg()

    getg()是汇编实现的,大概就是从tls取出当前运行的g,在这里就是g0。

  2. 设置最多线程个数为10000个,(对应内存也就是80000M ~= 80G)

  3. mcommoninit

    1. 对m0
    2. 对p

mcommoninit

  1. 对m0做另外的一些初始化
  2. 创建和实例化p
func mcommoninit(mp *m) {
    _g_ := getg()

    // g0 stack won't make sense for user (and is not necessary unwindable).
    if _g_ != _g_.m.g0 {
        callers(1, mp.createstack[:])
    }

    // 因为sched是一个全局变量,多个线程同时操作 sched会有并发问题,因此要先加锁,操作结束后再解锁。
    lock(&sched.lock)
    if sched.mnext+1 < sched.mnext {
        throw("runtime: thread ID overflow")
    }
    mp.id = sched.mnext // 给id赋值
    sched.mnext++       // m0的 id就是0,并且之后创建的m的id是递增的。
    checkmcount()       // 检查是否超过数量限制

    // random 初始化
    mp.fastrand[0] = uint32(int64Hash(uint64(mp.id), fastrandseed))
    mp.fastrand[1] = uint32(int64Hash(uint64(cputicks()), ^fastrandseed))
    if mp.fastrand[0]|mp.fastrand[1] == 0 {
        mp.fastrand[1] = 1
    }

    // 创建用于信号处理的goroutine ,gsignal,只是简单的从堆上分配一个g结构体对象,然后把栈设置好久返回了
    mpreinit(mp)
    if mp.gsignal != nil {
        mp.gsignal.stackguard1 = mp.gsignal.stack.lo + _StackGuard
    }

    // Add to allm so garbage collector doesn't free g->m
    // when it is just in a register or thread-local storage.
    // 把m 挂入全局链表 allm之中
    // 类比 m0.next = allm allm = m0
    mp.alllink = allm

    // NumCgoCall() iterates over allm w/o schedlock,
    // so we need to publish it safely.
    atomicstorep(unsafe.Pointer(&allm), unsafe.Pointer(mp))
    unlock(&sched.lock)

    // Allocate memory to hold a cgo traceback if the cgo call crashes.
    if iscgo || GOOS == "solaris" || GOOS == "illumos" || GOOS == "windows" {
        mp.cgoCallers = new(cgoCallers)
    }
}

这里主要对m0做了以下几个事情

  1. 赋值id
  2. 初始化m的随机数
  3. 信号初始化(后续分析抢占调度的时候用得到)
  4. 把m0自己放到allm全局变量里。

此时 allm 指向m0

image-20200516175358260

procresize

// Change number of processors. The world is stopped, sched is locked.
// gcworkbufs are not being modified by either the GC or
// the write barrier code.
// Returns list of Ps with local work, they need to be scheduled by the caller.
func procresize(nprocs int32) *p {
    old := gomaxprocs
    if old < 0 || nprocs <= 0 {
        throw("procresize: invalid arg")
    }
    if trace.enabled {
        traceGomaxprocs(nprocs)
    }

    // update statistics
    now := nanotime()
    if sched.procresizetime != 0 {
        sched.totaltime += int64(old) * (now - sched.procresizetime)
    }
    sched.procresizetime = now

    // Grow allp if necessary.
    if nprocs > int32(len(allp)) { // 初始化的时候 len(allp) = 0,因为目前还没有P
        // Synchronize with retake, which could be running
        // concurrently since it doesn't run on a P.
        lock(&allpLock)
        if nprocs <= int32(cap(allp)) {
            allp = allp[:nprocs]
        } else {
            nallp := make([]*p, nprocs)
            // Copy everything up to allp's cap so we
            // never lose old allocated Ps.
            copy(nallp, allp[:cap(allp)]) // 代码中途调整p的数量?
            allp = nallp
        }
        unlock(&allpLock)
    }

    // initialize new P's
    // 循环创建 nproc个p,并完成基本初始化
    for i := old; i < nprocs; i++ {
        pp := allp[i]
        if pp == nil {
            pp = new(p) // 调用内存分配器从堆上分配一个 struct p
        }
        pp.init(i) // 设置p的id,mcache等。
        atomicstorep(unsafe.Pointer(&allp[i]), unsafe.Pointer(pp))
    }

    _g_ := getg()
    if _g_.m.p != 0 && _g_.m.p.ptr().id < nprocs {
        // continue to use the current P
        _g_.m.p.ptr().status = _Prunning
        _g_.m.p.ptr().mcache.prepareForSweep()
    } else { // 初始化的时候执行这个分支
        // release the current P and acquire allp[0].
        //
        // We must do this before destroying our current P
        // because p.destroy itself has write barriers, so we
        // need to do that from a valid P.
        if _g_.m.p != 0 {
            if trace.enabled {
                // Pretend that we were descheduled
                // and then scheduled again to keep
                // the trace sane.
                traceGoSched()
                traceProcStop(_g_.m.p.ptr())
            }
            _g_.m.p.ptr().m = 0
        }
        _g_.m.p = 0
        _g_.m.mcache = nil
        p := allp[0]
        p.m = 0
        p.status = _Pidle
        // 把 p和m0关联起来
        acquirep(p)
        if trace.enabled {
            traceGoStart()
        }
    }

    // release resources from unused P's
    // 调整P的个数
    for i := nprocs; i < old; i++ {
        p := allp[i]
        p.destroy()
        // can't free P itself because it can be referenced by an M in syscall
    }

    // Trim allp.
    if int32(len(allp)) != nprocs {
        lock(&allpLock)
        allp = allp[:nprocs]
        unlock(&allpLock)
    }

    // 把所有空闲的P放入空闲链表
    // 它将除了p0的所有非空闲的P,放入P链表runnablePs,并返回procresize函数的调用者,
    // 并由其来调度这些P
    var runnablePs *p
    for i := nprocs - 1; i >= 0; i-- {
        p := allp[i]
        if _g_.m.p.ptr() == p { // allp[0] 跟m0 关联了,所有不放入
            continue
        }
        p.status = _Pidle
        // 如果P 是空闲的,则放入 idle里
        if runqempty(p) { // 通过 runqempty 判断p 的LRQ中是否包含G,
            pidleput(p) // 初始化除了allp[0]其他P 全部执行这个分支,分入空闲链表
        } else {
            // ??
            p.m.set(mget())
            p.link.set(runnablePs)
            runnablePs = p
        }
    }
    // 初始化一个随机的分配器
    stealOrder.reset(uint32(nprocs))
    var int32p *int32 = &gomaxprocs // make compiler check that gomaxprocs is an int32
    atomic.Store((*uint32)(unsafe.Pointer(int32p)), uint32(nprocs))
    return runnablePs
}

这个代码干了以下几个事情

  1. 初始化全局变量allp
  2. 填充allp
  3. m0和allp[0]绑定
  4. 除allp[0]之外的所有p放到 等待队列中

allp是全局变量

并且会把m0和allp[0]关联起来。

image-20200516181153374 image-20200516181924462

这里没太搞明白内存的对应关系

至此,m0,g0和p就关联在一起了。

如图:

<img src="http://picgo.vipkk.work/20200516181939.png" alt="image-20200516181939265" style="zoom:50%;" />

相关文章

  • 深入分析go调度(二)

    以下文章均为拜读公众号 源码游记 的笔记 http://mp.weixin.qq.com/mp/homepage?...

  • 深入分析go调度(一)

    以下文章均为拜读公众号 源码游记 的笔记 http://mp.weixin.qq.com/mp/homepage?...

  • 深入分析go调度(三)

    以下文章均为拜读公众号 源码游记 的笔记 http://mp.weixin.qq.com/mp/homepage?...

  • 深入分析go调度(四)

    以下文章均为拜读公众号 源码游记 的笔记 http://mp.weixin.qq.com/mp/homepage?...

  • Go调度相关

    go 调度go routinue在线程中进行调度 GPM的概念: G(Goroutine): 即Go协程,每个go...

  • Golang后端面试汇总-001

    基础面试 go的调度 为什么在内核的线程调度器之外Go还需要一个自己的调度器? go struct能不能比较 go...

  • 谈谈Go的调度实现

    谈谈Go的调度实现 本章主要针对Go调度相关介绍。仅关注linux系统下的逻辑。代码版本GO1.9.2。 本章例子...

  • Go 语言调度(二): goroutine 调度器

    介绍 上一篇文章我对操作系统级别的调度进行了讲解,这对理解 Go 语言的调度器是很重要的。这篇文章,我将解释下 G...

  • 2019-03-20

    1. go的并发调度模型? go的并发调度模型可以简称为GPM模型,其中G代表goroutine,P代表gorou...

  • 高伸缩性Go调度器设计(译)

    阅读该文档前假设你已经对go语言及其当前调度实现的有所了解 当前调度器所存在的问题 当前的调度器限制了go并发的伸...

网友评论

      本文标题:深入分析go调度(二)

      本文链接:https://www.haomeiwen.com/subject/pvfkkltx.html