美文网首页一些收藏
sentinel滑动窗口限流源码分析

sentinel滑动窗口限流源码分析

作者: 小民自愚 | 来源:发表于2022-05-10 02:56 被阅读0次

阅读提示:如果对sentinel比较熟悉,可以跳过概述,以及源码分析的前半部分,直接从StatisticSlot部分开始阅读,滑动窗口的实现逻辑都是从这里开始的。

1.概述

sentinel的限流是通过滑动窗口来实现的
在 Sentinel 里面,所有的资源都对应一个资源名称以及一个 Entry。Entry 可以通过对主流框架的适配自动创建,也可以通过注解的方式或调用 API 显式创建;每一个 Entry 创建的时候,同时也会创建一系列功能插槽(slot chain)。这些插槽有不同的职责,例如:

NodeSelectorSlot 负责收集资源的路径,并将这些资源的调用路径,以树状结构存储起来,用于根据调用路径来限流降级;
ClusterBuilderSlot 则用于存储资源的统计信息以及调用者信息,例如该资源的 RT, QPS, thread count 等等,这些信息将用作为多维度限流,降级的依据;
StatisticSlot 则用于记录、统计不同纬度的 runtime 指标监控信息;
FlowSlot 则用于根据预设的限流规则以及前面 slot 统计的状态,来进行流量控制;
AuthoritySlot 则根据配置的黑白名单和调用来源信息,来做黑白名单控制;
DegradeSlot 则通过统计信息以及预设的规则,来做熔断降级;
SystemSlot 则通过系统的状态,例如 load1 等,来控制总的入口流量;
总体的框架如下:

image.png
上面内容来自Sentinel官网给出的内容
从图里可以看出,sentinel对请求的限流是通过责任链模式实现的,经过一系列的slot流程来判断,本次请求是否通过

2.Sentinel源码分析

一般Sentinel限流都是通过:

Entry entry = SphU.entry('entryName');

这个Entry相当于是获取到了一个令牌,如果能够获取到这个令牌,表示可以通过,能够访问资源。
在Sentinel中有几个比较重要的概念:

Entry 代表的是一个令牌,如果能够通过,则获取到entry不为空
Context 代表的则是一次请求的上下文
Node 代表的则是一次请求、一个资源、一个节点集群的请求调用信息记录
当执行SphU.entry的时候,会访问:

// SphU.java
public static Entry entry(String name) throws BlockException {
        return Env.sph.entry(name, EntryType.OUT, 1, OBJECTS0);
    }
    // CtSph.java
    StringResourceWrapper resource = new StringResourceWrapper(name, type);
        return entry(resource, count, args);
    }
public Entry entry(ResourceWrapper resourceWrapper, int count, Object... args) throws BlockException {
        return entryWithPriority(resourceWrapper, count, false, args);
    }
//入口逻辑:初始化上下文,获取链路,进入链路
private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args)
        throws BlockException {
//初始化上下文
        Context context = ContextUtil.getContext();
        if (context instanceof NullContext) {
            // The {@link NullContext} indicates that the amount of context has exceeded the threshold,
            // so here init the entry only. No rule checking will be done.
            return new CtEntry(resourceWrapper, null, context);
        }

        if (context == null) {
            // Using default context.
            context = InternalContextUtil.internalEnter(Constants.CONTEXT_DEFAULT_NAME);
        }

        // Global switch is close, no rule checking will do.
        if (!Constants.ON) {
            return new CtEntry(resourceWrapper, null, context);
        }
//获取链路
        ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);
        if (chain == null) {
            return new CtEntry(resourceWrapper, null, context);
        }

        Entry e = new CtEntry(resourceWrapper, chain, context);
        try {
//执行责任链
            chain.entry(context, resourceWrapper, null, count, prioritized, args);
        } catch (BlockException e1) {
            e.exit(count, args);
            throw e1;
        } catch (Throwable e1) {
            // This should not happen, unless there are errors existing in Sentinel internal.
            RecordLog.info("Sentinel unexpected exception", e1);
        }
        return e;
    }

这里首先会获取到当前线程上线文的执行环境Context
然后获取到ProcessorSlot执行链,执行ProcessorSlot.entry
如果上一步执行成功,表示能够访问,返回CtEntry,否则抛出异常
另外需要注意的是,对于资源,在Sentinel抽象成了ResourceWrapper,并重写了equals和hashCode方法:

@Override
public int hashCode() {
    return getName().hashCode();
}

@Override
public boolean equals(Object obj) {
    if (obj instanceof ResourceWrapper) {
        ResourceWrapper rw = (ResourceWrapper)obj;
        return rw.getName().equals(getName());
    }
    return false;
}

只要资源的名称一样,这就是同一个资源

跳过Context,直接看链路执行
先获取执行链路ProcessorSlot:

ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) {
    ProcessorSlotChain chain = chainMap.get(resourceWrapper);
    if (chain == null) {
        synchronized (LOCK) {
            chain = chainMap.get(resourceWrapper);
            if (chain == null) {
                if (chainMap.size() >= Constants.MAX_SLOT_CHAIN_SIZE) {
                    return null;
                }
                chain = SlotChainProvider.newSlotChain();
                Map<ResourceWrapper, ProcessorSlotChain> newMap = new HashMap<ResourceWrapper, ProcessorSlotChain>(
                    chainMap.size() + 1);
                newMap.putAll(chainMap);
                newMap.put(resourceWrapper, chain);
                chainMap = newMap;
            }
        }
    }
    return chain;
}
public static ProcessorSlotChain newSlotChain() {
    if (slotChainBuilder != null) {
        return slotChainBuilder.build();
    }

    // Resolve the slot chain builder SPI.
    slotChainBuilder = SpiLoader.of(SlotChainBuilder.class).loadFirstInstanceOrDefault();

    if (slotChainBuilder == null) {
        // Should not go through here.
        RecordLog.warn("[SlotChainProvider] Wrong state when resolving slot chain builder, using default");
        slotChainBuilder = new DefaultSlotChainBuilder();
    } else {
        RecordLog.info("[SlotChainProvider] Global slot chain builder resolved: {}",
            slotChainBuilder.getClass().getCanonicalName());
    }
    return slotChainBuilder.build();
}

这里也是通过Spi机制获取,在META-INF.services下面,有这个几个文件SPI会用到,这里首先会获取一个SlotChainBuilder,默认获取到的就是DefaultSlotChainBuilder,
在DefaultSlotChainBuilder会加载com.alibaba.csp.sentinel.slotchain.ProcessorSlot里面的类,Sentinel中默认提供了如下实现:

 # Sentinel default ProcessorSlots
 com.alibaba.csp.sentinel.slots.nodeselector.NodeSelectorSlot
com.alibaba.csp.sentinel.slots.clusterbuilder.ClusterBuilderSlot
com.alibaba.csp.sentinel.slots.logger.LogSlot
com.alibaba.csp.sentinel.slots.statistic.StatisticSlot
com.alibaba.csp.sentinel.slots.block.authority.AuthoritySlot
com.alibaba.csp.sentinel.slots.system.SystemSlot
com.alibaba.csp.sentinel.slots.block.flow.FlowSlot
com.alibaba.csp.sentinel.slots.block.degrade.DegradeSlot

这里加载完之后,会根据ProcessorSlot的注解的order属性进行从大到小的排序,默认几个实现的排序大小大家可对下:

public static final int ORDER_NODE_SELECTOR_SLOT = -10000;
    public static final int ORDER_CLUSTER_BUILDER_SLOT = -9000;
    public static final int ORDER_LOG_SLOT = -8000;
    public static final int ORDER_STATISTIC_SLOT = -7000;
    public static final int ORDER_AUTHORITY_SLOT = -6000;
    public static final int ORDER_SYSTEM_SLOT = -5000;
    public static final int ORDER_FLOW_SLOT = -2000;
    public static final int ORDER_DEGRADE_SLOT = -1000;

然后这里有一点需要注意,Sentinel中,每个资源会对应一组ProcessorSlot,在这些ProcessorSlot有很多类实例变量,只会记录该资源的信息,,而有些则是全局的,属于整个节点的

NodeSelectorSlot
接下来开始执行chain.entry(context, resourceWrapper, null, count, prioritized, args);,
这里的chain是一个DefaultProcessorSlotChain,这个里面只有了上面加载的ProcessorSlot的链表,最终会从第一个ProcessorSlot往后执行,首选in执行的是NodeSelectorSlot:

public void entry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args)
    throws Throwable {
    DefaultNode node = map.get(context.getName());
    if (node == null) {
        synchronized (this) {
            node = map.get(context.getName());
            if (node == null) {
                node = new DefaultNode(resourceWrapper, null);
                HashMap<String, DefaultNode> cacheMap = new HashMap<String, DefaultNode>(map.size());
                cacheMap.putAll(map);
                cacheMap.put(context.getName(), node);
                map = cacheMap;
                ((DefaultNode) context.getLastNode()).addChild(node);
            }

        }
    }
    context.setCurNode(node);
    fireEntry(context, resourceWrapper, node, count, prioritized, args);
}

同样这里的clusterNode也是该资源全局一个。接着完后执行,传递的仍是NodeSelectorSlot中的DefaultNode

StatisticSlot

StatisticSlot的作用是记录每个资源的请求情况,滑动窗口就在这个slot里实现

public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                  boolean prioritized, Object... args) throws Throwable {
    try {
      //  先走FlowSlot,判断当前窗口是否允许通过
        fireEntry(context, resourceWrapper, node, count, prioritized, args);
        node.increaseThreadNum();
      // 允许通过,窗口通过计数增加
        node.addPassRequest(count);

        if (resourceWrapper.getEntryType() == EntryType.IN) {
            Constants.ENTRY_NODE.increaseThreadNum();
            Constants.ENTRY_NODE.addPassRequest(count);
        }
        for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
            handler.onPass(context, resourceWrapper, node, count, args);
        }
    } catch (PriorityWaitException ex) {
        node.increaseThreadNum();
        if (context.getCurEntry().getOriginNode() != null) {context.getCurEntry().getOriginNode().increaseThreadNum();
        }

        if (resourceWrapper.getEntryType() == EntryType.IN) {
            Constants.ENTRY_NODE.increaseThreadNum();
        }
        for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
            handler.onPass(context, resourceWrapper, node, count, args);
        }
    } catch (BlockException e) {
        context.getCurEntry().setBlockError(e);
        node.increaseBlockQps(count);
        if (context.getCurEntry().getOriginNode() != null) {context.getCurEntry().getOriginNode().increaseBlockQps(count);
        }

        if (resourceWrapper.getEntryType() == EntryType.IN) {
            Constants.ENTRY_NODE.increaseBlockQps(count);
        }
        for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
            handler.onBlocked(e, context, resourceWrapper, node, count, args);
        }
        throw e;
    } catch (Throwable e) {
    context.getCurEntry().setError(e);
        throw e;
    }
}


@Override
public void addPassRequest(int count) {
    rollingCounterInSecond.addPass(count);
    rollingCounterInMinute.addPass(count);
}

最后都是给了rollingCounterInSecond和rollingCounterInMinute去执行,
而这两个实现定义如下:

//SampleCountProperty.SAMPLE_COUNT=2,IntervalProperty.INTERVAL=1000;
private transient volatile Metric rollingCounterInSecond = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT, IntervalProperty.INTERVAL);

实现为一个ArrayMetric,分别按照一秒来统计,我们看看一秒钟是怎么统计的,这个就是根据QPS来限流的关键:

public void addPass(int count) {
    WindowWrap<MetricBucket> wrap = data.currentWindow();
    wrap.value().addPass(count);
}

这里通过data.currentWindow获取到了一个窗口,然后对窗口进行数据的更新,而这里的data实现为OccupiableBucketLeapArray,继承自LeapArray:

 public ArrayMetric(int sampleCount, int intervalInMs) {
    this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
}

而这里传入的sampleCount=2,intervalInMs=1000

 public LeapArray(int sampleCount, int intervalInMs) {   
    this.windowLengthInMs = intervalInMs / sampleCount;
    this.intervalInMs = intervalInMs;
    this.intervalInSecond = intervalInMs / 1000.0;
    this.sampleCount = sampleCount;
    this.array = new AtomicReferenceArray<>(sampleCount);
}

这里初始化的时候,几个参数比较重要,windowLengthInMs是每个时间窗口的大小,这里可以看到,默认一个时间窗口的大小是500ms,因此在Sentinel中默认秒级的窗口每次滑动的范围时500ms``即滑动窗口大小是500ms,array数组的大小为2。
我们回到addPass中:

WindowWrap<MetricBucket> wrap = data.currentWindow();
    wrap.value().addPass(count);
}

我们看看LeapArray中currentWindow怎么实现:

public WindowWrap<T> currentWindow() {
    return currentWindow(TimeUtil.currentTimeMillis());
}
public WindowWrap<T> currentWindow(long timeMillis) {
    if (timeMillis < 0) {
        return null;
    }

    int idx = calculateTimeIdx(timeMillis);
    long windowStart = calculateWindowStart(timeMillis);
    while (true) {
        WindowWrap<T> old = array.get(idx);
        if (old == null) {
            WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
            if (array.compareAndSet(idx, null, window)) {.
                return window;
            } else {
                Thread.yield();
            }
        } else if (windowStart == old.windowStart()) {
            return old;
        } else if (windowStart > old.windowStart()) {
            if (updateLock.tryLock()) {
                try {
                    return resetWindowTo(old, windowStart);
                } finally {
                    updateLock.unlock();
                }
            } else {
                Thread.yield();
            }
        } else if (windowStart < old.windowStart()) {
            return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
        }
    }
}

这里首选会通过calculateTimeIdx计算当前的这个时间属于哪个array哪个元素里面,也即是属于哪个窗口:

private int calculateTimeIdx(/*@Valid*/ long timeMillis) {
    // windowLengthInMs默认为500
    long timeId = timeMillis / windowLengthInMs;
    //array默认为长度2
    return (int)(timeId % array.length());
}

然后通过calculateWindowStart计算当前时间对应的时间窗口的起始范围:

protected long calculateWindowStart(long timeMillis) {
    return timeMillis - timeMillis % windowLengthInMs;
}

说白了就是将某一段500ms的时间按照计算,得到一个相同的起始位置。
接下来,如果当前对应的时间窗口为空,那么直接新建一个窗口并通过cas设置到array中去,如果当前时间的windowStart和窗口的windowStart一样,那么就属于同一个窗口,返回该窗口;如果当前时间windowStart大于窗口的windowStart,那么证明该窗口已经过期了,需要滑动窗口,重置对应的窗口时间。


image.png

然后获取到窗口之后,通过wrap.value().addPass(count);

public void addPass(int count) {
    WindowWrap<MetricBucket> wrap = data.currentWindow();
    wrap.value().addPass(count);
}
public void addPass(int n) {
    add(MetricEvent.PASS, n);
}
public MetricBucket add(MetricEvent event, long n) {
    counters[event.ordinal()].add(n);
    return this;
}
private final LongAdder[] counters;

可以看到,最终是通过LongAdder来计数的。

FlowSlot

FlowSlot则是流控实现的核心。通过StatisticSlot我们记录相关请求的统计信息,接下来在FlowSlot进行流控的判断处理:

public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                  boolean prioritized, Object... args) throws Throwable {
    checkFlow(resourceWrapper, context, node, count, prioritized);

    fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
void checkFlow(ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized)
    throws BlockException {
    checker.checkFlow(ruleProvider, resource, context, node, count, prioritized);
}
private final Function<String, Collection<FlowRule>> ruleProvider = new           Function<String, Collection<FlowRule>>() {
    @Override
    public Collection<FlowRule> apply(String resource) {
        // Flow rule map should not be null.
        Map<String, List<FlowRule>> flowRules = FlowRuleManager.getFlowRuleMap();
        return flowRules.get(resource);
    }
};
// FlowRuleChecker.java
public void checkFlow(Function<String, Collection<FlowRule>> ruleProvider, ResourceWrapper resource,
                      Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {
    if (ruleProvider == null || resource == null) {
        return;
    }
    Collection<FlowRule> rules = ruleProvider.apply(resource.getName());
    if (rules != null) {
        for (FlowRule rule : rules) {
          //是否通过
            if (!canPassCheck(rule, context, node, count, prioritized)) {
                throw new FlowException(rule.getLimitApp(), rule);
            }
        }
    }
}

这里每个FlowSlot都会有一个FlowRuleChecker来进行流控的检查。FlowRuleChecker会通过FlowRuleManager获取当前资源的所有流控规则FlowRule,然后

public boolean canPassCheck(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node, int acquireCount,
                                                boolean prioritized) {
    String limitApp = rule.getLimitApp();
    if (limitApp == null) {
        return true;
    }

    if (rule.isClusterMode()) {
        return passClusterCheck(rule, context, node, acquireCount, prioritized);
    }

    return passLocalCheck(rule, context, node, acquireCount, prioritized);
}
 private static boolean passLocalCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount,
                                      boolean prioritized) {
    Node selectedNode = selectNodeByRequesterAndStrategy(rule, context, node);
    if (selectedNode == null) {
        return true;
    }
  //是否可以通过
    return rule.getRater().canPass(selectedNode, acquireCount, prioritized);
}

如果是非Cluster模式(后面我们在说这个),则会通过rule.getRater()去判断是否需要流控,这里的getRater返回的是一个TrafficShapingController,有如下几个实现:

我们看看默认的DefaultController在基于QPS限流下怎么实现流控的:

 public boolean canPass(Node node, int acquireCount, boolean prioritized) {
    //统计1秒钟内所有窗口通过的请求数
    int curCount = avgUsedTokens(node);
    //加上本次请求数,是否可以通过
    if (curCount + acquireCount > count) {
        if (prioritized && grade == RuleConstant.FLOW_GRADE_QPS) {
            long currentTime;
            long waitInMs;
            currentTime = TimeUtil.currentTimeMillis();
            waitInMs = node.tryOccupyNext(currentTime, acquireCount, count);
            if (waitInMs < OccupyTimeoutProperty.getOccupyTimeout()) {
                node.addWaitingRequest(currentTime + waitInMs, acquireCount);
                node.addOccupiedPass(acquireCount);
                sleep(waitInMs);
                throw new PriorityWaitException(waitInMs);
            }
        }
        return false;
    }
    return true;
}

这里首先会判断当前资源的请求总次数:

private int avgUsedTokens(Node node) {
    if (node == null) {
        return DEFAULT_AVG_USED_TOKENS;
    }
    return grade == RuleConstant.FLOW_GRADE_THREAD ? node.curThreadNum() : (int)(node.passQps());
}

而这里的node.passQps就是获取我们上面分析的窗口里面1秒内的两个窗口的总和然后除以1就得到了每秒的QPS,然后看看这个QPS是否大于我们预设的值,如果大于的话,而这里的prioritized默认为false,即如果当前QPS大于指定的QPS的话,那么返回false,而在FlowSlot中则会跑出一个FlowException,继承自BlockException异常,这样限流的功能就实现了

于2022年5月10日凌晨2点56分

相关文章

网友评论

    本文标题:sentinel滑动窗口限流源码分析

    本文链接:https://www.haomeiwen.com/subject/cogkurtx.html