美文网首页
Go阅读-Sync包-同步原语与锁Mutex

Go阅读-Sync包-同步原语与锁Mutex

作者: 温岭夹糕 | 来源:发表于2023-03-26 13:54 被阅读0次

环境

go1.20
源码地址mutex.go

1.锁

1.1 锁的分类和使用

锁根据粒度分为Mutex和RWMutex,RW又叫读写锁。一般将其和需要保护的资源封装在同一个结构体当中

type safeResource struct {
    resource map[string]string
    lock     sync.Mutex
}

//不推荐这种写法
//因为调用方不一定会阅读源码,
//不一定知道锁的存在
var PublicResource map[string]string
var PublicLock sync.RWMutex

go的读写锁RWMutex是写锁优先的,即读锁可以重复加,但是当加写锁时,无法加读锁(读锁优先的锁相反,加写锁时,可以加读锁)
读锁互不影响

func TestMutex(t *testing.T) {
    PublicLock.RLock()
    defer PublicLock.RUnlock()
    fmt.Print("第一次加锁")
    PublicLock.RLock()
    defer PublicLock.RUnlock()
    fmt.Print("第二次加锁,不用等待上个读锁释放")
}

读写锁互相冲突

func TestMutex(t *testing.T) {
    PublicLock.RLock()
    defer PublicLock.RUnlock()
    fmt.Print("第一次加锁")
    PublicLock.Lock()
    fmt.Println("有读锁了无法加写锁")
    defer PublicLock.Unlock()
}
//第一次加锁panic: test timed out after 30s

写锁优先

func TestMutex(t *testing.T) {
    PublicLock.Lock()
    fmt.Println("添加写锁")
    defer PublicLock.Unlock()
    PublicLock.RLock()
    defer PublicLock.RUnlock()
    fmt.Print("尝试加读锁")
}
//添加写锁panic: test timed out after 30s

1.2加锁原则

  • 多个g(协程)只读不加锁
  • Mutex只要加上就会锁住适合写多读少,RWMutex适合写少读多

1.3 双重检查double-check

这种情况很少见,了解就行,
java双重检查锁的案例,即保证先行发生关系,防止在一个协程中看到另一个协程执行一半的情况

  • 案例1,给一个线程安全的map增加一个添加键值对的方法,若存在则返回旧值
type safeMap[K comparable, V any] struct {
    values map[K]V
    lock   sync.RWMutex
}
// g1 设置 key1 = 1
// g2 设置 key1 = 2
func (s *safeMap[K, V]) LoadOrStore(key K, newVal V) (val V, loaded bool) {
//读锁不互相冲突,两协程可能都获取到key1未设置的信息
    s.lock.RLock()
    oldVal, ok := s.values[key]
    s.lock.RUnlock()
    if ok {
        return oldVal, true
    }
// 添加写锁有先后顺序假设g1先获取
// key1 = 1 返回 1,false
// g2后进入,按逻辑来说应该是key1已经被设置过了
//返回 1 ,true
// 实际 确是 key2=2 返回2,false
    s.lock.Lock()
    defer s.lock.Unlock()

    s.values[key] = newVal
    return newVal, false
}

为达到我们预期效果,有两种方法:

  • 全程使用mutex
  • 使用双重检查,即在加写锁后再读一次数据
    s.lock.Lock()
    defer s.lock.Unlock()
    // g1先进了 key1 =1
    // g2后进来 key1 =2 覆盖了
    // 所以用两次检测 double-check
    oldVal, ok = s.values[key]
    if ok {
        return oldVal, true
    }

    s.values[key] = newVal
    return newVal, false

2.源码阅读

源码文件开头定义了几个常量

    mutexLocked = 1 << iota // mutex is locked
    mutexWoken
    mutexStarving

这几个常量分别代表锁的几种状态:被加锁、正常工作、饥饿

2.1Mutex

Mutex是对Locker的实现

type Mutex struct {
    state int32
    sema  uint32
}

type Locker interface {
    Lock()
    Unlock()
}
  • state是控制锁状态的核心
    加锁解锁就是把state修改为某个值
  • sema是用来处理沉睡、唤醒的信号量 依赖于两个runtime调用:
    • runtime_SemacquireMutex: sema+1并挂起goroutine
    • runtime_Semrelease : sema-1 并唤醒一个g

2.1.1加锁和解锁

首先明确锁是由协程g来抢的
Lock方法

// mutexLocked = 1
func (m *Mutex) Lock() {
    if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
        //这部分不重要跳过就行
        if race.Enabled {
            race.Acquire(unsafe.Pointer(m))
        }
        return
    }
    m.lockSlow()
}

compareAndSwapInt32就是交换值等价于以下代码

    if m.state== 0 {
        m.state = 1
        return true
    }
    return false

所以Lock检查锁的状态state:

  • 为0时,即没有人持有锁,将它设置为1,表示该g持有这把锁
  • 不为0,调用lockSlow,g进入自旋状态尝试拿锁

当一个线程尝试获取某一把锁,若此时锁已经被占领(state不为0),那么该线程无法获取到这把锁,该线程会等待,隔一段时间后再次尝试获取,这种采用循环 加锁->等待的机制被称为自旋锁。
自旋是一种多线程同步机制,当前的进程在进入自旋的过程中会一直保持 CPU 的占用,持续检查某个条件是否为真。在多核的 CPU 上,自旋可以避免 Goroutine 的切换,使用恰当会对性能带来很大的增益,但是使用的不恰当就会拖慢整个程序,所以 Goroutine 进入自旋的条件非常苛刻

lockSlow过于复杂,这里简化一下

//mutexLocked = 1 
//mutexStarving = 4
func (m *Mutex) lockSlow() {
    old := m.state
    iter := 0
    for {
//判断当前g能否进入自旋
        if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
            if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
                atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
                awoke = true
            }
//进入自旋并更新值,
//自旋能抢到锁就直接退出了,这是CPU层面控制的
            runtime_doSpin()
            iter++
            old = m.state
            continue
        }
//自旋次数满了之后,失败不能继续自旋,执行下面代码
//更新state字段中存储的不同信息,尝试去获取锁
//state是int32类型,占32位4个字节,下面都是关于字节的运算
// new为锁的新状态
        new := old
        if old&mutexStarving == 0 {
            new |= mutexLocked
        }
        if old&(mutexLocked|mutexStarving) != 0 {
            new += 1 << mutexWaiterShift
        }
        if starving && old&mutexLocked != 0 {
            new |= mutexStarving
        }
        if awoke {
            new &^= mutexWoken
        }
//计算新的状态后使用CAS函数更新状态
        if atomic.CompareAndSwapInt32(&m.state, old, new) {
//第一种情况:自旋完后获取到锁
            if old&(mutexLocked|mutexStarving) == 0 {
                break // locked the mutex with CAS
            }  
//第二种情况:没抢到进入等待队列
//该方法会不断尝试获取锁并陷入休眠等待信号量的释放
//一旦当前可以获取信号量则立即返回,被锁住的代码执行即sync.Mutex.Lock()
//这也是一个CPU层面执行的代码,即使被唤醒也要和新来的g抢锁
//抢锁规则取决于锁的模式
//饥饿模式下该g会直接获得锁
// 正常模式下公平竞争
//抢不到再一次入队等待信号量
            runtime_SemacquireMutex(&m.sema, queueLifo, 1)
//根据等待时间来修改锁的状态,一般超过1ms就进入饥饿模式
            starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
            old = m.state
//饥饿模式下不管新来的g,只消耗队列中的g
//队列中的g等待时间小于1ms也会切回正常模式
            if old&mutexStarving != 0 {
                delta := int32(mutexLocked - 1<<mutexWaiterShift)
//如果等待队列中只存在当前一个g,修改锁状态,退出饥饿模式
                if !starving || old>>mutexWaiterShift == 1 {
                    delta -= mutexStarving
                }
                atomic.AddInt32(&m.state, delta)
                break
            }
            awoke = true
            iter = 0
        } else {
            old = m.state
        }

runtime_canSpin返回true的条件为:
1.运行在多cpu机器上面
2.当前g为了获取锁进入自旋的次数小于4次
3.当前机器至少存在一个正在运行的处理器P且处理的运行队列为空
runtime_doSpin()是执行CPU层面的指令,该指令会占用并消耗CPU时间

func sync_runtime_canSpin(i int) bool {
    if i >= active_spin || ncpu <= 1 || gomaxprocs <= int32(sched.npidle+sched.nmspinning)+1 {
        return false
    }
    if p := getg().m.p.ptr(); !runqempty(p) {
        return false
    }
    return true
}

func sync_runtime_doSpin() {
    procyield(active_spin_cnt)
}

小结一下加锁:
1.初始化状态时,修改Mutex.state = mutexLocked加锁

  1. 当新来的g发现锁被锁住mutexLocked且在普通模式下,进入自旋,消耗CPU时间等待锁释放。自旋是拿锁的一种方式是快路径,通过控制次数来退出自旋,自旋失败后加入等待队列,这是慢路径
    3.被唤醒后没人持有锁就直接给你,否则抢锁,抢锁是不公平的!,根据锁的模式来决定,饥饿模式mutexStarving就直接给他,正常模式就公平的和新来的g(在自旋中的g)抢锁,没抢到就回去重新排队,等待时间决定了是否进入饥饿模式,一般超过1ms,如果当前g是最后一个等待的,或者等待时间小于1ms切回正常模式
    抢锁

解锁unLock

//mutexLocked = 1
func (m *Mutex) Unlock() {
    new := atomic.AddInt32(&m.state, -mutexLocked)
    if new != 0 {
        m.unlockSlow(new)
    }
}
  • 返回0就说明解锁成功了
  • 不为0就调用unlockSlow ,为什么会不为0,因为锁有正常模式和饥饿模式都会修改Mutex.state的值呀
func (m *Mutex) unlockSlow(new int32) {
    if new&mutexStarving == 0 {
        //处理正常模式
    } else {
        //处理饥饿模式
        runtime_Semrelease(&m.sema, true, 1)
    }
}

2.2RWMutex

RWMutex 跟cancelCtx类似对Mutex使用装饰模式

type RWMutex struct {
    w           Mutex        // held if there are pending writers
    writerSem   uint32       // semaphore for writers to wait for completing readers
    readerSem   uint32       // semaphore for readers to wait for completing writers
    readerCount atomic.Int32 // number of pending readers
    readerWait  atomic.Int32 // number of departing readers
}
  • sem见过了,用于等待读写操作
  • readerCount 正在执行读的操作数量
  • readerWait 正在写被阻塞时等待的读操作数量

2.2.1写锁的加锁和解锁

调用w.Lock 用互斥锁的形式阻塞后续操作,因为互斥锁被获取,后续g获取时只能自旋或排队

//readerCount atomic.Int32
func (rw *RWMutex) Lock() {

    rw.w.Lock()
    // 原子操作方式获取持有读锁的数量
    r := rw.readerCount.Add(-rwmutexMaxReaders) + rwmutexMaxReaders
    // Wait for active readers.
//如果其他g持有该互斥锁的读锁,
//进入等待等所有读操作结束后发送释放信号量,唤醒该g
    if r != 0 && rw.readerWait.Add(r) != 0 {
        runtime_SemacquireRWMutex(&rw.writerSem, false, 0)
    }
}

解锁,也是对互斥锁的解锁方法进行封装

func (rw *RWMutex) Unlock() {
// 因为前面加锁时减去了rwmutexMaxReaders
// 这里把它加回来变为正数   
    r := rw.readerCount.Add(rwmutexMaxReaders)
// 循环释放因为读锁陷入等待的g,
//读锁之间不冲突,不竞争,谁先醒都无所谓
    for i := 0; i < int(r); i++ {
        runtime_Semrelease(&rw.readerSem, false, 0)
    }
    // Allow other writers to proceed.
    rw.w.Unlock()

}

2.2.2读锁的解锁和加锁

加锁,通过上面对写锁的学习我们只读,写锁在加锁时会将readerCount设置为负数(将自己加入),这里负数就说明存在写锁,该g进入休眠等待

func (rw *RWMutex) RLock() {
    if rw.readerCount.Add(1) < 0 {
        runtime_SemacquireRWMutexR(&rw.readerSem, false, 0)
    }
}

释放读锁就减1,因为读锁会卡住写锁加锁(这里实际是让第一个拿到写锁的g进入睡眠),因此当readerCount为负数需要发起唤醒信号量(大于等于0说明解锁成功)

func (rw *RWMutex) RUnlock() {
    if r := rw.readerCount.Add(-1); r < 0 {
        rw.rUnlockSlow(r)
    }

}

func (rw *RWMutex) rUnlockSlow(r int32) {
//减少写锁等待的读锁数量,唤醒第一个拿到写锁的协程g
    if rw.readerWait.Add(-1) == 0 {
        runtime_Semrelease(&rw.writerSem, false, 1)
    }
}

小结:

  • 实际上只有写锁才是真正对互斥锁的操作
  • 读锁大多负责休眠和唤醒g,不碰互斥锁,因此读锁之间不互相竞争

2.3 trylock

RW的trylock都是对Mutex的trylock方法进行封装

func (m *Mutex) TryLock() bool {
    old := m.state
    if old&(mutexLocked|mutexStarving) != 0 {
        return false
    }

    if !atomic.CompareAndSwapInt32(&m.state, old, old|mutexLocked) {
        return false
    }

    return true
}

就是进行一次的尝试修改 state,也不执行lockslow进入自旋

3.锁的错误使用场景

3.1死锁deadlock

例子1:锁不只有一把,一个协程在已经对一个资源A加锁的情况下,尝试对另一个资源B加锁,资源B已经被另一个协程上锁,好死不死,掌握资源B锁的协程也要对资源A加锁,就陷入了两个
协程互相无限等待释对方放锁的情况

type Student struct {
    sync.RWMutex
    ID   int
    Name string
}

func TestDeadLock(t *testing.T) {
    s1 := &Student{ID: 1, Name: "s1"}
    s2 := &Student{ID: 2, Name: "s2"}
    var wg sync.WaitGroup
    wg.Add(2)
    go func() {
        s1.Lock()
        defer s1.Unlock()
        time.Sleep(time.Second)
        t.Log("s1名字叫:", s1.Name)
        t.Log("我要去获取s2的名字")
        s2.Lock()
        t.Log(s2.Name)
        s2.Unlock()
        wg.Done()
    }()
    go func() {
        s2.Lock()
        defer s2.Unlock()
        time.Sleep(time.Second)
        t.Log("s2名字叫:", s2.Name)
        t.Log("我要去获取s1的名字")
        s1.Lock()
        t.Log(s1.Name)
        s1.Unlock()
        wg.Done()
    }()
    wg.Wait()

}

触发死锁
解决办法:换成原子操作sync.atomic

例子2 :锁只有一把, 函数A ->对资源A加锁-> 调用函数B,函数B也要对A加锁,那么就要等待A解锁,两个也是互相等待,即对一把锁重复上锁,也叫对锁的重入

func A(s *Student) {
    s.Lock()
    defer s.Unlock()
    B(s)
}

func B(s *Student) {
    s.Lock()
    defer s.Unlock()
    fmt.Println(s.Name)
}
func TestChanDeadlock(t *testing.T) {
    s := &Student{Name: "s"}
    A(s)
}

原因我们阅读过源码知道,锁上面不记录持有锁的协程信息,只修改state

3.2解锁和上锁不在同一个方法体当中

就怕万一忘记释放锁,最好的办法还是在上锁后加defer解锁,即使发生panic也能确保解锁

3.3尝试对锁进行拷贝

也是类似于上面死锁案例2的情况

func foo(s Student) {
    s.Lock()
    defer s.Unlock()
    fmt.Println("in foo")
}

func TestFoo(t *testing.T) {
    var c Student
    c.Lock()
    defer c.Unlock()
    foo(c)
}

foo不知道已经上锁了,尝试用lock来获取锁(但是没有其他协程来释放这个赋值的锁),结果主协程被完全阻塞

3.4 go vet工具

利用 vet检测死锁

go vet demo.go 
# command-line-arguments
./demo.go:20:9: call of foo copies lock value: command-line-arguments.Counter
./demo.go:25:12: foo passes lock by value: command-line-arguments.Counter

参考

1.同步原语与锁
2.双重检查
3.自旋锁
4.极客go并发编程实战

相关文章

网友评论

      本文标题:Go阅读-Sync包-同步原语与锁Mutex

      本文链接:https://www.haomeiwen.com/subject/hqonrdtx.html