

作者: justonemoretry | 来源:发表于2023-02-28 11:48 被阅读0次






       protected final AtomicReferenceArray<WindowWrap<T>> array;
         * The conditional (predicate) update lock is used only when current bucket is deprecated.
        private final ReentrantLock updateLock = new ReentrantLock();
         * The total bucket count is: {@code sampleCount = intervalInMs / windowLengthInMs}.
         * @param sampleCount  bucket count of the sliding window
         * @param intervalInMs the total time interval of this {@link LeapArray} in milliseconds
        public LeapArray(int sampleCount, int intervalInMs) {
            AssertUtil.isTrue(sampleCount > 0, "bucket count is invalid: " + sampleCount);
            AssertUtil.isTrue(intervalInMs > 0, "total time interval of the sliding window should be positive");
            AssertUtil.isTrue(intervalInMs % sampleCount == 0, "time span needs to be evenly divided");
            // 单个window长度
            this.windowLengthInMs = intervalInMs / sampleCount;
            // 统计周期长度
            this.intervalInMs = intervalInMs;
            this.intervalInSecond = intervalInMs / 1000.0;
            // 取样窗口数
            this.sampleCount = sampleCount;
            // 滑动窗口数组
            this.array = new AtomicReferenceArray<>(sampleCount);


    public class WindowWrap<T> {
         * Time length of a single window bucket in milliseconds.
        private final long windowLengthInMs;
         * Start timestamp of the window in milliseconds.
        private long windowStart;
         * Statistic data.
        private T value;
         * @param windowLengthInMs a single window bucket's time length in milliseconds.
         * @param windowStart      the start timestamp of the window
         * @param value            statistic data
        public WindowWrap(long windowLengthInMs, long windowStart, T value) {
            this.windowLengthInMs = windowLengthInMs;
            this.windowStart = windowStart;
            this.value = value;
         * Reset start timestamp of current bucket to provided time.
         * @param startTime valid start timestamp
         * @return bucket after reset
        public WindowWrap<T> resetTo(long startTime) {
            this.windowStart = startTime;
            return this;
         * Check whether given timestamp is in current bucket.
         * @param timeMillis valid timestamp in ms
         * @return true if the given time is in current bucket, otherwise false
         * @since 1.5.0
        public boolean isTimeInWindow(long timeMillis) {
            return windowStart <= timeMillis && timeMillis < windowStart + windowLengthInMs;


         * Get bucket item at provided timestamp.
         * @param timeMillis a valid timestamp in milliseconds
         * @return current bucket item at provided timestamp if the time is valid; null if time is invalid
        public WindowWrap<T> currentWindow(long timeMillis) {
            if (timeMillis < 0) {
                return null;
            int idx = calculateTimeIdx(timeMillis);
            // Calculate current bucket start time.
            long windowStart = calculateWindowStart(timeMillis);
             * Get bucket item at given time from the array.
             * (1) Bucket is absent, then just create a new bucket and CAS update to circular array.
             * (2) Bucket is up-to-date, then just return the bucket.
             * (3) Bucket is deprecated, then reset current bucket.
            while (true) {
                WindowWrap<T> old = array.get(idx);
                if (old == null) {
                     *     B0       B1      B2    NULL      B4
                     * ||_______|_______|_______|_______|_______||___
                     * 200     400     600     800     1000    1200  timestamp
                     *                             ^
                     *                          time=888
                     *            bucket is empty, so create new and update
                     * If the old bucket is absent, then we create a new bucket at {@code windowStart},
                     * then try to update circular array via a CAS operation. Only one thread can
                     * succeed to update, while other threads yield its time slice.
                    WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
                    if (array.compareAndSet(idx, null, window)) {
                        // Successfully updated, return the created bucket.
                        return window;
                    } else {
                        // Contention failed, the thread will yield its time slice to wait for bucket available.
                } else if (windowStart == old.windowStart()) {
                     *     B0       B1      B2     B3      B4
                     * ||_______|_______|_______|_______|_______||___
                     * 200     400     600     800     1000    1200  timestamp
                     *                             ^
                     *                          time=888
                     *            startTime of Bucket 3: 800, so it's up-to-date
                     * If current {@code windowStart} is equal to the start timestamp of old bucket,
                     * that means the time is within the bucket, so directly return the bucket.
                    return old;
                } else if (windowStart > old.windowStart()) {
                     *   (old)
                     *             B0       B1      B2    NULL      B4
                     * |_______||_______|_______|_______|_______|_______||___
                     * ...    1200     1400    1600    1800    2000    2200  timestamp
                     *                              ^
                     *                           time=1676
                     *          startTime of Bucket 2: 400, deprecated, should be reset
                     * If the start timestamp of old bucket is behind provided time, that means
                     * the bucket is deprecated. We have to reset the bucket to current {@code windowStart}.
                     * Note that the reset and clean-up operations are hard to be atomic,
                     * so we need a update lock to guarantee the correctness of bucket update.
                     * The update lock is conditional (tiny scope) and will take effect only when
                     * bucket is deprecated, so in most cases it won't lead to performance loss.
                    if (updateLock.tryLock()) {
                        try {
                            // Successfully get the update lock, now we reset the bucket.
                            return resetWindowTo(old, windowStart);
                        } finally {
                    } else {
                        // Contention failed, the thread will yield its time slice to wait for bucket available.
                } else if (windowStart < old.windowStart()) {
                    // Should not go through here, as the provided time is already behind.
                    return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));


    // 计算窗口数组下标,取商,然后按array.length()取余。
    private int calculateTimeIdx(/*@Valid*/ long timeMillis) {
            long timeId = timeMillis / windowLengthInMs;
            // Calculate current index so we can map the timestamp to the leap array.
            return (int)(timeId % array.length());
    // 计算窗口的开始时间, 把时间戳减去余数
        protected long calculateWindowStart(/*@Valid*/ long timeMillis) {
            return timeMillis - timeMillis % windowLengthInMs;


    • 如果没有,直接通过cas操作设置一个新的windowWrap。
    • 存在的情况下,和新计算的windowStart一致,可以直接用
    • 存在的情况下,之前window的时间小于计算的windowStart,则加锁重置,这里面的resetWindowTo 归具体的实现类实现,一般是windowWrap设置为当前windowStart,然后对其中的数值进行reset。
         * Get aggregated value list for entire sliding window.
         * The list will only contain value from "valid" buckets.
         * @return aggregated value list for entire sliding window
        public List<T> values() {
            return values(TimeUtil.currentTimeMillis());
        public List<T> values(long timeMillis) {
            if (timeMillis < 0) {
                return new ArrayList<T>();
            int size = array.length();
            List<T> result = new ArrayList<T>(size);
            for (int i = 0; i < size; i++) {
                WindowWrap<T> windowWrap = array.get(i);
                if (windowWrap == null || isWindowDeprecated(timeMillis, windowWrap)) {
            return result;
         * Check if a bucket is deprecated, which means that the bucket
         * has been behind for at least an entire window time span.
         * @param windowWrap a non-null bucket
         * @return true if the bucket is deprecated; otherwise false
        public boolean isWindowDeprecated(/*@NonNull*/ WindowWrap<T> windowWrap) {
            return isWindowDeprecated(TimeUtil.currentTimeMillis(), windowWrap);
        public boolean isWindowDeprecated(long time, WindowWrap<T> windowWrap) {
            // intervalInMs 是整体的统计周期,大于这个值window认为是失效的
            return time - windowWrap.windowStart() > intervalInMs;





         * Holds statistics of the recent {@code INTERVAL} milliseconds. The {@code INTERVAL} is divided into time spans
         * by given {@code sampleCount}.
        private transient volatile Metric rollingCounterInSecond = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT,
         * Holds statistics of the recent 60 seconds. The windowLengthInMs is deliberately set to 1000 milliseconds,
         * meaning each bucket per second, in this way we can get accurate statistics of each second.
        private transient Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000, false);


    public class ArrayMetric implements Metric {
        // 持有一个LeapArray对象,里面的泛型为MetricBucket
        private final LeapArray<MetricBucket> data;
        public ArrayMetric(int sampleCount, int intervalInMs) {
            this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);


        public void addSuccess(int count) {
            // 定位到当前窗口
            WindowWrap<MetricBucket> wrap = data.currentWindow();
            // 进行add计算
        public void addPass(int count) {
            WindowWrap<MetricBucket> wrap = data.currentWindow();
        public void addRT(long rt) {
            WindowWrap<MetricBucket> wrap = data.currentWindow();


    public class MetricBucket {
        // 计数器,按MetricEvent这个enum的序号当下标
        private final LongAdder[] counters;
        private volatile long minRt;
        public MetricBucket() {
            MetricEvent[] events = MetricEvent.values();
            this.counters = new LongAdder[events.length];
            for (MetricEvent event : events) {
                counters[event.ordinal()] = new LongAdder();
        public void addSuccess(int n) {
            add(MetricEvent.SUCCESS, n);
        public void addRT(long rt) {
            add(MetricEvent.RT, rt);
            // Not thread-safe, but it's okay.
            if (rt < minRt) {
                minRt = rt;
        public void addPass(int n) {
            add(MetricEvent.PASS, n);
        public MetricBucket add(MetricEvent event, long n) {
            // 按下标增加n
            return this;



     private final LeapArray<SimpleErrorCounter> stat;
     static class SimpleErrorCounter {
            private LongAdder errorCount;
            private LongAdder totalCount;
            public SimpleErrorCounter() {
                this.errorCount = new LongAdder();
                this.totalCount = new LongAdder();
            public LongAdder getErrorCount() {
                return errorCount;
            public LongAdder getTotalCount() {
                return totalCount;
            public SimpleErrorCounter reset() {
                return this;
    static class SimpleErrorCounterLeapArray extends LeapArray<SimpleErrorCounter> {
            public SimpleErrorCounterLeapArray(int sampleCount, int intervalInMs) {
                super(sampleCount, intervalInMs);
            public SimpleErrorCounter newEmptyBucket(long timeMillis) {
                return new SimpleErrorCounter();
            protected WindowWrap<SimpleErrorCounter> resetWindowTo(WindowWrap<SimpleErrorCounter> w, long startTime) {
                // Update the start time and reset value.
                return w;




    public class SlidingWindow {
         * 循环队列,就是装多个窗口用,该数量是windowSize的2倍
        private AtomicInteger[] timeSlices;
         * 队列的总长度
        private int timeSliceSize;
         * 每个时间片的时长,以毫秒为单位
        private int timeMillisPerSlice;
         * 共有多少个时间片(即窗口长度)
        private int windowSize;
         * 在一个完整窗口期内允许通过的最大阈值
        private int threshold;
         * 该滑窗的起始创建时间,也就是第一个数据
        private long beginTimestamp;
         * 最后一个数据的时间戳
        private long lastAddTimestamp;
        public static void main(String[] args) {
            SlidingWindow window = new SlidingWindow(300, 3, 8);
            AtomicInteger num = new AtomicInteger(0);
            for (int i = 0; i < 100; i++) {
                System.out.println("限流运行结果:" + window.addCount(1));
                System.out.println(Thread.currentThread().getName() + "--------------------------");
                try {
                } catch (InterruptedException e) {
        public SlidingWindow(int duration, int threshold) {
            if (duration > 600) {
                duration = 600;
            if (duration <= 5) {
                this.windowSize = 5;
                this.timeMillisPerSlice = duration * 200;
            } else {
                this.windowSize = 10;
                this.timeMillisPerSlice = duration * 100;
            this.threshold = threshold;
            // 保证存储在至少两个window
            this.timeSliceSize = windowSize * 2;
        public SlidingWindow(int timeMillisPerSlice, int windowSize, int threshold) {
            this.timeMillisPerSlice = timeMillisPerSlice;
            this.windowSize = windowSize;
            this.threshold = threshold;
            // 保证存储在至少两个window
            this.timeSliceSize = windowSize * 2;
         * 初始化
        private void reset() {
            beginTimestamp = System.currentTimeMillis();
            AtomicInteger[] localTimeSlices = new AtomicInteger[timeSliceSize];
            for (int i = 0; i < timeSliceSize; i++) {
                localTimeSlices[i] = new AtomicInteger(0);
            timeSlices = localTimeSlices;
        private void print() {
            for (AtomicInteger integer : timeSlices) {
                System.out.print(integer + "-");
         * 计算当前所在的时间片的位置
        private int locationIndex() {
            long now = System.currentTimeMillis();
            if (now - lastAddTimestamp > timeMillisPerSlice * windowSize) {
            return (int) (((now - beginTimestamp) / timeMillisPerSlice) % timeSliceSize);
         * 增加count个数量
        public boolean addCount(int count) {
            int index = locationIndex();
    //        System.out.println("index:" + index);
            int sum = 0;
            // 在当前时间片里继续+1
            sum += timeSlices[index].addAndGet(count);
            for (int i = 1; i < windowSize; i++) {
                sum += timeSlices[(index - i + timeSliceSize) % timeSliceSize].get();
            System.out.println("当前下标:" + index + "当前总和:" + sum);
            lastAddTimestamp = System.currentTimeMillis();
            return sum >= threshold;
        private void clearFromIndex(int index) {
            for (int i = 1; i <= windowSize; i++) {
                int j = index + i;
                if (j >= windowSize * 2) {
                    j -= windowSize * 2;


    04 Sentinel 基于滑动窗口的实时指标数据统计



