美文网首页
nacos客户端和服务端心跳源码

nacos客户端和服务端心跳源码

作者: 念䋛 | 来源:发表于2021-10-02 18:26 被阅读0次

在客户端注册源码的时候,我们提到了如何将客户端注册进fuwudua端的,是利用监听器监听事件调用了
AbstractAutoServiceRegistration的onApplicationEvent方法

public void onApplicationEvent(WebServerInitializedEvent event) {
   bind(event);
}

经过一系列的调用到NacosNamingService的registerInstance方法

@Override
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
    NamingUtils.checkInstanceIsLegal(instance);
    String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
    if (instance.isEphemeral()) {
//这里就是心跳注册的方法,也是利用线程池
        BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance);
        beatReactor.addBeatInfo(groupedServiceName, beatInfo);
    }
    serverProxy.registerService(groupedServiceName, groupName, instance);
}

addBeatInfo方法

public void addBeatInfo(String serviceName, BeatInfo beatInfo) {
    NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo);
    String key = buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort());
    BeatInfo existBeat = null;
    //fix #1733
    if ((existBeat = dom2Beat.remove(key)) != null) {
        existBeat.setStopped(true);
    }
    dom2Beat.put(key, beatInfo);
//利用定时线程池调用,心跳方法,beatTask是任务,beatInfo.getPeriod()下一次调用的时间
//间隔
    executorService.schedule(new BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS);
    MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size());
}

既然是线程池的话,就需要关注任务的run方法,每5秒发送心跳,如果服务端返回,没有该客户端,则重新发送注册接口

@Override
public void run() {
    if (beatInfo.isStopped()) {
        return;
    }
    long nextTime = beatInfo.getPeriod();
    try {
//点进去,就会发现我们经常看到的reqApi发送http请求的方法,路径为/instance/beat

        JsonNode result = serverProxy.sendBeat(beatInfo, BeatReactor.this.lightBeatEnabled);
        long interval = result.get("clientBeatInterval").asLong();
        boolean lightBeatEnabled = false;
        if (result.has(CommonParams.LIGHT_BEAT_ENABLED)) {
            lightBeatEnabled = result.get(CommonParams.LIGHT_BEAT_ENABLED).asBoolean();
        }
        BeatReactor.this.lightBeatEnabled = lightBeatEnabled;
        if (interval > 0) {
            nextTime = interval;
        }
        int code = NamingResponseCode.OK;
        if (result.has(CommonParams.CODE)) {
            code = result.get(CommonParams.CODE).asInt();
        }
//如果发送心跳,服务端返回没有你这个客户端,服务端会自动的注册instance,节省了一次客//户端重新发起注册请求的流程,节省网络传输,但是如果服务端发现发送的信息不包含主要信息,
//比如端口 ip 是否是临时节点等信息,那就需要客户端重新调用注册接口,将该客户端注册进
//服务端

        if (code == NamingResponseCode.RESOURCE_NOT_FOUND) {
            Instance instance = new Instance();
            instance.setPort(beatInfo.getPort());
            instance.setIp(beatInfo.getIp());
            instance.setWeight(beatInfo.getWeight());
            instance.setMetadata(beatInfo.getMetadata());
            instance.setClusterName(beatInfo.getCluster());
            instance.setServiceName(beatInfo.getServiceName());
            instance.setInstanceId(instance.getInstanceId());
            instance.setEphemeral(true);
            try {
                serverProxy.registerService(beatInfo.getServiceName(),
                        NamingUtils.getGroupName(beatInfo.getServiceName()), instance);
            } catch (Exception ignore) {
            }
        }
    } catch (NacosException ex) {
        NAMING_LOGGER.error("[CLIENT-BEAT] failed to send beat: {}, code: {}, msg: {}",
                JacksonUtils.toJson(beatInfo), ex.getErrCode(), ex.getErrMsg());
        
    }
//下一次心跳发送    
executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS);
}

客户端心跳还是比较简单的.
服务端心跳源码
我们都知道服务端,我们都是关心InstanceController类,这个也不例外

@CanDistro
@PutMapping("/beat")
//这个注解是nacos的注解,并在AuthFilter中使用,看这个方法是否有权限调用
//但是我没有发现AuthFilter过滤器添加到security中
//NacosAuthConfig类是关于secutiry配置的,并没有添加AuthFilter过滤器,
//如果读者发现了@Secured作用可以指出来,共同交流
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public ObjectNode beat(HttpServletRequest request) throws Exception {
    //这里从request里面获取各种参数
    ObjectNode result = JacksonUtils.createEmptyJsonNode();
    result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, switchDomain.getClientBeatInterval());
    
    String beat = WebUtils.optional(request, "beat", StringUtils.EMPTY);
    RsInfo clientBeat = null;
    if (StringUtils.isNotBlank(beat)) {
        clientBeat = JacksonUtils.toObj(beat, RsInfo.class);
    }
    String clusterName = WebUtils
            .optional(request, CommonParams.CLUSTER_NAME, UtilsAndCommons.DEFAULT_CLUSTER_NAME);
    String ip = WebUtils.optional(request, "ip", StringUtils.EMPTY);
    int port = Integer.parseInt(WebUtils.optional(request, "port", "0"));
    if (clientBeat != null) {
        if (StringUtils.isNotBlank(clientBeat.getCluster())) {
            clusterName = clientBeat.getCluster();
        } else {
            // fix #2533
            clientBeat.setCluster(clusterName);
        }
        ip = clientBeat.getIp();
        port = clientBeat.getPort();
    }
    String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
    String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
    NamingUtils.checkServiceNameFormat(serviceName);
    Loggers.SRV_LOG.debug("[CLIENT-BEAT] full arguments: beat: {}, serviceName: {}", clientBeat, serviceName);
//根据request获取的参数,从serviceMap中获取是否存在instance
    Instance instance = serviceManager.getInstance(namespaceId, serviceName, clusterName, ip, port);
    //如果不存在
    if (instance == null) {
//客户端的信息也为空,直接返回,不存在,因为如果instance不存在,服务端可以重新的
//注册一下,但是客户端传过来的消息不包含主要的信息,比如ip 端口 是否临时等信息
//服务端也是没办法注册的,只能返回不存在,让客户端重新注册一下
        if (clientBeat == null) {
            result.put(CommonParams.CODE, NamingResponseCode.RESOURCE_NOT_FOUND);
            return result;
        }
        
        Loggers.SRV_LOG.warn("[CLIENT-BEAT] The instance has been removed for health mechanism, "
                + "perform data compensation operations, beat: {}, serviceName: {}", clientBeat, serviceName);
        //如果instanc为null,证明没有注册,则注册一下
        instance = new Instance();
        instance.setPort(clientBeat.getPort());
        instance.setIp(clientBeat.getIp());
        instance.setWeight(clientBeat.getWeight());
        instance.setMetadata(clientBeat.getMetadata());
        instance.setClusterName(clusterName);
        instance.setServiceName(serviceName);
        instance.setInstanceId(instance.getInstanceId());
        instance.setEphemeral(clientBeat.isEphemeral());
        
        serviceManager.registerInstance(namespaceId, serviceName, instance);
    }
    
    Service service = serviceManager.getService(namespaceId, serviceName);
    
    if (service == null) {
        throw new NacosException(NacosException.SERVER_ERROR,
                "service not found: " + serviceName + "@" + namespaceId);
    }
    if (clientBeat == null) {
        clientBeat = new RsInfo();
        clientBeat.setIp(ip);
        clientBeat.setPort(port);
        clientBeat.setCluster(clusterName);
    }
//这里也是用的定时任务线程,只不过是立即执行,因为延时为0毫秒
//并且判断如果是不健康的,设置为健康,并且设置了最后的心跳时间,用于健康检查
    service.processClientBeat(clientBeat);
    
    result.put(CommonParams.CODE, NamingResponseCode.OK);
    if (instance.containsMetadata(PreservedMetadataKeys.HEART_BEAT_INTERVAL)) {
        result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, instance.getInstanceHeartBeatInterval());
    }
    result.put(SwitchEntry.LIGHT_BEAT_ENABLED, switchDomain.isLightBeatEnabled());
    return result;
}

服务端的源码还是比较简单的,这里也没有过多的分析.

相关文章

网友评论

      本文标题:nacos客户端和服务端心跳源码

      本文链接:https://www.haomeiwen.com/subject/oqdmnltx.html