美文网首页
GO阅读-Sync包-WaitGroup和Cond

GO阅读-Sync包-WaitGroup和Cond

作者: 温岭夹糕 | 来源:发表于2023-03-27 21:44 被阅读0次

环境

go1.20

1.WaitGroup概览

当我们需要把一个任务拆分给多个g完成,并且要等待所有g完成工作才能进入下一步时我们可以怎么做?
1.主协程G休眠time.Sleep足够的时间
2.select阻塞住
3.使用waitGroup
waitGroup使用案例,需要注意的add和done需要匹配,加多了wait就一直阻塞,引起g泄露,减多了就直接panic

2.waitgroup实现

2.1主要结构体

waitgroup和pool一样也是要求不能值赋值的(nocopy是一个特殊的结构体,给编译器看的)

type WaitGroup struct {
    noCopy noCopy

    state atomic.Uint64 
    sema  uint32
}

//sync/cond.go
type noCopy struct{}

func (*noCopy) Lock()   {}
func (*noCopy) Unlock() {}

sema和state是不是很眼熟,和互斥锁Mutex一样分别存储信号量和状态。1.17版本中将state和sema保存在一个12个字节无符号整型成员state1当中,1.20的为state占8个字节,这也是一个复合字段,前4个表示计数,后4个表示等待数量,即等价于

//等价于,这是抽象出来的东西,方便后续理解
type state struct {
     count     uint32
     waitcount uint32
}

waitcount指调用Wait方法的协程g

2.2Add方法

add

func (wg *WaitGroup) Add(delta int) {
//更新计数器,<<和>>是左移和右移操作
// 先赋值
    state := wg.state.Add(uint64(delta) << 32)
//获取计数值,state.count 
    v := int32(state >> 32)
//获取等待者数量 state.wait
    w := uint32(state)
//出现负数程序崩溃
    if v < 0 {
        panic("sync: negative WaitGroup counter")
    }
// w!=0说明已经有人在等待了
// delta>0&&v==int32(delta) 表示目前在添加计数器
// 添加计数器的时候,同时添加等待者规定为非法操作,panic
// 添加等待者必须在添加计数操作之后
    if w != 0 && delta > 0 && v == int32(delta) {
        panic("sync: WaitGroup misuse: Add called concurrently with Wait")
    }
//计数器v不为0,不需要唤醒等待者
// 等待者w为0,没有等待者,不需要唤醒
    if v > 0 || w == 0 {
        return
    }
//双重检查double-check又见面了
    if wg.state.Load() != state {
        panic("sync: WaitGroup misuse: Add called concurrently with Wait")
    }
//程序到达这步说明v=0,w>0,
// v=0意味着任务都执行完了,
//唤醒等待者,并重置状态,方便复用
    wg.state.Store(0)
    for ; w != 0; w-- {
        runtime_Semrelease(&wg.sema, false, 0)
    }
}

到这里我们就知道了为啥减多了就会panic,因为这行代码

    if v < 0 {
        panic("sync: negative WaitGroup counter")
    }

2.2Done

Done是对Add的调用

func (wg *WaitGroup) Done() {
    wg.Add(-1)
}

注意wg.Done不要过多调用

2.3Wait

阅读Add我们知道,实际是对state.count的操作,那么wait就应该是对state.waitcount的操作了

func (wg *WaitGroup) Wait() {
    for {
//获取对应的值
        state := wg.state.Load()
        v := int32(state >> 32)
        w := uint32(state)
//再检查一下是不是没有任务了
//没有就直接返回,不卡住主协程
        if v == 0 {return}
//CAS操作
//到这一步说明计数值v>0,那么state.waitcount+=1
//这一步会有数据竞争,即多个Wait调用,或执行这步时同时有add调用
//如果增加没有成功就继续for循环重新走一步,
        if wg.state.CompareAndSwap(state, state+1) {
//添加成功就休眠等待被唤醒啦
            runtime_Semacquire(&wg.sema)
//发现还有任务就报错
            if wg.state.Load() != 0 {
                panic("sync: WaitGroup is reused before previous Wait has returned")
            }
            return
        }
    }
}

所以看出协程在Wait休眠时,添加Add是无效的,还会引发panic,因为被唤醒时还会进行二次检查

            runtime_Semacquire(&wg.sema)
            if wg.state.Load() != 0 {
                panic("sync: WaitGroup is reused before previous Wait has returned")
            }

再极端点的情况就是Add和Wait的同步调用可能会引发panic,因为你不知道休眠会在什么时候执行,如例子

func doSomething(wg *sync.WaitGroup) {
    wg.Add(1)
    fmt.Println("do  something")
    defer wg.Done()
}

func main() {
    var wg sync.WaitGroup
    for i := 0; i < 3; i++ {
        go doSomething(&wg)
    }

    wg.Wait()
    fmt.Println("main")
}

可能在你wait的时候,最后一个协程刚执行任务,还没add,你就休眠了,醒来发现漏了一个任务,就触发panic
正确的是确保wait和add是同步代码,按顺序执行

    wg.Add(3)
    for i := 0; i < 3; i++ {
        go doSomething(&wg)
    }

    wg.Wait()

小结一下: 打个比方,

  • Waitgroup是饭店,
  • Wait就是消费者,但是这个消费者是大胃王,为了展现他们的实力(可能也是处女座),要等待菜上全了才开动,即state.count==0,可能存在多个大胃王一起等待,菜还没上全就睡觉,他们的睡觉前必须确保菜品都决定好了,否则就投诉(panic);
  • ADD是厨子,厨子也不是只有一个,谁想到了今天的菜品谁就Add几道菜
  • Done一下就把一道菜往餐桌上送,直到把今天的菜品全部完成,就走出厨房叫醒他们开饭(继续执行wait之后的代码),谁把餐桌上的菜收回去(Done减多了)就会立即被投诉(panic)
  • 之后厨子们打扫厨房,(wg.state.Store(0))等下一批大胃王

3.Cond

Cond也能让协程主动进入休眠,当收到信号量时被唤醒。这么一看是不是很像channel?是的,所以 大部分场景都被channel给替代了,所以这部分作为了解即可

type copyChecker uintptr

type Cond struct {
    noCopy noCopy
    L Locker
    notify  notifyList
    checker copyChecker
}

func (c *Cond) Wait() //休眠
func (c *Cond) Signal() //发送一个唤醒信号
func (c *Cond) Broadcast() //发送全部唤醒信号
  • L是为了保护notify
  • notify是一个 挂着g的链表结构
  • checker是为了检测自身是否被值复制使用的
//sync/runtime2.go
type notifyList struct {
    wait   uint32
    notify uint32
    lock   uintptr // key field of the mutex
    head   unsafe.Pointer
    tail   unsafe.Pointer
}

wait和notify分别表示当前正在等待和已经通知到的G的索引

3.1方法

Wait

func (c *Cond) Wait() {
    c.checker.check()
 //将等待器的计数+1
    t := runtime_notifyListAdd(&c.notify)
    c.L.Unlock()
// 获取当前g将它追加到链表末端,之后就陷入休眠
    runtime_notifyListWait(&c.notify, t)
    c.L.Lock()
}

func notifyListAdd(l *notifyList) uint32 {
    return atomic.Xadd(&l.wait, 1) - 1
}

我们可以看到Wait是先解锁再加锁的,所以使用前不加锁是会出问题的
Signal和Broadcast

func (c *Cond) Signal() {
    c.checker.check()
    runtime_notifyListNotifyOne(&c.notify)
}

func (c *Cond) Broadcast() {
    c.checker.check()
    runtime_notifyListNotifyAll(&c.notify)
}

内容很简单,发送唤醒信号

参考

1.Go语言 WaitGroup 源码详解
2.同步原语与锁

相关文章

网友评论

      本文标题:GO阅读-Sync包-WaitGroup和Cond

      本文链接:https://www.haomeiwen.com/subject/aphzrdtx.html