美文网首页
go流量控制

go流量控制

作者: 温岭夹糕 | 来源:发表于2024-08-01 22:28 被阅读0次

    目录

    链接地址

    1.导读-为什么需要限流

    因为资源是有限的(如服务器资源),当资源供不应求就会发现一系列问题(如请求延迟),因此我们通常会对资源增加访问的限制,以现实为例,如为防止十字路流量过大且保证安全性,引入红绿灯机制,到下班时间段,甚至特定路段会安排交警指挥,还有春运的交通限流、超市排队结算等等,可以说限流机制在日常生活中都是无处不在。那放到API概念上时,常见的限流应用场景有哪些?

    • 防止流量突发时,服务出现雪崩,如抢购或DDOS攻击
    • 用户SLA的分级,如针对免费用户和付费用户,提供不同的API QPPS额度
    • API市场的API商品,会通过限流来满足商品库存的调用限制

    2.限流指标

    限流有两个重要概念:阈值拒绝策略,下面是对常用阈值的介绍

    2.1TPS

    TPS(Transaction Per Second)即事务数/秒。与mysql的事务不同,这里指一个客端端向服务器发送请求做出响应的过程,客户端在发送请求开始计算到服务器响应后结束计算。
    在单机系统上一个请求完成一笔事务,但是在分布式系统中,一笔请求通常需要多个系统配合(一笔事务多个请求),有的服务需要异步返回,因此完成一笔事务花费时间可能很长,因此如果按照TPS进行限流,时间粒度很大,很难准确评估系统的响应性能。

    2.2HPS

    每秒请求数,指每秒钟服务端收到客户端的请求数量。单机系统一笔请求完成一笔事务,TPS和QPS相同,分布式不同,目前主流的限流方法多采用HPS作为限流指标

    2.3QPS

    服务端每秒能够响应的客户端查询请求数量,同理单机HPS和QPS相同

    2.4 性能压测工具wrk

    wrk

    3.限流方法

    即拒绝策略,限流方法根据限流范围分为单机限流分布式限流,下面以单机限流进行分析(分布式之后补充)

    3.1流量计数器(固定窗口限流)

    特点:

    • 将时间划分固定大小窗口,如每秒一个
    • 每个窗口记录请求数量
    • 请求到达+1
    • 请求超过阈值拒绝
    • 窗口结束重置请求

    最直观的解释,我直接限制每秒请求数量100,超过100就拒绝掉
    我这里用gin中间件实现一个最简单的流量计数器

    type ratelimitByCountBuilder struct {
        count int64
    }
    
    func newRateLimit(count int64) *ratelimitByCountBuilder {
        //todo 启动协程清空count
        builder := &ratelimitByCountBuilder{
            count: count,
        }
    
        go func() {
            ticker := time.NewTicker(time.Second)
            for {
                select {
                case <-ticker.C:
                    atomic.StoreInt64(&builder.count, count)
                }
            }
        }()
        return builder
    }
    
    func (builder *ratelimitByCountBuilder) Build() gin.HandlerFunc {
        return func(ctx *gin.Context) {
            if atomic.LoadInt64(&builder.count) <= 0 {
                fmt.Printf("rejection req!\n")
                ctx.Abort()
                return
            }
    
            atomic.AddInt64(&builder.count, -1)
        }
    }
    

    或者直接使用自带的

    gin.LimitRequests(100, time.Second)
    

    原理就是请求过来时先进行判断
    缺点是什么?

    • 单位时间很难把控,请求分布不均匀


      image.png

      从上面看hps超过100

    • 无法应对突发流量,窗口大小是固定的,不够灵活
    • 窗口结束时的请求重置可能导致请求的不公平性

    3.2滑动窗口

    实例代码
    特点:

    • 确定一个窗口大小和请求数阈值,如1秒
    • 每次请求到达请求数+1,如果请求数超过阈值则 拒绝
    • 随时间推移窗口滑动,移出过期请求,即检查最早到达的请求,计算与当前时间差,判断是否超过窗口大小
    • 窗口大小可以根据流量变化
    type limitBySlidingWindowBuilder struct {
        windowSize  time.Duration //窗口大小
        maxRequests int           //最大请求数量
        requests    []time.Time
        lock        sync.Mutex
    }
    
    func newBuilder(size time.Duration, max int) *limitBySlidingWindowBuilder {
        return &limitBySlidingWindowBuilder{
            windowSize:  size,
            maxRequests: max,
            requests:    make([]time.Time, 0, max),
        }
    }
    
    func (limit *limitBySlidingWindowBuilder) Build() gin.HandlerFunc {
        return func(ctx *gin.Context) {
            if !limit.allowRequest() {
                ctx.Abort()
                return
            }
        }
    }
    
    func (limit *limitBySlidingWindowBuilder) allowRequest() bool {
        currentTime := time.Now()
        limit.lock.Lock()
        defer limit.lock.Unlock()
    
        //清理过期请求
        if len(limit.requests) > 0 && currentTime.Sub(limit.requests[0]) > limit.windowSize {
            limit.requests = limit.requests[1:]
        }
    
        //检查请求数
        if len(limit.requests) >= limit.maxRequests {
            fmt.Println("reject")
            return false
        }
    
        limit.requests = append(limit.requests, currentTime)
    
        return true
    }
    
    // todo 自定义动态调整窗口大小
    func (limit *limitBySlidingWindowBuilder) update(t time.Duration) {
        limit.lock.Lock()
        defer limit.lock.Unlock()
        limit.windowSize = t
    
    }
    

    缺点:

    • 内存消耗:当随时间推移,窗口大小较大或请求频率高的情况下,内存消耗增加
    • 更多CPU计算开销


      image.png

    3.3漏桶

    demo
    是对滑动窗口的改进:

    • 确定一个固定的漏桶容量,表示存储最大请求数
    • 漏桶速率,表示每秒可以处理请求数
    • 当请求到达,讲请求放入漏桶
    • 漏桶以固定速率从漏桶中消费请求
    • 如果漏桶满了则丢弃或延迟处理 image.png
    type leakybucket struct {
        rate       float64 //漏桶速度 请求数/秒
        size       int
        water      int   //当前水量
        lastLeakMs int64 //上次漏水时间戳
        lock       sync.Locker
    }
    
    func newleak(rate float64, size int) *leakybucket {
        return &leakybucket{
            rate:       rate,
            size:       size,
            water:      0,
            lastLeakMs: time.Now().Unix(),
        }
    }
    
    func (limit *leakybucket) Build() gin.HandlerFunc {
        return func(ctx *gin.Context) {
            if !limit.allow() {
                ctx.Abort()
                return
            }
        }
    }
    
    func (limit *leakybucket) allow() bool {
        now := time.Now().Unix()
    
        limit.lock.Lock()
        defer limit.lock.Unlock()
    
        //之前漏出的水量
        leakAmount := int(float64(now-limit.lastLeakMs) / 1000 * limit.rate)
    
        if leakAmount > 0 {
            if leakAmount > limit.water {
                limit.water = 0
            } else {
                limit.water -= leakAmount
            }
        }
    
        //计算当前是否超过容量
        if limit.water > limit.size {
            limit.water--
            fmt.Println("reject")
            return false
        }
    
        limit.water++
    
        limit.lastLeakMs = now
        return true
    }
    

    3.4 令牌桶

    令牌桶算法就跟病人去医院看病一样,找医生之前需要先挂号,而医院每天放的号是有限的。当天的号用完了,第二天又会放一批号


    image.png

    具体用法参考"golang.org/x/time/rate"

    4.分布式限流

    4.1基于中心化

    即通过一个中心化的限流器控制所有服务器的请求

    • 选择一个中心化组件,如redis
    • 定义限流规则,如设置每秒最大请求数,每个IP单位时间最多访问次数
    • 对于每个请求要先向redis请求令牌
    • 没拿到就被限流
      lua脚本如下
    local ratelimit_info = redis.pcall('HMGET',KEYS[1],'last_time','current_token')
    local last_time = ratelimit_info[1]
    local current_token = tonumber(ratelimit_info[2])
    local max_token = tonumber(ARGV[1])
    local token_rate = tonumber(ARGV[2])
    local current_time = tonumber(ARGV[3])
    if current_token == nil then
      current_token = max_token
      last_time = current_time
    else
      local past_time = current_time-last_time
      
      if past_time>1000 then
          current_token = current_token+token_rate
          last_time = current_time
      end
    
      ## 防止溢出
      if current_token>max_token then
        current_token = max_token
        last_time = current_time
      end
    end
    
    local result = 0
    if(current_token>0) then
      result = 1
      current_token = current_token-1
      last_time = current_time
    end
    redis.call('HMSET',KEYS[1],'last_time',last_time,'current_token',current_token)
    return result
    
    

    相关文章

      网友评论

          本文标题:go流量控制

          本文链接:https://www.haomeiwen.com/subject/cuvohjtx.html