故事的背景来自于一道面试题,题目是这样的:
你需要实现的目标函数 target
其中:
- @param id 是一个随机字符串,例如 6A10A467-2842-A460-5353-DBE7D41986B7;
- @param job 函数是一个耗时操作,例如:去数据库 query 数据,可能耗时 500ms;
- @return count 表示在执行本次 job 期间有多少相同的 id 调用过 target
关键特性:相同 id 并发调用 target,target 只执行一次 job 函数,举例来说:第一个线程传入 id 为 "id-xxx" 调用 target,job 函数开始执行,在此期间,又有其他 4 个线程以 id 为 "id-xxx" 调用了 target;在此期间,只有一个 job 函数执行,等它执行完成后,上述 5 个线程均收到返回值 count=5,表示这段时间有 5 个相同 id 进行了调用;
代码长这样:
package main
import (
"sync"
"time"
"fmt"
)
func target(key string, job func()) (count int) {}
//用来模拟 job 函数的变量
//不要修改
var (
counter int
counterLock sync.Mutex
)
//用来模拟耗时,时间不固定,实现 target 时不能依赖此时间
//不要修改
const (
mockJobTimeout = 300 * time.Millisecond
tolerate = 30 * time.Millisecond
)
//测试用的 job 函数,是一个计数器,用来模拟耗时操作
//不要修改
func mockJob() {
time.Sleep(mockJobTimeout)
counterLock.Lock()
counter++
counterLock.Unlock()
}
//相同 id 并行调用
//不要修改
func testCaseSampleIdParallel() {
counter = 0 //重置计数器
const (
id = "CBD225E1-B7D9-BE76-9735-1D0A9B62EE4D"
repeat = 5 //用来模拟相同 id 的多次重复调用,调用次数不固定,实现 target 时不能依赖此调用次数
)
wg := sync.WaitGroup{}
wg.Add(repeat)
tStart := time.Now()
for i := 0; i < repeat; i++ {
go func() {
count := target(id, mockJob)
wg.Done()
if count != repeat {
panic(fmt.Sprintln("[testCaseSampleIdConcurrence] count:", count, "!= repeat:", repeat))
}
}()
}
wg.Wait()
if counter != 1 { //应该只调用了一次 job 函数
panic(fmt.Sprintln("[testCaseSampleIdConcurrence] counter:", counter, "!= 1"))
}
var (
tDelta = time.Now().Sub(tStart)
tExpect = mockJobTimeout + tolerate
)
if tDelta > tExpect {
panic(fmt.Sprintln("[testCaseRandomId] timeout", tDelta, ">", tExpect))
}
}
//相同 id 串行调用
//不要修改
func testCaseSampleIdSerial() {
counter = 0
const (
id = "3E5A5C8D-B254-383B-4F33-F6927578FD11"
repeat = 2
)
tStart := time.Now()
for i := 0; i < repeat; i++ {
count := target(id, mockJob)
if count != 1 {
panic(fmt.Sprintln("[testCaseSampleIdSerial] count:", count, "!= 1"))
}
}
if counter != repeat { //虽然是相同 id,但因为是串行调用,应该执行 repeat 次 job 函数
panic(fmt.Sprintln("[testCaseSampleIdSerial] counter:", counter, "!= repeat:", repeat))
}
var (
tDelta = time.Now().Sub(tStart)
tExpect = repeat*mockJobTimeout + tolerate
)
if tDelta > tExpect {
panic(fmt.Sprintln("[testCaseSampleIdSerial] timeout", tDelta, ">", tExpect))
}
}
//不同 id 并行调用
//不要修改
func testCaseRandomId() {
counter = 0 //重置计数器
ids := []string{
"id-3",
"id-3",
"id-3",
"id-2",
"id-2",
"id-1",
}
wg := sync.WaitGroup{}
wg.Add(len(ids))
tStart := time.Now()
for _, id := range ids {
id := id
go func() {
count := target(id, mockJob)
wg.Done()
var expectedCount int
switch id {
case "id-1":
expectedCount = 1
case "id-2":
expectedCount = 2
case "id-3":
expectedCount = 3
}
if count != expectedCount {
panic(fmt.Sprintln("[testCaseRandomId] count:", count, "!= expectedCount:", expectedCount, "id:", id))
}
}()
}
wg.Wait()
if counter != 3 { //3个不同的 id 同时并发调用,job 函数应该执行 3 次
panic(fmt.Sprintln("[testCaseSampleIdConcurrence] counter:", counter, "!= 3"))
}
var (
tDelta = time.Now().Sub(tStart)
tExpect = 3*mockJobTimeout + tolerate
)
if tDelta > tExpect {
panic(fmt.Sprintln("[testCaseRandomId] timeout", tDelta, ">", tExpect))
}
}
func main() {
const repeat = 50
for i := 0; i < repeat; i++ {
testCaseSampleIdParallel()
testCaseSampleIdSerial()
testCaseRandomId()
fmt.Print("\\r", i+1, "/", repeat, " ✔ ")
}
fmt.Println("🎉 All Tests Passed!")
}
说实话,刚看到这个题目,我是有点儿高兴的,这不正是 groupcache 里 singleflight 的逻辑吗?于是,复制,粘贴,提交,一气呵成。
第一次实现的 target 长这样:
type call struct {
wg sync.WaitGroup
val interface{}
err error
}
type Group struct {
mu *sync.Mutex
m map[string]*call
t map[string]int
tMu *sync.RWMutex
}
var (
g *Group
timesLock sync.RWMutex
)
func target(key string, job func()) (count int) {
if g == nil {
g = &Group{
mu: &counterLock,
m: make(map[string]*call),
t: make(map[string]int),
tMu: ×Lock,
}
}
g.mu.Lock()
if c, ok := g.m[key]; ok {
g.tMu.Lock()
g.t[key]++
g.tMu.Unlock()
g.mu.Unlock()
c.wg.Wait()
return g.t[key]
}
c := new(call)
c.wg.Add(1)
g.m[key] = c
g.t[key] = 1
g.mu.Unlock()
job()
c.wg.Done()
g.mu.Lock()
delete(g.m, key)
g.mu.Unlock()
return g.t[key]
}
甚至连 call
数据结构里没有用的参数都一起拷贝过来了,但紧接着就收到了回复,具体的内容就不说了,结论就是方案有问题,存在 Data Race。纳尼?我重新执行了一下,果然,执行不是必然成功的,是有一定的失败率的,而失败的原因正是由于 Data Race。查看 Data Race ,执行如何命令即可:
go run -race main.go
面试没通过,问题还是要查清楚的,于是我仔细看了看,原来是 map
读写时的并发问题,修改如下:
type call struct {
wg sync.WaitGroup
}
type statStruct struct {
stat map[string]int
statRWMu *sync.RWMutex
}
type Group struct {
mu *sync.Mutex
m map[string]*call
s *statStruct
}
var (
g Group
globalLock sync.RWMutex
)
func target(key string, job func()) (count int) {
globalLock.Lock()
if g.mu == nil {
g = Group{
mu: &counterLock,
m: make(map[string]*call),
s: &statStruct{
stat: make(map[string]int),
statRWMu: &sync.RWMutex{},
},
}
}
globalLock.Unlock()
g.mu.Lock()
if c, ok := g.m[key]; ok {
g.s.statRWMu.Lock()
g.s.stat[key]++
g.s.statRWMu.Unlock()
g.mu.Unlock()
c.wg.Wait()
return g.s.stat[key]
}
c := new(call)
c.wg.Add(1)
g.m[key] = c
g.s.statRWMu.Lock()
g.s.stat[key] = 1
g.s.statRWMu.Unlock()
g.mu.Unlock()
job()
c.wg.Done()
g.mu.Lock()
delete(g.m, key)
g.mu.Unlock()
return g.s.stat[key]
}
再次运行,成功!!!
虽然现在回过头来看这个问题,实在是一个很小的问题,但是却足足花费了我好几个小时,这还是只是一个 Demo,如果放在偌大的生产项目中,不知道要花费多少时间来修复这个问题了!
最后,声色俱厉,摇旗呐喊,并发读写一定要加锁!!!
网友评论