Sentinel Internals: Cluster Flow Control

Author: 九点半的马拉 | Published 2020-05-07 13:32

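    The rate limiting covered so far works on a single machine only: each instance counts only its own local calls. In a cluster, one service is deployed on several servers. Suppose the cluster has 5 machines, each with a local limit of 10 QPS. Ideally the whole cluster's limit is then 50 QPS. In practice, however, traffic is not routed evenly, so some machines start limiting before the cluster as a whole has reached 50 QPS.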

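    Each instance only cares about its own threshold, and none of them tracks the global threshold of the system. When we want to set a total QPS for an API, single-machine limiting cannot do the job.
    The single-machine version counts inside each instance, whereas the cluster version has one dedicated instance do the counting.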

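    This dedicated counting instance is called the Sentinel token server. The other instances act as Sentinel token clients and request tokens from the token server. If a client obtains a token, the current QPS has not yet reached the total threshold; otherwise the cluster-wide threshold has been reached and the request on that instance is blocked.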
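    A small simulation makes the difference concrete. It is a sketch under made-up assumptions: 5 nodes, a per-node limit of 10 QPS, and one second of traffic (50 requests) routed unevenly. The class and method names are hypothetical and not part of Sentinel; the shared counter merely stands in for the token server.

```java
import java.util.Arrays;

// Hypothetical illustration (not Sentinel code): per-node limiting versus one shared
// counter standing in for the token server.
class LocalVsClusterDemo {

    // Each node admits at most perNodeLimit requests, independently of the others.
    static int admittedLocally(int[] requestsPerNode, int perNodeLimit) {
        int admitted = 0;
        for (int r : requestsPerNode) {
            admitted += Math.min(r, perNodeLimit);
        }
        return admitted;
    }

    // Every request on every node asks the same counter for a token.
    static int admittedByTokenServer(int[] requestsPerNode, int globalLimit) {
        int issued = 0;
        for (int r : requestsPerNode) {
            for (int i = 0; i < r; i++) {
                if (issued < globalLimit) {
                    issued++; // token granted; otherwise the request is blocked
                }
            }
        }
        return issued;
    }

    public static void main(String[] args) {
        int[] skewed = {20, 15, 8, 5, 2}; // 50 requests in one second, routed unevenly
        int perNodeLimit = 10;
        int globalLimit = perNodeLimit * skewed.length; // 5 * 10 = 50
        System.out.println("traffic:      " + Arrays.toString(skewed));
        System.out.println("local limit:  " + admittedLocally(skewed, perNodeLimit));      // 35
        System.out.println("token server: " + admittedByTokenServer(skewed, globalLimit)); // 50
    }
}
```

    With this traffic, per-node limiting admits only 35 of the 50 requests even though the cluster never exceeds its 50 QPS budget, while the shared counter admits all 50.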

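    Scenarios suited to cluster flow control:
    1) Counting the total traffic of an API at the API gateway and limiting the total QPS of that API or service.
    2) Global flow control of service-to-service calls in a service mesh.
    3) Limiting the total access frequency of hot items (for example, popular products) across the cluster.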

    Starting point

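    Sentinel's cluster flow control is hooked into FlowSlot. It looks up all FlowRules for the resource name and calls canPassCheck (in FlowRuleChecker) on each rule in turn to decide whether the request passes that rule.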

    public boolean canPassCheck(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node, int acquireCount,
                                                        boolean prioritized) {
            String limitApp = rule.getLimitApp();
            if (limitApp == null) {
                return true;
            }
            if (rule.isClusterMode()) {
                return passClusterCheck(rule, context, node, acquireCount, prioritized);
            }
            return passLocalCheck(rule, context, node, acquireCount, prioritized);
        }
    

    If the rule is configured for cluster mode, passClusterCheck is called.

    private static boolean passClusterCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount,
                                                boolean prioritized) {
            try {
                TokenService clusterService = pickClusterService(); //@1
                if (clusterService == null) {
                    return fallbackToLocalOrPass(rule, context, node, acquireCount, prioritized);  //@2
                }
                long flowId = rule.getClusterConfig().getFlowId(); //@3
                TokenResult result = clusterService.requestToken(flowId, acquireCount, prioritized);  //@4
                return applyTokenResult(result, rule, context, node, acquireCount, prioritized);
                // If client is absent, then fallback to local mode.
            } catch (Throwable ex) {
                RecordLog.warn("[FlowRuleChecker] Request cluster token unexpected failed", ex);
            }
            // Fallback to local flow control when token client or server for this rule is not available.
            // If fallback is not enabled, then directly pass.
            return fallbackToLocalOrPass(rule, context, node, acquireCount, prioritized);
        }
    

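    At @1, the TokenService is chosen according to whether the current node is a Token Client or a Token Server:
    1) If the node's role is client, the TokenService returned is DefaultClusterTokenClient.
    2) If the node's role is server, the TokenService returned by default is DefaultTokenService.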

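    At @2, if no cluster TokenService is available, the rule degrades to single-machine (local) flow control when fallbackToLocalWhenFail is enabled; otherwise the request simply passes: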

    private static boolean fallbackToLocalOrPass(FlowRule rule, Context context, DefaultNode node, int acquireCount,
                                                     boolean prioritized) {
            if (rule.getClusterConfig().isFallbackToLocalWhenFail()) {
                return passLocalCheck(rule, context, node, acquireCount, prioritized);
            } else {
                return true;
            }
        }
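    To see how passClusterCheck and fallbackToLocalOrPass combine, here is a simplified model. The TokenService interface and Status enum are hypothetical stand-ins for Sentinel's types. The real code hands the result to applyTokenResult, where NO_RULE_EXISTS, BAD_REQUEST, FAIL and TOO_MANY_REQUEST all fall back; the sketch collapses them into FAIL and leaves out SHOULD_WAIT.

```java
import java.util.function.BooleanSupplier;

// Simplified, hypothetical model of passClusterCheck + fallbackToLocalOrPass.
// TokenService and Status are stand-ins, not Sentinel's real types.
class ClusterCheckSketch {

    enum Status { OK, BLOCKED, FAIL }

    interface TokenService {
        Status requestToken(long flowId, int acquireCount);
    }

    // Mirrors fallbackToLocalOrPass: run the local check only if fallback is enabled.
    static boolean fallbackToLocalOrPass(boolean fallbackToLocalWhenFail, BooleanSupplier localCheck) {
        if (fallbackToLocalWhenFail) {
            return localCheck.getAsBoolean();
        }
        return true; // fallback disabled: let the request through
    }

    static boolean passClusterCheck(TokenService service, long flowId, int acquireCount,
                                    boolean fallbackToLocalWhenFail, BooleanSupplier localCheck) {
        try {
            if (service == null) { // no token client/server available (@2)
                return fallbackToLocalOrPass(fallbackToLocalWhenFail, localCheck);
            }
            switch (service.requestToken(flowId, acquireCount)) {
                case OK:
                    return true;
                case BLOCKED:
                    return false;
                default: // FAIL: degrade to local mode
                    return fallbackToLocalOrPass(fallbackToLocalWhenFail, localCheck);
            }
        } catch (RuntimeException ex) {
            // Unexpected errors also degrade to local mode (or pass).
            return fallbackToLocalOrPass(fallbackToLocalWhenFail, localCheck);
        }
    }

    public static void main(String[] args) {
        BooleanSupplier localDenies = () -> false;
        TokenService broken = (id, n) -> { throw new IllegalStateException("server down"); };
        System.out.println(passClusterCheck(null, 1L, 1, true, localDenies));   // false: local check decides
        System.out.println(passClusterCheck(null, 1L, 1, false, localDenies));  // true: passes directly
        System.out.println(passClusterCheck(broken, 1L, 1, true, localDenies)); // false: local check decides
    }
}
```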
    

    At @3, the rule's flowId is read. In cluster mode every rule has a corresponding ClusterFlowConfig.

    The ClusterFlowConfig class:

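    public class ClusterFlowConfig {
        // Globally unique id
        private Long flowId;
        // Two threshold types: per-instance average (AVG_LOCAL) or cluster total (GLOBAL)
        private int thresholdType = ClusterRuleConstant.FLOW_THRESHOLD_AVG_LOCAL;
        // Whether to fall back to local mode when the cluster is unavailable
        private boolean fallbackToLocalWhenFail = true;
        // Cluster flow strategy (normal by default)
        private int strategy = ClusterRuleConstant.FLOW_CLUSTER_STRATEGY_NORMAL;
        // Number of buckets in the cluster sliding window, default 10
        private int sampleCount = ClusterRuleConstant.DEFAULT_CLUSTER_SAMPLE_COUNT;
        // Sliding window length, default 1000 ms (1 second)
        private int windowIntervalMs = RuleConstant.DEFAULT_WINDOW_INTERVAL_MS;
        // ... getters and setters omitted
    }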
    

    At @4, the TokenService requests a token using the flowId. As noted above, the caller may be a token client or a token server; the corresponding classes are DefaultClusterTokenClient and DefaultTokenService.

    The following sections look at the TokenClient and TokenService roles in turn.

    DefaultClusterTokenClient

    public TokenResult requestToken(Long flowId, int acquireCount, boolean prioritized) {
            // Reject an invalid flowId or a count <= 0:
            // id == null || id <= 0 || count <= 0;
            if (notValidRequest(flowId, acquireCount)) {
                return badRequest();
            }
            // Build the request payload
            FlowRequestData data = new FlowRequestData().setCount(acquireCount)
                .setFlowId(flowId).setPriority(prioritized);
            // Wrap it in a ClusterRequest with message type FLOW
            //  MSG_TYPE_PING = 0; MSG_TYPE_FLOW = 1; MSG_TYPE_PARAM_FLOW = 2;
            ClusterRequest<FlowRequestData> request = new ClusterRequest<>(ClusterConstants.MSG_TYPE_FLOW, data);
            try {
                // Send the request to the token server
                TokenResult result = sendTokenRequest(request);
                logForResult(result);
                return result;
            } catch (Exception ex) {
                ClusterClientStatLogUtil.log(ex.getMessage());
                return new TokenResult(TokenResultStatus.FAIL);
            }
        }
    
    private TokenResult sendTokenRequest(ClusterRequest request) throws Exception {
            if (transportClient == null) {
                RecordLog.warn(
                    "[DefaultClusterTokenClient] Client not created, please check your config for cluster client");
                return clientFail();
            }
            ClusterResponse response = transportClient.sendRequest(request);
            TokenResult result = new TokenResult(response.getStatus());
            if (response.getData() != null) {
                FlowTokenResponseData responseData = (FlowTokenResponseData)response.getData();
                result.setRemaining(responseData.getRemainingCount())
                    .setWaitInMs(responseData.getWaitInMs());
            }
            return result;
        }
    

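    The client creates its connection to the token server at startup. If transportClient is null when a request is sent, a warning is logged and a failure result (clientFail) is returned.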

    DefaultTokenService

    When the token server receives a client request, it calls FlowRequestProcessor's processRequest, which in turn calls DefaultTokenService's requestToken.

    @RequestType(ClusterConstants.MSG_TYPE_FLOW)
    public class FlowRequestProcessor implements RequestProcessor<FlowRequestData, FlowTokenResponseData> {
        @Override
        public ClusterResponse<FlowTokenResponseData> processRequest(ClusterRequest<FlowRequestData> request) {
            TokenService tokenService = TokenServiceProvider.getService();
            long flowId = request.getData().getFlowId();
            int count = request.getData().getCount();
            boolean prioritized = request.getData().isPriority();
            TokenResult result = tokenService.requestToken(flowId, count, prioritized);
            return toResponse(result, request);
        }
    }
    
    // DefaultTokenService
    public TokenResult requestToken(Long ruleId, int acquireCount, boolean prioritized) {
            // Same validation as on the client side
            if (notValidRequest(ruleId, acquireCount)) {
                return badRequest();
            }
            // Look up the rule by id in a map:
            // private static final Map<Long, FlowRule> FLOW_RULES = new ConcurrentHashMap<>();
            FlowRule rule = ClusterFlowRuleManager.getFlowRuleById(ruleId);
            // No matching rule
            if (rule == null) {
                return new TokenResult(TokenResultStatus.NO_RULE_EXISTS);
            }
            // Server-side check: decide whether to grant the token
            return ClusterFlowChecker.acquireClusterToken(rule, acquireCount, prioritized);
        }
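    The entry checks on the server can be sketched as a small function. FlowRuleStub, Status and the map below are hypothetical stand-ins for FlowRule, TokenResultStatus and ClusterFlowRuleManager's internal map; the actual token decision (acquireClusterToken) is replaced by a placeholder status.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of the entry checks in DefaultTokenService.requestToken.
// FlowRuleStub and Status are stand-ins, not Sentinel classes.
class TokenServerEntrySketch {

    enum Status { BAD_REQUEST, NO_RULE_EXISTS, CHECKED }

    static final class FlowRuleStub {
        final double count; // configured threshold
        FlowRuleStub(double count) { this.count = count; }
    }

    // flowId -> rule, like the FLOW_RULES map shown above.
    static final Map<Long, FlowRuleStub> FLOW_RULES = new ConcurrentHashMap<>();

    // Same condition as in the client: id == null || id <= 0 || count <= 0.
    static boolean notValidRequest(Long id, int count) {
        return id == null || id <= 0 || count <= 0;
    }

    static Status requestToken(Long ruleId, int acquireCount) {
        if (notValidRequest(ruleId, acquireCount)) {
            return Status.BAD_REQUEST;
        }
        FlowRuleStub rule = FLOW_RULES.get(ruleId);
        if (rule == null) {
            return Status.NO_RULE_EXISTS;
        }
        // The real code continues with ClusterFlowChecker.acquireClusterToken(...).
        return Status.CHECKED;
    }

    public static void main(String[] args) {
        FLOW_RULES.put(111L, new FlowRuleStub(10));
        System.out.println(requestToken(111L, 1)); // CHECKED
        System.out.println(requestToken(222L, 1)); // NO_RULE_EXISTS
        System.out.println(requestToken(111L, 0)); // BAD_REQUEST
    }
}
```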
    

    acquireClusterToken is fairly long, so we will go through it step by step.

    Step 1:

    Long id = rule.getClusterConfig().getFlowId();
    if (!allowProceed(id)) {
       return new TokenResult(TokenResultStatus.TOO_MANY_REQUEST);
    } // @1
    

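    At @1, the server first checks whether this token request may proceed at all. The token server supports embedded mode, meaning it can run inside an application node. To keep token requests from putting too much load on normal business traffic, the act of requesting tokens is itself rate limited.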

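    When this limit is hit, the server returns the TOO_MANY_REQUEST status to the client. Sentinel applies this limit per namespace through GlobalRequestLimiter, which also collects its statistics with a sliding window, on the same principle as FlowSlot. The default limit is 30,000 QPS.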

    static boolean allowProceed(long flowId) {
            String namespace = ClusterFlowRuleManager.getNamespace(flowId);
            return GlobalRequestLimiter.tryPass(namespace);
        }
    
    public static boolean tryPass(String namespace) {
            if (namespace == null) {
                return false;
            }
            // private static final Map<String, RequestLimiter> GLOBAL_QPS_LIMITER_MAP = new ConcurrentHashMap<>();
            RequestLimiter limiter = GLOBAL_QPS_LIMITER_MAP.get(namespace);
            if (limiter == null) {
                return true;
            }
            return limiter.tryPass();
        }
    

    canPass checks whether the current QPS plus this one request would exceed qpsAllowed.

    public boolean tryPass() {
            if (canPass()) {
                add(1);
                return true;
            }
            return false;
    }
    
    public void add(int x) {
            data.currentWindow().value().add(x);
    }
    
    public boolean canPass() {
            return getQps() + 1 <= qpsAllowed;
    }
    
    public double getQps() {
            return getSum() / data.getIntervalInSecond();
    }
    private final LeapArray<LongAdder> data;
    
    public long getSum() {
            data.currentWindow();
            long success = 0;
    
            List<LongAdder> list = data.values();
            for (LongAdder window : list) {
                success += window.sum();
            }
            return success;
    }
    

    As shown above, cluster mode also relies on sliding time windows for its statistics.
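    The following is a simplified, single-threaded model of the sliding window behind RequestLimiter, assuming sampleCount buckets of intervalInMs / sampleCount milliseconds each. SlidingWindowLimiter is a hypothetical class, not Sentinel's LeapArray: time is passed in explicitly so the behavior is deterministic, and bucket rollover is not synchronized as it would need to be under concurrency.

```java
import java.util.concurrent.atomic.LongAdder;

// Hypothetical, simplified sliding-window limiter modelled on RequestLimiter/LeapArray.
// Time is passed in explicitly; bucket rollover is not thread-safe.
class SlidingWindowLimiter {

    private final int sampleCount;
    private final int intervalInMs;
    private final int windowLengthInMs;
    private final long[] windowStart;  // start time of each bucket, -1 = never used
    private final LongAdder[] buckets; // pass count of each bucket
    private final double qpsAllowed;

    SlidingWindowLimiter(int sampleCount, int intervalInMs, double qpsAllowed) {
        this.sampleCount = sampleCount;
        this.intervalInMs = intervalInMs;
        this.windowLengthInMs = intervalInMs / sampleCount;
        this.qpsAllowed = qpsAllowed;
        this.windowStart = new long[sampleCount];
        this.buckets = new LongAdder[sampleCount];
        for (int i = 0; i < sampleCount; i++) {
            windowStart[i] = -1L;
            buckets[i] = new LongAdder();
        }
    }

    // Like currentWindow(): find the bucket for nowMs and reset it if it holds an old window.
    private LongAdder currentWindow(long nowMs) {
        int idx = (int) ((nowMs / windowLengthInMs) % sampleCount);
        long start = nowMs - nowMs % windowLengthInMs;
        if (windowStart[idx] != start) {
            windowStart[idx] = start;
            buckets[idx].reset();
        }
        return buckets[idx];
    }

    // Like getSum(): add up every bucket still inside the interval.
    long getSum(long nowMs) {
        currentWindow(nowMs);
        long sum = 0;
        for (int i = 0; i < sampleCount; i++) {
            if (windowStart[i] >= 0 && nowMs - windowStart[i] <= intervalInMs) {
                sum += buckets[i].sum();
            }
        }
        return sum;
    }

    double getQps(long nowMs) {
        return getSum(nowMs) / (intervalInMs / 1000.0);
    }

    boolean canPass(long nowMs) {
        return getQps(nowMs) + 1 <= qpsAllowed;
    }

    boolean tryPass(long nowMs) {
        if (canPass(nowMs)) {
            currentWindow(nowMs).add(1);
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        SlidingWindowLimiter limiter = new SlidingWindowLimiter(10, 1000, 5);
        for (int i = 1; i <= 6; i++) {
            System.out.println("t=0ms request " + i + ": " + limiter.tryPass(0)); // 6th is false
        }
        System.out.println("t=500ms:  " + limiter.tryPass(500));  // false, t=0 bucket still counted
        System.out.println("t=1000ms: " + limiter.tryPass(1000)); // true, t=0 bucket recycled
    }
}
```

    With a 5 QPS limit, the sixth request at t=0 ms is rejected, and requests stay rejected until the bucket filled at t=0 ms is recycled at t=1000 ms.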

    Step 2:

    Look up the metric collector (ClusterMetric) that corresponds to the flowId:

    private static final Map<Long, ClusterMetric> METRIC_MAP = new ConcurrentHashMap<>();  
    
    ClusterMetric metric = ClusterMetricStatistics.getMetric(id);  
    if (metric == null) {
        return new TokenResult(TokenResultStatus.FAIL);
    }
    

    The metric is backed by a ClusterMetricLeapArray. Like OccupiableBucketLeapArray, it adds an extra array (occupyCounter) that records pre-occupied (borrowed) counts.

    public class ClusterMetricLeapArray extends LeapArray<ClusterMetricBucket> {
    
        private final LongAdder[] occupyCounter;
        private boolean hasOccupied = false;
    
        public ClusterMetricLeapArray(int sampleCount, int intervalInMs) {
            super(sampleCount, intervalInMs);
            ClusterFlowEvent[] events = ClusterFlowEvent.values();
            this.occupyCounter = new LongAdder[events.length];
            for (ClusterFlowEvent event : events) {
                occupyCounter[event.ordinal()] = new LongAdder();
            }
        }
    

    Step 3:

    Get the current pass QPS, the total number of tokens allowed, and the number that would remain after this request:

    double latestQps = metric.getAvg(ClusterFlowEvent.PASS);
    double globalThreshold = calcGlobalThreshold(rule) * ClusterServerConfigManager.getExceedCount();
    double nextRemaining = globalThreshold - latestQps - acquireCount;
    

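    With FLOW_THRESHOLD_GLOBAL, the cluster-wide token count equals the count configured in the flow rule.
    With FLOW_THRESHOLD_AVG_LOCAL, the configured count is a per-instance value, so it is multiplied by the number of connected clients. getExceedCount above defaults to 1.0.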

    private static double calcGlobalThreshold(FlowRule rule) {
            double count = rule.getCount();
            switch (rule.getClusterConfig().getThresholdType()) {
                case ClusterRuleConstant.FLOW_THRESHOLD_GLOBAL:
                    return count;
                case ClusterRuleConstant.FLOW_THRESHOLD_AVG_LOCAL:
                default:
                    int connectedCount = ClusterFlowRuleManager.getConnectedCount(rule.getClusterConfig().getFlowId());
                    return count * connectedCount;
            }
    }
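    A quick worked example of the threshold arithmetic, assuming a rule count of 10, 5 connected clients, exceedCount = 1.0 and a current pass QPS of 42 (all made-up numbers). The THRESHOLD_* constants are local stand-ins for the ClusterRuleConstant values; their numeric values here are arbitrary.

```java
// Hypothetical restatement of calcGlobalThreshold and the nextRemaining arithmetic.
class GlobalThresholdDemo {

    static final int THRESHOLD_AVG_LOCAL = 0; // stand-in constant
    static final int THRESHOLD_GLOBAL = 1;    // stand-in constant

    static double calcGlobalThreshold(double count, int thresholdType, int connectedCount) {
        switch (thresholdType) {
            case THRESHOLD_GLOBAL:
                return count;                  // count is already the cluster-wide total
            case THRESHOLD_AVG_LOCAL:
            default:
                return count * connectedCount; // count is per client, scale by client count
        }
    }

    static double nextRemaining(double globalThreshold, double exceedCount,
                                double latestQps, int acquireCount) {
        return globalThreshold * exceedCount - latestQps - acquireCount;
    }

    public static void main(String[] args) {
        double avgLocal = calcGlobalThreshold(10, THRESHOLD_AVG_LOCAL, 5);
        double global = calcGlobalThreshold(10, THRESHOLD_GLOBAL, 5);
        System.out.println(avgLocal);                            // 50.0
        System.out.println(global);                              // 10.0
        System.out.println(nextRemaining(avgLocal, 1.0, 42, 1)); // 7.0 -> token granted
        System.out.println(nextRemaining(global, 1.0, 42, 1));   // -33.0 -> over the limit
    }
}
```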
    

    Step 4:

    If the remaining count is greater than or equal to 0, update the statistics and grant the token:

    if (nextRemaining >= 0) {
        // Record the passed token count and one passed request
        metric.add(ClusterFlowEvent.PASS, acquireCount);
        metric.add(ClusterFlowEvent.PASS_REQUEST, 1);
        if (prioritized) {
            // Add prioritized pass.
            // Pass (pre-occupy incoming buckets)
            metric.add(ClusterFlowEvent.OCCUPIED_PASS, acquireCount);
        }
        // Remaining count is cut down to a smaller integer.
        return new TokenResult(TokenResultStatus.OK)
            .setRemaining((int) nextRemaining)
            .setWaitInMs(0);
    }
    

    Step 5:

    If the remaining count is less than 0:

    if (prioritized) {
        // Try to occupy incoming buckets.
        // Waiting due to flow shaping or for next bucket tick.
        // Current waiting QPS (requests per second waiting for a borrowed token)
        double occupyAvg = metric.getAvg(ClusterFlowEvent.WAITING);
        // If the waiting QPS is within the threshold for borrowing from future windows,
        // the request may pass, but only after a wait.
        // maxOccupyRatio defaults to 1.0 and is multiplied by the global threshold.
        if (occupyAvg <= ClusterServerConfigManager.getMaxOccupyRatio() * globalThreshold) {
            // Compute the wait time
            int waitInMs = metric.tryOccupyNext(ClusterFlowEvent.PASS, acquireCount, globalThreshold);
            // waitInMs > 0 indicates pre-occupy incoming buckets successfully.
            if (waitInMs > 0) {
                ClusterServerStatLogUtil.log("flow|waiting|" + id);
                return new TokenResult(TokenResultStatus.SHOULD_WAIT)
                    .setRemaining(0)
                    .setWaitInMs(waitInMs);
            }
            // Or else occupy failed, should be blocked.
        }
    }
    // Blocked.
    // The request cannot pass: update the block-related statistics.
    metric.add(ClusterFlowEvent.BLOCK, acquireCount);
    metric.add(ClusterFlowEvent.BLOCK_REQUEST, 1);
    if (prioritized) {
        // Add prioritized block.
        metric.add(ClusterFlowEvent.OCCUPIED_BLOCK, acquireCount);
        ClusterServerStatLogUtil.log("flow|occupied_block|" + id, 1);
    }
    return blockedResult();
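    Steps 3 to 5 reduce to a small decision function. The sketch below is a hypothetical condensation, not Sentinel code: it leaves out the metric updates and logging, folds exceedCount into globalThreshold, and takes tryOccupyNext as a supplier that returns a wait time in milliseconds (0 meaning the borrow failed).

```java
import java.util.function.IntSupplier;

// Hypothetical condensation of steps 3-5 of acquireClusterToken (no metrics, no logging).
class ClusterDecisionSketch {

    enum Status { OK, SHOULD_WAIT, BLOCKED }

    static Status decide(double globalThreshold, double latestQps, int acquireCount,
                         boolean prioritized, double waitingQps, double maxOccupyRatio,
                         IntSupplier tryOccupyNext) {
        double nextRemaining = globalThreshold - latestQps - acquireCount;
        if (nextRemaining >= 0) {
            return Status.OK;                  // step 4: enough tokens left
        }
        if (prioritized && waitingQps <= maxOccupyRatio * globalThreshold) {
            int waitInMs = tryOccupyNext.getAsInt();
            if (waitInMs > 0) {
                return Status.SHOULD_WAIT;     // step 5: borrowed from the next bucket
            }
        }
        return Status.BLOCKED;                 // step 5: no tokens and no successful borrow
    }

    public static void main(String[] args) {
        IntSupplier occupyOk = () -> 100;
        IntSupplier occupyFails = () -> 0;
        System.out.println(decide(50, 40, 1, false, 0, 1.0, occupyOk));   // OK
        System.out.println(decide(50, 50, 1, false, 0, 1.0, occupyOk));   // BLOCKED
        System.out.println(decide(50, 50, 1, true, 0, 1.0, occupyOk));    // SHOULD_WAIT
        System.out.println(decide(50, 50, 1, true, 0, 1.0, occupyFails)); // BLOCKED
    }
}
```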
    

    Next, let's look at how the wait time is calculated.

    // event is PASS
    public int tryOccupyNext(ClusterFlowEvent event, int acquireCount, double threshold) {
            // Current pass QPS
            double latestQps = getAvg(ClusterFlowEvent.PASS);
            // Check whether borrowing from the next bucket is allowed
            if (!canOccupy(event, acquireCount, latestQps, threshold)) {
                return 0;
            }
            
            // Record this borrowed amount in the occupy array:
            // public void addOccupyPass(int count) {
            //     occupyCounter[ClusterFlowEvent.PASS.ordinal()].add(count);
            //     occupyCounter[ClusterFlowEvent.PASS_REQUEST.ordinal()].add(1);
            //     this.hasOccupied = true;
            // }
            metric.addOccupyPass(acquireCount);
            
            // Add the waiting count to the normal time window:
            // public void add(ClusterFlowEvent event, long count) {
            //     metric.currentWindow().value().add(event, count);
            // }
            add(ClusterFlowEvent.WAITING, acquireCount);
            // sampleCount defaults to 10, so this returns 100 ms: the length of one bucket
            // when the window is 1000 ms, i.e. the caller waits until the next bucket starts.
            // Note that 1000 is hard-coded here rather than taken from windowIntervalMs.
            return 1000 / metric.getSampleCount();
    }
    

    As this shows, canOccupy is the key method.

    private boolean canOccupy(ClusterFlowEvent event, int acquireCount, double latestQps, double threshold) {
            long headPass = metric.getFirstCountOfWindow(event);
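            // public long getOccupiedCount(ClusterFlowEvent event) {
            //     return occupyCounter[event.ordinal()].sum();
            // }
            // Count already borrowed for the PASS event
            long occupiedCount = metric.getOccupiedCount(event);
            // passed QPS + this request's count + already-borrowed count - count in the head
            // (oldest) bucket, which is the next bucket to roll out of the window.
            // If the result is within the threshold, borrowing is allowed.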
            return latestQps + (acquireCount + occupiedCount) - headPass <= threshold;
        }
    
    public long getFirstCountOfWindow(ClusterFlowEvent event) {
            if (event == null) {
                return 0;
            }
            WindowWrap<ClusterMetricBucket> windowWrap = getValidHead();
            if (windowWrap == null) {
                return 0;
            }
            return windowWrap.value().get(event);
    }
    
    public WindowWrap<T> getValidHead() {
            return getValidHead(TimeUtil.currentTimeMillis());
    }
    
    WindowWrap<T> getValidHead(long timeMillis) {
            // Calculate index for expected head time.
            int idx = calculateTimeIdx(timeMillis + windowLengthInMs);
            WindowWrap<T> wrap = array.get(idx);
            if (wrap == null || isWindowDeprecated(wrap)) {
                return null;
            }
    
            return wrap;
    }
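    The borrow condition and the returned wait time can be checked with concrete numbers. This is a hypothetical restatement rather than Sentinel code, and the values (threshold 50, current QPS 50, 8 passes in the head bucket) are invented for illustration. It also shows that 1000 / sampleCount matches the bucket length only when the window is 1000 ms.

```java
// Hypothetical restatement of canOccupy and the wait time returned by tryOccupyNext.
class OccupySketch {

    // headPass: passes in the bucket that will be recycled next (the oldest one);
    // those tokens become free again when that bucket rolls out of the window.
    static boolean canOccupy(double latestQps, int acquireCount, long occupiedCount,
                             long headPass, double threshold) {
        return latestQps + (acquireCount + occupiedCount) - headPass <= threshold;
    }

    // tryOccupyNext returns 1000 / sampleCount regardless of the window length.
    static int waitInMs(int sampleCount) {
        return 1000 / sampleCount;
    }

    static int bucketLengthMs(int windowIntervalMs, int sampleCount) {
        return windowIntervalMs / sampleCount;
    }

    public static void main(String[] args) {
        // Window is full (50 of 50), the head bucket holds 8 passes.
        System.out.println(canOccupy(50, 1, 3, 8, 50));  // true:  50 + 1 + 3 - 8 = 46 <= 50
        System.out.println(canOccupy(50, 1, 10, 8, 50)); // false: 50 + 1 + 10 - 8 = 53 > 50
        System.out.println(waitInMs(10));                // 100
        System.out.println(bucketLengthMs(2000, 10));    // 200, not equal to waitInMs(10)
    }
}
```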
    

    At this point the server has decided whether to grant the token.

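    Now return to the client side, where sendTokenRequest in DefaultClusterTokenClient (listed earlier) receives the server's response.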


    As sendTokenRequest shows, the remaining count and the wait time from the response are copied into the TokenResult.

    How is that TokenResult handled? passClusterCheck passes it to applyTokenResult:

    private static boolean applyTokenResult(/*@NonNull*/ TokenResult result, FlowRule rule, Context context,
                                                             DefaultNode node,
                                                             int acquireCount, boolean prioritized) {
            switch (result.getStatus()) {
                case TokenResultStatus.OK:   
                    return true;
                case TokenResultStatus.SHOULD_WAIT:
                    // Wait for next tick.
                    try {
                        Thread.sleep(result.getWaitInMs());
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    return true;
                case TokenResultStatus.NO_RULE_EXISTS:
                case TokenResultStatus.BAD_REQUEST:
                case TokenResultStatus.FAIL:
                case TokenResultStatus.TOO_MANY_REQUEST:
                    return fallbackToLocalOrPass(rule, context, node, acquireCount, prioritized);
                case TokenResultStatus.BLOCKED:
                default:
                    return false;
            }
        }
    

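    As shown, an OK status returns true and the request passes.

    SHOULD_WAIT (a prioritized request that successfully borrowed from the next bucket) makes the thread sleep for the returned wait time, after which the request passes.

    NO_RULE_EXISTS, BAD_REQUEST, FAIL and TOO_MANY_REQUEST fall back to the local check (or pass directly if fallback is disabled).

    BLOCKED, and any other status by default, returns false.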
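    The status handling can be mirrored in a few lines. Status and localFallback are hypothetical stand-ins for TokenResultStatus and fallbackToLocalOrPass. One deliberate difference: on InterruptedException the sketch restores the thread's interrupt flag instead of printing the stack trace as the code above does, which is the usual Java idiom for not swallowing interrupts.

```java
import java.util.function.BooleanSupplier;

// Hypothetical mirror of applyTokenResult's status handling (not Sentinel code).
class ApplyResultSketch {

    enum Status { OK, SHOULD_WAIT, NO_RULE_EXISTS, BAD_REQUEST, FAIL, TOO_MANY_REQUEST, BLOCKED }

    static boolean apply(Status status, long waitInMs, BooleanSupplier localFallback) {
        switch (status) {
            case OK:
                return true;
            case SHOULD_WAIT:
                try {
                    Thread.sleep(waitInMs); // wait for the borrowed bucket to start
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // keep the interrupt visible to callers
                }
                return true;
            case NO_RULE_EXISTS:
            case BAD_REQUEST:
            case FAIL:
            case TOO_MANY_REQUEST:
                return localFallback.getAsBoolean(); // degrade to local mode (or pass)
            case BLOCKED:
            default:
                return false;
        }
    }

    public static void main(String[] args) {
        BooleanSupplier localPasses = () -> true;
        long start = System.nanoTime();
        boolean waited = apply(Status.SHOULD_WAIT, 100, localPasses);
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;
        System.out.println(waited + " after about " + elapsedMs + " ms");
        System.out.println(apply(Status.FAIL, 0, localPasses));    // true (local check)
        System.out.println(apply(Status.BLOCKED, 0, localPasses)); // false
    }
}
```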

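    That concludes the walkthrough of cluster flow control. The logic for calculating the wait time is still not entirely clear to me, so discussion is welcome.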

    Article link: https://www.haomeiwen.com/subject/bjtaghtx.html