环境
go1.20
1.WaitGroup概览
当我们需要把一个任务拆分给多个g完成,并且要等待所有g完成工作才能进入下一步时我们可以怎么做?
1.主协程G休眠time.Sleep足够的时间
2.select阻塞住
3.使用waitGroup
waitGroup使用案例,需要注意的add和done需要匹配,加多了wait就一直阻塞,引起g泄露,减多了就直接panic
2.waitgroup实现
2.1主要结构体
waitgroup和pool一样也是要求不能值赋值的(nocopy是一个特殊的结构体,给编译器看的)
type WaitGroup struct {
noCopy noCopy
state atomic.Uint64
sema uint32
}
//sync/cond.go
type noCopy struct{}
func (*noCopy) Lock() {}
func (*noCopy) Unlock() {}
sema和state是不是很眼熟,和互斥锁Mutex一样分别存储信号量和状态。1.17版本中将state和sema保存在一个12个字节无符号整型成员state1当中,1.20的为state占8个字节,这也是一个复合字段,前4个表示计数,后4个表示等待数量,即等价于
//等价于,这是抽象出来的东西,方便后续理解
type state struct {
count uint32
waitcount uint32
}
waitcount指调用Wait方法的协程g
2.2Add方法
func (wg *WaitGroup) Add(delta int) {
//更新计数器,<<和>>是左移和右移操作
// 先赋值
state := wg.state.Add(uint64(delta) << 32)
//获取计数值,state.count
v := int32(state >> 32)
//获取等待者数量 state.wait
w := uint32(state)
//出现负数程序崩溃
if v < 0 {
panic("sync: negative WaitGroup counter")
}
// w!=0说明已经有人在等待了
// delta>0&&v==int32(delta) 表示目前在添加计数器
// 添加计数器的时候,同时添加等待者规定为非法操作,panic
// 添加等待者必须在添加计数操作之后
if w != 0 && delta > 0 && v == int32(delta) {
panic("sync: WaitGroup misuse: Add called concurrently with Wait")
}
//计数器v不为0,不需要唤醒等待者
// 等待者w为0,没有等待者,不需要唤醒
if v > 0 || w == 0 {
return
}
//双重检查double-check又见面了
if wg.state.Load() != state {
panic("sync: WaitGroup misuse: Add called concurrently with Wait")
}
//程序到达这步说明v=0,w>0,
// v=0意味着任务都执行完了,
//唤醒等待者,并重置状态,方便复用
wg.state.Store(0)
for ; w != 0; w-- {
runtime_Semrelease(&wg.sema, false, 0)
}
}
到这里我们就知道了为啥减多了就会panic,因为这行代码
if v < 0 {
panic("sync: negative WaitGroup counter")
}
2.2Done
Done是对Add的调用
func (wg *WaitGroup) Done() {
wg.Add(-1)
}
注意wg.Done不要过多调用
2.3Wait
阅读Add我们知道,实际是对state.count的操作,那么wait就应该是对state.waitcount的操作了
func (wg *WaitGroup) Wait() {
for {
//获取对应的值
state := wg.state.Load()
v := int32(state >> 32)
w := uint32(state)
//再检查一下是不是没有任务了
//没有就直接返回,不卡住主协程
if v == 0 {return}
//CAS操作
//到这一步说明计数值v>0,那么state.waitcount+=1
//这一步会有数据竞争,即多个Wait调用,或执行这步时同时有add调用
//如果增加没有成功就继续for循环重新走一步,
if wg.state.CompareAndSwap(state, state+1) {
//添加成功就休眠等待被唤醒啦
runtime_Semacquire(&wg.sema)
//发现还有任务就报错
if wg.state.Load() != 0 {
panic("sync: WaitGroup is reused before previous Wait has returned")
}
return
}
}
}
所以看出协程在Wait休眠时,添加Add是无效的,还会引发panic,因为被唤醒时还会进行二次检查
runtime_Semacquire(&wg.sema)
if wg.state.Load() != 0 {
panic("sync: WaitGroup is reused before previous Wait has returned")
}
再极端点的情况就是Add和Wait的同步调用可能会引发panic,因为你不知道休眠会在什么时候执行,如例子
func doSomething(wg *sync.WaitGroup) {
wg.Add(1)
fmt.Println("do something")
defer wg.Done()
}
func main() {
var wg sync.WaitGroup
for i := 0; i < 3; i++ {
go doSomething(&wg)
}
wg.Wait()
fmt.Println("main")
}
可能在你wait的时候,最后一个协程刚执行任务,还没add,你就休眠了,醒来发现漏了一个任务,就触发panic
正确的是确保wait和add是同步代码,按顺序执行
wg.Add(3)
for i := 0; i < 3; i++ {
go doSomething(&wg)
}
wg.Wait()
小结一下: 打个比方,
- Waitgroup是饭店,
- Wait就是消费者,但是这个消费者是大胃王,为了展现他们的实力(可能也是处女座),要等待菜上全了才开动,即state.count==0,可能存在多个大胃王一起等待,菜还没上全就睡觉,他们的睡觉前必须确保菜品都决定好了,否则就投诉(panic);
- ADD是厨子,厨子也不是只有一个,谁想到了今天的菜品谁就Add几道菜
- Done一下就把一道菜往餐桌上送,直到把今天的菜品全部完成,就走出厨房叫醒他们开饭(继续执行wait之后的代码),谁把餐桌上的菜收回去(Done减多了)就会立即被投诉(panic)
- 之后厨子们打扫厨房,(wg.state.Store(0))等下一批大胃王
3.Cond
Cond也能让协程主动进入休眠,当收到信号量时被唤醒。这么一看是不是很像channel?是的,所以 大部分场景都被channel给替代了,所以这部分作为了解即可
type copyChecker uintptr
type Cond struct {
noCopy noCopy
L Locker
notify notifyList
checker copyChecker
}
func (c *Cond) Wait() //休眠
func (c *Cond) Signal() //发送一个唤醒信号
func (c *Cond) Broadcast() //发送全部唤醒信号
- L是为了保护notify
- notify是一个 挂着g的链表结构
- checker是为了检测自身是否被值复制使用的
//sync/runtime2.go
type notifyList struct {
wait uint32
notify uint32
lock uintptr // key field of the mutex
head unsafe.Pointer
tail unsafe.Pointer
}
wait和notify分别表示当前正在等待和已经通知到的G的索引
3.1方法
Wait
func (c *Cond) Wait() {
c.checker.check()
//将等待器的计数+1
t := runtime_notifyListAdd(&c.notify)
c.L.Unlock()
// 获取当前g将它追加到链表末端,之后就陷入休眠
runtime_notifyListWait(&c.notify, t)
c.L.Lock()
}
func notifyListAdd(l *notifyList) uint32 {
return atomic.Xadd(&l.wait, 1) - 1
}
我们可以看到Wait是先解锁再加锁的,所以使用前不加锁是会出问题的
Signal和Broadcast
func (c *Cond) Signal() {
c.checker.check()
runtime_notifyListNotifyOne(&c.notify)
}
func (c *Cond) Broadcast() {
c.checker.check()
runtime_notifyListNotifyAll(&c.notify)
}
内容很简单,发送唤醒信号
网友评论