美文网首页
sentinel通信机制

sentinel通信机制

作者: 自由人_ded7 | 来源:发表于2020-03-06 16:08 被阅读0次

背景

sentinel作为开源的限流组件,官方的介绍比较简单。但实际使用起来会有各种问题,需要看源码解决。之前写过一篇sentinel源码解析的文章,介绍了一些核心类的工作原理。接下来会分模块展开源码学习分享。
本次分享是sentinel通信机制。

如何使用

官方对sentinel的使用描述非常简单

1.在客户端引入依赖

<dependency>
    <groupId>com.alibaba.csp</groupId>
    <artifactId>sentinel-transport-simple-http</artifactId>
    <version>x.y.z</version></dependency>

2.启动客户端应用

启动时加入 JVM 参数 -Dcsp.sentinel.dashboard.server=consoleIp:port 指定控制台地址和端口。若启动多个应用,则需要通过 -Dcsp.sentinel.api.port=xxxx 指定客户端监控 API 的端口(默认是 8719)。

然而并不是那么顺利

问题来了

按github上的说明进行试验,控制台上并没有出现客户端的应用名称

我们刚才引入了sentinel-transport-simple-http这个包,看这个包名像是通信依赖的,于是找到源码里这个包所在的位置。

sentinel-transport下的
sentinel-transport-simple-httpsentinel-transport-netty-http
都依赖sentinel-transport-common


sentinel-transport-common下有两个spi

  com.alibaba.csp.sentinel.command.CommandHandler下是一系列客户端暴露的api,用于服务端获取簇点链路、监控数据等
   com.alibaba.csp.sentinel.init.InitFunc下是spi的初始化,这是客户端和控制台通信的关键部分。两个
   CommandCenterInitFunc: 客户端发命令
   HeartbeatSenderInitFunc:启动客户端socket

CommandCenterInitFunc源码




public class SimpleHttpCommandCenter implements CommandCenter {

   
    @Override
    @SuppressWarnings("rawtypes")
    public void beforeStart() throws Exception {
        // Register handlers
        Map<String, CommandHandler> handlers = CommandHandlerProvider.getInstance().namedHandlers();
        registerCommands(handlers);
    }

    @Override
    public void start() throws Exception {
        省略不关键代码
        ...
        Runnable serverInitTask = new Runnable() {
            int port;

            {
                try {
                    //获取本地socket的port
                    port = Integer.parseInt(TransportConfig.getPort());
                } catch (Exception e) {
                    port = DEFAULT_PORT;
                }
            }

            @Override
            public void run() {
                boolean success = false;
                ServerSocket serverSocket = getServerSocketFromBasePort(port);

                if (serverSocket != null) {
                    CommandCenterLog.info("[CommandCenter] Begin listening at port " + serverSocket.getLocalPort());
                    socketReference = serverSocket;
                    executor.submit(new ServerThread(serverSocket));
                    success = true;
                    port = serverSocket.getLocalPort();
                } else {
                    CommandCenterLog.info("[CommandCenter] chooses port fail, http command center will not work");
                }

                if (!success) {
                    port = PORT_UNINITIALIZED;
                }

                TransportConfig.setRuntimePort(port);
                executor.shutdown();
            }

        };

        new Thread(serverInitTask).start();
    }

    /**
     * Get a server socket from an available port from a base port.<br>
     * Increasing on port number will occur when the port has already been used.
     *
     * @param basePort base port to start
     * @return new socket with available port
     */
    private static ServerSocket getServerSocketFromBasePort(int basePort) {
        int tryCount = 0;
        while (true) {
            try {
                // 端口如果被占用,自动重试下个端口
                ServerSocket server = new ServerSocket(basePort + tryCount / 3, 100);
                server.setReuseAddress(true);
                return server;
            } catch (IOException e) {
                tryCount++;
                try {
                    TimeUnit.MILLISECONDS.sleep(30);
                } catch (InterruptedException e1) {
                    break;
                }
            }
        }
        return null;
    }

 
    class ServerThread extends Thread {

        private ServerSocket serverSocket;

        ServerThread(ServerSocket s) {
            this.serverSocket = s;
            setName("sentinel-courier-server-accept-thread");
        }

        @Override
        public void run() {
            while (true) {
                Socket socket = null;
                try {
                    socket = this.serverSocket.accept();
                    setSocketSoTimeout(socket);
                    HttpEventTask eventTask = new HttpEventTask(socket);
                    bizExecutor.submit(eventTask);
                } catch (Exception e) {
                    CommandCenterLog.info("Server error", e);
                    if (socket != null) {
                        try {
                            socket.close();
                        } catch (Exception e1) {
                            CommandCenterLog.info("Error when closing an opened socket", e1);
                        }
                    }
                    try {
                        // In case of infinite log.
                        Thread.sleep(10);
                    } catch (InterruptedException e1) {
                        // Indicates the task should stop.
                        break;
                    }
                }
            }
        }
    }
  
}


客户端会启动一个socket来监听httpEvent,让服务端读取监控数据。如果配置了jvm参数 csp.sentinel.api.port,以配置为准,否则默认8719端口。创建socket时,如果失败,获取端口失败则自动重试

由于测试环境当时是docker,没法获取未暴露的端口,所以监控数据拿不到。后来让运维传端口的方式解决这个问题


SimpleHttpHeartbeatSender 用于向服务端发送心跳,注册机器信息.sendHeartbeat方法用于
sendHeartbeat只在HeartbeatSenderInitFunc这个spi加载的时候被初始化。并且5000ms发一次心跳

public class SimpleHttpHeartbeatSender implements HeartbeatSender {
     public boolean sendHeartbeat() throws Exception {
             if (TransportConfig.getRuntimePort() <= 0) {
                 RecordLog.info("[SimpleHttpHeartbeatSender] Runtime port not initialized, won't send heartbeat");
                 return false;
             }
             InetSocketAddress addr = getAvailableAddress();
             if (addr == null) {
                 return false;
             }
             SimpleHttpRequest request = new SimpleHttpRequest(addr, HEARTBEAT_PATH);
             request.setParams(heartBeat.generateCurrentMessage());
             try {
                 SimpleHttpResponse response = httpClient.post(request);
                 if (response.getStatusCode() == OK_STATUS) {
                     return true;
                 }
             } catch (Exception e) {
                 RecordLog.warn("[SimpleHttpHeartbeatSender] Failed to send heartbeat to " + addr + " : ", e);
             }
             return false;
         }

getAvailableAddress会获取控制台的地址

private List<InetSocketAddress> getDefaultConsoleIps() {
        List<InetSocketAddress> newAddrs = new ArrayList<InetSocketAddress>();
        try {
            String ipsStr = TransportConfig.getConsoleServer();
            if (StringUtil.isEmpty(ipsStr)) {
                RecordLog.warn("[SimpleHttpHeartbeatSender] Dashboard server address not configured");
                return newAddrs;
            }

            for (String ipPortStr : ipsStr.split(",")) {
                if (ipPortStr.trim().length() == 0) {
                    continue;
                }
                if (ipPortStr.startsWith("http://")) {
                    ipPortStr = ipPortStr.trim().substring(7);
                }
                String[] ipPort = ipPortStr.trim().split(":");
                int port = 80;
                if (ipPort.length > 1) {
                    port = Integer.parseInt(ipPort[1].trim());
                }
                newAddrs.add(new InetSocketAddress(ipPort[0].trim(), port));
            }
        } catch (Exception ex) {
            RecordLog.warn("[SimpleHeartbeatSender] Parse dashboard list failed, current address list: " + newAddrs, ex);
            ex.printStackTrace();
        }
        return newAddrs;
    }

在配置域名的情况下,每台机器会都发一遍。但代码里写死只支持http协议
运维给每个域名都加了https,所有控制台不会显示机器。


sentinel-transport-netty-http的实现类似,只不过socket被替换成了主从reactor。性能更好。

以上就是sentient通信的原理

相关文章

网友评论

      本文标题:sentinel通信机制

      本文链接:https://www.haomeiwen.com/subject/vwtwrhtx.html