版本
- go version 1.10.1
使用方法
//创建Cond
cond := sync.NewCond(new(sync.Mutex))
//等待唤醒
cond.L.Lock()
cond.Wait()
//唤醒一个
cond.Signal()
//唤醒所有
cond.Broadcast()
数据结构
/*
package: sync
file: cond.go
line: 21
*/
type Cond struct {
noCopy noCopy
L Locker
notify notifyList
checker copyChecker
}
-
noCopy:noCopy对象,拥有一个Lock方法,使得Cond对象在进行go vet扫描的时候,能够被检测到是否被复制。
/* package: sync file: cond.go line: 94 */ type noCopy struct{} func (*noCopy) Lock() {}
-
L:实现了Locker接口的锁对象,通常使用Mutex或RWMutex。
/* package: sync file: mutex.go line: 31 */ type Locker interface { Lock() Unlock() }
-
notify:notifyList对象,维护等待唤醒的goroutine队列,使用链表实现。
/* package: sync file: runtime.go line: 29 */ type notifyList struct { wait uint32 notify uint32 lock uintptr head unsafe.Pointer tail unsafe.Pointer }
-
checker:copyChecker对象,实际上是uintptr对象,保存自身对象地址。
/* package: sync file: cond.go line: 79 */ type copyChecker uintptr func (c *copyChecker) check() { if uintptr(*c) != uintptr(unsafe.Pointer(c)) && !atomic.CompareAndSwapUintptr((*uintptr)(c), 0, uintptr(unsafe.Pointer(c))) && uintptr(*c) != uintptr(unsafe.Pointer(c)) { panic("sync.Cond is copied") } }
- 检查当前checker的地址是否等于保存在checker中的地址
- 对checker进行原子CAS操作,将checker当前地址赋值给为空的checker
- 重复操作1,防止在进行1和2的时候,有其他gorountine并发的修改了checker值
- 若1、2、3都不满足,则表示当前cond是复制的,抛出panic
check方法在第一次调用的时候,会将checker对象地址赋值给checker,也就是将自身内存地址赋值给自身。
再次调用checker方法的时候,会将当前checker地址的值与保存的checker地址值进行比较,若不相同则表示当前checker的地址不是第一次调用check方法时候的地址,即cond对象被复制了,checker被重新分配了内存地址。
方法
NwoCond
/*
package: sync
file: cond.go
line: 32
*/
func NewCond(l Locker) *Cond {
return &Cond{L: l}
}
NewCond方法传入一个实现了Locker接口的对象,返回一个新的Cond对象指针,保证在多goroutine使用cond的时候,持有的是同一个实例。
Wait
/*
package: sync
file: cond.go
line: 52
*/
func (c *Cond) Wait() {
c.checker.check() //step 1
t := runtime_notifyListAdd(&c.notify) //step 2
c.L.Unlock() //step 3
runtime_notifyListWait(&c.notify, t) //step 4
c.L.Lock() //step 5
}
- check检查,保证cond在第一次使用后没有被复制
- 将notify队列的等待数加1,并将之前的等待数返回
/* package: runtime file: sema.go line: 476 */ func notifyListAdd(l *notifyList) uint32 { return atomic.Xadd(&l.wait, 1) - 1 }
- 释放锁,因此在调用Wait方法前,必须保证获取到了cond的锁,否则会报错
- 将当前goroutine挂起,等待唤醒信号
a. 锁定notify队列/* package: runtime file: sema.go line: 485 */ func notifyListWait(l *notifyList, t uint32) { lock(&l.lock) //step a if less(t, l.notify) { //step b unlock(&l.lock) return } s := acquireSudog() //step c s.g = getg() s.ticket = t s.releasetime = 0 t0 := int64(0) if blockprofilerate > 0 { t0 = cputicks() s.releasetime = -1 } if l.tail == nil { //step d l.head = s } else { l.tail.next = s } l.tail = s goparkunlock(&l.lock, "semacquire", traceEvGoBlockCond, 3) //step e if t0 != 0 { blockevent(s.releasetime-t0, 2) } releaseSudog(s) }
b. 如果notif队列的等待数小于唤醒数 表示当前goroutine不需要再进行等待,则解锁notify队列,直接返回
c. 获取当前goroutine,设置相关参数,将当前等待数赋值给ticket
d. 将当前goroutine加入到notify队列中
e. 将当前goroutine挂起,等待唤醒信号
- gorountine被唤醒,重新获取锁
Signal
/*
package: sync
file: cond.go
line: 64
*/
func (c *Cond) Signal() {
c.checker.check() //step 1
runtime_notifyListNotifyOne(&c.notify) //step 2
}
-
check检查,保证cond在第一次使用后没有被复制
-
顺序唤醒一个等待的gorountine
/* package: runtime file: sema.go line: 485 */ func notifyListNotifyOne(l *notifyList) { if atomic.Load(&l.wait) == atomic.Load(&l.notify) { //step a return } lock(&l.lock) //step b t := l.notify if t == atomic.Load(&l.wait) { unlock(&l.lock) return } atomic.Store(&l.notify, t+1) for p, s := (*sudog)(nil), l.head; s != nil; p, s = s, s.next { //step c if s.ticket == t { n := s.next if p != nil { p.next = n } else { l.head = n } if n == nil { l.tail = p } unlock(&l.lock) s.next = nil readyWithTime(s, 4) return } } unlock(&l.lock) }
a. 如果notify队列的等待数等于唤醒数,表示没有新的goroutine在等待队列上,不需要唤醒任何goroutine
b. 锁定notify队列,对队列进行双检查,查看是否有新的goroutine需要被唤醒,若无则将唤醒数加1
c. 遍历等待唤醒的goroutine队列,唤醒ticket等于唤醒数的goroutine,即顺序唤醒一个最先加入等待队列的goroutine
Broadcast
/*
package: sync
file: cond.go
line: 73
*/
func (c *Cond) Broadcast() {
c.checker.check()
runtime_notifyListNotifyAll(&c.notify)
}
-
check检查,保证cond在第一次使用后没有被复制
-
唤醒所有gorountine
/* package: runtime file: sema.go line: 485 */ func notifyListNotifyAll(l *notifyList) { if atomic.Load(&l.wait) == atomic.Load(&l.notify) { //step a return } lock(&l.lock) //step b s := l.head l.head = nil l.tail = nil atomic.Store(&l.notify, atomic.Load(&l.wait)) unlock(&l.lock) for s != nil { //step c next := s.next s.next = nil readyWithTime(s, 4) s = next } }
a. 如果notify队列的等待数等于唤醒数,表示没有新的goroutine在等待队列上,不需要唤醒任何goroutine
b. 锁定notify队列,清空等待唤醒队列,将等待数赋值给唤醒数
c. 遍历等待唤醒的gorountine队列,将所有goroutine唤醒
总结
-
Cond不能被复制:Cond在内部持有一个等待队列,这个队列维护所有等待在这个Cond的goroutine。因此若这个Cond允许值传递,则这个队列在值传递的过程中会进行复制,导致在唤醒goroutine的时候出现错误。
-
顺序唤醒: notifyList对象持有两个无限自增的字段wait和notify,wait字段在有新的goroutine等待的时候加1,notify字段在有新的唤醒信号的时候加1。在有新的goroutine加入队列的时候,会将当前wait赋值给goroutine的ticket,唤醒的时候会唤醒ticket等于notify的gourine。另外,当wait==notify时表示没有goroutine需要被唤醒,wait>notify时表示有goroutine需要被唤醒,waity恒大于等于notify。
网友评论