美文网首页
golang 调度器

golang 调度器

作者: Stevennnmmm | 来源:发表于2021-01-18 21:33 被阅读0次

    今天来讲一下调度器,我本来写了两个版本,后面发现都好像不太好,其实核心差不太多,就是层次不够清晰,然后在度娘上又啃了几篇相关的文章,又进行了综合一下,文章末尾有引用的文章链接。不得不说,大佬们画图还是非常厉害的。其实突然在这个期间发现一些问题:就是markdown模式下的话容易让人看不清重点,最近在找替代简书的地方,有推荐的可以留言推荐

    (一)调度器的核心点:

    • 1.复用线程
      避免频繁的创建销毁线程,我们知道线程的启停销毁是很耗费性能的一件事情,我们就要reuse thread(线程复用) ,那么具体该怎么来处理。golang :”create threads when needed ;keep them around for reuse“,当我们需要创建的时候就创建,然后保留它来复用,看过M结构的人就知道M有个allM的字段就是保存所有的M的链表。
    • 2.利用并行
      设置GOMAXPROCS 来进行设置go program的核心数,程序的并行
    • 3.stealing working
      利用小偷算法,当本地的队列没有g了,去别的地方偷一半的g进行运行,保证任务的公平性
    • 4.handoff
      利用移交算法,当本线程因为系统调用进行阻塞的时候,线程释放绑定的P,把P给其他的M执行

    值得一说的是:Go1.1之前只有G-M模型,没有P,Dmitry Vyukov在Scalable Go Scheduler Design Doc提出该模型在并发伸缩性方面的问题,并通过加入P(Processors)来改进该问题。

    (二)重要结构体

    G:goroutine

    每次go调用的时候,都会创建一个G对象,它包括栈、指令指针以及对于调用goroutines很重要的其它信息,比如阻塞它的任何channel,其主要数据结构

    // Go1.11版本默认stack大小为2KB
    
    _StackMin = 2048
     
    // 创建一个g对象,然后放到g队列
    // 等待被执行
    func newproc1(fn *funcval, argp *uint8, narg int32, callergp *g, callerpc uintptr) {
        _g_ := getg()
    
        _g_.m.locks++
        siz := narg
        siz = (siz + 7) &^ 7
    
        _p_ := _g_.m.p.ptr()
        newg := gfget(_p_)    
        if newg == nil {        
           // 初始化g stack大小
            newg = malg(_StackMin)
            casgstatus(newg, _Gidle, _Gdead)
            allgadd(newg)
        }    
        // 以下省略}
    
    M:mechine

    代表一个线程,每次创建一个M的时候,都会有一个底层线程创建;所有的G任务,最终还是在M上执行,其主要数据结构

    type m struct {    
        /*
            1.  所有调用栈的Goroutine,这是一个比较特殊的Goroutine。
            2.  普通的Goroutine栈是在Heap分配的可增长的stack,而g0的stack是M对应的线程栈。
            3.  所有调度相关代码,会先切换到该Goroutine的栈再执行。
        */
        g0       *g
        curg     *g         // M当前绑定的结构体G
    
        // SP、PC寄存器用于现场保护和现场恢复
        vdsoSP uintptr
        vdsoPC uintptr
    
        // 省略…}
    
    P:Processor

    代表一个处理器,每一个运行的M都必须绑定一个P,就像线程必须在么一个CPU核上执行一样,由P来调度G在M上的运行,P的个数就是GOMAXPROCS(最大256),启动时固定的,一般不修改;M的个数和P的个数不一定一样多(会有休眠的M或者不需要太多的M)(最大10000);每一个P保存着本地G任务队列,也有一个全局G任务队列。P的数据结构

    // 自定义设置GOMAXPROCS数量
    func GOMAXPROCS(n int) int {    
        /*
            1.  GOMAXPROCS设置可执行的CPU的最大数量,同时返回之前的设置。
            2.  如果n < 1,则不更改当前的值。
        */
        ret := int(gomaxprocs)
    
        stopTheWorld("GOMAXPROCS")    
        // startTheWorld启动时,使用newprocs。
        newprocs = int32(n)
        startTheWorld()    
        return ret
    }
    
    // 默认P被绑定到所有CPU核上
    // P == cpu.cores
    
    func getproccount() int32 {    
        const maxCPUs = 64 * 1024
        var buf [maxCPUs / 8]byte
    
    
        // 获取CPU Core
        r := sched_getaffinity(0, unsafe.Sizeof(buf), &buf[0])
    
        n := int32(0)    
        for _, v := range buf[:r] {        
           for v != 0 {
                n += int32(v & 1)
                v >>= 1
            }
        }    
        if n == 0 {
           n = 1
        }    
        return n
    }
    // 一个进程默认被绑定在所有CPU核上,返回所有CPU core。
    // 获取进程的CPU亲和性掩码系统调用
    // rax 204                          ; 系统调用码
    // system_call sys_sched_getaffinity; 系统调用名称
    // rid  pid                         ; 进程号
    // rsi unsigned int len             
    // rdx unsigned long *user_mask_ptr
    sys_linux_amd64.s:
    TEXT runtime·sched_getaffinity(SB),NOSPLIT,$0
        MOVQ    pid+0(FP), DI
        MOVQ    len+8(FP), SI
        MOVQ    buf+16(FP), DX
        MOVL    $SYS_sched_getaffinity, AX
        SYSCALL
        MOVL    AX, ret+24(FP)
        RET
    

    (三)调度过程

    image
    • 我们通过 go func()来创建一个goroutine;g 的结构是可复用的,对于可复用的g也是有local队列和global队列的,用:p.freeg 这个参数,全局队列就是sched.pfree,获取参数都是差不多的,优先从p.gfree中获取,这一步是无锁的,否者就从sched.pfree中获取一部分过来这是有锁的一个操作

    • 有两个存储G的队列,一个是局部调度器P的本地队列、一个是全局G队列。新创建的G会优先尝试放到p的runnext中,作为下一个执行G,如果不行就得放到我们的本地队列中,如果P的本地队列已经满了就会保存在全局的队列中;

    • G只能运行在M中,一个M必须持有一个P,M与P是1:1的关系。M会从P的本地队列弹出一个可执行状态的G来
      执行,如果P的本地队列为空,就会想其他的MP组合偷取一个可执行的G来执行;

    • 一个M调度G执行的过程是一个循环机制;

    • 当M执行某一个G时候如果发生了syscall或则其余阻塞操作,M会阻塞,如果当前有一些G在执行,runtime会把
      这个线程M从P中摘除(detach),然后再创建一个新的操作系统的线程(如果有空闲的线程可用就复用空闲线程)来
      服务于这个P;

    • 当M系统调用结束时候,这个G会尝试获取一个空闲的P执行,并放入到这个P的本地队列。如果获取不到P,
      那么这个线程M变成休眠状态, 加入到空闲线程中,然后这个G会被放入全局队列中

    3.1G的几种暂停方式
      1. gosched: 将当前的G暂停,保存堆栈状态,以_GRunnable状态放入Global队列中,让当前M继续执行其它任务。无需对G进行唤醒操作,因为总会有M从Global队列取得并执行该G。抢占调度即使用该方式
    • 2.gopark: 与goched的最大区别在于gopark没有将G放回执行队列,而是位于某个等待队列中(如channel的waitq,此时G状态为_Gwaitting),因此G必须被手动唤醒(通过goready),否则会丢失任务。应用层阻塞通常使用这种方式。
    • 3.notesleep: 既不让出M,也不让G和P重新调度,直接让线程休眠直到被唤醒(notewakeup),该方式更快,通常用于gcMark,stopm这类自旋场景
    • 4.notesleepg: 阻塞G和M,放飞P,P可以和其它M绑定继续执行,比如可能阻塞的系统调用会主动调用entersyscallblock,则会触发 notesleepg
    • 5.goexit: 立即终止G任务,不管其处于调用堆栈的哪个层次,在终止前,确保所有defer正确执行。

    (四)调度源码

    
    // go1.9.1  src/runtime/proc.go
    // 省略了GC检查等其它细节,只保留了主要流程
    // g:       G结构体定义
    // sched:   Global队列
    // 获取一个待执行的G
    func findrunnable() (gp *g, inheritTime bool) {
        // 获取当前的G对象
        _g_ := getg()
    
    top:
        // 获取当前P对象
        _p_ := _g_.m.p.ptr()
    
        // 1. 尝试从P的Local队列中取得G 优先_p_.runnext 然后再从Local队列中取
        if gp, inheritTime := runqget(_p_); gp != nil {
            return gp, inheritTime
        }
    
        // 2. 尝试从Global队列中取得G
        if sched.runqsize != 0 {
            lock(&sched.lock)
            // globrunqget从Global队列中获取G 并转移一批G到_p_的Local队列
            gp := globrunqget(_p_, 0)
            unlock(&sched.lock)
            if gp != nil {
                return gp, false
            }
        }
    
        // 3. 检查netpoll任务:检测是否存在M阻塞
        if netpollinited() && sched.lastpoll != 0 {
            if gp := netpoll(false); gp != nil { // non-blocking
                // netpoll返回的是G链表,将其它G放回Global队列
                injectglist(gp.schedlink.ptr())
                casgstatus(gp, _Gwaiting, _Grunnable)
                if trace.enabled {
                    traceGoUnpark(gp, 0)
                }
                return gp, false
            }
        }
    
        // 4. 尝试从其它P窃取任务
        procs := uint32(gomaxprocs)
        if atomic.Load(&sched.npidle) == procs-1 {
            goto stop
        }
        if !_g_.m.spinning {
            _g_.m.spinning = true
            atomic.Xadd(&sched.nmspinning, 1)
        }
        for i := 0; i < 4; i++ {
            // 随机P的遍历顺序
            for enum := stealOrder.start(fastrand()); !enum.done(); enum.next() {
                if sched.gcwaiting != 0 {
                    goto top
                }
                stealRunNextG := i > 2 // first look for ready queues with more than 1 g
                // runqsteal执行实际的steal工作,从目标P的Local队列转移一般的G过来
                // stealRunNextG指是否steal目标P的p.runnext G
                if gp := runqsteal(_p_, allp[enum.position()], stealRunNextG); gp != nil {
                    return gp, false
                }
            }
        }
        ...
    }
    
    
    4.1用户态的阻塞

    当Goroutine因为Channel操作而阻塞(通过gopark)时,对应的G会被放置到某个wait队列(如channel的waitq),该G的状态由_Gruning变为_Gwaitting,而M会跳过该G尝试获取并执行下一个G。

    当阻塞的G被G2唤醒(通过goready)时(比如channel可读/写),G会尝试加入G2所在P的runnext,然后再是P Local队列和Global队列。简单解释一下:当G是chan<-的接收消息,被阻塞了,如果G2是Chan的写消息,当G阻塞,G2写入了一个数据,那么G就被G2唤醒了,G就被放到了G2的P的runnext,如果放成功了,就是跳过了排队,然后执行,如果失败了就丢入local队列

    4.2系统调用阻塞:syscall

    当G被阻塞在某个系统调用上时,此时G会阻塞在_Gsyscall状态,M也处于block on syscall状态,此时仍然可被抢占调度: 执行该G的M会与P解绑,而P则尝试与其它idle的M绑定,继续执行其它G。如果没有其它idle的M,但队列中仍然有G需要执行,则创建一个新的M。

    当系统调用完成后,G会重新尝试获取一个idle的P,并恢复执行,如果没有idle的P,G将加入到Global队列。

    系统调用能被调度的关键有两点:

    runtime/syscall包中,将系统调用分为SysCall和RawSysCall,前者和后者的区别是前者会在系统调用前后分别调用entersyscall和exitsyscall(位于src/runtime/proc.go),做一些现场保存和恢复操作,这样才能使P安全地与M解绑,并在其它M上继续执行其它G。某些系统调用本身可以确定会长时间阻塞(比如锁),会调用entersyscallblock在发起系统调用前直接让P和M解绑(handoffp)。

    4.3GMP的几个状态
    • P的几个状态:

    (五)sysmon

    sysmon是一个由runtime启动的M,也叫监控线程,它无需P也可以运行,它每20us~10ms唤醒一次,主要执行:

    释放闲置超过5分钟的span物理内存;
    如果超过2分钟没有垃圾回收,强制执行;
    将长时间未处理的netpoll结果添加到任务队列;
    向长时间运行的G任务发出抢占调度;
    收回因syscall长时间阻塞的P;
    

    入口在src/runtime/proc.go:sysmon函数,它通过retake实现对syscall和长时间运行的G进行调度:

    func retake(now int64) uint32 {
        n := 0
        for i := int32(0); i < gomaxprocs; i++ {
            _p_ := allp[i]
            if _p_ == nil {
                continue
            }
            pd := &_p_.sysmontick
            s := _p_.status
            if s == _Psyscall {
                // Retake P from syscall if it's there for more than 1 sysmon tick (at least 20us).
                t := int64(_p_.syscalltick)
                if int64(pd.syscalltick) != t {
                    pd.syscalltick = uint32(t)
                    pd.syscallwhen = now
                    continue
                }
                // 如果当前P Local队列没有其它G,当前有其它P处于Idle状态,并且syscall执行事件不超过10ms,则不用解绑当前P(handoffp)
                if runqempty(_p_) && atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) > 0 && pd.syscallwhen+10*1000*1000 > now {
                    continue
                }
                // handoffp
                incidlelocked(-1)
                if atomic.Cas(&_p_.status, s, _Pidle) {
                    if trace.enabled {
                        traceGoSysBlock(_p_)
                        traceProcStop(_p_)
                    }
                    n++
                    _p_.syscalltick++
                    handoffp(_p_)
                }
                incidlelocked(1)
            } else if s == _Prunning {
                // Preempt G if it's running for too long.
                t := int64(_p_.schedtick)
                if int64(pd.schedtick) != t {
                    pd.schedtick = uint32(t)
                    pd.schedwhen = now
                    continue
                }
                // 如果当前G执行时间超过10ms,则抢占(preemptone)
                if pd.schedwhen+forcePreemptNS > now {
                    continue
                }
                // 执行抢占
                preemptone(_p_)
            }
        }
        return uint32(n)
    }
    
    

    抢占式调度

    当某个goroutine执行超过10ms,sysmon会向其发起抢占调度请求,由于Go调度不像OS调度那样有时间片的概念,因此实际抢占机制要弱很多: Go中的抢占实际上是为G设置抢占标记(g.stackguard0),当G调用某函数时(更确切说,在通过newstack分配函数栈时),被编译器安插的指令会检查这个标记,并且将当前G以runtime.Goched的方式暂停,并加入到全局队列。源代码如下:

    // src/runtime/stack.go
    // Called from runtime·morestack when more stack is needed.
    // Allocate larger stack and relocate to new stack.
    // Stack growth is multiplicative, for constant amortized cost.
    func newstack(ctxt unsafe.Pointer) {
        ...
        // gp为当前G
        preempt := atomic.Loaduintptr(&gp.stackguard0) == stackPreempt
        if preempt {
            ...
    
            // Act like goroutine called runtime.Gosched.
            // G状态由_Gwaiting变为 _Grunning 这是为了能以Gosched的方式暂停Go
            casgstatus(gp, _Gwaiting, _Grunning)
            gopreempt_m(gp) // never return
        }
    }
    
    // 以goched的方式将G重新放入
    func goschedImpl(gp *g) {
        status := readgstatus(gp)
        // 由Running变为Runnable
        casgstatus(gp, _Grunning, _Grunnable)
        // 与M解除绑定
        dropg()
        lock(&sched.lock)
        // 将G放入Global队列
        globrunqput(gp)
        unlock(&sched.lock)
        // 重新调度
        schedule()
    }
    
    
    func gopreempt_m(gp *g) {
        if trace.enabled {
            traceGoPreempt()
        }
        goschedImpl(gp)
    }
    
    

    netpoll

    前面的findrunnable,G的获取除了p.runnext,p.runq和sched.runq外,还有一中G从netpoll中获取,netpoll是Go针对网络IO的一种优化,本质上为了避免网络IO陷入系统调用之中,这样使得即便G发起网络I/O操作也不会导致M被阻塞(仅阻塞G),从而不会导致大量M被创建出来。

    相关文章

      网友评论

          本文标题:golang 调度器

          本文链接:https://www.haomeiwen.com/subject/jwvinktx.html