美文网首页
sping boot eureka客户端延迟注册实现

sping boot eureka客户端延迟注册实现

作者: 刘勇_leo | 来源:发表于2020-12-02 21:34 被阅读0次

    声明: 本文eureka客户端版本

    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        <version>2.0.1.RELEASE</version>
    </dependency>
    

    启动流程

    eureka客户端实现类: com.netflix.discovery.DiscoveryClient
    eureka配置项实体类: org.springframework.cloud.netflix.eureka.EurekaClientConfigBean

    1. 启动流程核心代码在DiscoveryClient构造函数中
    2. 核心操作

    2.1 是否强制在初始化阶段进行eureka注册

    if (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) {
       try {
           if (!register() ) {
               throw new IllegalStateException("Registration error at startup. Invalid server response.");
           }
       } catch (Throwable th) {
           logger.error("Registration error at startup: {}", th.getMessage());
           throw new IllegalStateException(th);
       }
    }
    

    说明:
    shouldRegisterWithEureka对应配置eureka.client.registerWithEureka,是否注册eureka如果为false则不启动整个DiscoveryClient,默认为true
    shouldEnforceRegistrationAtInit对应配置eureka.client.shouldEnforceRegistrationAtInit,是否强制在初始化阶段进行进行eureka注册,默认false
    综合俩个参数,eureka客户端在初始化阶段本意是不注册eureka,因为此时服务处于不可用状态!!

    2.2 构造需要的定时任务(集群发现, 心跳, 实例副本...)

    private void initScheduledTasks() {
            if (clientConfig.shouldFetchRegistry()) {
                // registry cache refresh timer
                int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
                int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
                scheduler.schedule(
                        new TimedSupervisorTask(
                                "cacheRefresh",
                                scheduler,
                                cacheRefreshExecutor,
                                registryFetchIntervalSeconds,
                                TimeUnit.SECONDS,
                                expBackOffBound,
                                new CacheRefreshThread()
                        ),
                        registryFetchIntervalSeconds, TimeUnit.SECONDS);
            }
    
            if (clientConfig.shouldRegisterWithEureka()) {
                int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
                int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
                logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);
    
                // Heartbeat timer
                scheduler.schedule(
                        new TimedSupervisorTask(
                                "heartbeat",
                                scheduler,
                                heartbeatExecutor,
                                renewalIntervalInSecs,
                                TimeUnit.SECONDS,
                                expBackOffBound,
                                new HeartbeatThread()
                        ),
                        renewalIntervalInSecs, TimeUnit.SECONDS);
    
                // InstanceInfo replicator
                instanceInfoReplicator = new InstanceInfoReplicator(
                        this,
                        instanceInfo,
                        clientConfig.getInstanceInfoReplicationIntervalSeconds(),
                        2); // burstSize
    
                statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
                    @Override
                    public String getId() {
                        return "statusChangeListener";
                    }
    
                    @Override
                    public void notify(StatusChangeEvent statusChangeEvent) {
                        if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
                                InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
                            // log at warn level if DOWN was involved
                            logger.warn("Saw local status change event {}", statusChangeEvent);
                        } else {
                            logger.info("Saw local status change event {}", statusChangeEvent);
                        }
                        instanceInfoReplicator.onDemandUpdate();
                    }
                };
    
                if (clientConfig.shouldOnDemandUpdateStatusChange()) {
                    applicationInfoManager.registerStatusChangeListener(statusChangeListener);
                }
    
                instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
            } else {
                logger.info("Not registering with Eureka server per configuration");
            }
        }
    

    说明:

    1. 从eureka服务端拉取当前服务集群信息-task
    2. 心跳、实例信息副本-task
      特殊:InstanceInfoReplicator 类实现Runnable包含注册逻辑,因此需要分析该类的触发
    class InstanceInfoReplicator implements Runnable {
        .......
        InstanceInfoReplicator(DiscoveryClient discoveryClient, InstanceInfo instanceInfo, int replicationIntervalSeconds, int burstSize) {
            this.discoveryClient = discoveryClient;
            this.instanceInfo = instanceInfo;
            this.scheduler = Executors.newScheduledThreadPool(1,
                    new ThreadFactoryBuilder()
                            .setNameFormat("DiscoveryClient-InstanceInfoReplicator-%d")
                            .setDaemon(true)
                            .build());
    
            this.scheduledPeriodicRef = new AtomicReference<Future>();
    
            this.started = new AtomicBoolean(false);
            this.rateLimiter = new RateLimiter(TimeUnit.MINUTES);
            this.replicationIntervalSeconds = replicationIntervalSeconds;
            this.burstSize = burstSize;
    
            this.allowedRatePerMinute = 60 * this.burstSize / this.replicationIntervalSeconds;
            logger.info("InstanceInfoReplicator onDemand update allowed rate per min is {}", allowedRatePerMinute);
        }
    
        public void start(int initialDelayMs) {
            //触发run方法
            if (started.compareAndSet(false, true)) {
                instanceInfo.setIsDirty();  // for initial register
                Future next = scheduler.schedule(this, initialDelayMs, TimeUnit.SECONDS);
                scheduledPeriodicRef.set(next);
            }
        }
        ......
        public boolean onDemandUpdate() {
            if (rateLimiter.acquire(burstSize, allowedRatePerMinute)) {
                if (!scheduler.isShutdown()) {
                    scheduler.submit(new Runnable() {
                        @Override
                        public void run() {
                            logger.debug("Executing on-demand update of local InstanceInfo");
        
                            Future latestPeriodic = scheduledPeriodicRef.get();
                            if (latestPeriodic != null && !latestPeriodic.isDone()) {
                                logger.debug("Canceling the latest scheduled update, it will be rescheduled at the end of on demand update");
                                latestPeriodic.cancel(false);
                            }
                            //触发run方法
                            InstanceInfoReplicator.this.run();
                        }
                    });
                    return true;
                } else {
                    logger.warn("Ignoring onDemand update due to stopped scheduler");
                    return false;
                }
            } else {
                logger.warn("Ignoring onDemand update due to rate limiter");
                return false;
            }
        }
    
        public void run() {
            try {
                discoveryClient.refreshInstanceInfo();
    
                Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
                if (dirtyTimestamp != null) {
                    //触发注册接口
                    discoveryClient.register();
                    instanceInfo.unsetIsDirty(dirtyTimestamp);
                }
            } catch (Throwable t) {
                logger.warn("There was a problem with the instance info replicator", t);
            } finally {
                Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
                scheduledPeriodicRef.set(next);
            }
        }
    }
    
    重点: start()、onDemandUpdate()方法包含对run方法的触发
    start()方法在initScheduledTasks()方法被调用,代码如下:
    instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
    

    说明: start()方法提交一个延迟任务,延迟时间getInitialInstanceInfoReplicationIntervalSeconds()对应配置eureka.client.initialInstanceInfoReplicationIntervalSeconds,默认值为40,因此该任务会在启动之后才被触发

    onDemandUpdate()方法在initScheduledTasks()方法被调用,代码如下:
    statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
    @Override
    public String getId() {
        return "statusChangeListener";
    }
    
    @Override
    public void notify(StatusChangeEvent statusChangeEvent) {
        if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
                InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
            // log at warn level if DOWN was involved
            logger.warn("Saw local status change event {}", statusChangeEvent);
        } else {
            logger.info("Saw local status change event {}", statusChangeEvent);
        }
        instanceInfoReplicator.onDemandUpdate();
    }
    };
    
    if (clientConfig.shouldOnDemandUpdateStatusChange()) {
        applicationInfoManager.registerStatusChangeListener(statusChangeListener);
    }
    

    说明: eureka客户端启动时会有状态变更STARTING->UP,因此如果注册statusChangeListener,则notify()方法会被触发从而触发注册逻辑, shouldOnDemandUpdateStatusChange()直译为按需状态变更,对应配置项为eureka.client.onDemandUpdateStatusChange,默认值为true

    综合: 默认情况下,由statusChangeListener触发eureka的注册逻辑,因此如果在启动过程中不注册则需要修改eureka.client.onDemandUpdateStatusChange=false

    问题: 在关闭上述配置后,什么时候进行注册逻辑

    1. 实例副本任务(InstanceInfoReplicator),在初始化过程中会提交任务触发注册逻辑,但是触发时间不好控制,因为在初始化阶段提交,因此需要正确估算启动间隔!!
    2. 心跳任务(HeartbeatThread),会检测是否不存在(404),如果不存在才会触发注册逻辑,但是eureka有STARTING中间状态,因此如果远端eureka服务记录的当前实例处于该状态,服务依旧被认为不可用,因此该任务不可靠!!

    自定义: 在tomcat启动之后触发注册逻辑

    1. 服务注册逻辑复用,org.springframework.cloud.netflix.eureka.http.RestTemplateEurekaHttpClient包含了所有eureka操作的restful接口
    2. spring boot启动的run方法中包含扩展点: org.springframework.boot.SpringApplicationRunListener,该类中定义了不同阶段会被调用的方法!!
      综合上述细节产出如下扩展类
    @Slf4j
    public class EurekaRegisterListener implements SpringApplicationRunListener, Ordered {
        private final SpringApplication application;
        private final String[] args;
        //注意该构造方法为官方指定,必须包含这俩个参数
        public EurekaRegisterListener(SpringApplication sa, String[] arg) {
            this.application = sa;
            this.args = arg;
        }
    
        @Override
        public int getOrder() {
            //该listener最后被触发执行
            return Ordered.LOWEST_PRECEDENCE;
        }
    
        @Override
        public void starting() {
    
        }
    
        @Override
        public void environmentPrepared(ConfigurableEnvironment environment) {
    
        }
    
        @Override
        public void contextPrepared(ConfigurableApplicationContext context) {
    
        }
    
        @Override
        public void contextLoaded(ConfigurableApplicationContext context) {
    
        }
    
        @Override
        public void started(ConfigurableApplicationContext context) {
    
        }
    
        //run方法在刚刚启动的时候会调用一次,然后整体服务启动后还会被调用一次
        @Override
        public void running(ConfigurableApplicationContext context) {
            //获取eureka服务端配置项
            String eurekaServiceUrls = context.getEnvironment().getProperty("eureka.client.service-url.defaultZone");
            if (StringUtils.isEmpty(eurekaServiceUrls)) {
                log.error("not found eureka service for manual register!!!!");
                return;
            }
            //第一次调用时上下文并没有被构造因此找bean时失败,才是会抛异常,因此此处捕获并忽略!!
            EurekaInstanceConfigBean eurekaInstanceConfigBean;
            try {
                eurekaInstanceConfigBean = context.getBean(EurekaInstanceConfigBean.class);
            } catch (Exception ignore) {
                return;
            }
    
            //eureka的配置项支持多个地址并用逗号隔开,因此此处也做了兼容
            String[] serviceUrlArr = eurekaServiceUrls.split(",");
            for (String serviceUrl : serviceUrlArr) {
                //轮询地址,构造restTemplate
                EurekaHttpClient eurekaHttpClient = new RestTemplateTransportClientFactory().newClient(new DefaultEndpoint(serviceUrl));
                //获取eureka根据配置文件构造出的实例对象
                InstanceInfo instanceInfo = new EurekaConfigBasedInstanceInfoProvider(eurekaInstanceConfigBean).get();
                //此时直接将状态更该为UP,默认为STARTING虽然注册但是不可用
                instanceInfo.setStatus(InstanceInfo.InstanceStatus.UP);
                //发送rest请求去注册
                EurekaHttpResponse<Void> register = eurekaHttpClient.register(instanceInfo);
                //判断当前地址是成功注册
                if (register.getStatusCode() == 204) {
                    log.info("success manual register eureka!!!");
                    return;
                }
            }
        }
    
        @Override
        public void failed(ConfigurableApplicationContext context, Throwable exception) {
            //启动失败时下线eureka实例,eureka内部实现直接拿过来用!
            DiscoveryManager.getInstance().shutdownComponent();
        }
    }
    

    说明: SpringApplicationRunListener使用spring的spi机制,因此需要在resource目录下增加META-INF目录,增加spring.factories文件,文件内容为: org.springframework.boot.SpringApplicationRunListener=
    com.xxx.EurekaRegisterListener

    总结

    1. 调整eureka.client.initialInstanceInfoReplicationIntervalSeconds时间,让服务启动后刚刚好到达任务执行时间
    2. 扩展SpringApplicationRunListener,将在服务启动后立即触发eureka的注册逻辑

    相关文章

      网友评论

          本文标题:sping boot eureka客户端延迟注册实现

          本文链接:https://www.haomeiwen.com/subject/wvguwktx.html