前言
Sentinel作为ali开源的一款轻量级流控框架,主要以流量为切入点,从流量控制、熔断降级、系统负载保护等多个维度来帮助用户保护服务的稳定性。相比于Hystrix,Sentinel的设计更加简单,在 Sentinel中资源定义和规则配置是分离的,也就是说用户可以先通过Sentinel API给对应的业务逻辑定义资源(埋点),然后在需要的时候再配置规则,通过这种组合方式,极大的增加了Sentinel流控的灵活性。
引入Sentinel带来的性能损耗非常小。只有在业务单机量级超过25W QPS的时候才会有一些显著的影响(5% - 10% 左右),单机QPS不太大的时候损耗几乎可以忽略不计。
Sentinel提供两种埋点方式:
-
try-catch 方式(通过 SphU.entry(...)),用户在 catch 块中执行异常处理 / fallback
-
if-else 方式(通过 SphO.entry(...)),当返回 false 时执行异常处理 / fallback
写在前面
在此之前,需要先了解一下Sentinel的工作流程。
在 Sentinel 里面,所有的资源都对应一个资源名称(resourceName),每次资源调用都会创建一个 Entry 对象。Entry 可以通过对主流框架的适配自动创建,也可以通过注解的方式或调用 SphU API 显式创建。Entry 创建的时候,同时也会创建一系列功能插槽(slot chain),这些插槽有不同的职责,例如默认情况下会创建以下7个插槽:
-
NodeSelectorSlot:负责收集资源的路径,并将这些资源的调用路径,以树状结构存储起来,用于根据调用路径来限流降级。
-
ClusterBuilderSlot:则用于存储资源的统计信息以及调用者信息,例如该资源的 RT, QPS, thread count 等等,这些信息将用作为多维度限流,降级的依据。
-
StatisticSlot:则用于记录、统计不同纬度的 runtime 指标监控信息。
-
SystemSlot:则通过系统的状态,例如 load1 等,来控制总的入口流量。
-
AuthoritySlot:则根据配置的黑白名单和调用来源信息,来做黑白名单控制。
-
FlowSlot:则用于根据预设的限流规则以及前面 slot 统计的状态,来进行流量控制。
-
DegradeSlot:则通过统计信息以及预设的规则,来做熔断降级。
注意:这里的插槽链都是一一对应资源名称的。
每个Slot执行完业务逻辑处理后,会调用fireEntry()方法,该方法将会触发下一个节点的entry方法,下一个节点又会调用他的fireEntry,以此类推直到最后一个Slot,由此就形成了Sentinel的责任链。
上面的所介绍的插槽(slot chain)是Sentinel非常重要的概念。同时还有一个非常重要的概念那就是Node。
Node之间的树形结构
在创建context会先创建DefaultNode 实际是它的父类EntranceNode,context可以相同context-name反复申明创建,但是DefaultNode同一context-name只会创建一次,DefaultNode包含了一个链路所有的资源,每一个资源对应一个ClusterNode,ClusterNode再根据来源细分为StatisticNode,它们之间的关系就是一个树形结构 如下:
-
EntranceNode:根据context-name来创建,就算同一个context-name多次创建context,entranceNode也只会创建一次, 用来统计该链路上所有的资源信息。
-
DefaultNode:根据context-name + resource-name创建,用来统计某链路上的资源信息。
-
ClusterNode:根据resource-name来创建,用来统计资源信息。
-
StatisticsNode:根据origin-name+resource-name来创建,针对请求来源统计该来源的资源信息,上面几个node都是它的子类,基于它的数据做汇总。
一定要搞清楚这几个node之间的关系和作用,下面重点来看StatisticsNode,它用来完成信息统计,以供后续的限流规则使用, 它只统计了两个维度数据,qps和线程数。
入门案例
- 引入maven依赖
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.12.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-sentinel</artifactId>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<!--整合Spring Cloud Alibaba-->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-alibaba-dependencies</artifactId>
<version>2.2.6.RELEASE</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
- 下面举一个最简单的案例埋点来引出流控入口
public String getOrderInfo(String orderNo) {
ContextUtil.enter("getOrderInfo", "application-a");
Entry entry = null;
try {
// name:资源名 EntryType 流量类型为入口还是出口,系统规则只针对入口流量, batchCount:当前请求流量, args:参数
entry = SphU.entry("getOrderInfo", EntryType.IN, 1, orderNo);
getUserInfo();
} catch (BlockException e) {
e.printStackTrace();
} finally {
entry.exit();
}
return "orderInfo = " + orderNo;
}
public String getUserInfo() {
Entry entry = null;
try {
entry = SphU.entry("getUserInfo", EntryType.OUT, 1);
// 通过http或feign对用户服务完成该接口调用
} catch (BlockException e) {
e.printStackTrace();
} finally {
entry.exit();
}
return "userInfo";
}
public static Entry entry(String name, EntryType trafficType, int batchCount, Object... args) throws BlockException
也可以通过注解的方式引入,执行方法时SentinelResourceAspect会做拦截进行流控处理,当然什么都不配也是可以的,因为引入spring-cloud-starter-alibaba-sentinel包spring mvc和spring webflux做了适配,自动会对每一个请求做埋点
@GetMapping("getOrderInfo")
@SentinelResource(value = "/getOrderInfo", entryType = EntryType.IN)
public String getOrderInfo(@RequestParam("orderNo") String orderNo) {
return "orderInfo = " + orderNo;
}
ContextUtil.enter("getOrderInfo", "application-a") 来表示调用链的入口,可以暂时理解为上下文,一般不做声明后面会默认创建。
第一个参数为context-name,区分不同的调用链入口,默认常量值sentinel_default_context。
第二参数为调用来源,这个参数可以细分从不同应用来源发出的请求,授权规则白名单和黑名单会根据该参数做限制,然后通过SphU.entry()埋点进入,下面说下这个方法几个参数的含义:
- name:当前资源名。
- trafficType:流量类型 分别为入口流量和出口流量。入口流量和出口流量执行过程都是差不多,只是入口流量会多了一个系统规则拦截,像是上面案例从订单服务调用getUserInfo,getUserInfo是去调用用户服务,它的流量方式是出去的,压力都在用户服务那边,不用考虑当前服务器的压力,所以标为出口流量。
- batchCount:当前流量数量,一般默认为1。
- args:参数,后面做热点参数规则时用到。
BlockException:当某一规则不通过时会抛出对应异常。
SphU.entry(xxx) 需要与 entry.exit() 方法成对出现,匹配调用,如有嵌套像上面,需先退出getUserInfo的entry在退出getOrderInfo的entry
打开打控制台,此时应该是空白的,sentinel控制台是懒加载模式,需要调用一下相关资源接口就可以看到
可以看到sentinel规则配置主要有流控规则,降级规则,热点规则,系统规则,授权规则,先简单介绍下规则作用,其它配置也很简单 一目了然,后面通过结合源码来深入分析:
- 流控规则:针对资源流量控制。
- 热点规则:针对资源的热点参数做流量控制。
- 降级规则:针对资源的调度情况来做降级处理。
- 系统规则:针对当前服务做全局流量控制。
- 授权规则:对访问资源的特定应用做授权处理。
源码分析
从上面的Sentinel使用的示例代码,我们就从这里切入开始分析
ContextUtil.enter()
public class ContextUtil {
/**
* Store the context in ThreadLocal for easy access.
*/
private static ThreadLocal<Context> contextHolder = new ThreadLocal<>();
/**
* Holds all {@link EntranceNode}. Each {@link EntranceNode} is associated with a distinct context name.
*/
private static volatile Map<String, DefaultNode> contextNameNodeMap = new HashMap<>();
public static Context enter(String name, String origin) {
// 判断上下文名称是否为默认的名称(sentinel_default_context) 是的话直接抛出异常
if (Constants.CONTEXT_DEFAULT_NAME.equals(name)) {
throw new ContextNameDefineException(
"The " + Constants.CONTEXT_DEFAULT_NAME + " can't be permit to defined!");
}
return trueEnter(name, origin);
}
protected static Context trueEnter(String name, String origin) {
// 先从ThreadLocal中尝试获取,获取到则直接返回
Context context = contextHolder.get();
if (context == null) {
Map<String, DefaultNode> localCacheNameMap = contextNameNodeMap;
// 尝试从缓存中获取该上下文名称对应的 入口节点
DefaultNode node = localCacheNameMap.get(name);
if (node == null) {
// 判断缓存中入口节点数量是否大于2000
if (localCacheNameMap.size() > Constants.MAX_CONTEXT_NAME_SIZE) {
setNullContext();
return NULL_CONTEXT;
} else {
try {
// 加锁
LOCK.lock();
node = contextNameNodeMap.get(name);
// 双重检查锁
if (node == null) {
// 判断缓存中入口节点数量是否大于2000
if (contextNameNodeMap.size() > Constants.MAX_CONTEXT_NAME_SIZE) {
setNullContext();
return NULL_CONTEXT;
} else {
// 根据上下文名称生成入口节点(entranceNode)
node = new EntranceNode(new StringResourceWrapper(name, EntryType.IN), null);
// Add entrance node.
// 加入至全局根节点下
Constants.ROOT.addChild(node);
// 加入缓存中
Map<String, DefaultNode> newMap = new HashMap<>(contextNameNodeMap.size() + 1);
newMap.putAll(contextNameNodeMap);
newMap.put(name, node);
contextNameNodeMap = newMap;
}
}
} finally {
LOCK.unlock();
}
}
}
// 初始化上下文对象
context = new Context(node, name);
context.setOrigin(origin);
// 设置到当前线程中
contextHolder.set(context);
}
return context;
}
}
主要做了2件事情:
-
1、根据ContextName生成entranceNode,并加入缓存,每个ContextName对应一个入口节点entranceNode。
-
2、根据ContextName和entranceNode初始化上下文对象,并将上下文对象设置到当前线程中。
这里有几点需要注意:
- 1、入口节点数量不能大于2000,大于会直接抛异常。
- 2、每个ContextName对应一个入口节点entranceNode。
- 3、每个entranceNode都有共同的父节点。也就是根节点。
SphU.entry(xxx)执行过程分析
public class SphU {
private static final Object[] OBJECTS0 = new Object[0];
public static Entry entry(String name, EntryType trafficType, int batchCount, Object... args)
throws BlockException {
// 默认为 出口流量类型,单位统计数为1
return Env.sph.entry(name, trafficType, batchCount, args);
}
}
public class CtSph implements Sph {
@Override
public Entry entry(String name, EntryType type, int count, Object... args) throws BlockException {
// 生成资源对象
StringResourceWrapper resource = new StringResourceWrapper(name, type);
return entry(resource, count, args);
}
public Entry entry(ResourceWrapper resourceWrapper, int count, Object... args) throws BlockException {
return entryWithPriority(resourceWrapper, count, false, args);
}
}
上面的代码比较简单,不指定EntryType的话,则默认为出口流量类型,最终会调用entryWithPriority方法,主要业务逻辑也都在这个方法中
entryWithPriority方法
public class CtSph implements Sph {
private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args)
throws BlockException {
// 获取当前线程上下文对象
Context context = ContextUtil.getContext();
// 上下文名称对应的入口节点是否已经超过阈值2000,超过则会返回空 CtEntry
if (context instanceof NullContext) {
return new CtEntry(resourceWrapper, null, context);
}
if (context == null) {
// 如果没有指定上下文名称,则使用默认名称,也就是默认入口节点
context = InternalContextUtil.internalEnter(Constants.CONTEXT_DEFAULT_NAME);
}
// 全局开关
if (!Constants.ON) {
return new CtEntry(resourceWrapper, null, context);
}
// 生成插槽链
ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);
/*
* 表示资源(插槽链)超过6000,因此不会进行规则检查。
*/
if (chain == null) {
return new CtEntry(resourceWrapper, null, context);
}
// 生成 Entry 对象
// 创建一个流量入口,将context curEntry进行指定
Entry e = new CtEntry(resourceWrapper, chain, context);
try {
// 开始执行插槽链 调用逻辑
chain.entry(context, resourceWrapper, null, count, prioritized, args);
} catch (BlockException e1) {
// 发生流控异常进行退出 清除上下文
e.exit(count, args);
// 将异常向上抛
throw e1;
} catch (Throwable e1) {
// 除非Sentinel内部存在错误,否则不应发生这种情况。
RecordLog.info("Sentinel unexpected exception", e1);
}
return e;
}
}
这个方法可以说是涵盖了整个Sentinel的核心逻辑
- 1、获取上下文对象,如果上下文对象还未初始化,则使用默认名称初始化。初始化逻辑在上文已经分析过。
- 2、判断全局开关。
- 3、根据给定的资源生成插槽链,插槽链是跟资源相关的,Sentinel最关键的逻辑也都在各个插槽中。初始化的逻辑在lookProcessChain(resourceWrapper);中,下面会分析。
- 4、依顺序执行每个插槽逻辑。
lookProcessChain(resourceWrapper)方法
lookProcessChain方法为指定资源生成插槽链,下面我们来看下它的初始化逻辑
public class CtSph implements Sph {
private static volatile Map<ResourceWrapper, ProcessorSlotChain> chainMap
= new HashMap<ResourceWrapper, ProcessorSlotChain>();
ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) {
// 根据资源尝试从全局缓存中获取
ProcessorSlotChain chain = chainMap.get(resourceWrapper);
if (chain == null) {
// 非常常见的双重检查锁
synchronized (LOCK) {
chain = chainMap.get(resourceWrapper);
if (chain == null) {
// 判断资源数是否大于6000
if (chainMap.size() >= Constants.MAX_SLOT_CHAIN_SIZE) {
return null;
}
// 初始化插槽链
chain = SlotChainProvider.newSlotChain();
Map<ResourceWrapper, ProcessorSlotChain> newMap = new HashMap<ResourceWrapper, ProcessorSlotChain>(
chainMap.size() + 1);
newMap.putAll(chainMap);
newMap.put(resourceWrapper, chain);
chainMap = newMap;
}
}
}
return chain;
}
}
-
1、根据资源尝试从全局缓存中获取插槽链。每个资源对应一个插槽链(资源最多只能定义6000个)
-
2、初始化插槽链上的插槽(SlotChainProvider.newSlotChain()方法中)
下面我们看下初始化插槽链上的插槽的逻辑
SlotChainProvider.newSlotChain()
public final class SlotChainProvider {
private static volatile SlotChainBuilder slotChainBuilder = null;
public static ProcessorSlotChain newSlotChain() {
// 判断是否已经初始化过
if (slotChainBuilder != null) {
return slotChainBuilder.build();
}
// Resolve the slot chain builder SPI.
// SPI获取构造器
slotChainBuilder = SpiLoader.of(SlotChainBuilder.class).loadFirstInstanceOrDefault();
// 加载失败则使用默认 插槽链
if (slotChainBuilder == null) {
// Should not go through here.
RecordLog.warn("[SlotChainProvider] Wrong state when resolving slot chain builder, using default");
slotChainBuilder = new DefaultSlotChainBuilder();
} else {
RecordLog.info("[SlotChainProvider] Global slot chain builder resolved: {}",
slotChainBuilder.getClass().getCanonicalName());
}
// 构建完成
return slotChainBuilder.build();
}
}
public final class SpiLoader<S> {
private static final String SPI_FILE_PREFIX = "META-INF/services/";
//初始化spiLoader类加载器
public static <T> SpiLoader<T> of(Class<T> service) {
AssertUtil.notNull(service, "SPI class cannot be null");
AssertUtil.isTrue(service.isInterface() || Modifier.isAbstract(service.getModifiers()),
"SPI class[" + service.getName() + "] must be interface or abstract class");
String className = service.getName();
SpiLoader<T> spiLoader = SPI_LOADER_MAP.get(className);
if (spiLoader == null) {
synchronized (SpiLoader.class) {
spiLoader = SPI_LOADER_MAP.get(className);
if (spiLoader == null) {
SPI_LOADER_MAP.putIfAbsent(className, new SpiLoader<>(service));
spiLoader = SPI_LOADER_MAP.get(className);
}
}
}
return spiLoader;
}
public S loadFirstInstanceOrDefault() {
//通过spiLoader去 META-INF/services/目录下去加载文件
load();
for (Class<? extends S> clazz : classList) {
if (defaultClass == null || clazz != defaultClass) {
return createInstance(clazz);
}
}
return loadDefaultInstance();
}
}
SPI获取构造器——spiLoader类加载器,通过spiLoader去 META-INF/services/目录下去加载文件DefaultSlotChainBuilder。
下面是DefaultSlotChainBuilder的build方法:
@Spi(isDefault = true)
public class DefaultSlotChainBuilder implements SlotChainBuilder {
@Override
public ProcessorSlotChain build() {
// 先创建一个default作为header
// chain是一个单向链表 first -> next -> end
ProcessorSlotChain chain = new DefaultProcessorSlotChain();
// SPI机制——即SpiLoader在META-INF/services/目录下去加载文件 找到所有slot的实现,并根据@SpiOrder注解排序
List<ProcessorSlot> sortedSlotList = SpiLoader.of(ProcessorSlot.class).loadInstanceListSorted();
for (ProcessorSlot slot : sortedSlotList) {
// 剔除掉Abstract类
if (!(slot instanceof AbstractLinkedProcessorSlot)) {
RecordLog.warn("The ProcessorSlot(" + slot.getClass().getCanonicalName() + ") is not an instance of AbstractLinkedProcessorSlot, can't be added into ProcessorSlotChain");
continue;
}
//addLast方法往链表的最后追加Slot对象。
chain.addLast((AbstractLinkedProcessorSlot<?>) slot);
}
return chain;
}
}
首先初始化一个DefaultProcessorSlotChain,DefaultProcessorSlotChain里面维护了一个Slot的单向链表。
然后利用Java SPI机制——即SpiLoader在META-INF/services/目录下去加载文件,找到所有slot的实现,并根据@SpiOrder注解排序, 最后调用addLast方法往链表的最后追加Slot对象。
public class DefaultProcessorSlotChain extends ProcessorSlotChain {
AbstractLinkedProcessorSlot<?> first = new AbstractLinkedProcessorSlot<Object>() {
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, boolean prioritized, Object... args)
throws Throwable {
super.fireEntry(context, resourceWrapper, t, count, prioritized, args);
}
@Override
public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
super.fireExit(context, resourceWrapper, count, args);
}
};
AbstractLinkedProcessorSlot<?> end = first;
@Override
public void addLast(AbstractLinkedProcessorSlot<?> protocolProcessor) {
end.setNext(protocolProcessor);
end = protocolProcessor;
}
}
public abstract class AbstractLinkedProcessorSlot<T> implements ProcessorSlot<T> {
public void setNext(AbstractLinkedProcessorSlot<?> next) {
this.next = next;
}
}
至此SlotChain构造完成。
执行SlotChain链
SlotChain链执行的入口是 chain.entry(context, resourceWrapper, null, count, prioritized, args);这行
com.alibaba.csp.sentinel.slotchain.DefaultProcessorSlotChain#entry
public class DefaultProcessorSlotChain extends ProcessorSlotChain {
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, boolean prioritized, Object... args)
throws Throwable {
first.transformEntry(context, resourceWrapper, t, count, prioritized, args);
}
}
就是执行first的transformEntry
public abstract class AbstractLinkedProcessorSlot<T> implements ProcessorSlot<T> {
void transformEntry(Context context, ResourceWrapper resourceWrapper, Object o, int count, boolean prioritized, Object... args)
throws Throwable {
T t = (T)o;
entry(context, resourceWrapper, t, count, prioritized, args);
}
}
first的entry方法
public class DefaultProcessorSlotChain extends ProcessorSlotChain {
AbstractLinkedProcessorSlot<?> first = new AbstractLinkedProcessorSlot<Object>() {
public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, boolean prioritized, Object... args) throws Throwable {
super.fireEntry(context, resourceWrapper, t, count, prioritized, args);
}
public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
super.fireExit(context, resourceWrapper, count, args);
}
};
}
父类的fireEntry
public abstract class AbstractLinkedProcessorSlot<T> implements ProcessorSlot<T> {
private AbstractLinkedProcessorSlot<?> next = null;
@Override
public void fireEntry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args)
throws Throwable {
if (next != null) {
next.transformEntry(context, resourceWrapper, obj, count, prioritized, args);
}
}
}
可以看到,实际上就是执行了next的transformEntry,而next的transformEntry又会调用它的entry方法,entry方法由每个Slot自己实现,只要在里面调用fireEntry,即触发next的next的entry调用。
这样继续下去,直到next为空
简单画个图描述一下这个过程:
exit()方法和entry方法类似,这里就不分析了。
至此SlotChain调用链分析完了,总结一下
- 1、Chain中维护了一个单向链表
- 2、通过fileEntry触发next的entry调用。
- 3、责任链模式
Sentinel中预设的SlotChain执行的完整流程:
参考:
https://www.cnblogs.com/taromilk/p/11750962.html
https://www.cnblogs.com/zzz-blogs/p/14342608.html
网友评论