美文网首页
golang源码阅读-sync.Mutex

golang源码阅读-sync.Mutex

作者: CodingGuy | 来源:发表于2018-12-16 20:38 被阅读0次

    【golang源码分析】sync.Mutex

    概述

    Mutex只有两个阻塞方法Lock和Unlock,有两种工作模式:normal和starvation。
    goroutine在抢占锁时,会先自旋一段时间,如果抢占失败会被放在一个FIFO等待队列中休眠,当锁被释放时,会优先唤醒队首的goroutine。
    在normal模式中,被唤醒的waiter会跟新到的goroutine竞争锁,一般来说新的goroutine已经在cpu中进行了,并且可能有不止一个goroutine,在这种情况下刚被唤醒的goroutine有很大概率会失败,而被重新放回FIFO队首。如果当一个waiter等待获取锁的时间超过1ms,会将mutex转换成starvation模式。
    在starvation模式,当mutex被unlock时,持有权会直接移交到队首的goroutine中。新加入的goroutine不会再参与锁的竞争,而是直接放入等待队列的尾部。

    mutex通过一个state变量来维护锁的状态信息。

    • 低一位mutexLocked:表示锁是否被锁定
    • 低二位mutexWoken:表示是否有goroutine在竞争锁,这个主要应用于unlock时判断是否有必要唤醒等待队列中的goroutine
    • 低三位mutexStarving:表示锁的模式starvation或normal,剩余高位用于标志等待队列的长度。

    代码分析

    定义

    Mutex实现了Locker接口,维护两个变量——state用来维护锁的状态,同时通过操作sema来唤醒和休眠等待goroutine

    type Mutex struct {
        state int32
        sema  uint32
    }
    
    // A Locker represents an object that can be locked and unlocked.
    type Locker interface {
        Lock()
        Unlock()
    }
    
    const (
        mutexLocked = 1 << iota // mutex is locked
        mutexWoken
        mutexStarving
        mutexWaiterShift = iota
    
        starvationThresholdNs = 1e6
    )
    

    Lock

    完整代码

    // Lock locks m.
    // If the lock is already in use, the calling goroutine
    // blocks until the mutex is available.
    func (m *Mutex) Lock() {
        // Fast path: grab unlocked mutex.
        if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
            if race.Enabled {
                race.Acquire(unsafe.Pointer(m))
            }
            return
        }
    
        var waitStartTime int64
        starving := false
        awoke := false
        iter := 0
        old := m.state
        for {
            // Don't spin in starvation mode, ownership is handed off to waiters
            // so we won't be able to acquire the mutex anyway.
            if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
                // Active spinning makes sense.
                // Try to set mutexWoken flag to inform Unlock
                // to not wake other blocked goroutines.
                if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
                    atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
                    awoke = true
                }
                runtime_doSpin()
                iter++
                old = m.state
                continue
            }
            new := old
            // Don't try to acquire starving mutex, new arriving goroutines must queue.
            if old&mutexStarving == 0 {
                new |= mutexLocked
            }
            if old&(mutexLocked|mutexStarving) != 0 {
                new += 1 << mutexWaiterShift
            }
            // The current goroutine switches mutex to starvation mode.
            // But if the mutex is currently unlocked, don't do the switch.
            // Unlock expects that starving mutex has waiters, which will not
            // be true in this case.
            if starving && old&mutexLocked != 0 {
                new |= mutexStarving
            }
            if awoke {
                // The goroutine has been woken from sleep,
                // so we need to reset the flag in either case.
                if new&mutexWoken == 0 {
                    throw("sync: inconsistent mutex state")
                }
                new &^= mutexWoken
            }
            if atomic.CompareAndSwapInt32(&m.state, old, new) {
                if old&(mutexLocked|mutexStarving) == 0 {
                    break // locked the mutex with CAS
                }
                // If we were already waiting before, queue at the front of the queue.
                queueLifo := waitStartTime != 0
                if waitStartTime == 0 {
                    waitStartTime = runtime_nanotime()
                }
                runtime_SemacquireMutex(&m.sema, queueLifo)
                starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
                old = m.state
                if old&mutexStarving != 0 {
                    // If this goroutine was woken and mutex is in starvation mode,
                    // ownership was handed off to us but mutex is in somewhat
                    // inconsistent state: mutexLocked is not set and we are still
                    // accounted as waiter. Fix that.
                    if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
                        throw("sync: inconsistent mutex state")
                    }
                    delta := int32(mutexLocked - 1<<mutexWaiterShift)
                    if !starving || old>>mutexWaiterShift == 1 {
                        // Exit starvation mode.
                        // Critical to do it here and consider wait time.
                        // Starvation mode is so inefficient, that two goroutines
                        // can go lock-step infinitely once they switch mutex
                        // to starvation mode.
                        delta -= mutexStarving
                    }
                    atomic.AddInt32(&m.state, delta)
                    break
                }
                awoke = true
                iter = 0
            } else {
                old = m.state
            }
        }
    
        if race.Enabled {
            race.Acquire(unsafe.Pointer(m))
        }
    }
    

    详解

    1. 首先默认state为0,通过cas操作将锁的状态更新为mutexLocked,尝试快速获取锁。在并发度较小的情况下,mutex多数处于初始状态,可以提供加锁性能。如果快速获取锁失败,goroutine会不断地进行自旋抢锁、休眠、唤醒抢锁,直到抢到锁后break。在这期间,需要通过cas对state进行更新,只有更新成功的goroutine才能进行下一阶段的操作。
    if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
            if race.Enabled {
                race.Acquire(unsafe.Pointer(m))
            }
            return
        }
    
    1. 竞争锁的goroutine会先自旋一段时间直到mutex被unlocked、或切换到starvation模式、或自旋次数达到上限。自旋过程中,goroutine也会尝试通过cas将mutexWoken位置1,以通知Unlock操作不必唤醒等待队列中的goroutine。starvatione模式是不需要自旋的,因为锁的持有权会直接移交给等待队列队首的goroutine,自旋操作没有实际意义。
    if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
            // 当前goroutine未更新成功过woken位,woken位为0,等待队列非空,通过cas尝试更新woken位
            if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
                atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
                awoke = true
            }
            runtime_doSpin()  // pause一段时间
            iter++
            old = m.state
            continue
    }
    
    1. 从前面代码可以看到,退出自旋有三种原因,及其对应的处理如下:
      • mutex被Unlock:将mutexLocked置1,代表尝试抢锁,然后cas尝试更新state
      • mutex切换到starvation模式:不改变mutexLocked,waiter数量+1,cas更新state成功后后直接加入到等待队列里。
      • 自旋次数达到上限:不改变mutexLocked,waiter数量+1,如果starving变量为true,则将mutexStarving位置1。

    starving是goroutine被唤醒时通过计算等待时间获取的,大于1ms则为true,表示需要切换到starvation模式。

    这里需要注意,当mutex为unlocked的时候,不需要转换mutex模式。因为starving为true,说明该goroutine是normal模式下等待队列中刚被唤醒的waiter(原因需要看后面的代码,在starvation模式。会直接将mutex的持有权交给唤醒的goroutine,走不到这一步)。如果此时做了状态切换,可能会出现在starvation模式,等待队列为空的情况。因此,只有自旋抢锁失败的情况下,才会由等待时间超过阈值的goroutine将mutex模式切换到starvation

    new := old
    // 锁被unlocked,将mutexLocked置1,代表尝试抢锁
    if old&mutexStarving == 0 {
            new |= mutexLocked
    }
    // starving或自旋次数到上限, 等待数量+1
    if old&(mutexLocked|mutexStarving) != 0 {
            new += 1 << mutexWaiterShift
    }
    // 如果已经是starvation状态,这一步没什么意义
    if starving && old&mutexLocked != 0 {
            new |= mutexStarving
    }
    if awoke {
            if new&mutexWoken == 0 {
                throw("sync: inconsistent mutex state")
            }
            // 把awoke位给清掉
            new &^= mutexWoken
    }
    
    1. cas 更新mutex状态,失败则重复上述操作,直到状态更新成功。
      • 如果是获取锁的动作,则直接break。
      • 否则将goroutine放到等待队列中,若waitStartTime不为0,则说明是waiter抢锁失败,应该重新返回队首。
    if atomic.CompareAndSwapInt32(&m.state, old, new) {
            // 获取锁成功,break。接下来要处理starving和自旋次数达到上限的情况
            if old&(mutexLocked|mutexStarving) == 0 {
                break // locked the mutex with CAS
            }
           // waitStartTime说明是被唤醒的goroutine抢锁失败了,应该重新放回队首(queueLifo=true)
            queueLifo := waitStartTime != 0
            if waitStartTime == 0 {
                waitStartTime = runtime_nanotime()
            }
            // 休眠
            runtime_SemacquireMutex(&m.sema, queueLifo)
    
    1. 被唤醒后,如果当前starving为true,或等待时间大于阈值,则将starving置为true,下一次自旋抢锁失败会将mutex切换到starvation模式。在starvation模式,该goroutine直接获取锁,否则的话要重复前面的操作跟新来的goroutine抢锁
    starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
    old = m.state
    
    if old&mutexStarving != 0 {
            // starving模式不应该出现mutex被locked或woken,等待队列长度也不能为0,有的话说明这段代码逻辑有bug
            if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
                throw("sync: inconsistent mutex state")
            }
            // 获取锁并将等待数-1
            delta := int32(mutexLocked - 1<<mutexWaiterShift)
            // 等待时间小于阈值,或该goroutine为等待队列中最后一个waiter,切换到normal模式
            if !starving || old>>mutexWaiterShift == 1 {
                delta -= mutexStarving
            }
            // 把状态更新过去,获取锁并退出。这里不用cas,因为前面操作保证了低三位都不会被更新,高位本来是原子计数,所以直接add就可以
            atomic.AddInt32(&m.state, delta)
            break
    }
    awoke = true
    iter = 0    
    

    Unlock

    // mutex允许一个goroutine加锁,另一个goroutine解锁,不要求两个操作保证同一个goroutine
    func (m *Mutex) Unlock() {
        if race.Enabled {
            _ = m.state
            race.Release(unsafe.Pointer(m))
        }
    
        // 直接解锁,然后判断一下状态,如果不一致的话则抛异常
        new := atomic.AddInt32(&m.state, -mutexLocked)
        if (new+mutexLocked)&mutexLocked == 0 {
            throw("sync: unlock of unlocked mutex")
        }
    
      // starvation模式,则直接唤醒blocked队列队首goroutine
      // normal模式,如果等待队列为空,或此时锁的状态发生了变化,则不用唤醒等待队列中的goroutine
      // 否则的话,将等待数减1,cas更新state,成功则唤醒一个waiter
        if new&mutexStarving == 0 {
            old := new
            for {
                // If there are no waiters or a goroutine has already
                // been woken or grabbed the lock, no need to wake anyone.
                // In starvation mode ownership is directly handed off from unlocking
                // goroutine to the next waiter. We are not part of this chain,
                // since we did not observe mutexStarving when we unlocked the mutex above.
                // So get off the way.
                if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
                    return
                }
                // Grab the right to wake someone.
                new = (old - 1<<mutexWaiterShift) | mutexWoken
                if atomic.CompareAndSwapInt32(&m.state, old, new) {
                    runtime_Semrelease(&m.sema, false)
                    return
                }
                old = m.state
            }
        } else {
         // starving模式,直接将持有权交给下一个waiter。mutexLocked是在队首waiter被唤醒后更新的,所以此时mutexLocked还是0,但是在starving       
         // 模式下,新来的goroutine不会更新mutexLocked位。配合Lock代码看。
            runtime_Semrelease(&m.sema, true)
        }
    }
    
    

    \

    practice/golang/源码分析

    相关文章

      网友评论

          本文标题:golang源码阅读-sync.Mutex

          本文链接:https://www.haomeiwen.com/subject/nroxkqtx.html