Nacos健康检查源码
在客户端注册服务端的时候,调用的是
InstanceController的register方法
@CanDistro
@PostMapping
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public String register(HttpServletRequest request) throws Exception {
final String namespaceId = WebUtils
.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
NamingUtils.checkServiceNameFormat(serviceName);
final Instance instance = parseInstance(request);
//注册instance
serviceManager.registerInstance(namespaceId, serviceName, instance);
return "ok";
}
调用链
ServiceManager#registerInstance
-->createEmptyService(namespaceId, serviceName, instance.isEphemeral());
-->ServiceManager#createServiceIfAbsent
-->ServiceManager#putServiceAndInit
private void putServiceAndInit(Service service) throws NacosException {
//service可能是没有instance,也可能有instance,没有是新注册的instance可能是
//第一个注册的服务,如果其他服务已经注册,那么service就存在instance
putService(service);
//健康检查
service.init();
consistencyService
.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
consistencyService
.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);
Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJson());
}
健康检查
public void init() {
//健康检查,利用多线程
HealthCheckReactor.scheduleCheck(clientBeatCheckTask);
for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) {
entry.getValue().setService(this);
entry.getValue().init();
}
}
一般看到这种代码就去找ClientBeatCheckTask的run方法
public static void scheduleCheck(ClientBeatCheckTask task) {
futureMap.putIfAbsent(task.taskKey(), GlobalExecutor.scheduleNamingHealth(task, 5000, 5000, TimeUnit.MILLISECONDS));
}
ClientBeatCheckTask的run方法
@Override
public void run() {
//集群模式用,后续回分析
try {
if (!getDistroMapper().responsible(service.getName())) {
return;
}
if (!getSwitchDomain().isHealthCheckEnabled()) {
return;
}
List<Instance> instances = service.allIPs(true);
// first set health status of instances:
for (Instance instance : instances) {
//当前时间减去上一次的心跳时间,如果小于15面则标注instance是不健康的
if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) {
if (!instance.isMarked()) {
//标注instance为不健康
if (instance.isHealthy()) {
instance.setHealthy(false);
Loggers.EVT_LOG
.info("{POS} {IP-DISABLED} valid: {}:{}@{}@{}, region: {}, msg: client timeout after {}, last beat: {}",
instance.getIp(), instance.getPort(), instance.getClusterName(),
service.getName(), UtilsAndCommons.LOCALHOST_SITE,
instance.getInstanceHeartBeatTimeOut(), instance.getLastBeat());
//instance发生变化,会调用serviceChanged方法,下面会分析到
getPushService().serviceChanged(service);
ApplicationUtils.publishEvent(new InstanceHeartbeatTimeoutEvent(this, instance));
}
}
}
}
if (!getGlobalConfig().isExpireInstance()) {
return;
}
// then remove obsolete instances:
for (Instance instance : instances) {
if (instance.isMarked()) {
continue;
}
//如果当前时间减掉上一次心跳时间大于30秒的话,则删除instance
if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) {
// delete instance
Loggers.SRV_LOG.info("[AUTO-DELETE-IP] service: {}, ip: {}", service.getName(),
JacksonUtils.toJson(instance));
//从容器中删除instance
deleteIp(instance);
}
}
} catch (Exception e) {
Loggers.SRV_LOG.warn("Exception while processing client beat time out.", e);
}
}
那我们先关注getPushService().serviceChanged(service);当instance发生变化的时候,比如这里的instance没有在15秒之内发送心跳,就会触发事件
public void serviceChanged(Service service) {
// merge some change events to reduce the push frequency:
if (futureMap
.containsKey(UtilsAndCommons.assembleFullServiceName(service.getNamespaceId(), service.getName()))) {
return;
}
//PushService类监听了事件
this.applicationContext.publishEvent(new ServiceChangeEvent(this, service));
}
PushService类监听事件
onApplicationEvent方法根据namepaceId和serviceName获取所有的client
ConcurrentMap<String, PushClient> clients = clientMap
.get(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
遍历所有的clients,并执行udpPush方法,作用是利用udp协议发送消息,作用是让客户端发送一次心跳,判断是否健康
udpPush(ackEntry);
clients是什么时候存放到clientMap中的,是客户端定时的从服务端拉去最新的客户端列表的时候
发送udp需要ip和端口,客户端是什么把upd端口发送个服务端的
InstanceController#ObjectNode list(HttpServletRequest request)
-->return doSrvIpxt(namespaceId, serviceName, agent, clusters, clientIP, udpPort, env, isCheck, app, tenant,healthyOnly);
-->InstanceController#doSrvIpxt(String namespaceId, String serviceName, String agent, String clusters, String clientIP,int udpPort, String env, boolean isCheck, String app, String tid, boolean healthyOnly)
--> pushService.addClient(namespaceId, serviceName, clusters, agent, new InetSocketAddress(clientIP, udpPort),pushDataSource, tid, app);
-->PushService#addClient
--> addClient(client);
addClient方法中将client存放clientMap,其中key是namespaceId和servicename
同一个namespaceId和servicename下的所有instance,如果其中一个5秒内没有发送心跳,则利用udp的方式,通知同一个namespaceId和servicename下的所有instance发送一次心跳.
小结一下,就是客户端从幅度段拉去客户端list的时候,将udpPort和端口传给服务端,服务端将namespaceId+servicename作为key,client的map集合作为value存放进clientMap中,当其中一个instance5秒内没有心跳,就会通知clientMap中namespaceId+servicename为key的所有instnce发送一次心跳.
网友评论