背景
sentinel作为开源的限流组件,官方的介绍比较简单。但实际使用起来会有各种问题,需要看源码解决。之前写过一篇sentinel源码解析的文章,介绍了一些核心类的工作原理。接下来会分模块展开源码学习分享。
本次分享是sentinel通信机制。
如何使用
官方对sentinel的使用描述非常简单
1.在客户端引入依赖
<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-transport-simple-http</artifactId>
<version>x.y.z</version></dependency>
2.启动客户端应用
启动时加入 JVM 参数 -Dcsp.sentinel.dashboard.server=consoleIp:port 指定控制台地址和端口。若启动多个应用,则需要通过 -Dcsp.sentinel.api.port=xxxx 指定客户端监控 API 的端口(默认是 8719)。
然而并不是那么顺利
问题来了
按github上的说明进行试验,控制台上并没有出现客户端的应用名称
我们刚才引入了sentinel-transport-simple-http这个包,看这个包名像是通信依赖的,于是找到源码里这个包所在的位置。
sentinel-transport下的
sentinel-transport-simple-http和sentinel-transport-netty-http
都依赖sentinel-transport-common
sentinel-transport-common下有两个spi
com.alibaba.csp.sentinel.command.CommandHandler下是一系列客户端暴露的api,用于服务端获取簇点链路、监控数据等
com.alibaba.csp.sentinel.init.InitFunc下是spi的初始化,这是客户端和控制台通信的关键部分。两个
CommandCenterInitFunc: 客户端发命令
HeartbeatSenderInitFunc:启动客户端socket
CommandCenterInitFunc源码
public class SimpleHttpCommandCenter implements CommandCenter {
@Override
@SuppressWarnings("rawtypes")
public void beforeStart() throws Exception {
// Register handlers
Map<String, CommandHandler> handlers = CommandHandlerProvider.getInstance().namedHandlers();
registerCommands(handlers);
}
@Override
public void start() throws Exception {
省略不关键代码
...
Runnable serverInitTask = new Runnable() {
int port;
{
try {
//获取本地socket的port
port = Integer.parseInt(TransportConfig.getPort());
} catch (Exception e) {
port = DEFAULT_PORT;
}
}
@Override
public void run() {
boolean success = false;
ServerSocket serverSocket = getServerSocketFromBasePort(port);
if (serverSocket != null) {
CommandCenterLog.info("[CommandCenter] Begin listening at port " + serverSocket.getLocalPort());
socketReference = serverSocket;
executor.submit(new ServerThread(serverSocket));
success = true;
port = serverSocket.getLocalPort();
} else {
CommandCenterLog.info("[CommandCenter] chooses port fail, http command center will not work");
}
if (!success) {
port = PORT_UNINITIALIZED;
}
TransportConfig.setRuntimePort(port);
executor.shutdown();
}
};
new Thread(serverInitTask).start();
}
/**
* Get a server socket from an available port from a base port.<br>
* Increasing on port number will occur when the port has already been used.
*
* @param basePort base port to start
* @return new socket with available port
*/
private static ServerSocket getServerSocketFromBasePort(int basePort) {
int tryCount = 0;
while (true) {
try {
// 端口如果被占用,自动重试下个端口
ServerSocket server = new ServerSocket(basePort + tryCount / 3, 100);
server.setReuseAddress(true);
return server;
} catch (IOException e) {
tryCount++;
try {
TimeUnit.MILLISECONDS.sleep(30);
} catch (InterruptedException e1) {
break;
}
}
}
return null;
}
class ServerThread extends Thread {
private ServerSocket serverSocket;
ServerThread(ServerSocket s) {
this.serverSocket = s;
setName("sentinel-courier-server-accept-thread");
}
@Override
public void run() {
while (true) {
Socket socket = null;
try {
socket = this.serverSocket.accept();
setSocketSoTimeout(socket);
HttpEventTask eventTask = new HttpEventTask(socket);
bizExecutor.submit(eventTask);
} catch (Exception e) {
CommandCenterLog.info("Server error", e);
if (socket != null) {
try {
socket.close();
} catch (Exception e1) {
CommandCenterLog.info("Error when closing an opened socket", e1);
}
}
try {
// In case of infinite log.
Thread.sleep(10);
} catch (InterruptedException e1) {
// Indicates the task should stop.
break;
}
}
}
}
}
}
客户端会启动一个socket来监听httpEvent,让服务端读取监控数据。如果配置了jvm参数 csp.sentinel.api.port,以配置为准,否则默认8719端口。创建socket时,如果失败,获取端口失败则自动重试
由于测试环境当时是docker,没法获取未暴露的端口,所以监控数据拿不到。后来让运维传端口的方式解决这个问题
SimpleHttpHeartbeatSender 用于向服务端发送心跳,注册机器信息.sendHeartbeat方法用于
sendHeartbeat只在HeartbeatSenderInitFunc这个spi加载的时候被初始化。并且5000ms发一次心跳
public class SimpleHttpHeartbeatSender implements HeartbeatSender {
public boolean sendHeartbeat() throws Exception {
if (TransportConfig.getRuntimePort() <= 0) {
RecordLog.info("[SimpleHttpHeartbeatSender] Runtime port not initialized, won't send heartbeat");
return false;
}
InetSocketAddress addr = getAvailableAddress();
if (addr == null) {
return false;
}
SimpleHttpRequest request = new SimpleHttpRequest(addr, HEARTBEAT_PATH);
request.setParams(heartBeat.generateCurrentMessage());
try {
SimpleHttpResponse response = httpClient.post(request);
if (response.getStatusCode() == OK_STATUS) {
return true;
}
} catch (Exception e) {
RecordLog.warn("[SimpleHttpHeartbeatSender] Failed to send heartbeat to " + addr + " : ", e);
}
return false;
}
getAvailableAddress会获取控制台的地址
private List<InetSocketAddress> getDefaultConsoleIps() {
List<InetSocketAddress> newAddrs = new ArrayList<InetSocketAddress>();
try {
String ipsStr = TransportConfig.getConsoleServer();
if (StringUtil.isEmpty(ipsStr)) {
RecordLog.warn("[SimpleHttpHeartbeatSender] Dashboard server address not configured");
return newAddrs;
}
for (String ipPortStr : ipsStr.split(",")) {
if (ipPortStr.trim().length() == 0) {
continue;
}
if (ipPortStr.startsWith("http://")) {
ipPortStr = ipPortStr.trim().substring(7);
}
String[] ipPort = ipPortStr.trim().split(":");
int port = 80;
if (ipPort.length > 1) {
port = Integer.parseInt(ipPort[1].trim());
}
newAddrs.add(new InetSocketAddress(ipPort[0].trim(), port));
}
} catch (Exception ex) {
RecordLog.warn("[SimpleHeartbeatSender] Parse dashboard list failed, current address list: " + newAddrs, ex);
ex.printStackTrace();
}
return newAddrs;
}
在配置域名的情况下,每台机器会都发一遍。但代码里写死只支持http协议
运维给每个域名都加了https,所有控制台不会显示机器。
sentinel-transport-netty-http的实现类似,只不过socket被替换成了主从reactor。性能更好。
以上就是sentient通信的原理
网友评论