美文网首页Sentinel
Sentinel滑动窗口介绍

Sentinel滑动窗口介绍

作者: 晴天哥_王志 | 来源:发表于2021-06-14 11:31 被阅读0次

    系列

    开篇

    • sentinel 处理流程是基于slot链(ProcessorSlotChain)来完成的,如限流熔断等,其中重要的一个slot就是StatisticSlot,它是做各种数据统计的,而限流熔断的数据判断来源就是StatisticSlot。
    • StatisticSlot的各种数据统计都是基于滑动窗口来完成的,因此本文就重点分析StatisticSlot的滑动窗口统计机制。
    • StatisticSlot的滑动窗口需要了解统计指标的数据结构、滑动窗口的窗口定位,指标保存等概念。

    StatisticNode

    public class StatisticNode implements Node {
        // 对每秒指标统计
        private transient volatile Metric rollingCounterInSecond = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT,
            IntervalProperty.INTERVAL);
        // 每分钟指标统计
        private transient Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000, false);
        private LongAdder curThreadNum = new LongAdder();
        private long lastFetchTime = -1;
    
    
        @Override
        public void addPassRequest(int count) {
            rollingCounterInSecond.addPass(count);
            rollingCounterInMinute.addPass(count);
        }
    }
    
    • 采集指标的统计节点,负责统计相关的采集指标。
    • StatisticNode包含rollingCounterInSecond和rollingCounterInMinute。
    • rollingCounterInSecond是对每秒指标的统计。
    • rollingCounterInMinute是对每分钟指标的统计。
    • rollingCounterInSecond和rollingCounterInMinute是ArrayMetric,负责保存统计指标。

    统计指标

    • 统计指标使用ArrayMetric进行承载。
    • ArrayMetric内部是滑动窗口LeapArray对象。
    • LeapArray的每个元素为WindowWrap。
    • WindowWrap内部包含MetricBucket。

    ArrayMetric

    public class ArrayMetric implements Metric {
    
        private final LeapArray<MetricBucket> data;
    
        public ArrayMetric(int sampleCount, int intervalInMs) {
            this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
        }
    
        public ArrayMetric(int sampleCount, int intervalInMs, boolean enableOccupy) {
            if (enableOccupy) {
                this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
            } else {
                this.data = new BucketLeapArray(sampleCount, intervalInMs);
            }
        }
    }
    
    • ArrayMetric作为保存指标的数组,通过滑动窗口LeapArray保存MetricBucket。
    • MetricBucket代表统计指标,LeapArray代表滑动窗口,滑动窗口的每个窗口是MetricBucket对象。

    LeapArray

    public class BucketLeapArray extends LeapArray<MetricBucket> {
        public BucketLeapArray(int sampleCount, int intervalInMs) {
            super(sampleCount, intervalInMs);
        }
    }
    
    public abstract class LeapArray<T> {
        protected int windowLengthInMs;
        protected int sampleCount;
        protected int intervalInMs;
        private double intervalInSecond;
        protected final AtomicReferenceArray<WindowWrap<T>> array;
        private final ReentrantLock updateLock = new ReentrantLock();
    
        public LeapArray(int sampleCount, int intervalInMs) {
            this.windowLengthInMs = intervalInMs / sampleCount;
            this.intervalInMs = intervalInMs;
            this.intervalInSecond = intervalInMs / 1000.0;
            this.sampleCount = sampleCount;
            this.array = new AtomicReferenceArray<>(sampleCount);
        }
    }
    
    • LeapArray作为滑动窗口,BucketLeapArray作为其一种具体的实现。
    • LeapArray通过AtomicReferenceArray<WindowWrap<T>> array来实现滑动窗口。
    • 滑动窗口的统计指标MetricBucket通过WindowWrap进行包装。

    WindowWrap

    public class WindowWrap<T> {
    
        private final long windowLengthInMs; // 时间窗口的长度
        private long windowStart; // 时间窗口开始时间
        private T value; // MetricBucket对象,保存各个指标数据
    
        public WindowWrap(long windowLengthInMs, long windowStart, T value) {
            this.windowLengthInMs = windowLengthInMs;
            this.windowStart = windowStart;
            this.value = value;
        }
    }
    
    • WindowWrap作为滑动窗口的每个元素的承载,内部保存MetricBucket。

    MetricBucket

    public class MetricBucket {
    
        private final LongAdder[] counters;
        private volatile long minRt;
    
        public MetricBucket() {
            MetricEvent[] events = MetricEvent.values();
            this.counters = new LongAdder[events.length];
            for (MetricEvent event : events) {
                counters[event.ordinal()] = new LongAdder();
            }
            initMinRt();
        }
    }
    
    public enum MetricEvent {
        PASS, // 正常通过
        BLOCK, // 阻塞
        EXCEPTION, // 异常
        SUCCESS, // 成功
        RT, // RT统计
        OCCUPIED_PASS // 抢占通过
    }
    
    • MetricBucket内部保存各个统计指标MetricEvent的LongAdder数组。
    • MetricEvent的枚举值代表各个采集指标。

    滑动窗口定位

    public abstract class LeapArray<T> {
    
        protected int windowLengthInMs; // 时间窗口的长度
        protected int sampleCount; // 时间窗口的个数
        protected int intervalInMs;
        private double intervalInSecond;
        protected final AtomicReferenceArray<WindowWrap<T>> array;
    
        public WindowWrap<T> currentWindow() {
            return currentWindow(TimeUtil.currentTimeMillis());
        }
    
        public WindowWrap<T> currentWindow(long timeMillis) {
            if (timeMillis < 0) {
                return null;
            }
            // 根据当前时间和时间窗口的长度进行计算获取窗口下标
            int idx = calculateTimeIdx(timeMillis);
            // 获取指定下标的时间窗口的开始时间
            long windowStart = calculateWindowStart(timeMillis);
    
            /*
             * Get bucket item at given time from the array.
             *
             * (1) Bucket is absent, then just create a new bucket and CAS update to circular array.
             * (2) Bucket is up-to-date, then just return the bucket.
             * (3) Bucket is deprecated, then reset current bucket and clean all deprecated buckets.
             */
            while (true) {
                WindowWrap<T> old = array.get(idx);
                if (old == null) {
                    // 1.为空表示当前时间窗口为初始化过,创建WindowWrap并cas设置到array中
                    /*
                     *     B0       B1      B2    NULL      B4
                     * ||_______|_______|_______|_______|_______||___
                     * 200     400     600     800     1000    1200  timestamp
                     *                             ^
                     *                          time=888
                     *            bucket is empty, so create new and update
                     *
                     * If the old bucket is absent, then we create a new bucket at {@code windowStart},
                     * then try to update circular array via a CAS operation. Only one thread can
                     * succeed to update, while other threads yield its time slice.
                     */
                    WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
                    if (array.compareAndSet(idx, null, window)) {
                        // Successfully updated, return the created bucket.
                        return window;
                    } else {
                        // Contention failed, the thread will yield its time slice to wait for bucket available.
                        Thread.yield();
                    }
                } else if (windowStart == old.windowStart()) {
                    // 2.获取的时间窗口正好对应当前时间,直接返回
                    /*
                     *     B0       B1      B2     B3      B4
                     * ||_______|_______|_______|_______|_______||___
                     * 200     400     600     800     1000    1200  timestamp
                     *                             ^
                     *                          time=888
                     *            startTime of Bucket 3: 800, so it's up-to-date
                     *
                     * If current {@code windowStart} is equal to the start timestamp of old bucket,
                     * that means the time is within the bucket, so directly return the bucket.
                     */
                    return old;
                } else if (windowStart > old.windowStart()) {
                    // 3.获取的时间窗口为老的,进行窗口reset操作复用
                    /*
                     *   (old)
                     *             B0       B1      B2    NULL      B4
                     * |_______||_______|_______|_______|_______|_______||___
                     * ...    1200     1400    1600    1800    2000    2200  timestamp
                     *                              ^
                     *                           time=1676
                     *          startTime of Bucket 2: 400, deprecated, should be reset
                     *
                     * If the start timestamp of old bucket is behind provided time, that means
                     * the bucket is deprecated. We have to reset the bucket to current {@code windowStart}.
                     * Note that the reset and clean-up operations are hard to be atomic,
                     * so we need a update lock to guarantee the correctness of bucket update.
                     *
                     * The update lock is conditional (tiny scope) and will take effect only when
                     * bucket is deprecated, so in most cases it won't lead to performance loss.
                     */
                    if (updateLock.tryLock()) {
                        try {
                            // Successfully get the update lock, now we reset the bucket.
                            return resetWindowTo(old, windowStart);
                        } finally {
                            updateLock.unlock();
                        }
                    } else {
                        Thread.yield();
                    }
                } else if (windowStart < old.windowStart()) {
                    // 4.时间回拨了,正常情况下不会走到这里
                    return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
                }
            }
        }
    
        private int calculateTimeIdx(/*@Valid*/ long timeMillis) {
            long timeId = timeMillis / windowLengthInMs;
            // Calculate current index so we can map the timestamp to the leap array.
            return (int)(timeId % array.length());
        }
    
        protected long calculateWindowStart(/*@Valid*/ long timeMillis) {
            return timeMillis - timeMillis % windowLengthInMs;
        }
    }
    
    
    public class BucketLeapArray extends LeapArray<MetricBucket> {
    
        public BucketLeapArray(int sampleCount, int intervalInMs) {
            super(sampleCount, intervalInMs);
        }
    
        @Override
        public MetricBucket newEmptyBucket(long time) {
            return new MetricBucket();
        }
    
        @Override
        protected WindowWrap<MetricBucket> resetWindowTo(WindowWrap<MetricBucket> w, long startTime) {
            // 重置窗口的开始时间和对应的统计值
            w.resetTo(startTime);
            w.value().reset();
            return w;
        }
    }
    
    • 1.为空表示当前时间窗口为初始化过,创建WindowWrap并cas设置到array中
    • 2.获取的时间窗口正好对应当前时间,直接返回
    • 3.获取的时间窗口为老的,进行窗口reset操作复用。reset操作负责重置时间窗口的开始时间和窗口统计值。
    • 4.时间回拨了正常情况下不会走到这里

    指标保存

    public class ArrayMetric implements Metric {
    
        private final LeapArray<MetricBucket> data;
    
        public ArrayMetric(int sampleCount, int intervalInMs) {
            this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
        }
    
        @Override
        public void addPass(int count) {
            WindowWrap<MetricBucket> wrap = data.currentWindow();
            wrap.value().addPass(count);
        }
    }
    
    public class MetricBucket {
        private final LongAdder[] counters;
    
        public void addPass(int n) {
            add(MetricEvent.PASS, n);
        }
    
        public MetricBucket add(MetricEvent event, long n) {
            counters[event.ordinal()].add(n);
            return this;
        }
    }
    
    • currentWindow返回当前时间对应的滑动窗口。
    • addPass通过add指定类型的MetricEvent指标到LongAdder当中。

    相关文章

      网友评论

        本文标题:Sentinel滑动窗口介绍

        本文链接:https://www.haomeiwen.com/subject/uzxneltx.html