美文网首页
Nacos源码分析

Nacos源码分析

作者: 刘大平 | 来源:发表于2021-06-30 16:00 被阅读0次

当一个client启动的时候, 客户端流程线

1.入口:AbstractApplicationContext refresh 方案

image.png
2.监听机制实现服务暴露
image.png
3.namingService.registerInstance : serviceName 服务名 instance 实例信息
image.png
  1.  NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}",
         namespaceId, serviceName, instance);
    
     final Map<String, String> params = new HashMap<String, String>(8);
     params.put(Constants.REQUEST_PARAM_NAMESPACE_ID, namespaceId);
     params.put("ip", instance.getIp());
     params.put("port", String.valueOf(instance.getPort()));
     params.put("weight", String.valueOf(instance.getWeight()));
     params.put("enable", String.valueOf(instance.isEnabled()));
     params.put("healthy", String.valueOf(instance.isHealthy()));
     params.put("metadata", JSON.toJSONString(instance.getMetadata()));
     params.put("serviceName", serviceName);
     params.put("clusterName", instance.getClusterName());
    
     reqAPI(UtilAndComs.NACOS_URL_INSTANCE, params, HttpMethod.POST); 调用api接口
    
  2. public String reqAPI(String api, Map<String, String> params, List<String> servers) {
    return reqAPI(api, params, servers, HttpMethod.GET);
    }
public String reqAPI(String api, Map<String, String> params, List<String> servers, String method) {

    params.put(Constants.REQUEST_PARAM_NAMESPACE_ID, getNamespaceId());

    if (CollectionUtils.isEmpty(servers) && StringUtils.isEmpty(nacosDomain)) {
        throw new IllegalArgumentException("no server available");
    }

    if (servers != null && !servers.isEmpty()) { 如果服务地址不为空

        Random random = new Random(System.currentTimeMillis()); 随便获取一个节点
        int index = random.nextInt(servers.size());

        for (int i = 0; i < servers.size(); i++) {
            String server = servers.get(index);
            try {
                return callServer(api, params, server, method); 执行调动服务
            } catch (Exception e) {
                NAMING_LOGGER.error("[NA] req api:" + api + " failed, server(" + server, e);
            }

            index = (index + 1) % servers.size(); 轮询方案
        }

        throw new IllegalStateException("failed to req API:" + api + " after all servers(" + servers + ") tried");
    }

    for (int i = 0; i < UtilAndComs.REQUEST_DOMAIN_RETRY_COUNT; i++) { 如果超出了重试次数 报错
        try {
            return callServer(api, params, nacosDomain);
        } catch (Exception e) {
            NAMING_LOGGER.error("[NA] req api:" + api + " failed, server(" + nacosDomain, e);
        }
    }

    throw new IllegalStateException("failed to req API:/api/" + api + " after all servers(" + servers + ") tried");

}
  1. 发现调用远程的接口地址是 http://10.10.0.126:8848/nacos/v1/ns/instance/list
    image.png

服务端


    /**
     * Register new instance.
     *
     * @param request http request
     * @return 'ok' if success
     * @throws Exception any error during register
     */
    @CanDistro
    @PostMapping
    @Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
    public String register(HttpServletRequest request) throws Exception {
        
        final String namespaceId = WebUtils
                .optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
        final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
        NamingUtils.checkServiceNameFormat(serviceName);
        
        final Instance instance = parseInstance(request);
        
        getInstanceOperator().registerInstance(namespaceId, serviceName, instance);
        return "ok";
    }

2. 进入ServerManager  registerInstance方法

/**
     * Register an instance to a service in AP mode.
     *
     * <p>This method creates service or cluster silently if they don't exist.
     *
     * @param namespaceId id of namespace
     * @param serviceName service name
     * @param instance    instance to register
     * @throws Exception any error occurred in the process
     */
    public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
        //创建一个空服务,在Nacos控制台服务列表展示的服务信息,实际上是初始化一个serviceMap,它 是一个ConcurrentHashMap集合
        createEmptyService(namespaceId, serviceName, instance.isEphemeral());
        //从serviceMap中,根据namespaceId和serviceName得到一个服务对象
        Service service = getService(namespaceId, serviceName);
        
        checkServiceIsNull(service, namespaceId, serviceName);
        //调用addInstance创建一个服务实例
        addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
    }

3 
public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local, Cluster cluster)
            throws NacosException {
//从serviceMap中获取服务对象
其中serviceMap是一个ConcurrentHashMap , Map(namespace, Map(group::serviceName, Service)).
    
        Service service = getService(namespaceId, serviceName);
        if (service == null) {
            
            Loggers.SRV_LOG.info("creating empty service {}:{}", namespaceId, serviceName);
            service = new Service();
            service.setName(serviceName);
            service.setNamespaceId(namespaceId);
            service.setGroupName(NamingUtils.getGroupName(serviceName));
            // now validate the service. if failed, exception will be thrown
            service.setLastModifiedMillis(System.currentTimeMillis());
            service.recalculateChecksum();
            if (cluster != null) {
                cluster.setService(service);
                service.getClusterMap().put(cluster.getName(), cluster);
            }
            service.validate();
            
            putServiceAndInit(service);
            if (!local) {
                addOrReplaceService(service);
            }
        }
    }

4. 
private void putServiceAndInit(Service service) throws NacosException { putService(service); //把服务信息保存到serviceMap集合
service.init(); //建立心跳检测机制 

//实现数据一致性监听,ephemeral=true表示采用raft协议,false表示采用Distro 
consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(),
service.getName(), false), service); Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJson());
}

5 把服务实例添加到集合中,然后基于一致性协议进行数据的同步
 public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips)
throws NacosException {
String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName,
ephemeral);
    Service service = getService(namespaceId, serviceName);
    synchronized (service) {
        List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);
Instances instances = new Instances(); instances.setInstanceList(instanceList);
consistencyService.put(key, instances); }
}
最终把server信息放入到servicemap里面去
java```


    


相关文章

网友评论

      本文标题:Nacos源码分析

      本文链接:https://www.haomeiwen.com/subject/pdtcultx.html