Reading the golang sync/semaphore source code -- fine-grained concurrency control


Author: guonaihong | Published 2020-03-07 12:51

What sync/semaphore is for

Your colleague tells you to cap the service's concurrency, but each API needs a different amount of compute. If they all cost the same it would be easy: make a buffered chan, write into it when a request arrives and read from it when the request leaves. But how do you enforce one limit across APIs with different compute costs in a single service?
With sync/semaphore, it's straightforward.

A quick demo (leave a comment below if anything is unclear)

The code below simulates an API service with two endpoints, one backed by mysql and one by a cache, where the cache endpoint is twice as efficient as the mysql one.
Suppose a mysql request takes 1s to process and a cache request only 500ms. Now the problem: if our machine's capacity is 10, it can handle either 5 pure mysql requests or 10 pure cache requests. How do we serve a mix of mysql and cache requests while staying within that capacity? This is where our protagonist, semaphore, takes the stage.

Usage flow:

  • Declare the total weight: s := semaphore.NewWeighted(10) creates a semaphore with a total weight of 10.
  • Acquire your weight; different workloads use different weights (here mysql uses 2 and cache uses 1): b := c.TryAcquire(c.weighted) subtracts what you ask for from the total.
  • Run your business logic; a sleep stands in for real work here: time.Sleep(time.Second)
  • Release your weight; what is borrowed must be returned: c.Release(c.weighted) restores the weight to its value before your request.

That is the core idea: a counter balances requests with different compute costs. While weight that has been handed out has not yet been returned, further c.TryAcquire calls fail. With that in mind, let's look at the code in detail.

package main

import (
    "fmt"
    "golang.org/x/sync/semaphore"
    "sync"
    "sync/atomic"
    "time"
)

type Mysql struct {
    weighted int64
    *semaphore.Weighted
}

type Cache struct {
    weighted int64
    *semaphore.Weighted
}

func main() {
    var wg sync.WaitGroup
    s := semaphore.NewWeighted(10)

    m := Mysql{weighted: 2, Weighted: s} // mysql is slow; the service can run at most 5 of these concurrently
    c := Cache{weighted: 1, Weighted: s} // cache is fast; the service can run at most 10 of these concurrently

    wg.Add(16)

    // simulate concurrent requests
    var (
        successMysql int32 = 8 // start at the attempt count; decremented on each failure
        successCache int32 = 8
    )

    defer func() {
        fmt.Printf("success mysql:%d, success cache:%d, total weight:%d\n", successMysql, successCache, successMysql*2+successCache)
    }()
    defer wg.Wait()

    for i := 0; i < 8; i++ {
        go func() {
            defer wg.Done()
            b := c.TryAcquire(c.weighted)
            if !b {
                atomic.AddInt32(&successCache, -1)
                fmt.Printf("cache acquired fail:%d\n", c.weighted)
                return
            }

            time.Sleep(time.Second)
            c.Release(c.weighted)
        }()
    }

    // simulate concurrent requests
    for i := 0; i < 8; i++ {
        go func() {
            defer wg.Done()

            b := m.TryAcquire(m.weighted)
            if !b {
                atomic.AddInt32(&successMysql, -1)
                fmt.Printf("mysql acquired fail:%d\n", m.weighted)
                return
            }
            time.Sleep(time.Second)

            m.Release(m.weighted)
        }()
    }

}

A closer look at the data structures: Weighted and waiter

Fields of waiter:

  • n int64: the weight this waiter requested
  • ready chan<- struct{}: closed once the semaphore is acquired

type waiter struct {
    n     int64
    ready chan<- struct{} // Closed when semaphore acquired.
}

Fields of Weighted:

  • size int64: the total weight allocated at initialization
  • cur int64: the weight currently handed out
  • mu sync.Mutex: a lock that makes it safe to modify the list.List contents
  • waiters list.List: a linked list holding waiter entries
// Weighted provides a way to bound concurrent access to a resource.
// The callers can request access with a given weight.
type Weighted struct {
    size    int64
    cur     int64
    mu      sync.Mutex
    waiters list.List
}

The Acquire method -- requesting weight

  • Fast path: if enough weight remains and nobody is queued, take the weight and return immediately. s.waiters.Len() is the length of the wait queue; it works like buying a high-speed-rail ticket: if seats are left you buy one on the spot, otherwise you join the waiting list.
    if s.size-s.cur >= n && s.waiters.Len() == 0 {
        s.cur += n
        s.mu.Unlock()
        return nil
    }

  • The case that can never succeed: the requested weight exceeds the total size. The caller is parked on <-ctx.Done(), so it blocks until its context ends and learns of its mistake without making other Acquire calls wait behind it.
    if n > s.size {
        // Don't make other Acquire calls block on one that's doomed to fail.
        s.mu.Unlock()
        <-ctx.Done()
        return ctx.Err()
    }
  • Otherwise, join the wait queue.
  • list.List is Go's doubly linked list; the waiter is appended to the tail with s.waiters.PushBack(w). Why a linked list instead of the usual slice or map? A slice is contiguous memory, so deleting from the middle is costly: the elements before and after the hole have to be copied into place, with extra allocations along the way. list.List has no such problem; a removal just reassigns four pointers.
    ready := make(chan struct{})
    w := waiter{n: n, ready: ready}
    elem := s.waiters.PushBack(w)
  • Wait until either ctx is cancelled or returned weight closes ready.
    The interesting detail: if the context is cancelled but weight became available at the same moment, the cancellation error is discarded.
    select {
    case <-ctx.Done():
        err := ctx.Err()
        s.mu.Lock()
        select {
        case <-ready:
            // Acquired the semaphore after we were canceled.  Rather than trying to
            // fix up the queue, just pretend we didn't notice the cancelation.
            err = nil
        default:
            s.waiters.Remove(elem)
        }
        s.mu.Unlock()
        return err

    case <-ready:
        return nil
    }
  • The full source of Acquire; the analysis is above.
func (s *Weighted) Acquire(ctx context.Context, n int64) error {
    s.mu.Lock()
    if s.size-s.cur >= n && s.waiters.Len() == 0 {
        s.cur += n
        s.mu.Unlock()
        return nil
    }

    if n > s.size {
        // Don't make other Acquire calls block on one that's doomed to fail.
        s.mu.Unlock()
        <-ctx.Done()
        return ctx.Err()
    }

    ready := make(chan struct{})
    w := waiter{n: n, ready: ready}
    elem := s.waiters.PushBack(w)
    s.mu.Unlock()

    select {
    case <-ctx.Done():
        err := ctx.Err()
        s.mu.Lock()
        select {
        case <-ready:
            // Acquired the semaphore after we were canceled.  Rather than trying to
            // fix up the queue, just pretend we didn't notice the cancelation.
            err = nil
        default:
            s.waiters.Remove(elem)
        }
        s.mu.Unlock()
        return err

    case <-ready:
        return nil
    }
}

The Release method -- returning weight

  • Sanity check: if the caller releases more weight than is held, panic outright.
    s.cur -= n
    if s.cur < 0 {
        s.mu.Unlock()
        panic("semaphore: released more than held")
    }
  • Then walk the wait queue: while the remaining weight covers the request of the waiter at the front, grant it, remove it from the queue, and close its ready channel, which unblocks the corresponding Acquire.
    next := s.waiters.Front()
    if next == nil {
        break // No more waiters blocked.
    }

    w := next.Value.(waiter)
    if s.size-s.cur < w.n {
        // Not enough tokens for the next waiter.  We could keep going (to try to
        // find a waiter with a smaller request), but under load that could cause
        // starvation for large requests; instead, we leave all remaining waiters
        // blocked.
        //
        // Consider a semaphore used as a read-write lock, with N tokens, N
        // readers, and one writer.  Each reader can Acquire(1) to obtain a read
        // lock.  The writer can Acquire(N) to obtain a write lock, excluding all
        // of the readers.  If we allow the readers to jump ahead in the queue,
        // the writer will starve — there is always one token available for every
        // reader.
        break
    }

    s.cur += w.n
    s.waiters.Remove(next)
    close(w.ready)
  • The full source of Release; the analysis is above.
func (s *Weighted) Release(n int64) {
    s.mu.Lock()
    s.cur -= n
    if s.cur < 0 {
        s.mu.Unlock()
        panic("semaphore: released more than held")
    }
    for {
        next := s.waiters.Front()
        if next == nil {
            break // No more waiters blocked.
        }

        w := next.Value.(waiter)
        if s.size-s.cur < w.n {
            // Not enough tokens for the next waiter.  We could keep going (to try to
            // find a waiter with a smaller request), but under load that could cause
            // starvation for large requests; instead, we leave all remaining waiters
            // blocked.
            //
            // Consider a semaphore used as a read-write lock, with N tokens, N
            // readers, and one writer.  Each reader can Acquire(1) to obtain a read
            // lock.  The writer can Acquire(N) to obtain a write lock, excluding all
            // of the readers.  If we allow the readers to jump ahead in the queue,
            // the writer will starve — there is always one token available for every
            // reader.
            break
        }

        s.cur += w.n
        s.waiters.Remove(next)
        close(w.ready)
    }
    s.mu.Unlock()
}


Original link: https://www.haomeiwen.com/subject/rimmlhtx.html