美文网首页服务监控和治理
Sentinel之实时数据分析

Sentinel之实时数据分析

作者: 九点半的马拉 | 来源:发表于2020-04-21 19:06 被阅读0次

    Sentinel有一个重要的功能,即实时数据统计分析,我们可以获得在每1秒或者每1分钟下的每个上下文调用链路中的某一资源的请求数、阻塞数或响应时间;也可以获得某一资源全局的请求数、阻塞数或者响应时间。 主要实现逻辑是在StatisticSlot中。

    Statisticslot处于调用链slotchain中的第三个,负责统计资源的实时状态,调用到slotchain中的任意一个slot时,都会触发该slot的entry方法。

    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, Object... args) throws Throwable {
        try {
            // 触发下一个Slot的entry方法
            fireEntry(context, resourceWrapper, node, count, args);
            // 如果能通过SlotChain中后面的Slot的entry方法,说明没有被限流或降级
            // 统计信息
            node.increaseThreadNum();
            node.addPassRequest();
            // 省略部分代码
        } catch (BlockException e) {
            context.getCurEntry().setError(e);
            // Add block count.
            node.increaseBlockedQps();
            // 省略部分代码
            throw e;
        } catch (Throwable e) {
            context.getCurEntry().setError(e);
            // Should not happen
            node.increaseExceptionQps();
            // 省略部分代码
            throw e;
        }
    }
    

    entry()主要有三个部分:
    1) 首先会触发后续slot的entry方法,如SystemSlot、FlowSlot、DegradeSlot等的规则。
    2)当后续的slot通过,没有抛出BlockException异常,说明该资源被成功调用,则增加执行线程数和通过的请求数。
    3)当后续的slot中某一没有通过,则会抛出BlockException等异常,如果捕获的是BlockException异常,则主要是增加阻塞的数量;如果是系统异常,则增加异常数量。

    当退出的时候会执行exit()方法:

     public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
            DefaultNode node = (DefaultNode)context.getCurNode();
            if (context.getCurEntry().getError() == null) {
                //计算响应时间,通过当前时间-CurEntry的创建时间取毫秒值
                long rt = TimeUtil.currentTimeMillis() - context.getCurEntry().getCreateTime();
                if (rt > Constants.TIME_DROP_VALVE) {
                    rt = Constants.TIME_DROP_VALVE;
                }
                //新增响应时间和成功数
                node.addRtAndSuccess(rt, count);
                if (context.getCurEntry().getOriginNode() != null) {
                    context.getCurEntry().getOriginNode().addRtAndSuccess(rt, count);
                }
                //线程数减1
                node.decreaseThreadNum();
                if (context.getCurEntry().getOriginNode() != null) {
                    context.getCurEntry().getOriginNode().decreaseThreadNum();
                }
                //全局线程数-1
                if (resourceWrapper.getType() == EntryType.IN) {
                    Constants.ENTRY_NODE.addRtAndSuccess(rt, count);
                    Constants.ENTRY_NODE.decreaseThreadNum();
                }
            } else {
                // Error may happen.
            }
            ***其他逻辑***
            fireExit(context, resourceWrapper, count);
        }
    

    当退出时,重点关注响应时间,将本次响应时间收集到Node中,并将当前活跃线程数减1。

    整体流程如上所述,但是具体的操作我们还不清楚,接下来我将分析其中的Qps数是如何统计的。

    在上述的entry()方法中在统计Qps数量时会调用node.addPassRequest();方法。

    @Override
    public void addPassRequest(int count) {
         # DefaultNode类型  
         # 统计某个resource在某个context中的实时指标
         super.addPassRequest(count);
         # ClusterNode类型
         # 统计某个resource在所有的context中实时指标总和
         this.clusterNode.addPassRequest(count);
    }
    

    这两个Node都是StatisticNode的子类,最终会调用StatisticNode中的方法。

    @Override
    public void addPassRequest(int count) {
         # 秒级统计
         rollingCounterInSecond.addPass(count);
         # 分钟统计
         rollingCounterInMinute.addPass(count);
    }
    

    秒级统计和分钟统计的底层原理都是一样的,下面将对秒级统计进行分析。


    更正,在秒级统计中实际是OccupiableBucketLeapArray而不是BucketLeapArray
    为了好理解,假设秒级用BucketLeapArray,而实际上不是
    对于分钟级来说,一种有60个窗口,每个窗口是1s


    public class ArrayMetric implements Metric {
        private final LeapArray<MetricBucket> data;
        
        public ArrayMetric(int sampleCount, int intervalInMs, boolean enableOccupy) {
            if (enableOccupy) {
                this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
            } else {
                this.data = new BucketLeapArray(sampleCount, intervalInMs);
            }
        }
        
        @Override
        public void addPass(int count) {
               WindowWrap<MetricBucket> wrap = data.currentWindow();
               wrap.value().addPass(count);
        }
    

    在上面代码中,有几个重要的类。ArrayMetricBucketLeapArrayMetricBucketWindowWrap

    WindowWrap

    每一个滑动窗口的包装类,其内部的数据结构T是用MetricBucket表示的。

    public class WindowWrap<T> {
        //一个窗口时段的时间长度(以毫秒为单位)
        private final long windowLengthInMs;
        //窗口的开始时间戳(以毫秒为单位)
        private long windowStart;
        //统计数据,MetricBucket
        private T value;
    

    MetricBucket

    表示一段时间内的指标数据,存放在LongAdder类型的数组里。有通过数量、阻塞数量、异常数量、成功数量、响应时间、已通过未来配额。相对于AtomicLongLongAddr在高并发下有更好的吞吐量,代价是花费了更多的空间。

    public class MetricBucket {
        private final LongAdder[] counters;
        private volatile long minRt;
        public long get(MetricEvent event) {
            return counters[event.ordinal()].sum();
        }
    }
    
    public enum MetricEvent {
        PASS,
        BLOCK,
        EXCEPTION,
        SUCCESS,
        RT,
        OCCUPIED_PASS
    }
    

    LeapArray

    Sentinel中统计指标的基本数据结构。

    public LeapArray(int sampleCount, int intervalInMs) {
        # 时间窗口的长度
        this.windowLengthInMs = intervalInMs / sampleCount;
        # 以毫秒为单位的时间间隔,
        this.intervalInMs = intervalInMs;
        # 采样窗口的个数,即数组长度
        this.sampleCount = sampleCount;
        this.array = new AtomicReferenceArray<>(sampleCount);
    }
    

    在按秒统计时,默认的时间窗口数组长度为2,每个时间窗口的长度为500ms。

    在统计QPS时,第一步是调用data.currentWindow(),获取当前时间窗口。

    public WindowWrap<T> currentWindow() {
            return currentWindow(TimeUtil.currentTimeMillis());
    }
    

    Qps添加第一大步

    下面对currentTimeMills()方法进行拆开分析。

    public WindowWrap<T> currentWindow(long timeMillis) {
            if (timeMillis < 0) {
                return null;
            }
            # 计算给定的时间映射在数组中的下标(默认数组长度为2)
            # 则idx可以是0或者1
            int idx = calculateTimeIdx(timeMillis);
            # 根据当前时间计算出所在窗口应该对用的开始时间
            long windowStart = calculateWindowStart(timeMillis);
    
    private int calculateTimeIdx(long timeMillis) {
            long timeId = timeMillis / windowLengthInMs;
            return (int)(timeId % array.length());
    }
    protected long calculateWindowStart(/*@Valid*/ long timeMillis) {
            return timeMillis - timeMillis % windowLengthInMs;
    }
    

    为什么默认要用两个采样窗口,因为sentinel设定的是比较轻量的框架。时间窗口保存着很多统计数据,如果时间窗口过多的话,一方面会占用过多的内存,另一方面时间窗口过多意味着时间窗口的长度会变小,如果时间窗口长度变小,就会导致时间窗口过于频繁的滑动。

    while (true) {
          # 获取存储的该索引位置下的旧的时间窗口
          WindowWrap<T> old = array.get(idx);
          if (old == null) {
              # 没有则创建一个
              WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
              # 通过CAS进行设置
              if (array.compareAndSet(idx, null, window)) {
                     return window;
               } else {
                    //否则当前线程让出时间片,再进行线程竞争
                    Thread.yield();
               }
         # 如果实际应当的开始时间和原来的窗口的开始时间相等,则说明没有失效,直接返回
         } else if (windowStart == old.windowStart()) {
                return old;
         # 让应当的开始时间大于原来old窗口的开始时间,则说明该窗口失效
         } else if (windowStart > old.windowStart()) {
                if (updateLock.tryLock()) {
                   try {
                       # 将旧的时间窗口的开始时间设置为实际应该的开始时间,
                       # 并重置该窗口的统计数据为0
                        return resetWindowTo(old, windowStart);
                   } finally {
                       updateLock.unlock();
                   }
                }  else {
                     Thread.yield();
                    }
        # 这种情况不可能存在,会抛出异常
        } else if (windowStart < old.windowStart()) {
                    return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
               }
    }
    
    @Override
    protected WindowWrap<MetricBucket> resetWindowTo(WindowWrap<MetricBucket> w, long startTime) {
            // Update the start time and reset value.
            w.resetTo(startTime);
            # w.value() 即 MetricBucket 
            w.value().reset();
            return w;
    }
    #重新设置它的开始时间
    public WindowWrap<T> resetTo(long startTime) {
            this.windowStart = startTime;
            return this;
    } 
    # 将MetricBucket的统计数据都重置为0
    public void reset() {
            internalReset(0L);
    }
    

    Qps添加第二大步

    至此,第一大步已经介绍完了,下面是第二大步wrap.value().addPass(count)
    这一步很简单,就是在第一步后会获得所处的时间窗口WindowWrap,然后得到该类里面的MetricBucket,它统计了该事件窗口下的数据统计,最后进行原子增加操作。

    private T value;
    public WindowWrap(long windowLengthInMs, long windowStart, T value) {
            this.windowLengthInMs = windowLengthInMs;
            this.windowStart = windowStart;
            this.value = value;
    }
    public T value() {
            return value;
    }
    
    public void addPass(int n) {
            add(MetricEvent.PASS, n);
    }
    
    public MetricBucket add(MetricEvent event, long n) {
            counters[event.ordinal()].add(n);
            return this;
    }
    

    以上就是增加Qps的整体流程。

    Qps数据获取

    那我们将数据添加上了,那怎么查询获得呢?

    TIM截图1.png

    经过学习了解后,我们可以知道资源的数据统计存放在DefaultNodeClsterNode中,它们都是StatisticNode的子类,StatisticNode实现了NOde接口的很多关于统计数据的方法,其中有统计Qps的方法。

    @Override
    public double passQps() {
            # 先获取现在的时间窗口数组的Qps总量 @(1)
            # 然后获取时间 @(2)
            return rollingCounterInSecond.pass() / rollingCounterInSecond.getWindowIntervalInSec();
    }
    

    代码@(1)解析

    @Override
    public long pass() {
            # 与前面方法一致,过滤掉过期窗口
            data.currentWindow();
            long pass = 0;
            List<MetricBucket> list = data.values();
    
            for (MetricBucket window : list) {
                pass += window.pass();
            }
            return pass;
    }
    
    public List<T> values() {
            return values(TimeUtil.currentTimeMillis());
    }
    
    public List<T> values(long timeMillis) {
            if (timeMillis < 0) {
                return new ArrayList<T>();
            }
            int size = array.length();
            List<T> result = new ArrayList<T>(size);
    
            for (int i = 0; i < size; i++) {
                WindowWrap<T> windowWrap = array.get(i);
                if (windowWrap == null || isWindowDeprecated(timeMillis, windowWrap)) {
                    continue;
                }
                # 即 MetricBucket
                result.add(windowWrap.value());
            }
            return result;
        }
    

    当前时间减去某一窗口的开始时间,超过了事件间隔(按秒统计的话,就是1s),就说明该窗口过期,不添加。

    public boolean isWindowDeprecated(long time, WindowWrap<T> windowWrap) {
            return time - windowWrap.windowStart() > intervalInMs;
    }
    

    代码@(2)解析

    因为之前的时间单位是毫秒,现在计算的是每秒,所以转化为秒。

    @Override
    public double getWindowIntervalInSec() {
            return data.getIntervalInSecond();
    }
    
    public double getIntervalInSecond() {
            return intervalInMs / 1000.0;
    }
    

    至此,关于实时统计的模块就讲完了,大部分是参考几个大神的文章,图文并茂,很好理解,大家可以阅读如下:

    Sentinel 原理-滑动窗口
    Alibaba Seninel 滑动窗口实现原理(文末附原理图)
    源码分析 Sentinel 实时数据采集实现原理

    相关文章

      网友评论

        本文标题:Sentinel之实时数据分析

        本文链接:https://www.haomeiwen.com/subject/cfipihtx.html