Understanding Eureka-Client in Depth: Sending Heartbeats (Part 3)

Author: sharedCode | Published: 2018-08-08 10:23

    DiscoveryClient

    The constructor of com.netflix.discovery.DiscoveryClient is annotated with @Inject (Google Guice injection, which follows the JSR-330 specification).

    @Inject
    DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
                    Provider<BackupRegistry> backupRegistryProvider) {
         
        // ... much code omitted ...
        // Set up the scheduled tasks
        initScheduledTasks();
        
    }
    private void initScheduledTasks() {
        // ... much code omitted ...
        if (clientConfig.shouldRegisterWithEureka()) {
            int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
            int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
            logger.info("Starting heartbeat executor: " + "renew interval is: " + renewalIntervalInSecs);
     
            // Schedule the heartbeat task here
            scheduler.schedule(
                    new TimedSupervisorTask(
                            "heartbeat",
                            scheduler,
                            heartbeatExecutor,
                            renewalIntervalInSecs,
                            TimeUnit.SECONDS,
                            expBackOffBound,
                            new HeartbeatThread()
                    ),
                    renewalIntervalInSecs, TimeUnit.SECONDS);
     
            // ... much code omitted ...
        } else {
            logger.info("Not registering with Eureka server per configuration");
        }
    }
    

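    As the code above shows, constructing DiscoveryClient also sets up its scheduled tasks. The heartbeat task runs every renewalIntervalInSecs, which is 30 seconds by default, and sends a heartbeat each time.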
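Note that scheduler.schedule(...) is a one-shot call, so the periodic behaviour has to come from the task itself. The sketch below shows one way a task can reschedule itself after each run. SelfReschedulingTask and countBeats are hypothetical names for illustration only, not the actual TimedSupervisorTask, which as far as I can tell also handles timeouts and exponential back-off (the expBackOffBound argument).

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the supervisor task: it runs the wrapped task once,
// then schedules itself again, so a one-shot schedule() call becomes periodic.
class SelfReschedulingTask implements Runnable {
    private final ScheduledExecutorService scheduler;
    private final Runnable task;
    private final long intervalMillis;

    SelfReschedulingTask(ScheduledExecutorService scheduler, Runnable task, long intervalMillis) {
        this.scheduler = scheduler;
        this.task = task;
        this.intervalMillis = intervalMillis;
    }

    @Override
    public void run() {
        try {
            task.run();
        } catch (RuntimeException e) {
            // A failed heartbeat must not cancel the following ones.
        } finally {
            try {
                scheduler.schedule(this, intervalMillis, TimeUnit.MILLISECONDS);
            } catch (RejectedExecutionException e) {
                // The scheduler was shut down; stop rescheduling.
            }
        }
    }

    // Demo helper: waits until the task has run `beats` times (or a timeout
    // elapses) and returns how many runs were observed.
    static int countBeats(int beats, long intervalMillis) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger counter = new AtomicInteger();
        CountDownLatch latch = new CountDownLatch(beats);
        Runnable heartbeat = () -> {
            counter.incrementAndGet();
            latch.countDown();
        };
        // Same shape as the call in initScheduledTasks(): a single delayed schedule.
        scheduler.schedule(new SelfReschedulingTask(scheduler, heartbeat, intervalMillis),
                intervalMillis, TimeUnit.MILLISECONDS);
        try {
            latch.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            scheduler.shutdownNow();
        }
        return counter.get();
    }
}
```

Calling SelfReschedulingTask.countBeats(3, 20) schedules the task once and still observes at least three runs, because every run queues the next one.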

    HeartbeatThread

    This is the Runnable that renews the lease. The part to look at is its run method.

    private class HeartbeatThread implements Runnable {
     
        public void run() {
            if (renew()) {
                // Record the time of the last successful heartbeat
                lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
            }
        }
    }
    // The main renewal method
    boolean renew() {
        EurekaHttpResponse<InstanceInfo> httpResponse;
        try {
            httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
            logger.debug("{} - Heartbeat status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());
            if (httpResponse.getStatusCode() == 404) {
                REREGISTER_COUNTER.increment();
                logger.info("{} - Re-registering apps/{}", PREFIX + appPathIdentifier, instanceInfo.getAppName());
                return register();
            }
            return httpResponse.getStatusCode() == 200;
        } catch (Throwable e) {
            logger.error("{} - was unable to send heartbeat!", PREFIX + appPathIdentifier, e);
            return false;
        }
    }
    

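    The code above is simple: the heartbeat task runs on a worker thread and calls renew(), which sends the heartbeat to Eureka-Server.

    Endpoint: "apps/" + appName + '/' + id, called with HTTP PUT.

    If the endpoint returns 404, the server has no lease for this instance (for example, it never registered), so the client goes through registration again.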
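The branching in renew() can be isolated from the HTTP transport. The following is a minimal sketch under that assumption: RenewDecision is a hypothetical class, the heartbeat is modelled as a supplier of the HTTP status code, and registration as a supplier of its success flag.

```java
import java.util.function.BooleanSupplier;
import java.util.function.IntSupplier;

// Hypothetical helper that mirrors the decision logic of DiscoveryClient.renew().
class RenewDecision {
    // heartbeat: performs the PUT and returns the HTTP status code.
    // register:  performs a registration and returns whether it succeeded.
    static boolean renew(IntSupplier heartbeat, BooleanSupplier register) {
        try {
            int status = heartbeat.getAsInt();
            if (status == 404) {
                // The server does not know this instance: register again.
                return register.getAsBoolean();
            }
            // Only 200 counts as a successful renewal.
            return status == 200;
        } catch (RuntimeException e) {
            // Any transport failure is reported as a failed heartbeat.
            return false;
        }
    }
}
```

Only a true result updates lastSuccessfulHeartbeatTimestamp in HeartbeatThread, so a successful re-registration after a 404 also counts as a successful heartbeat.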

    lastDirtyTimestamp

    The timestamp at which this instance was last modified on the client side.

    Eureka-Server receives the heartbeat

    InstanceResource

    @PUT
    public Response renewLease(
            @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,
            @QueryParam("overriddenstatus") String overriddenStatus,
            @QueryParam("status") String status,
            @QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {
        boolean isFromReplicaNode = "true".equals(isReplication);
        // Renew the lease
        boolean isSuccess = registry.renew(app.getName(), id, isFromReplicaNode);
        // Renewal failed
        // Not found in the registry, immediately ask for a register
        if (!isSuccess) {
            logger.warn("Not Found (Renew): {} - {}", app.getName(), id);
            return Response.status(Status.NOT_FOUND).build();
        }
        // Check if we need to sync based on dirty time stamp, the client
        // instance might have changed some value
        Response response = null;
        // Compare lastDirtyTimestamp
        if (lastDirtyTimestamp != null && serverConfig.shouldSyncWhenTimestampDiffers()) {
            // Compare the incoming lastDirtyTimestamp with the stored one; this check matters
            response = this.validateDirtyTimestamp(Long.valueOf(lastDirtyTimestamp), isFromReplicaNode);
            if (response.getStatus() == Response.Status.NOT_FOUND.getStatusCode()
                    && (overriddenStatus != null)
                    && !(InstanceStatus.UNKNOWN.name().equals(overriddenStatus))
                    && isFromReplicaNode) {
                registry.storeOverriddenStatusIfRequired(app.getAppName(), id, InstanceStatus.valueOf(overriddenStatus));
            }
        } else {
            response = Response.ok().build();
        }
        logger.debug("Found (Renew): {} - {}; reply status={}", app.getName(), id, response.getStatus());
        return response;
    }

    private Response validateDirtyTimestamp(Long lastDirtyTimestamp,
                                            boolean isReplication) {
        // Look up the instance info stored in this server's registry
        InstanceInfo appInfo = registry.getInstanceByAppAndId(app.getName(), id, false);
        if (appInfo != null) {
            // If lastDirtyTimestamp is not null and differs from the stored value
            if ((lastDirtyTimestamp != null) && (!lastDirtyTimestamp.equals(appInfo.getLastDirtyTimestamp()))) {
                Object[] args = {id, appInfo.getLastDirtyTimestamp(), lastDirtyTimestamp, isReplication};
                // The incoming lastDirtyTimestamp is newer than the stored one: the server's copy is stale, so return 404 and let the client register again
                if (lastDirtyTimestamp > appInfo.getLastDirtyTimestamp()) {
                    logger.debug(
                            "Time to sync, since the last dirty timestamp differs -"
                                    + " ReplicationInstance id : {},Registry : {} Incoming: {} Replication: {}",
                            args);
                    return Response.status(Status.NOT_FOUND).build();
                } else if (appInfo.getLastDirtyTimestamp() > lastDirtyTimestamp) {
                    // For a replication request where the stored timestamp is newer than the incoming one, return CONFLICT; the newer stored value wins
                    if (isReplication) {
                        logger.debug(
                                "Time to sync, since the last dirty timestamp differs -"
                                        + " ReplicationInstance id : {},Registry : {} Incoming: {} Replication: {}",
                                args);
                        return Response.status(Status.CONFLICT).entity(appInfo).build();
                    } else {
                        return Response.ok().build();
                    }
                }
            }
     
        }
        return Response.ok().build();
    }
    

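    Notes on the code:

    1. lastDirtyTimestamp acts as the version number of the requests a client sends to the server. Whichever side holds the larger value wins, for example during registration.

    2. After the renewal call succeeds, Eureka Server compares the lastDirtyTimestamp in the request with its stored value (only when serverConfig.shouldSyncWhenTimestampDiffers() is true). If the request's lastDirtyTimestamp is larger, the server treats its copy of the instance as stale and returns 404, and the client registers again.

    3. For a replication request from another Eureka Server, if the stored timestamp is larger than the incoming one, the server returns 409 Conflict together with its own copy, so the larger stored value wins. This rule applies only to replication requests. A request that comes directly from a client gets 200 in this case.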
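The three notes above boil down to a small decision table. Here is a sketch of it as a pure function, assuming plain integer status codes instead of JAX-RS Response objects. DirtyTimestampCheck is a hypothetical name, and a null stored value stands in for the case where the registry has no InstanceInfo.

```java
// Hypothetical, dependency-free model of validateDirtyTimestamp().
class DirtyTimestampCheck {
    static final int OK = 200;
    static final int NOT_FOUND = 404;
    static final int CONFLICT = 409;

    // incoming: lastDirtyTimestamp sent with the heartbeat (may be null).
    // stored:   lastDirtyTimestamp held by this server (null if no instance is found).
    static int validate(Long incoming, Long stored, boolean isReplication) {
        if (stored == null || incoming == null || incoming.equals(stored)) {
            return OK;
        }
        if (incoming > stored) {
            // The sender has newer data: ask it to register again.
            return NOT_FOUND;
        }
        // The server has newer data: only a peer server is told about the conflict.
        return isReplication ? CONFLICT : OK;
    }
}
```

For example, validate(2L, 1L, false) yields 404, while validate(1L, 2L, true) yields 409 and validate(1L, 2L, false) yields 200.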

    Renewing the application lease

    //PeerAwareInstanceRegistryImpl.java
    public boolean renew(final String appName, final String id, final boolean isReplication) {
        // Perform the renewal
        if (super.renew(appName, id, isReplication)) {
            // Replicate the heartbeat to the other nodes of the Eureka-Server cluster
            replicateToPeers(Action.Heartbeat, appName, id, null, null, isReplication);
            return true;
        }
        return false;
    }
    
    
    //AbstractInstanceRegistry.java
    public boolean renew(String appName, String id, boolean isReplication) {
        // Increment the renewal counter (a monitoring enum)
        RENEW.increment(isReplication);
        // Look up the leases for appName in the server's local ConcurrentHashMap
        Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
        Lease<InstanceInfo> leaseToRenew = null;
        if (gMap != null) {
            leaseToRenew = gMap.get(id);
        }
        // The lease is created on first registration, so a null lease means the instance never registered and no lease exists
        if (leaseToRenew == null) {
            RENEW_NOT_FOUND.increment(isReplication);
            logger.warn("DS: Registry: lease doesn't exist, registering resource: {} - {}", appName, id);
            return false;
        } else {
            // Get the instance info held by the lease
            InstanceInfo instanceInfo = leaseToRenew.getHolder();
            if (instanceInfo != null) {
                // touchASGCache(instanceInfo.getASGName());
                // A series of status checks; I have not fully worked these out yet, but they do not affect the main flow
                InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(
                        instanceInfo, leaseToRenew, isReplication);
                if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) {
                    logger.info("Instance status UNKNOWN possibly due to deleted override for instance {}"
                            + "; re-register required", instanceInfo.getId());
                    RENEW_NOT_FOUND.increment(isReplication);
                    return false;
                }
                if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) {
                    Object[] args = {
                            instanceInfo.getStatus().name(),
                            instanceInfo.getOverriddenStatus().name(),
                            instanceInfo.getId()
                    };
                    logger.info(
                            "The instance status {} is different from overridden instance status {} for instance {}. "
                                    + "Hence setting the status to overridden status", args);
                    instanceInfo.setStatus(overriddenInstanceStatus);
                }
            }
            // Update the per-minute renewal count
            renewsLastMin.increment();
            // Renew the lease
            leaseToRenew.renew();
            return true;
        }
    }
    

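    The code above covers the whole renewal flow: fetch the lease from the server's local ConcurrentHashMap, then update the per-minute renewal count and the renewal time.
    renewsLastMin.increment() mainly bumps a variable named currentBucket, of type AtomicLong. A timer fires once a minute and resets the value to 0.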
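The per-minute counter can be pictured as two buckets: one that is being filled and one that holds the total of the previous minute. The sketch below is an assumption-laden model, not the Eureka class itself. MinuteRateSketch, lastBucket, rollOver and getCount are hypothetical names, and the one-minute timer is replaced by an explicit rollOver() call so the behaviour is easy to follow.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical model of a "renewals in the last minute" counter.
class MinuteRateSketch {
    // Bucket that receives increments during the current minute.
    private final AtomicLong currentBucket = new AtomicLong();
    // Assumed second bucket that keeps the total of the previous minute.
    private final AtomicLong lastBucket = new AtomicLong();

    void increment() {
        currentBucket.incrementAndGet();
    }

    // In a server, a timer would call this once per minute.
    void rollOver() {
        // Atomically read the current total and reset the bucket to 0.
        lastBucket.set(currentBucket.getAndSet(0));
    }

    // Number of increments recorded during the last completed minute.
    long getCount() {
        return lastBucket.get();
    }
}
```

After three increment() calls followed by rollOver(), getCount() returns 3 and the current bucket starts again from 0.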

    leaseToRenew.renew() updates lastUpdateTimestamp. The default duration is 90 seconds.

    //Lease.java
    public void renew() {
        lastUpdateTimestamp = System.currentTimeMillis() + duration;
     
    }
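Because renew() already adds duration to the current time, the moment a lease counts as expired depends on how the expiry check uses lastUpdateTimestamp. As I understand the Eureka source, the check adds duration once more, so a lease effectively lives for about twice the configured duration after its last renewal. The sketch below models that under this assumption. LeaseSketch is a hypothetical class with an injected clock value, and it leaves out any extra grace period.

```java
// Hypothetical model of lease renewal and expiry with an explicit clock value.
class LeaseSketch {
    private final long durationMs;
    private long lastUpdateTimestamp;

    LeaseSketch(long durationMs, long nowMs) {
        this.durationMs = durationMs;
        this.lastUpdateTimestamp = nowMs;
    }

    // Mirrors Lease.renew(): the stored timestamp is already "now + duration".
    void renew(long nowMs) {
        lastUpdateTimestamp = nowMs + durationMs;
    }

    // Assumed expiry rule: duration is added a second time in the comparison.
    boolean isExpired(long nowMs) {
        return nowMs > lastUpdateTimestamp + durationMs;
    }
}
```

With a 90-second duration and a renewal at time 0, isExpired stays false until 180 seconds have passed and only turns true after that.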
    

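    Summary:

    A renewal returns 404, and the client has to register again, in the following three cases:

    1. The client's lastDirtyTimestamp is larger than the lastDirtyTimestamp of the instance stored on the server. The server then treats its own copy as stale and responds with 404, so the client has to send a new registration request.

    2. The server has no registration record for the instance.

    3. The status of the instance on the server is UNKNOWN. This state can occur because deleteStatusOverride may be called with UNKNOWN.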
