美文网首页
Nacos健康检查源码

Nacos健康检查源码

作者: 念䋛 | 来源:发表于2021-10-01 23:24 被阅读0次

    Nacos健康检查源码
    在客户端注册服务端的时候,调用的是
    InstanceController的register方法

    @CanDistro
    @PostMapping
    @Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
    public String register(HttpServletRequest request) throws Exception {
        
        final String namespaceId = WebUtils
                .optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
        final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
        NamingUtils.checkServiceNameFormat(serviceName);
        
        final Instance instance = parseInstance(request);
        //注册instance
        serviceManager.registerInstance(namespaceId, serviceName, instance);
        return "ok";
    }
    
    

    调用链
    ServiceManager#registerInstance
    -->createEmptyService(namespaceId, serviceName, instance.isEphemeral());
    -->ServiceManager#createServiceIfAbsent
    -->ServiceManager#putServiceAndInit

    private void putServiceAndInit(Service service) throws NacosException {
    //service可能是没有instance,也可能有instance,没有是新注册的instance可能是
    //第一个注册的服务,如果其他服务已经注册,那么service就存在instance 
        putService(service);
    //健康检查
        service.init();
        consistencyService
                .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
        consistencyService
                .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);
        Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJson());
    }
    
    

    健康检查

    public void init() {
    //健康检查,利用多线程
        HealthCheckReactor.scheduleCheck(clientBeatCheckTask);
        for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) {
            entry.getValue().setService(this);
            entry.getValue().init();
        }
    }
    
    

    一般看到这种代码就去找ClientBeatCheckTask的run方法

    public static void scheduleCheck(ClientBeatCheckTask task) {
        futureMap.putIfAbsent(task.taskKey(), GlobalExecutor.scheduleNamingHealth(task, 5000, 5000, TimeUnit.MILLISECONDS));
    }
    
    

    ClientBeatCheckTask的run方法

    @Override
    public void run() {
    //集群模式用,后续回分析
        try {
            if (!getDistroMapper().responsible(service.getName())) {
                return;
            }
            
            if (!getSwitchDomain().isHealthCheckEnabled()) {
                return;
            }
            
            List<Instance> instances = service.allIPs(true);
            
            // first set health status of instances:
            for (Instance instance : instances) {
    //当前时间减去上一次的心跳时间,如果小于15面则标注instance是不健康的
                if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) {
                    if (!instance.isMarked()) {
    //标注instance为不健康
                        if (instance.isHealthy()) {
                            instance.setHealthy(false);
                            Loggers.EVT_LOG
                                    .info("{POS} {IP-DISABLED} valid: {}:{}@{}@{}, region: {}, msg: client timeout after {}, last beat: {}",
                                            instance.getIp(), instance.getPort(), instance.getClusterName(),
                                            service.getName(), UtilsAndCommons.LOCALHOST_SITE,
                                            instance.getInstanceHeartBeatTimeOut(), instance.getLastBeat());
    //instance发生变化,会调用serviceChanged方法,下面会分析到                       
    getPushService().serviceChanged(service);
                            ApplicationUtils.publishEvent(new InstanceHeartbeatTimeoutEvent(this, instance));
                        }
                    }
                }
            }
            
            if (!getGlobalConfig().isExpireInstance()) {
                return;
            }
            
            // then remove obsolete instances:
            for (Instance instance : instances) {
                
                if (instance.isMarked()) {
                    continue;
                }
                //如果当前时间减掉上一次心跳时间大于30秒的话,则删除instance
                if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) {
                    // delete instance
                    Loggers.SRV_LOG.info("[AUTO-DELETE-IP] service: {}, ip: {}", service.getName(),
                            JacksonUtils.toJson(instance));
    //从容器中删除instance
                    deleteIp(instance);
                }
            }
            
        } catch (Exception e) {
            Loggers.SRV_LOG.warn("Exception while processing client beat time out.", e);
        }
        
    }
    
    

    那我们先关注getPushService().serviceChanged(service);当instance发生变化的时候,比如这里的instance没有在15秒之内发送心跳,就会触发事件

    public void serviceChanged(Service service) {
        // merge some change events to reduce the push frequency:
        if (futureMap
                .containsKey(UtilsAndCommons.assembleFullServiceName(service.getNamespaceId(), service.getName()))) {
            return;
        }
        //PushService类监听了事件
        this.applicationContext.publishEvent(new ServiceChangeEvent(this, service));
    }
    
    

    PushService类监听事件
    onApplicationEvent方法根据namepaceId和serviceName获取所有的client

    ConcurrentMap<String, PushClient> clients = clientMap
            .get(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
    
    

    遍历所有的clients,并执行udpPush方法,作用是利用udp协议发送消息,作用是让客户端发送一次心跳,判断是否健康

    udpPush(ackEntry);
    

    clients是什么时候存放到clientMap中的,是客户端定时的从服务端拉去最新的客户端列表的时候

    发送udp需要ip和端口,客户端是什么把upd端口发送个服务端的
    InstanceController#ObjectNode list(HttpServletRequest request)
    -->return doSrvIpxt(namespaceId, serviceName, agent, clusters, clientIP, udpPort, env, isCheck, app, tenant,healthyOnly);
    -->InstanceController#doSrvIpxt(String namespaceId, String serviceName, String agent, String clusters, String clientIP,int udpPort, String env, boolean isCheck, String app, String tid, boolean healthyOnly)
    --> pushService.addClient(namespaceId, serviceName, clusters, agent, new InetSocketAddress(clientIP, udpPort),pushDataSource, tid, app);
    -->PushService#addClient
    --> addClient(client);
    addClient方法中将client存放clientMap,其中key是namespaceId和servicename
    同一个namespaceId和servicename下的所有instance,如果其中一个5秒内没有发送心跳,则利用udp的方式,通知同一个namespaceId和servicename下的所有instance发送一次心跳.
    小结一下,就是客户端从幅度段拉去客户端list的时候,将udpPort和端口传给服务端,服务端将namespaceId+servicename作为key,client的map集合作为value存放进clientMap中,当其中一个instance5秒内没有心跳,就会通知clientMap中namespaceId+servicename为key的所有instnce发送一次心跳.

    相关文章

      网友评论

          本文标题:Nacos健康检查源码

          本文链接:https://www.haomeiwen.com/subject/islpnltx.html