Sentinel基础介绍见 微服务流量控制组件Sentinel
10.1 滑动窗口算法
滑动窗口算法是将时间周期分为N个小周期(窗口),分别记录每个小周期内访问次数,然后根据时间将窗口往前滑动并删除过期的小时间窗口。最终只需要统计滑动窗口范围内的所有小时间窗口总的计数即可,入下图所示
img将一分钟拆分为4个小时间窗口,每个小时间窗口最多能够处理25个请求。并且通过虚线框表示滑动窗口的大小(当前窗口的大小是2,也就是在这个窗口内最多能够处理50个请求)。同时滑动窗口会随着时间往前移动,比如前面15s结束之后,窗口会滑动到15s~45s这个范围,然后在新的窗口中重新统计数据。
由此可见,当滑动窗口的格子划分的越多,那么滑动窗口的滚动就越平滑,限流的统计就会越精确。此算法可以很好的解决固定窗口算法的临界问题。
Sentinel底层在做统计QPS做快速失败时用的也是滑动时间窗算法。
10.1.1 链表实现:
假设某个服务最多只能每秒钟处理100个请求,可以设置一个1秒钟的滑动时间窗口,用LinkedList表示,该窗口分为10个格子; 每个格子100毫秒,每100毫秒移动一次,每次移动都需要记录当前服务100ms内请求的次数counter到格子中; 如果格子数大于10个,删除最前边的各自,格子数始终保留10个;
循环整个队列,如果累加counter大于限流请求数,则被限流,否则不会被限流;
mport java.util.LinkedList;
import java.util.stream.Collectors;
public class SlidingWindow {
//使用LinkedList来记录滑动窗口的10个格子。
LinkedList<Metric> slots = new LinkedList<Metric>();
class Metric {
//起始时间
long startTm ;
//服务访问次数,可以放在Redis中,实现分布式系统的访问计数
long counter = 0L;
}
private void doCheck() {
//当前100ms内请数
Metric metric = new Metric();
long currentTime = System.currentTimeMillis();
//当前时间 大于 上一窗口起始时间+ 时间间隔100ms,说明要开始新的窗口了
if(slots.getLast().getStartTm()+100 <= System.currentTimeMillis()){
//假如上一次窗口起始时间为1500,当前时间为 1788,则说明【1600-1700】这个时间段没有请求,
// 那么我们需要补全空数据
if((currentTime - slots.getLast().getStartTm())/100 > 1){
LongStream.range(0,(currentTime - slots.getLast().getStartTm())/100 -1).forEach(item ->{
Metric metric2 = new Metric();
metric2.setCounter(0);
metric2.setStartTm(slots.getLast().getStartTm()+100);
slots.addLast(metric2);
});
}else {
//创建一个新窗口
metric.setCounter(1);
//取整
metric.setStartTm((slots.getLast().getStartTm() / 100) * 100);
slots.addLast(metric);
}
}else {
//旧的窗口,需要请求重置
metric.setCounter(slots.getLast().getCounter()+1);
slots.add(slots.size()-1,metric);
}
//如果linkedList长度大于10个,则删除最前面的一个,体现滑动时间窗
if (slots.size() > 10) {
slots.removeFirst();
}
//累加最近10个窗口内的访问次数
Long countTotal = slots.stream().collect(Collectors.summingLong(s ->s.getCounter()));
//比较最后一个节点和第一个节点,两者相差100以上就限流
if (countTotal > 100) {
System.out.println("限流了。。");
//TODO 修改限流标记为true
} else {
//TODO 修改限流标记为false
}
}
}
这个简化了很多逻辑,而且性能也不太好,但是理解滑动窗口算法是没问题的。
10.1.2 Sentinel实现:
Sentinel中是通过环形数组实现的,环形数组可以内存复用降低GC,性能高(高性能队列Disruptor中也有用到),如下图。
image.pngSentinel在做QPS统计时,滑动时间窗有两个维度
-
毫秒级维度:初始化一个跨度为1000ms,包含两个500ms的时间窗口
-
秒级维度:还有一个跨度为60s的,包含60个1s的时间窗口
在毫秒级维度中,仅仅使用两个时间窗口就完成了QPS的计算,并没有做删除时间窗口节点的操作,而是清空原本节点的内容。两个时间窗口代表数组的两个下标。
通过 (当前时间 / 500ms) % 数组长度2的取模结果,得到当前时间的请求数落在那个时间窗口内;
然后拿 (当前时间 / 500ms) * 500ms比较时间窗口的起始位置,
如果与之前 (比如:500ms) 一致,就把当次请求数加入到该窗口内用作统计
如果与之前不一致,就清空之前时间窗口内统计的数据,并放入当前时间的请求数,完成滑动的操作
核心源码如下:
private int calculateTimeIdx(/*@Valid*/ long timeMillis) {
long timeId = timeMillis / windowLengthInMs;
// 计算当前时间在数组中的位置
return (int) (timeId % array.length());
}
//计算当前时间 在 窗口内的 起始时间
protected long calculateWindowStart(/*@Valid*/ long timeMillis) {
return timeMillis - timeMillis % windowLengthInMs;
}
//获取当前窗口
public WindowWrap<T> currentWindow(long timeMillis) {
//当前时间如果小于0,返回空
if (timeMillis < 0) {
return null;
}
//计算时间窗口的索引
int idx = calculateTimeIdx(timeMillis);
// 计算当前时间窗口的开始时间
long windowStart = calculateWindowStart(timeMillis);
while (true) {
//在窗口数组中获得窗口
WindowWrap<T> old = array.get(idx);
if (old == null) {
/*
* B0 B1 B2 NULL B4
* ||_______|_______|_______|_______|_______||___
* 200 400 600 800 1000 1200 timestamp
* ^
* time=888
* 比如当前时间是888,根据计算得到的数组窗口位置是个空,所以直接创建一个新窗口就好了
*/
WindowWrap<T> window = new WindowWrap<T>(
windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
if (array.compareAndSet(idx, null, window)) {
// Successfully updated, return the created bucket.
return window;
} else {
// Contention failed, the thread will yield its time slice to
// wait for bucket available.
Thread.yield();
}
} else if (windowStart == old.windowStart()) {
/*
* B0 B1 B2 B3 B4
* ||_______|_______|_______|_______|_______||___
* 200 400 600 800 1000 1200 timestamp
* ^
* time=888
* 这个更好了,刚好等于,直接返回就行
*/
return old;
} else if (windowStart > old.windowStart()) {
/*
* B0 B1 B2 B3 B4
* |_______|_______|_______|_______|_______||___
* 200 400 600 800 1000 1200 timestamp
* B0 B1 B2 NULL B4
* |_______||_______|_______|_______|_______|_______||___
* ... 1200 1400 1600 1800 2000 2200 timestamp
* ^
* time=1676
* 这个要当成圆形理解就好了,之前如果是1200一个完整的圆形,然后继续从1200开始,如果现在时间是1676,落在在B2的位置,
* 窗口开始时间是1600,获取到的old时间其实会是600,所以肯定是过期了,直接重置窗口就可以了
*/
if (updateLock.tryLock()) {
try {
// Successfully get the update lock, now we reset the
// bucket.
return resetWindowTo(old, windowStart);
} finally {
updateLock.unlock();
}
} else {
Thread.yield();
}
} else if (windowStart < old.windowStart()) {
// 这个不太可能出现,嗯。。时钟回拨
return new WindowWrap<T>(
windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
}
}
}
//统计时间窗口总的请求数量
public List<T> values(long timeMillis) {
// 把当前时间窗口中的bucket WindowWrap 都返回出去 用来统计时间窗口总的请求数量
if (timeMillis < 0) {
return new ArrayList<T>();
}
int size = array.length();
List<T> result = new ArrayList<T>(size);
for (int i = 0; i < size; i++) {
WindowWrap<T> windowWrap = array.get(i);
if (windowWrap == null || isWindowDeprecated(timeMillis, windowWrap)) {
continue;
}
result.add(windowWrap.value());
}
return result;
}
//校验时间窗口是否过期,相当于上一个例子中的补全空数据,这里更优雅
public boolean isWindowDeprecated(long time, WindowWrap<T> windowWrap) {
return time - windowWrap.windowStart() > intervalInMs;
}
10.2 令牌桶算法
image.png简化demo:
/**
* 令牌桶限流算法
*/
public class TokenBucket {
public long timeStamp = System.currentTimeMillis(); // 当前时间
public long capacity; // 桶的容量
public long rate; // 令牌放入速度
public long tokens; // 当前令牌数量
public boolean grant() {
long now = System.currentTimeMillis();
// 先添加令牌
tokens = Math.min(capacity, tokens + (now - timeStamp) * rate);
timeStamp = now;
if (tokens < 1) {
// 若不到1个令牌,则拒绝
return false;
} else {
// 还有令牌,领取令牌
tokens -= 1;
return true;
}
}
}
限流过程如下:
-
刚开始,系统的 QPS 非常低,初始化我们就直接把令牌桶塞满了;
-
然后这个低 QPS 的状态持续了一段时间,因为我们一直会填充最大 QPS 数量的令牌(因为取最小值,所以其实桶里令牌基本不会有变化),所以令牌桶一直处于满的状态,整个系统的限流也处于一个比较低的水平。这以上的部分一直处于警戒线之上。实际上就是叫做冷启动 / 预热的过程;
-
接着系统的 QPS 突然激增,令牌消耗速度太快。就算我们每次增加最大 QPS 数量的令牌任然无法维持消耗,所以桶里的令牌在不断低减少。这个时候,冷启动阶段的限制 QPS 也在不断地提高,最后直到桶里的令牌低于警戒线;
-
低于警戒线之后,系统就会按照最高 QPS 去限流,这个过程就是系统在逐渐达到最高限流的过程。那这样一来,实际就达到了我们处理突增流量的目的,整个系统在漫漫地适应突然飙高的 QPS,然后最终达到系统的 QPS 阈值;
-
最后,如果 QPS 回复正常,那么又会逐渐回到警戒线之上,就回到了最开始的过程。
管理端配置如下(流控效果选 Warm Up):
image.png公式推导如下:
img-
横坐标storedPermits代表存储桶中的令牌数量;
-
纵坐标代表获取一个令牌需要的时间,即请求通过的时间间隔;
stableInterval(稳定区间):稳定产生令牌的时间间隔,假定限流阈值QPS为1000,stableInterval的值为1毫秒。
coldInterval:冷启动产生令牌的最大时间隔间,等于稳定产生令牌的时间间隔乘以冷启动系数(stableInterval * coldFactor)
,Sentinel中coldFactor默认为3。
warmupPeriod:预热时间,即冷启动周期,对应上图中的梯形面积,Sentinel中默认为10秒。
thresholdPermits:从冷启动到正常的令牌桶中令牌数量的阈值,当令牌桶中的令牌数量超越该值时,则进入冷启动阶段。
因为coldFactor默以为3,所以(coldInterval - stableInterval)是stableInterval的两倍,所以从thresholdPermits到0的时刻是从maxPermits到thresholdPermits时刻的一半,也便是冷启动周期的一半。因为梯形的面积等于warmupPeriod,所以长方形面积是梯形面积的一半,长方形的面积是warmupPeriod / 2。
依据长方形面积公式:长 * 宽 = 面积
count = 1000; 限流阈值
stableInterval=1/count= 0.001
coldInterval = 0.003s
warmupPeriod = 10s 预热时间 10s
可得:
thresholdPermits=0.5*warmupPeriod/stableInterval= 0.5 * 10 / 1ms = 5000
maxPermits:最大答应桶中寄存的令牌数。
依据梯形的面积公式:(上低 + 下低)* 高 / 2
可得:
warmupPeriod=(stableInterval+coldInterval)*(maxPermits-thresholdPermits)/2
推出:
maxPermits=thresholdPermits+2*warmupPeriod/(stableInterval+coldInterval)
= 5000 + 2* 10 /(1ms + 3ms)
= 5000 + 20/0.004 = 5000 + 5000 = 10000
slope:直线的斜率,即出产令牌的速率。
依据斜率核算公式:(y2-y1) / (x2-x1),可得:
slope=(coldInterval-stableInterval)/(maxPermits-thresholdPermits)
= 0.002/5000
10.3 漏桶算法
漏桶算法是将访问请求放入漏桶中,当请求达到限流值,则进行丢弃(触发限流策略)。无论有多少请求,请求的速率有多大,都按照固定的速率流出,对应到系统中就是按照固定的速率处理请求。超过漏桶容量的直接抛弃。
管理端配置:匀速排队(RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER)方式会严格控制请求通过的间隔时间,也即是让请求以均匀的速度通过,对应的是漏桶算法。阈值必须设置为QPS。
这种方式主要用于处理间隔性突发的流量,例如消息队列。想象一下这样的场景,在某一秒有大量的请求到来,而接下来的几秒则处于空闲状态,我们希望系统能够在接下来的空闲期间逐渐处理这些请求,而不是在第一秒直接拒绝多余的请求。
image.png首先计算出当前请求平摊到 1 秒内的时间花费,然后去计算这一次请求预计时间; 如果小于当前时间的话,那么以当前时间为主,返回即可; 反之如果超过当前时间的话,这时候就要进行排队等待了。等待的时候要判断是否超过当前最大的等待时间,超过就直接丢弃; 没有超过就更新上一次的通过时间,然后再比较一次是否超时。如果还超时就重置时间,反之在等待时间范围之内的话就等待。如果都不是,那就可以通过了。
public class RateLimiterController implements TrafficShapingController {
//最大等待超时时间,默认500ms
private final int maxQueueingTimeMs;
//限流数量
private final double count;
//上一次的通过时间
private final AtomicLong latestPassedTime = new AtomicLong(-1);
@Override public boolean canPass(
Node node, int acquireCount, boolean prioritized) {
// Pass when acquire count is less or equal than 0.
if (acquireCount <= 0) {
return true;
}
// Reject when count is less or equal than 0.
// Otherwise,the costTime will be max of long and waitTime will overflow
// in some cases.
if (count <= 0) {
return false;
}
long currentTime = TimeUtil.currentTimeMillis();
//时间平摊到1s内的花费
long costTime = Math.round(
1.0 * (acquireCount) / count * 1000); // 1 / 100 * 1000 = 10ms
//计算这一次请求预计的时间
long expectedTime = costTime + latestPassedTime.get();
//花费时间小于当前时间,pass,最后通过时间 = 当前时间
if (expectedTime <= currentTime) {
latestPassedTime.set(currentTime);
return true;
}
else {
//预计通过的时间超过当前时间,要进行排队等待,重新获取一下,避免出现问题,差额就是需要等待的时间
long waitTime = costTime + latestPassedTime.get()
- TimeUtil.currentTimeMillis();
//等待时间超过最大等待时间,丢弃
if (waitTime > maxQueueingTimeMs) {
return false;
} else {
//反之,可以更新最后一次通过时间了
long oldTime = latestPassedTime.addAndGet(costTime);
try {
waitTime = oldTime - TimeUtil.currentTimeMillis();
//更新后再判断,还是超过最大超时时间,那么就丢弃,时间重置
if (waitTime > maxQueueingTimeMs) {
latestPassedTime.addAndGet(-costTime);
return false;
}
//在时间范围之内的话,就等待
if (waitTime > 0) {
Thread.sleep(waitTime);
}
return true;
} catch (InterruptedException e) {
}
}
}
return false;
}
}
网友评论