美文网首页Nginx
Nginx - API网关之请求限流功能实现与测试

Nginx - API网关之请求限流功能实现与测试

作者: 红薯爱帅 | 来源:发表于2024-02-01 11:09 被阅读0次

    1. 概述

    对于分布式的微服务系统,如果集群最大并发rps为10。且超过10之后,可能会导致系统异常。
    此时,需要有一个API网关(API Gateway),完成rps的限制。

    当然,APIGateway还有其他功能,通常如下:

    • 请求路由和转发 - API网关根据请求的URL,将请求转发到后端的相应微服务。它将前端的请求路由到适当的微服务。
    • 请求限流 - API网关可以实现请求数限制,避免后端微服务被大量请求淹没。常见的限流算法有计数器、漏桶、令牌桶等。
    • 鉴权 - 在请求转发到微服务前,API网关可以对请求进行鉴权,验证其合法性。
    • 服务聚合 - API网关可以将多个微服务上的API聚合成一个API进行响应,避免客户端直接与各个微服务交互的复杂性。
    • 负载均衡 - 当后端有多个微服务实例时,API网关可以实现负载均衡,将请求分发到不同的实例上。
    • 请求缓存 - API网关可以对一些不频繁变动的请求响应进行缓存,以提升响应速度。
    • 监控统计 - API网关可以对所有经过其的请求进行监控统计,用于分析系统状态。
    • 服务治理 - API网关还可以具备服务注册发现、健康检查、熔断等服务治理功能。
    • 安全防护 - API网关可以进行认证授权、防范DDoS、脚本攻击等安全防护。
      总之,API网关承担了请求的聚合、路由、过滤、监控等功能,可以隐藏内部微服务的细节,提供一个简单的访问接口。

    这里主要为了实现请求限流功能,以nginx为例,介绍一下如何配置与测试。
    测试脚本已安排到本文末尾,可以用于复现和验证。

    漏桶原理

    2. nginx limit req

    Nginx采用漏桶原理,实现了请求限流。
    为了搞清楚具体细节,下面主要测试3种情况。

    配置A:单独burst

    对于不采用burst的方式,如果超过rate的request,会直接丢失,给用户返回439,并不友好。
    采用单独burst的方法,虽然使得请求的流量变得“均匀平滑”,但是确实很大程度上增加了响应时间。排在队列越后面的请求的等待时间越长,这就导致了它们的响应时间平白无故地增加了许多,过长的响应时间甚至可能会导致客户端认为请求异常或者直接导致请求超时。

    limit_req_zone $binary_remote_addr zone=rps:10m rate=1r/s;
    
    location /login/ {
        limit_req zone=rps burst=20;
        proxy_pass http://my_upstream;
    }
    

    Test Case 1: burst值和timeout

    结论:timeout的request,也会一直占用burst缓冲。

    例如,如果rate=1r/m,发起第一个request,可以得到结果。
    再执行第二次request,30s之后将timeout;再执行第三次request,30s之后也将timeout。
    但是,第二次和第三次也占用了burst缓冲。
    只有在第四分钟时,执行第四次request,才能得到正常的结果。
    所以,burst的设置,需要谨慎,不可过大。在按照rate消耗burst缓冲的请求,不应超过timeout时长。
    常用配置可以是rate=10r/s,burst=20。因为,超过2秒,页面就会感觉到卡顿。至于更多的请求,直接返回439。

    $ http POST :8888/api/v1/images/search
    http: error: Request timed out (30s).
    
    $ http POST :8888/api/v1/images/search
    http: error: Request timed out (30s).
    
    $ http POST :8888/api/v1/images/search
    HTTP/1.1 200 OK
    Connection: keep-alive
    Content-Length: 2
    Content-Type: text/html; charset=utf-8
    Date: Thu, 01 Feb 2024 07:03:10 GMT
    Server: nginx/1.21.1
    
    ok
    
    $ http POST :8888/api/v1/images/search
    http: error: Request timed out (30s).
    

    Test Case 2: burst值与资源占用

    burst为50时,可以支持最少50个用户(TcpConnection)的并发请求。
    单请求的最大耗时为5s=50/10,即burst/rate
    nginx消耗内存50-60MB,CPU占用率0.5%,并不高。

    burst为50时,如果是30个用户(TcpConnection)的并发请求。
    单请求的最大耗时为3s=30/10,即burst实际值/rate
    nginx消耗仍然是100MB以内,CPU占用率0.5%,并不高。

    • 为什么是最少50
      50个线程一起发request,不会报错;如果51个线程同时发起request,第51个线程会439。
      但是,稳定运行时,单request耗时0.5s的话,55个线程发起request,不会存在439报错信息。
      而56线程的话,会有一个线程报错439。
      更深层的原因,是nginx按照收到request的时间开始计算。
      即,只要burst的一个request被递交给upstream,则空了一个位置,就可以接收一个新的request请求。
      至于发给upstream的请求,什么时候返回结果,只有upstream服务可以控制。
      当upstream返回了一个request的结果后,则可以从burst中再取出来一个,丢给upstream。如此往复,周而复始。

    配置B:burst + nodelay

    突发的请求会一次性发送给upstream。
    经过测试,确实如此。也就是说,集群有可能在某一个瞬间,rps超过rate值

    $ python test.py 
    Success:     0, Fail: 0
    Success:    25, Fail: 210  ---> 多次测试,25个Thread的情况下,会直接25
    Success:    36, Fail: 492
    Success:    46, Fail: 776
    Success:    56, Fail: 1056
    Success:    66, Fail: 1346
    

    配置C:burst + delay

    简单来说,所谓的分段限速就是允许客户端在刚开始的时候有一定的突发请求,后面再进入到平稳的限速中。
    至于delay,应该是配置A和配置B之间的一个中间方法。

    limit_req_zone $binary_remote_addr zone=ip:10m rate=5r/s;
    
    server {
        listen 80;
        location / {
            limit_req zone=ip burst=12 delay=8;
            proxy_pass http://website;
        }
    }
    

    3. 总结

    • nginx整体内存和burst没有明显关系,充足的话,给1GB以内,管够。
    • 采用配置A单burst的方式,对后端集群最友好,可以杜绝突发超过rate的request发送给upstream。
    • 采用静态字符串的方式,可以全局控制rate
    limit_req_zone 'global' zone=rps:10m rate=10r/s;
    

    4. 参考

    https://tinychen.com/20210616-nginx-10-triple-rate-limiting-limit-req/
    https://nginx.org/en/docs/http/ngx_http_limit_req_module.html
    https://juejin.cn/post/7297154281870082100

    5. 其他

    a. 阿里云lb过来的请求,remote_addr是否是是它自己,还是用户的真实IP?

    通过这个文档来看,应该没有修改remote-addr,只添加了一个X-Forwarded-For字段,记录用户真实IP。
    所以,lb->nginx,而nginx采用remote-addr的方式实现limit_req,问题不大。
    https://help.aliyun.com/zh/slb/classic-load-balancer/use-cases/preserve-client-ip-addresses-when-layer-7-listeners-are-used

    b. swarm的lb是什么原理?两个manager的话,会怎么样?

    Swarm内置了DNS服务,可以自动为每个Service生成DNS名称,使服务间可以通过DNS发现和访问。
    如果有多个Manager节点,Swarm会选举一个Leader节点,该节点主要负责语调度任务分配决策。其他Manager节点与Leader节点互为备份。
    当Leader节点不可用时,剩余Manager节点会重新选举产生新的Leader,从而实现高可用。并且各个Manager之间会互相同步所维护的集群状态信息。
    总之,Swarm利用overlay网络和内置DNS、LB实现服务发现和负载均衡,多个Manager节点互为备份来保证服务高可用性。
    为了避免大量无效请求直接冲击swarm集群,所以部署一个裸docker容器作为apigateway会更好。

    c. docker container

    docker run -d --name apigateway \
      --restart always \
      -v `pwd`/nginx.conf:/etc/nginx/nginx.conf \
      -p 8888:80 \
      --memory 500MB \
      --log-opt max-size=100m --log-opt max-file=5 \
      nginx:1.21.1
    

    6. 附件

    • flask web api
    from flask import Flask
    import time
    
    
    app = Flask(__name__)
    
    
    @app.route("/ping")
    def ping():
        return "ping"
    
    
    @app.route("/api/v1/images/search", methods=['POST'], strict_slashes=False)
    def search():
        time.sleep(0.5)
        return "ok"
    
    
    if __name__ == "__main__":
        app.run(host="0.0.0.0", port="9999")
    
    
    """
    $ http :9999/ping
    $ http head :9999/ping
    $ http post :9999/api/v1/images/search
    """
    
    • nginx.conf
    # For more information on configuration, see:
    #   * Official English Documentation: http://nginx.org/en/docs/
    #   * Official Russian Documentation: http://nginx.org/ru/docs/
    
    user nginx;
    worker_processes auto; #启动进程
    error_log /dev/stdout; #全局错误日志
    pid /run/nginx.pid; #PID文件
    
    # Load dynamic modules. See /usr/share/nginx/README.dynamic.
    include /usr/share/nginx/modules/*.conf;
    
    events {
        worker_connections 1024; #单个后台worker process进程的最大并发链接数 
    }
    
    http {
        # 设定mime类型
        include       /etc/nginx/mime.types;
        default_type  application/octet-stream;
        sendfile        on;
        
        # 设定日志格式
        log_format  main  '$remote_addr - $remote_user [$time_local] "$request" '
                          '$status $body_bytes_sent $request_time[$upstream_response_time] "$http_referer" '
                          '"$http_user_agent" "$http_x_forwarded_for"';
        access_log  /dev/stdout  main;
    
        # limit_req_zone $binary_remote_addr zone=rps:10m rate=10r/s;
        limit_req_zone 'global' zone=rps:10m rate=10r/s;
        limit_req_status 439;
    
        server {
            listen 80 default_server;
    
            location = / {
                return 204;
            }
            
            location /api/v1 {
                # limit_req zone=rps;
                limit_req zone=rps burst=30;
                proxy_pass http://xxx:9999;
                proxy_set_header X-Real-IP $remote_addr;
                proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
            }
    
            location / {
                proxy_pass http://xxx:9999;
                proxy_set_header X-Real-IP $remote_addr;
                proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
            }
        }
    }
    
    • python压测脚本
    import threading
    import requests
    from queue import Queue, Empty
    import time
    
    
    HOST = "http://127.0.0.1:8888"
    USER_COUNT = 20
    
    resps = Queue()
    
    
    def fn(id: int):
        while True:
            resp = requests.post(f"{HOST}/api/v1/images/search")
            resps.put(dict(name=f"User-{id:2d}", status_code=resp.status_code))
            time.sleep(0.01)
    
    
    def print_report(report):
        print(f"Success: {report['success']:5d}, Fail: {report['fail']}")
    
    
    def statistics():
        users = dict()
        report = dict(success=0, fail=0)
        while True:
            try:
                msg = resps.get_nowait()
                is_ok = msg["status_code"] < 400
                key = "success" if is_ok else "fail"
                report[key] += 1
            except Empty:
                print_report(report)
                time.sleep(1)
    
    
    def main():
        threading.Thread(target=statistics).start()
        
        for id in range(USER_COUNT):
            threading.Thread(target=fn, args=(id,)).start()
    
    
    if __name__ == "__main__":
        main()
    
    • locust压力测试脚本
    from locust import HttpUser, task
    from datetime import datetime
    
    
    HOST = "http://127.0.0.1:8888"
    
    
    class LbsUser(HttpUser):
        host = HOST
    
        @task
        def head(self):
            self.client.head("/")
    
        @task
        def ping(self):
            self.client.get("/ping")
    
    
    class SearchUser(HttpUser):
        host = HOST
    
        @task
        def search(self):
            # resp = self.client.post("/login", {"username":"testuser", "password":"secret"})
            print(datetime.now(), end=",")
            resp = self.client.post("/api/v1/images/search")
            print("Response status code:", resp.status_code)
            
    
    """
    locust --web-host 127.0.0.1 --class-picker --modern-ui
    """
    
    

    相关文章

      网友评论

        本文标题:Nginx - API网关之请求限流功能实现与测试

        本文链接:https://www.haomeiwen.com/subject/ooxzodtx.html