阅读提示:如果对sentinel比较熟悉,可以跳过概述,以及源码分析的前半部分,直接从StatisticSlot部分开始阅读,滑动窗口的实现逻辑都是从这里开始的。
1.概述
sentinel的限流是通过滑动窗口来实现的
在 Sentinel 里面,所有的资源都对应一个资源名称以及一个 Entry。Entry 可以通过对主流框架的适配自动创建,也可以通过注解的方式或调用 API 显式创建;每一个 Entry 创建的时候,同时也会创建一系列功能插槽(slot chain)。这些插槽有不同的职责,例如:
NodeSelectorSlot 负责收集资源的路径,并将这些资源的调用路径,以树状结构存储起来,用于根据调用路径来限流降级;
ClusterBuilderSlot 则用于存储资源的统计信息以及调用者信息,例如该资源的 RT, QPS, thread count 等等,这些信息将用作为多维度限流,降级的依据;
StatisticSlot 则用于记录、统计不同纬度的 runtime 指标监控信息;
FlowSlot 则用于根据预设的限流规则以及前面 slot 统计的状态,来进行流量控制;
AuthoritySlot 则根据配置的黑白名单和调用来源信息,来做黑白名单控制;
DegradeSlot 则通过统计信息以及预设的规则,来做熔断降级;
SystemSlot 则通过系统的状态,例如 load1 等,来控制总的入口流量;
总体的框架如下:
上面内容来自Sentinel官网给出的内容
从图里可以看出,sentinel对请求的限流是通过责任链模式实现的,经过一系列的slot流程来判断,本次请求是否通过
2.Sentinel源码分析
一般Sentinel限流都是通过:
Entry entry = SphU.entry('entryName');
这个Entry相当于是获取到了一个令牌,如果能够获取到这个令牌,表示可以通过,能够访问资源。
在Sentinel中有几个比较重要的概念:
Entry 代表的是一个令牌,如果能够通过,则获取到entry不为空
Context 代表的则是一次请求的上下文
Node 代表的则是一次请求、一个资源、一个节点集群的请求调用信息记录
当执行SphU.entry的时候,会访问:
// SphU.java
public static Entry entry(String name) throws BlockException {
return Env.sph.entry(name, EntryType.OUT, 1, OBJECTS0);
}
// CtSph.java
StringResourceWrapper resource = new StringResourceWrapper(name, type);
return entry(resource, count, args);
}
public Entry entry(ResourceWrapper resourceWrapper, int count, Object... args) throws BlockException {
return entryWithPriority(resourceWrapper, count, false, args);
}
//入口逻辑:初始化上下文,获取链路,进入链路
private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args)
throws BlockException {
//初始化上下文
Context context = ContextUtil.getContext();
if (context instanceof NullContext) {
// The {@link NullContext} indicates that the amount of context has exceeded the threshold,
// so here init the entry only. No rule checking will be done.
return new CtEntry(resourceWrapper, null, context);
}
if (context == null) {
// Using default context.
context = InternalContextUtil.internalEnter(Constants.CONTEXT_DEFAULT_NAME);
}
// Global switch is close, no rule checking will do.
if (!Constants.ON) {
return new CtEntry(resourceWrapper, null, context);
}
//获取链路
ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);
if (chain == null) {
return new CtEntry(resourceWrapper, null, context);
}
Entry e = new CtEntry(resourceWrapper, chain, context);
try {
//执行责任链
chain.entry(context, resourceWrapper, null, count, prioritized, args);
} catch (BlockException e1) {
e.exit(count, args);
throw e1;
} catch (Throwable e1) {
// This should not happen, unless there are errors existing in Sentinel internal.
RecordLog.info("Sentinel unexpected exception", e1);
}
return e;
}
这里首先会获取到当前线程上线文的执行环境Context
然后获取到ProcessorSlot执行链,执行ProcessorSlot.entry
如果上一步执行成功,表示能够访问,返回CtEntry,否则抛出异常
另外需要注意的是,对于资源,在Sentinel抽象成了ResourceWrapper,并重写了equals和hashCode方法:
@Override
public int hashCode() {
return getName().hashCode();
}
@Override
public boolean equals(Object obj) {
if (obj instanceof ResourceWrapper) {
ResourceWrapper rw = (ResourceWrapper)obj;
return rw.getName().equals(getName());
}
return false;
}
只要资源的名称一样,这就是同一个资源
跳过Context,直接看链路执行
先获取执行链路ProcessorSlot:
ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) {
ProcessorSlotChain chain = chainMap.get(resourceWrapper);
if (chain == null) {
synchronized (LOCK) {
chain = chainMap.get(resourceWrapper);
if (chain == null) {
if (chainMap.size() >= Constants.MAX_SLOT_CHAIN_SIZE) {
return null;
}
chain = SlotChainProvider.newSlotChain();
Map<ResourceWrapper, ProcessorSlotChain> newMap = new HashMap<ResourceWrapper, ProcessorSlotChain>(
chainMap.size() + 1);
newMap.putAll(chainMap);
newMap.put(resourceWrapper, chain);
chainMap = newMap;
}
}
}
return chain;
}
public static ProcessorSlotChain newSlotChain() {
if (slotChainBuilder != null) {
return slotChainBuilder.build();
}
// Resolve the slot chain builder SPI.
slotChainBuilder = SpiLoader.of(SlotChainBuilder.class).loadFirstInstanceOrDefault();
if (slotChainBuilder == null) {
// Should not go through here.
RecordLog.warn("[SlotChainProvider] Wrong state when resolving slot chain builder, using default");
slotChainBuilder = new DefaultSlotChainBuilder();
} else {
RecordLog.info("[SlotChainProvider] Global slot chain builder resolved: {}",
slotChainBuilder.getClass().getCanonicalName());
}
return slotChainBuilder.build();
}
这里也是通过Spi机制获取,在META-INF.services下面,有这个几个文件SPI会用到,这里首先会获取一个SlotChainBuilder,默认获取到的就是DefaultSlotChainBuilder,
在DefaultSlotChainBuilder会加载com.alibaba.csp.sentinel.slotchain.ProcessorSlot里面的类,Sentinel中默认提供了如下实现:
# Sentinel default ProcessorSlots
com.alibaba.csp.sentinel.slots.nodeselector.NodeSelectorSlot
com.alibaba.csp.sentinel.slots.clusterbuilder.ClusterBuilderSlot
com.alibaba.csp.sentinel.slots.logger.LogSlot
com.alibaba.csp.sentinel.slots.statistic.StatisticSlot
com.alibaba.csp.sentinel.slots.block.authority.AuthoritySlot
com.alibaba.csp.sentinel.slots.system.SystemSlot
com.alibaba.csp.sentinel.slots.block.flow.FlowSlot
com.alibaba.csp.sentinel.slots.block.degrade.DegradeSlot
这里加载完之后,会根据ProcessorSlot的注解的order属性进行从大到小的排序,默认几个实现的排序大小大家可对下:
public static final int ORDER_NODE_SELECTOR_SLOT = -10000;
public static final int ORDER_CLUSTER_BUILDER_SLOT = -9000;
public static final int ORDER_LOG_SLOT = -8000;
public static final int ORDER_STATISTIC_SLOT = -7000;
public static final int ORDER_AUTHORITY_SLOT = -6000;
public static final int ORDER_SYSTEM_SLOT = -5000;
public static final int ORDER_FLOW_SLOT = -2000;
public static final int ORDER_DEGRADE_SLOT = -1000;
然后这里有一点需要注意,Sentinel中,每个资源会对应一组ProcessorSlot,在这些ProcessorSlot有很多类实例变量,只会记录该资源的信息,,而有些则是全局的,属于整个节点的
NodeSelectorSlot
接下来开始执行chain.entry(context, resourceWrapper, null, count, prioritized, args);,
这里的chain是一个DefaultProcessorSlotChain,这个里面只有了上面加载的ProcessorSlot的链表,最终会从第一个ProcessorSlot往后执行,首选in执行的是NodeSelectorSlot:
public void entry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args)
throws Throwable {
DefaultNode node = map.get(context.getName());
if (node == null) {
synchronized (this) {
node = map.get(context.getName());
if (node == null) {
node = new DefaultNode(resourceWrapper, null);
HashMap<String, DefaultNode> cacheMap = new HashMap<String, DefaultNode>(map.size());
cacheMap.putAll(map);
cacheMap.put(context.getName(), node);
map = cacheMap;
((DefaultNode) context.getLastNode()).addChild(node);
}
}
}
context.setCurNode(node);
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
同样这里的clusterNode也是该资源全局一个。接着完后执行,传递的仍是NodeSelectorSlot中的DefaultNode
StatisticSlot
StatisticSlot的作用是记录每个资源的请求情况,滑动窗口就在这个slot里实现
。
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
try {
// 先走FlowSlot,判断当前窗口是否允许通过
fireEntry(context, resourceWrapper, node, count, prioritized, args);
node.increaseThreadNum();
// 允许通过,窗口通过计数增加
node.addPassRequest(count);
if (resourceWrapper.getEntryType() == EntryType.IN) {
Constants.ENTRY_NODE.increaseThreadNum();
Constants.ENTRY_NODE.addPassRequest(count);
}
for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
handler.onPass(context, resourceWrapper, node, count, args);
}
} catch (PriorityWaitException ex) {
node.increaseThreadNum();
if (context.getCurEntry().getOriginNode() != null) {context.getCurEntry().getOriginNode().increaseThreadNum();
}
if (resourceWrapper.getEntryType() == EntryType.IN) {
Constants.ENTRY_NODE.increaseThreadNum();
}
for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
handler.onPass(context, resourceWrapper, node, count, args);
}
} catch (BlockException e) {
context.getCurEntry().setBlockError(e);
node.increaseBlockQps(count);
if (context.getCurEntry().getOriginNode() != null) {context.getCurEntry().getOriginNode().increaseBlockQps(count);
}
if (resourceWrapper.getEntryType() == EntryType.IN) {
Constants.ENTRY_NODE.increaseBlockQps(count);
}
for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
handler.onBlocked(e, context, resourceWrapper, node, count, args);
}
throw e;
} catch (Throwable e) {
context.getCurEntry().setError(e);
throw e;
}
}
@Override
public void addPassRequest(int count) {
rollingCounterInSecond.addPass(count);
rollingCounterInMinute.addPass(count);
}
最后都是给了rollingCounterInSecond和rollingCounterInMinute去执行,
而这两个实现定义如下:
//SampleCountProperty.SAMPLE_COUNT=2,IntervalProperty.INTERVAL=1000;
private transient volatile Metric rollingCounterInSecond = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT, IntervalProperty.INTERVAL);
实现为一个ArrayMetric,分别按照一秒来统计,我们看看一秒钟是怎么统计的,这个就是根据QPS来限流的关键:
public void addPass(int count) {
WindowWrap<MetricBucket> wrap = data.currentWindow();
wrap.value().addPass(count);
}
这里通过data.currentWindow获取到了一个窗口,然后对窗口进行数据的更新,而这里的data实现为OccupiableBucketLeapArray,继承自LeapArray:
public ArrayMetric(int sampleCount, int intervalInMs) {
this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
}
而这里传入的sampleCount=2,intervalInMs=1000
public LeapArray(int sampleCount, int intervalInMs) {
this.windowLengthInMs = intervalInMs / sampleCount;
this.intervalInMs = intervalInMs;
this.intervalInSecond = intervalInMs / 1000.0;
this.sampleCount = sampleCount;
this.array = new AtomicReferenceArray<>(sampleCount);
}
这里初始化的时候,几个参数比较重要,windowLengthInMs是每个时间窗口的大小,这里可以看到,默认一个时间窗口的大小是500ms,因此在Sentinel中默认秒级的窗口每次滑动的范围时500ms``即滑动窗口大小是500ms,array数组的大小为2。
我们回到addPass中:
WindowWrap<MetricBucket> wrap = data.currentWindow();
wrap.value().addPass(count);
}
我们看看LeapArray中currentWindow怎么实现:
public WindowWrap<T> currentWindow() {
return currentWindow(TimeUtil.currentTimeMillis());
}
public WindowWrap<T> currentWindow(long timeMillis) {
if (timeMillis < 0) {
return null;
}
int idx = calculateTimeIdx(timeMillis);
long windowStart = calculateWindowStart(timeMillis);
while (true) {
WindowWrap<T> old = array.get(idx);
if (old == null) {
WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
if (array.compareAndSet(idx, null, window)) {.
return window;
} else {
Thread.yield();
}
} else if (windowStart == old.windowStart()) {
return old;
} else if (windowStart > old.windowStart()) {
if (updateLock.tryLock()) {
try {
return resetWindowTo(old, windowStart);
} finally {
updateLock.unlock();
}
} else {
Thread.yield();
}
} else if (windowStart < old.windowStart()) {
return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
}
}
}
这里首选会通过calculateTimeIdx计算当前的这个时间属于哪个array哪个元素里面,也即是属于哪个窗口:
private int calculateTimeIdx(/*@Valid*/ long timeMillis) {
// windowLengthInMs默认为500
long timeId = timeMillis / windowLengthInMs;
//array默认为长度2
return (int)(timeId % array.length());
}
然后通过calculateWindowStart计算当前时间对应的时间窗口的起始范围:
protected long calculateWindowStart(long timeMillis) {
return timeMillis - timeMillis % windowLengthInMs;
}
说白了就是将某一段500ms的时间按照计算,得到一个相同的起始位置。
接下来,如果当前对应的时间窗口为空,那么直接新建一个窗口并通过cas设置到array中去,如果当前时间的windowStart和窗口的windowStart一样,那么就属于同一个窗口,返回该窗口;如果当前时间windowStart大于窗口的windowStart,那么证明该窗口已经过期了,需要滑动窗口,重置对应的窗口时间。
image.png
然后获取到窗口之后,通过wrap.value().addPass(count);
public void addPass(int count) {
WindowWrap<MetricBucket> wrap = data.currentWindow();
wrap.value().addPass(count);
}
public void addPass(int n) {
add(MetricEvent.PASS, n);
}
public MetricBucket add(MetricEvent event, long n) {
counters[event.ordinal()].add(n);
return this;
}
private final LongAdder[] counters;
可以看到,最终是通过LongAdder来计数的。
FlowSlot
FlowSlot则是流控实现的核心。通过StatisticSlot我们记录相关请求的统计信息,接下来在FlowSlot进行流控的判断处理:
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
checkFlow(resourceWrapper, context, node, count, prioritized);
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
void checkFlow(ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized)
throws BlockException {
checker.checkFlow(ruleProvider, resource, context, node, count, prioritized);
}
private final Function<String, Collection<FlowRule>> ruleProvider = new Function<String, Collection<FlowRule>>() {
@Override
public Collection<FlowRule> apply(String resource) {
// Flow rule map should not be null.
Map<String, List<FlowRule>> flowRules = FlowRuleManager.getFlowRuleMap();
return flowRules.get(resource);
}
};
// FlowRuleChecker.java
public void checkFlow(Function<String, Collection<FlowRule>> ruleProvider, ResourceWrapper resource,
Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {
if (ruleProvider == null || resource == null) {
return;
}
Collection<FlowRule> rules = ruleProvider.apply(resource.getName());
if (rules != null) {
for (FlowRule rule : rules) {
//是否通过
if (!canPassCheck(rule, context, node, count, prioritized)) {
throw new FlowException(rule.getLimitApp(), rule);
}
}
}
}
这里每个FlowSlot都会有一个FlowRuleChecker来进行流控的检查。FlowRuleChecker会通过FlowRuleManager获取当前资源的所有流控规则FlowRule,然后
public boolean canPassCheck(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node, int acquireCount,
boolean prioritized) {
String limitApp = rule.getLimitApp();
if (limitApp == null) {
return true;
}
if (rule.isClusterMode()) {
return passClusterCheck(rule, context, node, acquireCount, prioritized);
}
return passLocalCheck(rule, context, node, acquireCount, prioritized);
}
private static boolean passLocalCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount,
boolean prioritized) {
Node selectedNode = selectNodeByRequesterAndStrategy(rule, context, node);
if (selectedNode == null) {
return true;
}
//是否可以通过
return rule.getRater().canPass(selectedNode, acquireCount, prioritized);
}
如果是非Cluster模式(后面我们在说这个),则会通过rule.getRater()去判断是否需要流控,这里的getRater返回的是一个TrafficShapingController,有如下几个实现:
我们看看默认的DefaultController在基于QPS限流下怎么实现流控的:
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
//统计1秒钟内所有窗口通过的请求数
int curCount = avgUsedTokens(node);
//加上本次请求数,是否可以通过
if (curCount + acquireCount > count) {
if (prioritized && grade == RuleConstant.FLOW_GRADE_QPS) {
long currentTime;
long waitInMs;
currentTime = TimeUtil.currentTimeMillis();
waitInMs = node.tryOccupyNext(currentTime, acquireCount, count);
if (waitInMs < OccupyTimeoutProperty.getOccupyTimeout()) {
node.addWaitingRequest(currentTime + waitInMs, acquireCount);
node.addOccupiedPass(acquireCount);
sleep(waitInMs);
throw new PriorityWaitException(waitInMs);
}
}
return false;
}
return true;
}
这里首先会判断当前资源的请求总次数:
private int avgUsedTokens(Node node) {
if (node == null) {
return DEFAULT_AVG_USED_TOKENS;
}
return grade == RuleConstant.FLOW_GRADE_THREAD ? node.curThreadNum() : (int)(node.passQps());
}
而这里的node.passQps就是获取我们上面分析的窗口里面1秒内的两个窗口的总和然后除以1就得到了每秒的QPS,然后看看这个QPS是否大于我们预设的值,如果大于的话,而这里的prioritized默认为false,即如果当前QPS大于指定的QPS的话,那么返回false,而在FlowSlot中则会跑出一个FlowException,继承自BlockException异常,这样限流的功能就实现了
于2022年5月10日凌晨2点56分
网友评论