1 Sentinel slots

Every slot implements the entry and exit methods defined by the ProcessorSlot interface and extends the AbstractLinkedProcessorSlot class.
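To make the linked-slot idea concrete, here is a minimal sketch of a chain in which each slot does its own work and then hands control to the next one. The classes SketchSlot, NamedSketchSlot and SlotChainSketch are hypothetical stand-ins written for this note, not Sentinel classes, and the entry signature is heavily simplified.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for a linked processor slot (not Sentinel's real class).
abstract class SketchSlot {
    private SketchSlot next;

    // Links the given slot after this one and returns it so calls can be chained.
    SketchSlot linkTo(SketchSlot nextSlot) {
        this.next = nextSlot;
        return nextSlot;
    }

    // Each slot does its own work here and decides whether to pass control on.
    abstract void entry(String resource, List<String> trace);

    // Hands control to the next slot, if there is one.
    void fireEntry(String resource, List<String> trace) {
        if (next != null) {
            next.entry(resource, trace);
        }
    }
}

// A slot that only records that it was visited, then continues down the chain.
class NamedSketchSlot extends SketchSlot {
    private final String name;

    NamedSketchSlot(String name) {
        this.name = name;
    }

    @Override
    void entry(String resource, List<String> trace) {
        trace.add(name + ":" + resource);
        fireEntry(resource, trace);
    }
}

class SlotChainSketch {
    // Builds a three-slot chain and returns the order in which the slots ran.
    static List<String> run(String resource) {
        SketchSlot head = new NamedSketchSlot("select");
        head.linkTo(new NamedSketchSlot("cluster")).linkTo(new NamedSketchSlot("statistic"));
        List<String> trace = new ArrayList<>();
        head.entry(resource, trace);
        return trace;
    }
}
```

A slot that wants to reject a request simply throws instead of calling fireEntry, which is how the checking slots later in the chain stop a call.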

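The key slots are analyzed below.
1.1 NodeSelectorSlot
Creates the DefaultNode for the current context name; these nodes form the invocation path. DefaultNode extends StatisticNode, so it can also collect statistics; what it records is the runtime data of the resource within that context.
Calls made from different entrances produce different invocation paths. In the example below, taken from the source code comments, entrance1 and entrance2 each get their own EntranceNode and their own DefaultNode, because their context names differ.
One point to note: ProcessorSlotChain and resource map one-to-one, so the same resource always goes through the same NodeSelectorSlot instance. When the context names differ, that one slot holds multiple DefaultNodes.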
/**
 * <p>
 * The {@link ClusterNode} is uniquely identified by the ResourceId; the {@link DefaultNode}
 * is identified by both the resource id and {@link Context}. In other words, one resource
 * id will generate multiple {@link DefaultNode} for each distinct context, but only one
 * {@link ClusterNode}.
 * </p>
 * <p>
 * the following code shows one resource id in two different context:
 * </p>
 *
 * <pre>
 *    ContextUtil.enter("entrance1", "appA");
 *    Entry nodeA = SphU.entry("nodeA");
 *    if (nodeA != null) {
 *        nodeA.exit();
 *    }
 *    ContextUtil.exit();
 *
 *    ContextUtil.enter("entrance2", "appA");
 *    nodeA = SphU.entry("nodeA");
 *    if (nodeA != null) {
 *        nodeA.exit();
 *    }
 *    ContextUtil.exit();
 * </pre>
 *
 * Above code will generate the following invocation structure in memory:
 *
 * <pre>
 *
 *                  machine-root
 *                  /         \
 *                 /           \
 *         EntranceNode1   EntranceNode2
 *               /               \
 *              /                 \
 *      DefaultNode(nodeA)   DefaultNode(nodeA)
 *             |                    |
 *             +- - - - - - - - - - +- - - - - - -> ClusterNode(nodeA);
 * </pre>
 */
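A newly created DefaultNode is normally attached under the EntranceNode, or under the DefaultNode of the current entry's parent entry, so the nodes represent the invocation path. When entries are nested they also have a parent-child relationship, which is set when the entry is created. As the code below shows, if the context's curEntry is not null, the new entry becomes its child.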
CtEntry(ResourceWrapper resourceWrapper, ProcessorSlot<Object> chain, Context context) {
    super(resourceWrapper);
    this.chain = chain;
    this.context = context;
    setUpEntryFor(context);
}

private void setUpEntryFor(Context context) {
    // The entry should not be associated to NullContext.
    if (context instanceof NullContext) {
        return;
    }
    this.parent = context.getCurEntry();
    if (parent != null) {
        ((CtEntry) parent).child = this;
    }
    context.setCurEntry(this);
}
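Entries form the following parent-child relationship: when the helloWord resource nests the abc resource, the abc entry's parent is the helloWord entry, and the helloWord entry's child is the abc entry.

The node path mirrors the entry path. Context.getLastNode() looks up the DefaultNode held by the parent of the current entry. The node path has one extra EntranceNode at its top, which getLastNode() returns when the current entry has no parent.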
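The relationship between nested entries and the node each new DefaultNode is attached to can be sketched as follows. SketchContext and SketchEntry are hypothetical miniature models written for this note, with nodes reduced to plain strings; they only mimic the parent/child bookkeeping described above and are not Sentinel's Context or CtEntry.

```java
// Hypothetical minimal model of a context holding nested entries; nodes are plain strings.
class SketchContext {
    final String entranceNode;   // stands in for the EntranceNode of this context
    SketchEntry curEntry;        // innermost entry that is currently open

    SketchContext(String name) {
        this.entranceNode = "Entrance(" + name + ")";
    }

    // The parent entry's node if there is a parent, otherwise the entrance node.
    String getLastNode() {
        if (curEntry != null && curEntry.parent != null) {
            return curEntry.parent.curNode;
        }
        return entranceNode;
    }
}

class SketchEntry {
    final SketchEntry parent;
    SketchEntry child;
    final String curNode;      // stands in for this entry's DefaultNode
    final String attachedTo;   // the node this entry's DefaultNode would be added under
    private final SketchContext context;

    SketchEntry(String resource, SketchContext context) {
        this.context = context;
        // Same bookkeeping as setUpEntryFor: link to the current entry, then become current.
        this.parent = context.curEntry;
        if (parent != null) {
            parent.child = this;
        }
        context.curEntry = this;
        this.attachedTo = context.getLastNode();
        this.curNode = "Default(" + resource + ")";
    }

    // Restores the parent as the current entry and unlinks this entry from it.
    void exit() {
        context.curEntry = parent;
        if (parent != null) {
            parent.child = null;
        }
    }
}
```

With a context named entrance1, opening helloWord and then abc attaches Default(helloWord) under Entrance(entrance1) and Default(abc) under Default(helloWord).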

The NodeSelectorSlot source code is as follows:
@Spi(isSingleton = false, order = Constants.ORDER_NODE_SELECTOR_SLOT)
public class NodeSelectorSlot extends AbstractLinkedProcessorSlot<Object> {

    /**
     * {@link DefaultNode}s of the same resource in different context.
     */
    private volatile Map<String, DefaultNode> map = new HashMap<String, DefaultNode>(10);

    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args)
        throws Throwable {
        DefaultNode node = map.get(context.getName());
        if (node == null) {
            synchronized (this) {
                node = map.get(context.getName());
                if (node == null) {
                    node = new DefaultNode(resourceWrapper, null);
                    HashMap<String, DefaultNode> cacheMap = new HashMap<String, DefaultNode>(map.size());
                    cacheMap.putAll(map);
                    cacheMap.put(context.getName(), node);
                    map = cacheMap;
                    // Build invocation tree
                    ((DefaultNode) context.getLastNode()).addChild(node);
                }
            }
        }

        context.setCurNode(node);
        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }

    @Override
    public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
        fireExit(context, resourceWrapper, count, args);
    }
}
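The entry method above combines double-checked locking with a copy-on-write map: readers use the volatile map without locking, and writers publish a fresh copy under the lock. A generic sketch of that pattern, using a hypothetical CopyOnWriteCache helper that is not part of Sentinel, might look like this:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical helper illustrating the pattern; not part of Sentinel.
class CopyOnWriteCache<V> {
    // The map is never mutated after publication, so lock-free reads are safe.
    private volatile Map<String, V> map = new HashMap<>();

    V getOrCreate(String key, Function<String, V> factory) {
        V value = map.get(key);              // fast path without locking
        if (value == null) {
            synchronized (this) {
                value = map.get(key);        // re-check: another thread may have won the race
                if (value == null) {
                    value = factory.apply(key);
                    Map<String, V> copy = new HashMap<>(map);
                    copy.put(key, value);
                    map = copy;              // publish the new snapshot in one volatile write
                }
            }
        }
        return value;
    }

    int size() {
        return map.size();
    }
}
```

The trade-off is that every insertion copies the whole map, which is acceptable here because new context names appear rarely while lookups happen on every call.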
1.2 ClusterBuilderSlot
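Builds the ClusterNode that holds the statistics. It also uses double-checked locking to create a single ClusterNode, puts it into clusterNodeMap with the resource as the key so that it can be looked up later, and sets it on the DefaultNode so that statistics for the resource are recorded on it.
Each resource has exactly one ClusterNode globally. Each resource maps to one ProcessorSlotChain, so its ClusterBuilderSlot instance is unique, and so is the ClusterNode that instance holds.
ClusterNode and DefaultNode both extend StatisticNode, so both can collect statistics.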
@Spi(isSingleton = false, order = Constants.ORDER_CLUSTER_BUILDER_SLOT)
public class ClusterBuilderSlot extends AbstractLinkedProcessorSlot<DefaultNode> {

    // Global map of resource -> ClusterNode, shared by all ClusterBuilderSlot instances.
    private static volatile Map<ResourceWrapper, ClusterNode> clusterNodeMap = new HashMap<>();

    private static final Object lock = new Object();

    // The single ClusterNode of the resource this slot belongs to.
    private volatile ClusterNode clusterNode = null;

    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                      boolean prioritized, Object... args)
        throws Throwable {
        if (clusterNode == null) {
            synchronized (lock) {
                if (clusterNode == null) {
                    // Create the cluster node.
                    clusterNode = new ClusterNode(resourceWrapper.getName(), resourceWrapper.getResourceType());
                    HashMap<ResourceWrapper, ClusterNode> newMap = new HashMap<>(Math.max(clusterNodeMap.size(), 16));
                    newMap.putAll(clusterNodeMap);
                    newMap.put(node.getId(), clusterNode);

                    clusterNodeMap = newMap;
                }
            }
        }
        node.setClusterNode(clusterNode);

        /*
         * if context origin is set, we should get or create a new {@link Node} of
         * the specific origin.
         */
        if (!"".equals(context.getOrigin())) {
            Node originNode = node.getClusterNode().getOrCreateOriginNode(context.getOrigin());
            context.getCurEntry().setOriginNode(originNode);
        }

        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }

    @Override
    public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
        fireExit(context, resourceWrapper, count, args);
    }
}
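The relationship between the per-context nodes and the single per-resource node can be illustrated with a small sketch. ContextStatsSketch and ClusterStatsSketch are hypothetical classes written for this note, and the assumption that a per-context node forwards each count to the shared node is a simplification of how DefaultNode and ClusterNode cooperate.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical stand-in for the one-per-resource statistics node.
class ClusterStatsSketch {
    final LongAdder pass = new LongAdder();
    private final Map<String, LongAdder> originPass = new ConcurrentHashMap<>();

    // One counter per caller origin, created on first use.
    LongAdder originCounter(String origin) {
        return originPass.computeIfAbsent(origin, o -> new LongAdder());
    }
}

// Hypothetical stand-in for the one-per-context statistics node.
class ContextStatsSketch {
    final LongAdder pass = new LongAdder();
    private final ClusterStatsSketch cluster;

    ContextStatsSketch(ClusterStatsSketch cluster) {
        this.cluster = cluster;
    }

    // Counts the request on this context's node and on the shared resource-wide node.
    void addPass(int count) {
        pass.add(count);
        cluster.pass.add(count);
    }
}
```

Two context nodes that share one cluster node each keep their own totals, while the cluster node sees the sum across all contexts, which is what resource-wide rules need.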
1.3 StatisticSlot
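The statistics slot. It first fires the checking slots that follow it in the chain. If control returns normally, the checks passed, so it increments the node's thread count and passed-request count. If a BlockException is caught, it increments the blocked-request count.
When the entry exits, it records the request's response time (rt), success count, and exception count.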
@Spi(order = Constants.ORDER_STATISTIC_SLOT)
public class StatisticSlot extends AbstractLinkedProcessorSlot<DefaultNode> {

    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                      boolean prioritized, Object... args) throws Throwable {
        try {
            // Do some checking.
            fireEntry(context, resourceWrapper, node, count, prioritized, args);

            // Request passed, add thread count and pass count.
            node.increaseThreadNum();
            node.addPassRequest(count);

            if (context.getCurEntry().getOriginNode() != null) {
                // Add count for origin node.
                context.getCurEntry().getOriginNode().increaseThreadNum();
                context.getCurEntry().getOriginNode().addPassRequest(count);
            }

            if (resourceWrapper.getEntryType() == EntryType.IN) {
                // Add count for global inbound entry node for global statistics.
                Constants.ENTRY_NODE.increaseThreadNum();
                Constants.ENTRY_NODE.addPassRequest(count);
            }

            // Handle pass event with registered entry callback handlers.
            for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
                handler.onPass(context, resourceWrapper, node, count, args);
            }
        } catch (PriorityWaitException ex) {
            node.increaseThreadNum();
            if (context.getCurEntry().getOriginNode() != null) {
                // Add count for origin node.
                context.getCurEntry().getOriginNode().increaseThreadNum();
            }

            if (resourceWrapper.getEntryType() == EntryType.IN) {
                // Add count for global inbound entry node for global statistics.
                Constants.ENTRY_NODE.increaseThreadNum();
            }
            // Handle pass event with registered entry callback handlers.
            for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
                handler.onPass(context, resourceWrapper, node, count, args);
            }
        } catch (BlockException e) {
            // Blocked, set block exception to current entry.
            context.getCurEntry().setBlockError(e);

            // Add block count.
            node.increaseBlockQps(count);
            if (context.getCurEntry().getOriginNode() != null) {
                context.getCurEntry().getOriginNode().increaseBlockQps(count);
            }

            if (resourceWrapper.getEntryType() == EntryType.IN) {
                // Add count for global inbound entry node for global statistics.
                Constants.ENTRY_NODE.increaseBlockQps(count);
            }

            // Handle block event with registered entry callback handlers.
            for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
                handler.onBlocked(e, context, resourceWrapper, node, count, args);
            }

            throw e;
        } catch (Throwable e) {
            // Unexpected internal error, set error to current entry.
            context.getCurEntry().setError(e);

            throw e;
        }
    }

    @Override
    public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
        Node node = context.getCurNode();

        if (context.getCurEntry().getBlockError() == null) {
            // Calculate response time (use completeStatTime as the time of completion).
            long completeStatTime = TimeUtil.currentTimeMillis();
            context.getCurEntry().setCompleteTimestamp(completeStatTime);
            long rt = completeStatTime - context.getCurEntry().getCreateTimestamp();

            Throwable error = context.getCurEntry().getError();

            // Record response time and success count.
            recordCompleteFor(node, count, rt, error);
            recordCompleteFor(context.getCurEntry().getOriginNode(), count, rt, error);
            if (resourceWrapper.getEntryType() == EntryType.IN) {
                recordCompleteFor(Constants.ENTRY_NODE, count, rt, error);
            }
        }

        // Handle exit event with registered exit callback handlers.
        Collection<ProcessorSlotExitCallback> exitCallbacks = StatisticSlotCallbackRegistry.getExitCallbacks();
        for (ProcessorSlotExitCallback handler : exitCallbacks) {
            handler.onExit(context, resourceWrapper, count, args);
        }

        // fix bug https://github.com/alibaba/Sentinel/issues/2374
        fireExit(context, resourceWrapper, count, args);
    }
}
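The notable design choice in StatisticSlot is that it calls the downstream checks first and only counts afterwards, so one try/catch decides whether a request is recorded as passed or blocked. A stripped-down sketch of that ordering follows. StatisticSketch, SketchCheck and BlockedSketchException are hypothetical names; the exception is unchecked here only to keep the example short, and the priority-wait branch is left out.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical stand-in for a "request was blocked" exception (unchecked for brevity).
class BlockedSketchException extends RuntimeException {
    BlockedSketchException(String message) {
        super(message);
    }
}

// Hypothetical stand-in for "the rest of the chain", i.e. the rule-checking slots.
interface SketchCheck {
    void check(int count);
}

class StatisticSketch {
    final LongAdder pass = new LongAdder();
    final LongAdder block = new LongAdder();
    final AtomicInteger threads = new AtomicInteger();

    void entry(int count, SketchCheck downstream) {
        try {
            downstream.check(count);      // run the checks first
            threads.incrementAndGet();    // reaching this line means the request passed
            pass.add(count);
        } catch (BlockedSketchException e) {
            block.add(count);             // a check rejected the request
            throw e;                      // the caller still sees the rejection
        }
    }

    // Called when a passed request completes.
    void exit() {
        threads.decrementAndGet();
    }
}
```

A passed request raises both the pass count and the concurrent thread count until exit is called; a blocked request only raises the block count.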
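1.4 FlowSlot
In the entry method, checkFlow validates the request against the flow rules configured for the resource. If any rule is not satisfied, it throws a FlowException directly.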
@Spi(order = Constants.ORDER_FLOW_SLOT)
public class FlowSlot extends AbstractLinkedProcessorSlot<DefaultNode> {

    private final FlowRuleChecker checker;

    public FlowSlot() {
        this(new FlowRuleChecker());
    }

    /**
     * Package-private for test.
     *
     * @param checker flow rule checker
     * @since 1.6.1
     */
    FlowSlot(FlowRuleChecker checker) {
        AssertUtil.notNull(checker, "flow checker should not be null");
        this.checker = checker;
    }

    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                      boolean prioritized, Object... args) throws Throwable {
        checkFlow(resourceWrapper, context, node, count, prioritized);

        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }

    void checkFlow(ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized)
        throws BlockException {
        checker.checkFlow(ruleProvider, resource, context, node, count, prioritized);
    }

    @Override
    public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
        fireExit(context, resourceWrapper, count, args);
    }

    private final Function<String, Collection<FlowRule>> ruleProvider = new Function<String, Collection<FlowRule>>() {
        @Override
        public Collection<FlowRule> apply(String resource) {
            // Flow rule map should not be null.
            Map<String, List<FlowRule>> flowRules = FlowRuleManager.getFlowRuleMap();
            return flowRules.get(resource);
        }
    };
}
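As a rough illustration of what "check the rule, throw if it is violated" means, here is a deliberately simplified QPS check. It uses a fixed one-second window with an injectable clock, which is an assumption made for this sketch and not how Sentinel computes its statistics; FixedWindowLimiter and FlowLimitExceeded are hypothetical names.

```java
import java.util.function.LongSupplier;

// Hypothetical stand-in for a flow-control rejection.
class FlowLimitExceeded extends RuntimeException {
    FlowLimitExceeded(String message) {
        super(message);
    }
}

// Simplified fixed-window QPS limiter; the clock is injected so the behaviour is testable.
class FixedWindowLimiter {
    private final int limitPerSecond;
    private final LongSupplier clockMillis;
    private long windowStart;
    private int used;

    FixedWindowLimiter(int limitPerSecond, LongSupplier clockMillis) {
        this.limitPerSecond = limitPerSecond;
        this.clockMillis = clockMillis;
        this.windowStart = clockMillis.getAsLong();
    }

    // Admits the request or throws, mirroring the "check, then fire the next slot" shape.
    synchronized void check(int count) {
        long now = clockMillis.getAsLong();
        if (now - windowStart >= 1000) {
            windowStart = now;   // a new one-second window starts
            used = 0;
        }
        if (used + count > limitPerSecond) {
            throw new FlowLimitExceeded("limit " + limitPerSecond + " per second exceeded");
        }
        used += count;
    }
}
```

With a limit of 2, the third call inside the same second is rejected, and calls are admitted again once the window rolls over.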
1.5 DegradeSlot
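The circuit-breaking (degrade) slot. It looks up the circuitBreakers for the resource name and asks each one whether the request may pass; if any refuses, it throws a DegradeException. In tryPass, a breaker in the closed state returns true directly. Otherwise it checks whether the breaker has been open long enough to allow a probe, and once the configured recovery time has elapsed it switches the state to half-open.
In the exit method, it calls onRequestComplete on each circuitBreaker. That method records the exception count and request count, and changes the breaker state based on these statistics and the rule parameters: from half-open to closed or back to open, and from closed to open when the threshold is exceeded.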
/**
 * A {@link ProcessorSlot} dedicates to circuit breaking.
 *
 * @author Carpenter Lee
 * @author Eric Zhao
 */
@Spi(order = Constants.ORDER_DEGRADE_SLOT)
public class DegradeSlot extends AbstractLinkedProcessorSlot<DefaultNode> {

    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                      boolean prioritized, Object... args) throws Throwable {
        performChecking(context, resourceWrapper);

        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }

    void performChecking(Context context, ResourceWrapper r) throws BlockException {
        List<CircuitBreaker> circuitBreakers = DegradeRuleManager.getCircuitBreakers(r.getName());
        if (circuitBreakers == null || circuitBreakers.isEmpty()) {
            return;
        }
        for (CircuitBreaker cb : circuitBreakers) {
            if (!cb.tryPass(context)) {
                throw new DegradeException(cb.getRule().getLimitApp(), cb.getRule());
            }
        }
    }

    @Override
    public void exit(Context context, ResourceWrapper r, int count, Object... args) {
        Entry curEntry = context.getCurEntry();
        if (curEntry.getBlockError() != null) {
            fireExit(context, r, count, args);
            return;
        }
        List<CircuitBreaker> circuitBreakers = DegradeRuleManager.getCircuitBreakers(r.getName());
        if (circuitBreakers == null || circuitBreakers.isEmpty()) {
            fireExit(context, r, count, args);
            return;
        }

        if (curEntry.getBlockError() == null) {
            // passed request
            for (CircuitBreaker circuitBreaker : circuitBreakers) {
                circuitBreaker.onRequestComplete(context);
            }
        }

        fireExit(context, r, count, args);
    }
}
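The closed, open and half-open transitions described above can be sketched as a tiny state machine. BreakerSketch is a hypothetical class written for this note: it counts errors without any time window and takes an injectable clock, both simplifications that Sentinel's real circuit breakers do not share.

```java
import java.util.function.LongSupplier;

// Hypothetical, simplified circuit breaker: error counting has no sliding window.
class BreakerSketch {
    enum State { CLOSED, OPEN, HALF_OPEN }

    private final int errorThreshold;
    private final long recoveryMillis;
    private final LongSupplier clockMillis;
    private State state = State.CLOSED;
    private int errors;
    private long nextRetryAt;

    BreakerSketch(int errorThreshold, long recoveryMillis, LongSupplier clockMillis) {
        this.errorThreshold = errorThreshold;
        this.recoveryMillis = recoveryMillis;
        this.clockMillis = clockMillis;
    }

    // Entry side: closed always passes; open lets one probe through after the recovery time.
    synchronized boolean tryPass() {
        if (state == State.CLOSED) {
            return true;
        }
        if (state == State.OPEN && clockMillis.getAsLong() >= nextRetryAt) {
            state = State.HALF_OPEN;
            return true;
        }
        return false;
    }

    // Exit side: update the statistics and move between states.
    synchronized void onRequestComplete(boolean failed) {
        if (state == State.HALF_OPEN) {
            if (failed) {
                open();                  // the probe failed, open again
            } else {
                state = State.CLOSED;    // the probe succeeded, resume normal traffic
                errors = 0;
            }
            return;
        }
        if (state == State.CLOSED && failed && ++errors >= errorThreshold) {
            open();
        }
    }

    private void open() {
        state = State.OPEN;
        nextRetryAt = clockMillis.getAsLong() + recoveryMillis;
    }

    synchronized State state() {
        return state;
    }
}
```

The split between tryPass on entry and onRequestComplete on exit matches the shape of DegradeSlot: the decision to admit a request is made before the call, and the state only changes once the outcome of a call is known.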
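2 Summary
The core of Sentinel's flow control and circuit breaking lives in the invocation chain, and ProcessorSlotChain can be extended with custom slots through SPI.
NodeSelectorSlot builds the invocation tree and creates the DefaultNodes: for a given resource, the same context name always maps to the same DefaultNode. When resource calls are nested, the invocation path becomes a tree, with an EntranceNode at its top by default.
ClusterBuilderSlot builds the ClusterNode. Each resource has exactly one ClusterNode globally, and it is the main basis for flow control and circuit breaking.
StatisticSlot does the actual counting: it updates the statistics in the DefaultNode, the ClusterNode, the origin node, and the global Constants.ENTRY_NODE.
FlowSlot and DegradeSlot perform flow control and degradation; they are covered separately in later articles.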