美文网首页
golang sync/semaphore 源代码阅读--精细并

golang sync/semaphore 源代码阅读--精细并

作者: guonaihong | 来源:发表于2020-03-07 12:51 被阅读0次

    sync/semaphore用途

    你的同事和你说要限制服务的并发次数,但是每个API要求的算力还不一样,如果一样还挺好做的,make一个chan,请求来的时候写一个,请求走的时候读取这个chan。我天,算力不一个的API在一个服务里面这怎么限制?
    怎么做,如果用上sync/semaphore就行。

    一个快速demo(如果不理解,可以下方留言)

    下面的代码模拟了一个API服务同时有访问mysql和cache的接口,其中cache接口效率是mysql的两倍。
    mysql假设要1s才能处理完,那cache只需500ms。ok问题来,如果我们的机器算力是10,只能处理纯粹的5个mysql请求,或者10个cache请求,怎么让请求,在我们的算力要求内 可以同时处理mysql和cache呢?这时我们的主脚登场,semaphore,来点掌声。

    使用流程:

    • 声明总权重数: s := semaphore.NewWeighted(10),申请的总权重数是10
    • 申请你的权重,不同的业务使用不同的权重,比如这里的mysql使用了2,cached使用了1.: b := c.TryAcquire(c.weighted),从总权重数减去你申请的
    • 运行一些业务,这里使用sleep模拟真实业务运行时间: time.Sleep(time.Second)
    • 释放你的权重,有借有还,再借不难: c.Release(c.weighted),恢复权重数至你申请之前

    这里核心思想已经表达,通过计数平衡不同算力的请求,已经分配出去的权重,不还回之前c.TryAcquire调用失败,明白这点,我们后面看下代码细节

    package main
    
    import (
        "fmt"
        "golang.org/x/sync/semaphore"
        "sync"
        "sync/atomic"
        "time"
    )
    
    type Mysql struct {
        weighted int64
        *semaphore.Weighted
    }
    
    type Cache struct {
        weighted int64
        *semaphore.Weighted
    }
    
    func main() {
        var wg sync.WaitGroup
        s := semaphore.NewWeighted(10)
    
        m := Mysql{weighted: 2, Weighted: s} //mysql比较慢,服务只能并发5次
        c := Cache{weighted: 1, Weighted: s} //cache比较块,服务只能并发10次
    
        wg.Add(16)
    
        // 模拟并发请求
        var (
            successMysql int32 = 8
            successCache int32 = 8
        )
    
        defer func() {
            fmt.Printf("success mysql:%d, sucess cached:%d: total weighted:%d\n", successMysql, successCache, successMysql*2+successCache)
        }()
        defer wg.Wait()
    
        for i := 0; i < 8; i++ {
            go func() {
                defer wg.Done()
                b := c.TryAcquire(c.weighted)
                if !b {
                    atomic.AddInt32(&successCache, -1)
                    fmt.Printf("cache acquired fail:%d\n", c.weighted)
                    return
                }
    
                time.Sleep(time.Second)
                c.Release(c.weighted)
            }()
        }
    
        // 模拟并发请求
        for i := 0; i < 8; i++ {
            go func() {
                defer wg.Done()
    
                b := m.TryAcquire(m.weighted)
                if !b {
                    atomic.AddInt32(&successMysql, -1)
                    fmt.Printf("mysql acquired fail:%d\n", m.weighted)
                    return
                }
                time.Sleep(time.Second)
    
                m.Release(m.weighted)
            }()
        }
    
    }
    
    

    深入数据结构Weighted+waiter

    • n int64
    • ready chan<- struct{}
    type waiter struct {
        n     int64
        ready chan<- struct{} // Closed when semaphore acquired.
    }
    
    • size int64初始化分配的总的权重数
    • ucr int64当前留存权重数
    • my sync.Mutex 锁,可以安全的修改list.List内容
    • waiters list.List 链表,存放waiter结构
    // Weighted provides a way to bound concurrent access to a resource.
    // The callers can request access with a given weight.
    type Weighted struct {
        size    int64
        cur     int64
        mu      sync.Mutex
        waiters list.List
    }
    

    Acquire接口--申请权重

    • 第一次申请权重,如果余额充足直接返回。这里的s.waiters.Len()是等待队列长度,就跟买高铁票一样,有票直接买,没票可以预订。
        if s.size-s.cur >= n && s.waiters.Len() == 0 {
            s.cur += n
            s.mu.Unlock()
            return nil
        }
    
    
    • 永远不会成功的情况,当前申请的权重大于初始分配的总权重
      把调用端挂起来,让他知道自己错了
    if n > s.size {
            // Don't make other Acquire calls block on one that's doomed to fail.
            s.mu.Unlock()
            <-ctx.Done()
            return ctx.Err()
        }
    
    • 放到等待队列,
    • list.List是go提供的双向链表,把需要等待的元素s.waiters.PushBack(w)加到尾部,这里可能有人要问了,一般不都是用slice或者map存储数据的嘛,这里为啥要用双向链表。原因slice是连续空间,不适合做中间删除的操作,代价太高,把被删除前面和被删除后面的数据copy生成一个新的slice,这里面的内存分配都好几次,太浪费。相反list.List就没有这个问题,只是重新赋值4个指针。
        ready := make(chan struct{})
        w := waiter{n: n, ready: ready}
        elem := s.waiters.PushBack(w)
    
    • 等待ctx被cancel或者有权重被回收ready
      这里面有意思的地方是,当context被cancel时,这时有可用权重,会把cancel错误忽略掉
    select {
        case <-ctx.Done():
            err := ctx.Err()
            s.mu.Lock()
            select {
            case <-ready:
                // Acquired the semaphore after we were canceled.  Rather than trying to
                // fix up the queue, just pretend we didn't notice the cancelation.
                err = nil
            default:
                s.waiters.Remove(elem)
            }
            s.mu.Unlock()
            return err
    
        case <-ready:
            return nil
        }
    
    • Acquire接口全部源代码,分析可看上面
    func (s *Weighted) Acquire(ctx context.Context, n int64) error {
        s.mu.Lock()
        if s.size-s.cur >= n && s.waiters.Len() == 0 {
            s.cur += n
            s.mu.Unlock()
            return nil
        }
    
        if n > s.size {
            // Don't make other Acquire calls block on one that's doomed to fail.
            s.mu.Unlock()
            <-ctx.Done()
            return ctx.Err()
        }
    
        ready := make(chan struct{})
        w := waiter{n: n, ready: ready}
        elem := s.waiters.PushBack(w)
        s.mu.Unlock()
    
        select {
        case <-ctx.Done():
            err := ctx.Err()
            s.mu.Lock()
            select {
            case <-ready:
                // Acquired the semaphore after we were canceled.  Rather than trying to
                // fix up the queue, just pretend we didn't notice the cancelation.
                err = nil
            default:
                s.waiters.Remove(elem)
            }
            s.mu.Unlock()
            return err
    
        case <-ready:
            return nil
        }
    }
    

    Release接口,回收权重

    • 有效性检查,如果调用方用错,直接panic
        s.cur -= n
        if s.cur < 0 {
            s.mu.Unlock()
            panic("semaphore: released more than held")
        }
    
    • 检查库存权重,如果够的话,这时就释放waiter队列里面的一些元素,这时一些阻塞的Acquire会得到解放
    next := s.waiters.Front()
            if next == nil {
                break // No more waiters blocked.
            }
    
            w := next.Value.(waiter)
            if s.size-s.cur < w.n {
                // Not enough tokens for the next waiter.  We could keep going (to try to
                // find a waiter with a smaller request), but under load that could cause
                // starvation for large requests; instead, we leave all remaining waiters
                // blocked.
                //
                // Consider a semaphore used as a read-write lock, with N tokens, N
                // readers, and one writer.  Each reader can Acquire(1) to obtain a read
                // lock.  The writer can Acquire(N) to obtain a write lock, excluding all
                // of the readers.  If we allow the readers to jump ahead in the queue,
                // the writer will starve — there is always one token available for every
                // reader.
                break
            }
    
            s.cur += w.n
            s.waiters.Remove(next)
            close(w.ready)
    
    • 该接口完整代码,分析可看上面
    func (s *Weighted) Release(n int64) {
        s.mu.Lock()
        s.cur -= n
        if s.cur < 0 {
            s.mu.Unlock()
            panic("semaphore: released more than held")
        }
        for {
            next := s.waiters.Front()
            if next == nil {
                break // No more waiters blocked.
            }
    
            w := next.Value.(waiter)
            if s.size-s.cur < w.n {
                // Not enough tokens for the next waiter.  We could keep going (to try to
                // find a waiter with a smaller request), but under load that could cause
                // starvation for large requests; instead, we leave all remaining waiters
                // blocked.
                //
                // Consider a semaphore used as a read-write lock, with N tokens, N
                // readers, and one writer.  Each reader can Acquire(1) to obtain a read
                // lock.  The writer can Acquire(N) to obtain a write lock, excluding all
                // of the readers.  If we allow the readers to jump ahead in the queue,
                // the writer will starve — there is always one token available for every
                // reader.
                break
            }
    
            s.cur += w.n
            s.waiters.Remove(next)
            close(w.ready)
        }
        s.mu.Unlock()
    }
    

    相关文章

      网友评论

          本文标题:golang sync/semaphore 源代码阅读--精细并

          本文链接:https://www.haomeiwen.com/subject/rimmlhtx.html