美文网首页
eureka(二)-客户端之服务续约和注册

eureka(二)-客户端之服务续约和注册

作者: Olla_0632 | 来源:发表于2020-03-17 17:02 被阅读0次

    Eureka源码分析(2.1.4.Release)

    首先源码切忌一行一行debug,需先了解eureka主要功能后,再分析其功能如何实现。

    image.png
    由于上一篇文章提及到客户端如何从注册中心中获取服务列表eureka(一)-功能介绍与客户端之服务获取

    客户端之服务续约

    服务续约任务与服务列表获取一样,都是通过TimedSupervisorTask实现动态时间间隔执行任务。TimedSupervisorTask是如何实现动态间隔时间执行任务的,可看上文提到的文章。

    继续从上文提到的initScheduledTasks出发:
    com.netflix.discovery.DiscoveryClient#initScheduledTasks:

    ……上部分代码为客户端获取服务列表任务……
    if (clientConfig.shouldRegisterWithEureka()) {
                //eureka.instance.leaseRenewallIntervalInSeconds=30,默认每30秒执行心跳续约
                int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
                //eureka.client.heartbeatExecutorExponentialBackOffBound=10,如果发生超时的重试延迟的最大乘数。
                int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
                logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);
                //定义个名为heartbeat的线程,执行心跳续约任务
                scheduler.schedule(
                        new TimedSupervisorTask(
                                "heartbeat",
                                scheduler,
                                heartbeatExecutor,
                                renewalIntervalInSecs,
                                TimeUnit.SECONDS,
                                expBackOffBound,
                                new HeartbeatThread()
                        ),
                        renewalIntervalInSecs, TimeUnit.SECONDS);
    ……下部分代码为实例更新并通知eureka注册中心任务……
    
    

    由于TimedSupervisorTask在上篇文章已有介绍,现在则直接查看续约线程逻辑

    com.netflix.discovery.DiscoveryClient.HeartbeatThread:

    image.png

    com.netflix.discovery.DiscoveryClient.HeartbeatThread#run:

            public void run() {
                if (renew()) {
                    lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
                }
            }
    

    com.netflix.discovery.DiscoveryClient#renew:

    boolean renew() {
            EurekaHttpResponse<InstanceInfo> httpResponse;
            try {
                //发起heartbeat请求
                httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
                logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode());
                //如果返回的是无法找到,则重新注册
                if (httpResponse.getStatusCode() == Status.NOT_FOUND.getStatusCode()) {
                    REREGISTER_COUNTER.increment();
                    logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName());
                    //设置该实例的dirty标志位true,并更新dirty时间
                    long timestamp = instanceInfo.setIsDirtyWithTime();
                    //重新注册
                    boolean success = register();
                    //如果成功,则将实例ditry的标志改为false
                    if (success) {
                        instanceInfo.unsetIsDirty(timestamp);
                    }
                    return success;
                }
                return httpResponse.getStatusCode() == Status.OK.getStatusCode();
            } catch (Throwable e) {
                logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);
                return false;
            }
        }
    

    实例的dirty标志isInstanceInfoDirty的作用:在执行服务注册任务时,如果dirty标志为true,则将实例重新注册到远程eureka注册中心。

    客户端之服务注册

    还是从上文提到的initScheduledTasks出发:
    com.netflix.discovery.DiscoveryClient#initScheduledTasks:

            if (clientConfig.shouldRegisterWithEureka()) {
                ……忽略心跳续约代码……
                // InstanceInfo replicator
                //实例更新后,需要将更新信息发送到远程eureka中心的线程,其实现了Runnable接口
                instanceInfoReplicator = new InstanceInfoReplicator(
                        this,
                        instanceInfo,
                        clientConfig.getInstanceInfoReplicationIntervalSeconds(),//instanceInfoReplicationIntervalSeconds=30,默认每30秒检查是否有更新,并注册到注册中心
                        2); //当前时间允许的最大请求数
               //状态监听器
                statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
                    @Override
                    public String getId() {
                        return "statusChangeListener";
                    }
    
                    @Override
                    public void notify(StatusChangeEvent statusChangeEvent) {
                        if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
                                InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
          
                            logger.warn("Saw local status change event {}", statusChangeEvent);
                        } else {
                            logger.info("Saw local status change event {}", statusChangeEvent);
                        }
                        //状态更新后,立即更新到eureka注册中心(这里涉及到令牌限流算法)
                        instanceInfoReplicator.onDemandUpdate();
                    }
                };
                //通过ApplicationInfoManager进行的本地状态更新将立即触发对远程eureka服务器的注册/更新(速率受限)。默认为true
                if (clientConfig.shouldOnDemandUpdateStatusChange()) {
                    applicationInfoManager.registerStatusChangeListener(statusChangeListener);
                }
    
          //eureka.client.initialInstanceInfoReplicationIntervalSeconds=40s,默认延时40s,实例更新或注册线程启动      
      instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
            } else {
                logger.info("Not registering with Eureka server per configuration");
            }
    
    

    从上面的代码可以知道,instanceInfoReplicator的run方法实现了具体的逻辑代码
    com.netflix.discovery.InstanceInfoReplicator#start:

        public void start(int initialDelayMs) {
            if (started.compareAndSet(false, true)) {
                //第一次启动:设置dirty标志为true,触发第一次注册。
                instanceInfo.setIsDirty(); 
                Future next = scheduler.schedule(this, initialDelayMs, TimeUnit.SECONDS);
                scheduledPeriodicRef.set(next);
            }
        }
    

    com.netflix.discovery.InstanceInfoReplicator#run:

    public void run() {
            try {
                //检查实例是否需要更新,如果需要更新,设置dirty标志为true,更新dirty时间。
                discoveryClient.refreshInstanceInfo();
               //如果dirty标志为true,则返回dirty时间,如果标志为false则返回null
                Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
                if (dirtyTimestamp != null) {
                    //dirty时间不为null,则需要更新到注册中心,重新注册
                    discoveryClient.register();
                    //设置dirty标志为false
                    instanceInfo.unsetIsDirty(dirtyTimestamp);
                }
            } catch (Throwable t) {
                logger.warn("There was a problem with the instance info replicator", t);
            } finally {
                //replicationIntervalSeconds秒后(即30秒),又重新执行该方法。
                Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
                scheduledPeriodicRef.set(next);
            }
        }
    

    com.netflix.discovery.DiscoveryClient#refreshInstanceInfo:

    /**
         * Refresh the current local instanceInfo. Note that after a valid refresh where changes are observed, the
         * isDirty flag on the instanceInfo is set to true
         */
        void refreshInstanceInfo() {
            applicationInfoManager.refreshDataCenterInfoIfRequired();
            applicationInfoManager.refreshLeaseInfoIfRequired();
    
            InstanceStatus status;
            try {
                status = getHealthCheckHandler().getStatus(instanceInfo.getStatus());
            } catch (Exception e) {
                logger.warn("Exception from healthcheckHandler.getStatus, setting status to DOWN", e);
                status = InstanceStatus.DOWN;
            }
    
            if (null != status) {
                applicationInfoManager.setInstanceStatus(status);
            }
        }
    

    refreshInstanceInfo方法具体实现流程图如下:


    image.png

    服务注册任务整体流程如下:


    image.png

    附加算法介绍:令牌桶限流算法(简要)

    instanceInfoReplicator.onDemandUpdate():实例状态变更后,马上更新到远程eureka远程中心。
    该方法用到了RateLimiter来实现限流。RateLimiter实现了令牌桶限流算法。为了防止网络拥塞,需限制流出网络的流量,使流量以比较均匀的速度向外发送。令牌桶算法就实现了这个功能,可控制发送到网络上数据的数目,并允许突发数据的发送。


    image.png

    相关文章

      网友评论

          本文标题:eureka(二)-客户端之服务续约和注册

          本文链接:https://www.haomeiwen.com/subject/fqgzehtx.html