上一篇文章我们介绍了全局限流,当您希望严格对API的请求总速率进行限制,并且不关心请求来自何处时,使用全局限流器可能很有用。但通常更常见的是为每个客户端单独设置一个限流器,这样可以防止单个客户端发出太多请求,影响其他客户端。
一种简单的实现方法是创建一个map来为每个客户端创建一个限流器映射,使用每个客户端的IP地址作为map的键。当一个新客户端向API发出请求时,我们将初始化一个新的限流器并将其添加到map中。对于任何后续请求,我们将从map中检索客户端的限流器,并通过调用其Allow()方法检查请求是否允许,就像之前所做的那样。
因为可能会有多个goroutine并发地访问map,所以我们需要使用互斥锁来防止竞争条件来保护对map的访问。如果您正在跟随本文操作,那么一起开始编码并更新上一篇文章中的rateLimit()中间件来实现这一点。
File: cmd/api/middleware.go
package main
...
func(app *application)rateLimit(next http.Handler) http.Handler{
//申明一个mutex和map存放客户端IP和限流器实例
var (
mu sync.Mutex
clients = make(map[string]*rate.Limiter)
)
return http.HandlerFunc(func(w http.ResponseWrite, r *http.Request){
//从请求中提取出客户端IP地址
ip, _, err := net.SplitHostPort(r.RremoteAddr)
if err != nil {
app.serverErrorResponse(w, err)
return
}
//上锁放在并发写入map
mu.Lock()
//检查map中IP是否存在,不存在就创建限流器
if _, found := clients[ip]; !found{
clients[ip] = rate.NewLimiter(2, 4)
}
//调用Allow()方法,查看请求是否允许
if !clients[ip].Allow(){
mu.Unlock()
app.rateLimitExceededResponse(w, r)
return
}
//即使请求允许也要释放锁
mu.Unlock()
next.ServerHTTP(w, r)
})
}
删除Map中旧限流器
上面的代码可正常工作,但是有一个小问题,clients这个map会无限的增加键值对,添加的每一个新的IP地址和限流器都会占用越来越多的资源。
为了防止这种情况,我们更新下代码,记录每个客户端的最后一次访问API时间。然后运行一个后台goroutine,定期从映射中删除最近没有访问服务的客户端。为了做到这一点,我们需要创建一个自定义的客户端结构体,它包含每个客户端的限流器和最后一次访问服务时间,并在初始化中间件时启动后台清理程序。
File: cmd/api/middleware.go
package main
...
func (app *application) rateLimit(next http.Handler) http.Handler {
//定义client结构体,包含限流器和最后一次访问API服务的时间
type client struct {
limiter *rate.Limiter
lastSeen time.Time
}
var (
mu sync.Mutex
//更新map值为client结构体指针
clients = make(map[string]*client)
)
//启动goroutine清除长时间没有访问服务的值
go func() {
for {
time.Sleep(time.Minute)
//更新map需要上锁
mu.Lock()
//循环遍历所有客户端,如果3分钟没有再访问服务,就清除
for ip, client := range clients {
if time.Since(client.lastSeen) > 3*time.Minute {
delete(clients, ip)
}
}
mu.Unlock()
}
}()
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if app.config.limiter.enable {
ip, _, err := net.SplitHostPort(r.RemoteAddr)
if err != nil {
app.serverErrorResponse(w, r, err)
return
}
mu.Lock()
if _, found := clients[ip]; !found {
clients[ip] = &client{
//如果map中不存在的IP键,就创建新的client结构体
limiter: rate.NewLimiter(2, 4)}
}
if !clients[ip].limiter.Allow() {
mu.Unlock()
app.rateLimitExceededResponse(w, r)
return
}
mu.Unlock()
}
next.ServeHTTP(w, r)
})
}
此时,如果您重新启动服务并再次尝试快速连续地发出一批请求,应该会发现,从单个客户端的角度来看,限流器能正确工作,还是像以前一样返回。
$ for i in {1..6}; do curl http://localhost:4000/v1/healthcheck; done
{
"status": "available",
"system_info": {
"environment": "development",
"version": "1.0.0"
}
}
{
"status": "available",
"system_info": {
"environment": "development",
"version": "1.0.0"
}
}
{
"status": "available",
"system_info": {
"environment": "development",
"version": "1.0.0"
}
}
{
"status": "available",
"system_info": {
"environment": "development",
"version": "1.0.0"
}
}
{
"error": "rate limit exceeded"
}
{
"error": "rate limit exceeded"
}
附加内容
分布式应用
使用此模式进行限流只在API应用程序运行在单机上时有效。如果您的基础设施是分布式的,并且您的应用程序运行在负载均衡器后面的多个服务器上,那么您将需要使用另一种方法。
如果你使用HAProxy或Nginx作为负载均衡器或反向代理,这两者都有内置的限流功能,使用其自带限流器是明智的。或者,你可以使用像Redis这样的缓存来维护客户端的请求计数,运行在一个所有应用服务器都可以访问的服务器上。
限流器配置
目前,限流器的每秒请求数和突发值被硬编码到rateLimit()中间件中。这当然是可以的,但是如果它们在运行时可配置的话,将会更加灵活。同样,如果有一种简单的方法可以完全关闭限流器会非常有用(当您想要运行基准测试或执行负载测试时,当所有请求可能来自少量IP地址时,这将非常有用)。
为了使这些参数是可配置的,我们回到cmd/api/main.go文件,更新配置结构和命令行参数如下所示:
File:cmd/api/main.go
package main
...
// 定义一个配置结构体来保存应用程序的所有配置设置.
//目前,配置是服务器监听的端口和应用程序的环境名称 (开发, 预发, 生成等等)
//将从命令行参数中读取这些配制信息
type config struct {
port int
env string
db struct {
dsn string
maxOpenConns int
maxIdleConns int
maxIdleTime string
}
limiter struct {
rps float64
burst int
enable bool
}
}
// 修改logger字段,用*jsonlog.Logger代替
type application struct {
config config
logger *jsonlog.Logger
models data.Models
}
func main() {
// 声明一个配置结构体实例
var cfg config
// 从命令行参数中将port和env读取到配制结构体实例当中。
//默认端口使用4000以及环境信息使用开发环境development
flag.IntVar(&cfg.port, "port", 4000, "API server port")
flag.StringVar(&cfg.env, "env", "development", "Environment (development|staging|production)")
flag.StringVar(&cfg.db.dsn, "db-dsn", "postgres://greenlight:pa55word@localhost/greenlight?sslmode=disable", "PostgreSQL DSN")
flag.IntVar(&cfg.db.maxOpenConns, "db-max-open-conns", 25, "PostgreSQL max open connections")
flag.IntVar(&cfg.db.maxIdleConns, "db-max-idle-conns", 25, "PostgreSQL max idle connections")
flag.StringVar(&cfg.db.maxIdleTime, "db-max-idle-time", "15m", "PostgreSQL max connection idle time")
//限流参数
flag.Float64Var(&cfg.limiter.rps, "limiter-rps", 2, "Rate limiter maximum request per secod")
flag.IntVar(&cfg.limiter.burst, "limiter-burst", 4, "Rate limiter maximum burst")
flag.BoolVar(&cfg.limiter.enable, "limiter-enable", true, "enable")
flag.Parse()
然后更新rateLimiter()中间件使用这些配置参数:
package main
...
func (app *application) rateLimit(next http.Handler) http.Handler {
...
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
//只有配置打开限流功能才起作用
if app.config.limiter.enable {
ip, _, err := net.SplitHostPort(r.RemoteAddr)
if err != nil {
app.serverErrorResponse(w, r, err)
return
}
mu.Lock()
if _, found := clients[ip]; !found {
//从命令行参数读取限流器配置
clients[ip] = &client{
limiter: rate.NewLimiter(rate.Limit(app.config.limiter.rps), app.config.limiter.burst),
}
}
if !clients[ip].limiter.Allow() {
mu.Unlock()
app.rateLimitExceededResponse(w, r)
return
}
mu.Unlock()
}
next.ServeHTTP(w, r)
})
}
一旦完成以上代码,让我们通过运行带有-limiter-burst参数的API服务并将burst值减少为2来尝试一下:
$ go run . -limiter-burst=2
{"level":"INFO","time":"2021-12-18T10:40:57Z","message":"database connection pool established"}
{"level":"INFO","time":"2021-12-18T10:40:57Z","message":"starting server","properties":{"addr":":4000","env":"development"}}
如果您再次快速连续地发出一批6个请求,您现在应该会发现只有前两个成功:
$ for i in {1..6}; do curl http://localhost:4000/v1/healthcheck; done
{
"status": "available",
"system_info": {
"environment": "development",
"version": "1.0.0"
}
}
{
"status": "available",
"system_info": {
"environment": "development",
"version": "1.0.0"
}
}
{
"error": "rate limit exceeded"
}
{
"error": "rate limit exceeded"
}
{
"error": "rate limit exceeded"
}
{
"error": "rate limit exceeded"
}
类似地,你可以尝试使用limiter-enabled=false来禁用限流器,如下所示:
$ go run . -limiter-enable=false
{"level":"INFO","time":"2021-12-18T10:43:41Z","message":"database connection pool established"}
{"level":"INFO","time":"2021-12-18T10:43:41Z","message":"starting server","properties":{"addr":":4000","env":"development"}}
你会发现现在所有的请求都成功完成了,不管你发了多少请求。
$ for i in {1..6}; do curl http://localhost:4000/v1/healthcheck; done
{
"status": "available",
"system_info": {
"environment": "development",
"version": "1.0.0"
}
}
{
"status": "available",
"system_info": {
"environment": "development",
"version": "1.0.0"
}
}
{
"status": "available",
"system_info": {
"environment": "development",
"version": "1.0.0"
}
}
{
"status": "available",
"system_info": {
"environment": "development",
"version": "1.0.0"
}
}
{
"status": "available",
"system_info": {
"environment": "development",
"version": "1.0.0"
}
}
{
"status": "available",
"system_info": {
"environment": "development",
"version": "1.0.0"
}
}
网友评论