当一个client启动的时候, 客户端流程线
1.入口:AbstractApplicationContext refresh 方案
![](https://img.haomeiwen.com/i10175105/569cbb6c8580c359.png)
2.监听机制实现服务暴露
![](https://img.haomeiwen.com/i10175105/a164f55388710d30.png)
3.namingService.registerInstance : serviceName 服务名 instance 实例信息
![](https://img.haomeiwen.com/i10175105/6002305290ee9d40.png)
NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}", namespaceId, serviceName, instance); final Map<String, String> params = new HashMap<String, String>(8); params.put(Constants.REQUEST_PARAM_NAMESPACE_ID, namespaceId); params.put("ip", instance.getIp()); params.put("port", String.valueOf(instance.getPort())); params.put("weight", String.valueOf(instance.getWeight())); params.put("enable", String.valueOf(instance.isEnabled())); params.put("healthy", String.valueOf(instance.isHealthy())); params.put("metadata", JSON.toJSONString(instance.getMetadata())); params.put("serviceName", serviceName); params.put("clusterName", instance.getClusterName()); reqAPI(UtilAndComs.NACOS_URL_INSTANCE, params, HttpMethod.POST); 调用api接口
- public String reqAPI(String api, Map<String, String> params, List<String> servers) {
return reqAPI(api, params, servers, HttpMethod.GET);
}
public String reqAPI(String api, Map<String, String> params, List<String> servers, String method) {
params.put(Constants.REQUEST_PARAM_NAMESPACE_ID, getNamespaceId());
if (CollectionUtils.isEmpty(servers) && StringUtils.isEmpty(nacosDomain)) {
throw new IllegalArgumentException("no server available");
}
if (servers != null && !servers.isEmpty()) { 如果服务地址不为空
Random random = new Random(System.currentTimeMillis()); 随便获取一个节点
int index = random.nextInt(servers.size());
for (int i = 0; i < servers.size(); i++) {
String server = servers.get(index);
try {
return callServer(api, params, server, method); 执行调动服务
} catch (Exception e) {
NAMING_LOGGER.error("[NA] req api:" + api + " failed, server(" + server, e);
}
index = (index + 1) % servers.size(); 轮询方案
}
throw new IllegalStateException("failed to req API:" + api + " after all servers(" + servers + ") tried");
}
for (int i = 0; i < UtilAndComs.REQUEST_DOMAIN_RETRY_COUNT; i++) { 如果超出了重试次数 报错
try {
return callServer(api, params, nacosDomain);
} catch (Exception e) {
NAMING_LOGGER.error("[NA] req api:" + api + " failed, server(" + nacosDomain, e);
}
}
throw new IllegalStateException("failed to req API:/api/" + api + " after all servers(" + servers + ") tried");
}
- 发现调用远程的接口地址是 http://10.10.0.126:8848/nacos/v1/ns/instance/list
image.png
服务端
/**
* Register new instance.
*
* @param request http request
* @return 'ok' if success
* @throws Exception any error during register
*/
@CanDistro
@PostMapping
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public String register(HttpServletRequest request) throws Exception {
final String namespaceId = WebUtils
.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
NamingUtils.checkServiceNameFormat(serviceName);
final Instance instance = parseInstance(request);
getInstanceOperator().registerInstance(namespaceId, serviceName, instance);
return "ok";
}
2. 进入ServerManager registerInstance方法
/**
* Register an instance to a service in AP mode.
*
* <p>This method creates service or cluster silently if they don't exist.
*
* @param namespaceId id of namespace
* @param serviceName service name
* @param instance instance to register
* @throws Exception any error occurred in the process
*/
public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
//创建一个空服务,在Nacos控制台服务列表展示的服务信息,实际上是初始化一个serviceMap,它 是一个ConcurrentHashMap集合
createEmptyService(namespaceId, serviceName, instance.isEphemeral());
//从serviceMap中,根据namespaceId和serviceName得到一个服务对象
Service service = getService(namespaceId, serviceName);
checkServiceIsNull(service, namespaceId, serviceName);
//调用addInstance创建一个服务实例
addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
}
3
public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local, Cluster cluster)
throws NacosException {
//从serviceMap中获取服务对象
其中serviceMap是一个ConcurrentHashMap , Map(namespace, Map(group::serviceName, Service)).
Service service = getService(namespaceId, serviceName);
if (service == null) {
Loggers.SRV_LOG.info("creating empty service {}:{}", namespaceId, serviceName);
service = new Service();
service.setName(serviceName);
service.setNamespaceId(namespaceId);
service.setGroupName(NamingUtils.getGroupName(serviceName));
// now validate the service. if failed, exception will be thrown
service.setLastModifiedMillis(System.currentTimeMillis());
service.recalculateChecksum();
if (cluster != null) {
cluster.setService(service);
service.getClusterMap().put(cluster.getName(), cluster);
}
service.validate();
putServiceAndInit(service);
if (!local) {
addOrReplaceService(service);
}
}
}
4.
private void putServiceAndInit(Service service) throws NacosException { putService(service); //把服务信息保存到serviceMap集合
service.init(); //建立心跳检测机制
//实现数据一致性监听,ephemeral=true表示采用raft协议,false表示采用Distro
consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(),
service.getName(), false), service); Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJson());
}
5 把服务实例添加到集合中,然后基于一致性协议进行数据的同步
public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips)
throws NacosException {
String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName,
ephemeral);
Service service = getService(namespaceId, serviceName);
synchronized (service) {
List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);
Instances instances = new Instances(); instances.setInstanceList(instanceList);
consistencyService.put(key, instances); }
}
最终把server信息放入到servicemap里面去
java```
网友评论