sync.WaitGroup 是控制一组 goroutine 的并发控制技术
- 信号量
信号量是UNIX 系统提供的一种保护共享资源的,用于防止多个线程同时访问谋和资源,信号量可以简单理解为一个数值:- 当信号量 > 0 时,表示资源可用,获取信号量时系统自动将信号量减1
- 当信号量 == 0 时,表示资源暂不可用,获取信号量时,当前线程会进入睡眠,当信号量为正时被唤醒。
WaitGroup 数据结构
type WaitGroup struct {
state1 [3]uint32
}
state1
是一个长度为3的数组:
- counter
当前还未执行结束的 goroutine 计数器 - waiter count
等待 结束的 goroutine 数量,即有多少个等候者 - semaphore
信号量
这里为什么要定义1个数组,而不是分别定义3个变量呢?
WaitGroup 对外提供了3个接口:
- Add(delta int) 将 delta 值加到 counter 中。
- Wait() waiter 递增1,并阻塞等待信号量。
- Done() counter 递减1,按照 waiter 数值释放相应次数的信号量。
Add
Add 做了2 件事,一是把 delta 值累加到 counter 中,因为delta 可以为负值,也就是说有可能变为 0 或 负值。所以第二件事就是当 couner 的值变为0时,根据 waiter 数值释放等量的信号量,将调用Wait()
而等待 goroutine 的全部唤醒,如果 couner 为负值,则触发panic。
func (wg *WaitGroup) Add(delta int) {
statep, semap := wg.state() // 获取state 和 semaphore 地址指针
state := atomic.AddUint64(statep, uint64(delta)<<32) // 把delta 左移2位和累加到state中,即加counter
v := int32(state >> 32) // 获取counter
w := uint32(state) // 获取waiter
if v < 0 {
panic("sync: negative WaitGroup counter")
}
if w != 0 && delta > 0 && v == int32(delta) {
panic("sync: WaitGroup misuse: Add called concurrently with Wait")
}
// 经过累加后,此时counter >= 0
// 如果counter 为正,不需要释放信号量。如果waiter 为0,则说明没有等待者,也不需要释放信号量
if v > 0 || w == 0 {
return
}
// 此时 counter 一定等于0,而waiter 一定大于0
*statep = 0
for ; w != 0; w-- {
runtime_Semrelease(semap, false, 0) // 释放信号量,释放一次,唤醒一个等待者
}
}
wait
func (wg *WaitGroup) Wait() {
statep, semap := wg.state()
for {
state := atomic.LoadUint64(statep)
v := int32(state >> 32) // 获取counter
w := uint32(state) // 获取waiter
if v == 0 {
// Counter == 0,说明所有的 goroutine 都退出了,不需要等待。
return
}
// Increment waiters count.
if atomic.CompareAndSwapUint64(statep, state, state+1) {
runtime_Semacquire(semap) // 累加成功后,等待信号量唤醒自己
if *statep != 0 {
panic("sync: WaitGroup is reused before previous Wait has returned")
}
return
}
}
}
Done
func (wg *WaitGroup) Done() {
wg.Add(-1)
}
网友评论