集群限流源码分析(一)
- sentinel-cluster-common-default: 公共模块,包含公共接口和实体
- sentinel-cluster-client-default: 默认集群流控 client 模块,使用 Netty 进行通信,提供接口方便序列化协议扩展
- sentinel-cluster-server-default: 默认集群流控 server 模块,使用 Netty 进行通信,提供接口方便序列化协议扩展;
同时提供扩展接口对接规则判断的具体实现(TokenService),默认实现是复用 sentinel-core 的相关逻辑
sentinel-cluster-common-default
源码内容如下:
image.png
可以看到该模块下主要定义公共的注解、接口公共方法。
codec包
数据流接口该包下定义EntityWriter和EntityDecoder两个顶层接口。
-
EntityWriter
:用于对目标数据的序列化
public interface EntityWriter<E, T> {
/**
* Write the provided entity to target stream.
*
* @param entity entity to publish
* @param target the target stream
*/
void writeTo(E entity, T target);
}
-
EntityDecoder
:对响应数据的反序列接口
public interface EntityDecoder<S, T> {
/**
* Decode target object from source stream.
*
* @param source source stream
* @return decoded target object
*/
T decode(S source);
}
-
RequestEntityWriter
:客户端请求的数据序列化处理接口,继承自EntityWriter;
public interface RequestEntityWriter<E extends Request, T> extends EntityWriter<E, T> {
}
请求实体对象继承自Request
-
RequestEntityDecoder
:客户端对服务返回的数据反序列化接口,继续自EntityDecoder:
public interface RequestEntityDecoder<S, T extends Request> extends EntityDecoder<S, T> {
}
-
ResponseEntityDecoder
:服务端对请求客户端的数据反序列化接口,继续自EntityDecoder:
public interface ResponseEntityDecoder<S, T extends Response> extends EntityDecoder<S, T> {
}
响应实体对象继承自Response
-
ResponseEntityWriter
:服务端对响应数据的序列化接口,继承自EntityWriter。
public interface ResponseEntityWriter<E extends Response, T> extends EntityWriter<E, T> {
}
request包
请求对象接口Request
public interface Request {
/**
* Get request type.
*
* @return request type
*/
int getType();
/**
* Get request ID.
*
* @return unique request ID
*/
int getId();
}
- 该接口定义两个方法:
getType()
和getId()
;用于获取请求的类型和Id;实际上就是ClusterRequest实体对象的get方法。
ClusterRequest
public class ClusterRequest<T> implements Request {
private int id;
private int type;
private T data;
public ClusterRequest() {}
public ClusterRequest(int id, int type, T data) {
this.id = id;
this.type = type;
this.data = data;
}
//省略部分代码
}
- ClusterRequest:集群客户端请求对象,对象实体是泛型T表示
FlowRequestData、ParamFlowRequestData
这两个对象表示正常限流请求对象,以及热点参数限流对象;源码如下:
public class FlowRequestData {
//限流id,规则id
private long flowId;
private int count;
private boolean priority;
}
public class ParamFlowRequestData {
private long flowId;
private int count;
private Collection<Object> params;
}
- 两个主体具体用法后面分析;
response包
响应对象接口Response
public interface Response {
/**
* Get response ID.
*
* @return response ID
*/
int getId();
/**
* Get response type.
*
* @return response type
*/
int getType();
/**
* Get response status.
*
* @return response status
*/
int getStatus();
}
- 和Request接口一样,定义了几个获取Id,type,status的方法。
ClusterResponse:集群响应对象
public class ClusterResponse<T> implements Response {
private int id;
private int type;
private int status;
//响应的具体数据
private T data;
public ClusterResponse() {}
//省略部分代码
}
FlowTokenResponseData:服务端响应的data
public class FlowTokenResponseData {
//剩余次数
private int remainingCount;
//等待时间
private int waitInMs;
}
ClusterTransportClient接口
public interface ClusterTransportClient {
/**
* Start the client.
*
* @throws Exception some error occurred (e.g. initialization failed)
*/
//启动客户端
void start() throws Exception;
/**
* Stop the client.
*
* @throws Exception some error occurred (e.g. shutdown failed)
*/
//停止客户端
void stop() throws Exception;
/**
* Send request to remote server and get response.
*
* @param request Sentinel cluster request
* @return response from remote server
* @throws Exception some error occurs
*/
//发送请求
ClusterResponse sendRequest(ClusterRequest request) throws Exception;
/**
* Check whether the client has been started and ready for sending requests.
*
* @return true if the client is ready to send requests, otherwise false
*/
//判断客户端是否准备好
boolean isReady();
}
- ClusterTransportClient客户端传送接口,定义4个方法如上;默认实现类NettyTransportClient在client模块中,后面讲解。
ConfigSupplierRegistry配置注册
public final class ConfigSupplierRegistry {
/**
* The default namespace supplier provides appName as namespace.
*/
private static final Supplier<String> DEFAULT_APP_NAME_SUPPLIER = new Supplier<String>() {
@Override
public String get() {
return AppNameUtil.getAppName();
}
};
/**
* Registered namespace supplier.
*/
private static Supplier<String> namespaceSupplier = DEFAULT_APP_NAME_SUPPLIER;
/**
* Get the registered namespace supplier.
*
* @return the registered namespace supplier
*/
public static Supplier<String> getNamespaceSupplier() {
return namespaceSupplier;
}
public static void setNamespaceSupplier(Supplier<String> namespaceSupplier) {
AssertUtil.notNull(namespaceSupplier, "namespaceSupplier cannot be null");
ConfigSupplierRegistry.namespaceSupplier = namespaceSupplier;
RecordLog.info("[ConfigSupplierRegistry] New namespace supplier provided, current supplied: "
+ namespaceSupplier.get());
}
private ConfigSupplierRegistry() {}
- 集群限流服务端作用设置或获取,默认是服务的name
其他
- ClusterConstants:常量定义
- ClusterErrorMessages:器群错误消息定义
- SentinelClusterException:定义了SentinelCluster异常
- RequestType:定义了一个请求类型注解
网友评论