美文网首页
goland sync.Mutex 源码学习

goland sync.Mutex 源码学习

作者: 蛮大人我们走 | 来源:发表于2019-10-14 12:25 被阅读0次

    Mutex相关概念

    Mutex 是互斥锁,正常情况下有2个状态:正常状态&饥饿状态

    • 正常状态:所有等待锁的goroutine是按照FIFO顺序等待的,在等待中被唤醒的goroutineu 会直接拥有锁,而是会和新来的gorotine竞争锁的拥有。
    • 新来的请求锁为什么不需要排队,并且可以直接跟唤醒的gorotine竞争呢??
      因为新来的正在CPU上执行,并且可能有多个,就会使得被唤醒的goroutine 在锁竞争中失败
      此时被唤醒的goroutine就会加入到等待队列的前面,如果等待中的goroutine超过1ms 没有获取锁,就会将该锁转换为饥饿模式!被唤醒的goroutine指的是之前被阻塞的
    • 饥饿状态
      锁的所有权将直接交给了等待队列中的第一个goroutine,新来的goroutine将不会再去跟竞争获得锁,即便锁是unlock的状态,也不会去尝试旋操作,而是放在等待队列的尾部。
      如果一个等待中的goroutine获得来锁,如果满足以下任意一个条件,那么锁的状态就会自动转为正常状态:
      1 goroutine是当前等待队列的最后一个
      2 goroutine等待的时间<1ms
    type Mutex strut {
        state  int32
        seme     unint32
    }
    

    State 是公用字段
    bit 0 位表示的是当前的mutex是否被某个goroutine所持有
    0 : 表示当前处于未被加锁状态
    1 :表示已经被某个goroutine 持有

    bit 1 位表示当前mutex是否已经被唤醒,即某个唤醒的goroutine要尝试获取锁

    bit 2 位 标记该mutex的状态
    1 : 处于饥饿状态

    注意:
    比较重要的一些变量
    全局:
    mutexLocked = 1 << 0
    mutexWoken
    mutexStarving
    mutexWaiterShift 请求锁的队列

    Lock()中的变量
    waitStartTime 标记当前go程的等待时间
    starving 标记当前go程的状态 饥饿状态&正常状态
    awoke 标记当前go程状态 唤醒&阻塞
    old 复制当前的锁的状态

    import (
        "internal/race"
        "sync/atomic"
        "unsafe"
    )
    
    func throw(string) // provided by runtime
    type Mutex struct {
        state int32
        sema  uint32
    }
    type Locker interface {
        Lock()
        Unlock()
    }
    
    const (
        mutexLocked = 1 << iota // mutex is locked
        mutexWoken
        mutexStarving
        mutexWaiterShift = iota
        starvationThresholdNs = 1e6
    )
    //获取锁的过程是个循环
    func (m *Mutex) Lock() {
        //  如果mutex的state = 0, 并且没有等待的/唤醒的goroutine,并且锁处于正常状态,那么就成功获得
        if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
        // 锁为正常状态 & go程等待队列=0 那么就直接返回锁
            if race.Enabled {
                race.Acquire(unsafe.Pointer(m))
            }
            return
        }
    
        // 标记goroutine的等待时间
        var waitStartTime int64
        // 当前goroutine是否已经处于饥饿状态
        starving := false
        // 当前gorotine是否已经唤醒
        awoke := false
            // 自旋次数
        iter := 0
            //锁的当前状态,是正常的还是饥饿模式的,就是当前go程请求锁一开始的时候的状态,做一个记录
        old := m.state
        for {
            // old&(mutexLocked|mutexStarving) == mutexLocked:old=1,当前状态已经被锁,但是并没有处于饥饿状态时
            // 还可以自旋,即多核 压力不大 并且在一定次数内可以自旋 都满足的话 才可以进入自旋模式,即 出现上面的新来的goroutine和被唤醒的goroutine竞争锁的操作
            if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
                // 自旋的过程中,如果发现state还没有设置woken标识
                // 那么就设置一下,并且将自己(goroutine)标记为唤醒
                if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
                    atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
                    awoke = true
                }
                runtime_doSpin()
                iter++
                old = m.state
                continue
            }
            new := old
            // 到这里,需要判断下当前锁是否为正常模式,如果是正常模式,就需要 设置锁 尝试通过CAS 获取锁!
            // 如果当前锁的状态是 正常状态,那么就更新当前锁的状态为已经上锁
        if old&mutexStarving == 0 { //这里是正常状态
                new |= mutexLocked
            }
      //如果是饥饿模式,那么新来的goroutine就不能进行自旋来竞争锁
      // 新的线程来了就只能排在队列的后面
            if old&(mutexLocked|mutexStarving) != 0 { //这里是饥饿状态
                new += 1 << mutexWaiterShift
            }
            //如果goroutine已经是饥饿状态,并且已经加锁,维持这个状态,锁变成饥饿状态
            if starving && old&mutexLocked != 0 {
                new |= mutexStarving
            }
            if awoke {
                //如果当前go程被唤醒 就需要清除new的状态。当前的状态已经不能是woken状态
                if new&mutexWoken == 0 {
                    throw("sync: inconsistent mutex state")
                }
                new &^= mutexWoken
            }
            // 通过CAS设置new的状态值
            if atomic.CompareAndSwapInt32(&m.state, old, new) {
                // 如果(锁)old的状态是未上锁状态,并且当前锁是正常状态
                // 那么当前go程就成功获取了锁的所有权 并且返回
                if old&(mutexLocked|mutexStarving) == 0 {
                    break // locked the mutex with CAS
                }
                // 未获取到的话 就需要计算当前go程的等待时间
                queueLifo := waitStartTime != 0
                if waitStartTime == 0 {
                    waitStartTime = runtime_nanotime()
                }
                // 既然未能获取到锁, 那么就使用sleep原语阻塞本goroutine
                // 如果是新来的goroutine,queueLifo=false, 加入到等待队列的尾部,耐心等待
                // 如果是唤醒的goroutine, queueLifo=true, 加入到等待队列的头部
                runtime_SemacquireMutex(&m.sema, queueLifo)
                // 计算当前goroutine是否已经处于饥饿状态.
                starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
                old = m.state
                // 如果已经是饥饿状态
                // 那么锁应该处于unlock状态,那么锁就应该直接交给当前的go程
                if old&mutexStarving != 0 {
                    // 如果当前的state已被锁,或者已标记为唤醒, 或者等待的队列中不为空,
                    // 那么state是一个非法状态
                    if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
                        throw("sync: inconsistent mutex state")
                    }
                    // 当前go来设置锁,并且将等待的go程 - 1
                    delta := int32(mutexLocked - 1<<mutexWaiterShift)
                    // 如果当前go程是最有一个等待者,或者它并不处于饥饿状态
                    // 那么就需要将锁的状态设置为正常模式
                    if !starving || old>>mutexWaiterShift == 1 {
                        // Exit starvation mode.
                        delta -= mutexStarving
                    }
                    //设置新的state,获取来锁,推出并且返回
                    atomic.AddInt32(&m.state, delta)
                    break
                }
                // 如果当前的锁是正常模式,本goroutine被唤醒,自旋次数清零,从for循环开始处重新开始
                awoke = true
                iter = 0
            } else {
                old = m.state // 如果CAS不成功,重新获取锁的state, 从for循环开始处重新开始
            }
        }
    
        if race.Enabled {
            race.Acquire(unsafe.Pointer(m))
        }
    }
    

    感觉整个过程都需要考虑到两个实体的状态
    锁:正常状态&饥饿状态
    goroutine:唤醒&阻塞

    func (m *Mutex) Unlock() {
        if race.Enabled {
            _ = m.state
            race.Release(unsafe.Pointer(m))
        }
    
        // 如果state 不是处于上锁的状态,那么就是对锁进行释放操作就会出错
        new := atomic.AddInt32(&m.state, -mutexLocked)
        if (new+mutexLocked)&mutexLocked == 0 {
            throw("sync: unlock of unlocked mutex")
        }
    
        // 释放锁以后,就需要通知其他等待中的go程
        // 如果锁处于饥饿状态,就直接交给队列中第一个go程,去唤醒它 让他去获取锁
        if new&mutexStarving == 0 {
            // 锁如果是正常状态
            old := new
            for {
                // 如果没有等待的goroutine, 或者锁不处于空闲的状态,直接返回.
                if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
                    return
                }
                // 将等待的goroutine数减一,并设置woken标识
                new = (old - 1<<mutexWaiterShift) | mutexWoken
                // 设置新的state, 这里通过信号量会唤醒一个阻塞的goroutine去获取锁.
                if atomic.CompareAndSwapInt32(&m.state, old, new) {
                    runtime_Semrelease(&m.sema, false)
                    return
                }
                old = m.state
            }
        } else {
            // 锁如果是饥饿状态,直接将锁的拥有权交给等待队列中的第一个go程序
            // 注意此时state的mutexLocked还没有加锁,唤醒的goroutine会设置它。
            // 在此期间,如果有新的goroutine来请求锁, 因为mutex处于饥饿状态, mutex还是被认为处于锁状态,
            // 新来的goroutine不会把锁抢过去.
            runtime_Semrelease(&m.sema, true)
        }
    }
    
    

    参考了大佬的解释,先记录下

    相关文章

      网友评论

          本文标题:goland sync.Mutex 源码学习

          本文链接:https://www.haomeiwen.com/subject/mrstmctx.html