sync/semaphore用途
你的同事和你说要限制服务的并发次数,但是每个API要求的算力还不一样,如果一样还挺好做的,make一个chan,请求来的时候写一个,请求走的时候读取这个chan。我天,算力不一个的API在一个服务里面这怎么限制?
怎么做,如果用上sync/semaphore就行。
一个快速demo(如果不理解,可以下方留言)
下面的代码模拟了一个API服务同时有访问mysql和cache的接口,其中cache接口效率是mysql的两倍。
mysql假设要1s才能处理完,那cache只需500ms。ok问题来,如果我们的机器算力是10,只能处理纯粹的5个mysql请求,或者10个cache请求,怎么让请求,在我们的算力要求内 可以同时处理mysql和cache呢?这时我们的主脚登场,semaphore
,来点掌声。
使用流程:
- 声明总权重数:
s := semaphore.NewWeighted(10)
,申请的总权重数是10 - 申请你的权重,不同的业务使用不同的权重,比如这里的mysql使用了2,cached使用了1.:
b := c.TryAcquire(c.weighted)
,从总权重数减去你申请的 - 运行一些业务,这里使用sleep模拟真实业务运行时间:
time.Sleep(time.Second)
- 释放你的权重,有借有还,再借不难:
c.Release(c.weighted)
,恢复权重数至你申请之前
这里核心思想已经表达,通过计数平衡不同算力的请求,已经分配出去的权重,不还回之前c.TryAcquire
调用失败,明白这点,我们后面看下代码细节
package main
import (
"fmt"
"golang.org/x/sync/semaphore"
"sync"
"sync/atomic"
"time"
)
type Mysql struct {
weighted int64
*semaphore.Weighted
}
type Cache struct {
weighted int64
*semaphore.Weighted
}
func main() {
var wg sync.WaitGroup
s := semaphore.NewWeighted(10)
m := Mysql{weighted: 2, Weighted: s} //mysql比较慢,服务只能并发5次
c := Cache{weighted: 1, Weighted: s} //cache比较块,服务只能并发10次
wg.Add(16)
// 模拟并发请求
var (
successMysql int32 = 8
successCache int32 = 8
)
defer func() {
fmt.Printf("success mysql:%d, sucess cached:%d: total weighted:%d\n", successMysql, successCache, successMysql*2+successCache)
}()
defer wg.Wait()
for i := 0; i < 8; i++ {
go func() {
defer wg.Done()
b := c.TryAcquire(c.weighted)
if !b {
atomic.AddInt32(&successCache, -1)
fmt.Printf("cache acquired fail:%d\n", c.weighted)
return
}
time.Sleep(time.Second)
c.Release(c.weighted)
}()
}
// 模拟并发请求
for i := 0; i < 8; i++ {
go func() {
defer wg.Done()
b := m.TryAcquire(m.weighted)
if !b {
atomic.AddInt32(&successMysql, -1)
fmt.Printf("mysql acquired fail:%d\n", m.weighted)
return
}
time.Sleep(time.Second)
m.Release(m.weighted)
}()
}
}
深入数据结构Weighted+waiter
n int64
ready chan<- struct{}
type waiter struct {
n int64
ready chan<- struct{} // Closed when semaphore acquired.
}
-
size int64
初始化分配的总的权重数 -
ucr int64
当前留存权重数 -
my sync.Mutex
锁,可以安全的修改list.List内容 -
waiters list.List
链表,存放waiter结构
// Weighted provides a way to bound concurrent access to a resource.
// The callers can request access with a given weight.
type Weighted struct {
size int64
cur int64
mu sync.Mutex
waiters list.List
}
Acquire接口--申请权重
- 第一次申请权重,如果余额充足直接返回。这里的
s.waiters.Len()
是等待队列长度,就跟买高铁票一样,有票直接买,没票可以预订。
if s.size-s.cur >= n && s.waiters.Len() == 0 {
s.cur += n
s.mu.Unlock()
return nil
}
- 永远不会成功的情况,当前申请的权重大于初始分配的总权重
把调用端挂起来,让他知道自己错了
if n > s.size {
// Don't make other Acquire calls block on one that's doomed to fail.
s.mu.Unlock()
<-ctx.Done()
return ctx.Err()
}
- 放到等待队列,
- list.List是go提供的双向链表,把需要等待的元素
s.waiters.PushBack(w)
加到尾部,这里可能有人要问了,一般不都是用slice或者map存储数据的嘛,这里为啥要用双向链表。原因slice是连续空间,不适合做中间删除的操作,代价太高,把被删除前面和被删除后面的数据copy生成一个新的slice,这里面的内存分配都好几次,太浪费。相反list.List就没有这个问题,只是重新赋值4个指针。
ready := make(chan struct{})
w := waiter{n: n, ready: ready}
elem := s.waiters.PushBack(w)
- 等待ctx被cancel或者有权重被回收ready
这里面有意思的地方是,当context被cancel时,这时有可用权重,会把cancel错误忽略掉
select {
case <-ctx.Done():
err := ctx.Err()
s.mu.Lock()
select {
case <-ready:
// Acquired the semaphore after we were canceled. Rather than trying to
// fix up the queue, just pretend we didn't notice the cancelation.
err = nil
default:
s.waiters.Remove(elem)
}
s.mu.Unlock()
return err
case <-ready:
return nil
}
- Acquire接口全部源代码,分析可看上面
func (s *Weighted) Acquire(ctx context.Context, n int64) error {
s.mu.Lock()
if s.size-s.cur >= n && s.waiters.Len() == 0 {
s.cur += n
s.mu.Unlock()
return nil
}
if n > s.size {
// Don't make other Acquire calls block on one that's doomed to fail.
s.mu.Unlock()
<-ctx.Done()
return ctx.Err()
}
ready := make(chan struct{})
w := waiter{n: n, ready: ready}
elem := s.waiters.PushBack(w)
s.mu.Unlock()
select {
case <-ctx.Done():
err := ctx.Err()
s.mu.Lock()
select {
case <-ready:
// Acquired the semaphore after we were canceled. Rather than trying to
// fix up the queue, just pretend we didn't notice the cancelation.
err = nil
default:
s.waiters.Remove(elem)
}
s.mu.Unlock()
return err
case <-ready:
return nil
}
}
Release接口,回收权重
- 有效性检查,如果调用方用错,直接panic
s.cur -= n
if s.cur < 0 {
s.mu.Unlock()
panic("semaphore: released more than held")
}
- 检查库存权重,如果够的话,这时就释放waiter队列里面的一些元素,这时一些阻塞的
Acquire
会得到解放
next := s.waiters.Front()
if next == nil {
break // No more waiters blocked.
}
w := next.Value.(waiter)
if s.size-s.cur < w.n {
// Not enough tokens for the next waiter. We could keep going (to try to
// find a waiter with a smaller request), but under load that could cause
// starvation for large requests; instead, we leave all remaining waiters
// blocked.
//
// Consider a semaphore used as a read-write lock, with N tokens, N
// readers, and one writer. Each reader can Acquire(1) to obtain a read
// lock. The writer can Acquire(N) to obtain a write lock, excluding all
// of the readers. If we allow the readers to jump ahead in the queue,
// the writer will starve — there is always one token available for every
// reader.
break
}
s.cur += w.n
s.waiters.Remove(next)
close(w.ready)
- 该接口完整代码,分析可看上面
func (s *Weighted) Release(n int64) {
s.mu.Lock()
s.cur -= n
if s.cur < 0 {
s.mu.Unlock()
panic("semaphore: released more than held")
}
for {
next := s.waiters.Front()
if next == nil {
break // No more waiters blocked.
}
w := next.Value.(waiter)
if s.size-s.cur < w.n {
// Not enough tokens for the next waiter. We could keep going (to try to
// find a waiter with a smaller request), but under load that could cause
// starvation for large requests; instead, we leave all remaining waiters
// blocked.
//
// Consider a semaphore used as a read-write lock, with N tokens, N
// readers, and one writer. Each reader can Acquire(1) to obtain a read
// lock. The writer can Acquire(N) to obtain a write lock, excluding all
// of the readers. If we allow the readers to jump ahead in the queue,
// the writer will starve — there is always one token available for every
// reader.
break
}
s.cur += w.n
s.waiters.Remove(next)
close(w.ready)
}
s.mu.Unlock()
}
网友评论