主要流程
- springboot集成包
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-sentinel</artifactId>
<version>2.2.3.RELEASE</version>
</dependency>
该包的spring.factories里导入了这些组件
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.alibaba.cloud.sentinel.SentinelWebAutoConfiguration,\
com.alibaba.cloud.sentinel.SentinelWebFluxAutoConfiguration,\
com.alibaba.cloud.sentinel.endpoint.SentinelEndpointAutoConfiguration,\
com.alibaba.cloud.sentinel.custom.SentinelAutoConfiguration,\
com.alibaba.cloud.sentinel.feign.SentinelFeignAutoConfiguration
org.springframework.cloud.client.circuitbreaker.EnableCircuitBreaker=\
com.alibaba.cloud.sentinel.custom.SentinelCircuitBreakerConfiguration
主要是通过SentinelWebAutoConfiguration这个类添加一个MVC拦截器
@Override
public void addInterceptors(InterceptorRegistry registry) {
if (!sentinelWebInterceptorOptional.isPresent()) {
return;
}
SentinelProperties.Filter filterConfig = properties.getFilter();
registry.addInterceptor(sentinelWebInterceptorOptional.get())
.order(filterConfig.getOrder())
//拦截全路径/*
.addPathPatterns(filterConfig.getUrlPatterns());
}
- 入口:在web请求过来的时候拦截,调用AbstractSentinelInterceptor接口
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler)
throws Exception {
try {
//获取请求的路径 比如访问的说localhost:8085/area/list,resourceName 为 area/list
String resourceName = getResourceName(request);
if (StringUtil.isEmpty(resourceName)) {
return true;
}
if (increaseReferece(request, this.baseWebMvcConfig.getRequestRefName(), 1) != 1) {
return true;
}
String origin = parseOrigin(request);
String contextName = getContextName(request);
ContextUtil.enter(contextName, origin);
//主要是进入到这个方法
Entry entry = SphU.entry(resourceName, ResourceTypeConstants.COMMON_WEB, EntryType.IN);
request.setAttribute(baseWebMvcConfig.getRequestAttributeName(), entry);
return true;
} catch (BlockException e) {
try {
//处理流控的exception返回给前端,可以自己定制返回内容
handleBlockException(request, response, e);
} finally {
ContextUtil.exit();
}
return false;
}
}
- 主要拦截方法:entry方法一直点进来会进到com.alibaba.csp.sentinel.CtSph#entryWithPriority
private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args)
throws BlockException {
Context context = ContextUtil.getContext();
...省略一堆校验
//获取该资源的拦截链
ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);
if (chain == null) {
return new CtEntry(resourceWrapper, null, context);
}
//将资源,拦截链环境包装为一个entry
Entry e = new CtEntry(resourceWrapper, chain, context);
try {
//真正执行拦截链的方法(下面详解)
chain.entry(context, resourceWrapper, null, count, prioritized, args);
} catch (BlockException e1) {
e.exit(count, args);
throw e1;
} catch (Throwable e1) {
// This should not happen, unless there are errors existing in Sentinel internal.
RecordLog.info("Sentinel unexpected exception", e1);
}
return e;
}
ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) {
//获取该资源上的拦截链
ProcessorSlotChain chain = chainMap.get(resourceWrapper);
//当第一次进来的时候回拦截链chain为空
if (chain == null) {
synchronized (LOCK) {
chain = chainMap.get(resourceWrapper);
if (chain == null) {
// Entry size limit.
if (chainMap.size() >= Constants.MAX_SLOT_CHAIN_SIZE) {
return null;
}
//初始化拦截链,(下面单独开出来讲)
chain = SlotChainProvider.newSlotChain();
Map<ResourceWrapper, ProcessorSlotChain> newMap = new HashMap<ResourceWrapper, ProcessorSlotChain>(
chainMap.size() + 1);
newMap.putAll(chainMap);
newMap.put(resourceWrapper, chain);
chainMap = newMap;
}
}
}
return chain;
}
- chain = SlotChainProvider.newSlotChain();创建拦截链
这边主要是创建了8个拦截链,分别对应我们sentinel控制台配的8个,主要我们常用的说流控规则跟降级规则
表头 | 表头 |
---|---|
链路节点生成 | NodeSelectorSlot |
集群流控 | ClusterBuilderSlot |
规则限制日志相关 | LogSlot |
统计相关(重要) | StatisticSlot |
热点规则 | ParamFlowSlot |
授权规则 | AuthoritySlot |
系统规则 | SystemSlot |
流控规则(重要) | FlowSlot |
降级规则(重要) | DegradeSlot |
public static ProcessorSlotChain newSlotChain() {
if (slotChainBuilder != null) {
return slotChainBuilder.build();
}
...省略spi等代码
}
public class HotParamSlotChainBuilder implements SlotChainBuilder {
@Override
public ProcessorSlotChain build() {
ProcessorSlotChain chain = new DefaultProcessorSlotChain();
chain.addLast(new NodeSelectorSlot());
chain.addLast(new ClusterBuilderSlot());
chain.addLast(new LogSlot());
chain.addLast(new StatisticSlot());
chain.addLast(new ParamFlowSlot());
chain.addLast(new SystemSlot());
chain.addLast(new AuthoritySlot());
chain.addLast(new FlowSlot());
chain.addLast(new DegradeSlot());
return chain;
}
}
构建后形成这个链表
image.png
- 真正执行拦截链的方法 chain.entry(...);
调用chain里面的first.transformEntry方法,也就是他的处理方法,因为第一个是默认的DefaultProcessorSlotChain,他没有做任何处理,直接调用 next.transformEntry给下面的责任链处理
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, boolean prioritized, Object... args)
throws Throwable {
first.transformEntry(context, resourceWrapper, t, count, prioritized, args);
}
@Override
public void fireEntry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args)
throws Throwable {
if (next != null) {
next.transformEntry(context, resourceWrapper, obj, count, prioritized, args);
}
}
几个责任链的主要代码
- NodeSelectorSlot(负责请求链路资源的归纳)
负责收集资源的路径,并将这些资源的调用路径,以树状结构存储起来,用于根据凋用路径来限流降级
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args)
throws Throwable {
//context.getName()的结果是sentinel_spring_web_context形成下面那个节点
DefaultNode node = map.get(context.getName());
if (node == null) {
synchronized (this) {
node = map.get(context.getName());
if (node == null) {
node = new DefaultNode(resourceWrapper, null);
HashMap<String, DefaultNode> cacheMap = new HashMap<String, DefaultNode>(map.size());
cacheMap.putAll(map);
cacheMap.put(context.getName(), node);
map = cacheMap;
// Build invocation tree
((DefaultNode) context.getLastNode()).addChild(node);
}
}
}
context.setCurNode(node);
//调用下个Slot处理
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
- ClusterBuilderSlot(集群相关)
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args)
throws Throwable {
if (clusterNode == null) {
synchronized (lock) {
if (clusterNode == null) {
// Create the cluster node.
clusterNode = new ClusterNode(resourceWrapper.getName(), resourceWrapper.getResourceType());
HashMap<ResourceWrapper, ClusterNode> newMap = new HashMap<>(Math.max(clusterNodeMap.size(), 16));
newMap.putAll(clusterNodeMap);
newMap.put(node.getId(), clusterNode);
clusterNodeMap = newMap;
}
}
}
node.setClusterNode(clusterNode);
if (!"".equals(context.getOrigin())) {
Node originNode = node.getClusterNode().getOrCreateOriginNode(context.getOrigin());
context.getCurEntry().setOriginNode(originNode);
}
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
- LogSlot(日志相关)
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode obj, int count, boolean prioritized, Object... args)
throws Throwable {
try {
fireEntry(context, resourceWrapper, obj, count, prioritized, args);
} catch (BlockException e) {
//当发生限制时记录信息。
EagleEyeLogUtil.log(resourceWrapper.getName(), e.getClass().getSimpleName(), e.getRuleLimitApp(),
context.getOrigin(), count);
throw e;
} catch (Throwable e) {
RecordLog.warn("Unexpected entry exception", e);
}
}
-
StatisticSlot(重要,统计类)
用于存储资源的统计信息以及调用者信息,例如该资源的 RT QPS, thread count等等这些信息将用作为多维度限流,降级的依据
统计用的是滑动时间算法,篇幅有点长,放到滑动时间算法与sentinel实践
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
try {
//执行后面的检查
fireEntry(context, resourceWrapper, node, count, prioritized, args);
// 如果其他的校验都成功没有问题
//增加单位时间成功线程数量
node.increaseThreadNum();
//增加单位时间成功请求,(滑动时间窗口计数实现)
node.addPassRequest(count);
if (context.getCurEntry().getOriginNode() != null) {
// Add count for origin node.
context.getCurEntry().getOriginNode().increaseThreadNum();
context.getCurEntry().getOriginNode().addPassRequest(count);
}
if (resourceWrapper.getEntryType() == EntryType.IN) {
// Add count for global inbound entry node for global statistics.
Constants.ENTRY_NODE.increaseThreadNum();
Constants.ENTRY_NODE.addPassRequest(count);
}
// Handle pass event with registered entry callback handlers.
for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
handler.onPass(context, resourceWrapper, node, count, args);
}
} catch (PriorityWaitException ex) {
node.increaseThreadNum();
if (context.getCurEntry().getOriginNode() != null) {
// Add count for origin node.
context.getCurEntry().getOriginNode().increaseThreadNum();
}
if (resourceWrapper.getEntryType() == EntryType.IN) {
// Add count for global inbound entry node for global statistics.
Constants.ENTRY_NODE.increaseThreadNum();
}
// Handle pass event with registered entry callback handlers.
for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
handler.onPass(context, resourceWrapper, node, count, args);
}
} catch (BlockException e) {
// Blocked, set block exception to current entry.
context.getCurEntry().setError(e);
// Add block count.
node.increaseBlockQps(count);
if (context.getCurEntry().getOriginNode() != null) {
context.getCurEntry().getOriginNode().increaseBlockQps(count);
}
if (resourceWrapper.getEntryType() == EntryType.IN) {
// Add count for global inbound entry node for global statistics.
Constants.ENTRY_NODE.increaseBlockQps(count);
}
// Handle block event with registered entry callback handlers.
for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
handler.onBlocked(e, context, resourceWrapper, node, count, args);
}
throw e;
} catch (Throwable e) {
// Unexpected error, set error to current entry.
context.getCurEntry().setError(e);
// This should not happen.
node.increaseExceptionQps(count);
if (context.getCurEntry().getOriginNode() != null) {
context.getCurEntry().getOriginNode().increaseExceptionQps(count);
}
if (resourceWrapper.getEntryType() == EntryType.IN) {
Constants.ENTRY_NODE.increaseExceptionQps(count);
}
throw e;
}
}
- ParamFlowSlot(热点流控)
//对有添加热点流控规则的资源进行限制
void checkFlow(ResourceWrapper resourceWrapper, int count, Object... args) throws BlockException {
if (args == null) {
return;
}
if (!ParamFlowRuleManager.hasRules(resourceWrapper.getName())) {
return;
}
List<ParamFlowRule> rules = ParamFlowRuleManager.getRulesOfResource(resourceWrapper.getName());
for (ParamFlowRule rule : rules) {
applyRealParamIdx(rule, args.length);
// Initialize the parameter metrics.
ParameterMetricStorage.initParamMetricsFor(resourceWrapper, rule);
if (!ParamFlowChecker.passCheck(resourceWrapper, rule, count, args)) {
String triggeredParam = "";
if (args.length > rule.getParamIdx()) {
Object value = args[rule.getParamIdx()];
triggeredParam = String.valueOf(value);
}
throw new ParamFlowException(resourceWrapper.getName(), triggeredParam, rule);
}
}
}
- SystemSlot
当配置了系统规则后,会根据配置的系统规则进行校验
public static void checkSystem(ResourceWrapper resourceWrapper) throws BlockException {
...
// qps限制数
double currentQps = Constants.ENTRY_NODE == null ? 0.0 : Constants.ENTRY_NODE.successQps();
if (currentQps > qps) {
throw new SystemBlockException(resourceWrapper.getName(), "qps");
}
// thread限制
int currentThread = Constants.ENTRY_NODE == null ? 0 : Constants.ENTRY_NODE.curThreadNum();
if (currentThread > maxThread) {
throw new SystemBlockException(resourceWrapper.getName(), "thread");
}
//rt限制
double rt = Constants.ENTRY_NODE == null ? 0 : Constants.ENTRY_NODE.avgRt();
if (rt > maxRt) {
throw new SystemBlockException(resourceWrapper.getName(), "rt");
}
if (highestSystemLoadIsSet && getCurrentSystemAvgLoad() > highestSystemLoad) {
if (!checkBbr(currentThread)) {
throw new SystemBlockException(resourceWrapper.getName(), "load");
}
}
// CPU使用率
if (highestCpuUsageIsSet && getCurrentCpuUsage() > highestCpuUsage) {
throw new SystemBlockException(resourceWrapper.getName(), "cpu");
}
}
- AuthoritySlot(根据配置的授权规则来限制)
void checkBlackWhiteAuthority(ResourceWrapper resource, Context context) throws AuthorityException {
Map<String, Set<AuthorityRule>> authorityRules = AuthorityRuleManager.getAuthorityRules();
if (authorityRules == null) {
return;
}
Set<AuthorityRule> rules = authorityRules.get(resource.getName());
if (rules == null) {
return;
}
for (AuthorityRule rule : rules) {
if (!AuthorityRuleChecker.passCheck(rule, context)) {
throw new AuthorityException(context.getOrigin(), rule);
}
}
}
-
FlowSlot(重要,限流相关,sentinel拉取配置应用nacos的动态加载配置com.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager.FlowPropertyListener主要调用该方法)
public void checkFlow(Function<String, Collection<FlowRule>> ruleProvider, ResourceWrapper resource,
Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {
if (ruleProvider == null || resource == null) {
return;
}
//获取当前资源下的所有的流控规则
Collection<FlowRule> rules = ruleProvider.apply(resource.getName());
if (rules != null) {
for (FlowRule rule : rules) {
if (!canPassCheck(rule, context, node, count, prioritized)) {
throw new FlowException(rule.getLimitApp(), rule);
}
}
}
}
//调用
private static boolean passLocalCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount,
boolean prioritized) {
Node selectedNode = selectNodeByRequesterAndStrategy(rule, context, node);
if (selectedNode == null) {
return true;
}
//这边的getRater()方法会根据你配置时的 1、快速失败 2、Warm Up 3、 排队等待 三个类型分别调用不同的方法
return rule.getRater().canPass(selectedNode, acquireCount, prioritized);
}
- 快速失败
主要是调用com.alibaba.csp.sentinel.slots.block.flow.controller.DefaultController#canPass()
@Override
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
//获取当前平均访问次数,avgUsedTokens是获取node.passQps(),调用StatisticNode#passQps
int curCount = avgUsedTokens(node);
//如果超过限制则直接返回false
if (curCount + acquireCount > count) {
//DefaultController这边的 prioritized 为false里面的方法不提示
if (prioritized && grade == RuleConstant.FLOW_GRADE_QPS) {
...
}
}
return false;
}
return true;
}
- Warm Up 调用了WarmUpController#canPass(...) 这边主要使用的是令牌桶算法
//构建函数
private void construct(double count, int warmUpPeriodInSec, int coldFactor) {
//冷因子不能小于1,默认为3
if (coldFactor <= 1) {
throw new IllegalArgumentException("Cold factor should be larger than 1");
}
//设置阈值
this.count = count;
//设置冷因子
this.coldFactor = coldFactor;
// thresholdPermits = 0.5 * warmupPeriod / stableInterval.
// warningToken = 100;
//预警token=(预热时长*阈值)/(冷因子-1)
warningToken = (int)(warmUpPeriodInSec * count) / (coldFactor - 1);
// / maxPermits = thresholdPermits + 2 * warmupPeriod /
// (stableInterval + coldInterval)
// maxToken = 200
//最大token=(预热预警token*阈值)/(冷因子-1)
maxToken = warningToken + (int)(2 * warmUpPeriodInSec * count / (1.0 + coldFactor));
// slope
// slope = (coldIntervalMicros - stableIntervalMicros) / (maxPermits
// - thresholdPermits);
slope = (coldFactor - 1.0) / count / (maxToken - warningToken);
}
@Override
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
long passQps = (long) node.passQps();
long previousQps = (long) node.previousPassQps();
syncToken(previousQps);
// 开始计算它的斜率
// 如果进入了警戒线,开始调整他的qps
long restToken = storedTokens.get();
if (restToken >= warningToken) {
long aboveToken = restToken - warningToken;
// 消耗的速度要比warning快,但是要比慢
// current interval = restToken*slope+1/count
double warningQps = Math.nextUp(1.0 / (aboveToken * slope + 1.0 / count));
if (passQps + acquireCount <= warningQps) {
return true;
}
} else {
if (passQps + acquireCount <= count) {
return true;
}
}
return false;
}
- 排队等待
主要是用到RateLimiterController#canPass(com.alibaba.csp.sentinel.node.Node, int, boolean)
@Override
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
//当请求通过量小于等于0时,直接返回通过
if (acquireCount <= 0) {
return true;
}
// Reject when count is less or equal than 0.
// Otherwise,the costTime will be max of long and waitTime will overflow in some cases.
// 这个count是你控制台设置的阈值
if (count <= 0) {
return false;
}
//获取当前时间
long currentTime = TimeUtil.currentTimeMillis();
// Calculate the interval between every two requests.
// 计算两个请求之间需要花费的时间
// 假设acquireCount=1,count =10, 则((1*1)/10*1000)取整=1000
long costTime = Math.round(1.0 * (acquireCount) / count * 1000);
// Expected pass time of this request.
// 预期通过这个请求的时间
long expectedTime = costTime + latestPassedTime.get();
// 判断预期通过时间是否小于当前时间
if (expectedTime <= currentTime) {
// Contention may exist here, but it's okay.
latestPassedTime.set(currentTime);
return true;
} else {
// Calculate the time to wait.
// 计算预计需要等待的时间(当前预期需要花费的时间+最后一次成功的时间-当前时间)
long waitTime = costTime + latestPassedTime.get() - TimeUtil.currentTimeMillis();
// 如果预计需要等待的时间大于后台配置的等待时间,则直接拒绝
if (waitTime > maxQueueingTimeMs(后台配置为毫秒)) {
return false;
} else {
// 设置latestPassedTime,用atomic变量防止并发
long oldTime = latestPassedTime.addAndGet(costTime);
try {
// 再做一次超时判断
waitTime = oldTime - TimeUtil.currentTimeMillis();
if (waitTime > maxQueueingTimeMs) {
latestPassedTime.addAndGet(-costTime);
return false;
}
// in race condition waitTime may <= 0
// 休眠等待时间
if (waitTime > 0) {
Thread.sleep(waitTime);
}
return true;
} catch (InterruptedException e) {
}
}
}
return false;
}
-
DegradeSlot(重要,降级相关)
@Override
public boolean passCheck(Context context, DefaultNode node, int acquireCount, Object... args) {
if (cut.get()) {
return false;
}
ClusterNode clusterNode = ClusterBuilderSlot.getClusterNode(this.getResource());
if (clusterNode == null) {
return true;
}
//熔断策略为RT(响应时间)慢调用比例 这边三个规则对应后台的配置
if (grade == RuleConstant.DEGRADE_GRADE_RT) {
//平均响应时间
double rt = clusterNode.avgRt();
if (rt < this.count) {
passCount.set(0);
return true;
}
// Sentinel will degrade the service only if count exceeds.
// 只有在通过数超过设置的最小值的时候才会降级
if (passCount.incrementAndGet() < rtSlowRequestAmount) {
return true;
}
}
// 熔断策略为异常比例时
else if (grade == RuleConstant.DEGRADE_GRADE_EXCEPTION_RATIO) {
// 单位时间内的异常数
double exception = clusterNode.exceptionQps();
// 单位时间内的成功数
double success = clusterNode.successQps();
// 单位时间内的总共请求数
double total = clusterNode.totalQps();
// If total amount is less than minRequestAmount, the request will pass.
// 当总共请求数达不到最小请求数(后台配置)时直接放行
if (total < minRequestAmount) {
return true;
}
// In the same aligned statistic time window,
// "success" (aka. completed count) = exception count + non-exception count (realSuccess)
// success数包含了有异常跟无异常的,所以要求realSuc要减去异常数
double realSuccess = success - exception;
if (realSuccess <= 0 && exception < minRequestAmount) {
return true;
}
if (exception / success < count) {
return true;
}
}
// 熔断策略为异常数
else if (grade == RuleConstant.DEGRADE_GRADE_EXCEPTION_COUNT) {
//一分钟内的异常数 totalException() { return rollingCounterInMinute.exception(); }
double exception = clusterNode.totalException();
if (exception < count) {
return true;
}
}
if (cut.compareAndSet(false, true)) {
ResetTask resetTask = new ResetTask(this);
pool.schedule(resetTask, timeWindow, TimeUnit.SECONDS);
}
return false;
}
网友评论