美文网首页
sync.WaitGroup

sync.WaitGroup

作者: wayyyy | 来源:发表于2022-05-01 00:08 被阅读0次

sync.WaitGroup 是控制一组 goroutine 的并发控制技术

  • 信号量
    信号量是UNIX 系统提供的一种保护共享资源的,用于防止多个线程同时访问谋和资源,信号量可以简单理解为一个数值:
    • 当信号量 > 0 时,表示资源可用,获取信号量时系统自动将信号量减1
    • 当信号量 == 0 时,表示资源暂不可用,获取信号量时,当前线程会进入睡眠,当信号量为正时被唤醒。
WaitGroup 数据结构
type WaitGroup struct {
    state1 [3]uint32
}

state1 是一个长度为3的数组:

  • counter
    当前还未执行结束的 goroutine 计数器
  • waiter count
    等待 结束的 goroutine 数量,即有多少个等候者
  • semaphore
    信号量
image.png

这里为什么要定义1个数组,而不是分别定义3个变量呢?

WaitGroup 对外提供了3个接口:

  • Add(delta int) 将 delta 值加到 counter 中。
  • Wait() waiter 递增1,并阻塞等待信号量。
  • Done() counter 递减1,按照 waiter 数值释放相应次数的信号量。
Add

Add 做了2 件事,一是把 delta 值累加到 counter 中,因为delta 可以为负值,也就是说有可能变为 0 或 负值。所以第二件事就是当 couner 的值变为0时,根据 waiter 数值释放等量的信号量,将调用Wait()而等待 goroutine 的全部唤醒,如果 couner 为负值,则触发panic。

func (wg *WaitGroup) Add(delta int) {
    statep, semap := wg.state()  // 获取state 和 semaphore 地址指针

    state := atomic.AddUint64(statep, uint64(delta)<<32)  // 把delta 左移2位和累加到state中,即加counter
    v := int32(state >> 32)  // 获取counter 
    w := uint32(state)  // 获取waiter
    
    if v < 0 {
        panic("sync: negative WaitGroup counter")
    }
    if w != 0 && delta > 0 && v == int32(delta) {
        panic("sync: WaitGroup misuse: Add called concurrently with Wait")
    }
    // 经过累加后,此时counter >= 0
    // 如果counter 为正,不需要释放信号量。如果waiter 为0,则说明没有等待者,也不需要释放信号量
    if v > 0 || w == 0 {
        return
    }

    // 此时 counter 一定等于0,而waiter 一定大于0
    *statep = 0
    for ; w != 0; w-- {
        runtime_Semrelease(semap, false, 0)  // 释放信号量,释放一次,唤醒一个等待者
    }
}
wait
func (wg *WaitGroup) Wait() {
    statep, semap := wg.state()

    for {
        state := atomic.LoadUint64(statep)
        v := int32(state >> 32)   // 获取counter 
        w := uint32(state)  // 获取waiter
        if v == 0 {
            // Counter == 0,说明所有的 goroutine 都退出了,不需要等待。
            return
        }
        // Increment waiters count.
        if atomic.CompareAndSwapUint64(statep, state, state+1) {
            runtime_Semacquire(semap)  // 累加成功后,等待信号量唤醒自己
            if *statep != 0 {
                panic("sync: WaitGroup is reused before previous Wait has returned")
            }
            return
        }
    }
}
Done
func (wg *WaitGroup) Done() {
    wg.Add(-1)
}

相关文章

网友评论

      本文标题:sync.WaitGroup

      本文链接:https://www.haomeiwen.com/subject/zyovxrtx.html