美文网首页
sentinel通信机制

sentinel通信机制

作者: 自由人_ded7 | 来源:发表于2020-03-06 16:08 被阅读0次

    背景

    sentinel作为开源的限流组件,官方的介绍比较简单。但实际使用起来会有各种问题,需要看源码解决。之前写过一篇sentinel源码解析的文章,介绍了一些核心类的工作原理。接下来会分模块展开源码学习分享。
    本次分享是sentinel通信机制。

    如何使用

    官方对sentinel的使用描述非常简单

    1.在客户端引入依赖

    <dependency>
        <groupId>com.alibaba.csp</groupId>
        <artifactId>sentinel-transport-simple-http</artifactId>
        <version>x.y.z</version></dependency>
    

    2.启动客户端应用

    启动时加入 JVM 参数 -Dcsp.sentinel.dashboard.server=consoleIp:port 指定控制台地址和端口。若启动多个应用,则需要通过 -Dcsp.sentinel.api.port=xxxx 指定客户端监控 API 的端口(默认是 8719)。

    然而并不是那么顺利

    问题来了

    按github上的说明进行试验,控制台上并没有出现客户端的应用名称

    我们刚才引入了sentinel-transport-simple-http这个包,看这个包名像是通信依赖的,于是找到源码里这个包所在的位置。

    sentinel-transport下的
    sentinel-transport-simple-httpsentinel-transport-netty-http
    都依赖sentinel-transport-common


    sentinel-transport-common下有两个spi

      com.alibaba.csp.sentinel.command.CommandHandler下是一系列客户端暴露的api,用于服务端获取簇点链路、监控数据等
       com.alibaba.csp.sentinel.init.InitFunc下是spi的初始化,这是客户端和控制台通信的关键部分。两个
       CommandCenterInitFunc: 客户端发命令
       HeartbeatSenderInitFunc:启动客户端socket
    
    

    CommandCenterInitFunc源码

    
    
    
    public class SimpleHttpCommandCenter implements CommandCenter {
    
       
        @Override
        @SuppressWarnings("rawtypes")
        public void beforeStart() throws Exception {
            // Register handlers
            Map<String, CommandHandler> handlers = CommandHandlerProvider.getInstance().namedHandlers();
            registerCommands(handlers);
        }
    
        @Override
        public void start() throws Exception {
            省略不关键代码
            ...
            Runnable serverInitTask = new Runnable() {
                int port;
    
                {
                    try {
                        //获取本地socket的port
                        port = Integer.parseInt(TransportConfig.getPort());
                    } catch (Exception e) {
                        port = DEFAULT_PORT;
                    }
                }
    
                @Override
                public void run() {
                    boolean success = false;
                    ServerSocket serverSocket = getServerSocketFromBasePort(port);
    
                    if (serverSocket != null) {
                        CommandCenterLog.info("[CommandCenter] Begin listening at port " + serverSocket.getLocalPort());
                        socketReference = serverSocket;
                        executor.submit(new ServerThread(serverSocket));
                        success = true;
                        port = serverSocket.getLocalPort();
                    } else {
                        CommandCenterLog.info("[CommandCenter] chooses port fail, http command center will not work");
                    }
    
                    if (!success) {
                        port = PORT_UNINITIALIZED;
                    }
    
                    TransportConfig.setRuntimePort(port);
                    executor.shutdown();
                }
    
            };
    
            new Thread(serverInitTask).start();
        }
    
        /**
         * Get a server socket from an available port from a base port.<br>
         * Increasing on port number will occur when the port has already been used.
         *
         * @param basePort base port to start
         * @return new socket with available port
         */
        private static ServerSocket getServerSocketFromBasePort(int basePort) {
            int tryCount = 0;
            while (true) {
                try {
                    // 端口如果被占用,自动重试下个端口
                    ServerSocket server = new ServerSocket(basePort + tryCount / 3, 100);
                    server.setReuseAddress(true);
                    return server;
                } catch (IOException e) {
                    tryCount++;
                    try {
                        TimeUnit.MILLISECONDS.sleep(30);
                    } catch (InterruptedException e1) {
                        break;
                    }
                }
            }
            return null;
        }
    
     
        class ServerThread extends Thread {
    
            private ServerSocket serverSocket;
    
            ServerThread(ServerSocket s) {
                this.serverSocket = s;
                setName("sentinel-courier-server-accept-thread");
            }
    
            @Override
            public void run() {
                while (true) {
                    Socket socket = null;
                    try {
                        socket = this.serverSocket.accept();
                        setSocketSoTimeout(socket);
                        HttpEventTask eventTask = new HttpEventTask(socket);
                        bizExecutor.submit(eventTask);
                    } catch (Exception e) {
                        CommandCenterLog.info("Server error", e);
                        if (socket != null) {
                            try {
                                socket.close();
                            } catch (Exception e1) {
                                CommandCenterLog.info("Error when closing an opened socket", e1);
                            }
                        }
                        try {
                            // In case of infinite log.
                            Thread.sleep(10);
                        } catch (InterruptedException e1) {
                            // Indicates the task should stop.
                            break;
                        }
                    }
                }
            }
        }
      
    }
    
    
    

    客户端会启动一个socket来监听httpEvent,让服务端读取监控数据。如果配置了jvm参数 csp.sentinel.api.port,以配置为准,否则默认8719端口。创建socket时,如果失败,获取端口失败则自动重试

    由于测试环境当时是docker,没法获取未暴露的端口,所以监控数据拿不到。后来让运维传端口的方式解决这个问题


    SimpleHttpHeartbeatSender 用于向服务端发送心跳,注册机器信息.sendHeartbeat方法用于
    sendHeartbeat只在HeartbeatSenderInitFunc这个spi加载的时候被初始化。并且5000ms发一次心跳

    public class SimpleHttpHeartbeatSender implements HeartbeatSender {
         public boolean sendHeartbeat() throws Exception {
                 if (TransportConfig.getRuntimePort() <= 0) {
                     RecordLog.info("[SimpleHttpHeartbeatSender] Runtime port not initialized, won't send heartbeat");
                     return false;
                 }
                 InetSocketAddress addr = getAvailableAddress();
                 if (addr == null) {
                     return false;
                 }
                 SimpleHttpRequest request = new SimpleHttpRequest(addr, HEARTBEAT_PATH);
                 request.setParams(heartBeat.generateCurrentMessage());
                 try {
                     SimpleHttpResponse response = httpClient.post(request);
                     if (response.getStatusCode() == OK_STATUS) {
                         return true;
                     }
                 } catch (Exception e) {
                     RecordLog.warn("[SimpleHttpHeartbeatSender] Failed to send heartbeat to " + addr + " : ", e);
                 }
                 return false;
             }
    
    

    getAvailableAddress会获取控制台的地址

    private List<InetSocketAddress> getDefaultConsoleIps() {
            List<InetSocketAddress> newAddrs = new ArrayList<InetSocketAddress>();
            try {
                String ipsStr = TransportConfig.getConsoleServer();
                if (StringUtil.isEmpty(ipsStr)) {
                    RecordLog.warn("[SimpleHttpHeartbeatSender] Dashboard server address not configured");
                    return newAddrs;
                }
    
                for (String ipPortStr : ipsStr.split(",")) {
                    if (ipPortStr.trim().length() == 0) {
                        continue;
                    }
                    if (ipPortStr.startsWith("http://")) {
                        ipPortStr = ipPortStr.trim().substring(7);
                    }
                    String[] ipPort = ipPortStr.trim().split(":");
                    int port = 80;
                    if (ipPort.length > 1) {
                        port = Integer.parseInt(ipPort[1].trim());
                    }
                    newAddrs.add(new InetSocketAddress(ipPort[0].trim(), port));
                }
            } catch (Exception ex) {
                RecordLog.warn("[SimpleHeartbeatSender] Parse dashboard list failed, current address list: " + newAddrs, ex);
                ex.printStackTrace();
            }
            return newAddrs;
        }
    
    

    在配置域名的情况下,每台机器会都发一遍。但代码里写死只支持http协议
    运维给每个域名都加了https,所有控制台不会显示机器。


    sentinel-transport-netty-http的实现类似,只不过socket被替换成了主从reactor。性能更好。

    以上就是sentient通信的原理

    相关文章

      网友评论

          本文标题:sentinel通信机制

          本文链接:https://www.haomeiwen.com/subject/vwtwrhtx.html