系列
- Sentinel流程介绍
- Sentinel资源节点树构成
- Sentinel滑动窗口介绍
- Sentinel流量控制
- Sentinel的职责链slot介绍
- Sentinel熔断降级
- Sentinel Dashboard和应用通信
- Sentinel 控制台
开篇
Sentinel是2018年由阿里贡献的开源项目,是面向分布式服务架构的流量控制组件,主要以流量为切入点,从限流、流量整形、熔断降级、系统负载保护、热点防护等多个维度来帮助开发者保障微服务的稳定性。
整个Sentinel系列的介绍会按照一个系列进行介绍,中间穿插一些滑动窗口等实现的介绍。整体的大纲如下:
- Sentinel整体流程
- Sentinel单元介绍
- Sentinel规则
- Sentinel的熔断处理
- Sentinel的数据收集
- Sentinel的dashboard
本篇主要是介绍Sentinel整体流程,聚焦于讲解整个执行过程以及核心对象的存储方式。
基础概念
-
ProcessorSlotChain:Sentinel 的核心骨架,将不同的Slot按照顺序串在一起(责任链模式),从而将不同的功能(限流、降级、系统保护)组合在一起。slot chain 其实可以分为两部分:统计数据构建部分(statistic)和判断部分(rule checking)。
-
Context:Context 代表调用链路上下文,贯穿一次调用链路中的所有 Entry。Context 维持着入口节点(entranceNode)、本次调用链路的 curNode、调用来源(origin)等信息。Context 名称即为调用链路入口名称。
-
Entry:每一次资源调用都会创建一个 Entry。Entry 包含了资源名、curNode(当前统计节点)、originNode(来源统计节点)等信息。
CtEntry 为普通的 Entry,在调用 SphU.entry(xxx) 的时候创建。特性:Linked entry within current context(内部维护着 parent 和 child) -
Node:Sentinel 里面的各种种类的统计节点:
StatisticNode:最为基础的统计节点,包含秒级和分钟级两个滑动窗口结构。
DefaultNode:链路节点,用于统计调用链路上某个资源的数据,维持树状结构。
ClusterNode:簇点,用于统计每个资源全局的数据(不区分调用链路),以及存放该资源的按来源区分的调用数据(类型为 StatisticNode)。特别地,Constants.ENTRY_NODE 节点用于统计全局的入口资源数据。
EntranceNode:入口节点,特殊的链路节点,对应某个 Context 入口的所有调用数据。Constants.ROOT 节点也是入口节点。
职责链
- 从设计模式上来看,典型的的责任链模式。外部请求进来后,要经过责任链上各个节点的处理,而Sentinel的限流、熔断就是通过责任链上的这些节点实现的。
- NodeSelectorSlot负责构建树形的调用链条。
- ClusterBuilderSlot负责聚簇的统计。
- StatisticSlot负责统计。
# Sentinel default ProcessorSlots
com.alibaba.csp.sentinel.slots.nodeselector.NodeSelectorSlot
com.alibaba.csp.sentinel.slots.clusterbuilder.ClusterBuilderSlot
com.alibaba.csp.sentinel.slots.logger.LogSlot
com.alibaba.csp.sentinel.slots.statistic.StatisticSlot
com.alibaba.csp.sentinel.slots.block.authority.AuthoritySlot
com.alibaba.csp.sentinel.slots.system.SystemSlot
com.alibaba.csp.sentinel.slots.block.flow.FlowSlot
com.alibaba.csp.sentinel.slots.block.degrade.DegradeSlot
- 目前Sentinel内置的slot对象。
源码分析
基础用法
public static void main(String[] args) {
initFlowRules();
while (true) {
Entry entry = null;
try {
entry = SphU.entry("HelloWorld");
/*您的业务逻辑 - 开始*/
System.out.println("hello world");
/*您的业务逻辑 - 结束*/
} catch (BlockException e1) {
/*流控逻辑处理 - 开始*/
System.out.println("block!");
/*流控逻辑处理 - 结束*/
} finally {
if (entry != null) {
entry.exit();
}
}
}
}
- Sentinel的一般的应用方式如上所示,通过SphU.entry()来判断是否拦截。
整体流程
public class SphU {
public static Entry entry(String name, int resourceType, EntryType trafficType) throws BlockException {
// 通过执行Env.sph#entryWithType方法来判断是否通过
return Env.sph.entryWithType(name, resourceType, trafficType, 1, OBJECTS0);
}
}
public class Env {
// 通过Sph继续执行
public static final Sph sph = new CtSph();
static {
InitExecutor.doInit();
}
}
public class CtSph implements Sph {
public Entry entryWithType(String name, int resourceType, EntryType entryType, int count, boolean prioritized,
Object[] args) throws BlockException {
// 根据资源名创建资源的对象resource
StringResourceWrapper resource = new StringResourceWrapper(name, entryType, resourceType);
return entryWithPriority(resource, count, prioritized, args);
}
private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args)
throws BlockException {
// 1、Context是线程维度的,通过ThreadLocal进行保存
Context context = ContextUtil.getContext();
if (context instanceof NullContext) {
return new CtEntry(resourceWrapper, null, context);
}
if (context == null) {
// 2、创建名为sentinel_default_context的Context对象
// 内部创建EntranceNode并保存到contextNameNodeMap
// contextNameNodeMap是静态全局变量
// EntranceNode添加到全局的ROOT节点
context = InternalContextUtil.internalEnter(Constants.CONTEXT_DEFAULT_NAME);
}
if (!Constants.ON) {
return new CtEntry(resourceWrapper, null, context);
}
// 3、构建职责链,职责链的前两个核心节点为DefaultProcessorSlotChain和 NodeSelectorSlot
// DefaultProcessorSlotChain的作用为职责链的头部维护职责链的关系
// NodeSelectorSlot在初始化过程中创建DefaultNode用以统计信息
// NodeSelectorSlot的DefaultNode添加到context的EntranceNode节点后
ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);
if (chain == null) {
return new CtEntry(resourceWrapper, null, context);
}
// 4、创建CtEntry对象
Entry e = new CtEntry(resourceWrapper, chain, context);
try {
// 5、执行职责链的整个流程操作
chain.entry(context, resourceWrapper, null, count, prioritized, args);
} catch (BlockException e1) {
e.exit(count, args);
throw e1;
} catch (Throwable e1) {
}
return e;
}
private final static class InternalContextUtil extends ContextUtil {
static Context internalEnter(String name) {
return trueEnter(name, "");
}
static Context internalEnter(String name, String origin) {
return trueEnter(name, origin);
}
}
}
- 1、从线程的ThreadLocal获取Context对象,为空通过InternalContextUtil.internalEnter创建Context对象。
- 2、通过lookProcessChain构建Sentinel的职责链。
- 3、使用Context和lookProcessChain创建CtEntry对象。
- 4、通过chain.entry触发整体的职责链执行。
构建Context
public class ContextUtil {
// 通过ThreadLocal保存Context对象
private static ThreadLocal<Context> contextHolder = new ThreadLocal<>();
// 通过Context的名称作为key保存EntranceNode
private static volatile Map<String, DefaultNode> contextNameNodeMap = new HashMap<>();
private static final ReentrantLock LOCK = new ReentrantLock();
private static final Context NULL_CONTEXT = new NullContext();
static {
initDefaultContext();
}
private static void initDefaultContext() {
String defaultContextName = Constants.CONTEXT_DEFAULT_NAME;
// 创建EntranceNode
EntranceNode node = new EntranceNode(new StringResourceWrapper(defaultContextName, EntryType.IN), null);
// 加入到全局的ROOT节点
Constants.ROOT.addChild(node);
// 以Context名称作为key保存EntranceNode
contextNameNodeMap.put(defaultContextName, node);
}
public static Context getContext() {
return contextHolder.get();
}
protected static Context trueEnter(String name, String origin) {
Context context = contextHolder.get();
if (context == null) {
Map<String, DefaultNode> localCacheNameMap = contextNameNodeMap;
DefaultNode node = localCacheNameMap.get(name);
if (node == null) {
if (localCacheNameMap.size() > Constants.MAX_CONTEXT_NAME_SIZE) {
setNullContext();
return NULL_CONTEXT;
} else {
LOCK.lock();
try {
node = contextNameNodeMap.get(name);
if (node == null) {
if (contextNameNodeMap.size() > Constants.MAX_CONTEXT_NAME_SIZE) {
setNullContext();
return NULL_CONTEXT;
} else {
node = new EntranceNode(new StringResourceWrapper(name, EntryType.IN), null);
// 保存EntranceNode到ROOT节点当中
Constants.ROOT.addChild(node);
Map<String, DefaultNode> newMap = new HashMap<>(contextNameNodeMap.size() + 1);
newMap.putAll(contextNameNodeMap);
newMap.put(name, node);
contextNameNodeMap = newMap;
}
}
} finally {
LOCK.unlock();
}
}
}
context = new Context(node, name);
context.setOrigin(origin);
contextHolder.set(context);
}
return context;
}
}
- contextHolder通过ThreadLocal保存Context对象。
- contextNameNodeMap通过Context的名称作为key保存EntranceNode。
- EntranceNode以资源对象StringResourceWrapper作为参数进行构建。
- EntranceNode添加到全局的Constants.ROOT的节点下。
构建职责链
public class CtSph implements Sph {
// 以全局静态变量chainMap保存资源对象作为key的职责链
private static volatile Map<ResourceWrapper, ProcessorSlotChain> chainMap
= new HashMap<ResourceWrapper, ProcessorSlotChain>();
ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) {
ProcessorSlotChain chain = chainMap.get(resourceWrapper);
if (chain == null) {
synchronized (LOCK) {
chain = chainMap.get(resourceWrapper);
if (chain == null) {
if (chainMap.size() >= Constants.MAX_SLOT_CHAIN_SIZE) {
return null;
}
// 构建职责链
chain = SlotChainProvider.newSlotChain();
Map<ResourceWrapper, ProcessorSlotChain> newMap = new HashMap<ResourceWrapper, ProcessorSlotChain>(
chainMap.size() + 1);
newMap.putAll(chainMap);
// 以资源对象作为key保存职责链对象chain
newMap.put(resourceWrapper, chain);
chainMap = newMap;
}
}
}
return chain;
}
}
- chainMap作为全局静态变量以资源对象作为key保存职责链。
- CtSph的chainMap以资源对象作为key保存职责链。
# com.alibaba.csp.sentinel.slotchain.SlotChainBuilder
com.alibaba.csp.sentinel.slots.DefaultSlotChainBuilder
public final class SlotChainProvider {
private static volatile SlotChainBuilder slotChainBuilder = null;
public static ProcessorSlotChain newSlotChain() {
if (slotChainBuilder != null) {
return slotChainBuilder.build();
}
// 通过SPI机制加载
slotChainBuilder = SpiLoader.of(SlotChainBuilder.class).loadFirstInstanceOrDefault();
if (slotChainBuilder == null) {
slotChainBuilder = new DefaultSlotChainBuilder();
}
// 创建职责链对象
return slotChainBuilder.build();
}
}
@Spi(isDefault = true)
public class DefaultSlotChainBuilder implements SlotChainBuilder {
@Override
public ProcessorSlotChain build() {
ProcessorSlotChain chain = new DefaultProcessorSlotChain();
List<ProcessorSlot> sortedSlotList = SpiLoader.of(ProcessorSlot.class).loadInstanceListSorted();
for (ProcessorSlot slot : sortedSlotList) {
if (!(slot instanceof AbstractLinkedProcessorSlot)) {
RecordLog.warn("The ProcessorSlot(" + slot.getClass().getCanonicalName() + ") is not an instance of AbstractLinkedProcessorSlot, can't be added into ProcessorSlotChain");
continue;
}
chain.addLast((AbstractLinkedProcessorSlot<?>) slot);
}
return chain;
}
}
com.alibaba.csp.sentinel.slots.nodeselector.NodeSelectorSlot
com.alibaba.csp.sentinel.slots.clusterbuilder.ClusterBuilderSlot
com.alibaba.csp.sentinel.slots.logger.LogSlot
com.alibaba.csp.sentinel.slots.statistic.StatisticSlot
com.alibaba.csp.sentinel.slots.block.authority.AuthoritySlot
com.alibaba.csp.sentinel.slots.system.SystemSlot
com.alibaba.csp.sentinel.slots.block.flow.FlowSlot
com.alibaba.csp.sentinel.slots.block.degrade.DegradeSlot
- 通过SPI机制加载SlotChainBuilder对象DefaultSlotChainBuilder。
- 通过DefaultSlotChainBuilder的build来创建职责链对象。
- 职责链的头部对象为DefaultProcessorSlotChain,然后通过SPI机制加载所有的ProcessorSlot并通过尾加法构建职责链。
- 目前内置的ProcessorSlot对象如上图所示,职责链的前三个元素为DefaultProcessorSlotChain、NodeSelectorSlot、ClusterBuilderSlot。
执行职责链
public class DefaultProcessorSlotChain extends ProcessorSlotChain {
AbstractLinkedProcessorSlot<?> first = new AbstractLinkedProcessorSlot<Object>() {
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, boolean prioritized, Object... args)
throws Throwable {
super.fireEntry(context, resourceWrapper, t, count, prioritized, args);
}
@Override
public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
super.fireExit(context, resourceWrapper, count, args);
}
};
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, boolean prioritized, Object... args)
throws Throwable {
first.transformEntry(context, resourceWrapper, t, count, prioritized, args);
}
}
public abstract class AbstractLinkedProcessorSlot<T> implements ProcessorSlot<T> {
private AbstractLinkedProcessorSlot<?> next = null;
@Override
public void fireEntry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args)
throws Throwable {
if (next != null) {
next.transformEntry(context, resourceWrapper, obj, count, prioritized, args);
}
}
@SuppressWarnings("unchecked")
void transformEntry(Context context, ResourceWrapper resourceWrapper, Object o, int count, boolean prioritized, Object... args)
throws Throwable {
T t = (T)o;
entry(context, resourceWrapper, t, count, prioritized, args);
}
}
- DefaultProcessorSlotChain作为职责链的第一个元素,通过调用slot的entry+transformEntry+ fireEntry的循环触发完成职责链的调用。
@Spi(isSingleton = false, order = Constants.ORDER_NODE_SELECTOR_SLOT)
public class NodeSelectorSlot extends AbstractLinkedProcessorSlot<Object> {
// 以上下文的名字作为key保存DefaultNode
private volatile Map<String, DefaultNode> map = new HashMap<String, DefaultNode>(10);
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args)
throws Throwable {
DefaultNode node = map.get(context.getName());
if (node == null) {
synchronized (this) {
node = map.get(context.getName());
if (node == null) {
node = new DefaultNode(resourceWrapper, null);
HashMap<String, DefaultNode> cacheMap = new HashMap<String, DefaultNode>(map.size());
cacheMap.putAll(map);
cacheMap.put(context.getName(), node);
map = cacheMap;
// Build invocation tree
((DefaultNode) context.getLastNode()).addChild(node);
}
}
}
context.setCurNode(node);
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
}
- NodeSelectorSlot以资源对象作为参数创建DefaultNode对象。
- NodeSelectorSlot的map以Context的名称作为key保存DefaultNode。
- 通过context.getLastNode()).addChild(node)添加到EntranceNode尾部。
@Spi(isSingleton = false, order = Constants.ORDER_CLUSTER_BUILDER_SLOT)
public class ClusterBuilderSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
// 以资源对象作为key保存ClusterNode的全局静态变量
private static volatile Map<ResourceWrapper, ClusterNode> clusterNodeMap = new HashMap<>();
private static final Object lock = new Object();
private volatile ClusterNode clusterNode = null;
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
if (clusterNode == null) {
synchronized (lock) {
if (clusterNode == null) {
// Create the cluster node.
clusterNode = new ClusterNode(resourceWrapper.getName(), resourceWrapper.getResourceType());
HashMap<ResourceWrapper, ClusterNode> newMap = new HashMap<>(Math.max(clusterNodeMap.size(), 16));
newMap.putAll(clusterNodeMap);
// 以资源对象作为key保存集群节点
newMap.put(node.getId(), clusterNode);
clusterNodeMap = newMap;
}
}
}
node.setClusterNode(clusterNode);
if (!"".equals(context.getOrigin())) {
Node originNode = node.getClusterNode().getOrCreateOriginNode(context.getOrigin());
context.getCurEntry().setOriginNode(originNode);
}
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
}
- 根据资源对象创建ClusterNode,并以资源对象作为key保存ClusterNode。
- ClusterBuilderSlot的clusterNodeMap以资源对象作为key保存ClusterNode。
* ContextUtil.enter("entrance1", "appA");
* Entry nodeA = SphU.entry("nodeA");
* if (nodeA != null) {
* nodeA.exit();
* }
* ContextUtil.exit();
*
* Above code will generate the following invocation structure in memory:
*
*
* machine-root
* /
* /
* EntranceNode1
* /
* /
* DefaultNode(nodeA)- - - - - -> ClusterNode(nodeA);
*
*-------------------------------------------------------------------------------
*
*
* ContextUtil.enter("entrance1", "appA");
* Entry nodeA = SphU.entry("nodeA");
* if (nodeA != null) {
* nodeA.exit();
* }
* ContextUtil.exit();
*
* ContextUtil.enter("entrance2", "appA");
* nodeA = SphU.entry("nodeA");
* if (nodeA != null) {
* nodeA.exit();
* }
* ContextUtil.exit();
*
*
* Above code will generate the following invocation structure in memory:
*
*
* machine-root
* / \
* / \
* EntranceNode1 EntranceNode2
* / \
* / \
* DefaultNode(nodeA) DefaultNode(nodeA)
* | |
* +- - - - - - - - - - +- - - - - - -> ClusterNode(nodeA);
*
- 在构建Context对象的过程中建立root和EntranceNode的关系。
- 在构建NodeSelectorSlot对象的过程中建立EntranceNode和DefaultNode的关系。
- 在构建ClusterBuilderSlot对象的过程中建立DefaultNode和ClusterNode的关系。
- EntranceNode以Context的名称作为key进行保存。
- DefaultNode以Context的名称作为key进行保存。
- ClusterNode以资源对象的名称作为key进行保存。
- CtSph的chainMap以资源对象作为key保存职责链。
总结
- 三大组件Context、Entry、Node,是Sentinel的核心组件,各类信息及资源调用情况都由这三大类持有;
- 采用责任链模式完成Sentinel的信息统计、熔断、限流等操作;
- 责任链中NodeSelectSlot负责选择当前资源对应的Node,同时构建node调用树;
- 责任链中ClusterBuilderSlot负责构建当前Node对应的ClusterNode,用于聚合同一资源对应不同Context的Node;
- 责任链中的StatisticSlot用于统计当前资源的调用情况,更新Node与其对用的ClusterNode的各种统计数据;
- 责任链中的FlowSlot根据当前Node对应的ClusterNode(默认)的统计信息进行限流;
- 资源调用统计数据(例如PassQps)使用滑动时间窗口进行统计;
- 所有工作执行完毕后,执行退出流程,补充一些统计数据,清理Context。
网友评论