美文网首页
go 互斥锁

go 互斥锁

作者: 小东班吉 | 来源:发表于2023-12-28 16:54 被阅读0次

    并发编程中经常会出现竞争条件和竞争数据的问题,所以需要将代码段设为临界区,通过使用mutex将代码段保护起来。

    sync.Mutex

    mutex是一种互斥锁,用来控制多线程对共享资源竞争访问的一种同步机制。

    Mutex的零值是未锁定的mutex,也就是说mutex不用初始化赋值,另外因为它的state字段存储了锁定,唤醒,饥饿等状态以及等待的waiter数量,所以它是不可复制的。通过channel也可以实现更高级的同步。

    Mutex的实现

    源码中有一段关于锁的描述如下:

    // Mutex can be in 2 modes of operations: normal and starvation.// In normal mode waiters are queued in FIFO order, but a woken up waiter  
    // does not own the mutex and competes with new arriving goroutines over  
    // the ownership. New arriving goroutines have an advantage -- they are  
    // already running on CPU and there can be lots of them, so a woken up  
    // waiter has good chances of losing. In such case it is queued at front  
    // of the wait queue. If a waiter fails to acquire the mutex for more than 1ms,  
    // it switches mutex to the starvation mode.  
    //  
    // In starvation mode ownership of the mutex is directly handed off from  
    // the unlocking goroutine to the waiter at the front of the queue.  
    // New arriving goroutines don't try to acquire the mutex even if it appears  
    // to be unlocked, and don't try to spin. Instead they queue themselves at  
    // the tail of the wait queue.  
    //  
    // If a waiter receives ownership of the mutex and sees that either  
    // (1) it is the last waiter in the queue, or (2) it waited for less than 1 ms,  
    // it switches mutex back to normal operation mode.  
    //  
    // Normal mode has considerably better performance as a goroutine can acquire  
    // a mutex several times in a row even if there are blocked waiters.  
    // Starvation mode is important to prevent pathological cases of tail latency.
    

    翻译过来就是:

    互斥体可以有两种操作模式:正常模式和饥饿模式。
    在正常模式下,等待者按照 FIFO 顺序排队,但是被唤醒的等待者不拥有互斥量,而是与新到达的 goroutine 竞争所有权。新到达的 goroutine 有一个优势——它们已经在 CPU 上运行,并且可能有很多,所以醒来的waiter很有可能失败。在这种情况下,它会排在等待队列的前面。如果等待者在超过 1 毫秒内未能获取互斥体,则会将互斥体切换到饥饿模式。
    在饥饿模式下,互斥锁的所有权直接从解锁 goroutine 移交给队列前面的waiter。新到达的 goroutine 不会尝试获取互斥锁,即使它看起来已解锁,也不会尝试旋转。相反,它们将自己排在等待队列的末尾。如果某个等待者收到互斥体的所有权,并发现 (1) 它是队列中的最后一个等待者,或者 (2) 它等待的时间少于 1 毫秒,则它将互斥体切换回正常操作模式。
    
    普通模式具有相当好的性能,因为即使存在阻塞的等待者,goroutine 也可以连续多次获取互斥锁。饥饿模式对于防止尾部延迟的病理情况很重要。
    

    mutex的结构体很简单,只有2个字段,其中sema是信号量,用于唤醒/阻塞goroutine,而state被分为了四个部分,是否加锁,唤醒,饥饿模式,阻塞等待的Waiter数量,其中前3个都占一个标志位,剩下的位则是waiter数量。

    type Mutex struct {  
        state int32  
        sema  uint32  
    }
    
    const (  
        mutexLocked = 1 << iota // mutex is locked  
        mutexWoken  
        mutexStarving    mutexWaiterShift = iota
    
        starvationThresholdNs = 1e6
    )
    

    按照上面源码中的注释,分析下代码

    func (m *Mutex) Lock() {  
        // 快速路径:很轻松就获取到了锁  
        if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {  
           return  
        }  
        // 慢速路径
        m.lockSlow()  
    }
    
    func (m *Mutex) lockSlow() {  
        var waitStartTime int64  
        starving := false  // 是否饥饿
        awoke := false   // 是否唤醒
        iter := 0   // 自选次数
        old := m.state
        for {
           // 非饥饿模式才可以自旋,因为饥饿模式下所有权都会直接交给waiter(看上面注释里说的饥饿模式下的处理)
           if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {  
              // 尝试设置 mutexWoken 标志来通知 Unlock 不要唤醒其他阻塞的 goroutine。
              // 这个就是上面说的正常模式下,会唤醒当前请求锁的,但又不直接获取锁,只是设置一个标志参与竞争,然后因为已经唤醒一个,就不会唤醒其他的了,所以也阻止了,因为unlock导致唤醒其他goroutine
              if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&  
                 atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) { 
                 awoke = true  
              }  
              runtime_doSpin()  
              iter++  
              old = m.state  
              continue  
           }  
           new := old
           // 只有在非饥饿模式下才会尝试获取锁,饥饿模式下则是直接交给waiter,新的goroutine 必须排队,所以这里判断了下不是饥饿模式,才设置锁的标志。
           if old&mutexStarving == 0 {  
              new |= mutexLocked  
           }  
            // 如果锁已经加锁或者处于饥饿模式,就得排队,等待的数量加1
           if old&(mutexLocked|mutexStarving) != 0 {  
              new += 1 << mutexWaiterShift  
           }  
           // 当前的 goroutine 将互斥锁要切换到饥饿模式,只在当前锁已经是加锁状态才会切换,
           // 如果互斥锁当前不是已加锁的状态(已解锁),则不要进行切换,new不会设置饥饿模式的状态位
           if starving && old&mutexLocked != 0 {  
              new |= mutexStarving  
           } 
           // 唤醒标志
           if awoke {  
              // 清除唤醒标志      
              if new&mutexWoken == 0 {  
                 throw("sync: inconsistent mutex state")  
              }
              new &^= mutexWoken  
           }
           // 设置新的状态
           if atomic.CompareAndSwapInt32(&m.state, old, new) {  
              // 非饥饿模式下或者没加锁的情况下成功获取锁
              if old&(mutexLocked|mutexStarving) == 0 {  
                 break 
              }  
              // 下面处理饥饿模式
              // 如果已经有在等待的wiater,则直接加入队头,否则就加入队尾,
              // queueLifo为true则加入对头
              queueLifo := waitStartTime != 0  
              if waitStartTime == 0 {  
                 waitStartTime = runtime_nanotime()  
              }
              // 阻塞等待被唤醒
              runtime_SemacquireMutex(&m.sema, queueLifo, 1)  
              // 唤醒后如果有waiter等待的时间超过1秒,则代表处于饥饿状态,准备切换饥饿模式
              starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs  
              old = m.state
              // 如果被唤醒后,当前锁已经处于饥饿模式则进入if代码段,否则参与竞争
              if old&mutexStarving != 0 {  
                 // 不正长的状态,锁已设置或已唤醒,没有等待唤醒的都属于不正常的状态
                 if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {  
                    throw("sync: inconsistent mutex state")  
                 }  
                 // delta的默认是设置锁的位,并且将等待的waiter减去1
                 delta := int32(mutexLocked - 1<<mutexWaiterShift)  
                 // 如果是最后一位等待的waiter,或者waiter等待的时间少于1秒了,则退出饥饿模式
                 if !starving || old>>mutexWaiterShift == 1 {
                     // 退出饥饿模式,将delta 减去mutexStarving
                    delta -= mutexStarving  
                 }
                 atomic.AddInt32(&m.state, delta)  
                 break  
              }  
              awoke = true  
              iter = 0  
           } else {  
              old = m.state  
           }  
        }
    }
    
    

    加锁的流程:

    1. 如果走快速路径加锁成功(没有其他goroutine获取过锁),则直接返回,否则进入慢路径
    2. 首先进行自旋,自旋的条件是如果已经上锁,且处于正常模式,然后唤醒正在等待的goroutinue(也就是自己),设置唤醒状态位,参与正常模式的竞争。(自旋的目的是为了短临界区的代码释放锁后,可以快速获得锁,而不用经过休眠,再唤醒排队等待,减少goroutine切换的开销)。
    3. 自旋结束(饥饿模式/超过最大自旋次数/已经解锁)
      1. 如果不是饥饿模式,则设置加锁标志位,new |= mutexLocked
      2. 如果是饥饿模式或者已经加锁的,就继续排队,new += 1 << mutexWaiterShift(饥饿模式下新来的goroutine加入队尾排队,已经加锁的也是需要排队等待)
      3. 如果当前锁已经饥饿(还未切换饥饿模式),且已经被加锁,则切换饥饿模式,设置饥饿的标志为位,new |= mutexStarving
      4. 如果是已经被唤醒的,则清除唤醒的标志位。
    4. 利用cas 将old更新为new,设置了新的状态,不一定是加锁成功。atomic.CompareAndSwapInt32(&m.state, old, new) ,如果是以前没加锁也不是饥饿模式,那肯定代表了加锁成功,直接退出,否则就开始处理饥饿模式。
    5. 是否有已经在等待的waiter
      1. 等待时间不为0(参与竞争的goroutine),则直接加入队头
      2. 没有已经在等待的,加入队尾,饥饿模式下,所有新来的都加入队尾排队。
    6. 入队阻塞等待唤醒
    7. 有被唤醒后,根据等待时间判断是否已处于饥饿状态,更新饥饿的状态,等待下一轮将设置为饥饿模式。if starving && old&mutexLocked != 0 { new |= mutexStarving }
    8. 如果当前锁已经是饥饿模式
      1. 设置锁的标志位,将等待的waiter减去1,也就是减去自己。
      2. 判断能否退出饥饿模式,如果是最后一位等待者或者唤醒后的不处于饥饿状态(等待时间不超过1秒钟),则清除饥饿标志位
      3. 更新锁状态state。饥饿模式下直接持有锁,退出。
    9. 如果当前锁没有处于饥饿模式,则参与竞争,进入下一次循环。

    读写锁

    go 读写锁一般用在读多写少的场景,相比较mutex,性能会好很多。

    它包含6个方法,RLockRUnlockLockUnlockTryRLockRUnlock,根据读/写不同的场景调用不同的方法就可以。
    数据结构如下:

    type RWMutex struct {  
        w           Mutex  // 等待或持有的writer
        writerSem   uint32 // writer等待reader释放锁的信号量  
        readerSem   uint32 // reader等待writer释放锁的信号量 
        readerCount int32  // reader的数量(包括持有读锁的reader以及等待读锁的reader)
        readerWait  int32  // 持有读锁的reader数量
    }
    
    const rwmutexMaxReaders = 1 << 30 // 最大reader的数量
    

    RLock方法只有几行代码,当请求读锁的时候,第一行代码对readerCount进行了原子写+1,readerCount小于0则等待被唤醒,说明此时有一个writer持有写锁或者读锁没有被释放导致新进来的写锁得不到锁,所以接下来的读锁需要等待。
    readerCount 的类型是int32,这里判断小于0说明是有负数出现的情况,这是因为在写锁的时候会把readerCount进行反转成负数。

    func (rw *RWMutex) RLock() {
        if atomic.AddInt32(&rw.readerCount, 1) < 0 {  
           // A writer is pending, wait for it.  
           runtime_SemacquireMutex(&rw.readerSem, false, 0)  
        }
    }
    

    RUnlock也只有2行代码,用于解除单个读锁,不会影响其他读锁。解锁时对readerCount减1.
    当readerCount大于0时,解锁成功。
    当readerCount是负数时,说明有写锁正在进行,阻塞等待,对当前持有锁的reader,readerWait减1,如果是最后一个,则唤醒处于等待状态的writer

    func (rw *RWMutex) RUnlock() {  
        if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {  
           // Outlined slow-path to allow the fast-path to be inlined  
           rw.rUnlockSlow(r)  
        } 
    }
    
    func (rw *RWMutex) rUnlockSlow(r int32) {
        if r+1 == 0 || r+1 == -rwmutexMaxReaders {  
           throw("sync: RUnlock of unlocked RWMutex")  
        }  
        // A writer is pending.  
        if atomic.AddInt32(&rw.readerWait, -1) == 0 {  
           // The last reader unblocks the writer.  
           runtime_Semrelease(&rw.writerSem, false, 1)  
        }  
    }
    

    Lock 是写锁,RWMutex的w类型是Mutex,rw.w.Lock()保证如果有多个访问Lock方法,只有一个能获取到写锁,其余的都会阻塞在这行代码。
    接下来对readerCount进行反转变成负数,再加上rwmutexMaxReaders,然后判断r是否等于0,同时对rw.readerWait原子写加上r,这里指的是,如果在写锁期间有调用了Rlock,则readerCount值会发生变化,先减rwmutexMaxReaders再加,r的结果就不一定是0,另外即使readerCount的值先减为了负数,此时有读锁进来,对负数加1,也不会影响这里最终r的值。这也是上面RLock方法里为什么会判断readerCount+1的值是负数的原因。

    这样做的好处是调用RLock的时候通过判断负数就知道有正在请求或者持有的写锁,需要等待。

    那么当r不等于0时,说明有持有读锁的reader,对readerWait加上r,然后将当前请求的写锁加入队列,等待被唤醒。当r等于0时,写锁成功。

    readerWait的值在请求读锁的时候并没有做处理,只是对readerCount+1,如果没有写锁,那读锁解锁的时候就直接减1解锁成功了。但如果readerCount为负数了,则肯定有写锁在等待被唤醒或者已经拿到锁了,而在写锁请求的时候就会把readerCount的值拷贝到readerWait上(当然这期间很可能readerCount的值又增加了(不可能减少的,因为释放读锁的请求肯定会在这次写锁之后了),但增加的reader肯定在写锁释放之后了,等待下一次有写入会继续拷贝,没有写锁,那对读锁释放会直接从readerCount上减),表示等待或者持有读锁的数量,这样在解锁读锁的时候就直接对readerWait减1。简单来说readerCount就是用来判断读锁请求的时候是否有写锁存在,否则怎么在请求读锁的时候,知道是否有写锁。

    func (rw *RWMutex) Lock() {  
        // First, resolve competition with other writers.  
        rw.w.Lock()  
        // Announce to readers there is a pending writer.  
        r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders  
        // Wait for active readers.  
        if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {  
           runtime_SemacquireMutex(&rw.writerSem, false, 0)  
        }
    }
    

    Unlock 解锁,解锁和加锁不与特定的goroutine绑带,也就是说可以一个go程加锁,另一个解锁,和 mutex一样。
    readerCount加上rwmutexMaxReaders得到还有多少个读锁在等待,如果有,紧接着将所有读锁唤醒,释放写锁。

    如果在读锁唤醒,释放锁后还会有等待的写锁会怎样呢?写与写肯定是阻塞的,当已有写锁的时候,此时如果再有写锁会阻塞在RWMutex.Lock方法中的rw.w.Lock() 这一行,但如果此时有读锁进来,则会阻塞等待被唤醒。这样,即使后边进来的写锁发生在读锁之前,但依然会先处理读锁,释放后才会唤醒写锁,而写锁在第一个写锁释放后,根据RWMutex.Lock方法的情况会阻塞等待唤醒,直到读锁释放后唤醒它。

    从这里也可以看出,当有一个写锁的时候,如果此时有另一个写锁加锁请求,也有其他的读锁加锁请求,那么在释放写锁时,首先处理的是读锁,完了才是第二个写锁。

    在持有写锁的期间readerCount值肯定是个rwmutexMaxReaders反转后的负数,那么加上rwmutexMaxReaders,如果有等待读锁的reader,得到的肯定是一个小于rwmutexMaxReaders的数。
    比如,有2个读锁,在写锁加锁的时候,readerCount = 3-1073741824(rwmutexMaxReaders) =-1073741821,在写锁解锁的时候,根据解锁第一行的代码,r = -1073741821+1073741824 = 3
    那么足以说明,如果有等待的reader,那么r的值一定是小于rwmutexMaxReaders的,如果大于等于,说明没有等待的reader,解锁就进入到下面throw抛出来的错误这里。

    func (rw *RWMutex) Unlock() {  
        // Announce to readers there is no active writer.  
        r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)  
        if r >= rwmutexMaxReaders {  
           race.Enable()  
           throw("sync: Unlock of unlocked RWMutex")  
        }  
        // Unblock blocked readers, if any.  
        for i := 0; i < int(r); i++ {  
           runtime_Semrelease(&rw.readerSem, false, 0)  
        }
        // Allow other writers to proceed.  
        rw.w.Unlock()
    }
    

    总结如下:

    • 没有读锁,也没有写锁
      • 新来的读锁会立即获得锁
      • 新来的写锁会立即获得锁
    • 有读锁,没有写锁
      • 新来的读锁会立即获得锁
      • 新来的写锁会阻塞等待读锁释放
    • 有写锁,没有读锁
      • 新来的读锁会阻塞等待写锁释放
      • 新来的写锁会阻塞等待写锁释放
    • 有写锁,读锁在阻塞等待的时候
      • 新来的读锁会阻塞等待上面的写锁释放
      • 新来的写锁会阻塞等待上面的读锁释放
      • 如果同时读锁和写锁都进来了,则上面的写锁释放后,会先处理读锁,等到所有读锁释放后再处理写锁
    • 有读锁,写锁在阻塞等待的时候
      • 新来的读锁会阻塞等待,写锁拿到锁释放后才会处理读锁
      • 新来的写锁会阻塞等待,写锁拿到锁释放后才会处理写锁
      • 如果同时有读锁和写锁进来了,则在上面的写锁释放后,会先处理读锁,等到所有读锁释放后再处理写锁

    相关文章

      网友评论

          本文标题:go 互斥锁

          本文链接:https://www.haomeiwen.com/subject/asfjndtx.html