美文网首页
Spring Cloud Eureka源码分析之心跳续约及自我保

Spring Cloud Eureka源码分析之心跳续约及自我保

作者: 跟着Mic学架构 | 来源:发表于2022-01-07 16:08 被阅读0次
    Eureka

    Eureka-Server是如何判断一个服务不可用的?

    Eureka是通过心跳续约的方式来检查各个服务提供者的健康状态。

    实际上,在判断服务不可用这个部分,会分为两块逻辑。

    1. Eureka-Server需要定期检查服务提供者的健康状态。
    2. Eureka-Client在运行过程中需要定期更新注册信息。

    Eureka的心跳续约机制如下图所示。

    image-20211120163209820
    1. 客户端在启动时, 会开启一个心跳任务,每隔30s向服务单发送一次心跳请求。
    2. 服务端维护了每个实例的最后一次心跳时间,客户端发送心跳包过来后,会更新这个心跳时间。
    3. 服务端在启动时,开启了一个定时任务,该任务每隔60s执行一次,检查每个实例的最后一次心跳时间是否超过90s,如果超过则认为过期,需要剔除。

    关于上述流程中涉及到的时间,可以通过以下配置来更改.

    #Server 至上一次收到 Client 的心跳之后,等待下一次心跳的超时时间,在这个时间内若没收到下一次心跳,则将移除该 Instance。
    eureka.instance.lease-expiration-duration-in-seconds=90
    # Server 清理无效节点的时间间隔,默认60000毫秒,即60秒。
    eureka.server.eviction-interval-timer-in-ms=60
    

    客户端心跳发起流程

    心跳续约是客户端发起的,每隔30s执行一次。

    DiscoveryClient.initScheduledTasks

    继续回到DiscoveryClient.initScheduledTasks方法中,

    private void initScheduledTasks() {
        //省略....
        heartbeatTask = new TimedSupervisorTask(
            "heartbeat",
            scheduler,
            heartbeatExecutor,
            renewalIntervalInSecs,
            TimeUnit.SECONDS,
            expBackOffBound,
            new HeartbeatThread()
        );
        scheduler.schedule(
            heartbeatTask,
            renewalIntervalInSecs, TimeUnit.SECONDS);
        //省略....
    }
    

    renewalIntervalInSecs=30s, 默认每隔30s执行一次。

    HeartbeatThread

    这个线程的实现很简单,调用renew()续约,如果续约成功,则更新最后一次心跳续约时间。

    private class HeartbeatThread implements Runnable {
    
        public void run() {
            if (renew()) {
                lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
            }
        }
    }
    

    renew()方法中,调用EurekaServer的"apps/" + appName + '/' + id;这个地址,进行心跳续约。

    boolean renew() {
        EurekaHttpResponse<InstanceInfo> httpResponse;
        try {
            httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
            logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode());
            if (httpResponse.getStatusCode() == Status.NOT_FOUND.getStatusCode()) {
                REREGISTER_COUNTER.increment();
                logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName());
                long timestamp = instanceInfo.setIsDirtyWithTime();
                boolean success = register();
                if (success) {
                    instanceInfo.unsetIsDirty(timestamp);
                }
                return success;
            }
            return httpResponse.getStatusCode() == Status.OK.getStatusCode();
        } catch (Throwable e) {
            logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);
            return false;
        }
    }
    

    服务端收到心跳处理

    服务端具体为调用[com.netflix.eureka.resources]包下的InstanceResource类的renewLease方法进行续约,代码如下

    @PUT
    public Response renewLease(
            @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,
            @QueryParam("overriddenstatus") String overriddenStatus,
            @QueryParam("status") String status,
            @QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {
        boolean isFromReplicaNode = "true".equals(isReplication);
        //调用renew进行续约
        boolean isSuccess = registry.renew(app.getName(), id, isFromReplicaNode);
    
        // Not found in the registry, immediately ask for a register
        if (!isSuccess) { //如果续约失败,返回异常
            logger.warn("Not Found (Renew): {} - {}", app.getName(), id);
            return Response.status(Status.NOT_FOUND).build();
        }
        // Check if we need to sync based on dirty time stamp, the client
        // instance might have changed some value
        Response response;
        //校验客户端与服务端的时间差异,如果存在问题则需要重新发起注册
        if (lastDirtyTimestamp != null && serverConfig.shouldSyncWhenTimestampDiffers()) {
            response = this.validateDirtyTimestamp(Long.valueOf(lastDirtyTimestamp), isFromReplicaNode);
            // Store the overridden status since the validation found out the node that replicates wins
            if (response.getStatus() == Response.Status.NOT_FOUND.getStatusCode()
                    && (overriddenStatus != null)
                    && !(InstanceStatus.UNKNOWN.name().equals(overriddenStatus))
                    && isFromReplicaNode) {
                registry.storeOverriddenStatusIfRequired(app.getAppName(), id, InstanceStatus.valueOf(overriddenStatus));
            }
        } else {
            response = Response.ok().build(); // 续约成功,返回200
        }
        logger.debug("Found (Renew): {} - {}; reply status={}", app.getName(), id, response.getStatus());
        return response;
    }
    

    InstanceRegistry.renew

    renew的实现方法如下,主要有两个流程

    1. 从服务注册列表中找到匹配当前请求的实例
    2. 发布EurekaInstanceRenewedEvent事件
    @Override
    public boolean renew(final String appName, final String serverId,
                         boolean isReplication) {
        log("renew " + appName + " serverId " + serverId + ", isReplication {}"
            + isReplication);
        //获取所有服务注册信息
        List<Application> applications = getSortedApplications();
        for (Application input : applications) { //逐一遍历
            if (input.getName().equals(appName)) { //如果当前续约的客户端和某个服务注册信息节点相同
                InstanceInfo instance = null;
                for (InstanceInfo info : input.getInstances()) { //遍历这个服务集群下的所有节点,找到某个匹配的实例instance返回。
                    if (info.getId().equals(serverId)) {
                        instance = info; //
                        break;
                    }
                }
                //发布EurekaInstanceRenewedEvent事件,这个事件在EurekaServer中并没有处理,我们可以监听这个事件来做一些事情,比如做监控。
                publishEvent(new EurekaInstanceRenewedEvent(this, appName, serverId,
                                                            instance, isReplication));
                break;
            }
        }
        return super.renew(appName, serverId, isReplication);
    }
    

    super.renew

    public boolean renew(final String appName, final String id, final boolean isReplication) {
        if (super.renew(appName, id, isReplication)) { //调用父类的续约方法,如果续约成功
            replicateToPeers(Action.Heartbeat, appName, id, null, null, isReplication); //同步给集群中的所有节点
            return true;
        }
        return false;
    }
    

    AbstractInstanceRegistry.renew

    在这个方法中,会拿到应用对应的实例列表,然后调用Lease.renew()去进行心跳续约。

    public boolean renew(String appName, String id, boolean isReplication) {
        RENEW.increment(isReplication);
        Map<String, Lease<InstanceInfo>> gMap = registry.get(appName); //根据服务名字获取实例信息
        Lease<InstanceInfo> leaseToRenew = null;
        if (gMap != null) { 
            leaseToRenew = gMap.get(id);  //获取需要续约的服务实例,
        }
        if (leaseToRenew == null) { //如果为空,说明这个服务实例不存在,直接返回续约失败
            RENEW_NOT_FOUND.increment(isReplication);
            logger.warn("DS: Registry: lease doesn't exist, registering resource: {} - {}", appName, id);
            return false;
        } else { //表示实例存在
            InstanceInfo instanceInfo = leaseToRenew.getHolder(); //获取实例的基本信息
            if (instanceInfo != null) { //实例基本信息不为空
                // touchASGCache(instanceInfo.getASGName());
                //获取实例的运行状态
                InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(
                        instanceInfo, leaseToRenew, isReplication);
                if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) { //如果运行状态未知,也返回续约失败
                    logger.info("Instance status UNKNOWN possibly due to deleted override for instance {}"
                            + "; re-register required", instanceInfo.getId());
                    RENEW_NOT_FOUND.increment(isReplication);
                    return false;
                }
                //如果当前请求的实例信息
                if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) {
                    logger.info(
                            "The instance status {} is different from overridden instance status {} for instance {}. "
                                    + "Hence setting the status to overridden status", instanceInfo.getStatus().name(),
                                    overriddenInstanceStatus.name(),
                                    instanceInfo.getId());
                    instanceInfo.setStatusWithoutDirty(overriddenInstanceStatus);
    
                }
            }
            //更新上一分钟的续约数量
            renewsLastMin.increment();
            leaseToRenew.renew(); //续约
            return true;
        }
    }
    

    续约的实现,就是更新服务端最后一次收到心跳请求的时间。

    public void renew() {
        lastUpdateTimestamp = System.currentTimeMillis() + duration;
    
    }
    

    Eureka的自我保护机制

    实际,心跳检测机制有一定的不确定行,比如服务提供者可能是正常的,但是由于网络通信的问题,导致在90s内没有收到心跳请求,那将会导致健康的服务被误杀。

    为了避免这种问题,Eureka提供了一种叫自我保护机制的东西。简单来说,就是开启自我保护机制后,Eureka Server会包这些服务实例保护起来,避免过期导致实例被剔除的问题,从而保证Eurreka集群更加健壮和稳定。

    进入自我保护状态后,会出现以下几种情况

    • Eureka Server不再从注册列表中移除因为长时间没有收到心跳而应该剔除的过期服务,如果在保护期内如果服务刚好这个服务提供者非正常下线了,此时服务消费者就会拿到一个无效的服务实例,此时会调用失败,对于这个问题需要服务消费者端要有一些容错机制,如重试,断路器等!
    • Eureka Server仍然能够接受新服务的注册和查询请求,但是不会被同步到其他节点上,保证当前节点依然可用。

    Eureka自我保护机制,通过配置 eureka.server.enable-self-preservation 来【true】打开/【false禁用】自我保护机制,默认打开状态,建议生产环境打开此配置。

    自我保护机制应该如何设计,才能更加精准的控制到“是网络异常”导致的通信延迟,而不是服务宕机呢?

    Eureka是这么做的: 如果低于85%的客户端节点都没有正常的心跳,那么Eureka Server就认为客户端与注册中心出现了网络故障,Eureka Server自动进入自我保护状态.

    其中,85%这个阈值,可以通过下面这个配置来设置

    # 自我保护续约百分比,默认是0.85
    eureka.server.renewal-percent-threshold=0.85
    

    但是还有个问题,超过谁的85%呢?这里有一个预期的续约数量,这个数量计算公式如下:

    //自我保护阀值 = 服务总数 * 每分钟续约数(60S/客户端续约间隔) * 自我保护续约百分比阀值因子
    

    假设如果有100个服务,续约间隔是30S,自我保护阈值0.85,那么它的预期续约数量为:

    自我保护阈值 =100 * 60 / 30 * 0.85 = 170。
    

    自动续约的阈值设置

    在EurekaServerBootstrap这个类的contextInitialized方法中,会调用initEurekaServerContext进行初始化

    public void contextInitialized(ServletContext context) {
        try {
            initEurekaEnvironment();
            initEurekaServerContext();
    
            context.setAttribute(EurekaServerContext.class.getName(), this.serverContext);
        }
        catch (Throwable e) {
            log.error("Cannot bootstrap eureka server :", e);
            throw new RuntimeException("Cannot bootstrap eureka server :", e);
        }
    }
    

    继续往下看。

    protected void initEurekaServerContext() throws Exception {
            EurekaServerConfig eurekaServerConfig = new DefaultEurekaServerConfig();
        //...
        registry.openForTraffic(applicationInfoManager, registryCount);
    }
    

    在openForTraffic方法中,会初始化expectedNumberOfClientsSendingRenews这个值,这个值的含义是: 预期每分钟收到续约的客户端数量,取决于注册到eureka server上的服务数量

    @Override
    public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
        // Renewals happen every 30 seconds and for a minute it should be a factor of 2.
        this.expectedNumberOfClientsSendingRenews = count; //初始值是1.
        updateRenewsPerMinThreshold();
        logger.info("Got {} instances from neighboring DS node", count);
        logger.info("Renew threshold is: {}", numberOfRenewsPerMinThreshold);
        this.startupTime = System.currentTimeMillis();
        if (count > 0) {
            this.peerInstancesTransferEmptyOnStartup = false;
        }
        DataCenterInfo.Name selfName = applicationInfoManager.getInfo().getDataCenterInfo().getName();
        boolean isAws = Name.Amazon == selfName;
        if (isAws && serverConfig.shouldPrimeAwsReplicaConnections()) {
            logger.info("Priming AWS connections for all replicas..");
            primeAwsReplicas(applicationInfoManager);
        }
        logger.info("Changing status to UP");
        applicationInfoManager.setInstanceStatus(InstanceStatus.UP);
        super.postInit();
    }
    

    updateRenewsPerMinThreshold

    接着调用updateRenewsPerMinThreshold方法,会更新一个每分钟最小的续约数量,也就是Eureka Server期望每分钟收到客户端实例续约的总数的阈值。如果小于这个阈值,就会触发自我保护机制。

    protected void updateRenewsPerMinThreshold() {
        this.numberOfRenewsPerMinThreshold = (int) (this.expectedNumberOfClientsSendingRenews
                * (60.0 / serverConfig.getExpectedClientRenewalIntervalSeconds())
                * serverConfig.getRenewalPercentThreshold());
    }
    //自我保护阀值 = 服务总数 * 每分钟续约数(60S/客户端续约间隔) * 自我保护续约百分比阀值因子
    
    • getExpectedClientRenewalIntervalSeconds,客户端的续约间隔,默认为30s
    • getRenewalPercentThreshold,自我保护续约百分比阈值因子,默认0.85。 也就是说每分钟的续约数量要大于85%

    预期值的变化触发机制

    expectedNumberOfClientsSendingRenewsnumberOfRenewsPerMinThreshold 这两个值,会随着新增服务注册以及服务下线的触发而发生变化。

    PeerAwareInstanceRegistryImpl.cancel

    当服务提供者主动下线时,表示这个时候Eureka-Server要剔除这个服务提供者的地址,同时也代表这这个心跳续约的阈值要发生变化。所以在PeerAwareInstanceRegistryImpl.cancel中可以看到数据的更新

    调用路径 PeerAwareInstanceRegistryImpl.cancel -> AbstractInstanceRegistry.cancel->internalCancel

    服务下线之后,意味着需要发送续约的客户端数量递减了,所以在这里进行修改

    protected boolean internalCancel(String appName, String id, boolean isReplication) {
      //....
        synchronized (lock) {
            if (this.expectedNumberOfClientsSendingRenews > 0) {
                // Since the client wants to cancel it, reduce the number of clients to send renews.
                this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews - 1;
                updateRenewsPerMinThreshold();
            }
        }
    }
    

    PeerAwareInstanceRegistryImpl.register

    当有新的服务提供者注册到eureka-server上时,需要增加续约的客户端数量,所以在register方法中会进行处理

    register ->super.register(AbstractInstanceRegistry)

    public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
        //....    
        // The lease does not exist and hence it is a new registration
        synchronized (lock) {
            if (this.expectedNumberOfClientsSendingRenews > 0) {
                // Since the client wants to register it, increase the number of clients sending renews
                this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews + 1;
                updateRenewsPerMinThreshold();
            }
        }
    }
    

    每隔15分钟刷新自我保护阈值

    PeerAwareInstanceRegistryImpl.scheduleRenewalThresholdUpdateTask

    每隔15分钟,更新一次自我保护阈值!

    private void updateRenewalThreshold() {
        try {
            // 1. 计算应用实例数
            Applications apps = eurekaClient.getApplications();
            int count = 0;
            for (Application app : apps.getRegisteredApplications()) {
                for (InstanceInfo instance : app.getInstances()) {
                    if (this.isRegisterable(instance)) {
                        ++count;
                    }
                }
            }
            
            synchronized (lock) {
                // Update threshold only if the threshold is greater than the
                // current expected threshold or if self preservation is disabled.
                //当节点数量count大于最小续约数量时,或者没有开启自我保护机制的情况下,重新计算expectedNumberOfClientsSendingRenews和numberOfRenewsPerMinThreshold
                if ((count) > (serverConfig.getRenewalPercentThreshold() * expectedNumberOfClientsSendingRenews)
                    || (!this.isSelfPreservationModeEnabled())) {
                    this.expectedNumberOfClientsSendingRenews = count;
                    updateRenewsPerMinThreshold();
                }
            }
            logger.info("Current renewal threshold is : {}", numberOfRenewsPerMinThreshold);
        } catch (Throwable e) {
            logger.error("Cannot update renewal threshold", e);
        }
    }
    

    自我保护机制的触发

    AbstractInstanceRegistrypostInit方法中,会开启一个EvictionTask的任务,这个任务用来检测是否需要开启自我保护机制。

    这个方法也是在EurekaServerBootstrap方法启动时触发。

    protected void postInit() {
        renewsLastMin.start(); //开启一个定时任务,用来实现每分钟的续约数量,每隔60s归0重新计算
        if (evictionTaskRef.get() != null) {
            evictionTaskRef.get().cancel();
        }
        evictionTaskRef.set(new EvictionTask()); //启动一个定时任务EvictionTask,每隔60s执行一次
        evictionTimer.schedule(evictionTaskRef.get(),
                               serverConfig.getEvictionIntervalTimerInMs(),
                               serverConfig.getEvictionIntervalTimerInMs());
    }
    

    其中,EvictionTask的代码如下。

    private final AtomicLong lastExecutionNanosRef = new AtomicLong(0l);
    
    @Override
    public void run() {
        try {
            //获取补偿时间毫秒数
            long compensationTimeMs = getCompensationTimeMs();
            logger.info("Running the evict task with compensationTime {}ms", compensationTimeMs);
            evict(compensationTimeMs);
        } catch (Throwable e) {
            logger.error("Could not run the evict task", e);
        }
    }
    

    evict方法

    public void evict(long additionalLeaseMs) {
        logger.debug("Running the evict task");
         // 是否需要开启自我保护机制,如果需要,那么直接RETURE, 不需要继续往下执行了
        if (!isLeaseExpirationEnabled()) {
            logger.debug("DS: lease expiration is currently disabled.");
            return;
        }
    
        //这下面主要是做服务自动下线的操作的。
    }
    

    isLeaseExpirationEnabled

    • 是否开启了自我保护机制,如果没有,则跳过,默认是开启
    • 计算是否需要开启自我保护,判断最后一分钟收到的续约数量是否大于numberOfRenewsPerMinThreshold
    public boolean isLeaseExpirationEnabled() {
        if (!isSelfPreservationModeEnabled()) {
            // The self preservation mode is disabled, hence allowing the instances to expire.
            return true;
        }
        return numberOfRenewsPerMinThreshold > 0 && getNumOfRenewsInLastMin() > numberOfRenewsPerMinThreshold;
    }
    

    相关文章

      网友评论

          本文标题:Spring Cloud Eureka源码分析之心跳续约及自我保

          本文链接:https://www.haomeiwen.com/subject/mdijcrtx.html