Sentinel Cluster Flow Control


Author: 晴天哥_王志 | Published 2021-12-18 20:57


Cluster Flow Control Architecture

  • In Sentinel's cluster flow control architecture, each client node sends a request to a central token server carrying the number of permits it wants to acquire; the token server performs the calculation and returns whether the request may pass.
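That round trip can be pictured with a minimal standalone sketch. All names below are illustrative (Sentinel's real protocol runs over Netty with proper request/response types), but the division of labor is the same: clients only ask, the server owns the counter.

```java
// Minimal sketch of the cluster flow control round trip.
// Illustrative only: in Sentinel the server tracks a sliding QPS window, not a plain counter.
public class ClusterFlowSketch {

    // The "token server": tracks how many permits have been granted against a global threshold.
    static class TokenServer {
        private final double threshold;
        private double granted;

        TokenServer(double threshold) {
            this.threshold = threshold;
        }

        // A client request carries the number of permits it wants to acquire.
        synchronized boolean requestToken(int acquireCount) {
            if (granted + acquireCount <= threshold) {
                granted += acquireCount;
                return true;   // pass
            }
            return false;      // block
        }
    }

    public static void main(String[] args) {
        TokenServer server = new TokenServer(10);
        // Two "client" nodes share the same global quota.
        System.out.println(server.requestToken(6)); // true
        System.out.println(server.requestToken(6)); // false: 6 + 6 > 10
    }
}
```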

Cluster Flow Control Source Code

Sentinel introduced the cluster flow control module in version 1.4.0. It consists of the following parts:

  • sentinel-cluster-common-default: common module containing shared interfaces and entities
  • sentinel-cluster-client-default: default cluster flow control client module; uses Netty for communication and exposes interfaces for plugging in custom serialization protocols
  • sentinel-cluster-server-default: default cluster flow control server module; uses Netty for communication and exposes interfaces for plugging in custom serialization protocols; it also provides an extension point (TokenService) for the actual rule-checking implementation, whose default implementation reuses the logic in sentinel-core
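To pull these modules in, the Maven coordinates look roughly like this (the version number is an assumption for illustration; use the one matching your sentinel-core):

```xml
<dependency>
    <groupId>com.alibaba.csp</groupId>
    <artifactId>sentinel-cluster-client-default</artifactId>
    <version>1.8.6</version>
</dependency>
<dependency>
    <groupId>com.alibaba.csp</groupId>
    <artifactId>sentinel-cluster-server-default</artifactId>
    <version>1.8.6</version>
</dependency>
```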

client

public class FlowRuleChecker {

    public boolean canPassCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount,
                                                    boolean prioritized) {
        String limitApp = rule.getLimitApp();
        if (limitApp == null) {
            return true;
        }
        // @1 In cluster mode, delegate to passClusterCheck to decide whether the request may pass
        if (rule.isClusterMode()) {
            return passClusterCheck(rule, context, node, acquireCount, prioritized);
        }

        return passLocalCheck(rule, context, node, acquireCount, prioritized);
    }

    private static boolean passClusterCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount,
                                            boolean prioritized) {
        try {
            TokenService clusterService = pickClusterService();
            if (clusterService == null) {
                return fallbackToLocalOrPass(rule, context, node, acquireCount, prioritized);
            }
            long flowId = rule.getClusterConfig().getFlowId();
            // @2 Send a token acquire request to the token server
            TokenResult result = clusterService.requestToken(flowId, acquireCount, prioritized);
            return applyTokenResult(result, rule, context, node, acquireCount, prioritized);
        } catch (Throwable ex) {
            RecordLog.warn("[FlowRuleChecker] Request cluster token unexpected failed", ex);
        }
        return fallbackToLocalOrPass(rule, context, node, acquireCount, prioritized);
    }
}

// @3 DefaultClusterTokenClient uses Netty for the underlying communication
public class DefaultClusterTokenClient implements ClusterTokenClient {

    @Override
    public TokenResult requestToken(Long flowId, int acquireCount, boolean prioritized) {
        if (notValidRequest(flowId, acquireCount)) {
            return badRequest();
        }
        FlowRequestData data = new FlowRequestData().setCount(acquireCount)
            .setFlowId(flowId).setPriority(prioritized);
        ClusterRequest<FlowRequestData> request = new ClusterRequest<>(ClusterConstants.MSG_TYPE_FLOW, data);
        try {
            TokenResult result = sendTokenRequest(request);
            logForResult(result);
            return result;
        } catch (Exception ex) {
            ClusterClientStatLogUtil.log(ex.getMessage());
            return new TokenResult(TokenResultStatus.FAIL);
        }
    }
}
  • @1 In cluster mode, passClusterCheck decides whether the request may pass.
  • @2 DefaultClusterTokenClient sends a token acquire request to the token server.
  • @3 DefaultClusterTokenClient uses Netty for the underlying communication.
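Note the fallbackToLocalOrPass call above: when the token server is unreachable or the request fails, the client degrades rather than erroring out. A self-contained sketch of that decision (names and the boolean flags are illustrative; in Sentinel the behavior is governed by the rule's cluster config and passLocalCheck):

```java
// Sketch of the client-side degradation path when the token server is unavailable.
// Illustrative only: real Sentinel consults rule.getClusterConfig() and passLocalCheck.
public class ClusterFallbackSketch {

    static boolean check(boolean serverReachable, boolean serverSaysPass,
                         boolean fallbackToLocal, boolean localCheckPass) {
        if (serverReachable) {
            return serverSaysPass;   // normal path: obey the token server's verdict
        }
        if (fallbackToLocal) {
            return localCheckPass;   // degrade to single-node (local) flow control
        }
        return true;                 // otherwise fail open: let the request pass
    }

    public static void main(String[] args) {
        System.out.println(check(false, false, true, false));  // false: local check blocks
        System.out.println(check(false, false, false, false)); // true: fail open
    }
}
```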

server

public class FlowRequestProcessor implements RequestProcessor<FlowRequestData, FlowTokenResponseData> {

    @Override
    public ClusterResponse<FlowTokenResponseData> processRequest(ClusterRequest<FlowRequestData> request) {
        TokenService tokenService = TokenServiceProvider.getService();

        long flowId = request.getData().getFlowId();
        int count = request.getData().getCount();
        boolean prioritized = request.getData().isPriority();

        TokenResult result = tokenService.requestToken(flowId, count, prioritized);
        return toResponse(result, request);
    }
}


public class DefaultTokenService implements TokenService {

    @Override
    public TokenResult requestToken(Long ruleId, int acquireCount, boolean prioritized) {
        if (notValidRequest(ruleId, acquireCount)) {
            return badRequest();
        }
        FlowRule rule = ClusterFlowRuleManager.getFlowRuleById(ruleId);
        if (rule == null) {
            return new TokenResult(TokenResultStatus.NO_RULE_EXISTS);
        }
        // @1 Delegate the token acquire request to ClusterFlowChecker
        return ClusterFlowChecker.acquireClusterToken(rule, acquireCount, prioritized);
    }
}

final class ClusterFlowChecker {

    static TokenResult acquireClusterToken(FlowRule rule, int acquireCount, boolean prioritized) {
        Long id = rule.getClusterConfig().getFlowId();
        // metric was elided in the original snippet; it is fetched per flow id
        ClusterMetric metric = ClusterMetricStatistics.getMetric(id);
        if (metric == null) {
            return new TokenResult(TokenResultStatus.FAIL);
        }

        // @2 remaining = global threshold - tokens already granted - tokens requested now
        double latestQps = metric.getAvg(ClusterFlowEvent.PASS);
        double globalThreshold = calcGlobalThreshold(rule) * ClusterServerConfigManager.getExceedCount();
        double nextRemaining = globalThreshold - latestQps - acquireCount;
        
        // @3 A remaining count of at least 0 means the request may pass
        if (nextRemaining >= 0) {
            metric.add(ClusterFlowEvent.PASS, acquireCount);
            metric.add(ClusterFlowEvent.PASS_REQUEST, 1);
            if (prioritized) {
                metric.add(ClusterFlowEvent.OCCUPIED_PASS, acquireCount);
            }
            return new TokenResult(TokenResultStatus.OK)
                .setRemaining((int) nextRemaining)
                .setWaitInMs(0);
        } else {
            if (prioritized) {
                double occupyAvg = metric.getAvg(ClusterFlowEvent.WAITING);
                if (occupyAvg <= ClusterServerConfigManager.getMaxOccupyRatio() * globalThreshold) {
                    int waitInMs = metric.tryOccupyNext(ClusterFlowEvent.PASS, acquireCount, globalThreshold);
                    if (waitInMs > 0) {
                        ClusterServerStatLogUtil.log("flow|waiting|" + id);
                        return new TokenResult(TokenResultStatus.SHOULD_WAIT)
                            .setRemaining(0)
                            .setWaitInMs(waitInMs);
                    }
                }
            }

            // Block the request
            metric.add(ClusterFlowEvent.BLOCK, acquireCount);
            metric.add(ClusterFlowEvent.BLOCK_REQUEST, 1);
            ClusterServerStatLogUtil.log("flow|block|" + id, acquireCount);
            ClusterServerStatLogUtil.log("flow|block_request|" + id, 1);
            if (prioritized) {
                metric.add(ClusterFlowEvent.OCCUPIED_BLOCK, acquireCount);
                ClusterServerStatLogUtil.log("flow|occupied_block|" + id, 1);
            }

            return blockedResult();
        }
    }
}
  • FlowRequestProcessor.processRequest handles cluster flow control token requests on the server side.
  • DefaultTokenService, on the server side, delegates to ClusterFlowChecker.acquireClusterToken.
  • ClusterFlowChecker handles the actual token acquisition. The overall logic: subtract the tokens already granted and the tokens requested now from the global threshold; if the remainder is non-negative the request passes (with special handling that lets prioritized requests wait when the remainder is insufficient), otherwise it is blocked.
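The core arithmetic at @2/@3 can be reproduced standalone. This is a sketch, not Sentinel's code: Sentinel additionally multiplies the threshold by an exceed-count factor and reads latestQps from a sliding window.

```java
// Standalone demo of the server-side remaining-token calculation.
public class RemainingTokenDemo {

    // nextRemaining = globalThreshold - latestQps - acquireCount
    static double nextRemaining(double globalThreshold, double latestQps, int acquireCount) {
        return globalThreshold - latestQps - acquireCount;
    }

    // The request passes iff the remainder is non-negative.
    static boolean canPass(double globalThreshold, double latestQps, int acquireCount) {
        return nextRemaining(globalThreshold, latestQps, acquireCount) >= 0;
    }

    public static void main(String[] args) {
        System.out.println(canPass(100, 95, 3)); // true: remaining = 2
        System.out.println(canPass(100, 99, 3)); // false: remaining = -2 -> block
    }
}
```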

Deploying the Cluster Flow Control Server

Standalone deployment
  • Standalone mode (Alone): the token server starts as an independent process. It is deployed separately and offers good isolation, but requires extra deployment work. Standalone mode is suitable as a Global Rate Limiter serving a whole cluster.
  • If the standalone token server goes down, the token clients degrade to local flow control, i.e. single-node rate limiting, so this deployment style requires keeping the token server highly available.
Embedded deployment
  • Embedded mode: the token server runs as a built-in component inside the same process as the service itself. In this mode all instances in the cluster are peers, and any node can switch between token server and token client roles at any time, so no separate deployment is needed and flexibility is good. Isolation is weaker, however, so the token server's total QPS must be capped to avoid impacting the application itself. Embedded mode suits flow control within a single application cluster.
  • In embedded mode, if the token server goes down, another token client can be promoted to token server.
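The role switch in embedded mode can be pictured with a toy election. This is purely illustrative: Sentinel does not elect a server by itself, it leaves the client/server role assignment to external coordination (for example, pushed through a dynamic config source).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Toy sketch of promoting a client to token server after the current server fails.
// Illustrative only: Sentinel leaves server selection to external coordination.
public class EmbeddedFailoverSketch {

    static String promote(List<String> nodes, String failedServer) {
        List<String> alive = new ArrayList<>(nodes);
        alive.remove(failedServer);
        // Simple deterministic rule: the lexicographically smallest surviving node becomes server.
        return alive.isEmpty() ? null : Collections.min(alive);
    }

    public static void main(String[] args) {
        System.out.println(promote(Arrays.asList("node-1", "node-2", "node-3"), "node-1")); // node-2
    }
}
```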

