美文网首页
Golang 中 GPM 之 G 的执行(一、准备阶段)

Golang 中 GPM 之 G 的执行(一、准备阶段)

作者: 未tu | 来源:发表于2020-10-04 19:02 被阅读0次

    从之前的文章《Golang 中 GPM 之 G 从哪里来》,我们知道了 Golang 对于使用 G 是有一定的复用机制,这样减少了资源的浪费,接下来我们要看下 G 生成后会怎么样

    func newproc1(fn *funcval, argp unsafe.Pointer, narg int32, callergp *g, callerpc uintptr) {
      ...
      newg := gfget(_p_)
      if newg == nil {
        newg = malg(_StackMin)
        casgstatus(newg, _Gidle, _Gdead) //这里将新建的 G 中对应的状态是从 Gidle -> Gdead 中 
        ...
      }
      // 以上我们得到了这次新产生的 G 
      ...
      newg.startpc = fn.fn
      ...
      casgstatus(newg, _Gdead, _Grunnable) // 当新的 G 初始化,并将其状态变更  Gdead -> Grunnable
      // 并将新的 G 放到当前 G 运行的 M 对应的 P 中
      runqput(_p_, newg, true)
      ...
    }
    

    那接下来的重点就在 runqput(_p_ *p, gp *g, next bool)

    // runqput 优先将 G 放在本地的可运行队列
    // 如果 next 是 false ,则加入可运行队列的队列的尾部
    // 如果 next 是 true ,则 runqput 将 G 放到当前 P 下一个执行的位置
    // 如果当前 P 队列已经满了,则会将当前队列的 “一半+1” 放到 G 的全局队列中
    func runqput(_p_ *p, gp *g, next bool) { 
      // 这里表示如果是随机的,则不将新产生的 G 放到当前 P 的 runnext
      if randomizeScheduler && next && fastrand()%2 == 0 {
        next = false
      }
    ​
      // 判断是否将新增的 G 放到当前 P 的 runnext,
      // 后续将被替换出来的 G 放到本地 G 队列的尾部
      if next {
      retryNext:
        oldnext := _p_.runnext
        if !_p_.runnext.cas(oldnext, guintptr(unsafe.Pointer(gp))) {
          goto retryNext
        }
        if oldnext == 0 {
          return
        }
    ​
        gp = oldnext.ptr()
      }
    ​
    retry:
      // atomic.LoadAcq 获取上次 atomic.CaSRel 的 _p_.runqhead 的值
      h := atomic.LoadAcq(&_p_.runqhead) 
      t := _p_.runqtail 
      // 这里的 _p_.runq 的数据类型是[256]guintptr,说明总计一个 P 最多
      // 有 256 个 G 放在本地可运行队列中和 1 个接下来运行的 G(runnext)
      if t-h < uint32(len(_p_.runq)) { 
        _p_.runq[t%uint32(len(_p_.runq))].set(gp)//通过 % 说明这是一个循环队列
        atomic.StoreRel(&_p_.runqtail, t+1) //将末尾的位置 +1
        return
      }
      //如果队列满了,将当前 P 本地可运行队列的一半以及 oldnext 放到全局队列中
      if runqputslow(_p_, gp, h, t) {
        return
      }
    ​
      goto retry
    }
    

    runqputslow(_p_ *p, gp *g, h, t uint32) 的实现细节可以看下面

    func runqputslow(_p_ *p, gp *g, h, t uint32) bool {
      //这里的长度取的是 p 本地可运行队列总长的一半 +1,即 256/2+1
      var batch [len(_p_.runq)/2 + 1]*g 
    ​
    ​
      n := t - h
      n = n / 2
      if n != uint32(len(_p_.runq)/2) {
        throw("runqputslow: queue is not full")
      }
      // 这个 for 循环次数就是可运行队列总量的一半
      for i := uint32(0); i < n; i++ {
        batch[i] = _p_.runq[(h+i)%uint32(len(_p_.runq))].ptr()
      }
      
      // 这里根据 _p_.runqhead 判断是否还是老位置,
      // 如果是的话,则将起始位置增加 n ,如果失败则说明起始位置有变更过
      if !atomic.CasRel(&_p_.runqhead, h, h+n) { 
        return false
      }
      batch[n] = gp
    ​
      if randomizeScheduler { // 这里只是将 batch 里面的 G 来进行随机排序
        for i := uint32(1); i <= n; i++ {
          j := fastrandn(i + 1)
          batch[i], batch[j] = batch[j], batch[i]
        }
      }
    ​
      // 这里的意思是将排序后的 batch 用链表的形式绑定起来
      for i := uint32(0); i < n; i++ {
        batch[i].schedlink.set(batch[i+1])
      }
      var q gQueue 
      q.head.set(batch[0])
      q.tail.set(batch[n])
      lock(&sched.lock)
      
      // 这里是将这个 batch 放到全局可运行队列(这使用了批量转移),
      // 且将 batch 放在全局可运行队列的尾部
      globrunqputbatch(&q, int32(n+1)) 
      
      unlock(&sched.lock)
      return true
    }
    

    总结:
    第一步:生成一个状态为 _Grunnable 的新 G
    第二步:将新 G 放到当前 G 相关的 P 的下一个运行位(即直接插队成下一个要执行的G,替换出来的 G 就成为 oldnext)
    第三步:根据 oldnext 的情况
    (1)如果 oldnext 本来就没有,则结束
    (2)如果有,则接下去后面的步骤
    第四步:根据当前 P 的本地可运行队列是否满了
    (1)如果未满,则将 oldnext 放到本地可运行队列的后面
    (2)如果满了,则将当前 P 本地可运行队列队列的一半以及当前的 oldnext 放到一起放到全局可运行队列的尾部

    相关文章

      网友评论

          本文标题:Golang 中 GPM 之 G 的执行(一、准备阶段)

          本文链接:https://www.haomeiwen.com/subject/fzinuktx.html