简介
漏桶限流:漏桶流出的速度是恒定的,流入速度不定,桶满时则丢弃请求。它一般可用于保护第三方接口,保证调用不超出对方限制的QPS。与令牌桶相比,漏桶会让请求先排队,只有漏桶装满了才会丢弃请求,因此在突发流量下丢弃的请求更少。
漏桶限流可以用在需要请求排队的场景,比如微博热点、秒杀之类的场景:可以利用漏桶缓存请求,与令牌桶相比可以减少请求的丢弃。
漏桶限流简单实现方法
这种实现方式在acquire时,先按设置的漏出速率计算桶内剩余水量:桶满则拒绝请求,否则水量加1并放行。这样其实并没有控制请求通过的速率,比如在空桶的情况下,请求会一直通过直到桶满,和设置的固定漏出速率没有必然关系。
/**
 * Simple leaky-bucket limiter.
 *
 * <p>Each accepted request adds one drop of water to the bucket. The bucket leaks
 * {@code leakRate} drops for every whole second that elapses. A request is rejected when the
 * bucket is already full.
 *
 * <p>Note: this only bounds how much water the bucket holds; it does not space requests out.
 * When the bucket is empty, up to {@code capacity} requests are admitted back-to-back.
 *
 * <p>Thread-safety: {@link #acquire()} is {@code synchronized}, so the leak-then-add sequence
 * is atomic with respect to other callers of the same instance.
 */
public final class LeakyBucket {
    /** Capacity used by the single-argument constructor. */
    private static final int DEFAULT_CAPACITY = 10;

    /** Maximum number of drops the bucket can hold. */
    private final int capacity;
    /** Number of drops that leak out per elapsed whole second. */
    private final int leakRate;
    /** Millisecond time source; {@code System::currentTimeMillis} unless injected. */
    private final java.util.function.LongSupplier clock;
    /** Current amount of water in the bucket; starts empty. Guarded by {@code this}. */
    private int water;
    /**
     * Time (ms) from which not-yet-applied leakage is measured. Guarded by {@code this}.
     * It is advanced only by the whole seconds actually leaked, so partial seconds accumulate
     * across calls instead of being discarded.
     */
    private long leakTimeStamp;

    /**
     * @param capacity maximum number of drops the bucket can hold (must be >= 0)
     * @param leakRate drops leaked per second (must be >= 0)
     * @throws IllegalArgumentException if either argument is negative
     */
    public LeakyBucket(int capacity, int leakRate) {
        this(capacity, leakRate, System::currentTimeMillis);
    }

    /**
     * Creates a bucket with the default capacity of 10.
     *
     * @param leakRate drops leaked per second (must be >= 0)
     * @throws IllegalArgumentException if {@code leakRate} is negative
     */
    public LeakyBucket(int leakRate) {
        this(DEFAULT_CAPACITY, leakRate);
    }

    /**
     * Creates a bucket driven by an explicit millisecond clock (used for deterministic tests).
     *
     * @param capacity maximum number of drops the bucket can hold (must be >= 0)
     * @param leakRate drops leaked per second (must be >= 0)
     * @param clock    millisecond time source
     * @throws IllegalArgumentException if {@code capacity} or {@code leakRate} is negative
     * @throws NullPointerException     if {@code clock} is null
     */
    LeakyBucket(int capacity, int leakRate, java.util.function.LongSupplier clock) {
        if (capacity < 0) {
            throw new IllegalArgumentException("capacity must be >= 0: " + capacity);
        }
        if (leakRate < 0) {
            throw new IllegalArgumentException("leakRate must be >= 0: " + leakRate);
        }
        this.capacity = capacity;
        this.leakRate = leakRate;
        this.clock = java.util.Objects.requireNonNull(clock, "clock");
    }

    /**
     * Tries to add one drop to the bucket.
     *
     * @return {@code true} if the request is accepted, {@code false} if the bucket is full
     */
    public synchronized boolean acquire() {
        long now = clock.getAsLong();
        leak(now);
        if (water >= capacity) {
            // Full: reject without adding water.
            return false;
        }
        water++;
        return true;
    }

    /** Applies the leakage accumulated between {@code leakTimeStamp} and {@code now}. */
    private void leak(long now) {
        if (water == 0) {
            // Empty bucket: leakage starts counting from this request.
            leakTimeStamp = now;
            return;
        }
        long elapsedSeconds = (now - leakTimeStamp) / 1000;
        if (elapsedSeconds <= 0) {
            // Less than a full second (or clock went backwards): keep the timestamp so the
            // partial second still counts on the next call.
            return;
        }
        // With leakRate >= 1, `water` seconds already drain the bucket, so capping the seconds
        // at `water` keeps the product well inside long range without changing the result.
        long leaked = Math.min(elapsedSeconds, water) * (long) leakRate;
        if (leaked >= water) {
            water = 0;
            leakTimeStamp = now;
        } else {
            water -= (int) leaked;
            // Advance only by the whole seconds consumed; the remainder carries over.
            leakTimeStamp += elapsedSeconds * 1000;
        }
    }
}
Sentinel中的漏桶限流实现
实际使用中,Sentinel的流控有个CONTROL_BEHAVIOR_RATE_LIMITER(匀速排队)模式,采用漏桶算法:用最大排队等待时间来限制桶的容量,排队等待的线程会休眠一段时间,从而控制请求通过的频率。
/**
 * Leaky-bucket style traffic shaper (Sentinel's rate-limiter / queueing behavior).
 *
 * <p>Admitted requests are spaced {@code acquireCount / count} seconds apart. A request whose
 * scheduled pass time is already due passes immediately. Otherwise the calling thread sleeps
 * until its slot, unless the required wait exceeds {@code maxQueueingTimeMs}, in which case the
 * request is rejected. The maximum queueing time therefore plays the role of the bucket
 * capacity.
 */
public class RateLimiterController implements TrafficShapingController {
    /** Maximum time (ms) a request may wait in the queue before it is rejected. */
    private final int maxQueueingTimeMs;
    /** Permitted requests per second. */
    private final double count;
    /** Scheduled pass time (ms) of the most recently admitted request; -1 before any request. */
    private final AtomicLong latestPassedTime = new AtomicLong(-1);

    /**
     * @param timeOut maximum queueing time in milliseconds
     * @param count   permitted requests per second
     */
    public RateLimiterController(int timeOut, double count) {
        this.maxQueueingTimeMs = timeOut;
        this.count = count;
    }

    @Override
    public boolean canPass(Node node, int acquireCount) {
        return canPass(node, acquireCount, false);
    }

    /**
     * Decides whether a request may pass, sleeping the caller until its slot if needed.
     *
     * @param node         not used by this controller
     * @param acquireCount number of permits requested; values <= 0 always pass
     * @param prioritized  not used by this controller
     * @return {@code true} if the request passes (possibly after sleeping); {@code false} if the
     *     rate is non-positive, the required wait exceeds the maximum queueing time, or the
     *     thread is interrupted while queued (the interrupt status is then restored)
     */
    @Override
    public boolean canPass(Node node, int acquireCount, boolean prioritized) {
        // Pass when acquire count is less or equal than 0.
        if (acquireCount <= 0) {
            return true;
        }
        // Reject when count is less or equal than 0.
        // Otherwise, the costTime will be max of long and waitTime will overflow in some cases.
        if (count <= 0) {
            return false;
        }
        long currentTime = TimeUtil.currentTimeMillis();
        // Interval (ms) this request occupies, i.e. how long the bucket takes to leak it.
        long costTime = Math.round(1.0 * (acquireCount) / count * 1000);
        // Expected pass time of this request.
        long expectedTime = costTime + latestPassedTime.get();
        if (expectedTime <= currentTime) {
            // Slot already due: pass now. Contention may exist here, but it's okay.
            latestPassedTime.set(currentTime);
            return true;
        }
        // Calculate the time to wait before this request's slot.
        long waitTime = costTime + latestPassedTime.get() - TimeUtil.currentTimeMillis();
        if (waitTime > maxQueueingTimeMs) {
            return false;
        }
        // Reserve a slot; other threads may have reserved slots concurrently, so recompute.
        long oldTime = latestPassedTime.addAndGet(costTime);
        try {
            waitTime = oldTime - TimeUtil.currentTimeMillis();
            if (waitTime > maxQueueingTimeMs) {
                // Queue too long: release the reserved slot and reject.
                latestPassedTime.addAndGet(-costTime);
                return false;
            }
            // In race condition waitTime may be <= 0.
            if (waitTime > 0) {
                // Sleep until the slot arrives, which paces the outgoing request rate.
                Thread.sleep(waitTime);
            }
            return true;
        } catch (InterruptedException e) {
            // The request is rejected, so release its slot (as the branch above does) and
            // preserve the interrupt for the caller instead of swallowing it.
            latestPassedTime.addAndGet(-costTime);
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
网友评论