需求
编写的django项目需要调用外部接口,但是用户的服务比较脆弱,需要请求方限制qps。
- 漏桶算法
- 令牌桶算法
代码
令牌桶实现代码
from time import time, sleep
# 用于全局缓存和锁
from django.core.cache import cache
LOCK_PREFIX = __name__
class TokenBucket:
"""令牌桶
用于控制请求速度
- 令牌信息存在redis中,包含两个字段:
tokens: 令牌数量
last: 最后更新时间戳
"""
def __init__(self, bucket_key, rate=10):
self.bucket_key = bucket_key
if rate <= 0:
raise Exception('Rate must more than 0')
self.rate = rate
# cache和lock的key
self.lock_key = f'{LOCK_PREFIX}_{self.bucket_key}_bucket_lock'
self.bucket_cache_key = f'{LOCK_PREFIX}_{self.bucket_key}'
def get_token(self, amount=1):
bucket_lock = cache.lock(self.lock_key)
try:
bucket_lock.acquire()
now = time()
token = cache.get(self.bucket_cache_key, None)
if token is None:
token = {
'tokens': self.rate,
'last': now
}
elapsed = now - token.get('last', now)
if int(elapsed * self.rate):
temp = token.get('tokens') + int(elapsed * self.rate)
token['tokens'] = temp if temp < self.rate else self.rate
token['last'] = now
if token.get('tokens', 0) >= amount:
token['tokens'] -= amount
amount = 0
else:
amount -= token.get('tokens', 0)
token['tokens'] = 0
cache.set(self.bucket_cache_key, token)
return amount
finally:
bucket_lock.release()
return -1
def consume(self, amount=1):
while amount:
amount = self.get_token(amount)
sleep(1)
return
DEMO
# qps = 100
bucket = TokenBucket('test', rate=100)
bucket.consume(1)
网友评论