美文网首页
go mutex 源码

go mutex 源码

作者: 上善若水_f6a4 | 来源:发表于2022-04-05 15:47 被阅读0次
    // Copyright 2009 The Go Authors. All rights reserved.
    // Use of this source code is governed by a BSD-style
    // license that can be found in the LICENSE file.
    
    // Package sync provides basic synchronization primitives such as mutual
    // exclusion locks. Other than the Once and WaitGroup types, most are intended
    // for use by low-level library routines. Higher-level synchronization is
    // better done via channels and communication.
    //
    // Values containing the types defined in this package should not be copied.
    package sync
    
    import (
        "internal/race"
        "sync/atomic"
        "unsafe"
    )
    
    func throw(string) // provided by runtime
    
    // A Mutex is a mutual exclusion lock.
    // The zero value for a Mutex is an unlocked mutex.
    //
    // A Mutex must not be copied after first use.
    type Mutex struct {
        state int32
        sema  uint32
    }
    
    // A Locker represents an object that can be locked and unlocked.
    type Locker interface {
        Lock()
        Unlock()
    }
    
    const (
        //0001 = 1
        mutexLocked = 1 << iota // mutex is locked
        //0010 = 2
        mutexWoken
        //0100 = 4
        mutexStarving
        //0011 = 3
        mutexWaiterShift = iota
    
        // Mutex fairness.
        //
        // Mutex can be in 2 modes of operations: normal and starvation.
        // In normal mode waiters are queued in FIFO order, but a woken up waiter
        // does not own the mutex and competes with new arriving goroutines over
        // the ownership. New arriving goroutines have an advantage -- they are
        // already running on CPU and there can be lots of them, so a woken up
        // waiter has good chances of losing. In such case it is queued at front
        // of the wait queue. If a waiter fails to acquire the mutex for more than 1ms,
        // it switches mutex to the starvation mode.
        //
        // In starvation mode ownership of the mutex is directly handed off from
        // the unlocking goroutine to the waiter at the front of the queue.
        // New arriving goroutines don't try to acquire the mutex even if it appears
        // to be unlocked, and don't try to spin. Instead they queue themselves at
        // the tail of the wait queue.
        //
        // If a waiter receives ownership of the mutex and sees that either
        // (1) it is the last waiter in the queue, or (2) it waited for less than 1 ms,
        // it switches mutex back to normal operation mode.
        //
        // Normal mode has considerably better performance as a goroutine can acquire
        // a mutex several times in a row even if there are blocked waiters.
        // Starvation mode is important to prevent pathological cases of tail latency.
        starvationThresholdNs = 1e6
    )
    
    // Lock locks m.
    // If the lock is already in use, the calling goroutine
    // blocks until the mutex is available.
    func (m *Mutex) Lock() {
        // Fast path: grab unlocked mutex.
        //如果 state 是 0, 没有加锁,进行加锁并返回
        //如果 state 不是 0, 已经加锁,执行 m.lockSlow() 进行排队加锁等待
        if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
            if race.Enabled {
                race.Acquire(unsafe.Pointer(m))
            }
            return
        }
        // Slow path (outlined so that the fast path can be inlined)
        m.lockSlow()
    }
    
    func (m *Mutex) lockSlow() {
        var waitStartTime int64
        starving := false
        //进入 lockSlow 说明加锁失败, 该锁已经被锁定,进行挂起等待,
        //但是在这个过程中会进行自旋, awoke 标记在自旋过程中, 加锁的进行是否要进行锁释放
        //如果是,不用进行释放,自旋的协程直接加锁
        awoke := false
        iter := 0
        old := m.state
        for {
            // Don't spin in starvation mode, ownership is handed off to waiters
            // so we won't be able to acquire the mutex anyway.
            // old & (mutex | mutexStarving) = mutexLocked ; old 处于 lock 状态,但是不处于 mutexStarving 状态
            //判断协程是否可以自旋, 协程自旋条件如下
            // 1 互斥锁只有在普通模式才能进入自旋;
            // 2 runtime.sync_runtime_canSpin 需要返回 true:
            // 2.1 运行在多 CPU 的机器上;
            // 2.2 当前 Goroutine 为了获取该锁进入自旋的次数小于四次;
            // 2.3 当前机器上至少存在一个正在运行的处理器 P 并且处理的运行队列为空;
            if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
                // Active spinning makes sense.
                // Try to set mutexWoken flag to inform Unlock
                // to not wake other blocked goroutines.
                //判断是否可以标记 mutexWoken ,并尝试进行标识,可以进行标记的条件为
                //1. awoke == false
                //2. old mutexWoken 状态为 0
                //3. waitersCount ,锁的等待个数大于 0
                if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
                    atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
                    awoke = true
                }
                //进行自旋
                runtime_doSpin()
                iter++
                old = m.state
                continue
            }
            new := old
    
            // Don't try to acquire starving mutex, new arriving goroutines must queue.
            //当 old 不是 mutexStarving 状态
            if old&mutexStarving == 0 {
                // new = new | mutexLocked; new mutexLocked 位置为1
                new |= mutexLocked
            }
    
            // old 处于饥饿 或 锁定状态
            if old&(mutexLocked|mutexStarving) != 0 {
                //new 高位计数器加一
                new += 1 << mutexWaiterShift
            }
            // 当前协程将锁置为饥饿状态,但是如果锁目前没有被锁, 不转换
            // Unlock 方法希望饥饿的锁有 waiter
            // The current goroutine switches mutex to starvation mode.
            // But if the mutex is currently unlocked, don't do the switch.
            // Unlock expects that starving mutex has waiters, which will not
            // be true in this case.
            // starving 为真,并且 old 处于锁定状态
            // 这种情况是当前协程被唤醒,并且等待时间挺长了,
            // 但是在尝试加锁过程中,又被别的协程加锁了,切换为饥饿模式,将饥饿状态为置为1
            if starving && old&mutexLocked != 0 {
                //new = new | mutexStarving; new mutexStarving 状态置为 1
                new |= mutexStarving
            }
            if awoke {
                // The goroutine has been woken from sleep,
                // so we need to reset the flag in either case.
                if new&mutexWoken == 0 {
                    throw("sync: inconsistent mutex state")
                }
                // &^ 清零运算符,根据右侧操作数为 1 的位, 对左侧操作数进行清零
                // 将 new 的 mutexWoken 位置置为为 0
                new &^= mutexWoken
            }
            if atomic.CompareAndSwapInt32(&m.state, old, new) {
                // old mutexLocked 和 mutexStarving 均为0
                // old 没有被锁定,并且没有饥饿
                if old&(mutexLocked|mutexStarving) == 0 {
                    // 通过 CAS 函数获取了锁
                    // locked the mutex with CAS
                    //加锁成功,出现这种情况,说明在进行 atomic.CompareAndSwapInt32 的时候
                    //之前别的协程加的锁已经被释放, 此协程可以直接加锁
                    break
                }
    
                //加锁失败,将协程挂载到锁的信号量上面
                // If we were already waiting before, queue at the front of the queue.
                // queueLifo 是否将协程挂载到等待协程队列的头部
                queueLifo := waitStartTime != 0
                if waitStartTime == 0 {
                    waitStartTime = runtime_nanotime()
                }
    
                // 信号量挂载
                runtime_SemacquireMutex(&m.sema, queueLifo, 1)
    
                // 协程被唤醒, 继续尝试加锁
                starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
                old = m.state
    
                //old 处于 mutexStarving 状态
                if old&mutexStarving != 0 {
                    // If this goroutine was woken and mutex is in starvation mode,
                    // ownership was handed off to us but mutex is in somewhat
                    // inconsistent state: mutexLocked is not set and we are still
                    // accounted as waiter. Fix that.
                    //old 处于 mutexLocked 或者 mutexWoken 状态, 但是等待的协程数为0,出现不一致状态
                    if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
                        throw("sync: inconsistent mutex state")
                    }
    
                    //delta = int32(1 - 8)
                    delta := int32(mutexLocked - 1<<mutexWaiterShift)
                    if !starving || old>>mutexWaiterShift == 1 {
                        //退出 starving 模式
                        // Exit starvation mode.
                        //关键是在这里做,并考虑等待时间。
                        // Critical to do it here and consider wait time.
                        //饥饿模式的效率非常低,以至于两个goroutine一旦将互斥切换到饥饿模式,就可以无限地锁定步进。
                        // Starvation mode is so inefficient, that two goroutines
                        // can go lock-step infinitely once they switch mutex
                        // to starvation mode.
                        //delta = delta - 4
                        delta -= mutexStarving
                    }
                    //修改状态,break 加锁成功
                    atomic.AddInt32(&m.state, delta)
                    break
                }
                //协程被唤醒,重新走循环,尝试加锁
                awoke = true
                iter = 0
            } else {
                old = m.state
            }
        }
    
        if race.Enabled {
            race.Acquire(unsafe.Pointer(m))
        }
    }
    
    // Unlock unlocks m.
    // It is a run-time error if m is not locked on entry to Unlock.
    //
    // A locked Mutex is not associated with a particular goroutine.
    // It is allowed for one goroutine to lock a Mutex and then
    // arrange for another goroutine to unlock it.
    func (m *Mutex) Unlock() {
        if race.Enabled {
            _ = m.state
            race.Release(unsafe.Pointer(m))
        }
    
        // Fast path: drop lock bit.
        // 将标志位置为未加锁
        new := atomic.AddInt32(&m.state, -mutexLocked)
        if new != 0 {
            // Outlined slow path to allow inlining the fast path.
            // To hide unlockSlow during tracing we skip one extra frame when tracing GoUnblock.
            m.unlockSlow(new)
        }
    }
    
    func (m *Mutex) unlockSlow(new int32) {
        if (new+mutexLocked)&mutexLocked == 0 {
            throw("sync: unlock of unlocked mutex")
        }
        if new&mutexStarving == 0 { // 正常模式
            old := new
            for {
                // If there are no waiters or a goroutine has already
                // been woken or grabbed the lock, no need to wake anyone.
                // In starvation mode ownership is directly handed off from unlocking
                // goroutine to the next waiter. We are not part of this chain,
                // since we did not observe mutexStarving when we unlocked the mutex above.
                // So get off the way.
                if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
                    return
                }
                // Grab the right to wake someone.
                new = (old - 1<<mutexWaiterShift) | mutexWoken
                if atomic.CompareAndSwapInt32(&m.state, old, new) {
                    runtime_Semrelease(&m.sema, false, 1)
                    return
                }
                old = m.state
            }
        } else { // 饥饿模式
            // Starving mode: handoff mutex ownership to the next waiter, and yield
            // our time slice so that the next waiter can start to run immediately.
            // Note: mutexLocked is not set, the waiter will set it after wakeup.
            // But mutex is still considered locked if mutexStarving is set,
            // so new coming goroutines won't acquire it.
            runtime_Semrelease(&m.sema, true, 1)
        }
    }
    

    相关文章

      网友评论

          本文标题:go mutex 源码

          本文链接:https://www.haomeiwen.com/subject/cnvksrtx.html