美文网首页
go 互斥锁

go 互斥锁

作者: 小东班吉 | 来源:发表于2023-12-28 16:54 被阅读0次

并发编程中经常会出现竞争条件和竞争数据的问题,所以需要将代码段设为临界区,通过使用mutex将代码段保护起来。

sync.Mutex

mutex是一种互斥锁,用来控制多线程对共享资源竞争访问的一种同步机制。

Mutex的零值是未锁定的mutex,也就是说mutex不用初始化赋值,另外因为它的state字段存储了锁定,唤醒,饥饿等状态以及等待的waiter数量,所以它是不可复制的。通过channel也可以实现更高级的同步。

Mutex的实现

源码中有一段关于锁的描述如下:

// Mutex can be in 2 modes of operations: normal and starvation.// In normal mode waiters are queued in FIFO order, but a woken up waiter  
// does not own the mutex and competes with new arriving goroutines over  
// the ownership. New arriving goroutines have an advantage -- they are  
// already running on CPU and there can be lots of them, so a woken up  
// waiter has good chances of losing. In such case it is queued at front  
// of the wait queue. If a waiter fails to acquire the mutex for more than 1ms,  
// it switches mutex to the starvation mode.  
//  
// In starvation mode ownership of the mutex is directly handed off from  
// the unlocking goroutine to the waiter at the front of the queue.  
// New arriving goroutines don't try to acquire the mutex even if it appears  
// to be unlocked, and don't try to spin. Instead they queue themselves at  
// the tail of the wait queue.  
//  
// If a waiter receives ownership of the mutex and sees that either  
// (1) it is the last waiter in the queue, or (2) it waited for less than 1 ms,  
// it switches mutex back to normal operation mode.  
//  
// Normal mode has considerably better performance as a goroutine can acquire  
// a mutex several times in a row even if there are blocked waiters.  
// Starvation mode is important to prevent pathological cases of tail latency.

翻译过来就是:

互斥体可以有两种操作模式:正常模式和饥饿模式。
在正常模式下,等待者按照 FIFO 顺序排队,但是被唤醒的等待者不拥有互斥量,而是与新到达的 goroutine 竞争所有权。新到达的 goroutine 有一个优势——它们已经在 CPU 上运行,并且可能有很多,所以醒来的waiter很有可能失败。在这种情况下,它会排在等待队列的前面。如果等待者在超过 1 毫秒内未能获取互斥体,则会将互斥体切换到饥饿模式。
在饥饿模式下,互斥锁的所有权直接从解锁 goroutine 移交给队列前面的waiter。新到达的 goroutine 不会尝试获取互斥锁,即使它看起来已解锁,也不会尝试旋转。相反,它们将自己排在等待队列的末尾。如果某个等待者收到互斥体的所有权,并发现 (1) 它是队列中的最后一个等待者,或者 (2) 它等待的时间少于 1 毫秒,则它将互斥体切换回正常操作模式。

普通模式具有相当好的性能,因为即使存在阻塞的等待者,goroutine 也可以连续多次获取互斥锁。饥饿模式对于防止尾部延迟的病理情况很重要。

mutex的结构体很简单,只有2个字段,其中sema是信号量,用于唤醒/阻塞goroutine,而state被分为了四个部分,是否加锁,唤醒,饥饿模式,阻塞等待的Waiter数量,其中前3个都占一个标志位,剩下的位则是waiter数量。

type Mutex struct {  
    state int32  
    sema  uint32  
}

const (  
    mutexLocked = 1 << iota // mutex is locked  
    mutexWoken  
    mutexStarving    mutexWaiterShift = iota

    starvationThresholdNs = 1e6
)

按照上面源码中的注释,分析下代码

func (m *Mutex) Lock() {  
    // 快速路径:很轻松就获取到了锁  
    if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {  
       return  
    }  
    // 慢速路径
    m.lockSlow()  
}

func (m *Mutex) lockSlow() {  
    var waitStartTime int64  
    starving := false  // 是否饥饿
    awoke := false   // 是否唤醒
    iter := 0   // 自选次数
    old := m.state
    for {
       // 非饥饿模式才可以自旋,因为饥饿模式下所有权都会直接交给waiter(看上面注释里说的饥饿模式下的处理)
       if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {  
          // 尝试设置 mutexWoken 标志来通知 Unlock 不要唤醒其他阻塞的 goroutine。
          // 这个就是上面说的正常模式下,会唤醒当前请求锁的,但又不直接获取锁,只是设置一个标志参与竞争,然后因为已经唤醒一个,就不会唤醒其他的了,所以也阻止了,因为unlock导致唤醒其他goroutine
          if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&  
             atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) { 
             awoke = true  
          }  
          runtime_doSpin()  
          iter++  
          old = m.state  
          continue  
       }  
       new := old
       // 只有在非饥饿模式下才会尝试获取锁,饥饿模式下则是直接交给waiter,新的goroutine 必须排队,所以这里判断了下不是饥饿模式,才设置锁的标志。
       if old&mutexStarving == 0 {  
          new |= mutexLocked  
       }  
        // 如果锁已经加锁或者处于饥饿模式,就得排队,等待的数量加1
       if old&(mutexLocked|mutexStarving) != 0 {  
          new += 1 << mutexWaiterShift  
       }  
       // 当前的 goroutine 将互斥锁要切换到饥饿模式,只在当前锁已经是加锁状态才会切换,
       // 如果互斥锁当前不是已加锁的状态(已解锁),则不要进行切换,new不会设置饥饿模式的状态位
       if starving && old&mutexLocked != 0 {  
          new |= mutexStarving  
       } 
       // 唤醒标志
       if awoke {  
          // 清除唤醒标志      
          if new&mutexWoken == 0 {  
             throw("sync: inconsistent mutex state")  
          }
          new &^= mutexWoken  
       }
       // 设置新的状态
       if atomic.CompareAndSwapInt32(&m.state, old, new) {  
          // 非饥饿模式下或者没加锁的情况下成功获取锁
          if old&(mutexLocked|mutexStarving) == 0 {  
             break 
          }  
          // 下面处理饥饿模式
          // 如果已经有在等待的wiater,则直接加入队头,否则就加入队尾,
          // queueLifo为true则加入对头
          queueLifo := waitStartTime != 0  
          if waitStartTime == 0 {  
             waitStartTime = runtime_nanotime()  
          }
          // 阻塞等待被唤醒
          runtime_SemacquireMutex(&m.sema, queueLifo, 1)  
          // 唤醒后如果有waiter等待的时间超过1秒,则代表处于饥饿状态,准备切换饥饿模式
          starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs  
          old = m.state
          // 如果被唤醒后,当前锁已经处于饥饿模式则进入if代码段,否则参与竞争
          if old&mutexStarving != 0 {  
             // 不正长的状态,锁已设置或已唤醒,没有等待唤醒的都属于不正常的状态
             if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {  
                throw("sync: inconsistent mutex state")  
             }  
             // delta的默认是设置锁的位,并且将等待的waiter减去1
             delta := int32(mutexLocked - 1<<mutexWaiterShift)  
             // 如果是最后一位等待的waiter,或者waiter等待的时间少于1秒了,则退出饥饿模式
             if !starving || old>>mutexWaiterShift == 1 {
                 // 退出饥饿模式,将delta 减去mutexStarving
                delta -= mutexStarving  
             }
             atomic.AddInt32(&m.state, delta)  
             break  
          }  
          awoke = true  
          iter = 0  
       } else {  
          old = m.state  
       }  
    }
}

加锁的流程:

  1. 如果走快速路径加锁成功(没有其他goroutine获取过锁),则直接返回,否则进入慢路径
  2. 首先进行自旋,自旋的条件是如果已经上锁,且处于正常模式,然后唤醒正在等待的goroutinue(也就是自己),设置唤醒状态位,参与正常模式的竞争。(自旋的目的是为了短临界区的代码释放锁后,可以快速获得锁,而不用经过休眠,再唤醒排队等待,减少goroutine切换的开销)。
  3. 自旋结束(饥饿模式/超过最大自旋次数/已经解锁)
    1. 如果不是饥饿模式,则设置加锁标志位,new |= mutexLocked
    2. 如果是饥饿模式或者已经加锁的,就继续排队,new += 1 << mutexWaiterShift(饥饿模式下新来的goroutine加入队尾排队,已经加锁的也是需要排队等待)
    3. 如果当前锁已经饥饿(还未切换饥饿模式),且已经被加锁,则切换饥饿模式,设置饥饿的标志为位,new |= mutexStarving
    4. 如果是已经被唤醒的,则清除唤醒的标志位。
  4. 利用cas 将old更新为new,设置了新的状态,不一定是加锁成功。atomic.CompareAndSwapInt32(&m.state, old, new) ,如果是以前没加锁也不是饥饿模式,那肯定代表了加锁成功,直接退出,否则就开始处理饥饿模式。
  5. 是否有已经在等待的waiter
    1. 等待时间不为0(参与竞争的goroutine),则直接加入队头
    2. 没有已经在等待的,加入队尾,饥饿模式下,所有新来的都加入队尾排队。
  6. 入队阻塞等待唤醒
  7. 有被唤醒后,根据等待时间判断是否已处于饥饿状态,更新饥饿的状态,等待下一轮将设置为饥饿模式。if starving && old&mutexLocked != 0 { new |= mutexStarving }
  8. 如果当前锁已经是饥饿模式
    1. 设置锁的标志位,将等待的waiter减去1,也就是减去自己。
    2. 判断能否退出饥饿模式,如果是最后一位等待者或者唤醒后的不处于饥饿状态(等待时间不超过1秒钟),则清除饥饿标志位
    3. 更新锁状态state。饥饿模式下直接持有锁,退出。
  9. 如果当前锁没有处于饥饿模式,则参与竞争,进入下一次循环。

读写锁

go 读写锁一般用在读多写少的场景,相比较mutex,性能会好很多。

它包含6个方法,RLockRUnlockLockUnlockTryRLockRUnlock,根据读/写不同的场景调用不同的方法就可以。
数据结构如下:

type RWMutex struct {  
    w           Mutex  // 等待或持有的writer
    writerSem   uint32 // writer等待reader释放锁的信号量  
    readerSem   uint32 // reader等待writer释放锁的信号量 
    readerCount int32  // reader的数量(包括持有读锁的reader以及等待读锁的reader)
    readerWait  int32  // 持有读锁的reader数量
}

const rwmutexMaxReaders = 1 << 30 // 最大reader的数量

RLock方法只有几行代码,当请求读锁的时候,第一行代码对readerCount进行了原子写+1,readerCount小于0则等待被唤醒,说明此时有一个writer持有写锁或者读锁没有被释放导致新进来的写锁得不到锁,所以接下来的读锁需要等待。
readerCount 的类型是int32,这里判断小于0说明是有负数出现的情况,这是因为在写锁的时候会把readerCount进行反转成负数。

func (rw *RWMutex) RLock() {
    if atomic.AddInt32(&rw.readerCount, 1) < 0 {  
       // A writer is pending, wait for it.  
       runtime_SemacquireMutex(&rw.readerSem, false, 0)  
    }
}

RUnlock也只有2行代码,用于解除单个读锁,不会影响其他读锁。解锁时对readerCount减1.
当readerCount大于0时,解锁成功。
当readerCount是负数时,说明有写锁正在进行,阻塞等待,对当前持有锁的reader,readerWait减1,如果是最后一个,则唤醒处于等待状态的writer

func (rw *RWMutex) RUnlock() {  
    if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {  
       // Outlined slow-path to allow the fast-path to be inlined  
       rw.rUnlockSlow(r)  
    } 
}

func (rw *RWMutex) rUnlockSlow(r int32) {
    if r+1 == 0 || r+1 == -rwmutexMaxReaders {  
       throw("sync: RUnlock of unlocked RWMutex")  
    }  
    // A writer is pending.  
    if atomic.AddInt32(&rw.readerWait, -1) == 0 {  
       // The last reader unblocks the writer.  
       runtime_Semrelease(&rw.writerSem, false, 1)  
    }  
}

Lock 是写锁,RWMutex的w类型是Mutex,rw.w.Lock()保证如果有多个访问Lock方法,只有一个能获取到写锁,其余的都会阻塞在这行代码。
接下来对readerCount进行反转变成负数,再加上rwmutexMaxReaders,然后判断r是否等于0,同时对rw.readerWait原子写加上r,这里指的是,如果在写锁期间有调用了Rlock,则readerCount值会发生变化,先减rwmutexMaxReaders再加,r的结果就不一定是0,另外即使readerCount的值先减为了负数,此时有读锁进来,对负数加1,也不会影响这里最终r的值。这也是上面RLock方法里为什么会判断readerCount+1的值是负数的原因。

这样做的好处是调用RLock的时候通过判断负数就知道有正在请求或者持有的写锁,需要等待。

那么当r不等于0时,说明有持有读锁的reader,对readerWait加上r,然后将当前请求的写锁加入队列,等待被唤醒。当r等于0时,写锁成功。

readerWait的值在请求读锁的时候并没有做处理,只是对readerCount+1,如果没有写锁,那读锁解锁的时候就直接减1解锁成功了。但如果readerCount为负数了,则肯定有写锁在等待被唤醒或者已经拿到锁了,而在写锁请求的时候就会把readerCount的值拷贝到readerWait上(当然这期间很可能readerCount的值又增加了(不可能减少的,因为释放读锁的请求肯定会在这次写锁之后了),但增加的reader肯定在写锁释放之后了,等待下一次有写入会继续拷贝,没有写锁,那对读锁释放会直接从readerCount上减),表示等待或者持有读锁的数量,这样在解锁读锁的时候就直接对readerWait减1。简单来说readerCount就是用来判断读锁请求的时候是否有写锁存在,否则怎么在请求读锁的时候,知道是否有写锁。

func (rw *RWMutex) Lock() {  
    // First, resolve competition with other writers.  
    rw.w.Lock()  
    // Announce to readers there is a pending writer.  
    r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders  
    // Wait for active readers.  
    if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {  
       runtime_SemacquireMutex(&rw.writerSem, false, 0)  
    }
}

Unlock 解锁,解锁和加锁不与特定的goroutine绑带,也就是说可以一个go程加锁,另一个解锁,和 mutex一样。
readerCount加上rwmutexMaxReaders得到还有多少个读锁在等待,如果有,紧接着将所有读锁唤醒,释放写锁。

如果在读锁唤醒,释放锁后还会有等待的写锁会怎样呢?写与写肯定是阻塞的,当已有写锁的时候,此时如果再有写锁会阻塞在RWMutex.Lock方法中的rw.w.Lock() 这一行,但如果此时有读锁进来,则会阻塞等待被唤醒。这样,即使后边进来的写锁发生在读锁之前,但依然会先处理读锁,释放后才会唤醒写锁,而写锁在第一个写锁释放后,根据RWMutex.Lock方法的情况会阻塞等待唤醒,直到读锁释放后唤醒它。

从这里也可以看出,当有一个写锁的时候,如果此时有另一个写锁加锁请求,也有其他的读锁加锁请求,那么在释放写锁时,首先处理的是读锁,完了才是第二个写锁。

在持有写锁的期间readerCount值肯定是个rwmutexMaxReaders反转后的负数,那么加上rwmutexMaxReaders,如果有等待读锁的reader,得到的肯定是一个小于rwmutexMaxReaders的数。
比如,有2个读锁,在写锁加锁的时候,readerCount = 3-1073741824(rwmutexMaxReaders) =-1073741821,在写锁解锁的时候,根据解锁第一行的代码,r = -1073741821+1073741824 = 3
那么足以说明,如果有等待的reader,那么r的值一定是小于rwmutexMaxReaders的,如果大于等于,说明没有等待的reader,解锁就进入到下面throw抛出来的错误这里。

func (rw *RWMutex) Unlock() {  
    // Announce to readers there is no active writer.  
    r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)  
    if r >= rwmutexMaxReaders {  
       race.Enable()  
       throw("sync: Unlock of unlocked RWMutex")  
    }  
    // Unblock blocked readers, if any.  
    for i := 0; i < int(r); i++ {  
       runtime_Semrelease(&rw.readerSem, false, 0)  
    }
    // Allow other writers to proceed.  
    rw.w.Unlock()
}

总结如下:

  • 没有读锁,也没有写锁
    • 新来的读锁会立即获得锁
    • 新来的写锁会立即获得锁
  • 有读锁,没有写锁
    • 新来的读锁会立即获得锁
    • 新来的写锁会阻塞等待读锁释放
  • 有写锁,没有读锁
    • 新来的读锁会阻塞等待写锁释放
    • 新来的写锁会阻塞等待写锁释放
  • 有写锁,读锁在阻塞等待的时候
    • 新来的读锁会阻塞等待上面的写锁释放
    • 新来的写锁会阻塞等待上面的读锁释放
    • 如果同时读锁和写锁都进来了,则上面的写锁释放后,会先处理读锁,等到所有读锁释放后再处理写锁
  • 有读锁,写锁在阻塞等待的时候
    • 新来的读锁会阻塞等待,写锁拿到锁释放后才会处理读锁
    • 新来的写锁会阻塞等待,写锁拿到锁释放后才会处理写锁
    • 如果同时有读锁和写锁进来了,则在上面的写锁释放后,会先处理读锁,等到所有读锁释放后再处理写锁

相关文章

  • Go 语言的锁

    Go 语言提供两类锁: 互斥锁(Mutex)和读写锁(RWMutex)。其中读写锁(RWMutex)是基于互斥锁(...

  • golang-互斥锁 sync.Mutex

    互斥量(锁) 在go语言里面很少用,但是也提供支持 ! 互斥量的概念!go语言提供的锁:Mutex (sync.M...

  • Golang 读写锁设计

    在《Go精妙的互斥锁设计》一文中,我们详细地讲解了互斥锁的实现原理。互斥锁为了避免竞争条件,它只允许一个线程进入代...

  • C 语言的 互斥锁、自旋锁、原子操作

    今天不整 GO 语言,我们来分享一下以前写的 C 代码,来看看 互斥锁,自旋锁和原子操作的 demo 互斥锁 临界...

  • GO互斥锁、读写锁

    读写锁:读时共享,写时独占。写锁优先级比读锁优先级高 通过mutex实现读时共享,写时独占代码示例: 打印结果: ...

  • golang 基础(31) 锁

    在 go 语言中提供了并发编程模型和工具外,还提供传统的同步工具锁,包括互斥锁和读写锁有关互斥锁有些内容我们必须清...

  • (七)golang 互斥量 源码分析

    互斥锁 源码位置:https://github.com/golang/go/blob/master/src/syn...

  • Go超时锁的设计和实现

    Go提供两种锁:sync.Mutex和sync.RWMutex。 sync.Mutex: 互斥锁。任意时刻,只能有...

  • go RWMutex源码解析

    RWMutex 基于go 1.13源码总的来说读写锁就是利用互斥锁和CAS维护2个关于读锁的变量以及runtime...

  • 线程同步与互斥

    Linux--线程编程 多线程编程-互斥锁 线程同步与互斥 互斥锁 信号量 条件变量 互斥锁 互斥锁的基本使用...

网友评论

      本文标题:go 互斥锁

      本文链接:https://www.haomeiwen.com/subject/asfjndtx.html