完整源码流程图
Sentinel限流、熔断降级源码剖析.png架构图
image.png几个重要概念
Resource
Sentinel 通过资源来保护具体的业务代码或其他后方服务。 Sentinel 把复杂的逻辑给屏蔽掉了,用户只需要为受保护的代码或服务定义一个资源,然后定义规则就可以了,剩下的通通交给 Sentinel 来处理了。并且资源和规则是解耦的,规则可以在运行时动态修改。定义完资源后,就可以通过在程序中埋点来保护你自己的服务了,埋点的方式有两种:
- try-catch 方式(通过 SphU.entry(...)),当 catch 到BlockException时执行异常处理(或fallback)
- if-else 方式(通过 SphO.entry(...)),当返回 false 时执行异常处理(或fallback)
以上这两种方式都是通过硬编码的形式定义资源然后进行资源埋点的,对业务代码的侵入太大.
Sentinel 也可以通过注解来定义资源,具体的注解为:SentinelResource 。通过注解除了可以定义资源外,还可以指定 blockHandler 和 fallback 方法。
Sentinel 中具体表示资源的类是:ResourceWrapper ,他是一个抽象的包装类,包装了资源的 Name 和EntryType。他有两个实现类,分别是:StringResourceWrapper 和 MethodResourceWrapper
顾名思义,StringResourceWrapper 是通过对一串字符串进行包装,是一个通用的资源包装类,MethodResourceWrapper 是对方法调用的包装。
image.pngSlot
Sentinel 的工作流程就是围绕着一个个插槽所组成的插槽链来展开的。需要注意的是每个插槽都有自己的职责,他们各司其职完好的配合,通过一定的编排顺序,来达到最终的限流降级的目的。默认的各个插槽之间的顺序是固定的,因为有的插槽需要依赖其他的插槽计算出来的结果才能进行工作。
但是这并不意味着我们只能按照框架的定义来,Sentinel 通过 SlotChainBuilder 作为 SPI 接口,使得 Slot Chain 具备了扩展的能力。我们可以通过实现 SlotsChainBuilder 接口加入自定义的 slot 并自定义编排各个 slot 之间的顺序,从而可以给 Sentinel 添加自定义的功能。
创建过程
com.alibaba.csp.sentinel.CtSph#lookProcessChain
可以看到,会根据当前请求的资源先去一个静态的HashMap中获取,如果获取不到才会创建,创建后会保存到HashMap中。这就意味着,同一个资源会全局共享一个SlotChain
ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) {
ProcessorSlotChain chain = chainMap.get(resourceWrapper);
if (chain == null) {
synchronized (LOCK) {
chain = chainMap.get(resourceWrapper);
if (chain == null) {
// Entry size limit.
if (chainMap.size() >= Constants.MAX_SLOT_CHAIN_SIZE) {
return null;
}
chain = SlotChainProvider.newSlotChain();
Map<ResourceWrapper, ProcessorSlotChain> newMap = new HashMap<ResourceWrapper, ProcessorSlotChain>(
chainMap.size() + 1);
newMap.putAll(chainMap);
newMap.put(resourceWrapper, chain);
chainMap = newMap;
}
}
}
return chain;
}
image.png
Context
我们可以看到该类的注释
This class holds metadata of current invocation
就是说在context中维护着当前调用链的元数据,那元数据有哪些呢,从context类的源码中可以看到有:
entranceNode:当前调用链的入口节点
curEntry:当前调用链的当前entry
node:与当前entry所对应的curNode
origin:当前调用链的调用源
它的作用
不同上下文相同名称的资源会被分开统计
我们来看Context是怎么被使用的?
com.alibaba.csp.sentinel.slots.nodeselector.NodeSelectorSlot#entry
//1.根据context的名称 获取一个DefaultNode节点
DefaultNode node = map.get(context.getName());
if (node == null) {
synchronized (this) {
node = map.get(context.getName());
if (node == null) {
//2.双重检索 将资源resourceWrapper 包装成一个DefaultNode 节点
node = new DefaultNode(resourceWrapper, null);
HashMap<String, DefaultNode> cacheMap = new HashMap<String, DefaultNode>(map.size());
cacheMap.putAll(map);
cacheMap.put(context.getName(), node);
map = cacheMap;
// Build invocation tree
((DefaultNode) context.getLastNode()).addChild(node);
}
}
}
//3.设置当前节点
context.setCurNode(node);
//4.传递到下一个链条
fireEntry(context, resourceWrapper, node, count, prioritized, args);
我们可以看到一点是统计节点NodeSelectorSlot是以Context的维度去统计的,而不是直接以resourceWrapper的维度去统计的?
那么为什么要这么去设计?
试想一下,如果用resourceName来做map的key,那对于同一个资源resourceA来说,在context1中获取到的defaultNodeA和在context2中获取到的defaultNodeA是同一个,那么怎么在这两个context中对defaultNodeA进行更改呢,修改了一个必定会对另一个产生影响。
所以在NodeSelectorSlot这个类里面,map里面保存的是contextName和DefaultNode的映射关系,目的是为了可以在不同的context对相同的资源进行分开统计。
同一个context中对同一个resource进行多次entry()调用时,会形式一颗调用树,这个树是通过CtEntry之间的parent/child关系维护的。
Entry
Entry 是 Sentinel 中用来表示是否通过限流的一个凭证,就像一个token一样。每次执行 SphU.entry() 或 SphO.entry() 都会返回一个 Entry 给调用者,意思就是告诉调用者,如果正确返回了 Entry 给你,那表示你可以正常访问被 Sentinel 保护的后方服务了,否则 Sentinel 会抛出一个BlockException(如果是 SphO.entry() 会返回false),这就表示调用者想要访问的服务被保护了,也就是说调用者本身被限流了。
entry中保存了本次执行 entry() 方法的一些基本信息,包括:
- createTime:当前Entry的创建时间,主要用来后期计算rt
- node:当前Entry所关联的node,该node主要是记录了当前context下该资源的统计信息
- origin:当前Entry的调用来源,通常是调用方的应用名称,在 ClusterBuilderSlot.entry() 方法中设置的
- resourceWrapper:当前Entry所关联的资源
Node
Node 中保存了资源的实时统计数据,例如:passQps,blockQps,rt等实时数据。正是有了这些统计数据后, Sentinel 才能进行限流、降级等一系列的操作。
node是一个接口,他有一个实现类:StatisticNode,但是StatisticNode本身也有两个子类,一个是DefaultNode,另一个是ClusterNode,DefaultNode又有一个子类叫EntranceNode。
其中entranceNode是每个上下文的入口,该节点是直接挂在root下的,是全局唯一的,每一个context都会对应一个entranceNode。另外defaultNode是记录当前调用的实时数据的,每个defaultNode都关联着一个资源和clusterNode,有着相同资源的defaultNode,他们关联着同一个clusterNode。
image.png image.png
几种Node的作用先大概介绍下:
节点 | 作用 |
---|---|
StatisticNode | 执行具体的资源统计操作 |
DefaultNode | 该节点持有指定上下文中指定资源的统计信息,当在同一个上下文中多次调用entry方法时,该节点可能下会创建有一系列的子节点。另外每个DefaultNode中会关联一个ClusterNode |
ClusterNode | 该节点中保存了资源的总体的运行时统计信息,包括rt,线程数,qps等等,相同的资源会全局共享同一个ClusterNode,不管他属于哪个上下文 |
EntranceNode | 该节点表示一棵调用链树的入口节点,通过他可以获取调用链树中所有的子节点 |
当在一个上下文中多次调用了 SphU#entry() 方法时,就会创建一棵调用链树。具体的代码在entry方法中创建CtEntry对象时
CtEntry(ResourceWrapper resourceWrapper, ProcessorSlot<Object> chain, Context context) {
super(resourceWrapper);
this.chain = chain;
this.context = context;
// 获取「上下文」中上一次的入口
parent = context.getCurEntry();
if (parent != null) {
// 然后将当前入口设置为上一次入口的子节点
((CtEntry)parent).child = this;
}
// 设置「上下文」的当前入口为该类本身
context.setCurEntry(this);
}
构造树干
context初始化的时候,context中的curEntry属性是没有值的,如下图所示:
image.png
创建Entry
每创建一个新的Entry对象时,都会重新设置context的curEntry,并将context原来的curEntry设置为该新Entry对象的父节点,如下图所示:
image.png
退出Entry
某个Entry退出时,将会重新设置context的curEntry,当该Entry是最顶层的一个入口时,将会把ThreadLocal中保存的context也清除掉,如下图所示:
image.png
构造叶子节点
上面的过程是构造了一棵调用链的树,但是这棵树只有树干,没有叶子,那叶子节点是在什么时候创建的呢?DefaultNode就是叶子节点,在叶子节点中保存着目标资源在当前状态下的统计信息。通过分析,我们知道了叶子节点是在NodeSelectorSlot的entry方法中创建的。具体的代码如下:
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, Object... args) throws Throwable {
// 根据「上下文」的名称获取DefaultNode
// 多线程环境下,每个线程都会创建一个context,
// 只要资源名相同,则context的名称也相同,那么获取到的节点就相同
DefaultNode node = map.get(context.getName());
if (node == null) {
synchronized (this) {
node = map.get(context.getName());
if (node == null) {
// 如果当前「上下文」中没有该节点,则创建一个DefaultNode节点
node = Env.nodeBuilder.buildTreeNode(resourceWrapper, null);
// 省略部分代码
}
// 将当前node作为「上下文」的最后一个节点的子节点添加进去
// 如果context的curEntry.parent.curNode为null,则添加到entranceNode中去
// 否则添加到context的curEntry.parent.curNode中去
((DefaultNode)context.getLastNode()).addChild(node);
}
}
// 将该节点设置为「上下文」中的当前节点
// 实际是将当前节点赋值给context中curEntry的curNode
// 在Context的getLastNode中会用到在此处设置的curNode
context.setCurNode(node);
fireEntry(context, resourceWrapper, node, count, args);
}
上面的代码可以分解成下面这些步骤:
- 获取当前上下文对应的DefaultNode,如果没有的话会为当前的调用新生成一个DefaultNode节点,它的作用是对资源进行各种统计度量以便进行流控;
- 将新创建的DefaultNode节点,添加到context中,作为「entranceNode」或者「curEntry.parent.curNode」的子节点;
- 将DefaultNode节点,添加到context中,作为「curEntry」的curNode。
我们看第3步,把当前DefaultNode设置为context的curNode,实际上是把当前节点赋值给context中curEntry的curNode,用图形表示就是这样:
image.png
多次创建不同的Entry,并且执行NodeSelectorSlot的entry方法后,就会变成这样一棵调用链树:
image.png
这里图中的node0,node1,node2可能是相同的node,因为在同一个context中从map中获取的node是同一个,这里只是为了表述的更清楚所以用了不同的节点名。
保存子节点
上面已经分析了叶子节点的构造过程,叶子节点是保存在各个Entry的curNode属性中的。
我们知道context中只保存了入口节点和当前Entry,那子节点是什么时候保存的呢,其实子节点就是上面代码中的第2步中保存的。
下面我们来分析上面的第2步的情况:
第一次调用NodeSelectorSlot的entry方法时,map中肯定是没有DefaultNode的,那就会进入第2步中,创建一个node,创建完成后会把该节点加入到context的lastNode的子节点中去。我们先看一下context的getLastNode方法:
public Node getLastNode() {
// 如果curEntry不存在时,返回entranceNode
// 否则返回curEntry的lastNode,
// 需要注意的是curEntry的lastNode是获取的parent的curNode,
// 如果每次进入的资源不同,就会每次都创建一个CtEntry,则parent为null,
// 所以curEntry.getLastNode()也为null
if (curEntry != null && curEntry.getLastNode() != null) {
return curEntry.getLastNode();
} else {
return entranceNode;
}
}
代码中我们可以知道,lastNode的值可能是context中的entranceNode也可能是curEntry.parent.curNode,但是他们都是「DefaultNode」类型的节点,DefaultNode的所有子节点是保存在一个HashSet中的。
第一次调用getLastNode方法时,context中curEntry是null,因为curEntry是在第3步中才赋值的。所以,lastNode最初的值就是context的entranceNode。那么将node添加到entranceNode的子节点中去之后就变成了下面这样:
image.png
紧接着再进入一次,资源名不同,会再次生成一个新的Entry,上面的图形就变成下图这样:
image.png
此时再次调用context的getLastNode方法,因为此时curEntry的parent不再是null了,所以获取到的lastNode是curEntry.parent.curNode,在上图中可以很方便的看出,这个节点就是node0。那么把当前节点node1添加到lastNode的子节点中去,上面的图形就变成下图这样:
image.png
然后将当前node设置给context的curNode,上面的图形就变成下图这样:
image.png
假如再创建一个Entry,然后再进入一次不同的资源名,上面的图就变成下面这样:
image.png
至此NodeSelectorSlot的基本功能已经大致分析清楚了。
PS:以上的分析是基于每次执行SphU.entry(name)时,资源名都是不一样的前提下。如果资源名都一样的话,那么生成的node都相同,则只会再第一次把node加入到entranceNode的子节点中去,其他的时候,只会创建一个新的Entry,然后替换context中的curEntry的值。
NodeSelectorSlot 在执行的过程中完成了 curEntry 中 curNode 的初始化,curEntry 是在创建的时候被绑定到 context 上去的,并且在绑定的时候会添加到上一次的 entry 中去,从而形成一个链式结构。
ClusterBuilderSlot
NodeSelectorSlot的entry方法执行完之后,会调用fireEntry方法,此时会触发ClusterBuilderSlot的entry方法。
ClusterBuilderSlot的entry方法比较简单,具体代码如下:
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, Object... args) throws Throwable {
if (clusterNode == null) {
synchronized (lock) {
if (clusterNode == null) {
// Create the cluster node.
clusterNode = Env.nodeBuilder.buildClusterNode();
// 将clusterNode保存到全局的map中去
HashMap<ResourceWrapper, ClusterNode> newMap = new HashMap<ResourceWrapper, ClusterNode>(16);
newMap.putAll(clusterNodeMap);
newMap.put(node.getId(), clusterNode);
clusterNodeMap = newMap;
}
}
}
// 将clusterNode塞到DefaultNode中去
node.setClusterNode(clusterNode);
// 省略部分代码
fireEntry(context, resourceWrapper, node, count, args);
}
NodeSelectorSlot的职责比较简单,主要做了两件事:
一、为每个资源创建一个clusterNode,然后把clusterNode塞到DefaultNode中去
二、将clusterNode保持到全局的map中去,用资源作为map的key
PS:一个资源只有一个ClusterNode,但是可以有多个DefaultNode
StatistcSlot
StatisticSlot负责来统计资源的实时状态,具体的代码如下:
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, Object... args) throws Throwable {
try {
// 触发下一个Slot的entry方法
fireEntry(context, resourceWrapper, node, count, args);
// 如果能通过SlotChain中后面的Slot的entry方法,说明没有被限流或降级
// 统计信息
node.increaseThreadNum();
node.addPassRequest();
// 省略部分代码
} catch (BlockException e) {
context.getCurEntry().setError(e);
// Add block count.
node.increaseBlockedQps();
// 省略部分代码
throw e;
} catch (Throwable e) {
context.getCurEntry().setError(e);
// Should not happen
node.increaseExceptionQps();
// 省略部分代码
throw e;
}
}
@Override
public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
DefaultNode node = (DefaultNode)context.getCurNode();
if (context.getCurEntry().getError() == null) {
long rt = TimeUtil.currentTimeMillis() - context.getCurEntry().getCreateTime();
if (rt > Constants.TIME_DROP_VALVE) {
rt = Constants.TIME_DROP_VALVE;
}
node.rt(rt);
// 省略部分代码
node.decreaseThreadNum();
// 省略部分代码
}
fireExit(context, resourceWrapper, count);
}
代码分成了两部分,第一部分是entry方法,该方法首先会触发后续slot的entry方法,即SystemSlot、FlowSlot、DegradeSlot等的规则,如果规则不通过,就会抛出BlockException,则会在node中统计被block的数量。反之会在node中统计通过的请求数和线程数等信息。第二部分是在exit方法中,当退出该Entry入口时,会统计rt的时间,并减少线程数。
这些统计的实时数据会被后续的校验规则所使用,具体的统计方式是通过 滑动窗口 来实现的。
SystemSlot
SystemSlot就是根据总的请求统计信息,来做流控,主要是防止系统被搞垮,具体的代码如下:
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, Object... args)
throws Throwable {
SystemRuleManager.checkSystem(resourceWrapper);
fireEntry(context, resourceWrapper, node, count, args);
}
public static void checkSystem(ResourceWrapper resourceWrapper) throws BlockException {
// 省略部分代码
// total qps
double currentQps = Constants.ENTRY_NODE.successQps();
if (currentQps > qps) {
throw new SystemBlockException(resourceWrapper.getName(), "qps");
}
// total thread
int currentThread = Constants.ENTRY_NODE.curThreadNum();
if (currentThread > maxThread) {
throw new SystemBlockException(resourceWrapper.getName(), "thread");
}
double rt = Constants.ENTRY_NODE.avgRt();
if (rt > maxRt) {
throw new SystemBlockException(resourceWrapper.getName(), "rt");
}
// 完全按照RT,BBR算法来
if (highestSystemLoadIsSet && getCurrentSystemAvgLoad() > highestSystemLoad) {
if (currentThread > 1 &&
currentThread > Constants.ENTRY_NODE.maxSuccessQps() * Constants.ENTRY_NODE.minRt() / 1000) {
throw new SystemBlockException(resourceWrapper.getName(), "load");
}
}
}
其中的Constants.ENTRY_NODE是一个全局的ClusterNode,该节点的值是在StatisticsSlot中进行统计的。
AuthoritySlot
AuthoritySlot做的事也比较简单,主要是根据黑白名单进行过滤,只要有一条规则校验不通过,就抛出异常。
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, Object... args) throws Throwable {
FlowRuleManager.checkFlow(resourceWrapper, context, node, count);
fireEntry(context, resourceWrapper, node, count, args);
}
public static void checkFlow(ResourceWrapper resource, Context context, DefaultNode node, int count) throws BlockException {
List<FlowRule> rules = flowRules.get(resource.getName());
if (rules != null) {
for (FlowRule rule : rules) {
if (!rule.passCheck(context, node, count)) {
throw new FlowException(rule.getLimitApp());
}
}
}
}
DegradeSlot
DegradeSlot主要是根据前面统计好的信息,与设置的降级规则进行匹配校验,如果规则校验不通过则进行降级,具体的代码如下:
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, Object... args) throws Throwable {
DegradeRuleManager.checkDegrade(resourceWrapper, context, node, count);
fireEntry(context, resourceWrapper, node, count, args);
}
public static void checkDegrade(ResourceWrapper resource, Context context, DefaultNode node, int count) throws BlockException {
List<DegradeRule> rules = degradeRules.get(resource.getName());
if (rules != null) {
for (DegradeRule rule : rules) {
if (!rule.passCheck(context, node, count)) {
throw new DegradeException(rule.getLimitApp());
}
}
}
}
以上几个概念的关系
image.pngMetric
Metric 是 Sentinel 中用来进行实时数据统计的度量接口,node就是通过metric来进行数据统计的。而metric本身也并没有统计的能力,他也是通过Window来进行统计的。
Metric有一个实现类:ArrayMetric,在ArrayMetric中主要是通过一个叫WindowLeapArray的对象进行窗口统计的。
image.png统计流程
image.pngWindow
首先一个时间窗口是用来在某个固定时间长度内保存一些统计值的虚拟概念。有了这个概念后,我们就可以通过时间窗口来计算统计一段时间内的诸如:qps,rt,threadNum等指标了。
ArrayMetric
private final WindowLeapArray data;
public ArrayMetric(int windowLength, int interval) {
this.data = new WindowLeapArray(windowLength, interval);
}
WindowLeapArray
public class WindowLeapArray extends LeapArray<Window> {
public WindowLeapArray(int windowLengthInMs, int intervalInSec) {
super(windowLengthInMs, intervalInSec);
}
}
该对象的构造方法有两个参数:
- windowLengthInMs :一个用毫秒做单位的时间窗口的长度
- intervalInSec ,一个用秒做单位的时间间隔,这个时间间隔具体是做什么的,下面会分析。
然后 WindowLeapArray 继承自 LeapArray ,在初始化 WindowLeapArray 的时候,直接调用了父类的构造方法,再来看一下父类 LeapArray 的代码:
public abstract class LeapArray<T> {
// 时间窗口的长度
protected int windowLength;
// 采样窗口的个数
protected int sampleCount;
// 以毫秒为单位的时间间隔
protected int intervalInMs;
// 采样的时间窗口数组
protected AtomicReferenceArray<WindowWrap<T>> array;
/**
* LeapArray对象
* @param windowLength 时间窗口的长度,单位:毫秒
* @param intervalInSec 统计的间隔,单位:秒
*/
public LeapArray(int windowLength, int intervalInSec) {
this.windowLength = windowLength;
// 时间窗口的采样个数,默认为2个采样窗口
this.sampleCount = intervalInSec * 1000 / windowLength;
this.intervalInMs = intervalInSec * 1000;
this.array = new AtomicReferenceArray<WindowWrap<T>>(sampleCount);
}
}
可以很清晰的看出来在 LeapArray 中创建了一个 AtomicReferenceArray 数组,用来对时间窗口中的统计值进行采样。通过采样的统计值再计算出平均值,就是我们需要的最终的实时指标的值了。
可以看到我在上面的代码中通过注释,标明了默认采样的时间窗口的个数是2个,这个值是怎么得到的呢?我们回忆一下 LeapArray 对象创建,是通过在 StatisticNode 中,new了一个 ArrayMetric ,然后将参数一路往上传递后创建的:
private transient Metric rollingCounterInSecond = new ArrayMetric(1000 / SampleCountProperty.sampleCount,IntervalProperty.INTERVAL);
SampleCountProperty.sampleCount 的默认值是2,所以第一个参数 windowLengthInMs 的值是 500ms,那么1秒钟是1000ms,每个时间窗口的长度是500ms,也就是说总共分了两个采样的时间窗口。
现在继续回到 ArrayMetric.addPass() 方法:
@Override
public void addPass() {
WindowWrap<Window> wrap = data.currentWindow();
wrap.value().addPass();
}
获取当前Window
我们已经分析了 wrap.value().addPass() ,现在只需要分析清楚 data.currentWindow() 具体做了什么,拿到了当前时间窗口就可以 了。继续深入代码,最终定位到下面的代码:
@Override
public WindowWrap<Window> currentWindow(long time) {
// time每增加一个windowLength的长度,timeId就会增加1,时间窗口就会往前滑动一个
long timeId = time / windowLength;
// Calculate current index.
// idx被分成[0,arrayLength-1]中的某一个数,作为array数组中的索引
int idx = (int)(timeId % array.length());
// Cut the time to current window start.
long time = time - time % windowLength;
while (true) {
//从采样数组中根据索引获取缓存的时间窗口
WindowWrap<Window> old = array.get(idx);
if (old == null) {
// array数组长度不宜过大,否则old很多情况下都命中不了,就会创建很多个WindowWrap对象
// 如果没有获取到,则创建一个新的
WindowWrap<Window> window = new WindowWrap<Window>(windowLength, time, new Window());
// 通过CAS将新窗口设置到数组中去
if (array.compareAndSet(idx, null, window)) {
// 如果能设置成功,则将该窗口返回
return window;
} else {
// 否则当前线程让出时间片,等待
Thread.yield();
}
// 如果当前窗口的开始时间与old的开始时间相等,则直接返回old窗口
} else if (time == old.windowStart()) {
return old;
} else if (time > old.windowStart()) {
// 如果当前时间窗口的开始时间已经超过了old窗口的开始时间,则放弃old窗口
// 并将time设置为新的时间窗口的开始时间,此时窗口向前滑动
if (addLock.tryLock()) {
try {
// if (old is deprecated) then [LOCK] resetTo currentTime.
return resetWindowTo(old, time);
} finally {
addLock.unlock();
}
} else {
Thread.yield();
}
} else if (time < old.windowStart()) {
// Cannot go through here.
return new WindowWrap<Window>(windowLength, time, new Window());
}
}
}
- 1.根据当前时间,算出该时间的timeId,并根据timeId算出当前窗口在采样窗口数组中的索引idx
- 2.根据当前时间算出当前窗口的应该对应的开始时间time,以毫秒为单位
- 3.根据索引idx,在采样窗口数组中取得一个时间窗口old
- 4.循环判断知道获取到一个当前时间窗口
- 4.1.如果old为空,则创建一个时间窗口,并将它插入到array的第idx个位置,array上面已经分析过了,是一个 AtomicReferenceArray
- 4.2.如果当前窗口的开始时间time与old的开始时间相等,那么说明old就是当前时间窗口,直接返回old
- 4.3.如果当前窗口的开始时间time大于old的开始时间,则说明old窗口已经过时了,将old的开始时间更新为最新值:time,下个循环中会在步骤4.2中返回
- 4.4.如果当前窗口的开始时间time小于old的开始时间,实际上这种情况是不可能存在的,因为time是当前时间,old是过去的一个时间
示例
image.png初始的时候arrays数组中只有一个窗口(可能是第一个,也可能是第二个),每个时间窗口的长度是500ms,这就意味着只要当前时间与时间窗口的差值在500ms之内,时间窗口就不会向前滑动。例如,假如当前时间走到300或者500时,当前时间窗口仍然是相同的那个:
image.png
image.png
时间继续往前走,当超过500ms时,时间窗口就会向前滑动到下一个,这时就会更新当前窗口的开始时间:
image.png
时间继续往前走,只要不超过1000ms,则当前窗口不会发生变化:
image.png
当时间继续往前走,当前时间超过1000ms时,就会再次进入下一个时间窗口,此时arrays数组中的窗口将会有一个失效,会有另一个新的窗口进行替换:
image.png
以此类推随着时间的流逝,时间窗口也在发生变化,在当前时间点中进入的请求,会被统计到当前时间对应的时间窗口中。计算qps时,会用当前采样的时间窗口中对应的指标统计值除以时间间隔,就是具体的qps。具体的代码在StatisticNode中:
@Override
public long totalQps() {
return passQps() + blockedQps();
}
@Override
public long blockedQps() {
return rollingCounterInSecond.block() / IntervalProperty.INTERVAL;
}
@Override
public long passQps() {
return rollingCounterInSecond.pass() / IntervalProperty.INTERVAL;
}
网友评论