美文网首页Tips
记录一次“翻车”

记录一次“翻车”

作者: Sun东辉 | 来源:发表于2022-06-15 07:49 被阅读0次

故事的背景来自于一道面试题,题目是这样的:

你需要实现的目标函数 target

其中:

  • @param id 是一个随机字符串,例如 6A10A467-2842-A460-5353-DBE7D41986B7;
  • @param job 函数是一个耗时操作,例如:去数据库 query 数据,可能耗时 500ms;
  • @return count 表示在执行本次 job 期间有多少相同的 id 调用过 target

关键特性:相同 id 并发调用 target,target 只执行一次 job 函数,举例来说:第一个线程传入 id 为 "id-xxx" 调用 target,job 函数开始执行,在此期间,又有其他 4 个线程以 id 为 "id-xxx" 调用了 target;在此期间,只有一个 job 函数执行,等它执行完成后,上述 5 个线程均收到返回值 count=5,表示这段时间有 5 个相同 id 进行了调用;

代码长这样:

package main

import (
    "sync"
    "time"
    "fmt"
)

func target(key string, job func()) (count int) {}
//用来模拟 job 函数的变量
//不要修改
var (
    counter     int
    counterLock sync.Mutex
)

//用来模拟耗时,时间不固定,实现 target 时不能依赖此时间
//不要修改
const (
    mockJobTimeout = 300 * time.Millisecond
    tolerate       = 30 * time.Millisecond
)

//测试用的 job 函数,是一个计数器,用来模拟耗时操作
//不要修改
func mockJob() {
    time.Sleep(mockJobTimeout)
    counterLock.Lock()
    counter++
    counterLock.Unlock()
}

//相同 id 并行调用
//不要修改
func testCaseSampleIdParallel() {
    counter = 0 //重置计数器
    const (
        id     = "CBD225E1-B7D9-BE76-9735-1D0A9B62EE4D"
        repeat = 5 //用来模拟相同 id 的多次重复调用,调用次数不固定,实现 target 时不能依赖此调用次数
    )
    wg := sync.WaitGroup{}
    wg.Add(repeat)
    tStart := time.Now()
    for i := 0; i < repeat; i++ {
        go func() {
            count := target(id, mockJob)
            wg.Done()
            if count != repeat {
                panic(fmt.Sprintln("[testCaseSampleIdConcurrence] count:", count, "!= repeat:", repeat))
            }
        }()
    }
    wg.Wait()
    if counter != 1 { //应该只调用了一次 job 函数
        panic(fmt.Sprintln("[testCaseSampleIdConcurrence] counter:", counter, "!= 1"))
    }
    var (
        tDelta  = time.Now().Sub(tStart)
        tExpect = mockJobTimeout + tolerate
    )
    if tDelta > tExpect {
        panic(fmt.Sprintln("[testCaseRandomId] timeout", tDelta, ">", tExpect))
    }
}

//相同 id 串行调用
//不要修改
func testCaseSampleIdSerial() {
    counter = 0
    const (
        id     = "3E5A5C8D-B254-383B-4F33-F6927578FD11"
        repeat = 2
    )
    tStart := time.Now()
    for i := 0; i < repeat; i++ {
        count := target(id, mockJob)
        if count != 1 {
            panic(fmt.Sprintln("[testCaseSampleIdSerial] count:", count, "!= 1"))
        }
    }
    if counter != repeat { //虽然是相同 id,但因为是串行调用,应该执行 repeat 次 job 函数
        panic(fmt.Sprintln("[testCaseSampleIdSerial] counter:", counter, "!= repeat:", repeat))
    }
    var (
        tDelta  = time.Now().Sub(tStart)
        tExpect = repeat*mockJobTimeout + tolerate
    )
    if tDelta > tExpect {
        panic(fmt.Sprintln("[testCaseSampleIdSerial] timeout", tDelta, ">", tExpect))
    }
}

//不同 id 并行调用
//不要修改
func testCaseRandomId() {
    counter = 0 //重置计数器
    ids := []string{
        "id-3",
        "id-3",
        "id-3",

        "id-2",
        "id-2",

        "id-1",
    }
    wg := sync.WaitGroup{}
    wg.Add(len(ids))
    tStart := time.Now()
    for _, id := range ids {
        id := id
        go func() {
            count := target(id, mockJob)
            wg.Done()
            var expectedCount int
            switch id {
            case "id-1":
                expectedCount = 1
            case "id-2":
                expectedCount = 2
            case "id-3":
                expectedCount = 3
            }
            if count != expectedCount {
                panic(fmt.Sprintln("[testCaseRandomId] count:", count, "!= expectedCount:", expectedCount, "id:", id))
            }
        }()
    }
    wg.Wait()
    if counter != 3 { //3个不同的 id 同时并发调用,job 函数应该执行 3 次
        panic(fmt.Sprintln("[testCaseSampleIdConcurrence] counter:", counter, "!= 3"))
    }
    var (
        tDelta  = time.Now().Sub(tStart)
        tExpect = 3*mockJobTimeout + tolerate
    )
    if tDelta > tExpect {
        panic(fmt.Sprintln("[testCaseRandomId] timeout", tDelta, ">", tExpect))
    }
}

func main() {
    const repeat = 50
    for i := 0; i < repeat; i++ {
        testCaseSampleIdParallel()
        testCaseSampleIdSerial()
        testCaseRandomId()
        fmt.Print("\\r", i+1, "/", repeat, " ✔ ")
    }
    fmt.Println("🎉 All Tests Passed!")
}

说实话,刚看到这个题目,我是有点儿高兴的,这不正是 groupcachesingleflight 的逻辑吗?于是,复制,粘贴,提交,一气呵成。

第一次实现的 target 长这样:

type call struct {
    wg  sync.WaitGroup
    val interface{}
    err error
}

type Group struct {
    mu  *sync.Mutex
    m   map[string]*call
    t   map[string]int
    tMu *sync.RWMutex
}

var (
    g         *Group
    timesLock sync.RWMutex
)

func target(key string, job func()) (count int) {
    if g == nil {
        g = &Group{
            mu:  &counterLock,
            m:   make(map[string]*call),
            t:   make(map[string]int),
            tMu: &timesLock,
        }
    }

    g.mu.Lock()
    if c, ok := g.m[key]; ok {
        g.tMu.Lock()
        g.t[key]++
        g.tMu.Unlock()

        g.mu.Unlock()
        c.wg.Wait()
        return g.t[key]
    }
    c := new(call)
    c.wg.Add(1)
    g.m[key] = c
    g.t[key] = 1
    g.mu.Unlock()

    job()
    c.wg.Done()

    g.mu.Lock()
    delete(g.m, key)
    g.mu.Unlock()

    return g.t[key]
}

甚至连 call 数据结构里没有用的参数都一起拷贝过来了,但紧接着就收到了回复,具体的内容就不说了,结论就是方案有问题,存在 Data Race。纳尼?我重新执行了一下,果然,执行不是必然成功的,是有一定的失败率的,而失败的原因正是由于 Data Race。查看 Data Race ,执行如何命令即可:

go run -race main.go 

面试没通过,问题还是要查清楚的,于是我仔细看了看,原来是 map 读写时的并发问题,修改如下:

type call struct {
    wg sync.WaitGroup
}

type statStruct struct {
    stat     map[string]int
    statRWMu *sync.RWMutex
}

type Group struct {
    mu *sync.Mutex
    m  map[string]*call
    s  *statStruct
}

var (
    g          Group
    globalLock sync.RWMutex
)

func target(key string, job func()) (count int) {
    globalLock.Lock()
    if g.mu == nil {
        g = Group{
            mu: &counterLock,
            m:  make(map[string]*call),
            s: &statStruct{
                stat:     make(map[string]int),
                statRWMu: &sync.RWMutex{},
            },
        }
    }
    globalLock.Unlock()

    g.mu.Lock()
    if c, ok := g.m[key]; ok {
        g.s.statRWMu.Lock()
        g.s.stat[key]++
        g.s.statRWMu.Unlock()

        g.mu.Unlock()
        c.wg.Wait()
        return g.s.stat[key]
    }

    c := new(call)
    c.wg.Add(1)
    g.m[key] = c

    g.s.statRWMu.Lock()
    g.s.stat[key] = 1
    g.s.statRWMu.Unlock()

    g.mu.Unlock()

    job()
    c.wg.Done()

    g.mu.Lock()
    delete(g.m, key)
    g.mu.Unlock()

    return g.s.stat[key]
}

再次运行,成功!!!

虽然现在回过头来看这个问题,实在是一个很小的问题,但是却足足花费了我好几个小时,这还是只是一个 Demo,如果放在偌大的生产项目中,不知道要花费多少时间来修复这个问题了!

最后,声色俱厉,摇旗呐喊,并发读写一定要加锁!!!

相关文章

网友评论

    本文标题:记录一次“翻车”

    本文链接:https://www.haomeiwen.com/subject/rzazmrtx.html