美文网首页
从ants看goroutine 2022-06-07

从ants看goroutine 2022-06-07

作者: 9_SooHyun | 来源:发表于2022-06-07 17:56 被阅读0次

    https://cloud.tencent.com/developer/news/598822 ants作者本人博客

    goroutine 混合型线程模型

    Goroutine并非传统意义上的协程
    主流的线程模型分三种:内核级线程模型、用户级线程模型和两级线程模型

    传统的协程库属于用户级线程模型,线程调度是在用户层面完成的,例如python通过send & yield 实现的协程协作

    python n = yield r,赋值语句先计算= 右边,由于右边是 yield 语句,所以yield语句执行完以后,进入暂停,而赋值语句在下一次启动生成器的时候首先被执行,赋的是什么值呢: send(value)中的value;

    而 Goroutine 和它的Go Scheduler在底层实现上其实是属于两级线程模型

    任何用户线程最终肯定都是要交由 OS 线程来执行的,Goroutine(称为 G)也不例外,但是 G 并不直接绑定 OS 线程运行,而是由 Goroutine Scheduler 中的 P - Logical Processor (逻辑处理器)来作为两者的『中介』,P 可以看作是一个抽象的资源或者一个上下文,一个 P 绑定一个 OS 线程,在 Golang 的实现里把 OS 线程抽象成一个数据结构:M,G 实际上是由 M 通过 P 来进行调度运行的,但是在 G 的层面来看,P 提供了 G 运行所需的一切资源和环境,因此在 G 看来 P 就是运行它的 “CPU”,由 G、P、M 这三种由 Go 抽象出来的实现,最终形成了 Go 调度器的基本结构:

    G: 表示 Goroutine,每个 Goroutine 对应一个 G 结构体,G 存储 Goroutine 的运行堆栈、状态以及任务函数,可重用。G 并非执行体,每个 G 需要绑定到 P 才能被调度执行。

    P: Processor,表示逻辑处理器, 对 G 来说,P 相当于 CPU 核,G 只有绑定到 P(在 P 的 local runq 中)才能被调度。对 M 来说,P 提供了相关的执行环境(Context),如内存分配状态(mcache),任务队列(G)等,P 的数量决定了系统内最大可并行的 G 的数量(前提:物理 CPU 核数 >= P 的数量),P 的数量由用户设置的 GoMAXPROCS 决定,但是不论 GoMAXPROCS 设置为多大,P 的数量最大为 256。

    M: Machine,OS 线程抽象,代表着真正执行计算的资源,在绑定有效的 P 后,进入 schedule 循环;而 schedule 循环的机制大致是从 Global 队列、P 的 Local 队列以及 wait 队列中获取 G,切换到 G 的执行栈上并执行 G 的函数,调用 Goexit 做清理工作并回到 M,如此反复。M 并不保留 G 状态,这是 G 可以跨 M 调度的基础,M 的数量是不定的,由 Go Runtime 调整,为了防止创建过多 OS 线程导致系统调度不过来,目前默认最大限制为 10000 个。

    goroutine的阻塞

    注意,用户级协程一般都要求是非阻塞的。因为若干用户级协程实际都绑定在一个内核线程上(n:1),如果协程会产生阻塞的话,那么所绑定的内核线程就阻塞了,那么cpu控制权就可能被调动到别的线程上,于是这一系列协程就全部阻塞了。所以很多的协程库会把自己一些阻塞的操作重新封装为完全的非阻塞形式,然后在以前要阻塞的点上,主动让出当前协程,并通过某种方式通知或唤醒其他待执行的用户协程在该 KSE(Kernel Scheduling Entity,也就是内核线程) 上运行,从而避免了内核调度器由于 KSE 阻塞而做上下文切换,这样整个进程也不会被阻塞了

    goroutine阻塞调度

    • 用户态阻塞/唤醒
      当 Goroutine 因为 channel 操作或者 network I/O 而阻塞时(实际上 Golang 已经用 netpoller 实现了 Goroutine 网络 I/O 阻塞不会导致 M 被阻塞,仅阻塞 G,这里仅仅是举个栗子),对应的 G 会被放置到某个 wait 队列(如 channel 的 waitq),该 G 的状态由_Gruning变为_Gwaitting,而 M 会跳过该 G 尝试获取并执行下一个 G,如果此时没有 runnable 的 G 供 M 运行,那么 M 将解绑 P,并进入 sleep 状态;当阻塞的 G 被另一端的 G2 唤醒时(比如 channel 的可读/写通知),G 被标记为 runnable,尝试加入 G2 所在 P 的 runnext,然后再是 P 的 Local 队列和 Global 队列。

    • 系统调用阻塞
      当 G 被阻塞在某个系统调用上时,此时 G 会阻塞在_Gsyscall状态,M 也处于 block on syscall 状态,此时的 M 可被抢占调度:执行该 G 的 M 会与 P 解绑,而 P 则尝试与其它 idle 的 M 绑定,继续执行其它 G。如果没有其它 idle 的 M,但 P 的 Local 队列中仍然有 G 需要执行,则创建一个新的 M;当系统调用完成后,G 会重新尝试获取一个 idle 的 P 进入它的 Local 队列恢复执行,如果没有 idle 的 P,G 会被标记为 runnable 加入到 Global 队列

    goroutine 爆内存

    当我们无限开goroutine的时候,每个goroutine即使只占用8KB,百万千万个goroutine也会打爆内存

    ants的goroutine池化复用

    goroutine池化复用就是goroutine的I/O多路复用,避免goroutine撑爆内存的情况

    ants.Pool结构体

    // Pool accepts the tasks from client, it limits the total of goroutines to a given number by recycling goroutines.
    type Pool struct {
        // capacity of the pool, a negative value means that the capacity of pool is limitless, an infinite pool is used to
        // avoid potential issue of endless blocking caused by nested usage of a pool: submitting a task to pool
        // which submits a new task to the same pool.
        capacity int32
    
        // running is the number of the currently running goroutines.
        running int32
    
        // lock for protecting the worker queue.
        lock sync.Locker
    
        // workers is a slice that store the available workers.
      // workers存储空闲可用的goWorker,一般是一个queue
        workers workerArray
    
        // state is used to notice the pool to closed itself.
        state int32
    
        // cond for waiting to get an idle worker.
        cond *sync.Cond
    
        // workerCache speeds up the obtainment of a usable worker in function:retrieveWorker.
      // 查找空闲可用的goWorker时,先去workers找,workers不存在空闲可用的goWorker,再到workerCache里面找
      // 那workerCache里面的goWorker怎么来的呢?已创建的goWorker在其关联的goroutine return的时候,
      // 这个goWorker失去了干活的goroutine,它的“今生”就结束了,然后会被放入workerCache,而不会被放回workers
      // 第一版的ants.Pool是没有workerCache的。放入workerCache是一种优化,当workers无空闲goWorker的时候,
      // 需要new一个goWorker去处理task,而在workerCache机制下,可以复用之前“死掉”的goWorker
        workerCache sync.Pool
    
        // waiting is the number of goroutines already been blocked on pool.Submit(), protected by pool.lock
        waiting int32
    
        heartbeatDone int32
        stopHeartbeat context.CancelFunc
    
        options *Options
    }
    

    池化的2个核心:如何分配worker & 同一个worker在执行不同任务时如何复用同一个goroutine

    • 如何分配worker
      retrieveWorker方法描述了worker分配算法
    初始化 worker 池;
    提交任务给 worker 池,检查是否有空闲的 worker:
    
    - 有,获取空闲 worker
    - 无,检查池中的 worker 数量是否已到池容量上限:
    
      - 已到上限,检查 worker 池是否是非阻塞的:
       - 非阻塞,直接返回nil表示执行失败
       - 阻塞,等待 worker 空闲
    
      - 未到上限,创建一个新的 worker 处理任务
    任务处理完成,将 worker 交还给池,以待处理下一个任务
    
    // retrieveWorker returns an available worker to run the tasks.
    func (p *Pool) retrieveWorker() (w *goWorker) {
        spawnWorker := func() {
            w = p.workerCache.Get().(*goWorker)
            w.run()
        }
    
        p.lock.Lock()
    
        w = p.workers.detach()
        if w != nil { // first try to fetch the worker from the queue
            p.lock.Unlock()
        } else if capacity := p.Cap(); capacity == -1 || capacity > p.Running() {
            // if the worker queue is empty and we don't run out of the pool capacity,
            // then just spawn a new worker goroutine.
            p.lock.Unlock()
        // 从workerCache里“取”一个goWorker
            spawnWorker()
        } else { // otherwise, we'll have to keep them blocked and wait for at least one worker to be put back into pool.
            if p.options.Nonblocking {
                p.lock.Unlock()
                return
            }
        retry:
            if p.options.MaxBlockingTasks != 0 && p.Waiting() >= p.options.MaxBlockingTasks {
                p.lock.Unlock()
                return
            }
            p.addWaiting(1)
            p.cond.Wait() // block and wait for an available worker
            p.addWaiting(-1)
    
            if p.IsClosed() {
                p.lock.Unlock()
                return
            }
    
            var nw int
            if nw = p.Running(); nw == 0 { // awakened by the scavenger
                p.lock.Unlock()
                spawnWorker()
                return
            }
            if w = p.workers.detach(); w == nil {
                if nw < p.Cap() {
                    p.lock.Unlock()
                    spawnWorker()
                    return
                }
                goto retry
            }
            p.lock.Unlock()
        }
        return
    }
    
    • 同一个worker在执行不同任务时如何复用同一个goroutine
      worker和goroutine是一一对应的关系,一个worker在生命周期内只会在worker被创建的时候开启一个goroutine,goroutine内死循环监听worker.task,一旦worker.task有任务就执行。有点类似worker是master,goroutine是真正干活的slave。pool选择一个worker,往worker.task里面塞入任务,监听该worker的goroutine发现任务立马取出执行
    // run starts a goroutine to repeat the process
    // that performs the function calls.
    func (w *goWorker) run() {
        w.pool.addRunning(1)
        go func() {
            defer func() {
                w.pool.addRunning(-1)
          // w.pool.workerCache是一个sync.Pool
          // 每个goroutine return前,将当前goWorker放回workerCache池子里,实现goWorker复用
                w.pool.workerCache.Put(w)
          // 处理panic
                if p := recover(); p != nil {
                    if ph := w.pool.options.PanicHandler; ph != nil {
                        ph(p)
                    } else {
                        w.pool.options.Logger.Printf("worker exits from a panic: %v\n", p)
                        var buf [4096]byte
                        n := runtime.Stack(buf[:], false)
                        w.pool.options.Logger.Printf("worker exits from panic: %s\n", string(buf[:n]))
                    }
                }
                // Call Signal() here in case there are goroutines waiting for available workers.
                w.pool.cond.Signal()
            }()
    
        // 不停监听related worker的task channel
            for f := range w.task {
          // 什么时候task channel会传来nil呢?当要清理这个worker的时候,给w.task传入nil
          // 例如,pool自身用来清理过期worker的goroutine会给过期worker传入nil,来终止该worker关联的goroutine
    
          // 1. 先看看是不是要终止goroutine
                if f == nil {
                    return
                }
          // 2. goroutine干活
                f()
          // 3. goroutine干完活了,worker回收回workers,复用去接受新task
                if ok := w.pool.revertWorker(w); !ok {
                    return
                }
            }
        }()
    }
    

    相关文章

      网友评论

          本文标题:从ants看goroutine 2022-06-07

          本文链接:https://www.haomeiwen.com/subject/ihiemrtx.html