美文网首页
Go 的 sync (同步原语)库

Go 的 sync (同步原语)库

作者: thepoy | 来源:发表于2021-04-10 09:33 被阅读0次

    官方包的注释:

    // Package sync provides basic synchronization primitives such as mutual
    // exclusion locks. Other than the Once and WaitGroup types, most are intended
    // for use by low-level library routines. Higher-level synchronization is
    // better done via channels and communication.
    

    sync包提供基础的同步原语,sync.Mutextsync.RWMutexsync.WaitGroupsync.Oncesync.Cond

    一、Mutex

    Go 语言的sync.Mutex由两个字段statesema组成。其中,state表示当前互斥锁的状态,sema是用来控制锁状态的信号量。

    type Mutex struct {
        state int32
        sema  uint32
    }
    

    上述两个加起来只占 8 字节空间的结构体表过了 Go 语言中的互斥锁。

    1 状态

    互斥锁的状态:

    const (
        mutexLocked = 1 << iota // 锁定
        mutexWoken // 唤醒
        mutexStarving  // 饥饿
      ...
    )
    

    2 模式

    sync.Mutex有两种模式——正常模式和饥饿模式。

    在正常模式下,锁的等待者会按照先进先出的顺序获取锁。但是刚被唤起的 Goroutine 与新创建的 Goroutine 竞争时,大概率会获取不到锁,为了减少这种情况的出现,一旦 Goroutine 超过 1ms 没有获取到锁,它就会将当前互斥锁切换为饥饿模式,防止部分 Goroutine 被“饿死”。

    引入饥饿模式的目的是为了保证互斥锁的公平性。在饥饿模式中,互斥锁会直接交给等待队列最前面的 Goroutine。新的 Goroutine 在该状态下不能获取锁,也不会进入自旋状态,只会在队列的末尾等待。如果一个 Goroutine 获取到了互斥锁并且它在队列末尾的时间或者它等待的时间少于 1ms ,那么当前的互斥锁就会切换回正常模式。

    与饥饿模式相比,正常模式下的互斥锁能够提供更好的性能,饥饿模式能避免 Goroutine 由于陷入等待无法获取锁而造成的高尾延迟。

    3 上锁

    上锁sync.Mutex.Lock,解锁sync.Mutex.Unlock

    互斥锁的上锁方法经过精简,方法的主干只保留最常见、简单的情况 ——当锁的状态是 0 时,将mutextLocked位置换成 1:

    func (m *Mutex) Lock() {
        // Fast path: grab unlocked mutex.
        if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
            if race.Enabled {
                race.Acquire(unsafe.Pointer(m))
            }
            return
        }
        // Slow path (outlined so that the fast path can be inlined)
        m.lockSlow()
    }
    
    

    如果互斥锁的状态不是 0 时就会调用sync.Mutex.lockSlow尝试通过自旋等方式等待锁的释放,该方法的主体是一个非常大的for循环。这里将它分成几个部分进行介绍:

    1. 判断当前 Goroutine 能否进入自旋
    2. 通过自旋等待互斥锁的释放
    3. 计算互斥锁的最新状态
    4. 更新互斥锁的状态并获取锁

    3.1 判断 G 能否自旋

    for {
      if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
                if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
                    atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
                    awoke = true
                }
                runtime_doSpin()
                iter++
                old = m.state
                continue
            }
    }
    

    自旋是一种多线程同步机制,当前的进程在进入自旋的过程中会一直保持 CPU 的占用,持续检查某个条件是否为真。在多核处理器上,自旋可以避免 G 的切换,使用恰当能更好地利用资源,发挥更好的性能,但是使用不当,会拖慢整个程序,所以 G 进入自旋的条件非常苛刻:

    1. 互斥锁只有在普通模式才能进入自旋(Don't spin in starvation mode)
    2. runtime.sync_runtime_canSpin需要返回true
      1. 运行在多核处理器上
      2. 当前 G 为了获取该锁进入自旋的次数小于四次
      3. 当前机器上至少存在一个正在运行的处理器 P 并且其运行队列为空

    3.2 自旋占用 CPU

    一旦当前 G 能够进入自旋就会调用runtime.sync_runtime_doSpinruntime.procyield并执行 30 次的PAUSE指令,该指令只会占用 CPU 并消耗 CPU 时间:

    func sync_runtime_doSpin() {
        procyield(active_spin_cnt)
    }
    
    TEXT runtime·procyield(SB),NOSPLIT,$0-0
        MOVL    cycles+0(FP), AX
    again:
        PAUSE
        SUBL    $1, AX
        JNZ again
        RET
    

    3.3 计算锁的状态

    处理了自旋相关的特殊逻辑之后,互斥锁会根据上下文计算当前互斥锁最新的状态。几个不同的条件分别会更新state字段中存储的不同信息:

    const ( 
        mutexLocked
        mutexWoken
        mutexStarving
        mutexWaiterShift
    )
    
    new := old
    if old&mutexStarving == 0 {
      new |= mutexLocked
    }
    if old&(mutexLocked|mutexStarving) != 0 {
      new += 1 << mutexWaiterShift
    }
    if starving && old&mutexLocked != 0 {
      new |= mutexStarving
    }
    if awoke {
      if new&mutexWoken == 0 {
        throw("sync: inconsistent mutex state")
      }
      new &^= mutexWoken
    }
    

    3.4 更新锁状态

    计算了新的互斥锁状态之后,会使用 CAS 函数sync/atomic.CompareAndSwapInt32更新状态:

    if atomic.CompareAndSwapInt32(&m.state, old, new) {
      if old&(mutexLocked|mutexStarving) == 0 {
        break // 用 CAS 上锁
      }
      queueLifo := waitStartTime != 0
      if waitStartTime == 0 {
        waitStartTime = runtime_nanotime()
      }
      runtime_SemacquireMutex(&m.sema, queueLifo, 1)
      starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
      old = m.state
      if old&mutexStarving != 0 {
        if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
          throw("sync: inconsistent mutex state")
        }
        delta := int32(mutexLocked - 1<<mutexWaiterShift)
        if !starving || old>>mutexWaiterShift == 1 {
          delta -= mutexStarving
        }
        atomic.AddInt32(&m.state, delta)
        break
      }
      awoke = true
      iter = 0
    } else {
      old = m.state
    }
    

    如果没有通过 CAS 获得锁,会调用runtime.sync_runtime_SemacquireMutex通过信号量保证资源不会被两个 G 获取。runtime.sync_runtime_SemacquireMutex会在方法中不断尝试获取锁并陷入休眠等待信号量的释放,一旦当前 G 可以获取信号量,它就会立刻返回,sync.Mutex.Lock的剩余代码也会继续执行。

    • 在正常模式下,这段代码会设置唤醒和饥饿标记、重置迭代次数并重新执行获取锁的循环
    • 在饥饿模式下,当前 G 会获得互斥锁,如果等待队列中只存在当前 G ,互斥锁还会从饥饿模式中退出

    4 解锁

    互斥锁的解锁过程sync.Mutex.Unlock与加锁过程相比就很简单,该过程会先使用sync/atomic.AddInt32函数快速解锁,这时会发生下面的两种情况:

    1. 如果该函数返回的新状态等于 0,当前 G 就成功解锁了互斥锁
    2. 如果该函数返回的新状态不等于 0,这段代码会调用sync.Mutex.unlockSlow开始慢速解锁
    func (m *Mutex) Unlock() {
        ...
        new := atomic.AddInt32(&m.state, -mutexLocked)
        if new != 0 {
            m.unlockSlow(new)
        }
    }
    

    sync.Mutex.unlockSlow会先校验锁状态的合法性——如果当前互斥锁已经被解锁过了会直接抛出导常”sync: unlock of unlocked mutex“终止程序。

    在正常情况下, 会根据当前互斥锁的状态,分别处理正常模式和饥饿模式下的互斥锁:

    func (m *Mutex) unlockSlow(new int32) {
        if (new+mutexLocked)&mutexLocked == 0 {
            throw("sync: unlock of unlocked mutex")
        }
        if new&mutexStarving == 0 { // 正常模式
            old := new
            for {
                if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
                    return
                }
                new = (old - 1<<mutexWaiterShift) | mutexWoken
                if atomic.CompareAndSwapInt32(&m.state, old, new) {
                    runtime_Semrelease(&m.sema, false, 1)
                    return
                }
                old = m.state
            }
        } else {
            runtime_Semrelease(&m.sema, true, 1)
        }
    }
    
    1. 在正常模式下,上述代码会使用如下所示的处理过程:
      • 如果互斥锁不存在等待者或互斥锁的mutexLockedmutexStarvingmutexWoken状态都不为 0,那么当前方法可以直接返回,不需要唤醒其他等待者
      • 如果互斥锁存在等待者,会通过runtime.sync_runtime_Semrelease唤醒等待者并移交锁的所有权
    2. 在饥饿模式下,上述代码会直接调用runtime.sync_runtime_Semrelease将当前锁交给下一个正在尝试获取锁的等待者,等待者被唤醒后会得到锁,在这时互斥锁还不会退出饥饿状态

    5 小结

    对上锁和解锁进行简单总结。

    互斥锁的上锁过程比较复杂,涉及自旋、信号量以及调度等概念:

    • 如果互斥锁处理初始化状态,会通过置位mutexLocked上锁
    • 如果互斥锁处理mutexLocked状态并且在普通模式下工作,会进入自旋,执行 30 次PAUSE指令占用 CPU 时间等待锁的释放
    • 如果当前 G 等待锁的时间超过了 1ms,互斥锁就会切换到饥饿模式
    • 互斥锁在正常情况下会通过runtime.sync_runtime_SemacquireMutex将尝试获取锁的 G 切换到休眠状态,等待锁的持有者唤醒
    • 如果当前 G 是互斥锁上的最后一个等待的协程或者等待的时间小于 1ms,那么它会将互斥锁切换回正常模式

    互斥锁的解锁过程与之相比就比较简单,其代码行数不多、逻辑清晰,也比较容易理解:

    • 当互斥锁已经被解锁时,调用sync.Mutex.Unlock会直接抛出异常
    • 当互斥锁处理饥饿模式时,将锁的所有权交给队列中的下一个等待者,等待者会负责设置mutexLocked标志位
    • 当互斥锁处理普通模式时,如果没有 G 等待锁的释放或者已经有被唤醒的 G 获取了锁,会直接返回;在其他情况下会通过runtime.sync_runtime_Semrelease唤醒对应的 G

    二、RWMutex

    读写互斥锁sync.RWMutex是细粒度的互斥锁,它不限制资源的并发读,但是读写、写写操作无法并行执行。

    常见服务的资源读写比例会非常高,因为大多数的读请求之间不会相互影响,所以我们可以分离读写操作,以此来提高服务的性能。

    1 结构体

    type RWMutex struct {
      w           Mutex  // 如果有未完成(pending)的写操作(writers)就一直维持互斥锁
        writerSem   uint32 // 写等待读的信号
        readerSem   uint32 // 读等待写的信号
        readerCount int32  // 未完成(pending)的读操作(readers)的数量
      readerWait  int32  // 即将结束/正在离开(departing)的读操作的数量
    }
    

    2 写锁

    写操作的锁使用sync.RWMutex.Locksync.RWMutex.Unlock方法。

    当资源的使用者想要获取锁时,需要调用sync.RWMutex.Lock方法:

    func (rw *RWMutex) Lock() {
        ...
        rw.w.Lock()
      // 通过把 rw.readerCount 设置为负数,来告知读操作所有者有写操作未完成
        r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
        if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
            runtime_SemacquireMutex(&rw.writerSem, false, 0)
        }
        ...
    }
    
    • 调用结构体持有的sync.Mutex结构体的sync.Mutex.Lock阻塞后续的写操作
      • 因为互斥锁已经被获取,其他 G 在获取写锁时会进入自旋或者休眠
    • 调用sync/atomic.AddInt32函数阻塞后续的读操作
    • 如果仍然有其他 G 持有互斥锁的读锁,该 G 会调用runtime.sync_runtime_SemacquireMutex进入休眠状态等待所有读锁的所有者执行结束后释放writeSem信号量将当前协程唤醒

    写锁的释放会调用sync.RWMutex.Unlock

    func (rw *RWMutex) Unlock() {
        ...
        // 将 readerCount 的值增加 rwmutexMaxReaders,使 readerCount 变为非负数,宣告有读操作即将结束
        r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
        if r >= rwmutexMaxReaders {
            ...
            throw("sync: Unlock of unlocked RWMutex")
        }
        for i := 0; i < int(r); i++ {
            runtime_Semrelease(&rw.readerSem, false, 0)
        }
        rw.w.Unlock()
        ...
    }
    

    与加锁的过程正好相反,写锁的释放分为以下几步:

    • 调用sync/atomic.AddInt32函数将readerCount变回正数,释放读锁
    • 通过 for 循环释放所有因为获取读锁而陷入等待的 G
    • 调用sync.Mutex.Unlock释放写锁

    获取写锁时会先阻塞写锁的获取,后阻塞读锁的获取,这种策略能够保证读操作不会被连续的写操作“饿死”。

    3 读锁

    读锁的加锁方法sync.RMWutex.RLock很简单,该方法会通过sync/actomic.AddInt32readerCount加一:

    func (rw *RWMutex) RLock() {
        ...
        if atomic.AddInt32(&rw.readerCount, 1) < 0 {
            // 有一个写操作未完成,等待它执行完毕
            runtime_SemacquireMutex(&rw.readerSem, false, 0)
        }
        ...
    }
    
    • 如果该方法返回负数,意味着有其他 G 获得了写锁,当前 G 就会调用runtime.sync_runtime_SemacquireMutex陷入休眠等待锁的释放
    • 如果该方法的结果为非负数,意味着没有 G 获得写锁,当前方法会成功返回

    G 想要释放读锁时,会调用如下所示的sync.RMWutex.RUnlock方法:

    func (rw *RWMutex) RUnlock() {
        ...
        if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
            // Outlined slow-path to allow the fast-path to be inlined
            rw.rUnlockSlow(r)
        }
        ...
    }
    

    该方法会先减少正在读资源的readerCount整数,根据sync/atomic.AddInt32的返回值不同会分别进行处理:

    • 如果返回值大于等于0,读锁直接解锁成功

    • 如果返回值小于0,表示有一个未完成的写操作,这时会调用sync.RWMutex.rUnlockSlow方法

    func (rw *RWMutex) rUnlockSlow(r int32) {
      if r+1 == 0 || r+1 == -rwmutexMaxReaders {
        ...
        throw("sync: RUnlock of unlocked RWMutex")
      }
      // 有一个写操作未完成
      if atomic.AddInt32(&rw.readerWait, -1) == 0 {
        runtime_Semrelease(&rw.writerSem, false, 1)
      }
    }
    

    sync.RWMutex.rUnlockSlow会减少获取锁的写操作等待的读操作数readerWait并在所有读操作都被释放之后触发写操作的信号量writerSem,该信号量被触发时,调度器就会唤醒尝试获取写锁的 G

    4 小结

    虽然读写互斥锁sync.RMWutex提供的功能比较复杂,但它是建立在sync.Mutex的基出上,所以代码实现很简单。

    读锁和写锁的关系:

    1. 调用sync.RMWutex.Lock尝试获取锁时
      • 每次sync.RMWutex.RUlock都会将readerCount减一,当它归零时该 G 会获得写锁
      • readerCount减少rwmutexMaxReaders以阻塞后续的读操作
    2. 调用sync.RWMutex.Unlock释放写锁时,会先通知所有的读操作,然后才会释放持有的互斥锁

    读写互斥锁在互斥锁之外提供了额外的更细粒度的控制,能够在读操作远远多于写操作时提升性能。

    三、WaitGourp

    sync.WaitGroup可以等待一组 G 的返回,一个比较常见的使用场景是批量发出 RPC 或者 HTTP 请求:

    requests := []*Request{...}
    wg := sync.WaitGroup{}
    wg.add(len(requests))
    
    for _, request := range requests {
      go func(r *request) {
        defer wg.Done()
        ...
      }(request)
    }
    
    wg.Wait()
    

    可以通过sync.WaitGroup将原本顺序执行的代码在多个 G 中并发执行,加快程序处理速度。

    1 结构体

    type WaitGroup struct {
        noCopy noCopy  // 保证 wg 不会被开发者通过再赋值的方式拷贝
        state1 [3]uint32  // 存储状态和信号量
    }
    

    sync.noCopy是一个私有结构体,在编绎时会检查被拷贝的变量中是否包含sync.noCopy或者实现了LockUnlock方法。如果包含该结构体或者实现了对应的方法就会报出以下错误:

    func main() {
        wg := sync.WaitGroup{}
        wg2 := wg
        fmt.Println(wg, wg2)
    }
    
    $ go vet main.go 
    # command-line-arguments
    ./main.go:10:9: assignment copies lock value to wg2: sync.WaitGroup contains sync.noCopy
    ./main.go:11:14: call of fmt.Println copies lock value: sync.WaitGroup contains sync.noCopy
    ./main.go:11:18: call of fmt.Println copies lock value: sync.WaitGroup contains sync.noCopy
    

    这段代码会因为变量赋值或调用函数时发生值拷贝导致分析器报错。

    sync.state1的代码注释:

        // 64-bit value: high 32 bits are counter, low 32 bits are waiter count.
        // 64-bit atomic operations require 64-bit alignment, but 32-bit
        // compilers do not ensure it. So we allocate 12 bytes and then use
        // the aligned 8 bytes in them as state, and the other 4 as storage
        // for the sema.
    

    sync.WaitGroup提供的私有方法sync.WaitGroup.state能够帮我们从state1字段中取出它的状态和信号量。

    2 接口

    sync.WaitGroup对外暴露了三个方法:AddWaitDone

    其中Done方法只是向Add中传入了 -1,所以重点分析另外两个方法AddWait

    2.1 Add

    func (wg *WaitGroup) Add(delta int) {
        statep, semap := wg.state()
        ...
        state := atomic.AddUint64(statep, uint64(delta)<<32)
        v := int32(state >> 32)
        w := uint32(state)
        ...
        if v < 0 {
            panic("sync: negative WaitGroup counter")
        }
        if w != 0 && delta > 0 && v == int32(delta) {
            panic("sync: WaitGroup misuse: Add called concurrently with Wait")
        }
        if v > 0 || w == 0 {
            return
        }
        if *statep != state {
            panic("sync: WaitGroup misuse: Add called concurrently with Wait")
        }
        *statep = 0
        for ; w != 0; w-- {
            runtime_Semrelease(semap, false, 0)
        }
    }
    

    Add方法向可能是负数的WaitGroupcounter上增加增量。

    如果counter归零,所有Wait的被阻塞的 G 都被释放。

    如果counter是负数,会引发 panic。

    2.2 Wait

    func (wg *WaitGroup) Wait() {
        statep, semap := wg.state()
        ...
        for {
            state := atomic.LoadUint64(statep)
            v := int32(state >> 32)
            w := uint32(state)
            if v == 0 {
                ...
                return
            }
            if atomic.CompareAndSwapUint64(statep, state, state+1) {
                ...
                runtime_Semacquire(semap)
                if *statep != 0 {
                    panic("sync: WaitGroup is reused before previous Wait has returned")
                }
                ...
                return
            }
        }
    }
    

    Wait的作用就是在WaitGroupcounter归零前一直阻塞。

    3 小结

    • sync.WaitGroup必须在sync.WaitGroup.Wait方法返回之后才能被重新使用
    • sync.WaitGroup.Done只是向sync.WaitGroup.Add方法传入 -1 以唤醒等待的 G。所以也可以通过向Add内传递一个负数来代替Done
    • 可以同时有多个 G 等待当前sync.WaitGroup计数器归零,这些 G 会被同时唤醒

    四、Once

    sync.Once可以保证程序运行期间某段代码只执行一次。

    简单示例:

    func main() {
        o := sync.Once{}
        for i := 0; i < 10; i++ {
            o.Do(func() {
                fmt.Println("once")
            })
        }
    }
    
    $ go run main.go 
    once
    

    1 结构体

    type Once struct {
        done uint32  // 代码是否执行过的标识
        m    Mutex  // 互斥锁
    }
    

    2 接口

    sync.Once.Dosync.Once结构体对外暴露的唯一的方法,该方法会接收一个入参为空的函数:

    • 如果传入的函数已经执行过,会直接返回
    • 如果传入的函数没有执行过,会调用sync.Once.doSlow执行传入的函数
    func (o *Once) Do(f func()) {
        if atomic.LoadUint32(&o.done) == 0 {
            o.doSlow(f)
        }
    }
    
    func (o *Once) doSlow(f func()) {
        o.m.Lock()
        defer o.m.Unlock()
        if o.done == 0 {
            defer atomic.StoreUint32(&o.done, 1)
            f()
        }
    }
    

    执行过程:

    1. 为当前 G 上锁
    2. 执行传入的无入参函数
    3. 运行延迟函数调用,将成员变量done更新成 1

    sync.Once会通过成员变量done确保函数不会执行第二次。

    3 小结

    作为用于保证函数执行次数的sync.Once结构体,使用了互斥锁sync/atomic包提供的方法实现了某个函数在程序运行期间只能执行一次的语义。

    在使用该结构体时,也需要注意以下问题:

    • sync.Once.Do方法中传入的函数只会被执行一次,哪怕函数中发生了panic
    • 两次调用sync.Once.Do方法传入不同的函数只会执行第一次传入的函数

    五、Cond

    sync.Cond是条件变量,可以让一组 G 都在满足特定条件时被唤醒。每一个sync.Cond结构体在初始化时都需要传入一个互斥锁。

    简单示例:

    var status uint32
    
    func listen(c *sync.Cond) {
        c.L.Lock()
        for atomic.LoadUint32(&status) != 1 {
            c.Wait()
        }
        fmt.Println("listenning")
        c.L.Unlock()
    }
    
    func broadcast(c *sync.Cond) {
        c.L.Lock()
        atomic.StoreUint32(&status, 1)
        c.Broadcast()
        c.L.Unlock()
    }
    
    func main() {
        c := sync.NewCond(&sync.Mutex{})
        for i := 0; i < 10; i++ {
            go listen(c)
        }
        time.Sleep(time.Second)
        go broadcast(c)
    
        ch := make(chan os.Signal, 1)
        signal.Notify(ch, os.Interrupt)
        <-ch
    }
    

    上述代码同时运行了 11 个 G,这 11 个 G 分别做了不同的事:

    • 10 个 G 通过sync.Cond.Wait等待特定条件的满足
    • 1 个 G 会调用sync.Cond.Broadcast唤醒所有陷入等待的 G

    调用sync.Cond.Broadcast方法后,上述代码会打印出 10 次 "listenning" 并结束调用。

    1 结构体

    type Cond struct {
        noCopy noCopy  // 保证结构体不会在编绎时拷贝
        L Locker  // 保护内部的`notify`字段
        notify  notifyList  // 一个 Goroutine 链表,实现同步机制的核心结构
        checker copyChecker  // 禁止运行期间发生拷贝
    }
    

    2 接口

    2.1 Wait

    sync.Cond对外暴露的sync.Cond.Wait方法会将当前 G 陷入休眠状态,它的执行过程分成以下两个步骤:

    • 调用runtime.notifyListAdd将等待计数器加一并解锁
    • 调用runtime.notifyListWait等待其他 G 的唤醒并加锁
    func (c *Cond) Wait() {
        c.checker.check()
        t := runtime_notifyListAdd(&c.notify)
        c.L.Unlock()
        runtime_notifyListWait(&c.notify, t)
        c.L.Lock()
    }
    

    2.2 Signal 和 Broadcast

    sync.Cond.Signalsync.Cond.Broadcast就是用来唤醒陷入休眠的 G 的方法,它们的实现有一些细微的差别:

    • Signal方法会唤醒队列最前面的 G
    • Broadcast方法会唤醒队列中全部的 G
    func (c *Cond) Signal() {
        c.checker.check()
        runtime_notifyListNotifyOne(&c.notify)
    }
    
    func (c *Cond) Broadcast() {
        c.checker.check()
        runtime_notifyListNotifyAll(&c.notify)
    }
    

    G 的唤醒顺序也是按照加入队列的先后顺序,先加入的会先被唤醒,而后加入的可能需要等待调度器的调度。

    一般情况下,我们都会先调用sync.Cond.Wait陷入休眠等待满足期望条件,当满足唤醒条件时,就可以选择使用sync.Cond.Signal或者sync.Cond.Broadcast唤醒一个或者全部的 G

    3 小结

    sync.Cond不是一个常用的同步机制,但是在条件长时间无法满足时,与使用for {}进行忙碌等待相比,sync.Cond能够让出处理器的使用权,提供 CPU 的利用率。使用时需要注意以下问题:

    • Wait在调用之前一定要上锁,否则会触发panic,程序崩溃
    • Signal唤醒的 G 都是队列最前面、等待最久的 G
    • Broadcast会按照一定顺序广播通知等待的全部 G

    相关文章

      网友评论

          本文标题:Go 的 sync (同步原语)库

          本文链接:https://www.haomeiwen.com/subject/vxigkltx.html