美文网首页spring cloud alibaba
nacos-服务注册发现源码分析-注册请求接口-03

nacos-服务注册发现源码分析-注册请求接口-03

作者: 愤怒的奶牛 | 来源:发表于2020-08-08 15:10 被阅读0次

    nacos-服务注册发现源码分析-启动注册-02分析了nacos在springboot 启动后,通过事件机制,通知到启动事件监听器,在事件监听器里面完成了服务的注册。现在我们来分析一下,服务注册的具体访问接口。

    • NacosServiceRegistry 服务注册接口
    /**
     * @author xiaojing
     * @author <a href="mailto:mercyblitz@gmail.com">Mercy</a>
     */
    public class NacosServiceRegistry implements ServiceRegistry<Registration> {
    
        private static final Logger log = LoggerFactory.getLogger(NacosServiceRegistry.class);
    
        private final NacosDiscoveryProperties nacosDiscoveryProperties;
    
        private final NamingService namingService;
    
        /**
         * Creates the registry and obtains, from the supplied discovery properties,
         * the {@link NamingService} that {@code register} delegates to.
         *
         * @param nacosDiscoveryProperties Nacos discovery configuration, also the source of the naming service
         */
        public NacosServiceRegistry(NacosDiscoveryProperties nacosDiscoveryProperties) {
            // Service registration requests are issued through this naming service.
            this.namingService = nacosDiscoveryProperties.namingServiceInstance();
            this.nacosDiscoveryProperties = nacosDiscoveryProperties;
        }
            // 服务注册
        /**
         * Registers the given registration with Nacos through the naming service.
         * <p>
         * When the registration has an empty service id, only a warning is logged and
         * nothing is registered. A failure during registration is logged and then
         * passed to {@code rethrowRuntimeException}.
         *
         * @param registration the service registration to publish
         */
        @Override
        public void register(Registration registration) {

            final boolean missingServiceId = StringUtils.isEmpty(registration.getServiceId());
            if (missingServiceId) {
                log.warn("No service to register for nacos client...");
                return;
            }

            final String serviceName = registration.getServiceId();
            final String groupName = nacosDiscoveryProperties.getGroup();
            final Instance nacosInstance = getNacosInstanceFromRegistration(registration);

            try {
                // The actual registration call.
                namingService.registerInstance(serviceName, groupName, nacosInstance);
                log.info("nacos registry, {} {} {}:{} register finished", groupName, serviceName,
                        nacosInstance.getIp(), nacosInstance.getPort());
            }
            catch (Exception ex) {
                log.error("nacos registry, {} register failed...{},", serviceName,
                        registration.toString(), ex);
                // rethrow a RuntimeException if the registration is failed.
                // issue : https://github.com/alibaba/spring-cloud-alibaba/issues/1132
                rethrowRuntimeException(ex);
            }
        }
    
    • this.namingService = nacosDiscoveryProperties.namingServiceInstance(); 实例化
    /**
     * Returns the cached {@link NamingService}, creating it through
     * {@code NacosFactory.createNamingService} on first use.
     *
     * @return the naming service, or {@code null} when creation fails (the failure is logged)
     */
    public NamingService namingServiceInstance() {
        if (namingService == null) {
            try {
                // Creation is delegated to the factory.
                namingService = NacosFactory.createNamingService(getNacosProperties());
            }
            catch (Exception e) {
                log.error("create naming service error!properties={},e=,", this, e);
                return null;
            }
        }
        return namingService;
    }
    
    • namingService = NacosFactory.createNamingService(getNacosProperties());
    /**
         * Creates a naming service by delegating to
         * {@code NamingFactory.createNamingService(Properties)}.
         *
         * @param properties initialization properties, passed through unchanged
         * @return the naming service returned by {@code NamingFactory}
         * @throws NacosException if {@code NamingFactory} fails to create the service
         */
        public static NamingService createNamingService(Properties properties) throws NacosException {
            return NamingFactory.createNamingService(properties);
        }
    
        /**
         * Creates a {@link NamingService} by reflectively instantiating
         * {@code com.alibaba.nacos.client.naming.NacosNamingService} through its
         * {@code (Properties)} constructor.
         *
         * @param properties initialization properties handed to the implementation's constructor
         * @return the newly created naming service
         * @throws NacosException with code {@code CLIENT_INVALID_PARAM}, wrapping any failure to
         *                        load or instantiate the implementation class
         */
        public static NamingService createNamingService(Properties properties) throws NacosException {
            try {
                Class<?> driverImplClass = Class.forName("com.alibaba.nacos.client.naming.NacosNamingService");
                // Wildcard-typed rather than the raw Constructor type, so no raw-type warning is produced.
                Constructor<?> constructor = driverImplClass.getConstructor(Properties.class);
                return (NamingService) constructor.newInstance(properties);
            } catch (Throwable e) {
                throw new NacosException(NacosException.CLIENT_INVALID_PARAM, e);
            }
        }
    

    从上面的方法我们可以看出:this.namingService = nacosDiscoveryProperties.namingServiceInstance(); 这里的 namingService 就是通过反射创建的 com.alibaba.nacos.client.naming.NacosNamingService 实例。ok,回过头去看看这句话 namingService.registerInstance(serviceId, group, instance); 也就是说这个方法就是 com.alibaba.nacos.client.naming.NacosNamingService#registerInstance(serviceId, group, instance);

    • registerInstance(serviceId, group, instance);
    .......
    private NamingProxy serverProxy; // proxy object; registerInstance delegates the registration request to it
    
        public NacosNamingService(String serverList) { //构造
            Properties properties = new Properties();
            properties.setProperty(PropertyKeyConst.SERVER_ADDR, serverList);
    
            init(properties);
        }
    
        /**
         * Creates a naming service configured from the given properties.
         *
         * @param properties initialization properties, passed straight to {@code init}
         */
        public NacosNamingService(Properties properties) {
            init(properties); // all setup happens in init(Properties)
        }
    
        /**
         * Initializes this naming service from the given properties.
         * <p>
         * Runs the init helpers (namespace, server address, web root context, cache
         * directory, log name) and then creates the event dispatcher, the server
         * proxy, the beat reactor and the host reactor, in that order.
         *
         * @param properties initialization properties
         */
        private void init(Properties properties) {
            namespace = InitUtils.initNamespaceForNaming(properties);
            initServerAddr(properties);
            InitUtils.initWebRootContext();
            initCacheDir();
            initLogName(properties);
    
            eventDispatcher = new EventDispatcher();
            // Create the server proxy. Per the original author's note, serverList is the
            // collection of registry (Nacos server) addresses.
            serverProxy = new NamingProxy(namespace, endpoint, serverList);
            serverProxy.setProperties(properties);
            // Both reactors are built on top of the server proxy created above.
            beatReactor = new BeatReactor(serverProxy, initClientBeatThreadCount(properties));
            hostReactor = new HostReactor(eventDispatcher, serverProxy, cacheDir, isLoadCacheAtStart(properties), initPollingThreadCount(properties));
        }
    /**
     * Registers an instance under the given service and group.
     * <p>
     * For an ephemeral instance a {@code BeatInfo} is built and handed to
     * {@code beatReactor.addBeatInfo} first; the registration itself is always
     * delegated to {@code serverProxy.registerService}.
     *
     * @param serviceName name of the service
     * @param groupName   group of the service
     * @param instance    instance to register
     * @throws NacosException if the server proxy fails to register the instance
     */
    @Override
    public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
        if (instance.isEphemeral()) {
            final BeatInfo heartbeat = newBeatInfo(serviceName, groupName, instance);
            // Add the heartbeat.
            beatReactor.addBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), heartbeat);
        }
        // The registration itself continues in the server proxy.
        serverProxy.registerService(NamingUtils.getGroupedName(serviceName, groupName), groupName, instance);
    }

    /**
     * Builds the heartbeat description for an instance, falling back to
     * {@code DEFAULT_HEART_BEAT_INTERVAL} when the instance reports an interval of 0.
     */
    private BeatInfo newBeatInfo(String serviceName, String groupName, Instance instance) {
        final BeatInfo info = new BeatInfo();
        info.setServiceName(NamingUtils.getGroupedName(serviceName, groupName));
        info.setIp(instance.getIp());
        info.setPort(instance.getPort());
        info.setCluster(instance.getClusterName());
        info.setWeight(instance.getWeight());
        info.setMetadata(instance.getMetadata());
        info.setScheduled(false);
        final long interval = instance.getInstanceHeartBeatInterval();
        info.setPeriod(interval == 0 ? DEFAULT_HEART_BEAT_INTERVAL : interval);
        return info;
    }
    
    • NamingProxy http 服务代理 的registerService
    /**
     * Sends the instance registration request: collects the instance attributes
     * into a parameter map and issues a POST to {@code UtilAndComs.NACOS_URL_INSTANCE}
     * through {@code reqAPI}.
     *
     * @param serviceName service name sent as {@code CommonParams.SERVICE_NAME}
     * @param groupName   group name sent as {@code CommonParams.GROUP_NAME}
     * @param instance    instance whose attributes are sent
     * @throws NacosException if the request fails
     */
    public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {

        NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}",
            namespaceId, serviceName, instance);

        // Assemble the request parameters.
        final Map<String, String> requestParams = new HashMap<String, String>(9);
        requestParams.put(CommonParams.NAMESPACE_ID, namespaceId);
        requestParams.put(CommonParams.SERVICE_NAME, serviceName);
        requestParams.put(CommonParams.GROUP_NAME, groupName);
        requestParams.put(CommonParams.CLUSTER_NAME, instance.getClusterName());
        // Address of the service instance.
        requestParams.put("ip", instance.getIp());
        requestParams.put("port", String.valueOf(instance.getPort()));
        requestParams.put("weight", String.valueOf(instance.getWeight()));
        requestParams.put("enable", String.valueOf(instance.isEnabled()));
        requestParams.put("healthy", String.valueOf(instance.isHealthy()));
        requestParams.put("ephemeral", String.valueOf(instance.isEphemeral()));
        requestParams.put("metadata", JSON.toJSONString(instance.getMetadata()));

        // Send the registration request as a POST.
        reqAPI(UtilAndComs.NACOS_URL_INSTANCE, requestParams, HttpMethod.POST);
    }
    
    • reqAPI(UtilAndComs.NACOS_URL_INSTANCE, params, HttpMethod.POST);
    /**
     * Sends a request to the naming API, choosing the target servers first:
     * {@code serverList} when it is non-empty, otherwise {@code serversFromEndpoint}.
     *
     * @param api    API path to call
     * @param params request parameters
     * @param method HTTP method
     * @return the response returned by the four-argument {@code reqAPI}
     * @throws NacosException declared for callers; see the four-argument overload
     */
    public String reqAPI(String api, Map<String, String> params, String method) throws NacosException {
        final List<String> targets = CollectionUtils.isEmpty(serverList) ? serversFromEndpoint : serverList;
        // Send the request.
        return reqAPI(api, params, targets, method);
    }
    
    • reqAPI(api, params, snapshot, method);
    /**
     * Sends a request to one of the given servers, or to {@code nacosDomain} when no
     * server list is available.
     * <p>
     * With a non-empty server list, a random start index is chosen and each server is
     * tried at most once, in round-robin order, until one call succeeds. With an empty
     * list, {@code nacosDomain} is tried up to
     * {@code UtilAndComs.REQUEST_DOMAIN_RETRY_COUNT} times. Both paths forward the
     * caller's HTTP {@code method}.
     *
     * @param api     API path to call
     * @param params  request parameters; {@code CommonParams.NAMESPACE_ID} is overwritten here
     * @param servers candidate server addresses, may be {@code null} or empty
     * @param method  HTTP method used for every attempt
     * @return the response of the first successful call
     * @throws IllegalArgumentException if both {@code servers} and {@code nacosDomain} are empty
     * @throws IllegalStateException    if every attempt fails; the last failure is attached as the cause
     */
    public String reqAPI(String api, Map<String, String> params, List<String> servers, String method) {

        params.put(CommonParams.NAMESPACE_ID, getNamespaceId());

        if (CollectionUtils.isEmpty(servers) && StringUtils.isEmpty(nacosDomain)) {
            throw new IllegalArgumentException("no server available");
        }

        Exception exception = new Exception();

        if (servers != null && !servers.isEmpty()) {

            Random random = new Random(System.currentTimeMillis());
            // Start from a randomly chosen server address.
            int index = random.nextInt(servers.size());

            for (int i = 0; i < servers.size(); i++) {
                String server = servers.get(index);
                try {
                    // The first successful call ends the loop.
                    return callServer(api, params, server, method);
                } catch (Exception e) {
                    exception = e;
                    NAMING_LOGGER.error("request {} failed.", server, e);
                }
                // The call failed: move on to the next server address, wrapping around.
                index = (index + 1) % servers.size();
            }

            // Every server was tried once and none succeeded; keep the last failure as the cause.
            throw new IllegalStateException("failed to req API:" + api + " after all servers(" + servers + ") tried: "
                + exception.getMessage(), exception);
        }

        for (int i = 0; i < UtilAndComs.REQUEST_DOMAIN_RETRY_COUNT; i++) {
            try {
                // Forward the requested HTTP method, exactly as the server-list path does.
                return callServer(api, params, nacosDomain, method);
            } catch (Exception e) {
                exception = e;
                NAMING_LOGGER.error("[NA] req api:" + api + " failed, server(" + nacosDomain, e);
            }
        }

        throw new IllegalStateException("failed to req API:/api/" + api + " after all servers(" + servers + ") tried: "
            + exception.getMessage(), exception);

    }
    

    上面代码是服务注册的逻辑,大致的意思是:从服务器列表中随机选一台发送注册请求,如果第一次就注册成功,就 return;否则依次尝试列表中的下一台服务器,直到某一台注册成功,或者所有服务器都尝试过一次仍然失败(此时抛出 IllegalStateException)。下面看看callServer

    • callServer
     /**
      * Performs one HTTP call against a single server and records its duration
      * with {@code MetricsMonitor}.
      * <p>
      * A server value starting with {@code UtilAndComs.HTTPS} or {@code UtilAndComs.HTTP}
      * is used as-is; otherwise {@code serverPort} is appended when the value does not
      * contain {@code UtilAndComs.SERVER_ADDR_IP_SPLITER}, and {@code HttpClient.getPrefix()}
      * is prepended.
      *
      * @param api       API path appended to the server address
      * @param params    request parameters
      * @param curServer server address
      * @param method    HTTP method
      * @return the response content on HTTP 200, or an empty string on HTTP 304
      * @throws NacosException with code {@code SERVER_ERROR} for any other response code
      */
     public String callServer(String api, Map<String, String> params, String curServer, String method)
         throws NacosException {
         final long startedAt = System.currentTimeMillis();
         checkSignature(params);
         final List<String> headers = builderHeaders();

         final boolean hasScheme =
             curServer.startsWith(UtilAndComs.HTTPS) || curServer.startsWith(UtilAndComs.HTTP);
         if (!hasScheme && !curServer.contains(UtilAndComs.SERVER_ADDR_IP_SPLITER)) {
             curServer = curServer + UtilAndComs.SERVER_ADDR_IP_SPLITER + serverPort;
         }
         // Request address.
         final String url = hasScheme ? curServer + api : HttpClient.getPrefix() + curServer + api;

         // Send the HTTP request.
         final HttpClient.HttpResult response =
             HttpClient.request(url, headers, params, UtilAndComs.ENCODING, method);
         final long finishedAt = System.currentTimeMillis();

         MetricsMonitor.getNamingRequestMonitor(method, url, String.valueOf(response.code))
             .observe(finishedAt - startedAt);

         if (response.code == HttpURLConnection.HTTP_OK) {
             // Request succeeded: return the payload.
             return response.content;
         }
         if (response.code == HttpURLConnection.HTTP_NOT_MODIFIED) {
             return StringUtils.EMPTY;
         }

         throw new NacosException(NacosException.SERVER_ERROR, "failed to req API:"
             + curServer + api + ". code:"
             + response.code + " msg: " + response.content);
     }
    
    • 请求接口UtilAndComs.NACOS_URL_INSTANCE
    /**
     * Sends the instance registration request: collects the instance attributes
     * into a parameter map and issues a POST to {@code UtilAndComs.NACOS_URL_INSTANCE}
     * through {@code reqAPI}.
     *
     * @param serviceName service name sent as {@code CommonParams.SERVICE_NAME}
     * @param groupName   group name sent as {@code CommonParams.GROUP_NAME}
     * @param instance    instance whose attributes are sent
     * @throws NacosException if the request fails
     */
    public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {

        NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}",
            namespaceId, serviceName, instance);

        final Map<String, String> requestParams = new HashMap<String, String>(9);
        requestParams.put(CommonParams.NAMESPACE_ID, namespaceId);
        requestParams.put(CommonParams.SERVICE_NAME, serviceName);
        requestParams.put(CommonParams.GROUP_NAME, groupName);
        requestParams.put(CommonParams.CLUSTER_NAME, instance.getClusterName());
        requestParams.put("ip", instance.getIp());
        requestParams.put("port", String.valueOf(instance.getPort()));
        requestParams.put("weight", String.valueOf(instance.getWeight()));
        requestParams.put("enable", String.valueOf(instance.isEnabled()));
        requestParams.put("healthy", String.valueOf(instance.isHealthy()));
        requestParams.put("ephemeral", String.valueOf(instance.isEphemeral()));
        requestParams.put("metadata", JSON.toJSONString(instance.getMetadata()));

        // The request URL is UtilAndComs.NACOS_URL_INSTANCE.
        reqAPI(UtilAndComs.NACOS_URL_INSTANCE, requestParams, HttpMethod.POST);
    }
    
    • UtilAndComs.NACOS_URL_INSTANCE
        // Web context prefix; every URL below is derived from it.
        public static String WEB_CONTEXT = "/nacos";
    
        // Base path of the naming API: WEB_CONTEXT + "/v1/ns".
        public static String NACOS_URL_BASE = WEB_CONTEXT + "/v1/ns";
    
        // Instance endpoint; with the initial values above this is "/nacos/v1/ns/instance".
        public static String NACOS_URL_INSTANCE = NACOS_URL_BASE + "/instance";
    

    /nacos/v1/ns/instance 接口地址,请求方式:POST。
    ok,到这里我们就分析完了最底层的服务注册请求逻辑。主要有:1. 从服务器列表中随机选择一台发送注册请求,失败则依次尝试下一台,直到注册成功,或者所有服务器都尝试失败后抛出异常。2. 请求接口是:/nacos/v1/ns/instance,请求方式是 POST,通信协议采用 http 协议。

    • 总结:
      我们使用nacos 的时候,发现我们服务一启动后,就会注册服务到注册中心,于是我们就分析了一下 nacos 服务注册的逻辑,主要有以下几点:
    1. 服务启动就注册,原因是 springboot 启动后发送启动事件,然后事件监听器里发送服务注册请求。
    2. 服务底层通信,采用http 协议
    3. 服务注册的时候,随机选择一台 nacos 服务器发送注册请求;如果注册失败,就继续取下一个服务器地址重试,注册成功便返回;如果所有服务器都尝试失败,则抛出异常。
      注意:有一段代码没有分析:
    for (int i = 0; i < UtilAndComs.REQUEST_DOMAIN_RETRY_COUNT; i++) {
                try {
                    return callServer(api, params, nacosDomain);
                } catch (Exception e) {
                    exception = e;
                    NAMING_LOGGER.error("[NA] req api:" + api + " failed, server(" + nacosDomain, e);
                }
            }
    
            throw new IllegalStateException("failed to req API:/api/" + api + " after all servers(" + servers + ") tried: "
                + exception.getMessage());
    

    如果有兴趣,可以看看 nacosDomain 取的是什么值。上面这段逻辑是在 nacos 服务器地址列表为空时才会执行的逻辑。

    /**
     * Sends a request to one of the given servers, or to {@code nacosDomain} when no
     * server list is available.
     * <p>
     * With a non-empty server list, a random start index is chosen and each server is
     * tried at most once, in round-robin order, until one call succeeds. With an empty
     * list, {@code nacosDomain} is tried up to
     * {@code UtilAndComs.REQUEST_DOMAIN_RETRY_COUNT} times. Both paths forward the
     * caller's HTTP {@code method}.
     *
     * @param api     API path to call
     * @param params  request parameters; {@code CommonParams.NAMESPACE_ID} is overwritten here
     * @param servers candidate server addresses, may be {@code null} or empty
     * @param method  HTTP method used for every attempt
     * @return the response of the first successful call
     * @throws IllegalArgumentException if both {@code servers} and {@code nacosDomain} are empty
     * @throws IllegalStateException    if every attempt fails; the last failure is attached as the cause
     */
    public String reqAPI(String api, Map<String, String> params, List<String> servers, String method) {

        params.put(CommonParams.NAMESPACE_ID, getNamespaceId());

        if (CollectionUtils.isEmpty(servers) && StringUtils.isEmpty(nacosDomain)) {
            throw new IllegalArgumentException("no server available");
        }

        Exception exception = new Exception();

        // Server address list is not empty.
        if (servers != null && !servers.isEmpty()) {

            Random random = new Random(System.currentTimeMillis());
            int index = random.nextInt(servers.size());

            for (int i = 0; i < servers.size(); i++) {
                String server = servers.get(index);
                try {
                    return callServer(api, params, server, method);
                } catch (Exception e) {
                    exception = e;
                    NAMING_LOGGER.error("request {} failed.", server, e);
                }

                index = (index + 1) % servers.size();
            }

            // Every server was tried once and none succeeded; keep the last failure as the cause.
            throw new IllegalStateException("failed to req API:" + api + " after all servers(" + servers + ") tried: "
                + exception.getMessage(), exception);
        }

        // Reached only when the server address list is empty: retry against nacosDomain
        // up to REQUEST_DOMAIN_RETRY_COUNT times.
        for (int i = 0; i < UtilAndComs.REQUEST_DOMAIN_RETRY_COUNT; i++) {
            try {
                // Forward the requested HTTP method, exactly as the server-list path does.
                return callServer(api, params, nacosDomain, method);
            } catch (Exception e) {
                exception = e;
                NAMING_LOGGER.error("[NA] req api:" + api + " failed, server(" + nacosDomain, e);
            }
        }

        throw new IllegalStateException("failed to req API:/api/" + api + " after all servers(" + servers + ") tried: "
            + exception.getMessage(), exception);

    }
    

    相关文章

      网友评论

        本文标题:nacos-服务注册发现源码分析-注册请求接口-03

        本文链接:https://www.haomeiwen.com/subject/ejlhdktx.html