Mutex相关概念
Mutex 是互斥锁,正常情况下有2个状态:正常状态&饥饿状态
- 正常状态:所有等待锁的goroutine是按照FIFO顺序等待的,在等待中被唤醒的goroutineu 会直接拥有锁,而是会和新来的gorotine竞争锁的拥有。
- 新来的请求锁为什么不需要排队,并且可以直接跟唤醒的gorotine竞争呢??
因为新来的正在CPU上执行,并且可能有多个,就会使得被唤醒的goroutine 在锁竞争中失败
此时被唤醒的goroutine就会加入到等待队列的前面,如果等待中的goroutine超过1ms 没有获取锁,就会将该锁转换为饥饿模式!被唤醒的goroutine指的是之前被阻塞的- 饥饿状态
锁的所有权将直接交给了等待队列中的第一个goroutine,新来的goroutine将不会再去跟竞争获得锁,即便锁是unlock的状态,也不会去尝试旋操作,而是放在等待队列的尾部。
如果一个等待中的goroutine获得来锁,如果满足以下任意一个条件,那么锁的状态就会自动转为正常状态:
1 goroutine是当前等待队列的最后一个
2 goroutine等待的时间<1ms
type Mutex strut {
state int32
seme unint32
}
State 是公用字段
bit 0 位表示的是当前的mutex是否被某个goroutine所持有
0 : 表示当前处于未被加锁状态
1 :表示已经被某个goroutine 持有
bit 1 位表示当前mutex是否已经被唤醒,即某个唤醒的goroutine要尝试获取锁
bit 2 位 标记该mutex的状态
1 : 处于饥饿状态
注意:
比较重要的一些变量
全局:
mutexLocked = 1 << 0
mutexWoken
mutexStarving
mutexWaiterShift 请求锁的队列
Lock()中的变量
waitStartTime 标记当前go程的等待时间
starving 标记当前go程的状态 饥饿状态&正常状态
awoke 标记当前go程状态 唤醒&阻塞
old 复制当前的锁的状态
import (
"internal/race"
"sync/atomic"
"unsafe"
)
func throw(string) // provided by runtime
type Mutex struct {
state int32
sema uint32
}
type Locker interface {
Lock()
Unlock()
}
const (
mutexLocked = 1 << iota // mutex is locked
mutexWoken
mutexStarving
mutexWaiterShift = iota
starvationThresholdNs = 1e6
)
//获取锁的过程是个循环
func (m *Mutex) Lock() {
// 如果mutex的state = 0, 并且没有等待的/唤醒的goroutine,并且锁处于正常状态,那么就成功获得
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
// 锁为正常状态 & go程等待队列=0 那么就直接返回锁
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
return
}
// 标记goroutine的等待时间
var waitStartTime int64
// 当前goroutine是否已经处于饥饿状态
starving := false
// 当前gorotine是否已经唤醒
awoke := false
// 自旋次数
iter := 0
//锁的当前状态,是正常的还是饥饿模式的,就是当前go程请求锁一开始的时候的状态,做一个记录
old := m.state
for {
// old&(mutexLocked|mutexStarving) == mutexLocked:old=1,当前状态已经被锁,但是并没有处于饥饿状态时
// 还可以自旋,即多核 压力不大 并且在一定次数内可以自旋 都满足的话 才可以进入自旋模式,即 出现上面的新来的goroutine和被唤醒的goroutine竞争锁的操作
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
// 自旋的过程中,如果发现state还没有设置woken标识
// 那么就设置一下,并且将自己(goroutine)标记为唤醒
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
awoke = true
}
runtime_doSpin()
iter++
old = m.state
continue
}
new := old
// 到这里,需要判断下当前锁是否为正常模式,如果是正常模式,就需要 设置锁 尝试通过CAS 获取锁!
// 如果当前锁的状态是 正常状态,那么就更新当前锁的状态为已经上锁
if old&mutexStarving == 0 { //这里是正常状态
new |= mutexLocked
}
//如果是饥饿模式,那么新来的goroutine就不能进行自旋来竞争锁
// 新的线程来了就只能排在队列的后面
if old&(mutexLocked|mutexStarving) != 0 { //这里是饥饿状态
new += 1 << mutexWaiterShift
}
//如果goroutine已经是饥饿状态,并且已经加锁,维持这个状态,锁变成饥饿状态
if starving && old&mutexLocked != 0 {
new |= mutexStarving
}
if awoke {
//如果当前go程被唤醒 就需要清除new的状态。当前的状态已经不能是woken状态
if new&mutexWoken == 0 {
throw("sync: inconsistent mutex state")
}
new &^= mutexWoken
}
// 通过CAS设置new的状态值
if atomic.CompareAndSwapInt32(&m.state, old, new) {
// 如果(锁)old的状态是未上锁状态,并且当前锁是正常状态
// 那么当前go程就成功获取了锁的所有权 并且返回
if old&(mutexLocked|mutexStarving) == 0 {
break // locked the mutex with CAS
}
// 未获取到的话 就需要计算当前go程的等待时间
queueLifo := waitStartTime != 0
if waitStartTime == 0 {
waitStartTime = runtime_nanotime()
}
// 既然未能获取到锁, 那么就使用sleep原语阻塞本goroutine
// 如果是新来的goroutine,queueLifo=false, 加入到等待队列的尾部,耐心等待
// 如果是唤醒的goroutine, queueLifo=true, 加入到等待队列的头部
runtime_SemacquireMutex(&m.sema, queueLifo)
// 计算当前goroutine是否已经处于饥饿状态.
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
old = m.state
// 如果已经是饥饿状态
// 那么锁应该处于unlock状态,那么锁就应该直接交给当前的go程
if old&mutexStarving != 0 {
// 如果当前的state已被锁,或者已标记为唤醒, 或者等待的队列中不为空,
// 那么state是一个非法状态
if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
throw("sync: inconsistent mutex state")
}
// 当前go来设置锁,并且将等待的go程 - 1
delta := int32(mutexLocked - 1<<mutexWaiterShift)
// 如果当前go程是最有一个等待者,或者它并不处于饥饿状态
// 那么就需要将锁的状态设置为正常模式
if !starving || old>>mutexWaiterShift == 1 {
// Exit starvation mode.
delta -= mutexStarving
}
//设置新的state,获取来锁,推出并且返回
atomic.AddInt32(&m.state, delta)
break
}
// 如果当前的锁是正常模式,本goroutine被唤醒,自旋次数清零,从for循环开始处重新开始
awoke = true
iter = 0
} else {
old = m.state // 如果CAS不成功,重新获取锁的state, 从for循环开始处重新开始
}
}
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
}
感觉整个过程都需要考虑到两个实体的状态
锁:正常状态&饥饿状态
goroutine:唤醒&阻塞
func (m *Mutex) Unlock() {
if race.Enabled {
_ = m.state
race.Release(unsafe.Pointer(m))
}
// 如果state 不是处于上锁的状态,那么就是对锁进行释放操作就会出错
new := atomic.AddInt32(&m.state, -mutexLocked)
if (new+mutexLocked)&mutexLocked == 0 {
throw("sync: unlock of unlocked mutex")
}
// 释放锁以后,就需要通知其他等待中的go程
// 如果锁处于饥饿状态,就直接交给队列中第一个go程,去唤醒它 让他去获取锁
if new&mutexStarving == 0 {
// 锁如果是正常状态
old := new
for {
// 如果没有等待的goroutine, 或者锁不处于空闲的状态,直接返回.
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
return
}
// 将等待的goroutine数减一,并设置woken标识
new = (old - 1<<mutexWaiterShift) | mutexWoken
// 设置新的state, 这里通过信号量会唤醒一个阻塞的goroutine去获取锁.
if atomic.CompareAndSwapInt32(&m.state, old, new) {
runtime_Semrelease(&m.sema, false)
return
}
old = m.state
}
} else {
// 锁如果是饥饿状态,直接将锁的拥有权交给等待队列中的第一个go程序
// 注意此时state的mutexLocked还没有加锁,唤醒的goroutine会设置它。
// 在此期间,如果有新的goroutine来请求锁, 因为mutex处于饥饿状态, mutex还是被认为处于锁状态,
// 新来的goroutine不会把锁抢过去.
runtime_Semrelease(&m.sema, true)
}
}
参考了大佬的解释,先记录下
网友评论