Introduction to sliding windows
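Start with fixed-window counting. Suppose traffic is counted per minute with a limit of 100 requests per window. If 98 requests arrive at the end of one minute and another 98 at the start of the next, nearly 200 requests pass within a very short time. Neither window exceeds its limit, so nothing is throttled and the system comes under heavy pressure.
To address this, the statistics interval is divided into smaller windows (buckets), and each check aggregates the buckets that cover one full statistics interval up to the current moment. This gives smoother statistics and smoother flow control. The more buckets there are, the more accurate the statistics, but the more memory they take.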
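To make the boundary problem concrete, the following self-contained sketch replays the scenario above against a fixed one-minute counter and a 60-bucket sliding counter. The class and method names are hypothetical, timestamps are passed in explicitly instead of being read from a clock, and the sliding version simply re-sums every bucket per request. It illustrates the idea and is not Sentinel's code.

```java
import java.util.Arrays;

/** Hypothetical demo: a fixed one-minute window versus a 60-bucket sliding window, limit 100. */
class BoundaryBurstDemo {
    static final long WINDOW_MS = 60_000;
    static final int LIMIT = 100;

    /** Fixed window: the counter starts from zero at every minute boundary. */
    static int fixedWindowAllowed(long[] timestamps) {
        long currentWindow = -1;
        int count = 0;
        int allowed = 0;
        for (long t : timestamps) {
            long w = t / WINDOW_MS;
            if (w != currentWindow) {   // crossed a minute boundary: forget everything
                currentWindow = w;
                count = 0;
            }
            if (count < LIMIT) {
                count++;
                allowed++;
            }
        }
        return allowed;
    }

    /** Sliding window: 60 buckets of 1 s; admit only if the last 60 s hold fewer than LIMIT. */
    static int slidingWindowAllowed(long[] timestamps) {
        final int buckets = 60;
        final long bucketMs = WINDOW_MS / buckets;
        long[] starts = new long[buckets];
        int[] counts = new int[buckets];
        Arrays.fill(starts, -1L);                  // -1 marks a slot that was never used
        int allowed = 0;
        for (long t : timestamps) {
            int idx = (int) ((t / bucketMs) % buckets);
            long start = t - t % bucketMs;
            if (starts[idx] != start) {            // slot holds an older bucket: reuse it
                starts[idx] = start;
                counts[idx] = 0;
            }
            int total = 0;
            for (int i = 0; i < buckets; i++) {    // sum buckets that started within the last 60 s
                if (starts[i] >= 0 && t - starts[i] < WINDOW_MS) {
                    total += counts[i];
                }
            }
            if (total < LIMIT) {
                counts[idx]++;
                allowed++;
            }
        }
        return allowed;
    }

    /** 98 requests at 59.5 s, then 98 more at 60.5 s. */
    static long[] boundaryBurst() {
        long[] ts = new long[196];
        Arrays.fill(ts, 0, 98, 59_500L);
        Arrays.fill(ts, 98, 196, 60_500L);
        return ts;
    }

    public static void main(String[] args) {
        long[] ts = boundaryBurst();
        System.out.println("fixed window allowed:   " + fixedWindowAllowed(ts));   // 196
        System.out.println("sliding window allowed: " + slidingWindowAllowed(ts)); // 100
    }
}
```

With the burst straddling the minute boundary, the fixed window admits all 196 requests, while the sliding window stops at 100 because the 98 requests from 59.5 s are still inside the last 60 seconds when the second burst arrives.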
Sliding-window data structures in Sentinel
In Sentinel, the core sliding-window data structure is the LeapArray class.
LeapArray constructor
protected int windowLengthInMs;
protected int sampleCount;
protected int intervalInMs;
private double intervalInSecond;
protected final AtomicReferenceArray<WindowWrap<T>> array;
/**
 * The conditional (predicate) update lock is used only when current bucket is deprecated.
 */
private final ReentrantLock updateLock = new ReentrantLock();
/**
 * The total bucket count is: {@code sampleCount = intervalInMs / windowLengthInMs}.
 *
 * @param sampleCount  bucket count of the sliding window
 * @param intervalInMs the total time interval of this {@link LeapArray} in milliseconds
 */
public LeapArray(int sampleCount, int intervalInMs) {
    AssertUtil.isTrue(sampleCount > 0, "bucket count is invalid: " + sampleCount);
    AssertUtil.isTrue(intervalInMs > 0, "total time interval of the sliding window should be positive");
    AssertUtil.isTrue(intervalInMs % sampleCount == 0, "time span needs to be evenly divided");
    // Length of a single window (bucket), in ms
    this.windowLengthInMs = intervalInMs / sampleCount;
    // Length of the whole statistics interval, in ms
    this.intervalInMs = intervalInMs;
    this.intervalInSecond = intervalInMs / 1000.0;
    // Number of sample windows (buckets)
    this.sampleCount = sampleCount;
    // Array that holds the sliding-window buckets
    this.array = new AtomicReferenceArray<>(sampleCount);
}
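Here, array stores the statistics of each small window. AtomicReferenceArray is an array whose elements can be updated atomically (for example with compareAndSet); WindowWrap is the window (bucket) implementation, and its type parameter is the concrete statistic being collected.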
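The CAS step that LeapArray relies on can be seen in isolation with a hypothetical helper built on the standard AtomicReferenceArray.compareAndSet(int, E, E): only the first caller that finds the slot empty gets to install its value. Sentinel's currentWindow does not return the winner's value directly; the losing thread yields and re-reads the slot on its next loop iteration, which has the same effect.

```java
import java.util.concurrent.atomic.AtomicReferenceArray;

/** Hypothetical helper showing the CAS pattern used to fill an empty bucket slot. */
class CasSlotDemo {
    /** Installs candidate at idx only if the slot is still null; returns what the slot holds afterwards. */
    static String installIfAbsent(AtomicReferenceArray<String> slots, int idx, String candidate) {
        if (slots.compareAndSet(idx, null, candidate)) {
            return candidate;        // this caller won the race
        }
        return slots.get(idx);       // another caller installed a value first; use theirs
    }

    public static void main(String[] args) {
        AtomicReferenceArray<String> slots = new AtomicReferenceArray<>(2);
        System.out.println(installIfAbsent(slots, 0, "first"));   // first
        System.out.println(installIfAbsent(slots, 0, "second"));  // first
        System.out.println(slots.get(1));                         // null
    }
}
```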
WindowWrap implementation
public class WindowWrap<T> {
    /**
     * Time length of a single window bucket in milliseconds.
     */
    private final long windowLengthInMs;
    /**
     * Start timestamp of the window in milliseconds.
     */
    private long windowStart;
    /**
     * Statistic data.
     */
    private T value;
    /**
     * @param windowLengthInMs a single window bucket's time length in milliseconds.
     * @param windowStart      the start timestamp of the window
     * @param value            statistic data
     */
    public WindowWrap(long windowLengthInMs, long windowStart, T value) {
        this.windowLengthInMs = windowLengthInMs;
        this.windowStart = windowStart;
        this.value = value;
    }
    public long windowLength() {
        return windowLengthInMs;
    }
    public long windowStart() {
        return windowStart;
    }
    public T value() {
        return value;
    }
    public void setValue(T value) {
        this.value = value;
    }
    /**
     * Reset start timestamp of current bucket to provided time.
     *
     * @param startTime valid start timestamp
     * @return bucket after reset
     */
    public WindowWrap<T> resetTo(long startTime) {
        this.windowStart = startTime;
        return this;
    }
    /**
     * Check whether given timestamp is in current bucket.
     *
     * @param timeMillis valid timestamp in ms
     * @return true if the given time is in current bucket, otherwise false
     * @since 1.5.0
     */
    public boolean isTimeInWindow(long timeMillis) {
        return windowStart <= timeMillis && timeMillis < windowStart + windowLengthInMs;
    }
}
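This is the concrete window element class. Its key fields are windowStart, the window's start timestamp, and value, a generic statistics object whose type can be chosen freely.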
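A trimmed, hypothetical copy of WindowWrap (named MiniWindowWrap here) makes two details easy to check: isTimeInWindow treats the bucket as the half-open interval [windowStart, windowStart + windowLengthInMs), and resetTo moves the same object to a new start time instead of allocating a new one. Only the logic shown above is kept.

```java
/** Hypothetical, trimmed-down copy of WindowWrap used only to exercise its time checks. */
class MiniWindowWrap<T> {
    private final long windowLengthInMs;
    private long windowStart;
    private final T value;

    MiniWindowWrap(long windowLengthInMs, long windowStart, T value) {
        this.windowLengthInMs = windowLengthInMs;
        this.windowStart = windowStart;
        this.value = value;
    }

    /** Start is inclusive, end is exclusive. */
    boolean isTimeInWindow(long timeMillis) {
        return windowStart <= timeMillis && timeMillis < windowStart + windowLengthInMs;
    }

    /** Moves the bucket to a new start time; the value object itself is kept. */
    MiniWindowWrap<T> resetTo(long startTime) {
        this.windowStart = startTime;
        return this;
    }

    T value() {
        return value;
    }

    public static void main(String[] args) {
        MiniWindowWrap<String> w = new MiniWindowWrap<>(200, 800, "stats");
        System.out.println(w.isTimeInWindow(888));   // true
        System.out.println(w.isTimeInWindow(1000));  // false: 1000 starts the next bucket
        w.resetTo(1600);
        System.out.println(w.isTimeInWindow(1676));  // true
    }
}
```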
Getting the current window
/**
 * Get bucket item at provided timestamp.
 *
 * @param timeMillis a valid timestamp in milliseconds
 * @return current bucket item at provided timestamp if the time is valid; null if time is invalid
 */
public WindowWrap<T> currentWindow(long timeMillis) {
    if (timeMillis < 0) {
        return null;
    }
    int idx = calculateTimeIdx(timeMillis);
    // Calculate current bucket start time.
    long windowStart = calculateWindowStart(timeMillis);
    /*
     * Get bucket item at given time from the array.
     *
     * (1) Bucket is absent, then just create a new bucket and CAS update to circular array.
     * (2) Bucket is up-to-date, then just return the bucket.
     * (3) Bucket is deprecated, then reset current bucket.
     */
    while (true) {
        WindowWrap<T> old = array.get(idx);
        if (old == null) {
            /*
             *     B0      B1      B2     NULL     B4
             * ||_______|_______|_______|_______|_______||___
             * 200     400     600     800     1000    1200  timestamp
             *                             ^
             *                          time=888
             * bucket is empty, so create new and update
             *
             * If the old bucket is absent, then we create a new bucket at {@code windowStart},
             * then try to update circular array via a CAS operation. Only one thread can
             * succeed to update, while other threads yield its time slice.
             */
            WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
            if (array.compareAndSet(idx, null, window)) {
                // Successfully updated, return the created bucket.
                return window;
            } else {
                // Contention failed, the thread will yield its time slice to wait for bucket available.
                Thread.yield();
            }
        } else if (windowStart == old.windowStart()) {
            /*
             *     B0      B1      B2      B3      B4
             * ||_______|_______|_______|_______|_______||___
             * 200     400     600     800     1000    1200  timestamp
             *                             ^
             *                          time=888
             * startTime of Bucket 3: 800, so it's up-to-date
             *
             * If current {@code windowStart} is equal to the start timestamp of old bucket,
             * that means the time is within the bucket, so directly return the bucket.
             */
            return old;
        } else if (windowStart > old.windowStart()) {
            /*
             *                            (old)
             *             B0      B1      B2      NULL    B4
             * |_______||_______|_______|_______|_______|_______||___
             * ...     1200    1400    1600    1800    2000    2200  timestamp
             *                             ^
             *                         time=1676
             * startTime of Bucket 2: 400, deprecated, should be reset
             *
             * If the start timestamp of old bucket is behind provided time, that means
             * the bucket is deprecated. We have to reset the bucket to current {@code windowStart}.
             * Note that the reset and clean-up operations are hard to be atomic,
             * so we need a update lock to guarantee the correctness of bucket update.
             *
             * The update lock is conditional (tiny scope) and will take effect only when
             * bucket is deprecated, so in most cases it won't lead to performance loss.
             */
            if (updateLock.tryLock()) {
                try {
                    // Successfully get the update lock, now we reset the bucket.
                    return resetWindowTo(old, windowStart);
                } finally {
                    updateLock.unlock();
                }
            } else {
                // Contention failed, the thread will yield its time slice to wait for bucket available.
                Thread.yield();
            }
        } else if (windowStart < old.windowStart()) {
            // Should not go through here, as the provided time is already behind.
            return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
        }
    }
}
The method first computes, for the given timestamp, the index into the window array and the start time of the window.
// Compute the bucket index: divide the timestamp by the window length, then take the quotient modulo array.length().
private int calculateTimeIdx(/*@Valid*/ long timeMillis) {
    long timeId = timeMillis / windowLengthInMs;
    // Calculate current index so we can map the timestamp to the leap array.
    return (int)(timeId % array.length());
}
// Compute the window start time: subtract the remainder (timeMillis % windowLengthInMs) from the timestamp.
protected long calculateWindowStart(/*@Valid*/ long timeMillis) {
    return timeMillis - timeMillis % windowLengthInMs;
}
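A worked example of the two formulas, assuming intervalInMs = 1000 and sampleCount = 5, so windowLengthInMs = 200 and the array has 5 slots. The helper class is hypothetical; it takes windowLengthInMs and the array length as parameters instead of reading LeapArray fields.

```java
/** Hypothetical stand-alone versions of calculateTimeIdx and calculateWindowStart. */
class BucketIndexDemo {
    static int calculateTimeIdx(long timeMillis, long windowLengthInMs, int arrayLength) {
        long timeId = timeMillis / windowLengthInMs;   // how many whole buckets since time 0
        return (int) (timeId % arrayLength);           // wrap around the circular array
    }

    static long calculateWindowStart(long timeMillis, long windowLengthInMs) {
        return timeMillis - timeMillis % windowLengthInMs;  // round down to the bucket boundary
    }

    public static void main(String[] args) {
        // Assumed: intervalInMs = 1000, sampleCount = 5 -> windowLengthInMs = 200, 5 slots
        System.out.println(calculateTimeIdx(888, 200, 5) + " " + calculateWindowStart(888, 200));     // 4 800
        System.out.println(calculateTimeIdx(1676, 200, 5) + " " + calculateWindowStart(1676, 200));   // 3 1600
        System.out.println(calculateTimeIdx(1888, 200, 5) + " " + calculateWindowStart(1888, 200));   // 4 1800
    }
}
```

Timestamps 888 and 1888 map to the same slot 4 with start times 800 and 1800, which is exactly how a slot ends up holding a stale bucket that must be reset. The bucket labels in the ASCII diagrams of currentWindow are schematic and do not follow these index values.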
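It then looks at the WindowWrap currently stored at that array position:
- If there is none, a new WindowWrap is installed with a CAS operation.
- If one exists and its windowStart equals the newly computed windowStart, it is used directly.
- If one exists but its windowStart is earlier than the computed windowStart, the bucket is stale and is reset under a lock. resetWindowTo is implemented by the concrete subclass; typically it moves the WindowWrap to the current windowStart and resets the statistics it holds.
- If the stored windowStart is later than the computed one (the given time is already behind), a new WindowWrap is returned without being stored in the array; the source comments note that this should not happen.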
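The four cases can be condensed into a simplified, hypothetical MiniLeapArray whose buckets carry a single LongAdder instead of a generic T. It follows the structure of currentWindow (CAS for an empty slot, tryLock plus Thread.yield for a stale one), with one addition of this sketch: it re-checks the start time after acquiring the lock, so a bucket that another thread has just reset is not reset a second time.

```java
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical, simplified LeapArray whose buckets hold a single LongAdder counter. */
class MiniLeapArray {
    static final class Bucket {
        volatile long start;
        final LongAdder count = new LongAdder();

        Bucket(long start) {
            this.start = start;
        }
    }

    private final long windowLengthInMs;
    private final AtomicReferenceArray<Bucket> array;
    private final ReentrantLock updateLock = new ReentrantLock();

    MiniLeapArray(int sampleCount, int intervalInMs) {
        this.windowLengthInMs = intervalInMs / sampleCount;
        this.array = new AtomicReferenceArray<>(sampleCount);
    }

    Bucket currentWindow(long timeMillis) {
        int idx = (int) ((timeMillis / windowLengthInMs) % array.length());
        long windowStart = timeMillis - timeMillis % windowLengthInMs;
        while (true) {
            Bucket old = array.get(idx);
            if (old == null) {
                // Case 1: empty slot; only the thread whose CAS succeeds installs its bucket.
                Bucket fresh = new Bucket(windowStart);
                if (array.compareAndSet(idx, null, fresh)) {
                    return fresh;
                }
                Thread.yield();
            } else if (windowStart == old.start) {
                // Case 2: the bucket already covers this time.
                return old;
            } else if (windowStart > old.start) {
                // Case 3: stale bucket; reset it in place while holding the lock.
                if (updateLock.tryLock()) {
                    try {
                        if (windowStart > old.start) {   // re-check under the lock
                            old.count.reset();
                            old.start = windowStart;    // publish the new start after clearing
                            return old;
                        }
                    } finally {
                        updateLock.unlock();
                    }
                    continue;                           // someone else moved it; re-read the slot
                }
                Thread.yield();
            } else {
                // Case 4: time is behind the stored bucket; return a throwaway bucket.
                return new Bucket(windowStart);
            }
        }
    }
}
```

For example, with 5 buckets over 1000 ms, currentWindow(888) and currentWindow(1888) return the same Bucket object, the second call having moved it to start 1800 with its count cleared.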
Getting the list of valid windows
/**
 * Get aggregated value list for entire sliding window.
 * The list will only contain value from "valid" buckets.
 *
 * @return aggregated value list for entire sliding window
 */
public List<T> values() {
    return values(TimeUtil.currentTimeMillis());
}
public List<T> values(long timeMillis) {
    if (timeMillis < 0) {
        return new ArrayList<T>();
    }
    int size = array.length();
    List<T> result = new ArrayList<T>(size);
    for (int i = 0; i < size; i++) {
        WindowWrap<T> windowWrap = array.get(i);
        if (windowWrap == null || isWindowDeprecated(timeMillis, windowWrap)) {
            continue;
        }
        result.add(windowWrap.value());
    }
    return result;
}
/**
 * Check if a bucket is deprecated, which means that the bucket
 * has been behind for at least an entire window time span.
 *
 * @param windowWrap a non-null bucket
 * @return true if the bucket is deprecated; otherwise false
 */
public boolean isWindowDeprecated(/*@NonNull*/ WindowWrap<T> windowWrap) {
    return isWindowDeprecated(TimeUtil.currentTimeMillis(), windowWrap);
}
public boolean isWindowDeprecated(long time, WindowWrap<T> windowWrap) {
    // intervalInMs is the whole statistics interval; a bucket that started more than intervalInMs ago is deprecated
    return time - windowWrap.windowStart() > intervalInMs;
}
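A small worked example of the deprecation rule, using hypothetical helpers with the same comparison as isWindowDeprecated and the same filtering as values(long), assuming intervalInMs = 1000 split into two 500 ms buckets. Because the comparison is a strict `>`, a bucket whose start is exactly intervalInMs in the past still counts.

```java
/** Hypothetical helpers mirroring LeapArray.isWindowDeprecated and the filtering in values(). */
class DeprecationDemo {
    /** Same comparison as the source: strictly greater than intervalInMs. */
    static boolean isWindowDeprecated(long time, long windowStart, long intervalInMs) {
        return time - windowStart > intervalInMs;
    }

    /** Sums the values of buckets that are still valid at the given time. */
    static long sumValid(long time, long[] starts, long[] values, long intervalInMs) {
        long sum = 0;
        for (int i = 0; i < starts.length; i++) {
            if (!isWindowDeprecated(time, starts[i], intervalInMs)) {
                sum += values[i];
            }
        }
        return sum;
    }

    public static void main(String[] args) {
        long[] starts = {1000, 1500};   // two 500 ms buckets, intervalInMs = 1000
        long[] values = {4, 6};
        System.out.println(sumValid(1999, starts, values, 1000)); // 10
        System.out.println(sumValid(2000, starts, values, 1000)); // 10: 2000 - 1000 is not > 1000
        System.out.println(sumValid(2100, starts, values, 1000)); // 6: the bucket at 1000 is deprecated
    }
}
```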
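Summary
LeapArray, the sliding-window implementation, is the overall shell: it locates the window for a timestamp, creates windows, and decides whether a window has expired. The actual statistics live in the generic value held by WindowWrap, so it can store whatever is needed.
Statistics classes for flow control and circuit breaking
Flow-control statistics
The statistics used in flow-control decisions mostly come from the data collected by StatisticSlot, and the QPS-related counts rely on the StatisticNode class. Below are the fields that hold these statistics.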
/**
 * Holds statistics of the recent {@code INTERVAL} milliseconds. The {@code INTERVAL} is divided into time spans
 * by given {@code sampleCount}.
 */
private transient volatile Metric rollingCounterInSecond = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT,
    IntervalProperty.INTERVAL);
/**
 * Holds statistics of the recent 60 seconds. The windowLengthInMs is deliberately set to 1000 milliseconds,
 * meaning each bucket per second, in this way we can get accurate statistics of each second.
 */
private transient Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000, false);
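Both fields are ArrayMetric instances. The constructor arguments are the number of buckets (sampleCount) followed by the statistics interval in milliseconds (intervalInMs); rollingCounterInMinute, for example, uses 60 buckets over 60,000 ms, i.e. one bucket per second. Its third argument, false, is an enableOccupy flag that selects a plain BucketLeapArray instead of the OccupiableBucketLeapArray used by the two-argument constructor below.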
public class ArrayMetric implements Metric {
    // Holds a LeapArray whose type parameter is MetricBucket
    private final LeapArray<MetricBucket> data;
    public ArrayMetric(int sampleCount, int intervalInMs) {
        this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
    }
}
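As a rough sketch of how these buckets turn into a rate: StatisticNode's passQps() divides the pass total of rollingCounterInSecond by the window interval in seconds (the intervalInSecond value computed in the LeapArray constructor). The hypothetical helper below does that arithmetic on plain arrays, assuming a second-level counter of 2 buckets over 1000 ms, plus the minute-level counter of 60 buckets over 60,000 ms.

```java
import java.util.Arrays;

/** Hypothetical helper: average QPS from the pass counts of the valid buckets. */
class QpsDemo {
    static double passQps(long[] validBucketPassCounts, int intervalInMs) {
        long pass = 0;
        for (long c : validBucketPassCounts) {
            pass += c;
        }
        double intervalInSecond = intervalInMs / 1000.0;   // same conversion as the LeapArray constructor
        return pass / intervalInSecond;
    }

    public static void main(String[] args) {
        // Assumed second-level counter: 2 buckets of 500 ms over 1000 ms.
        System.out.println(passQps(new long[] {30, 50}, 1000));   // 80.0
        // Minute-level counter: 60 buckets of 1000 ms over 60_000 ms.
        long[] perSecond = new long[60];
        Arrays.fill(perSecond, 100L);
        System.out.println(passQps(perSecond, 60_000));           // 100.0
    }
}
```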
The methods in ArrayMetric that record QPS, RT and similar data all follow the same pattern: first locate the current window, then add to it.
@Override
public void addSuccess(int count) {
    // Locate the current window
    WindowWrap<MetricBucket> wrap = data.currentWindow();
    // Add to the bucket's counter
    wrap.value().addSuccess(count);
}
@Override
public void addPass(int count) {
    WindowWrap<MetricBucket> wrap = data.currentWindow();
    wrap.value().addPass(count);
}
@Override
public void addRT(long rt) {
    WindowWrap<MetricBucket> wrap = data.currentWindow();
    wrap.value().addRT(rt);
}
The type parameter here is MetricBucket. In its implementation, counters is an array of counters indexed by the ordinal of MetricEvent.
public class MetricBucket {
    // Counters, indexed by the ordinal of the MetricEvent enum
    private final LongAdder[] counters;
    private volatile long minRt;
    public MetricBucket() {
        MetricEvent[] events = MetricEvent.values();
        this.counters = new LongAdder[events.length];
        for (MetricEvent event : events) {
            counters[event.ordinal()] = new LongAdder();
        }
        initMinRt();
    }
    public void addSuccess(int n) {
        add(MetricEvent.SUCCESS, n);
    }
    public void addRT(long rt) {
        add(MetricEvent.RT, rt);
        // Not thread-safe, but it's okay.
        if (rt < minRt) {
            minRt = rt;
        }
    }
    public void addPass(int n) {
        add(MetricEvent.PASS, n);
    }
    public MetricBucket add(MetricEvent event, long n) {
        // Add n to the counter at the event's ordinal
        counters[event.ordinal()].add(n);
        return this;
    }
}
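The ordinal-indexed counter array can be tried out with a hypothetical MiniMetricEvent enum and a trimmed MiniMetricBucket. LongAdder (java.util.concurrent.atomic) spreads concurrent increments across internal cells and adds them up on sum(), which keeps contention low when many threads record into the same bucket; the demo checks that no increments are lost.

```java
import java.util.concurrent.atomic.LongAdder;

/** Hypothetical stand-in for MetricEvent. */
enum MiniMetricEvent { PASS, BLOCK, SUCCESS, RT }

/** Hypothetical, trimmed MetricBucket: one LongAdder per event, indexed by ordinal. */
class MiniMetricBucket {
    private final LongAdder[] counters = new LongAdder[MiniMetricEvent.values().length];

    MiniMetricBucket() {
        for (MiniMetricEvent e : MiniMetricEvent.values()) {
            counters[e.ordinal()] = new LongAdder();
        }
    }

    MiniMetricBucket add(MiniMetricEvent event, long n) {
        counters[event.ordinal()].add(n);
        return this;
    }

    long get(MiniMetricEvent event) {
        return counters[event.ordinal()].sum();
    }

    /** Adds perThread PASS events from each of `threads` threads and returns the PASS total. */
    static long concurrentPasses(int threads, int perThread) throws InterruptedException {
        MiniMetricBucket bucket = new MiniMetricBucket();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    bucket.add(MiniMetricEvent.PASS, 1);
                }
            });
            workers[i].start();
        }
        for (Thread w : workers) {
            w.join();
        }
        return bucket.get(MiniMetricEvent.PASS);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(concurrentPasses(4, 10_000));   // 40000
    }
}
```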
Circuit-breaking statistics
Circuit breaking by errors
The error counter is the SimpleErrorCounter class, which keeps two counters, errorCount and totalCount.
private final LeapArray<SimpleErrorCounter> stat;
static class SimpleErrorCounter {
    private LongAdder errorCount;
    private LongAdder totalCount;
    public SimpleErrorCounter() {
        this.errorCount = new LongAdder();
        this.totalCount = new LongAdder();
    }
    public LongAdder getErrorCount() {
        return errorCount;
    }
    public LongAdder getTotalCount() {
        return totalCount;
    }
    public SimpleErrorCounter reset() {
        errorCount.reset();
        totalCount.reset();
        return this;
    }
}
static class SimpleErrorCounterLeapArray extends LeapArray<SimpleErrorCounter> {
    public SimpleErrorCounterLeapArray(int sampleCount, int intervalInMs) {
        super(sampleCount, intervalInMs);
    }
    @Override
    public SimpleErrorCounter newEmptyBucket(long timeMillis) {
        return new SimpleErrorCounter();
    }
    @Override
    protected WindowWrap<SimpleErrorCounter> resetWindowTo(WindowWrap<SimpleErrorCounter> w, long startTime) {
        // Update the start time and reset value.
        w.resetTo(startTime);
        w.value().reset();
        return w;
    }
}
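Once each bucket holds an errorCount and a totalCount, an error-ratio decision only needs the sums over the valid buckets. The sketch below is hypothetical: the parameter names minRequestAmount and ratioThreshold, and the rule "open when errors / total exceeds the threshold once enough requests have been seen", are assumptions in the spirit of Sentinel's exception-ratio circuit breaker, not its exact code.

```java
/** Hypothetical error-ratio check over the buckets of a sliding window. */
class ErrorRatioDemo {
    /**
     * @param errorCounts      errorCount of each valid bucket
     * @param totalCounts      totalCount of each valid bucket
     * @param minRequestAmount assumed minimum number of requests before the ratio is trusted
     * @param ratioThreshold   assumed error ratio above which the breaker should open
     */
    static boolean shouldOpen(long[] errorCounts, long[] totalCounts, long minRequestAmount, double ratioThreshold) {
        long errors = 0;
        long total = 0;
        for (int i = 0; i < errorCounts.length; i++) {
            errors += errorCounts[i];
            total += totalCounts[i];
        }
        if (total < minRequestAmount) {
            return false;                  // too few samples to judge
        }
        return (double) errors / total > ratioThreshold;
    }

    public static void main(String[] args) {
        long[] errors = {2, 3};
        long[] totals = {10, 10};
        System.out.println(shouldOpen(errors, totals, 5, 0.2));   // true: 5 / 20 = 0.25
        System.out.println(shouldOpen(errors, totals, 5, 0.3));   // false
        System.out.println(shouldOpen(errors, totals, 50, 0.2));  // false: only 20 requests
    }
}
```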
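Counting slow requests
For slow-request statistics, SlowRequestCounter plays the role of the generic type; its implementation is similar to SimpleErrorCounter, so it is not reproduced here.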
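For reference, a slow-request counter could look like the hypothetical sketch below. It assumes the class mirrors SimpleErrorCounter with a slowCount in place of errorCount, and that a request counts as slow when its RT exceeds a configured maximum (maxAllowedRtMs is an illustrative name, not taken from the source).

```java
import java.util.concurrent.atomic.LongAdder;

/** Hypothetical slow-request counter, assumed to mirror SimpleErrorCounter. */
class MiniSlowRequestCounter {
    private final LongAdder slowCount = new LongAdder();
    private final LongAdder totalCount = new LongAdder();

    /** Records one finished request; an RT above maxAllowedRtMs is counted as slow. */
    void record(long rtMs, long maxAllowedRtMs) {
        if (rtMs > maxAllowedRtMs) {
            slowCount.increment();
        }
        totalCount.increment();
    }

    long slow() {
        return slowCount.sum();
    }

    long total() {
        return totalCount.sum();
    }

    /** Clears both counters so the bucket can be reused, like SimpleErrorCounter.reset(). */
    MiniSlowRequestCounter reset() {
        slowCount.reset();
        totalCount.reset();
        return this;
    }

    public static void main(String[] args) {
        MiniSlowRequestCounter c = new MiniSlowRequestCounter();
        for (long rt : new long[] {10, 300, 600}) {
            c.record(rt, 200);
        }
        System.out.println(c.slow() + "/" + c.total());   // 2/3
    }
}
```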
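A home-made sliding window
The following is a simple implementation that reuses an array circularly as the sliding-window array. It is not thread-safe. The array holds twice as many slices as one window, and each call clears the windowSize slots that follow the current slot, since they hold data older than one window.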
import java.util.concurrent.atomic.AtomicInteger;
public class SlidingWindow {
    /**
     * Circular buffer of time slices; its length is 2 * windowSize
     */
    private AtomicInteger[] timeSlices;
    /**
     * Total length of the circular buffer
     */
    private int timeSliceSize;
    /**
     * Duration of each time slice, in milliseconds
     */
    private int timeMillisPerSlice;
    /**
     * Number of time slices in one window (the window length)
     */
    private int windowSize;
    /**
     * Maximum count allowed within one full window
     */
    private int threshold;
    /**
     * Creation time of the sliding window, i.e. the time of the first data point
     */
    private long beginTimestamp;
    /**
     * Timestamp of the most recent addCount call
     */
    private long lastAddTimestamp;
    public static void main(String[] args) {
        // 300 ms per slice, 3 slices per window (900 ms), threshold 8
        SlidingWindow window = new SlidingWindow(300, 3, 8);
        for (int i = 0; i < 100; i++) {
            System.out.println("Rate-limit result: " + window.addCount(1));
            window.print();
            System.out.println();
            System.out.println(Thread.currentThread().getName() + "--------------------------");
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop the demo
                Thread.currentThread().interrupt();
                break;
            }
        }
    }
    public SlidingWindow(int duration, int threshold) {
        // duration is in seconds; anything above 10 minutes is capped at 10 minutes
        if (duration > 600) {
            duration = 600;
        }
        // Short windows (5 s or less) use 5 slices, longer ones use 10 slices;
        // either way windowSize * timeMillisPerSlice == duration * 1000 ms
        if (duration <= 5) {
            this.windowSize = 5;
            this.timeMillisPerSlice = duration * 200;
        } else {
            this.windowSize = 10;
            this.timeMillisPerSlice = duration * 100;
        }
        this.threshold = threshold;
        // Keep storage for two full windows
        this.timeSliceSize = windowSize * 2;
        reset();
    }
    public SlidingWindow(int timeMillisPerSlice, int windowSize, int threshold) {
        this.timeMillisPerSlice = timeMillisPerSlice;
        this.windowSize = windowSize;
        this.threshold = threshold;
        // Keep storage for two full windows
        this.timeSliceSize = windowSize * 2;
        reset();
    }
    /**
     * Initialize (or re-initialize) the buffer
     */
    private void reset() {
        beginTimestamp = System.currentTimeMillis();
        // One counter per slot
        AtomicInteger[] localTimeSlices = new AtomicInteger[timeSliceSize];
        for (int i = 0; i < timeSliceSize; i++) {
            localTimeSlices[i] = new AtomicInteger(0);
        }
        timeSlices = localTimeSlices;
    }
    private void print() {
        System.out.println("Current slice values:");
        for (AtomicInteger integer : timeSlices) {
            System.out.print(integer + "-");
        }
    }
    /**
     * Compute the index of the time slice that the current time falls into
     */
    private int locationIndex() {
        long now = System.currentTimeMillis();
        // If more than one full window has passed since the last add, all data is stale: just reinitialize
        if (now - lastAddTimestamp > timeMillisPerSlice * windowSize) {
            reset();
        }
        return (int) (((now - beginTimestamp) / timeMillisPerSlice) % timeSliceSize);
    }
    /**
     * Add count to the current slice; returns true once the total of the current window
     * has reached the threshold (i.e. the caller should be throttled)
     */
    public boolean addCount(int count) {
        // Which slice the current time falls into
        int index = locationIndex();
        // Clear the windowSize slots after the current one; they hold data older than one window.
        // E.g. with 1 s split into 4 slices the buffer has 8 slots. Using 1-based positions,
        // if the current position is 5, slots 6, 7, 8 and 1 are cleared, and the sum of
        // slots 2, 3, 4 and 5 is the total for the window.
        clearFromIndex(index);
        int sum = 0;
        // Add count to the current slice
        sum += timeSlices[index].addAndGet(count);
        // Add the previous windowSize - 1 slices
        for (int i = 1; i < windowSize; i++) {
            sum += timeSlices[(index - i + timeSliceSize) % timeSliceSize].get();
        }
        System.out.println("Current index: " + index + ", current sum: " + sum);
        lastAddTimestamp = System.currentTimeMillis();
        return sum >= threshold;
    }
    private void clearFromIndex(int index) {
        for (int i = 1; i <= windowSize; i++) {
            int j = index + i;
            if (j >= timeSliceSize) {
                j -= timeSliceSize;
            }
            timeSlices[j].set(0);
        }
    }
}
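Because SlidingWindow reads System.currentTimeMillis() and prints as it goes, its behavior is hard to check without sleeping. The hypothetical ClockedSlidingWindow below keeps the same algorithm (2 * windowSize slots, clear the windowSize slots after the current one, sum the last windowSize slots, reinitialize after more than one idle window) but takes the current time as a parameter and uses plain ints, since it is likewise not thread-safe.

```java
import java.util.Arrays;

/** Hypothetical variant of SlidingWindow that receives the current time instead of reading the clock. */
class ClockedSlidingWindow {
    private final int[] slices;              // 2 * windowSize slots, reused circularly
    private final int windowSize;
    private final long timeMillisPerSlice;
    private final int threshold;
    private long beginTimestamp;
    private long lastAddTimestamp;
    private boolean started;

    ClockedSlidingWindow(long timeMillisPerSlice, int windowSize, int threshold) {
        this.timeMillisPerSlice = timeMillisPerSlice;
        this.windowSize = windowSize;
        this.threshold = threshold;
        this.slices = new int[windowSize * 2];
    }

    /** Adds count at time now and returns the total of the current window. */
    int add(long now, int count) {
        if (!started || now - lastAddTimestamp > timeMillisPerSlice * windowSize) {
            Arrays.fill(slices, 0);          // idle for more than one window: all data is stale
            beginTimestamp = now;
            started = true;
        }
        int index = (int) (((now - beginTimestamp) / timeMillisPerSlice) % slices.length);
        for (int i = 1; i <= windowSize; i++) {
            slices[(index + i) % slices.length] = 0;   // these slots hold data older than one window
        }
        slices[index] += count;
        int sum = 0;
        for (int i = 0; i < windowSize; i++) {
            sum += slices[(index - i + slices.length) % slices.length];
        }
        lastAddTimestamp = now;
        return sum;
    }

    /** Same contract as SlidingWindow.addCount: true once the window total reaches the threshold. */
    boolean addCount(long now, int count) {
        return add(now, count) >= threshold;
    }

    public static void main(String[] args) {
        ClockedSlidingWindow w = new ClockedSlidingWindow(300, 3, 8);
        System.out.println(w.add(0, 1));     // 1
        System.out.println(w.add(100, 1));   // 2
        System.out.println(w.add(350, 1));   // 3
        System.out.println(w.add(950, 1));   // 2: slot 0 (t = 0 and 100) was cleared
        System.out.println(w.add(2000, 1));  // 1: idle > 900 ms, buffer reinitialized
    }
}
```

The run uses the same parameters as main above (300 ms slices, 3 slices per window): the requests at 0 ms and 100 ms drop out once the time reaches slot 3, and a gap longer than 900 ms reinitializes the buffer.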