美文网首页服务监控和治理
Sentinel之集群限流源码分析(一)

Sentinel之集群限流源码分析(一)

作者: 橘子_好多灰 | 来源:发表于2020-03-21 14:01 被阅读0次

    集群限流源码分析(一)

    • sentinel-cluster-common-default: 公共模块,包含公共接口和实体
    • sentinel-cluster-client-default: 默认集群流控 client 模块,使用 Netty 进行通信,提供接口方便序列化协议扩展
    • sentinel-cluster-server-default: 默认集群流控 server 模块,使用 Netty 进行通信,提供接口方便序列化协议扩展;
      同时提供扩展接口对接规则判断的具体实现(TokenService),默认实现是复用 sentinel-core 的相关逻辑

    sentinel-cluster-common-default

    源码内容如下:


    image.png

    可以看到该模块下主要定义公共的注解、接口公共方法。

    codec包

    数据流接口

    该包下定义EntityWriter和EntityDecoder两个顶层接口。

    • EntityWriter:用于对目标数据的序列化
    public interface EntityWriter<E, T> {
    
        /**
         * Write the provided entity to target stream.
         *
         * @param entity entity to publish
         * @param target the target stream
         */
        void writeTo(E entity, T target);
    }
    
    • EntityDecoder:对响应数据的反序列接口
    public interface EntityDecoder<S, T> {
    
        /**
         * Decode target object from source stream.
         *
         * @param source source stream
         * @return decoded target object
         */
        T decode(S source);
    }
    
    • RequestEntityWriter:客户端请求的数据序列化处理接口,继承自EntityWriter;
    public interface RequestEntityWriter<E extends Request, T> extends EntityWriter<E, T> {
    
    }
    

    请求实体对象继承自Request

    • RequestEntityDecoder:客户端对服务返回的数据反序列化接口,继续自EntityDecoder:
    public interface RequestEntityDecoder<S, T extends Request> extends EntityDecoder<S, T> {
    
    }
    
    • ResponseEntityDecoder:服务端对请求客户端的数据反序列化接口,继续自EntityDecoder:
    public interface ResponseEntityDecoder<S, T extends Response> extends EntityDecoder<S, T> {
    
    }
    

    响应实体对象继承自Response

    • ResponseEntityWriter:服务端对响应数据的序列化接口,继承自EntityWriter。
    public interface ResponseEntityWriter<E extends Response, T> extends EntityWriter<E, T> {
    
    }
    

    request包

    请求对象接口Request

    public interface Request {
    
        /**
         * Get request type.
         *
         * @return request type
         */
        int getType();
    
        /**
         * Get request ID.
         *
         * @return unique request ID
         */
        int getId();
    }
    
    • 该接口定义两个方法:getType()getId();用于获取请求的类型和Id;实际上就是ClusterRequest实体对象的get方法。

    ClusterRequest

    public class ClusterRequest<T> implements Request {
    
        private int id;
        private int type;
    
        private T data;
    
        public ClusterRequest() {}
    
        public ClusterRequest(int id, int type, T data) {
            this.id = id;
            this.type = type;
            this.data = data;
        }
        //省略部分代码
    }
    
    • ClusterRequest:集群客户端请求对象,对象实体是泛型T表示

    FlowRequestData、ParamFlowRequestData
    这两个对象表示正常限流请求对象,以及热点参数限流对象;源码如下:

    public class FlowRequestData {
        //限流id,规则id
        private long flowId;
        private int count;
        private boolean priority;
    }
    
    public class ParamFlowRequestData {
    
        private long flowId;
        private int count;
        private Collection<Object> params;
    }
    
    • 两个主体具体用法后面分析;

    response包

    响应对象接口Response

    public interface Response {
        /**
         * Get response ID.
         *
         * @return response ID
         */
        int getId();
    
        /**
         * Get response type.
         *
         * @return response type
         */
        int getType();
    
        /**
         * Get response status.
         *
         * @return response status
         */
        int getStatus();
    }
    
    • 和Request接口一样,定义了几个获取Id,type,status的方法。
      ClusterResponse:集群响应对象
    public class ClusterResponse<T> implements Response {
    
        private int id;
        private int type;
        private int status;
    
        //响应的具体数据
        private T data;
    
        public ClusterResponse() {}
        
        //省略部分代码
    }
    

    FlowTokenResponseData:服务端响应的data

    public class FlowTokenResponseData {
    
        //剩余次数
        private int remainingCount;
        //等待时间
        private int waitInMs;
    }
    

    ClusterTransportClient接口

    public interface ClusterTransportClient {
    
        /**
         * Start the client.
         *
         * @throws Exception some error occurred (e.g. initialization failed)
         */
         //启动客户端
        void start() throws Exception;
    
        /**
         * Stop the client.
         *
         * @throws Exception some error occurred (e.g. shutdown failed)
         */
         //停止客户端
        void stop() throws Exception;
    
        /**
         * Send request to remote server and get response.
         *
         * @param request Sentinel cluster request
         * @return response from remote server
         * @throws Exception some error occurs
         */
         //发送请求
        ClusterResponse sendRequest(ClusterRequest request) throws Exception;
    
        /**
         * Check whether the client has been started and ready for sending requests.
         *
         * @return true if the client is ready to send requests, otherwise false
         */
         //判断客户端是否准备好
        boolean isReady();
    }
    
    • ClusterTransportClient客户端传送接口,定义4个方法如上;默认实现类NettyTransportClient在client模块中,后面讲解。

    ConfigSupplierRegistry配置注册

    public final class ConfigSupplierRegistry {
    
        /**
         * The default namespace supplier provides appName as namespace.
         */
        private static final Supplier<String> DEFAULT_APP_NAME_SUPPLIER = new Supplier<String>() {
            @Override
            public String get() {
                return AppNameUtil.getAppName();
            }
        };
        /**
         * Registered namespace supplier.
         */
        private static Supplier<String> namespaceSupplier = DEFAULT_APP_NAME_SUPPLIER;
    
        /**
         * Get the registered namespace supplier.
         *
         * @return the registered namespace supplier
         */
        public static Supplier<String> getNamespaceSupplier() {
            return namespaceSupplier;
        }
    
        public static void setNamespaceSupplier(Supplier<String> namespaceSupplier) {
            AssertUtil.notNull(namespaceSupplier, "namespaceSupplier cannot be null");
            ConfigSupplierRegistry.namespaceSupplier = namespaceSupplier;
            RecordLog.info("[ConfigSupplierRegistry] New namespace supplier provided, current supplied: "
                + namespaceSupplier.get());
        }
    
        private ConfigSupplierRegistry() {}
    
    • 集群限流服务端作用设置或获取,默认是服务的name

    其他

    • ClusterConstants:常量定义
    • ClusterErrorMessages:器群错误消息定义
    • SentinelClusterException:定义了SentinelCluster异常
    • RequestType:定义了一个请求类型注解

    相关文章

      网友评论

        本文标题:Sentinel之集群限流源码分析(一)

        本文链接:https://www.haomeiwen.com/subject/tcmyyhtx.html