美文网首页Java高级交流
【sentinel】深入浅出之原理篇StatisticSlot&

【sentinel】深入浅出之原理篇StatisticSlot&

作者: 一滴水的坚持 | 来源:发表于2019-03-18 20:34 被阅读0次

    StatisticSlot则用于记录,统计不同纬度的 runtime 信息,在这里记录线程数变化,请求数量,计算RT时间,代码比较简单:

    public class StatisticSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
    
        @Override
        public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                          boolean prioritized, Object... args) throws Throwable {
            try {
    
                fireEntry(context, resourceWrapper, node, count, prioritized, args);
                //请求通过,增加线程数
                node.increaseThreadNum();
                //增加请求通过数
                node.addPassRequest(count);
                //如果原始节点存在,则新增线程数和通过的请求总数
                if (context.getCurEntry().getOriginNode() != null) {
                    context.getCurEntry().getOriginNode().increaseThreadNum();
                    context.getCurEntry().getOriginNode().addPassRequest(count);
                }
                //如果是IN,则在Cluster节点上新增线程数和通过请求数,这个是全局的ClusterNode,和ClusterBuilderSlot的ClusterNode不一样,此处所有请求共享同一个Cluster
                if (resourceWrapper.getType() == EntryType.IN) {
                    // Add count for global inbound entry node for global statistics.
                    Constants.ENTRY_NODE.increaseThreadNum();
                    Constants.ENTRY_NODE.addPassRequest(count);
                }
                //钩子函数
                for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
                    handler.onPass(context, resourceWrapper, node, count, args);
                }
            } catch (PriorityWaitException ex) {
                //增加线程数
                node.increaseThreadNum();
                if (context.getCurEntry().getOriginNode() != null) {
                    context.getCurEntry().getOriginNode().increaseThreadNum();
                }
                //增加线程数 共享全局Cluster
                if (resourceWrapper.getType() == EntryType.IN) {
                    Constants.ENTRY_NODE.increaseThreadNum();
                }
                 //钩子函数
                for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
                    handler.onPass(context, resourceWrapper, node, count, args);
                }
            } catch (BlockException e) {
                context.getCurEntry().setError(e);
                //节点Block数量加一
                node.increaseBlockQps(count);
                if (context.getCurEntry().getOriginNode() != null) {
                    context.getCurEntry().getOriginNode().increaseBlockQps(count);
                }
                if (resourceWrapper.getType() == EntryType.IN) {
                    Constants.ENTRY_NODE.increaseBlockQps(count);
                }
                //钩子,扩展
                for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
                    handler.onBlocked(e, context, resourceWrapper, node, count, args);
                }
    
                throw e;
            } catch (Throwable e) {
                context.getCurEntry().setError(e);
                node.increaseExceptionQps(count);
                if (context.getCurEntry().getOriginNode() != null) {
                    context.getCurEntry().getOriginNode().increaseExceptionQps(count);
                }
    
                if (resourceWrapper.getType() == EntryType.IN) {
                    Constants.ENTRY_NODE.increaseExceptionQps(count);
                }
                throw e;
            }
        }
    
        @Override
        public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
            DefaultNode node = (DefaultNode)context.getCurNode();
            if (context.getCurEntry().getError() == null) {
                //计算响应时间,通过当前时间-CurEntry的创建时间取毫秒值
                long rt = TimeUtil.currentTimeMillis() - context.getCurEntry().getCreateTime();
                if (rt > Constants.TIME_DROP_VALVE) {
                    rt = Constants.TIME_DROP_VALVE;
                }
                //新增响应时间和成功数
                node.addRtAndSuccess(rt, count);
                if (context.getCurEntry().getOriginNode() != null) {
                    context.getCurEntry().getOriginNode().addRtAndSuccess(rt, count);
                }
                //线程数减1
                node.decreaseThreadNum();
                if (context.getCurEntry().getOriginNode() != null) {
                    context.getCurEntry().getOriginNode().decreaseThreadNum();
                }
                //全局线程数-1
                if (resourceWrapper.getType() == EntryType.IN) {
                    Constants.ENTRY_NODE.addRtAndSuccess(rt, count);
                    Constants.ENTRY_NODE.decreaseThreadNum();
                }
            } else {
                // Error may happen.
            }
            //回调钩子
            Collection<ProcessorSlotExitCallback> exitCallbacks = StatisticSlotCallbackRegistry.getExitCallbacks();
            for (ProcessorSlotExitCallback handler : exitCallbacks) {
                handler.onExit(context, resourceWrapper, count, args);
            }
            fireExit(context, resourceWrapper, count);
        }
    }
    

    逻辑简单,但实现并不简单,先了解一下DefaultNode的Api:

    
    public class DefaultNode extends StatisticNode {
    
       private ResourceWrapper id;
       private volatile Set<Node> childList = new HashSet<>();
       private ClusterNode clusterNode;
    
       @Override
       public void increaseBlockQps(int count) {
           super.increaseBlockQps(count);
           this.clusterNode.increaseBlockQps(count);
       }
    
       @Override
       public void increaseExceptionQps(int count) {
           super.increaseExceptionQps(count);
           this.clusterNode.increaseExceptionQps(count);
       }
    
       @Override
       public void addRtAndSuccess(long rt, int successCount) {
           super.addRtAndSuccess(rt, successCount);
           this.clusterNode.addRtAndSuccess(rt, successCount);
       }
    
       @Override
       public void increaseThreadNum() {
           super.increaseThreadNum();
           this.clusterNode.increaseThreadNum();
       }
    
       @Override
       public void decreaseThreadNum() {
           super.decreaseThreadNum();
           this.clusterNode.decreaseThreadNum();
       }
    
       @Override
       public void addPassRequest(int count) {
           super.addPassRequest(count);
           this.clusterNode.addPassRequest(count);
       }
    
       private void visitTree(int level, DefaultNode node) {
           for (int i = 0; i < level; ++i) {
               System.out.print("-");
           }
           if (!(node instanceof EntranceNode)) {
               System.out.println(
                   String.format("%s(thread:%s pq:%s bq:%s tq:%s rt:%s 1mp:%s 1mb:%s 1mt:%s)", node.id.getShowName(),
                       node.curThreadNum(), node.passQps(), node.blockQps(), node.totalQps(), node.avgRt(),
                       node.totalRequest() - node.blockRequest(), node.blockRequest(), node.totalRequest()));
           } else {
               System.out.println(
                   String.format("Entry-%s(t:%s pq:%s bq:%s tq:%s rt:%s 1mp:%s 1mb:%s 1mt:%s)", node.id.getShowName(),
                       node.curThreadNum(), node.passQps(), node.blockQps(), node.totalQps(), node.avgRt(),
                       node.totalRequest() - node.blockRequest(), node.blockRequest(), node.totalRequest()));
           }
           for (Node n : node.getChildList()) {
               DefaultNode dn = (DefaultNode)n;
               visitTree(level + 1, dn);
           }
       }
    
    }
    

    上文链接 ClusterBuilderSlot原理介绍已经提到过,一个ContextName对应的同一个Resource对应ClusterNode为同一个,所以这里同步新增,或减少记录数,都是基于当前节点和对应的ClusterNode一起统计的。
    不管是ClusterNode,或者DefaultNode节点,对其添加,或记录Qps,rt都是基于父类去实现,这样来讲,所有Sentinel最核心的代码就在StatisticNode中。


    StatisticNode中,是这样注释的:

    Sentinel使用滑动窗口来记录和统计实时调用数据。

    • 当第一个请求到来,Sentinel会创建一个特殊的时间片(time-span)去保存运行时的数据,比如:响应时间(rt),QPS, block request,在这里叫做滑动窗口(window bucket),这个滑动窗口通过sample count定义。Sentinel通过滑动窗口有效的数据来决定当前请求是否通过,滑动窗口将记录所有的qps,将其与规则中定义的阈值进行比较。
    • 不同的请求进来,根据不同的时间存放在不同滑动窗口中。
    • 请求不断的进入系统,先前的滑动窗口将会过期无效。

    理解StatisticNode节点之前,先了解几个数据结构:

    • LeapArray Sentinel中的metrics的基本数据结构
      • LeapArray使用滑动窗口算法统计数据,每一个桶覆盖windowLengthInMs的时间长数据,总的时间长度是intervalInMs,所以,sampleCount = intervalInMs / windowLengthInMs。
    public abstract class LeapArray<T> {
        //单位时间窗口长度
        protected int windowLengthInMs;
        //总的桶个数
        protected int sampleCount;
        //总的时间长度
        protected int intervalInMs;
        //记录的窗口数,长度与sampleCount一样
        protected final AtomicReferenceArray<WindowWrap<T>> array;
    }
    

    构造方法如下:

    public LeapArray(int sampleCount, int intervalInMs) {
        //每ms的窗口长度为总的时间长度/桶的总数
        this.windowLengthInMs = intervalInMs / sampleCount;
        this.intervalInMs = intervalInMs;
        this.sampleCount = sampleCount;
        //记录每个windowLengthInMs的滑动窗口信息
        this.array = new AtomicReferenceArray<>(sampleCount);
    }
    

    而在WindowWrap中,则记录了该窗口的开始时间,和时长,和该时间窗口的数据信息。

    public class WindowWrap<T> {
        //窗口长度
        private final long windowLengthInMs;
        //窗口开始时间 long类型,
        private long windowStart;
        //data数据
        private T value;
        //复位该时间窗口
        public WindowWrap<T> resetTo(long startTime) {
            this.windowStart = startTime;
            return this;
        }
        //判断是否该时间在该窗口内
        public boolean isTimeInWindow(long timeMillis) {
            return windowStart <= timeMillis && timeMillis < windowStart + windowLengthInMs;
        }
    }
    

    继续回到 LeapArray,看看如何根据时间找到该窗口:

    • 根据当前时间,算出该时间的timeId,并根据timeId算出当前窗口在采样窗口数组中的索引idx
    • 根据当前时间算出当前窗口的应该对应的开始时间time,以毫秒为单位,时间窗口开始时间为 windowLengthInMs的整数倍(取该时间单位整数开始时间,比如1000501,则从1000500开始)
    • 获取idx位置的窗口
    public WindowWrap<T> currentWindow(long timeMillis) {
        if (timeMillis < 0) {
            return null;
        }
        //计算当前时间的时间窗口的位置
        int idx = calculateTimeIdx(timeMillis);
        //计算当前时间窗口的开始时间
        long windowStart = calculateWindowStart(timeMillis);
        while (true) {
            //取该下表对应的时间窗口
            WindowWrap<T> old = array.get(idx);
            if (old == null) {
                //不存在,则创建一个新的
                WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket());
                if (array.compareAndSet(idx, null, window)) {
                    return window;
                } else {
                    //如果失败,则代表有其他的线程再创建,放弃时间片
                    Thread.yield();
                }
            } else if (windowStart == old.windowStart()) {
                如果是这个窗口的开始时间,则直接返回
                return old;
            } else if (windowStart > old.windowStart()) {
                //如果当前时间的窗口开始时间>老的时间窗口,则重置该时间窗口时间
                // 防止并发,加重入锁
                if (updateLock.tryLock()) {
                    try {
                        return resetWindowTo(old, windowStart);
                    } finally {
                        updateLock.unlock();
                    }
                } else {
                    //失败则代表锁已经被其他线程占用
                    Thread.yield();
                }
            } else if (windowStart < old.windowStart()) {
                return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket());
            }
        }
    }
    

    而在StatisticNode节点中,实质也是使用LeapArray来存储,从LeapArray中获取MetricBucket,对QPS,请求线程数,rt时间等坐记录。
    再来看一下StatisticNode的定义:

    public class StatisticNode implements Node {
        //每秒的滚动计数器 SAMPLE_COUNT为2对应LeapArray中的sample count,IntervalProperty.INTERVAL为1000代表1s,1s分为两个桶,保存数据。
        private transient volatile Metric rollingCounterInSecond = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT,
            IntervalProperty.INTERVAL);
        //每分钟的滚动计数器1分钟分为60个记录,1分钟一个。
        private transient Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000, false);
        //当前线程数
        private AtomicInteger curThreadNum = new AtomicInteger(0);
        //最后一次metrics被获取的时间
        private long lastFetchTime = -1;
    }
    

    所以,在添加rt时间,qps,BlockQps等实质都是使用LeapArray的当前窗口去做添加

    //StatisticNode.java
    @Override
    public void addPassRequest(int count) {
        rollingCounterInSecond.addPass(count);
        rollingCounterInMinute.addPass(count);
    }
    
    @Override
    public void addRtAndSuccess(long rt, int successCount) {
        rollingCounterInSecond.addSuccess(successCount);
        rollingCounterInSecond.addRT(rt);
        rollingCounterInMinute.addSuccess(successCount);
        rollingCounterInMinute.addRT(rt);
    }
    @Override
    public void increaseBlockQps(int count) {
        rollingCounterInSecond.addBlock(count);
        rollingCounterInMinute.addBlock(count);
    }
    @Override
    public void increaseExceptionQps(int count) {
        rollingCounterInSecond.addException(count);
        rollingCounterInMinute.addException(count);
    }   
    
    @Override
    public void addBlock(int count) {
        WindowWrap<MetricBucket> wrap = data.currentWindow();
        wrap.value().addBlock(count);
    }
    
    @Override
    public void addSuccess(int count) {
        //当前窗口
        WindowWrap<MetricBucket> wrap = data.currentWindow();
        wrap.value().addSuccess(count);
    }
    
    @Override
    public void addPass(int count) {
        WindowWrap<MetricBucket> wrap = data.currentWindow();
        wrap.value().addPass(count);
    }
    
    @Override
    public void addRT(long rt) {
        WindowWrap<MetricBucket> wrap = data.currentWindow();
        wrap.value().addRT(rt);
    }
    

    https://www.jianshu.com/p/6ee4b7bdb844 这篇博客对滑动窗口讲的比较细,可以看看。

    相关文章

      网友评论

        本文标题:【sentinel】深入浅出之原理篇StatisticSlot&

        本文链接:https://www.haomeiwen.com/subject/jyfimqtx.html