
Go Scheduler Source Code Analysis

Author: ddu_sw | Published 2019-04-08 10:12

1. Basic concepts: processes, threads, coroutines

A process can contain many threads. A thread typically gets a fixed block of memory (around 2MB) for its stack, which holds the local variables of the functions currently being called or suspended. What the OS scheduler switches between is threads: if the next thread belongs to the same process, only a thread switch is needed and it completes quickly; if the next thread belongs to a different process, a full process switch is required, which takes noticeably longer.

Threads come in two kinds: kernel-level threads and user-level threads. A user-level thread must be bound to a kernel-level thread; the CPU (and the kernel) is not aware of user-level threads at all, it only knows it is running a thread, and that thread is a kernel-level thread.

User-level threads actually have a familiar name: coroutines (co-routines). To keep the two apart, from here on "coroutine" means a user-level thread and "thread" means a kernel-level thread.

Coroutines differ from threads: threads are scheduled preemptively by the kernel, while coroutines are scheduled cooperatively in user space; a coroutine must yield the CPU before the next coroutine can run.
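A minimal sketch of the cooperative model (illustrative only, not from the original article): runtime.Gosched() is Go's explicit "yield" call, which lets another goroutine run on the same processor.

package main

import (
    "fmt"
    "runtime"
)

func main() {
    runtime.GOMAXPROCS(1) // a single processor, so the goroutines must take turns
    done := make(chan struct{})

    go func() {
        for i := 0; i < 3; i++ {
            fmt.Println("worker", i)
            runtime.Gosched() // cooperatively yield the processor
        }
        close(done)
    }()

    for i := 0; i < 3; i++ {
        fmt.Println("main", i)
        runtime.Gosched() // give the worker goroutine a chance to run
    }
    <-done
}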

There are three ways coroutines can be mapped onto threads:

N:1, N coroutines bound to 1 thread. The advantage is that switching happens entirely in user space without trapping into the kernel, so it is very lightweight and fast. The drawbacks are serious, though: because all coroutines of a process run on a single thread, the program cannot use multiple cores, and if one coroutine blocks, the whole thread blocks and none of the other coroutines in the process can run, so there is no real concurrency.

1:1, 1 coroutine bound to 1 thread. This is the easiest to implement, and scheduling is handled entirely by the kernel, so the N:1 drawbacks disappear. The downside is that creating, destroying, and switching coroutines now all go through the kernel, which is relatively expensive.

M:N, M coroutines bound to N threads. This combines N:1 and 1:1 and avoids the drawbacks of both, but it is the most complex to implement.

2. A brief introduction to Golang

2.1 The goroutine concept

Because switching threads requires saving and restoring a large context and burns a lot of CPU time, Go's unit of concurrency is not the traditional thread but a much lighter coroutine, the goroutine. This greatly increases the achievable degree of parallelism, which is why Go is sometimes called "the most parallel language".
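As a small illustration (not from the original article): launching a goroutine is a single keyword, and the runtime multiplexes a very large number of them onto a small pool of OS threads.

package main

import (
    "fmt"
    "sync"
)

func main() {
    var wg sync.WaitGroup
    for i := 0; i < 10000; i++ { // 10,000 goroutines are cheap; 10,000 OS threads would not be
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            _ = id * id // trivial work
        }(i)
    }
    wg.Wait()
    fmt.Println("all goroutines finished")
}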

2.2 Comparison with other concurrency models

Interpreted languages such as Python often fall back on a multi-process concurrency model. A process has the largest context of all, so switching is very expensive, and because processes can only communicate through sockets or explicitly configured shared memory, programming becomes quite awkward.

Languages such as C++ usually use a multi-threaded concurrency model. Compared with a process, a thread's context is much smaller, and threads in the same process share memory, so programming is considerably easier. But starting, destroying, and switching threads still costs a lot of CPU time. Thread pools were invented to help: keep a fixed set of threads alive and hand work to them, avoiding the cost of repeatedly creating and destroying threads. This simple technique has its own problems; for example, a thread stuck in blocking I/O keeps occupying its slot, so the tasks queued behind it cannot get a thread to run on.

Go's approach is more elaborate. Go replaces the thread with a lighter data structure, the goroutine, which has its own stack and is much cheaper to switch. Real execution still happens on threads: the scheduler places goroutines onto threads, creating and releasing threads as needed, and when a running goroutine blocks (typically waiting for I/O) it is detached from its thread so that another runnable goroutine can use that thread. Through this fairly involved scheduling, the system achieves a very high degree of parallelism without burning large amounts of CPU.

2.3 Characteristics of goroutines

Non-blocking. Goroutines were introduced to make highly concurrent programs easy to write. When a goroutine performs a blocking operation (such as a system call), the other goroutines on the current thread are handed to other threads and keep running, so the whole program does not block.
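A rough sketch of this behavior (illustrative only; time.Sleep merely parks a goroutine and stands in here for a real blocking operation such as file I/O, which would trigger the thread handoff described later):

package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    var wg sync.WaitGroup

    // One goroutine "blocks" for a while; the others keep making progress.
    wg.Add(1)
    go func() {
        defer wg.Done()
        time.Sleep(200 * time.Millisecond)
        fmt.Println("blocked goroutine resumed")
    }()

    for i := 0; i < 5; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            fmt.Println("goroutine", id, "made progress immediately")
        }(i)
    }
    wg.Wait()
}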

Scheduler. Golang uses garbage collection (GC), and running GC requires goroutines to be stopped; because Go implements its own scheduler, it can arrange this conveniently. Building concurrent programs from many goroutines gives the advantages of asynchronous I/O together with the convenience of writing ordinary multi-threaded or multi-process code.

Maintains its own stack. Of course, goroutines also introduce a great deal of complexity. A goroutine must carry not only the code it runs but also the stack, program counter (PC), and stack pointer (SP) used to run that code, and these must stay consistent in every mode of execution.

Since every goroutine has its own stack, the stack must be created along with the goroutine, and stack usage keeps growing as the goroutine runs. Stacks traditionally grow contiguously; because all threads in a process share one virtual address space, each thread's stack needs a different start address, which means the size of every thread stack has to be estimated before it is allocated. With a very large number of threads this easily leads to stack overflow.

Split Stacks were invented to solve this: a stack starts as a small allocation, and when a function call finds there is not enough stack space, a new block is allocated somewhere else. The new block does not have to be contiguous with the old one; the call's arguments are copied over and execution continues on the new block. Golang's stack management is similar, but for better performance it uses contiguous stacks: a fixed-size stack is allocated first, and when it runs out, a larger stack is allocated and the entire old stack is copied into it. This avoids the frequent allocation and deallocation that Split Stacks can cause.
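A small sketch that forces stack growth (illustrative, not from the article): deep recursion makes the runtime grow the goroutine's initial 2KB stack by allocating a larger contiguous block and copying the old one over, all invisible to the program.

package main

import "fmt"

// deep recurses far enough that the goroutine's small initial stack
// must be grown (copied to larger contiguous blocks) several times.
func deep(n int) int {
    var buf [128]byte // consume some stack space in every frame
    buf[0] = byte(n)
    if n == 0 {
        return int(buf[0])
    }
    return deep(n-1) + int(buf[0])
}

func main() {
    done := make(chan int)
    go func() {
        done <- deep(10000) // roughly 1MB+ of stack, grown on demand
    }()
    fmt.Println("result:", <-done)
}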

Preemptible. Goroutine execution can be preempted: if a goroutine keeps hogging the CPU and has not been rescheduled for a long time, the runtime preempts it and gives the CPU time to other goroutines. (The effect can be inspected through the runtime's debug facilities for blocked goroutines.)

2.4 The core structs

M: a worker thread in Go, the unit that actually executes code;

P: a scheduling context for goroutines; a goroutine depends on a P to get scheduled, and P is the real unit of parallelism;

G: a goroutine, a piece of Go code (represented as a function), the smallest unit of parallelism;

A P must be attached to an M to run, and an M must hold a P to run Go code. Normally there are at most GOMAXPROCS P's (GOMAXPROCS usually equals the number of CPUs), while there may be many more M's; only the P's attached to an M are actually running, which is why P is the real unit of parallelism.

Each P has its own runnable G queue from which it can take a G to run, and there is also a global runnable G queue; a G runs on an M through its P. The reason for not relying solely on the global queue is that distributed queues shrink the critical section: if every thread had to take runnable G's from one global structure, the global lock would keep many threads waiting.
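One practical way to watch the local and global run queues (a usage sketch, not part of the original article) is the runtime's GODEBUG=schedtrace facility: run a compiled program with GODEBUG=schedtrace=1000 and every 1000 ms the runtime prints a summary line on stderr containing runqueue=N (the global queue length) and one bracketed number per P (the local queue lengths). The binary name below is just a placeholder.

package main

import (
    "runtime"
    "sync"
)

// Build, then run with:  GODEBUG=schedtrace=1000 ./prog
// Each trace line shows the global run queue length ("runqueue=")
// and the length of each P's local run queue ("[...]").
func main() {
    runtime.GOMAXPROCS(2) // a couple of P's, so per-P local queues are visible
    var wg sync.WaitGroup
    for i := 0; i < 64; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            sum := 0
            for j := 0; j < 200000000; j++ { // CPU-bound work keeps the queues populated
                sum += j
            }
            _ = sum
        }()
    }
    wg.Wait()
}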

However, if a running G blocks, typically waiting on I/O, it stays put together with its M, while the context P is handed off to another available M, so the blocking does not reduce the program's parallelism.

The G struct

type g struct {
    // Stack parameters.
    // stack describes the actual stack memory: [stack.lo, stack.hi).
    // stackguard0 is the stack pointer compared in the Go stack growth prologue.
    // It is stack.lo+StackGuard normally, but can be StackPreempt to trigger a preemption.
    // stackguard1 is the stack pointer compared in the C stack growth prologue.
    // It is stack.lo+StackGuard on g0 and gsignal stacks.
    // It is ~0 on other goroutine stacks, to trigger a call to morestackc (and crash).
    stack       stack   // offset known to runtime/cgo; the actual stack memory, including its bounds
    stackguard0 uintptr // offset known to liblink
    stackguard1 uintptr // offset known to liblink

    _panic         *_panic        // innermost panic - offset known to liblink
    _defer         *_defer        // innermost defer
    m              *m             // current m; offset known to arm liblink
    sched          gobuf          // saves this g's context when the goroutine is switched out
    syscallsp      uintptr        // if status==Gsyscall, syscallsp = sched.sp to use during gc
    syscallpc      uintptr        // if status==Gsyscall, syscallpc = sched.pc to use during gc
    stktopsp       uintptr        // expected sp at top of stack, to check in traceback
    param          unsafe.Pointer // passed parameter on wakeup; while this g sleeps another goroutine can set param, and this g reads it on wakeup
    atomicstatus   uint32
    stackLock      uint32 // sigprof/scang lock; TODO: fold in to atomicstatus
    goid           int64  // goroutine ID
    waitsince      int64  // approx time when the g became blocked
    waitreason     string // if status==Gwaiting
    schedlink      guintptr
    preempt        bool     // preemption signal, duplicates stackguard0 = stackpreempt
    paniconfault   bool     // panic (instead of crash) on unexpected fault address
    preemptscan    bool     // preempted g does scan for gc
    gcscandone     bool     // g has scanned stack; protected by _Gscan bit in status
    gcscanvalid    bool     // false at start of gc cycle, true if G has not run since last scan; TODO: remove?
    throwsplit     bool     // must not split stack
    raceignore     int8     // ignore race detection events
    sysblocktraced bool     // StartTrace has emitted EvGoInSyscall about this goroutine
    sysexitticks   int64    // cputicks when syscall has returned (for tracing)
    traceseq       uint64   // trace event sequencer
    tracelastp     puintptr // last P emitted an event for this goroutine
    lockedm        muintptr // this G is locked to run only on this M
    sig            uint32
    writebuf       []byte
    sigcode0       uintptr
    sigcode1       uintptr
    sigpc          uintptr
    gopc           uintptr        // pc of go statement that created this goroutine
    startpc        uintptr        // pc of goroutine function
    racectx        uintptr
    waiting        *sudog         // sudog structures this g is waiting on (that have a valid elem ptr); in lock order
    cgoCtxt        []uintptr      // cgo traceback context
    labels         unsafe.Pointer // profiler labels
    timer          *timer         // cached timer for time.Sleep
    selectDone     uint32         // are we participating in a select and did someone win the race?

    // Per-G GC state
    //
    // gcAssistBytes is this G's GC assist credit in terms of
    // bytes allocated. If this is positive, then the G has credit
    // to allocate gcAssistBytes bytes without assisting. If this
    // is negative, then the G must correct this by performing
    // scan work. We track this in bytes to make it fast to update
    // and check for debt in the malloc hot path. The assist ratio
    // determines how this corresponds to scan work debt.
    gcAssistBytes int64
}

The gobuf struct

type gobuf struct {
    sp   uintptr
    pc   uintptr
    g    guintptr
    ctxt unsafe.Pointer
    ret  sys.Uintreg
    lr   uintptr
    bp   uintptr // for GOEXPERIMENT=framepointer
}

The most important field of the G struct is sched, which holds the goroutine's context. Unlike threads, whose context is saved and restored by the OS on a switch, a goroutine's context during a switch is stored in a gobuf object (shown above), which keeps switching lightweight.

The M struct

type m struct {
    g0          *g         // goroutine that owns the scheduling stack
    gsignal     *g         // goroutine that handles signals
    tls         [6]uintptr // thread-local storage
    mstartfn    func()
    curg        *g       // currently running goroutine
    caughtsig   guintptr
    p           puintptr // the P bound to this M for executing Go code
    nextp       puintptr
    id          int32
    mallocing   int32  // state
    spinning    bool   // whether this M is out of work and spinning
    blocked     bool   // whether this M is blocked
    inwb        bool   // whether this M is executing a write barrier
    printlock   int8
    incgo       bool   // whether this M is executing a cgo call
    fastrand    uint32
    ncgocall    uint64 // total number of cgo calls
    ncgo        int32  // number of cgo calls currently in progress
    park        note
    alllink     *m // on allm
    schedlink   muintptr
    mcache      *mcache     // this M's memory cache
    lockedg     *g          // g locked to run on this M, never switched to another M
    createstack [32]uintptr // stack that created this thread
}

Two G fields in the M struct deserve special attention:

One is curg, the G currently bound to (running on) this M.

The other is g0, the goroutine that owns the scheduling stack; it is a special goroutine. An ordinary goroutine's stack is a growable stack allocated on the heap, whereas g0's stack is the stack of the OS thread behind the M. All scheduling-related code first switches onto g0's stack before running. In other words, even the thread's own stack is managed through a g, rather than used directly as the OS provides it.

The P struct

type p struct {
    lock mutex

    id          int32
    status      uint32 // state: pidle/prunning/...
    link        puintptr
    schedtick   uint32 // incremented on every scheduler call
    syscalltick uint32 // incremented on every system call
    sysmontick  sysmontick
    m           muintptr // back-link to the associated m
    mcache      *mcache
    racectx     uintptr

    goidcache    uint64 // cache of goroutine IDs
    goidcacheend uint64

    // Queue of runnable goroutines.
    runqhead uint32
    runqtail uint32
    runq     [256]guintptr
    runnext  guintptr // next g to run

    sudogcache []*sudog
    sudogbuf   [128]*sudog

    palloc persistentAlloc // per-P to avoid mutex

    pad [sys.CacheLineSize]byte
}

A P's status can be Pidle, Prunning, Psyscall, Pgcstop, or Pdead. Its local run queue (runqhead/runqtail/runq) holds runnable goroutines; a P takes G's to execute from its own queue first, which improves efficiency.

The schedt struct

type schedt struct {
    goidgen  uint64
    lastpoll uint64

    lock mutex

    midle        muintptr // idle M's
    nmidle       int32    // number of idle M's
    nmidlelocked int32    // number of locked M's
    mcount       int32    // total number of M's created
    maxmcount    int32    // maximum number of M's allowed
    ngsys        uint32   // number of system goroutines, updated automatically

    pidle      puintptr // idle P's
    npidle     uint32
    nmspinning uint32

    // Global runnable G queue.
    runqhead guintptr
    runqtail guintptr
    runqsize int32

    // Global cache of dead G's.
    gflock       mutex
    gfreeStack   *g
    gfreeNoStack *g
    ngfree       int32

    // Central cache of sudog structs.
    sudoglock  mutex
    sudogcache *sudog
}

Most of the information needed lives in the M, G, and P structs; schedt is largely a shell. It holds the idle M list, the idle P list, and the global runnable G queue. The lock in schedt is essential: whenever an M or P performs a non-local operation, it generally has to lock the scheduler first.

2.5 Key functions

The goroutine scheduler lives in /src/runtime/proc.go. The most important functions (shown here as in the Go 1.10.8 runtime) are analyzed below.

2.5.1 The schedule function

The schedule function runs whenever the runtime needs to schedule. It finds a runnable G for the current P and executes it, searching in the following order:

1) call runqget to take a runnable G from the P's own local run queue;

2) if 1) fails, call findrunnable to search for a runnable G;

3) if 2) also produces no runnable G, scheduling ends and execution resumes from the previous context.

4) Note: once in a while the global runnable queue is checked first to ensure fairness; otherwise two goroutines could keep respawning each other and monopolize the local run queue. This is enforced with the schedtick counter (the check fires when schedtick % 61 == 0).

The code is as follows:

// One round of scheduler: find a runnable goroutine and execute it.
// Never returns.
func schedule() {
    _g_ := getg()

    if _g_.m.locks != 0 {
        throw("schedule: holding locks")
    }

    if _g_.m.lockedg != 0 {
        stoplockedm()
        execute(_g_.m.lockedg.ptr(), false) // Never returns.
    }

    // We should not schedule away from a g that is executing a cgo call,
    // since the cgo call is using the m's g0 stack.
    if _g_.m.incgo {
        throw("schedule: in cgo")
    }

top:
    if sched.gcwaiting != 0 {
        gcstopm()
        goto top
    }
    if _g_.m.p.ptr().runSafePointFn != 0 {
        runSafePointFn()
    }

    var gp *g
    var inheritTime bool
    if trace.enabled || trace.shutdown {
        gp = traceReader()
        if gp != nil {
            casgstatus(gp, _Gwaiting, _Grunnable)
            traceGoUnpark(gp, 0)
        }
    }
    if gp == nil && gcBlackenEnabled != 0 {
        gp = gcController.findRunnableGCWorker(_g_.m.p.ptr())
    }
    if gp == nil {
        // Check the global runnable queue once in a while to ensure fairness.
        // Otherwise two goroutines can completely occupy the local runqueue
        // by constantly respawning each other.
        if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 {
            lock(&sched.lock)
            gp = globrunqget(_g_.m.p.ptr(), 1)
            unlock(&sched.lock)
        }
    }
    if gp == nil {
        gp, inheritTime = runqget(_g_.m.p.ptr())
        if gp != nil && _g_.m.spinning {
            throw("schedule: spinning with local work")
        }
    }
    if gp == nil {
        gp, inheritTime = findrunnable() // blocks until work is available
    }

    // This thread is going to run a goroutine and is not spinning anymore,
    // so if it was marked as spinning we need to reset it now and potentially
    // start a new spinning M.
    if _g_.m.spinning {
        resetspinning()
    }

    if gp.lockedm != 0 {
        // Hands off own p to the locked m,
        // then blocks waiting for a new p.
        startlockedm(gp)
        goto top
    }

    execute(gp, inheritTime)
}

2.5.2 The findrunnable function

The findrunnable function finds a runnable G for a P, searching in the following order:

1) call runqget to take a G from the P's own local run queue;

2) if 1) fails, call globrunqget to take a G from the global runnable queue;

3) if 2) fails, call netpoll (non-blocking) to pick up a G readied by an asynchronous (network) event;

4) if 3) fails, try to steal half of another P's run queue;

5) if 4) fails, call globrunqget again to take a G from the global runnable queue;

6) if 5) fails, call netpoll (blocking) to wait for a G readied by an asynchronous event;

7) if 6) still yields no G, call stopm to stop this M.

The code is as follows:

// Finds a runnable goroutine to execute.
// Tries to steal from other P's, get g from global queue, poll network.
func findrunnable() (gp *g, inheritTime bool) {
    _g_ := getg()

    // The conditions here and in handoffp must agree: if
    // findrunnable would return a G to run, handoffp must start
    // an M.

top:
    _p_ := _g_.m.p.ptr()
    if sched.gcwaiting != 0 {
        gcstopm()
        goto top
    }
    if _p_.runSafePointFn != 0 {
        runSafePointFn()
    }
    if fingwait && fingwake {
        if gp := wakefing(); gp != nil {
            ready(gp, 0, true)
        }
    }
    if *cgo_yield != nil {
        asmcgocall(*cgo_yield, nil)
    }

    // local runq
    if gp, inheritTime := runqget(_p_); gp != nil {
        return gp, inheritTime
    }

    // global runq
    if sched.runqsize != 0 {
        lock(&sched.lock)
        gp := globrunqget(_p_, 0)
        unlock(&sched.lock)
        if gp != nil {
            return gp, false
        }
    }

    // Poll network.
    // This netpoll is only an optimization before we resort to stealing.
    // We can safely skip it if there are no waiters or a thread is blocked
    // in netpoll already. If there is any kind of logical race with that
    // blocked thread (e.g. it has already returned from netpoll, but does
    // not set lastpoll yet), this thread will do blocking netpoll below
    // anyway.
    if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Load64(&sched.lastpoll) != 0 {
        if gp := netpoll(false); gp != nil { // non-blocking
            // netpoll returns list of goroutines linked by schedlink.
            injectglist(gp.schedlink.ptr())
            casgstatus(gp, _Gwaiting, _Grunnable)
            if trace.enabled {
                traceGoUnpark(gp, 0)
            }
            return gp, false
        }
    }

    // Steal work from other P's.
    procs := uint32(gomaxprocs)
    if atomic.Load(&sched.npidle) == procs-1 {
        // Either GOMAXPROCS=1 or everybody, except for us, is idle already.
        // New work can appear from returning syscall/cgocall, network or timers.
        // Neither of that submits to local run queues, so no point in stealing.
        goto stop
    }
    // If number of spinning M's >= number of busy P's, block.
    // This is necessary to prevent excessive CPU consumption
    // when GOMAXPROCS>>1 but the program parallelism is low.
    if !_g_.m.spinning && 2*atomic.Load(&sched.nmspinning) >= procs-atomic.Load(&sched.npidle) {
        goto stop
    }
    if !_g_.m.spinning {
        _g_.m.spinning = true
        atomic.Xadd(&sched.nmspinning, 1)
    }
    for i := 0; i < 4; i++ {
        for enum := stealOrder.start(fastrand()); !enum.done(); enum.next() {
            if sched.gcwaiting != 0 {
                goto top
            }
            stealRunNextG := i > 2 // first look for ready queues with more than 1 g
            if gp := runqsteal(_p_, allp[enum.position()], stealRunNextG); gp != nil {
                return gp, false
            }
        }
    }

stop:
    // We have nothing to do. If we're in the GC mark phase, can
    // safely scan and blacken objects, and have work to do, run
    // idle-time marking rather than give up the P.
    if gcBlackenEnabled != 0 && _p_.gcBgMarkWorker != 0 && gcMarkWorkAvailable(_p_) {
        _p_.gcMarkWorkerMode = gcMarkWorkerIdleMode
        gp := _p_.gcBgMarkWorker.ptr()
        casgstatus(gp, _Gwaiting, _Grunnable)
        if trace.enabled {
            traceGoUnpark(gp, 0)
        }
        return gp, false
    }

    // Before we drop our P, make a snapshot of the allp slice,
    // which can change underfoot once we no longer block
    // safe-points. We don't need to snapshot the contents because
    // everything up to cap(allp) is immutable.
    allpSnapshot := allp

    // return P and block
    lock(&sched.lock)
    if sched.gcwaiting != 0 || _p_.runSafePointFn != 0 {
        unlock(&sched.lock)
        goto top
    }
    if sched.runqsize != 0 {
        gp := globrunqget(_p_, 0)
        unlock(&sched.lock)
        return gp, false
    }
    if releasep() != _p_ {
        throw("findrunnable: wrong p")
    }
    pidleput(_p_)
    unlock(&sched.lock)

    // Delicate dance: thread transitions from spinning to non-spinning state,
    // potentially concurrently with submission of new goroutines. We must
    // drop nmspinning first and then check all per-P queues again (with
    // #StoreLoad memory barrier in between). If we do it the other way around,
    // another thread can submit a goroutine after we've checked all run queues
    // but before we drop nmspinning; as the result nobody will unpark a thread
    // to run the goroutine.
    // If we discover new work below, we need to restore m.spinning as a signal
    // for resetspinning to unpark a new worker thread (because there can be more
    // than one starving goroutine). However, if after discovering new work
    // we also observe no idle Ps, it is OK to just park the current thread:
    // the system is fully loaded so no spinning threads are required.
    // Also see "Worker thread parking/unparking" comment at the top of the file.
    wasSpinning := _g_.m.spinning
    if _g_.m.spinning {
        _g_.m.spinning = false
        if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {
            throw("findrunnable: negative nmspinning")
        }
    }

    // check all runqueues once again
    for _, _p_ := range allpSnapshot {
        if !runqempty(_p_) {
            lock(&sched.lock)
            _p_ = pidleget()
            unlock(&sched.lock)
            if _p_ != nil {
                acquirep(_p_)
                if wasSpinning {
                    _g_.m.spinning = true
                    atomic.Xadd(&sched.nmspinning, 1)
                }
                goto top
            }
            break
        }
    }

    // Check for idle-priority GC work again.
    if gcBlackenEnabled != 0 && gcMarkWorkAvailable(nil) {
        lock(&sched.lock)
        _p_ = pidleget()
        if _p_ != nil && _p_.gcBgMarkWorker == 0 {
            pidleput(_p_)
            _p_ = nil
        }
        unlock(&sched.lock)
        if _p_ != nil {
            acquirep(_p_)
            if wasSpinning {
                _g_.m.spinning = true
                atomic.Xadd(&sched.nmspinning, 1)
            }
            // Go back to idle GC check.
            goto stop
        }
    }

    // poll network
    if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Xchg64(&sched.lastpoll, 0) != 0 {
        if _g_.m.p != 0 {
            throw("findrunnable: netpoll with p")
        }
        if _g_.m.spinning {
            throw("findrunnable: netpoll with spinning")
        }
        gp := netpoll(true) // block until new work is available
        atomic.Store64(&sched.lastpoll, uint64(nanotime()))
        if gp != nil {
            lock(&sched.lock)
            _p_ = pidleget()
            unlock(&sched.lock)
            if _p_ != nil {
                acquirep(_p_)
                injectglist(gp.schedlink.ptr())
                casgstatus(gp, _Gwaiting, _Grunnable)
                if trace.enabled {
                    traceGoUnpark(gp, 0)
                }
                return gp, false
            }
            injectglist(gp)
        }
    }
    stopm()
    goto top
}

2.5.3 The newproc function

The newproc function creates a runnable G and puts it on the current P's runnable queue; it is what a statement like "go func() { … }" is actually compiled into. The core logic is in newproc1, which proceeds as follows:

1) get the P of the current G, then try to take a G from the free G list;

2) if 1) found one, reinitialize it with the new parameters; otherwise allocate a new G;

3) put the G on the P's runnable queue.

The code is as follows:

// In Go 1.10.8 the default stack size is 2KB.
_StackMin = 2048

// Create a g object and put it on a g queue, waiting to be executed.
//
// Create a new g running fn with narg bytes of arguments starting
// at argp. callerpc is the address of the go statement that created
// this. The new g is put on the queue of g's waiting to run.
func newproc1(fn *funcval, argp *uint8, narg int32, callerpc uintptr) {
    _g_ := getg()

    if fn == nil {
        _g_.m.throwing = -1 // do not dump full stacks
        throw("go of nil func value")
    }
    _g_.m.locks++ // disable preemption because it can be holding p in a local var
    siz := narg
    siz = (siz + 7) &^ 7

    // We could allocate a larger initial stack if necessary.
    // Not worth it: this is almost always an error.
    // 4*sizeof(uintreg): extra space added below
    // sizeof(uintreg): caller's LR (arm) or return address (x86, in gostartcall).
    if siz >= _StackMin-4*sys.RegSize-sys.RegSize {
        throw("newproc: function arguments too large for new goroutine")
    }

    _p_ := _g_.m.p.ptr()
    newg := gfget(_p_)
    if newg == nil {
        newg = malg(_StackMin)
        casgstatus(newg, _Gidle, _Gdead)
        allgadd(newg) // publishes with a g->status of Gdead so GC scanner doesn't look at uninitialized stack.
    }
    if newg.stack.hi == 0 {
        throw("newproc1: newg missing stack")
    }

    if readgstatus(newg) != _Gdead {
        throw("newproc1: new g is not Gdead")
    }

    totalSize := 4*sys.RegSize + uintptr(siz) + sys.MinFrameSize // extra space in case of reads slightly beyond frame
    totalSize += -totalSize & (sys.SpAlign - 1)                  // align to spAlign
    sp := newg.stack.hi - totalSize
    spArg := sp
    if usesLR {
        // caller's LR
        *(*uintptr)(unsafe.Pointer(sp)) = 0
        prepGoExitFrame(sp)
        spArg += sys.MinFrameSize
    }
    if narg > 0 {
        memmove(unsafe.Pointer(spArg), unsafe.Pointer(argp), uintptr(narg))
        // This is a stack-to-stack copy. If write barriers
        // are enabled and the source stack is grey (the
        // destination is always black), then perform a
        // barrier copy. We do this *after* the memmove
        // because the destination stack may have garbage on
        // it.
        if writeBarrier.needed && !_g_.m.curg.gcscandone {
            f := findfunc(fn.fn)
            stkmap := (*stackmap)(funcdata(f, _FUNCDATA_ArgsPointerMaps))
            // We're in the prologue, so it's always stack map index 0.
            bv := stackmapdata(stkmap, 0)
            bulkBarrierBitmap(spArg, spArg, uintptr(narg), 0, bv.bytedata)
        }
    }

    memclrNoHeapPointers(unsafe.Pointer(&newg.sched), unsafe.Sizeof(newg.sched))
    newg.sched.sp = sp
    newg.stktopsp = sp
    newg.sched.pc = funcPC(goexit) + sys.PCQuantum // +PCQuantum so that previous instruction is in same function
    newg.sched.g = guintptr(unsafe.Pointer(newg))
    gostartcallfn(&newg.sched, fn)
    newg.gopc = callerpc
    newg.startpc = fn.fn
    if _g_.m.curg != nil {
        newg.labels = _g_.m.curg.labels
    }
    if isSystemGoroutine(newg) {
        atomic.Xadd(&sched.ngsys, +1)
    }
    newg.gcscanvalid = false
    casgstatus(newg, _Gdead, _Grunnable)

    if _p_.goidcache == _p_.goidcacheend {
        // Sched.goidgen is the last allocated id,
        // this batch must be [sched.goidgen+1, sched.goidgen+GoidCacheBatch].
        // At startup sched.goidgen=0, so main goroutine receives goid=1.
        _p_.goidcache = atomic.Xadd64(&sched.goidgen, _GoidCacheBatch)
        _p_.goidcache -= _GoidCacheBatch - 1
        _p_.goidcacheend = _p_.goidcache + _GoidCacheBatch
    }
    newg.goid = int64(_p_.goidcache)
    _p_.goidcache++
    if raceenabled {
        newg.racectx = racegostart(callerpc)
    }
    if trace.enabled {
        traceGoCreate(newg, newg.startpc)
    }
    runqput(_p_, newg, true)

    if atomic.Load(&sched.npidle) != 0 && atomic.Load(&sched.nmspinning) == 0 && mainStarted {
        wakep()
    }
    _g_.m.locks--
    if _g_.m.locks == 0 && _g_.preempt { // restore the preemption request in case we've cleared it in newstack
        _g_.stackguard0 = stackPreempt
    }
}

2.5.4 The goexit0 function

goexit is called when a G exits. goexit0 resets some of the G's fields, puts it on the free G list for later reuse, and then calls schedule to pick the next G.

// goexit continuation on g0.
func goexit0(gp *g) {
    _g_ := getg()

    // Change the g's status from _Grunning to _Gdead.
    casgstatus(gp, _Grunning, _Gdead)
    if isSystemGoroutine(gp) {
        atomic.Xadd(&sched.ngsys, -1)
    }
    // Release/reset the g's fields, mostly to nil/0.
    gp.m = nil
    locked := gp.lockedm != 0
    gp.lockedm = 0
    _g_.m.lockedg = 0
    gp.paniconfault = false
    gp._defer = nil // should be true already but just in case.
    gp._panic = nil // non-nil for Goexit during panic. points at stack-allocated data.
    gp.writebuf = nil
    gp.waitreason = ""
    gp.param = nil
    gp.labels = nil
    gp.timer = nil

    if gcBlackenEnabled != 0 && gp.gcAssistBytes > 0 {
        // Flush assist credit to the global pool. This gives
        // better information to pacing if the application is
        // rapidly creating an exiting goroutines.
        scanCredit := int64(gcController.assistWorkPerByte * float64(gp.gcAssistBytes))
        atomic.Xaddint64(&gcController.bgScanCredit, scanCredit)
        gp.gcAssistBytes = 0
    }

    // Note that gp's stack scan is now "valid" because it has no
    // stack.
    gp.gcscanvalid = true
    dropg()

    if _g_.m.lockedInt != 0 {
        print("invalid m->lockedInt = ", _g_.m.lockedInt, "\n")
        throw("internal lockOSThread error")
    }
    _g_.m.lockedExt = 0
    // Push this g onto the free G list.
    gfput(_g_.m.p.ptr(), gp)
    if locked {
        // The goroutine may have locked this thread because
        // it put it in an unusual kernel state. Kill it
        // rather than returning it to the thread pool.

        // Return to mstart, which will release the P and exit
        // the thread.
        if GOOS != "plan9" { // See golang.org/issue/22227.
            gogo(&_g_.m.g0.sched)
        }
    }
    schedule()
}

2.5.5 The handoffp function

The handoffp function hands a P away from an M that is entering a system call or is locked. If the P's local run queue (or the global queue) still has runnable G's, it starts another M via startm, and that new M is not marked as spinning.

// Hands off P from syscall or locked M.
// Always runs without a P, so write barriers are not allowed.
//go:nowritebarrierrec
func handoffp(_p_ *p) {
    // handoffp must start an M in any situation where
    // findrunnable would return a G to run on _p_.

    // If this P's local queue or the global run queue is not empty,
    // start an M (not spinning).
    if !runqempty(_p_) || sched.runqsize != 0 {
        startm(_p_, false)
        return
    }
    // Same if there is GC mark work available.
    if gcBlackenEnabled != 0 && gcMarkWorkAvailable(_p_) {
        startm(_p_, false)
        return
    }
    // No local work; if there are no spinning or idle M's at all,
    // start a spinning M ourselves.
    if atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) == 0 && atomic.Cas(&sched.nmspinning, 0, 1) { // TODO: fast atomic
        startm(_p_, true)
        return
    }
    // Lock the scheduler and take this P out of circulation.
    lock(&sched.lock)
    if sched.gcwaiting != 0 {
        _p_.status = _Pgcstop
        sched.stopwait--
        if sched.stopwait == 0 {
            notewakeup(&sched.stopnote)
        }
        unlock(&sched.lock)
        return
    }
    if _p_.runSafePointFn != 0 && atomic.Cas(&_p_.runSafePointFn, 1, 0) {
        sched.safePointFn(_p_)
        sched.safePointWait--
        if sched.safePointWait == 0 {
            notewakeup(&sched.safePointNote)
        }
    }
    if sched.runqsize != 0 {
        unlock(&sched.lock)
        startm(_p_, false)
        return
    }
    // If this is the last running P and nobody is polling network,
    // need to wakeup another M to poll network.
    if sched.npidle == uint32(gomaxprocs-1) && atomic.Load64(&sched.lastpoll) != 0 {
        unlock(&sched.lock)
        startm(_p_, false)
        return
    }
    pidleput(_p_)
    unlock(&sched.lock)
}

2.5.6 The startm function

The startm function schedules an existing M, or creates one if necessary, to run the given P.

// Schedules some M to run the p (creates an M if necessary).
// If p==nil, tries to get an idle P, if no idle P's does nothing.
// May run with m.p==nil, so write barriers are not allowed.
// If spinning is set, the caller has incremented nmspinning and startm will
// either decrement nmspinning or set m.spinning in the newly started M.
//go:nowritebarrierrec
func startm(_p_ *p, spinning bool) {
    // Take the scheduler lock.
    lock(&sched.lock)
    if _p_ == nil {
        _p_ = pidleget()
        if _p_ == nil {
            unlock(&sched.lock)
            if spinning {
                // The caller incremented nmspinning, but there are no idle Ps,
                // so it's okay to just undo the increment and give up.
                if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {
                    throw("startm: negative nmspinning")
                }
            }
            return
        }
    }
    mp := mget()
    unlock(&sched.lock)
    if mp == nil {
        var fn func()
        if spinning {
            // The caller incremented nmspinning, so set m.spinning in the new M.
            fn = mspinning
        }
        newm(fn, _p_)
        return
    }
    if mp.spinning {
        throw("startm: m is spinning")
    }
    if mp.nextp != 0 {
        throw("startm: m has p")
    }
    if spinning && !runqempty(_p_) {
        throw("startm: p has runnable gs")
    }
    // The caller incremented nmspinning, so set m.spinning in the new M.
    mp.spinning = spinning
    mp.nextp.set(_p_)
    notewakeup(&mp.park)
}

2.5.7 The sysmon function

The sysmon function is started when the Go runtime boots. It monitors the state of all goroutines, decides whether a GC needs to be forced, polls the network, and so on. Inside sysmon, the retake function is called to perform preemptive scheduling.

// Always runs without a P, so write barriers are not allowed.
//
//go:nowritebarrierrec
func sysmon() {
    lock(&sched.lock)
    sched.nmsys++
    checkdead()
    unlock(&sched.lock)

    // If a heap span goes unused for 5 minutes after a garbage collection,
    // we hand it back to the operating system.
    scavengelimit := int64(5 * 60 * 1e9)

    if debug.scavenge > 0 {
        // Scavenge-a-lot for testing.
        forcegcperiod = 10 * 1e6
        scavengelimit = 20 * 1e6
    }

    lastscavenge := nanotime()
    nscavenge := 0

    lasttrace := int64(0)
    idle := 0 // how many cycles in succession we had not wokeup somebody
    delay := uint32(0)
    for {
        if idle == 0 { // start with 20us sleep...
            delay = 20
        } else if idle > 50 { // start doubling the sleep after 1ms...
            delay *= 2
        }
        if delay > 10*1000 { // up to 10ms
            delay = 10 * 1000
        }
        usleep(delay)
        if debug.schedtrace <= 0 && (sched.gcwaiting != 0 || atomic.Load(&sched.npidle) == uint32(gomaxprocs)) {
            lock(&sched.lock)
            if atomic.Load(&sched.gcwaiting) != 0 || atomic.Load(&sched.npidle) == uint32(gomaxprocs) {
                atomic.Store(&sched.sysmonwait, 1)
                unlock(&sched.lock)
                // Make wake-up period small enough
                // for the sampling to be correct.
                maxsleep := forcegcperiod / 2
                if scavengelimit < forcegcperiod {
                    maxsleep = scavengelimit / 2
                }
                shouldRelax := true
                if osRelaxMinNS > 0 {
                    next := timeSleepUntil()
                    now := nanotime()
                    if next-now < osRelaxMinNS {
                        shouldRelax = false
                    }
                }
                if shouldRelax {
                    osRelax(true)
                }
                notetsleep(&sched.sysmonnote, maxsleep)
                if shouldRelax {
                    osRelax(false)
                }
                lock(&sched.lock)
                atomic.Store(&sched.sysmonwait, 0)
                noteclear(&sched.sysmonnote)
                idle = 0
                delay = 20
            }
            unlock(&sched.lock)
        }
        // trigger libc interceptors if needed
        if *cgo_yield != nil {
            asmcgocall(*cgo_yield, nil)
        }
        // poll network if not polled for more than 10ms
        lastpoll := int64(atomic.Load64(&sched.lastpoll))
        now := nanotime()
        if netpollinited() && lastpoll != 0 && lastpoll+10*1000*1000 < now {
            atomic.Cas64(&sched.lastpoll, uint64(lastpoll), uint64(now))
            gp := netpoll(false) // non-blocking - returns list of goroutines
            if gp != nil {
                // Need to decrement number of idle locked M's
                // (pretending that one more is running) before injectglist.
                // Otherwise it can lead to the following situation:
                // injectglist grabs all P's but before it starts M's to run the P's,
                // another M returns from syscall, finishes running its G,
                // observes that there is no work to do and no other running M's
                // and reports deadlock.
                incidlelocked(-1)
                injectglist(gp)
                incidlelocked(1)
            }
        }
        // retake P's blocked in syscalls
        // and preempt long running G's
        if retake(now) != 0 {
            idle = 0
        } else {
            idle++
        }
        // check if we need to force a GC
        if t := (gcTrigger{kind: gcTriggerTime, now: now}); t.test() && atomic.Load(&forcegc.idle) != 0 {
            lock(&forcegc.lock)
            forcegc.idle = 0
            forcegc.g.schedlink = 0
            injectglist(forcegc.g)
            unlock(&forcegc.lock)
        }
        // scavenge heap once in a while
        if lastscavenge+scavengelimit/2 < now {
            mheap_.scavenge(int32(nscavenge), uint64(now), uint64(scavengelimit))
            lastscavenge = now
            nscavenge++
        }
        if debug.schedtrace > 0 && lasttrace+int64(debug.schedtrace)*1000000 <= now {
            lasttrace = now
            schedtrace(debug.scheddetail > 0)
        }
    }
}

2.5.8 The retake function

retake iterates over all P's. If a P is in a system call (_Psyscall) and has stayed there for more than one sysmon cycle (20µs-10ms), the P is retaken: handoffp is called to detach the M from the P. If a P is running (_Prunning) and its current G has run through a full sysmon cycle and for longer than forcePreemptNS (10ms), the G is preempted by setting g.preempt = true and g.stackguard0 = stackPreempt.

Why does setting stackguard achieve preemption?

Because stackguard0 is the value every Go function's prologue compares against the stack pointer to decide whether the stack needs to grow.

When the stack-growth path runs, newstack sees that g.stackguard0 equals stackPreempt, knows the check was triggered by a preemption request rather than a real stack shortage, and then re-checks whether the goroutine should actually be preempted.

The preemption mechanism guarantees that no single G can run so long that other G's never get a chance to run.

func retake(now int64) uint32 {
    n := 0
    // Prevent allp slice changes. This lock will be completely
    // uncontended unless we're already stopping the world.
    lock(&allpLock)
    // We can't use a range loop over allp because we may
    // temporarily drop the allpLock. Hence, we need to re-fetch
    // allp each time around the loop.
    for i := 0; i < len(allp); i++ {
        _p_ := allp[i]
        if _p_ == nil {
            // This can happen if procresize has grown
            // allp but not yet created new Ps.
            continue
        }
        pd := &_p_.sysmontick
        s := _p_.status
        if s == _Psyscall {
            // Retake P from syscall if it's there for more than 1 sysmon tick (at least 20us).
            t := int64(_p_.syscalltick)
            if int64(pd.syscalltick) != t {
                pd.syscalltick = uint32(t)
                pd.syscallwhen = now
                continue
            }
            // On the one hand we don't want to retake Ps if there is no other work to do,
            // but on the other hand we want to retake them eventually
            // because they can prevent the sysmon thread from deep sleep.
            if runqempty(_p_) && atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) > 0 && pd.syscallwhen+10*1000*1000 > now {
                continue
            }
            // Drop allpLock so we can take sched.lock.
            unlock(&allpLock)
            // Need to decrement number of idle locked M's
            // (pretending that one more is running) before the CAS.
            // Otherwise the M from which we retake can exit the syscall,
            // increment nmidle and report deadlock.
            incidlelocked(-1)
            if atomic.Cas(&_p_.status, s, _Pidle) {
                if trace.enabled {
                    traceGoSysBlock(_p_)
                    traceProcStop(_p_)
                }
                n++
                _p_.syscalltick++
                handoffp(_p_)
            }
            incidlelocked(1)
            lock(&allpLock)
        } else if s == _Prunning {
            // Preempt G if it's running for too long.
            t := int64(_p_.schedtick)
            if int64(pd.schedtick) != t {
                pd.schedtick = uint32(t)
                pd.schedwhen = now
                continue
            }
            if pd.schedwhen+forcePreemptNS > now {
                continue
            }
            preemptone(_p_)
        }
    }
    unlock(&allpLock)
    return uint32(n)
}

3. Scheduler summary

3.1 Two big ideas behind the scheduler

Reuse threads: goroutines run on top of a pool of threads, so threads are reused rather than constantly created and destroyed. Thread reuse shows up in two more places in the scheduler: 1) work stealing - when a thread has no runnable G, it tries to steal G's from the P bound to another thread instead of destroying itself; 2) handoff - when a thread blocks in a system call on behalf of a G, it releases its bound P and hands the P to another idle thread.

Exploit parallelism: GOMAXPROCS sets the number of P's. When GOMAXPROCS is greater than 1, up to GOMAXPROCS threads can be running at the same time, possibly spread across multiple CPU cores, so concurrency turns into parallelism. GOMAXPROCS also caps the degree of parallelism; for example, with GOMAXPROCS = number of cores / 2, at most half of the cores are used in parallel.
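A small illustrative sketch (not from the original article) of querying and capping parallelism with runtime.GOMAXPROCS:

package main

import (
    "fmt"
    "runtime"
)

func main() {
    // GOMAXPROCS defaults to the number of CPU cores.
    fmt.Println("NumCPU:", runtime.NumCPU())
    fmt.Println("current GOMAXPROCS:", runtime.GOMAXPROCS(0)) // an argument of 0 only queries the value

    // Cap parallelism at half the cores: no matter how many goroutines exist,
    // at most this many of them run truly in parallel.
    half := runtime.NumCPU() / 2
    if half < 1 {
        half = 1
    }
    prev := runtime.GOMAXPROCS(half)
    fmt.Println("GOMAXPROCS was", prev, "and is now", runtime.GOMAXPROCS(0))
}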

3.2 Two smaller strategies of the scheduler:

Preemption: with plain coroutines, the next coroutine only runs once the current one voluntarily yields the CPU; in Go, a goroutine can occupy the CPU for at most about 10ms before it is preempted, so other goroutines are not starved. This is one way goroutines differ from classic coroutines (see the sketch after these two strategies).

Global G queue: the scheduler still has a global G queue, but its role has been weakened; when an M fails to steal a G from other P's during work stealing, it can take a G from the global queue.
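A rough, hypothetical illustration of the preemption guarantee (not from the article): in the Go version discussed here, preemption is cooperative and only takes effect at function-prologue stack checks, so the busy goroutine below deliberately keeps calling a non-leaf function; newer Go releases (1.14+) would preempt even a tight loop asynchronously.

package main

import (
    "fmt"
    "runtime"
    "time"
)

// busy is deliberately recursive (non-leaf), so its prologue contains the
// stack check against stackguard0 that cooperative preemption relies on.
func busy(n int) int {
    if n <= 0 {
        return 0
    }
    return busy(n-1) + 1
}

func main() {
    runtime.GOMAXPROCS(1) // a single P: the hot goroutine and main must share it

    go func() {
        for {
            _ = busy(64) // CPU-bound loop that still passes preemption check points
        }
    }()

    start := time.Now()
    time.Sleep(50 * time.Millisecond)
    // Despite the CPU-bound goroutine, main gets scheduled again shortly after
    // its timer fires, because sysmon/retake preempted the hot goroutine.
    fmt.Println("main ran again after", time.Since(start))
}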

4. References

Golang code repository: https://github.com/golang/go

"Scalable Go Scheduler": https://docs.google.com/docum...

"Go Preemptive Scheduler": https://docs.google.com/docum...

Online articles:

https://studygolang.com/artic...

https://studygolang.com/artic...

https://studygolang.com/artic...

https://studygolang.com/artic...

https://studygolang.com/artic... (scheduling case study)

https://www.cnblogs.com/sunsk... (preemptive scheduling)

https://blog.csdn.net/u010853... (walkthrough of schedule; very well explained and worth reading several times, the diagrams are quite illustrative)
