美文网首页
sentinel 调用链路解析

sentinel 调用链路解析

作者: justonemoretry | 来源:发表于2023-01-14 14:07 被阅读0次

1 sentinel节点

调用链路图

所有节点都实现了processorSlot节点中的entry和exit方法,继承自AbstractLinkedProcessorSlot类。


image.png

下面对其中的关键节点进行分析。

1.1 nodeSelectorSlot节点

创建上下文名称对应的defaultNode,用于构建调用路径链路,这里的DefaultNode继承自StatisticNode,也能进行统计,统计的是当前线程的运行数据。
不同的入口进行调用,会生成不同的调用路径,下面是代码注释中的例子, entrance1 和 entrance2会分别生成entranceNode,并且会分别生成DefaultNode, 因为两者的上下文名称不同。
这里面有个要注意的点是,processorSlotChain和resource是一一对应的,所以NodeSelectorSlot对象对于相同的resource是一样的,但上下文名称不同的情况下,DefaultNode会有多个。

/* * 
 * The {@link ClusterNode} is uniquely identified by the ResourceId; the {@link DefaultNode}
 * is identified by both the resource id and {@link Context}. In other words, one resource
 * id will generate multiple {@link DefaultNode} for each distinct context, but only one
 * {@link ClusterNode}.
 * </p>
 * <p>
 * the following code shows one resource id in two different context:
 * </p>
 *
 * <pre>
 *    ContextUtil.enter("entrance1", "appA");
 *    Entry nodeA = SphU.entry("nodeA");
 *    if (nodeA != null) {
 *        nodeA.exit();
 *    }
 *    ContextUtil.exit();
 *
 *    ContextUtil.enter("entrance2", "appA");
 *    nodeA = SphU.entry("nodeA");
 *    if (nodeA != null) {
 *        nodeA.exit();
 *    }
 *    ContextUtil.exit();
 * </pre>
 *
 * Above code will generate the following invocation structure in memory:
 *
 * <pre>
 *
 *                  machine-root
 *                  /         \
 *                 /           \
 *         EntranceNode1   EntranceNode2
 *               /               \
 *              /                 \
 *      DefaultNode(nodeA)   DefaultNode(nodeA)
 *             |                    |
 *             +- - - - - - - - - - +- - - - - - -> ClusterNode(nodeA);
 * </pre>
 **/

生成defaultNode一般会放到entranceNode下面,或者当前entry的父entry的defaultNode下面,代表调用路径,且entry存在嵌套调用的情况下,也会有父子关系,在生成entry时会进行设置,代码如下,新添加时如果curEntry不为空,则设置为对应curEntry的子节点。

CtEntry(ResourceWrapper resourceWrapper, ProcessorSlot<Object> chain, Context context) {
        super(resourceWrapper);
        this.chain = chain;
        this.context = context;

        setUpEntryFor(context);
    }

    private void setUpEntryFor(Context context) {
        // The entry should not be associated to NullContext.
        if (context instanceof NullContext) {
            return;
        }
        this.parent = context.getCurEntry();
        if (parent != null) {
            ((CtEntry) parent).child = this;
        }
        context.setCurEntry(this);
    }

entry的父子关系如下,helloWord资源里内嵌abc资源:


image.png

node的路径和entry一致,下面的context里的getLastNode,就是找当前entry的parent中的defaultNode,不过会多一个entranceNode,对应关系如下:


image.png
NodeSelectorSlot 源码如下:
@Spi(isSingleton = false, order = Constants.ORDER_NODE_SELECTOR_SLOT)
public class NodeSelectorSlot extends AbstractLinkedProcessorSlot<Object> {

    /**
     * {@link DefaultNode}s of the same resource in different context.
     */
    private volatile Map<String, DefaultNode> map = new HashMap<String, DefaultNode>(10);

    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args)
        throws Throwable {
        DefaultNode node = map.get(context.getName());
        if (node == null) {
            synchronized (this) {
                node = map.get(context.getName());
                if (node == null) {
                    node = new DefaultNode(resourceWrapper, null);
                    HashMap<String, DefaultNode> cacheMap = new HashMap<String, DefaultNode>(map.size());
                    cacheMap.putAll(map);
                    cacheMap.put(context.getName(), node);
                    map = cacheMap;
                    // Build invocation tree
                    ((DefaultNode) context.getLastNode()).addChild(node);
                }
            }
        }

        context.setCurNode(node);
        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }

    @Override
    public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
        fireExit(context, resourceWrapper, count, args);
    }
}

1.2 ClusterBuilderSlot

用来构建统计信息的节点ClusterNode,一样是双重检查锁去生成单例的clusterNode, 并放到clusterNodeMap中,resource为key,供获取使用,同时设置到defaultNode中,用来进行对应资源的数据统计。
一个resource全局对应一个clusterNode统计节点,一个resource对应一组processorSlotChain,对应的ClusterBuilderSlot实例是唯一的,里面持有的clusterNode统计节点也是唯一的。
clusterNode和defaultNode都继承自StatisticNode,都可以进行数据统计。

@Spi(isSingleton = false, order = Constants.ORDER_CLUSTER_BUILDER_SLOT)
public class ClusterBuilderSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                      boolean prioritized, Object... args)
        throws Throwable {
        if (clusterNode == null) {
            synchronized (lock) {
                if (clusterNode == null) {
                    // Create the cluster node.
                    clusterNode = new ClusterNode(resourceWrapper.getName(), resourceWrapper.getResourceType());
                    HashMap<ResourceWrapper, ClusterNode> newMap = new HashMap<>(Math.max(clusterNodeMap.size(), 16));
                    newMap.putAll(clusterNodeMap);
                    newMap.put(node.getId(), clusterNode);

                    clusterNodeMap = newMap;
                }
            }
        }
        node.setClusterNode(clusterNode);

        /*
         * if context origin is set, we should get or create a new {@link Node} of
         * the specific origin.
         */
        if (!"".equals(context.getOrigin())) {
            Node originNode = node.getClusterNode().getOrCreateOriginNode(context.getOrigin());
            context.getCurEntry().setOriginNode(originNode);
        }

        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }

    @Override
    public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
        fireExit(context, resourceWrapper, count, args);
    }
}

1.3 StatisticSlot

统计节点,先向后面检查节点执行,能接着往下走,说明校验通过,增加该节点线程数,通过请求数,捕获到blockedException时,增加阻塞请求数。
在exit退出entry时,增加请求的rt耗时,异常数信息统计。

@Spi(order = Constants.ORDER_STATISTIC_SLOT)
public class StatisticSlot extends AbstractLinkedProcessorSlot<DefaultNode> {

    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                      boolean prioritized, Object... args) throws Throwable {
        try {
            // Do some checking.
            fireEntry(context, resourceWrapper, node, count, prioritized, args);

            // Request passed, add thread count and pass count.
            node.increaseThreadNum();
            node.addPassRequest(count);

            if (context.getCurEntry().getOriginNode() != null) {
                // Add count for origin node.
                context.getCurEntry().getOriginNode().increaseThreadNum();
                context.getCurEntry().getOriginNode().addPassRequest(count);
            }

            if (resourceWrapper.getEntryType() == EntryType.IN) {
                // Add count for global inbound entry node for global statistics.
                Constants.ENTRY_NODE.increaseThreadNum();
                Constants.ENTRY_NODE.addPassRequest(count);
            }

            // Handle pass event with registered entry callback handlers.
            for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
                handler.onPass(context, resourceWrapper, node, count, args);
            }
        } catch (PriorityWaitException ex) {
            node.increaseThreadNum();
            if (context.getCurEntry().getOriginNode() != null) {
                // Add count for origin node.
                context.getCurEntry().getOriginNode().increaseThreadNum();
            }

            if (resourceWrapper.getEntryType() == EntryType.IN) {
                // Add count for global inbound entry node for global statistics.
                Constants.ENTRY_NODE.increaseThreadNum();
            }
            // Handle pass event with registered entry callback handlers.
            for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
                handler.onPass(context, resourceWrapper, node, count, args);
            }
        } catch (BlockException e) {
            // Blocked, set block exception to current entry.
            context.getCurEntry().setBlockError(e);

            // Add block count.
            node.increaseBlockQps(count);
            if (context.getCurEntry().getOriginNode() != null) {
                context.getCurEntry().getOriginNode().increaseBlockQps(count);
            }

            if (resourceWrapper.getEntryType() == EntryType.IN) {
                // Add count for global inbound entry node for global statistics.
                Constants.ENTRY_NODE.increaseBlockQps(count);
            }

            // Handle block event with registered entry callback handlers.
            for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
                handler.onBlocked(e, context, resourceWrapper, node, count, args);
            }

            throw e;
        } catch (Throwable e) {
            // Unexpected internal error, set error to current entry.
            context.getCurEntry().setError(e);

            throw e;
        }
    }

    @Override
    public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
        Node node = context.getCurNode();

        if (context.getCurEntry().getBlockError() == null) {
            // Calculate response time (use completeStatTime as the time of completion).
            long completeStatTime = TimeUtil.currentTimeMillis();
            context.getCurEntry().setCompleteTimestamp(completeStatTime);
            long rt = completeStatTime - context.getCurEntry().getCreateTimestamp();

            Throwable error = context.getCurEntry().getError();

            // Record response time and success count.
            recordCompleteFor(node, count, rt, error);
            recordCompleteFor(context.getCurEntry().getOriginNode(), count, rt, error);
            if (resourceWrapper.getEntryType() == EntryType.IN) {
                recordCompleteFor(Constants.ENTRY_NODE, count, rt, error);
            }
        }

        // Handle exit event with registered exit callback handlers.
        Collection<ProcessorSlotExitCallback> exitCallbacks = StatisticSlotCallbackRegistry.getExitCallbacks();
        for (ProcessorSlotExitCallback handler : exitCallbacks) {
            handler.onExit(context, resourceWrapper, count, args);
        }

        // fix bug https://github.com/alibaba/Sentinel/issues/2374
        fireExit(context, resourceWrapper, count, args);
    }
}

1.4 flowSlot

entry方法里checkFlow进行各种限流规则校验,如果不满足的话,会直接抛出FlowException异常。

@Spi(order = Constants.ORDER_FLOW_SLOT)
public class FlowSlot extends AbstractLinkedProcessorSlot<DefaultNode> {

    private final FlowRuleChecker checker;

    public FlowSlot() {
        this(new FlowRuleChecker());
    }

    /**
     * Package-private for test.
     *
     * @param checker flow rule checker
     * @since 1.6.1
     */
    FlowSlot(FlowRuleChecker checker) {
        AssertUtil.notNull(checker, "flow checker should not be null");
        this.checker = checker;
    }

    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                      boolean prioritized, Object... args) throws Throwable {
        checkFlow(resourceWrapper, context, node, count, prioritized);

        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }

    void checkFlow(ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized)
        throws BlockException {
        checker.checkFlow(ruleProvider, resource, context, node, count, prioritized);
    }

    @Override
    public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
        fireExit(context, resourceWrapper, count, args);
    }

    private final Function<String, Collection<FlowRule>> ruleProvider = new Function<String, Collection<FlowRule>>() {
        @Override
        public Collection<FlowRule> apply(String resource) {
            // Flow rule map should not be null.
            Map<String, List<FlowRule>> flowRules = FlowRuleManager.getFlowRuleMap();
            return flowRules.get(resource);
        }
    };
}

1.5 DegradeSlot

熔断降级节点,根据资源名获取对应的circuitBreakers,遍历是否可以通过,不能通过则抛出DegradeException。这里是close直接返回true,否则进行断路器是否可以关闭的检测,到达一定的熔断时间则将状态改为half-open状态。
在exit方法中,执行对应circuitBreaker的onRequestComplete方法,里面会对请求的异常数和请求数进行统计,并根据统计信息和参数对断路器状态进行修改,包括half-open到close或open,以及超出限制时改为open状态。

/**
 * A {@link ProcessorSlot} dedicates to circuit breaking.
 *
 * @author Carpenter Lee
 * @author Eric Zhao
 */
@Spi(order = Constants.ORDER_DEGRADE_SLOT)
public class DegradeSlot extends AbstractLinkedProcessorSlot<DefaultNode> {

    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                      boolean prioritized, Object... args) throws Throwable {
        performChecking(context, resourceWrapper);

        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }

    void performChecking(Context context, ResourceWrapper r) throws BlockException {
        List<CircuitBreaker> circuitBreakers = DegradeRuleManager.getCircuitBreakers(r.getName());
        if (circuitBreakers == null || circuitBreakers.isEmpty()) {
            return;
        }
        for (CircuitBreaker cb : circuitBreakers) {
            if (!cb.tryPass(context)) {
                throw new DegradeException(cb.getRule().getLimitApp(), cb.getRule());
            }
        }
    }

    @Override
    public void exit(Context context, ResourceWrapper r, int count, Object... args) {
        Entry curEntry = context.getCurEntry();
        if (curEntry.getBlockError() != null) {
            fireExit(context, r, count, args);
            return;
        }
        List<CircuitBreaker> circuitBreakers = DegradeRuleManager.getCircuitBreakers(r.getName());
        if (circuitBreakers == null || circuitBreakers.isEmpty()) {
            fireExit(context, r, count, args);
            return;
        }

        if (curEntry.getBlockError() == null) {
            // passed request
            for (CircuitBreaker circuitBreaker : circuitBreakers) {
                circuitBreaker.onRequestComplete(context);
            }
        }

        fireExit(context, r, count, args);
    }
}

2 总结

sentinel限流熔断的主要逻辑就在调用链路上,processorSlotChain支持进行自定义扩展,通过spi的方式。
NodeSelectorSlot负责invocation Tree的搭建,里面会构建DefaultNode,同一resource且同一context有着相同的DefaultNode,resource调用出现嵌套时会有树状的调用路径,默认会有entranceNode。
ClusterBuilderSlot负责构建ClusterNode,同一个resource全局只有一个ClusterNode,是进行限流熔断的主要依据。
StatisticSlot负责进行具体的统计,会增加DefaultNode、ClusterNode、OriginNode、entryNode里的统计信息。
FlowSlot和DegradeSlot负责进行限流和降级,后面的文章单独讲。

Sentinel源码分析(三) - 调用链路
sentinel笔记 NodeSelectorSlot,ClusterBuilderSlot

相关文章

  • Sentinel 原理-滑动窗口

    逅弈 转载请注明原创出处,谢谢! 系列文章 Sentinel 原理-全解析Sentinel 原理-调用链Senti...

  • Sentinel 原理-实体类

    逅弈 转载请注明原创出处,谢谢! 系列文章 Sentinel 原理-全解析Sentinel 原理-调用链Senti...

  • Sentinel 实战-规则持久化

    逅弈 转载请注明原创出处,谢谢! 系列文章 Sentinel 原理-全解析Sentinel 原理-调用链Senti...

  • Sentinel 实战-限流篇

    逅弈 转载请注明原创出处,谢谢! 系列文章 Sentinel 原理-全解析Sentinel 原理-调用链Senti...

  • Sentinel 实战-控制台篇

    逅弈 转载请注明原创出处,谢谢! 系列文章 Sentinel 原理-全解析Sentinel 原理-调用链Senti...

  • Sentinel 实战-集群限流

    逅弈 转载请注明原创出处,谢谢! 系列文章 Sentinel 原理-全解析Sentinel 原理-调用链Senti...

  • Spring-cloud-alibaba 之 Sentinel

    Sentienl 熔断降级 Sentinel 熔断降级会在调用链路中某个资源出现不稳定状态时(例如调用超时或异常比...

  • 14.微服务保护Sentinel

    一、Sentinel安装介绍 雪崩问题微服务调用链路中的某个服务故障,引起整个链路中的所有微服务都不可用,这就是雪...

  • 02-sentinel限流与熔断

    1 sentinel应用场景之雪崩问题 雪崩问题 微服务调用链路中的某个服务故障,引起整个链路中所有微服务都不可用...

  • Sentinel降级规则

    服务降级 Sentinel 熔断降级会在调用链路中某个资源出现不稳定状态时(例如:调用超时或异常比例升高),对这个...

网友评论

      本文标题:sentinel 调用链路解析

      本文链接:https://www.haomeiwen.com/subject/xoancdtx.html