How Sentinel implements sliding-window statistics


Author: justonemoretry | Published 2023-02-28 11:48

    Introduction to sliding windows

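    First, consider a fixed-window counter. Suppose the window is one minute and the limit is 100 requests per window. If 98 requests arrive at the very end of one minute and another 98 at the very start of the next, neither window exceeds the limit, so nothing is throttled, yet almost 200 requests hit the system within a very short time and put it under heavy pressure.
    To address this, the statistics interval can be divided into smaller windows (buckets). At any moment, the statistic is computed over the buckets that make up one full interval ending at the current bucket, so the count slides with time and flow control becomes smoother. The more buckets the interval is divided into, the more accurate the statistics, but the more memory they take.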
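To make the boundary problem concrete, here is a small sketch (not Sentinel code) that counts the same burst both ways. The class and method names are hypothetical, and the sliding count follows the LeapArray idea of keeping the current bucket plus the sampleCount - 1 buckets before it.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical demo class, not part of Sentinel.
final class BoundaryBurstDemo {

    // Largest count seen in any fixed, aligned window of windowMs.
    static int maxFixedWindowCount(long[] timestamps, long windowMs) {
        Map<Long, Integer> counts = new HashMap<>();
        for (long t : timestamps) {
            counts.merge(t - t % windowMs, 1, Integer::sum); // key = window start
        }
        int max = 0;
        for (int c : counts.values()) {
            max = Math.max(max, c);
        }
        return max;
    }

    // Count inside the sliding window ending at `now`: intervalMs is split into
    // sampleCount buckets; the current bucket and the sampleCount - 1 before it are kept.
    static int slidingWindowCount(long[] timestamps, long now, long intervalMs, int sampleCount) {
        long bucketMs = intervalMs / sampleCount;
        long currentStart = now - now % bucketMs;
        long oldestStart = currentStart - (sampleCount - 1) * bucketMs;
        int count = 0;
        for (long t : timestamps) {
            long start = t - t % bucketMs;
            if (start >= oldestStart && start <= currentStart) {
                count++;
            }
        }
        return count;
    }

    // 98 requests at 59.5 s and 98 at 60.5 s, i.e. on both sides of a minute boundary.
    static long[] burst() {
        long[] ts = new long[196];
        for (int i = 0; i < 98; i++) {
            ts[i] = 59_500;
            ts[98 + i] = 60_500;
        }
        return ts;
    }

    public static void main(String[] args) {
        long[] ts = burst();
        System.out.println("fixed-window max: " + maxFixedWindowCount(ts, 60_000));
        System.out.println("sliding count at 60.5 s: " + slidingWindowCount(ts, 60_500, 60_000, 6));
    }
}
```

With a limit of 100 per minute, the fixed counter never sees more than 98 in one window, while the sliding counter (six 10-second buckets) sees all 196 requests at t = 60.5 s and would throttle.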



    The sliding-window data structure in Sentinel

    The core data structure of Sentinel's sliding window is the LeapArray class.
    LeapArray fields and constructor

        // Fields assigned by the constructor below
        protected int windowLengthInMs;
        protected int sampleCount;
        protected int intervalInMs;
        private double intervalInSecond;

        protected final AtomicReferenceArray<WindowWrap<T>> array;
    
        /**
         * The conditional (predicate) update lock is used only when current bucket is deprecated.
         */
        private final ReentrantLock updateLock = new ReentrantLock();
    
        /**
         * The total bucket count is: {@code sampleCount = intervalInMs / windowLengthInMs}.
         *
         * @param sampleCount  bucket count of the sliding window
         * @param intervalInMs the total time interval of this {@link LeapArray} in milliseconds
         */
        public LeapArray(int sampleCount, int intervalInMs) {
            AssertUtil.isTrue(sampleCount > 0, "bucket count is invalid: " + sampleCount);
            AssertUtil.isTrue(intervalInMs > 0, "total time interval of the sliding window should be positive");
            AssertUtil.isTrue(intervalInMs % sampleCount == 0, "time span needs to be evenly divided");
            // Length of a single window (bucket)
            this.windowLengthInMs = intervalInMs / sampleCount;
            // Length of the whole statistics interval
            this.intervalInMs = intervalInMs;
            this.intervalInSecond = intervalInMs / 1000.0;
            // Number of sample windows (buckets)
            this.sampleCount = sampleCount;
            // The sliding-window array
            this.array = new AtomicReferenceArray<>(sampleCount);
        }
    

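    Here, array holds the statistics of each small window (bucket). AtomicReferenceArray is an array whose elements can be updated atomically, WindowWrap is the window implementation, and its type parameter is the statistic data actually being recorded.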
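The property LeapArray relies on is that, when several threads try to fill the same empty slot with compareAndSet(idx, null, bucket), exactly one of them succeeds. A small JDK-only sketch of that race (the class name, slot and thread count are arbitrary, hypothetical choices):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceArray;

// Hypothetical demo: several threads race to fill the same empty slot.
final class CasSlotDemo {

    // Returns how many threads saw compareAndSet(slot, null, bucket) succeed.
    static int race(int threads, int slot) {
        AtomicReferenceArray<String> array = new AtomicReferenceArray<>(5);
        AtomicInteger winners = new AtomicInteger();
        CountDownLatch go = new CountDownLatch(1);
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            String bucket = "bucket-" + i;
            workers[i] = new Thread(() -> {
                try {
                    go.await(); // release all threads at roughly the same time
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                if (array.compareAndSet(slot, null, bucket)) {
                    winners.incrementAndGet();
                }
            });
            workers[i].start();
        }
        go.countDown();
        try {
            for (Thread w : workers) {
                w.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return winners.get();
    }

    public static void main(String[] args) {
        System.out.println("successful CAS calls: " + race(8, 3));
    }
}
```

The losers in LeapArray do not fail: they call Thread.yield() and loop, and on the next pass they find the slot filled and take the "up-to-date" branch.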
    The WindowWrap implementation

    public class WindowWrap<T> {
    
        /**
         * Time length of a single window bucket in milliseconds.
         */
        private final long windowLengthInMs;
    
        /**
         * Start timestamp of the window in milliseconds.
         */
        private long windowStart;
    
        /**
         * Statistic data.
         */
        private T value;
    
        /**
         * @param windowLengthInMs a single window bucket's time length in milliseconds.
         * @param windowStart      the start timestamp of the window
         * @param value            statistic data
         */
        public WindowWrap(long windowLengthInMs, long windowStart, T value) {
            this.windowLengthInMs = windowLengthInMs;
            this.windowStart = windowStart;
            this.value = value;
        }
        
        /**
         * Reset start timestamp of current bucket to provided time.
         *
         * @param startTime valid start timestamp
         * @return bucket after reset
         */
        public WindowWrap<T> resetTo(long startTime) {
            this.windowStart = startTime;
            return this;
        }
    
        /**
         * Check whether given timestamp is in current bucket.
         *
         * @param timeMillis valid timestamp in ms
         * @return true if the given time is in current bucket, otherwise false
         * @since 1.5.0
         */
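        public boolean isTimeInWindow(long timeMillis) {
            return windowStart <= timeMillis && timeMillis < windowStart + windowLengthInMs;
        }

        // Getters called elsewhere in this article (e.g. old.windowStart(), wrap.value())
        public long windowStart() {
            return windowStart;
        }

        public T value() {
            return value;
        }
    }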
    

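    This is the concrete window element class. Its main fields are windowStart, the start time of the window, and value, the statistics data, whose generic type can be defined freely.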
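isTimeInWindow treats a bucket as the half-open interval [windowStart, windowStart + windowLengthInMs), so the boundary timestamp belongs to the next bucket. A tiny sketch repeating that comparison on plain values (the helper name is hypothetical):

```java
// Hypothetical helper mirroring WindowWrap.isTimeInWindow on plain values.
final class BucketBoundaryDemo {

    static boolean isTimeInWindow(long windowStart, long windowLengthInMs, long timeMillis) {
        return windowStart <= timeMillis && timeMillis < windowStart + windowLengthInMs;
    }

    public static void main(String[] args) {
        long len = 200;
        System.out.println(isTimeInWindow(800, len, 800));  // first millisecond of the bucket: true
        System.out.println(isTimeInWindow(800, len, 999));  // last millisecond of the bucket: true
        System.out.println(isTimeInWindow(800, len, 1000)); // belongs to the bucket starting at 1000: false
    }
}
```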
    Getting the current window

       /**
         * Get bucket item at provided timestamp.
         *
         * @param timeMillis a valid timestamp in milliseconds
         * @return current bucket item at provided timestamp if the time is valid; null if time is invalid
         */
        public WindowWrap<T> currentWindow(long timeMillis) {
            if (timeMillis < 0) {
                return null;
            }
    
            int idx = calculateTimeIdx(timeMillis);
            // Calculate current bucket start time.
            long windowStart = calculateWindowStart(timeMillis);
    
            /*
             * Get bucket item at given time from the array.
             *
             * (1) Bucket is absent, then just create a new bucket and CAS update to circular array.
             * (2) Bucket is up-to-date, then just return the bucket.
             * (3) Bucket is deprecated, then reset current bucket.
             */
            while (true) {
                WindowWrap<T> old = array.get(idx);
                if (old == null) {
                    /*
                     *     B0       B1      B2    NULL      B4
                     * ||_______|_______|_______|_______|_______||___
                     * 200     400     600     800     1000    1200  timestamp
                     *                             ^
                     *                          time=888
                     *            bucket is empty, so create new and update
                     *
                     * If the old bucket is absent, then we create a new bucket at {@code windowStart},
                     * then try to update circular array via a CAS operation. Only one thread can
                     * succeed to update, while other threads yield its time slice.
                     */
                    WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
                    if (array.compareAndSet(idx, null, window)) {
                        // Successfully updated, return the created bucket.
                        return window;
                    } else {
                        // Contention failed, the thread will yield its time slice to wait for bucket available.
                        Thread.yield();
                    }
                } else if (windowStart == old.windowStart()) {
                    /*
                     *     B0       B1      B2     B3      B4
                     * ||_______|_______|_______|_______|_______||___
                     * 200     400     600     800     1000    1200  timestamp
                     *                             ^
                     *                          time=888
                     *            startTime of Bucket 3: 800, so it's up-to-date
                     *
                     * If current {@code windowStart} is equal to the start timestamp of old bucket,
                     * that means the time is within the bucket, so directly return the bucket.
                     */
                    return old;
                } else if (windowStart > old.windowStart()) {
                    /*
                     *   (old)
                     *             B0       B1      B2    NULL      B4
                     * |_______||_______|_______|_______|_______|_______||___
                     * ...    1200     1400    1600    1800    2000    2200  timestamp
                     *                              ^
                     *                           time=1676
                     *          startTime of Bucket 2: 400, deprecated, should be reset
                     *
                     * If the start timestamp of old bucket is behind provided time, that means
                     * the bucket is deprecated. We have to reset the bucket to current {@code windowStart}.
                     * Note that the reset and clean-up operations are hard to be atomic,
                     * so we need a update lock to guarantee the correctness of bucket update.
                     *
                     * The update lock is conditional (tiny scope) and will take effect only when
                     * bucket is deprecated, so in most cases it won't lead to performance loss.
                     */
                    if (updateLock.tryLock()) {
                        try {
                            // Successfully get the update lock, now we reset the bucket.
                            return resetWindowTo(old, windowStart);
                        } finally {
                            updateLock.unlock();
                        }
                    } else {
                        // Contention failed, the thread will yield its time slice to wait for bucket available.
                        Thread.yield();
                    }
                } else if (windowStart < old.windowStart()) {
                    // Should not go through here, as the provided time is already behind.
                    return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
                }
            }
        }
    

    The method first computes, for the given timestamp, the index into the window array and the start time of the window.

    // Compute the array index: divide by the window length, then take the remainder modulo array.length().
    private int calculateTimeIdx(/*@Valid*/ long timeMillis) {
        long timeId = timeMillis / windowLengthInMs;
        // Calculate current index so we can map the timestamp to the leap array.
        return (int)(timeId % array.length());
    }

    // Compute the window start time: subtract the remainder from the timestamp.
    protected long calculateWindowStart(/*@Valid*/ long timeMillis) {
        return timeMillis - timeMillis % windowLengthInMs;
    }
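Plugging numbers like those in the currentWindow diagrams into the two formulas (windowLengthInMs = 200, an array of 5 buckets) shows how timestamps map onto slots and how a slot is reused one full cycle later. A standalone sketch with the same arithmetic, where the array length is passed in and the class name is hypothetical:

```java
// Standalone copy of the two LeapArray formulas, with the array length passed in.
final class LeapIndexDemo {

    static int timeIdx(long timeMillis, long windowLengthInMs, int arrayLength) {
        return (int) ((timeMillis / windowLengthInMs) % arrayLength);
    }

    static long windowStart(long timeMillis, long windowLengthInMs) {
        return timeMillis - timeMillis % windowLengthInMs;
    }

    public static void main(String[] args) {
        for (long t : new long[] {888, 1676, 1888}) {
            System.out.println(t + " -> idx " + timeIdx(t, 200, 5) + ", windowStart " + windowStart(t, 200));
        }
    }
}
```

This prints idx 4 / start 800 for 888, idx 3 / start 1600 for 1676, and idx 4 / start 1800 for 1888. The timestamps 888 and 1888 land in the same slot with different start times, which is exactly the "deprecated bucket" case that triggers resetWindowTo.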
    

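    It then checks what is currently stored at that array position:

    • No WindowWrap yet: create one and install it with a CAS operation.
    • A WindowWrap whose windowStart equals the newly computed windowStart: it is current and can be used directly.
    • A WindowWrap whose windowStart is earlier than the computed windowStart: the bucket is stale, so it is reset under the update lock. resetWindowTo is left to the concrete subclass; typically it sets the WindowWrap's start to the current windowStart and resets the statistics it holds.
    • A WindowWrap whose windowStart is later than the computed windowStart (the given time lies behind the array, e.g. the clock moved backwards): a fresh WindowWrap is returned without being stored in the array.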
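The cases above can be seen in isolation in a deliberately simplified sketch. It is not Sentinel's code and every name in it is hypothetical: instead of CAS plus tryLock it guards everything with one synchronized monitor, stores a plain long per bucket, assumes non-negative timestamps, and drops updates aimed at an older bucket (where LeapArray would hand back a detached WindowWrap). Its sum() uses the same deprecation rule as LeapArray's isWindowDeprecated (time - windowStart > intervalInMs).

```java
import java.util.Arrays;

// Hypothetical, simplified leap array holding one long counter per bucket.
final class MiniLeapArray {
    private final long windowLengthInMs;
    private final long intervalInMs;
    private final long[] starts; // bucket start time; -1 means "absent"
    private final long[] counts; // one counter per bucket

    MiniLeapArray(int sampleCount, int intervalInMs) {
        if (sampleCount <= 0 || intervalInMs <= 0 || intervalInMs % sampleCount != 0) {
            throw new IllegalArgumentException("invalid window configuration");
        }
        this.windowLengthInMs = intervalInMs / sampleCount;
        this.intervalInMs = intervalInMs;
        this.starts = new long[sampleCount];
        this.counts = new long[sampleCount];
        Arrays.fill(starts, -1L);
    }

    // Returns the slot for timeMillis, or -1 if the slot already holds a newer bucket.
    // Only called while holding the monitor (from add).
    private int currentIndex(long timeMillis) {
        int idx = (int) ((timeMillis / windowLengthInMs) % starts.length);
        long windowStart = timeMillis - timeMillis % windowLengthInMs;
        if (starts[idx] == -1L || starts[idx] < windowStart) {
            // Absent or deprecated: (re)initialise the bucket.
            starts[idx] = windowStart;
            counts[idx] = 0;
        } else if (starts[idx] > windowStart) {
            // Time lies behind the array: leave the newer bucket alone.
            return -1;
        }
        // Otherwise the bucket is up to date.
        return idx;
    }

    synchronized void add(long timeMillis, long n) {
        int idx = currentIndex(timeMillis);
        if (idx >= 0) {
            counts[idx] += n;
        }
    }

    // Sum over buckets that are present and not deprecated at timeMillis.
    synchronized long sum(long timeMillis) {
        long total = 0;
        for (int i = 0; i < starts.length; i++) {
            if (starts[i] != -1L && timeMillis - starts[i] <= intervalInMs) {
                total += counts[i];
            }
        }
        return total;
    }
}
```

For example, with 5 buckets of 200 ms, adds at 100, 300 and 900 ms give a sum of 3; an add at 1050 ms lands in slot 0 again and resets the bucket that started at 0; and at 1250 ms the bucket that started at 200 is more than 1000 ms old and no longer counted, leaving 2.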
    Getting the list of valid windows
    /**
         * Get aggregated value list for entire sliding window.
         * The list will only contain value from "valid" buckets.
         *
         * @return aggregated value list for entire sliding window
         */
        public List<T> values() {
            return values(TimeUtil.currentTimeMillis());
        }
    
        public List<T> values(long timeMillis) {
            if (timeMillis < 0) {
                return new ArrayList<T>();
            }
            int size = array.length();
            List<T> result = new ArrayList<T>(size);
    
            for (int i = 0; i < size; i++) {
                WindowWrap<T> windowWrap = array.get(i);
                if (windowWrap == null || isWindowDeprecated(timeMillis, windowWrap)) {
                    continue;
                }
                result.add(windowWrap.value());
            }
            return result;
        }
    
    /**
         * Check if a bucket is deprecated, which means that the bucket
         * has been behind for at least an entire window time span.
         *
         * @param windowWrap a non-null bucket
         * @return true if the bucket is deprecated; otherwise false
         */
        public boolean isWindowDeprecated(/*@NonNull*/ WindowWrap<T> windowWrap) {
            return isWindowDeprecated(TimeUtil.currentTimeMillis(), windowWrap);
        }
    
        public boolean isWindowDeprecated(long time, WindowWrap<T> windowWrap) {
            // intervalInMs is the whole statistics interval; a window that started more than intervalInMs before time is deprecated
            return time - windowWrap.windowStart() > intervalInMs;
        }
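Because the check uses a strict >, a bucket still counts when time - windowStart is exactly intervalInMs. With intervalInMs = 1000, a bucket that started at 200 is still included by values(1200) and excluded from values(1201). A tiny sketch repeating the comparison (the helper name is hypothetical):

```java
// Hypothetical helper repeating the comparison in LeapArray.isWindowDeprecated(long, WindowWrap).
final class DeprecationCheckDemo {

    static boolean isWindowDeprecated(long time, long windowStart, long intervalInMs) {
        return time - windowStart > intervalInMs;
    }

    public static void main(String[] args) {
        System.out.println(isWindowDeprecated(1200, 200, 1000)); // exactly one interval later: false
        System.out.println(isWindowDeprecated(1201, 200, 1000)); // one millisecond more: true
    }
}
```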
    

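    Summary
    LeapArray is the generic shell of the sliding window: it locates the window for a timestamp, creates windows, and decides whether a window has expired. The actual statistics live in the generic value of WindowWrap, which can hold whatever you need.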

    The concrete statistics classes for flow control and circuit breaking

    Flow-control statistics

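    The statistics used for flow-control decisions mostly come from the data collected by StatisticSlot, and the QPS-related counters are held by the StatisticNode class. Its statistics fields are: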

     /**
         * Holds statistics of the recent {@code INTERVAL} milliseconds. The {@code INTERVAL} is divided into time spans
         * by given {@code sampleCount}.
         */
        private transient volatile Metric rollingCounterInSecond = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT,
            IntervalProperty.INTERVAL);
    
        /**
         * Holds statistics of the recent 60 seconds. The windowLengthInMs is deliberately set to 1000 milliseconds,
         * meaning each bucket per second, in this way we can get accurate statistics of each second.
         */
        private transient Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000, false);
    
    

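    Both fields are ArrayMetric instances. The constructor arguments are the number of windows (sampleCount) followed by the total statistics interval in milliseconds; rollingCounterInMinute, for example, uses 60 one-second buckets over 60,000 ms.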

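    public class ArrayMetric implements Metric {
        // Holds a LeapArray whose bucket type is MetricBucket
        private final LeapArray<MetricBucket> data;
    
        public ArrayMetric(int sampleCount, int intervalInMs) {
            this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
        }

        // Three-argument form used by rollingCounterInMinute (enableOccupy = false)
        public ArrayMetric(int sampleCount, int intervalInMs, boolean enableOccupy) {
            if (enableOccupy) {
                this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
            } else {
                this.data = new BucketLeapArray(sampleCount, intervalInMs);
            }
        }
    }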
    

    The ArrayMetric methods that record QPS, RT and other data all follow the same pattern: locate the current window first, then add to its counters.

        @Override
        public void addSuccess(int count) {
            // Locate the current window
            WindowWrap<MetricBucket> wrap = data.currentWindow();
            // Add to the bucket's counter
            wrap.value().addSuccess(count);
        }
    
        @Override
        public void addPass(int count) {
            WindowWrap<MetricBucket> wrap = data.currentWindow();
            wrap.value().addPass(count);
        }
    
        @Override
        public void addRT(long rt) {
            WindowWrap<MetricBucket> wrap = data.currentWindow();
            wrap.value().addRT(rt);
        }
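Those methods are the write path. Reading a rate back amounts to summing a counter over the valid buckets and dividing by the interval in seconds; StatisticNode derives passQps roughly this way from the pass total reported by ArrayMetric and the window length in seconds. The sketch below assumes that formula; the class name, bucket counts and the two-bucket layout are hypothetical example values:

```java
import java.util.Arrays;

// Hypothetical helper: QPS = (sum of pass counts over valid buckets) / interval in seconds.
final class QpsDemo {

    static double qps(long[] passPerValidBucket, int intervalInMs) {
        long total = 0;
        for (long pass : passPerValidBucket) {
            total += pass;
        }
        return total / (intervalInMs / 1000.0);
    }

    public static void main(String[] args) {
        // Per-second style counter: two 500 ms buckets over 1 s
        System.out.println(qps(new long[] {30, 50}, 1000));
        // Per-minute style counter: 60 one-second buckets, 20 passes each
        long[] perSecond = new long[60];
        Arrays.fill(perSecond, 20L);
        System.out.println(qps(perSecond, 60_000));
    }
}
```

The first call gives 80.0 and the second 20.0: the minute counter smooths the rate over 60 s, which is why the short per-second counter is the one used for flow-control decisions.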
    

    The bucket type used here is MetricBucket. In its implementation, counters is an array of counters indexed by the ordinal of MetricEvent.

    public class MetricBucket {
        // Counters, indexed by the ordinal of the MetricEvent enum
        private final LongAdder[] counters;
    
        private volatile long minRt;
    
        public MetricBucket() {
            MetricEvent[] events = MetricEvent.values();
            this.counters = new LongAdder[events.length];
            for (MetricEvent event : events) {
                counters[event.ordinal()] = new LongAdder();
            }
            initMinRt();
        }
        
        public void addSuccess(int n) {
            add(MetricEvent.SUCCESS, n);
        }
    
        public void addRT(long rt) {
            add(MetricEvent.RT, rt);
    
            // Not thread-safe, but it's okay.
            if (rt < minRt) {
                minRt = rt;
            }
        }
        
        public void addPass(int n) {
            add(MetricEvent.PASS, n);
        }
    
        public MetricBucket add(MetricEvent event, long n) {
            // Add n to the counter at this event's index
            counters[event.ordinal()].add(n);
            return this;
        }
    }
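The ordinal-indexed LongAdder array can be reproduced with the JDK alone. The sketch defines its own small enum (a hypothetical subset, not Sentinel's MetricEvent) and has four threads add to one shared bucket; LongAdder keeps the concurrent increments correct without locks.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical bucket mirroring MetricBucket's layout: one LongAdder per enum ordinal.
final class MiniMetricBucket {
    enum Event { PASS, BLOCK, SUCCESS, RT }

    private final LongAdder[] counters = new LongAdder[Event.values().length];

    MiniMetricBucket() {
        for (Event e : Event.values()) {
            counters[e.ordinal()] = new LongAdder();
        }
    }

    void add(Event event, long n) {
        counters[event.ordinal()].add(n);
    }

    long get(Event event) {
        return counters[event.ordinal()].sum();
    }

    // 4 threads x 10_000 passes each against one shared bucket.
    static long concurrentPasses() {
        MiniMetricBucket bucket = new MiniMetricBucket();
        ExecutorService pool = Executors.newFixedThreadPool(4);
        for (int t = 0; t < 4; t++) {
            pool.submit(() -> {
                for (int i = 0; i < 10_000; i++) {
                    bucket.add(Event.PASS, 1);
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers timed out");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return bucket.get(Event.PASS);
    }
}
```

concurrentPasses() returns 40000. Indexing by ordinal keeps the lookup a plain array access, and adding an event type only means adding an enum constant.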
    

    Circuit-breaker statistics

    Breaking on errors
    The error counter is the SimpleErrorCounter class, which tracks two values: errorCount and totalCount.

     private final LeapArray<SimpleErrorCounter> stat;
    
     static class SimpleErrorCounter {
            private LongAdder errorCount;
            private LongAdder totalCount;
    
            public SimpleErrorCounter() {
                this.errorCount = new LongAdder();
                this.totalCount = new LongAdder();
            }
    
            public LongAdder getErrorCount() {
                return errorCount;
            }
    
            public LongAdder getTotalCount() {
                return totalCount;
            }
    
            public SimpleErrorCounter reset() {
                errorCount.reset();
                totalCount.reset();
                return this;
            }
    }
    
    static class SimpleErrorCounterLeapArray extends LeapArray<SimpleErrorCounter> {
    
            public SimpleErrorCounterLeapArray(int sampleCount, int intervalInMs) {
                super(sampleCount, intervalInMs);
            }
    
            @Override
            public SimpleErrorCounter newEmptyBucket(long timeMillis) {
                return new SimpleErrorCounter();
            }
    
            @Override
            protected WindowWrap<SimpleErrorCounter> resetWindowTo(WindowWrap<SimpleErrorCounter> w, long startTime) {
                // Update the start time and reset value.
                w.resetTo(startTime);
                w.value().reset();
                return w;
            }
        }
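Turning these buckets into a breaking decision means summing errorCount and totalCount over the valid buckets and comparing the error ratio with a threshold. In Sentinel this happens in ExceptionCircuitBreaker, which, as far as I can tell, also skips the check when the total is below the rule's minimum request amount. The sketch assumes that shape; the class, method and parameter names are hypothetical:

```java
// Hypothetical error-ratio check over per-bucket (errorCount, totalCount) pairs.
final class ErrorRatioDemo {

    static boolean shouldOpen(long[] errorCounts, long[] totalCounts, long minRequestAmount, double ratioThreshold) {
        long errors = 0;
        long total = 0;
        for (int i = 0; i < totalCounts.length; i++) {
            errors += errorCounts[i];
            total += totalCounts[i];
        }
        if (total < minRequestAmount) {
            return false; // too few requests to judge
        }
        return (double) errors / total > ratioThreshold;
    }
}
```

With buckets of (2 errors / 10 requests) and (3 / 10), the ratio is 0.25: a threshold of 0.2 opens the breaker, while a minimum request amount of 50 suppresses the decision.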
    
    

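    Breaking on slow requests
    For slow-request statistics the bucket type is the SlowRequestCounter class. Its implementation is similar to SimpleErrorCounter, so it is not reproduced here.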
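Following the SimpleErrorCounter pattern, a slow-request bucket only needs a slow counter and a total counter, where a request counts as slow when its RT exceeds the configured maximum. The sketch assumes that layout; it is not copied from Sentinel, and the class and method names are hypothetical:

```java
import java.util.concurrent.atomic.LongAdder;

// Hypothetical slow-request bucket in the style of SimpleErrorCounter.
final class SlowCounterDemo {

    static final class SlowRequestBucket {
        final LongAdder slowCount = new LongAdder();
        final LongAdder totalCount = new LongAdder();

        // Count the request, and also count it as slow if rt exceeds the allowed maximum.
        void record(long rtMs, long maxAllowedRtMs) {
            if (rtMs > maxAllowedRtMs) {
                slowCount.increment();
            }
            totalCount.increment();
        }

        // Same role as SimpleErrorCounter.reset(), called when the bucket is reused.
        SlowRequestBucket reset() {
            slowCount.reset();
            totalCount.reset();
            return this;
        }
    }

    static double slowRatio(SlowRequestBucket b) {
        long total = b.totalCount.sum();
        return total == 0 ? 0.0 : (double) b.slowCount.sum() / total;
    }
}
```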

    A home-made sliding window

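    This is a simple implementation that reuses a plain array cyclically as the sliding-window array. It is not thread-safe: although each slot is an AtomicInteger, resetting the array and clearing slots are not atomic. The array is twice as long as the window (windowSize slices), and each update clears the windowSize slots that follow the current slot, which still hold data from the previous round.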

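    import java.util.concurrent.atomic.AtomicInteger;

    public class SlidingWindow {
        /**
         * Circular array of time slices; its length is 2 * windowSize
         */
        private AtomicInteger[] timeSlices;
        /**
         * Total length of the circular array
         */
        private int timeSliceSize;
        /**
         * Duration of each time slice, in milliseconds
         */
        private int timeMillisPerSlice;
        /**
         * Number of time slices that make up one window
         */
        private int windowSize;
        /**
         * Maximum count allowed within one full window
         */
        private int threshold;
        /**
         * Time at which the slice array was (re)initialized; slice positions are counted from here
         */
        private long beginTimestamp;
        /**
         * Timestamp of the most recent addCount call
         */
        private long lastAddTimestamp;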
    
        public static void main(String[] args) {
            // 300 ms per slice, 3 slices per window (900 ms), threshold 8
            SlidingWindow window = new SlidingWindow(300, 3, 8);
            for (int i = 0; i < 100; i++) {
    
                System.out.println("Threshold reached: " + window.addCount(1));
                window.print();
                System.out.println("");
                System.out.println(Thread.currentThread().getName() + "--------------------------");
    
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
    
            }
        }
    
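        public SlidingWindow(int duration, int threshold) {
            // duration is in seconds and must be positive; anything over 10 minutes is capped at 10 minutes
            if (duration <= 0) {
                throw new IllegalArgumentException("duration must be positive: " + duration);
            }
            if (duration > 600) {
                duration = 600;
            }
            // Windows of up to 5 s use 5 slices, longer ones 10; either way windowSize * timeMillisPerSlice == duration * 1000
            if (duration <= 5) {
                this.windowSize = 5;
                this.timeMillisPerSlice = duration * 200;
            } else {
                this.windowSize = 10;
                this.timeMillisPerSlice = duration * 100;
            }
            this.threshold = threshold;
            // Keep two windows' worth of slices
            this.timeSliceSize = windowSize * 2;
    
            reset();
        }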
    
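        public SlidingWindow(int timeMillisPerSlice, int windowSize, int threshold) {
            if (timeMillisPerSlice <= 0 || windowSize <= 0) {
                throw new IllegalArgumentException("timeMillisPerSlice and windowSize must be positive");
            }
            this.timeMillisPerSlice = timeMillisPerSlice;
            this.windowSize = windowSize;
            this.threshold = threshold;
            // Keep two windows' worth of slices
            this.timeSliceSize = windowSize * 2;
    
            reset();
        }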
    
        /**
         * (Re)initialize: restart the time base and allocate empty slices
         */
        private void reset() {
            beginTimestamp = System.currentTimeMillis();
            // One counter per slot of the circular array
            AtomicInteger[] localTimeSlices = new AtomicInteger[timeSliceSize];
            for (int i = 0; i < timeSliceSize; i++) {
                localTimeSlices[i] = new AtomicInteger(0);
            }
            timeSlices = localTimeSlices;
        }
    
        private void print() {
            System.out.println("Current slice values:");
            for (AtomicInteger integer : timeSlices) {
                System.out.print(integer + "-");
            }
        }
    
        /**
         * Compute the position of the time slice the current time falls into
         */
        private int locationIndex() {
            long now = System.currentTimeMillis();
            // If nothing has been added for more than a whole window, all data is stale: just reinitialize
            if (now - lastAddTimestamp > (long) timeMillisPerSlice * windowSize) {
                reset();
            }
    
            return (int) (((now - beginTimestamp) / timeMillisPerSlice) % timeSliceSize);
        }
    
        /**
         * Add count to the current slice; returns true once the sum over the window reaches threshold
         */
        public boolean addCount(int count) {
            // Which slice the current time falls in
            int index = locationIndex();
            // Clear the windowSize slots after the current one (they hold data from the previous round).
            // E.g. with 1 second split into 4 slices the array has 8 slots (0-7); for index 5 we clear
            // 6, 7, 0 and 1, and the sum of 2, 3, 4 and 5 is the total for the current window.
            clearFromIndex(index);
    
            int sum = 0;
            // Add count to the current slice
            sum += timeSlices[index].addAndGet(count);
            // Add the preceding windowSize - 1 slices
            for (int i = 1; i < windowSize; i++) {
                sum += timeSlices[(index - i + timeSliceSize) % timeSliceSize].get();
            }
            System.out.println("Current index: " + index + ", current sum: " + sum);
    
            lastAddTimestamp = System.currentTimeMillis();
    
            return sum >= threshold;
        }
    
        private void clearFromIndex(int index) {
            for (int i = 1; i <= windowSize; i++) {
                int j = index + i;
                if (j >= windowSize * 2) {
                    j -= windowSize * 2;
                }
                timeSlices[j].set(0);
            }
        }
    
    }
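The slot bookkeeping in addCount and clearFromIndex is plain modular arithmetic, so it can be checked without a clock. A hypothetical helper that lists the slots cleared and summed for a given index, using the same formulas:

```java
import java.util.Arrays;

// Hypothetical helper reproducing the index arithmetic of addCount / clearFromIndex.
final class RingIndexDemo {

    // Slots clearFromIndex zeroes: index+1 .. index+windowSize (mod 2 * windowSize).
    static int[] clearedSlots(int index, int windowSize) {
        int size = windowSize * 2;
        int[] out = new int[windowSize];
        for (int i = 1; i <= windowSize; i++) {
            out[i - 1] = (index + i) % size;
        }
        return out;
    }

    // Slots addCount sums: index, index-1, ..., index-windowSize+1 (mod 2 * windowSize).
    static int[] summedSlots(int index, int windowSize) {
        int size = windowSize * 2;
        int[] out = new int[windowSize];
        for (int i = 0; i < windowSize; i++) {
            out[i] = (index - i + size) % size;
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(clearedSlots(5, 4))); // [6, 7, 0, 1]
        System.out.println(Arrays.toString(summedSlots(5, 4)));  // [5, 4, 3, 2]
    }
}
```

The two sets never overlap and together cover all 2 * windowSize slots, which is why the doubled array is enough: the half being summed is never touched by the clearing step.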
    

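    References

    04 Sentinel 基于滑动窗口的实时指标数据统计 (Sentinel: real-time metric statistics based on sliding windows)
    Java简单实现滑动窗口 (A simple sliding window in Java)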
