美文网首页Sentinel
Sentinel的职责链slot介绍

Sentinel的职责链slot介绍

作者: 晴天哥_王志 | 来源:发表于2021-06-20 21:10 被阅读0次

    系列

    开篇

    • Sentinel的核心在于职责链的构建和执行,职责链的每个节点称为slot,本文的主要目的是理顺Sentinel的职责链。

    职责链slot

    # Sentinel default ProcessorSlots
    com.alibaba.csp.sentinel.slots.nodeselector.NodeSelectorSlot
    com.alibaba.csp.sentinel.slots.clusterbuilder.ClusterBuilderSlot
    com.alibaba.csp.sentinel.slots.logger.LogSlot
    com.alibaba.csp.sentinel.slots.statistic.StatisticSlot
    com.alibaba.csp.sentinel.slots.block.authority.AuthoritySlot
    com.alibaba.csp.sentinel.slots.system.SystemSlot
    com.alibaba.csp.sentinel.slots.block.flow.FlowSlot
    com.alibaba.csp.sentinel.slots.block.degrade.DegradeSlot
    
    /**
     * Abridged view of the linked slot base class: each instance holds a
     * reference to the slot that follows it.
     */
    public abstract class AbstractLinkedProcessorSlot<T> implements ProcessorSlot<T> {
        // Reference to the next slot; starts out as null.
        private AbstractLinkedProcessorSlot<?> next = null;
    }
    
    • 职责链的slot通过next保存下一个slot来进行串联。

    职责链构造过程

    /**
     * Hands out new {@link ProcessorSlotChain} instances, resolving the
     * {@link SlotChainBuilder} to use the first time a chain is requested.
     */
    public final class SlotChainProvider {
    
        // Builder resolved on first use and reused by later calls.
        private static volatile SlotChainBuilder slotChainBuilder = null;
    
        /**
         * Builds a new slot chain with the resolved builder.
         *
         * @return the chain produced by the builder's {@code build()} method
         */
        public static ProcessorSlotChain newSlotChain() {
            if (slotChainBuilder == null) {
                // No builder yet: ask the SPI loader for one.
                slotChainBuilder = SpiLoader.of(SlotChainBuilder.class).loadFirstInstanceOrDefault();
                if (slotChainBuilder == null) {
                    // The loader returned nothing, so fall back to DefaultSlotChainBuilder.
                    slotChainBuilder = new DefaultSlotChainBuilder();
                }
            }
            return slotChainBuilder.build();
        }
    }
    
    • SlotChainProvider优先通过SPI加载SlotChainBuilder,加载不到时回退到DefaultSlotChainBuilder,由它完成职责链的构建。
    /**
     * Builds a {@link DefaultProcessorSlotChain} from the slots returned by
     * the SPI loader.
     */
    public class DefaultSlotChainBuilder implements SlotChainBuilder {
    
        /**
         * Creates an empty chain and appends every loaded slot that is an
         * {@link AbstractLinkedProcessorSlot}; other slot implementations are skipped.
         *
         * @return the populated chain
         */
        @Override
        public ProcessorSlotChain build() {
            // Head object of the chain.
            ProcessorSlotChain chain = new DefaultProcessorSlotChain();
            // Slots in the order produced by loadInstanceListSorted().
            List<ProcessorSlot> loadedSlots = SpiLoader.of(ProcessorSlot.class).loadInstanceListSorted();
            for (ProcessorSlot candidate : loadedSlots) {
                if (candidate instanceof AbstractLinkedProcessorSlot) {
                    // Append at the tail so the chain keeps the loaded order.
                    chain.addLast((AbstractLinkedProcessorSlot<?>) candidate);
                }
            }
    
            return chain;
        }
    }
    
    • DefaultSlotChainBuilder包含职责链对象ProcessorSlotChain对象chain。
    • loadInstanceListSorted通过SPI机制来加载所有的slot节点。
    • ProcessorSlotChain通过addLast来构建职责链。
    /**
     * Chain implementation that keeps a head slot ({@code first}) and a tail
     * pointer ({@code end}); new slots are attached behind the current tail.
     */
    public class DefaultProcessorSlotChain extends ProcessorSlotChain {
        // Head and tail of the chain. The head is an anonymous slot that only forwards calls.
        AbstractLinkedProcessorSlot<?> first = new AbstractLinkedProcessorSlot<Object>() {
    
            // Forwards the entry call to whatever slot follows the head.
            @Override
            public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, boolean prioritized, Object... args)
                throws Throwable {
                super.fireEntry(context, resourceWrapper, t, count, prioritized, args);
            }
    
            // Forwards the exit call to whatever slot follows the head.
            @Override
            public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
                super.fireExit(context, resourceWrapper, count, args);
            }
    
        };
        // The tail initially points at the head, i.e. the chain starts with only the forwarding slot.
        AbstractLinkedProcessorSlot<?> end = first;
    
        /**
         * Appends a slot: links it after the current tail, then makes it the new tail.
         *
         * @param protocolProcessor the slot to append
         */
        @Override
        public void addLast(AbstractLinkedProcessorSlot<?> protocolProcessor) {
            end.setNext(protocolProcessor);
            end = protocolProcessor;
        }
    }
    
    • DefaultProcessorSlotChain通过尾插法构建职责链。
    • 整体的职责链如图所示,DefaultProcessorSlotChain负责维护整个职责链。
    • 职责链之间通过next节点进行相互串联。

    职责链执行过程

    /**
     * Execution-side view of the chain: {@code entry} on the chain is
     * dispatched to the head slot, which forwards it to its successor.
     */
    public class DefaultProcessorSlotChain extends ProcessorSlotChain {
    
        // Head slot: an anonymous slot whose entry/exit only forward to the next slot.
        AbstractLinkedProcessorSlot<?> first = new AbstractLinkedProcessorSlot<Object>() {
    
            @Override
            public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, boolean prioritized, Object... args)
                throws Throwable {
                // Invoke fireEntry to pass the call on to the next slot.
                super.fireEntry(context, resourceWrapper, t, count, prioritized, args);
            }
    
            @Override
            public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
                super.fireExit(context, resourceWrapper, count, args);
            }
        };
    
        // Tail pointer; initially the same object as the head.
        AbstractLinkedProcessorSlot<?> end = first;
    
        // Entry point of chain execution; delegates to transformEntry of the head AbstractLinkedProcessorSlot.
        @Override
        public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, boolean prioritized, Object... args)
            throws Throwable {
            first.transformEntry(context, resourceWrapper, t, count, prioritized, args);
        }
    }
    
    • DefaultProcessorSlotChain通过entry触发整个职责链的执行流程。
    /**
     * Base class for chained slots. A slot passes control onward with
     * {@code fireEntry}, which calls {@code transformEntry} on the next slot,
     * which in turn calls that slot's {@code entry}.
     */
    public abstract class AbstractLinkedProcessorSlot<T> implements ProcessorSlot<T> {
    
        // Successor in the chain; null when this slot is the last one.
        private AbstractLinkedProcessorSlot<?> next = null;
    
        /**
         * Passes the entry call to the next slot, or does nothing when there is no next slot.
         */
        @Override
        public void fireEntry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args)
            throws Throwable {
            if (next == null) {
                return;
            }
            next.transformEntry(context, resourceWrapper, obj, count, prioritized, args);
        }
    
        /**
         * Casts the untyped payload to this slot's parameter type and invokes {@code entry}.
         */
        @SuppressWarnings("unchecked")
        void transformEntry(Context context, ResourceWrapper resourceWrapper, Object o, int count, boolean prioritized, Object... args)
            throws Throwable {
            entry(context, resourceWrapper, (T) o, count, prioritized, args);
        }
    }
    
    
    /**
     * Abridged FlowSlot used to illustrate the calling pattern of a concrete
     * slot: do this slot's own work, then hand over to the next slot.
     */
    public class FlowSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
        @Override
        public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                          boolean prioritized, Object... args) throws Throwable {
            // This slot's own check (checkFlow is not shown in this excerpt).
            checkFlow(resourceWrapper, context, node, count, prioritized);
    
            // Continue with the next slot in the chain.
            fireEntry(context, resourceWrapper, node, count, prioritized, args);
        }
    }
    
    • 职责链按照 transformEntry => entry => fireEntry => transformEntry顺序循环执行。

    职责链功能介绍

    NodeSelectorSlot

    /**
     * Looks up (or creates) the {@link DefaultNode} for the current context
     * name, sets it as the context's current node, and passes it to the next slot.
     */
    public class NodeSelectorSlot extends AbstractLinkedProcessorSlot<Object> {
    
        // Nodes keyed by context name. The map is never mutated in place; it is replaced by a new copy.
        private volatile Map<String, DefaultNode> map = new HashMap<String, DefaultNode>(10);
    
        @Override
        public void entry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args)
            throws Throwable {
    
            // First lookup without holding the lock.
            DefaultNode node = map.get(context.getName());
            if (node == null) {
                synchronized (this) {
                    // Look up again under the lock before creating a node.
                    node = map.get(context.getName());
                    if (node == null) {
                        node = new DefaultNode(resourceWrapper, null);
                        // Copy the existing entries into a new map, add the new node, then swap the field.
                        HashMap<String, DefaultNode> cacheMap = new HashMap<String, DefaultNode>(map.size());
                        cacheMap.putAll(map);
                        cacheMap.put(context.getName(), node);
                        map = cacheMap;
                        // Build invocation tree
                        ((DefaultNode) context.getLastNode()).addChild(node);
                    }
    
                }
            }
    
            context.setCurNode(node);
            // The selected node becomes the payload handed to the following slots.
            fireEntry(context, resourceWrapper, node, count, prioritized, args);
        }
    }
    
    • 负责生成DefaultNode。

    ClusterBuilderSlot

    /**
     * Creates this slot's {@link ClusterNode} on first use, attaches it to the
     * incoming {@link DefaultNode}, and sets an origin node on the current
     * entry when the context origin is not the empty string.
     */
    public class ClusterBuilderSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
    
        // Static map of cluster nodes; replaced by a new copy whenever a node is added.
        private static volatile Map<ResourceWrapper, ClusterNode> clusterNodeMap = new HashMap<>();
        // Static lock guarding creation of cluster nodes and replacement of the map.
        private static final Object lock = new Object();
        // Cluster node held by this slot instance; created on the first entry call.
        private volatile ClusterNode clusterNode = null;
    
        @Override
        public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                          boolean prioritized, Object... args)
            throws Throwable {
            if (clusterNode == null) {
                synchronized (lock) {
                    // Check again under the lock so the node is created only once.
                    if (clusterNode == null) {
                        // Create the cluster node.
                        clusterNode = new ClusterNode(resourceWrapper.getName(), resourceWrapper.getResourceType());
                        // Copy the existing entries into a new map, add the new node, then swap the static field.
                        HashMap<ResourceWrapper, ClusterNode> newMap = new HashMap<>(Math.max(clusterNodeMap.size(), 16));
                        newMap.putAll(clusterNodeMap);
                        newMap.put(node.getId(), clusterNode);
    
                        clusterNodeMap = newMap;
                    }
                }
            }
            node.setClusterNode(clusterNode);
    
            // Only when the context origin is not the empty string: obtain the origin node
            // from the cluster node and store it on the current entry.
            if (!"".equals(context.getOrigin())) {
                Node originNode = node.getClusterNode().getOrCreateOriginNode(context.getOrigin());
                context.getCurEntry().setOriginNode(originNode);
            }
    
            fireEntry(context, resourceWrapper, node, count, prioritized, args);
        }
    }
    
    • 负责生成clusterNode节点并保存到DefaultNode。

    LogSlot

    /**
     * Wraps the rest of the chain in a try/catch so that block exceptions are
     * logged before being rethrown.
     */
    public class LogSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
    
        @Override
        public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode obj, int count, boolean prioritized, Object... args)
            throws Throwable {
            try {
                // Run the following slots first; this slot only reacts to what they throw.
                fireEntry(context, resourceWrapper, obj, count, prioritized, args);
            } catch (BlockException e) {
                // Log resource name, exception type, the rule's limit app, origin and count, then rethrow.
                EagleEyeLogUtil.log(resourceWrapper.getName(), e.getClass().getSimpleName(), e.getRuleLimitApp(),
                    context.getOrigin(), count);
                throw e;
            } catch (Throwable e) {
                // Any other throwable is logged as a warning and NOT rethrown.
                // NOTE(review): this looks like deliberate best-effort handling - confirm before changing.
                RecordLog.warn("Unexpected entry exception", e);
            }
        }
    }
    
    • 负责记录日志。

    StatisticSlot

    /**
     * Runs the following slots and then records the outcome: pass counters
     * when they return normally, thread counters only on
     * {@code PriorityWaitException}, block counters on {@code BlockException},
     * and the error itself on any other throwable.
     */
    public class StatisticSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
        @Override
        public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                          boolean prioritized, Object... args) throws Throwable {
            try {
                // Do some checking.
                fireEntry(context, resourceWrapper, node, count, prioritized, args);
    
                // Request passed, add thread count and pass count.
                node.increaseThreadNum();
                node.addPassRequest(count);
    
                if (context.getCurEntry().getOriginNode() != null) {
                    // Add count for origin node.
                    context.getCurEntry().getOriginNode().increaseThreadNum();
                    context.getCurEntry().getOriginNode().addPassRequest(count);
                }
    
                if (resourceWrapper.getEntryType() == EntryType.IN) {
                    // Add count for global inbound entry node for global statistics.
                    Constants.ENTRY_NODE.increaseThreadNum();
                    Constants.ENTRY_NODE.addPassRequest(count);
                }
    
                // Handle pass event with registered entry callback handlers.
                for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
                    handler.onPass(context, resourceWrapper, node, count, args);
                }
            } catch (PriorityWaitException ex) {
                // This branch increases thread counts only (no addPassRequest) and is not rethrown.
                // NOTE(review): presumably PriorityWaitException is a BlockException subtype, which
                // is why it is caught before BlockException - confirm.
                node.increaseThreadNum();
                if (context.getCurEntry().getOriginNode() != null) {
                    // Add count for origin node.
                    context.getCurEntry().getOriginNode().increaseThreadNum();
                }
    
                if (resourceWrapper.getEntryType() == EntryType.IN) {
                    // Add count for global inbound entry node for global statistics.
                    Constants.ENTRY_NODE.increaseThreadNum();
                }
                // Handle pass event with registered entry callback handlers.
                for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
                    handler.onPass(context, resourceWrapper, node, count, args);
                }
            } catch (BlockException e) {
                // Blocked, set block exception to current entry.
                context.getCurEntry().setBlockError(e);
    
                // Add block count.
                node.increaseBlockQps(count);
                if (context.getCurEntry().getOriginNode() != null) {
                    context.getCurEntry().getOriginNode().increaseBlockQps(count);
                }
    
                if (resourceWrapper.getEntryType() == EntryType.IN) {
                    // Add count for global inbound entry node for global statistics.
                    Constants.ENTRY_NODE.increaseBlockQps(count);
                }
    
                // Handle block event with registered entry callback handlers.
                for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
                    handler.onBlocked(e, context, resourceWrapper, node, count, args);
                }
    
                // Rethrow so the caller still sees the block exception.
                throw e;
            } catch (Throwable e) {
                // Unexpected internal error, set error to current entry.
                context.getCurEntry().setError(e);
    
                throw e;
            }
        }
    }
    
    /**
     * Abridged DefaultNode: the two overrides shown update this node's own
     * statistics and then apply the same update to its cluster node.
     */
    public class DefaultNode extends StatisticNode {
        // Records the values on this node first, then on the associated cluster node.
        // (The clusterNode field is declared outside this excerpt.)
        @Override
        public void addRtAndSuccess(long rt, int successCount) {
            super.addRtAndSuccess(rt, successCount);
            this.clusterNode.addRtAndSuccess(rt, successCount);
        }
    
        // Increases the thread count on this node first, then on the associated cluster node.
        @Override
        public void increaseThreadNum() {
            super.increaseThreadNum();
            this.clusterNode.increaseThreadNum();
        }
    }
    
    • 负责DefaultNode和clusterNode节点的数据统计。

    SystemSlot

    /**
     * Calls {@code SystemRuleManager.checkSystem} for the resource before
     * letting the request continue down the chain.
     */
    public class SystemSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
    
        @Override
        public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                          boolean prioritized, Object... args) throws Throwable {
            // If the check throws, fireEntry below is never reached.
            SystemRuleManager.checkSystem(resourceWrapper);
            fireEntry(context, resourceWrapper, node, count, prioritized, args);
        }
    
        // Nothing to do on exit for this slot; just forward to the next one.
        @Override
        public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
            fireExit(context, resourceWrapper, count, args);
        }
    }
    
    • 系统自适应控制。

    AuthoritySlot

    /**
     * Checks the authority rules registered for the resource before letting
     * the request continue down the chain.
     */
    public class AuthoritySlot extends AbstractLinkedProcessorSlot<DefaultNode> {
    
        @Override
        public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args)
            throws Throwable {
            // A failed check throws, so the next slot is only reached when every rule passes.
            checkBlackWhiteAuthority(resourceWrapper, context);
            fireEntry(context, resourceWrapper, node, count, prioritized, args);
        }
    
        /**
         * Evaluates every authority rule registered under the resource's name.
         *
         * @param resource the resource being entered
         * @param context  the current invocation context
         * @throws AuthorityException for the first rule whose check does not pass
         */
        void checkBlackWhiteAuthority(ResourceWrapper resource, Context context) throws AuthorityException {
            Map<String, Set<AuthorityRule>> rulesByResource = AuthorityRuleManager.getAuthorityRules();
    
            // No rule map at all, or no rules for this resource: nothing to check.
            Set<AuthorityRule> applicable = (rulesByResource == null) ? null : rulesByResource.get(resource.getName());
            if (applicable == null) {
                return;
            }
    
            for (AuthorityRule candidate : applicable) {
                if (AuthorityRuleChecker.passCheck(candidate, context)) {
                    continue;
                }
                throw new AuthorityException(context.getOrigin(), candidate);
            }
        }
    }
    
    • 授权控制。

    FlowSlot

    /**
     * Runs the flow-rule check through a {@link FlowRuleChecker} before
     * letting the request continue down the chain.
     */
    public class FlowSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
    
        // Checker that performs the actual flow-rule evaluation.
        private final FlowRuleChecker checker;
    
        /** Creates a slot backed by a new {@link FlowRuleChecker}. */
        public FlowSlot() {
            this(new FlowRuleChecker());
        }
    
        /**
         * Package-private constructor that the no-arg constructor delegates to.
         * Without it, {@code this(new FlowRuleChecker())} has no target and the
         * final field {@code checker} is never assigned.
         *
         * @param checker the checker to use; must not be null
         * @throws IllegalArgumentException if {@code checker} is null
         */
        FlowSlot(FlowRuleChecker checker) {
            if (checker == null) {
                throw new IllegalArgumentException("flow checker should not be null");
            }
            this.checker = checker;
        }
    
        @Override
        public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                          boolean prioritized, Object... args) throws Throwable {
            // A failed check throws, so the next slot is only reached when the check passes.
            checkFlow(resourceWrapper, context, node, count, prioritized);
    
            fireEntry(context, resourceWrapper, node, count, prioritized, args);
        }
    
        /**
         * Delegates the flow check to the configured checker.
         *
         * @throws BlockException if the checker rejects the request
         */
        void checkFlow(ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized)
            throws BlockException {
            // NOTE(review): ruleProvider is not declared in this excerpt; presumably a field of the
            // full class that supplies the flow rules - confirm against the complete source.
            checker.checkFlow(ruleProvider, resource, context, node, count, prioritized);
        }
    }
    
    • 流量控制。

    DegradeSlot

    /**
     * Asks every circuit breaker registered for the resource whether the
     * request may pass before letting it continue down the chain.
     */
    public class DegradeSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
    
        @Override
        public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                          boolean prioritized, Object... args) throws Throwable {
            // A rejecting breaker throws, so the next slot is only reached when all breakers allow the request.
            performChecking(context, resourceWrapper);
    
            fireEntry(context, resourceWrapper, node, count, prioritized, args);
        }
    
        /**
         * Consults the circuit breakers registered under the resource's name, in list order.
         *
         * @param context the current invocation context
         * @param r       the resource being entered
         * @throws BlockException a {@code DegradeException} for the first breaker whose
         *                        {@code tryPass} returns false
         */
        void performChecking(Context context, ResourceWrapper r) throws BlockException {
            List<CircuitBreaker> breakers = DegradeRuleManager.getCircuitBreakers(r.getName());
            if (breakers != null && !breakers.isEmpty()) {
                for (CircuitBreaker breaker : breakers) {
                    if (breaker.tryPass(context)) {
                        continue;
                    }
                    throw new DegradeException(breaker.getRule().getLimitApp(), breaker.getRule());
                }
            }
        }
    }
    
    • 降级控制。

    相关文章

      网友评论

        本文标题:Sentinel的职责链slot介绍

        本文链接:https://www.haomeiwen.com/subject/utajyltx.html