滑动时间算法
图示在上图中,整个红色的矩形框表示一个时间窗口,在我们的例子中,一个时间窗口就是一分钟。然后我们将时间窗口进
行划分,比如图中,我们就将滑动窗口划成了6格,所以每格代表的是10秒钟。每过10秒钟,我们的时间窗口就会往右
滑动一格。每一个格子都有自己独立的计数器counter,比如当一个请求 在0:35秒的时候到达,那么0:30~0:39对应的
counter就会加1。
计数器算法其实就是滑动窗口算法。只是它没有对时间窗口做进一步地划分,所以只有1格。
由此可见,当滑动窗口的格子划分的越多,那么滑动窗口的滚动就越平滑,限流的统计就会越精确
- 简单代码举例
sentinel的滑动时间窗口
sentinel 统计源码
//主要是这个方法
node.addPassRequest(count);
@Override
public void addPassRequest(int count) {
super.addPassRequest(count);
this.clusterNode.addPassRequest(count);
}
@Override
public void addPassRequest(int count) {
rollingCounterInSecond.addPass(count);
rollingCounterInMinute.addPass(count);
}
这两个是一样的,拿rollingCounterInSecond.addPass(count);举例
@Override
public void addPass(int count) {
//获取当前的时间窗口
WindowWrap<MetricBucket> wrap = data.currentWindow();
//添加pass的值
wrap.value().addPass(count);
}
public WindowWrap<T> currentWindow(long timeMillis) {
if (timeMillis < 0) {
return null;
}
//计算当前的时间是在哪一块区域里
int idx = calculateTimeIdx(timeMillis);
// 计算当前的时间开始值(时间区间数组的下标开始值)
long windowStart = calculateWindowStart(timeMillis);
/*
* Get bucket item at given time from the array.从时间区间array里面取bucket
*
* (1) Bucket is absent, then just create a new bucket and CAS update to circular array.
//如果该时间区间取不到,则用cas创建新的时间区间
* (2) Bucket is up-to-date, then just return the bucket.
//如果该时间区间正常准备更新,则返回
* (3) Bucket is deprecated, then reset current bucket and clean all deprecated buckets.
//如果该时间废弃了,则重置bucket并且清空所有bucket里面的内容
*/
while (true) {
WindowWrap<T> old = array.get(idx);
if (old == null) {
/*
* B0 B1 B2 NULL B4
* ||_______|_______|_______|_______|_______||___
* 200 400 600 800 1000 1200 timestamp
* ^
* time=888
* bucket 为NULL则重新创建 , bucket is empty, so create new and update
* 如果旧bucket不存在,则重新创建
* If the old bucket is absent, then we create a new bucket at {@code windowStart},
* 然后cas来循环修改,只允许一个线程修改成功,其他线程都yield 掉
* then try to update circular array via a CAS operation. Only one thread can
* succeed to update, while other threads yield its time slice.
*/
//新建一个windowrap(下面有图解)
WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
if (array.compareAndSet(idx, null, window)) {
// Successfully updated, return the created bucket.
return window;
} else {
// Contention failed, the thread will yield its time slice to wait for bucket available.
Thread.yield();
}
} else if (windowStart == old.windowStart()) {
/*
* B0 B1 B2 B3 B4
* ||_______|_______|_______|_______|_______||___
* 200 400 600 800 1000 1200 timestamp
* ^
* time=888
* startTime of Bucket 3: 800, so it's up-to-date
*
* If current {@code windowStart} is equal to the start timestamp of old bucket,
* that means the time is within the bucket, so directly return the bucket.
*/
return old;
} else if (windowStart > old.windowStart()) {
/*
* (old)
* B0 B1 B2 NULL B4
* |_______||_______|_______|_______|_______|_______||___
* ... 1200 1400 1600 1800 2000 2200 timestamp
* ^
* time=1676
* startTime of Bucket 2: 400, deprecated, should be reset
*
* If the start timestamp of old bucket is behind provided time, that means
* the bucket is deprecated. We have to reset the bucket to current {@code windowStart}.
* Note that the reset and clean-up operations are hard to be atomic,
* so we need a update lock to guarantee the correctness of bucket update.
*
* The update lock is conditional (tiny scope) and will take effect only when
* bucket is deprecated, so in most cases it won't lead to performance loss.
*/
if (updateLock.tryLock()) {
try {
// Successfully get the update lock, now we reset the bucket.
return resetWindowTo(old, windowStart);
} finally {
updateLock.unlock();
}
} else {
// Contention failed, the thread will yield its time slice to wait for bucket available.
Thread.yield();
}
} else if (windowStart < old.windowStart()) {
// Should not go through here, as the provided time is already behind.
return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
}
}
}
生成的wrap包含PASS,Block等是否通过等字段
-
限流统计时的计算方法
@Override
public long pass() {
//首先要计算当前时间区间(该替换的替换,该新建的新建)
data.currentWindow();
long pass = 0;
List<MetricBucket> list = data.values();
//然后再将每个通过数相加
for (MetricBucket window : list) {
pass += window.pass();
}
return pass;
}
网友评论