美文网首页Eureka
【SpringCloud Eureka源码】从Eureka Cl

【SpringCloud Eureka源码】从Eureka Cl

作者: Trust_FreeDom | 来源:发表于2019-01-04 10:14 被阅读27次

    本文使用Spring Cloud Eureka分析

    Spring Cloud版本: Dalston.SR5

    spring-cloud-starter-eureka版本: 1.3.6.RELEASE

    netflix eureka版本: 1.6.2

    Eureka Client启动并调用Eureka Server的注册接口

    Spring Cloud Eureka的自动配置

    @EnableDiscoveryClient

    首先从使用Eureka Client必须引入的@EnableDiscoveryClient注解说起

    @Target(ElementType.TYPE)
    @Retention(RetentionPolicy.RUNTIME)
    @Documented
    @Inherited
    @Import(EnableDiscoveryClientImportSelector.class)
    public @interface EnableDiscoveryClient {
    
        /**
         * If true, the ServiceRegistry will automatically register the local server.
         */
        boolean autoRegister() default true;
    }
    

    @EnableDiscoveryClient注解的作用:

    • autoRegister默认值为true,即服务发现客户端默认会自动注册到服务端

    • Import导入EnableDiscoveryClientImportSelector.class,其作用是

      • 导入了 spring-cloud-eureka-client.jar!\META-INF\spring.factories 中的 EurekaDiscoveryClientConfiguration

        org.springframework.cloud.client.discovery.EnableDiscoveryClient=\
        org.springframework.cloud.netflix.eureka.EurekaDiscoveryClientConfiguration
        
      • 由于autoRegister默认为true,故还会导入AutoServiceRegistrationConfiguration,即启用自动服务注册的配置,等同于在配置文件中spring.cloud.service-registry.auto-registration.enabled = true

    EurekaDiscoveryClientConfiguration

    • 向Spring容器注册EurekaDiscoveryClientConfiguration.Marker.class,使得真正导入DiscoveryClient的EurekaClientAutoConfiguration配置类满足启用条件
    • 创建监听RefreshScopeRefreshedEvent事件的监听器,满足在使用RefreshScope刷新时可以重建EurekaClient(不是本文重点)
    • 在配置eureka.client.healthcheck.enabled=true的前提下,向Spring容器注册EurekaHealthCheckHandler用于健康检查(不是本文重点)

    所以,EurekaDiscoveryClientConfiguration的主要作用是向Spring容器注册EurekaDiscoveryClientConfiguration.Marker.class,使得EurekaClientAutoConfiguration配置类满足启用条件

    EurekaClientAutoConfiguration

    EurekaClientAutoConfiguration配置类中涉及的内容比较多,主要内容:

    • 1、注册了spring cloud包下的EurekaClientConfigBean,这是个对netflix的EurekaClientConfig客户端配置接口的实现
    • 2、注册了spring cloud包下的EurekaInstanceConfigBean,这是个对netflix的EurekaInstanceConfig实例信息配置接口的实现
    • 3、注册了一些AutoServiceRegistration,即客户端自动注册的组件,如
      • EurekaRegistration: Eureka实例的服务注册信息(在开启客户端自动注册时才会注册)
      • EurekaServiceRegistry: Eureka服务注册器
      • EurekaAutoServiceRegistration: Eureka服务自动注册器,实现了SmartLifecycle,会在Spring容器的refresh的最后阶段被调用,通过EurekaServiceRegistry注册器注册EurekaRegistration信息
    • 4、注册netflix的EurekaClientApplicationInfoManager,注册时分为两种情况,即是否满足RefreshScope,如果满足,注入的Bean是带有 @Lazy + @RefreshScope 注解
      • ApplicationInfoManager: 管理并初始化当前Instance实例的注册信息,并提供了实例状态监听机制
      • EurekaClient : netflix的接口类,用于和Eureka Server交互的客户端,而netflix的默认实现是DiscoveryClient,也是本文分析的重点
    • 5、注册EurekaHealthIndicator,为/health端点提供Eureka相关信息,主要有Status当前实例状态和applications服务列表,在从Eureka Server获取服务列表正常的情况下,Status使用Eureka Server上的InstanceRemoteStatus,不正常情况下,代码中有一些判断逻辑
    public class EurekaClientAutoConfiguration {
        ...省略
    
        /**
         * 1、注册EurekaClientConfigBean
         */
        @Bean
        @ConditionalOnMissingBean(value = EurekaClientConfig.class, search = SearchStrategy.CURRENT)
        public EurekaClientConfigBean eurekaClientConfigBean() {
            EurekaClientConfigBean client = new EurekaClientConfigBean();
            if ("bootstrap".equals(propertyResolver.getProperty("spring.config.name"))) {
                // We don't register during bootstrap by default, but there will be another
                // chance later.
                client.setRegisterWithEureka(false);
            }
            return client;
        }
    
        /**
         * 2、注册EurekaInstanceConfigBean
         */
        @Bean
        @ConditionalOnMissingBean(value = EurekaInstanceConfig.class, search = SearchStrategy.CURRENT)
        public EurekaInstanceConfigBean eurekaInstanceConfigBean(InetUtils inetUtils) throws MalformedURLException {
            PropertyResolver eurekaPropertyResolver = new RelaxedPropertyResolver(this.env, "eureka.instance.");
            String hostname = eurekaPropertyResolver.getProperty("hostname");
    
            boolean preferIpAddress = Boolean.parseBoolean(eurekaPropertyResolver.getProperty("preferIpAddress"));
            int nonSecurePort = Integer.valueOf(propertyResolver.getProperty("server.port", propertyResolver.getProperty("port", "8080")));
            int managementPort = Integer.valueOf(propertyResolver.getProperty("management.port", String.valueOf(nonSecurePort)));
            String managementContextPath = propertyResolver.getProperty("management.contextPath", propertyResolver.getProperty("server.contextPath", "/"));
            EurekaInstanceConfigBean instance = new EurekaInstanceConfigBean(inetUtils);
            instance.setNonSecurePort(nonSecurePort);
            instance.setInstanceId(getDefaultInstanceId(propertyResolver));
            instance.setPreferIpAddress(preferIpAddress);
            if (managementPort != nonSecurePort && managementPort != 0) {
                if (StringUtils.hasText(hostname)) {
                    instance.setHostname(hostname);
                }
                String statusPageUrlPath = eurekaPropertyResolver.getProperty("statusPageUrlPath");
                String healthCheckUrlPath = eurekaPropertyResolver.getProperty("healthCheckUrlPath");
                if (!managementContextPath.endsWith("/")) {
                    managementContextPath = managementContextPath + "/";
                }
                if (StringUtils.hasText(statusPageUrlPath)) {
                    instance.setStatusPageUrlPath(statusPageUrlPath);
                }
                if (StringUtils.hasText(healthCheckUrlPath)) {
                    instance.setHealthCheckUrlPath(healthCheckUrlPath);
                }
                String scheme = instance.getSecurePortEnabled() ? "https" : "http";
                URL base = new URL(scheme, instance.getHostname(), managementPort, managementContextPath);
                instance.setStatusPageUrl(new URL(base, StringUtils.trimLeadingCharacter(instance.getStatusPageUrlPath(), '/')).toString());
                instance.setHealthCheckUrl(new URL(base, StringUtils.trimLeadingCharacter(instance.getHealthCheckUrlPath(), '/')).toString());
            }
            return instance;
        }
    
        /**
         * 3、注册客户端自动注册相关组件
         *     EurekaRegistration: Eureka实例的服务注册信息(在开启客户端自动注册时才会注册)
         *     EurekaServiceRegistry: Eureka服务注册器
         *     EurekaAutoServiceRegistration: Eureka服务自动注册器,
         *                                     通过EurekaServiceRegistry注册器注册EurekaRegistration信息
         */
        @Bean
        public EurekaServiceRegistry eurekaServiceRegistry() {
            return new EurekaServiceRegistry();
        }
    
        @Bean
        @ConditionalOnBean(AutoServiceRegistrationProperties.class)
        @ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled", matchIfMissing = true)
        public EurekaRegistration eurekaRegistration(EurekaClient eurekaClient, CloudEurekaInstanceConfig instanceConfig, ApplicationInfoManager applicationInfoManager) {
            return EurekaRegistration.builder(instanceConfig)
                    .with(applicationInfoManager)
                    .with(eurekaClient)
                    .with(healthCheckHandler)
                    .build();
        }
    
        @Bean
        @ConditionalOnBean(AutoServiceRegistrationProperties.class)
        @ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled", matchIfMissing = true)
        public EurekaAutoServiceRegistration eurekaAutoServiceRegistration(ApplicationContext context, EurekaServiceRegistry registry, EurekaRegistration registration) {
            return new EurekaAutoServiceRegistration(context, registry, registration);
        }
    
    
        /**
         * 4、注册netflix的 EurekaClient 和 ApplicationInfoManager
         */
        // 如果禁用客户端自动注册,在此方法debug打断点会触发服务注册,状态为STARTING
        @Bean
        public DiscoveryClient discoveryClient(EurekaInstanceConfig config, EurekaClient client) {
            return new EurekaDiscoveryClient(config, client);
        }
        
        // 普通的EurekaClient配置(不可刷新)
        @Configuration
        @ConditionalOnMissingRefreshScope
        protected static class EurekaClientConfiguration {
    
            @Autowired
            private ApplicationContext context;
    
            @Autowired(required = false)
            private DiscoveryClientOptionalArgs optionalArgs;
    
            @Bean(destroyMethod = "shutdown")
            @ConditionalOnMissingBean(value = EurekaClient.class, search = SearchStrategy.CURRENT)
            public EurekaClient eurekaClient(ApplicationInfoManager manager, EurekaClientConfig config) {
                return new CloudEurekaClient(manager, config, this.optionalArgs,
                        this.context);
            }
    
            @Bean
            @ConditionalOnMissingBean(value = ApplicationInfoManager.class, search = SearchStrategy.CURRENT)
            public ApplicationInfoManager eurekaApplicationInfoManager(
                    EurekaInstanceConfig config) {
                InstanceInfo instanceInfo = new InstanceInfoFactory().create(config);
                return new ApplicationInfoManager(config, instanceInfo);
            }
        }
    
         // 可刷新的EurekaClient配置类
        @Configuration
        @ConditionalOnRefreshScope  //满足@ConditionalOnClass(RefreshScope.class)         
                                    //    @ConditionalOnBean(RefreshAutoConfiguration.class)
        protected static class RefreshableEurekaClientConfiguration {
    
            @Autowired
            private ApplicationContext context;
    
            @Autowired(required = false)
            private DiscoveryClientOptionalArgs optionalArgs;
    
            // 注册CloudEurekaClient,是com.netflix.discovery.EurekaClient接口的实现类
            @Bean(destroyMethod = "shutdown")
            @ConditionalOnMissingBean(value = EurekaClient.class, search = SearchStrategy.CURRENT)
            @org.springframework.cloud.context.config.annotation.RefreshScope
            @Lazy
            public EurekaClient eurekaClient(ApplicationInfoManager manager, EurekaClientConfig config, EurekaInstanceConfig instance) {
                manager.getInfo(); // force initialization
                return new CloudEurekaClient(manager, config, this.optionalArgs,
                        this.context);
            }
    
            // 注册ApplicationInfoManager
            @Bean
            @ConditionalOnMissingBean(value = ApplicationInfoManager.class, search = SearchStrategy.CURRENT)
            @org.springframework.cloud.context.config.annotation.RefreshScope
            @Lazy
            public ApplicationInfoManager eurekaApplicationInfoManager(EurekaInstanceConfig config) {
                InstanceInfo instanceInfo = new InstanceInfoFactory().create(config);
                return new ApplicationInfoManager(config, instanceInfo);
            }
        }
    
        
        /**
         * 5、注册 EurekaHealthIndicator
         */
        @Configuration
        @ConditionalOnClass(Endpoint.class)
        protected static class EurekaHealthIndicatorConfiguration {
            @Bean
            @ConditionalOnMissingBean
            public EurekaHealthIndicator eurekaHealthIndicator(EurekaClient eurekaClient,
                                                               EurekaInstanceConfig instanceConfig, EurekaClientConfig clientConfig) {
                return new EurekaHealthIndicator(eurekaClient, instanceConfig, clientConfig);
            }
        }
    }
    

    如上,在满足一系列Conditional条件后,会向Spring容器中注册CloudEurekaClient,它是com.netflix.discovery.EurekaClient接口的实现类,具体继承实现关系如下

    DiscoveryClient继承实现关系

    1545187443351.png

    如上图所示,刚刚创建的CloudEurekaClientcom.netflix.discovery.DiscoveryClient的子类,它们都实现了com.netflix.discovery.EurekaClient接口

    EurekaClient是Netflix对服务发现客户端抽象的接口,包含很多方法,而DiscoveryClient是其默认实现,也是本文分析的重点,CloudEurekaClient是spring cloud的实现,根据类上注释,其主要重写了onCacheRefreshed()方法,这个方法主要是从Eureka Server fetchRegistry()获取服务列表之后用于以广播方式通知缓存刷新事件的,其实DiscoveryClient也有onCacheRefreshed()方法的实现,但由于DiscoveryClient是Netflix的类,只发送了com.netflix.discovery.EurekaEvent,而CloudEurekaClient使用Spring的ApplicationEventPublisher,发送了HeartbeatEvent

    注意:

    上面说的都是netflix的DiscoveryClient

    还有另一个DiscoveryClient,是 org.springframework.cloud.client.discovery.DiscoveryClient

    是Spring对服务发现客户端的抽象

    创建DiscoveryClient的过程

    DiscoveryClient构造方法

    @Inject
    DiscoveryClient(ApplicationInfoManager applicationInfoManager, 
                    EurekaClientConfig config, 
                    AbstractDiscoveryClientOptionalArgs args,
                    Provider<BackupRegistry> backupRegistryProvider) {
        /**
         * AbstractDiscoveryClientOptionalArgs 是DiscoveryClient的可选参数,可理解为扩展点
         * 包含healthCheckHandlerProvider、healthCheckCallbackProvider、eventListeners等
         * spring cloud默认实现为MutableDiscoveryClientOptionalArgs,但此处相关成员变量赋值后认为空
         */
        if (args != null) {
            this.healthCheckHandlerProvider = args.healthCheckHandlerProvider;
            this.healthCheckCallbackProvider = args.healthCheckCallbackProvider;
            this.eventListeners.addAll(args.getEventListeners());
        } else {
            this.healthCheckCallbackProvider = null;
            this.healthCheckHandlerProvider = null;
        }
        
        this.applicationInfoManager = applicationInfoManager;
        InstanceInfo myInfo = applicationInfoManager.getInfo();
    
        clientConfig = config;
        staticClientConfig = clientConfig;
        transportConfig = config.getTransportConfig();
        instanceInfo = myInfo;
        if (myInfo != null) {
            appPathIdentifier = instanceInfo.getAppName() + "/" + instanceInfo.getId();
        } else {
            logger.warn("Setting instanceInfo to a passed in null value");
        }
    
        this.backupRegistryProvider = backupRegistryProvider;
    
        this.urlRandomizer = new EndpointUtils.InstanceInfoBasedUrlRandomizer(instanceInfo);
        localRegionApps.set(new Applications());
    
        fetchRegistryGeneration = new AtomicLong(0);
    
        remoteRegionsToFetch = new AtomicReference<String>(clientConfig.fetchRegistryForRemoteRegions());
        remoteRegionsRef = new AtomicReference<>(remoteRegionsToFetch.get() == null ? null : remoteRegionsToFetch.get().split(","));
    
        // 如果 shouldFetchRegistry=true,注册netflix servo监控
        if (config.shouldFetchRegistry()) {
            this.registryStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRY_PREFIX + "lastUpdateSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
        } else {
            this.registryStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
        }
    
        // 如果 shouldRegisterWithEureka=true,注册netflix servo监控
        if (config.shouldRegisterWithEureka()) {
            this.heartbeatStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRATION_PREFIX + "lastHeartbeatSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
        } else {
            this.heartbeatStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
        }
    
        logger.info("Initializing Eureka in region {}", clientConfig.getRegion());
    
        // 如果既不要向eureka server注册,又不要获取服务列表,就什么都不用初始化
        if (!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()) {
            logger.info("Client configured to neither register nor query for data.");
            scheduler = null;
            heartbeatExecutor = null;
            cacheRefreshExecutor = null;
            eurekaTransport = null;
            instanceRegionChecker = new InstanceRegionChecker(new PropertyBasedAzToRegionMapper(config), clientConfig.getRegion());
    
            // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
            // to work with DI'd DiscoveryClient
            DiscoveryManager.getInstance().setDiscoveryClient(this);
            DiscoveryManager.getInstance().setEurekaClientConfig(config);
    
            initTimestampMs = System.currentTimeMillis();
    
            logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
                    initTimestampMs, this.getApplications().size());
            return;  // no need to setup up an network tasks and we are done
        }
    
        // 【重点】创建各种Executor 和 eurekaTransport、instanceRegionChecker
        try {
            // 执行定时任务的定时器,定时线程名为 DiscoveryClient-%d
            // 在定时器中用于定时执行TimedSupervisorTask监督任务,监督任务会强制超时 和 记录监控数据
            scheduler = Executors.newScheduledThreadPool(3,
                    new ThreadFactoryBuilder()
                            .setNameFormat("DiscoveryClient-%d")
                            .setDaemon(true)
                            .build());
    
            // 执行heartbeat心跳任务的执行器,默认最大线程数=2,线程名为:DiscoveryClient-HeartbeatExecutor-%d
            heartbeatExecutor = new ThreadPoolExecutor(
                    1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                    new SynchronousQueue<Runnable>(),
                    new ThreadFactoryBuilder()
                            .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
                            .setDaemon(true)
                            .build()
            );  // use direct handoff
    
            // 执行服务列表缓存刷新的执行器,默认最大线程数=2,线程名为:DiscoveryClient-CacheRefreshExecutor-%d
            cacheRefreshExecutor = new ThreadPoolExecutor(
                    1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                    new SynchronousQueue<Runnable>(),
                    new ThreadFactoryBuilder()
                            .setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
                            .setDaemon(true)
                            .build()
            );  // use direct handoff
    
            eurekaTransport = new EurekaTransport();
            // 初始化eurekaTransport在服务注册,获取服务列表时的client
            scheduleServerEndpointTask(eurekaTransport, args);
    
            AzToRegionMapper azToRegionMapper;
            if (clientConfig.shouldUseDnsForFetchingServiceUrls()) {
                azToRegionMapper = new DNSBasedAzToRegionMapper(clientConfig);
            } else {
                azToRegionMapper = new PropertyBasedAzToRegionMapper(clientConfig);
            }
            if (null != remoteRegionsToFetch.get()) {
                azToRegionMapper.setRegionsToFetch(remoteRegionsToFetch.get().split(","));
            }
            instanceRegionChecker = new InstanceRegionChecker(azToRegionMapper, clientConfig.getRegion());
        } catch (Throwable e) {
            throw new RuntimeException("Failed to initialize DiscoveryClient!", e);
        }
    
        // 如果需要从eureka server获取服务列表,并且尝试fetchRegistry(false)失败,调用BackupRegistry
        if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
            fetchRegistryFromBackup();
        }
    
        // 【重点】初始化所有定时任务
        initScheduledTasks();
        
        // 添加servo监控
        try {
            Monitors.registerObject(this);
        } catch (Throwable e) {
            logger.warn("Cannot register timers", e);
        }
    
        // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
        // to work with DI'd DiscoveryClient
        DiscoveryManager.getInstance().setDiscoveryClient(this);
        DiscoveryManager.getInstance().setEurekaClientConfig(config);
    
        initTimestampMs = System.currentTimeMillis();
        logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
                initTimestampMs, this.getApplications().size());
    }
    

    上面的DiscoveryClient构造方法代码比较多,但多数都是一些赋值,本次分析的重点在注释中已经标出,创建了各种Executor 和 eurekaTransport、instanceRegionChecker,之后又调用initScheduledTasks()方法初始化所有这些定时任务

    【重点】initScheduledTasks() 初始化定时任务

    /**
     * Initializes all scheduled tasks.
     */
    private void initScheduledTasks() {
        // 1、如果要从Eureka Server获取服务列表
        if (clientConfig.shouldFetchRegistry()) {
            // registry cache refresh timer
            
            // 从eureka服务器获取注册表信息的频率(默认30s)
            // 同时也是单次获取服务列表的超时时间
            int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
            // 如果缓存刷新超时,下一次执行的delay最大是registryFetchIntervalSeconds的几倍(默认10),默认每次执行是上一次的2倍
            int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
            
            /**
             * 【#### 执行CacheRefreshThread,服务列表缓存刷新任务 ####】
             * 执行TimedSupervisorTask监督任务的定时器,具体执行器为cacheRefreshExecutor,任务为CacheRefreshThread
             */
            scheduler.schedule(
                    new TimedSupervisorTask(
                            "cacheRefresh",               //监控名
                            scheduler,
                            cacheRefreshExecutor,
                            registryFetchIntervalSeconds, //指定具体任务的超时时间
                            TimeUnit.SECONDS,
                            expBackOffBound,
                            new CacheRefreshThread()
                    ),
                    registryFetchIntervalSeconds, TimeUnit.SECONDS);
        }
    
        
        // 2、如果要注册到Eureka Server
        if (clientConfig.shouldRegisterWithEureka()) {
            // 续租的时间间隔(默认30s)
            int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
            // 如果心跳任务超时,下一次执行的delay最大是renewalIntervalInSecs的几倍(默认10),默认每次执行是上一次的2倍
            int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
            logger.info("Starting heartbeat executor: " + "renew interval is: " + renewalIntervalInSecs);
    
            // Heartbeat timer
            /**
             * 【#### 执行HeartbeatThread,发送心跳数据 ####】
             * 执行TimedSupervisorTask监督任务的定时器,具体执行器为heartbeatExecutor,任务为HeartbeatThread
             */
            scheduler.schedule(
                    new TimedSupervisorTask(
                            "heartbeat",
                            scheduler,
                            heartbeatExecutor,
                            renewalIntervalInSecs,
                            TimeUnit.SECONDS,
                            expBackOffBound,
                            new HeartbeatThread()
                    ),
                    renewalIntervalInSecs, TimeUnit.SECONDS);
    
            // InstanceInfo replicator
            /** 
             * 【#### InstanceInfo复制器 ####】
             * 启动后台定时任务scheduler,线程名为 DiscoveryClient-InstanceInfoReplicator-%d
             * 默认每30s执行一次定时任务,查看Instance信息(DataCenterInfo、LeaseInfo、InstanceStatus)是否有变化
             * 如果有变化,执行 discoveryClient.register()
             */
            instanceInfoReplicator = new InstanceInfoReplicator(
                   this,            //当前DiscoveryClient
                   instanceInfo,    //当前实例信息
                   clientConfig.getInstanceInfoReplicationIntervalSeconds(),//InstanceInfo的复制间隔(默认30s)
                   2); // burstSize
    
            /**
             * 【StatusChangeListener 状态改变监听器】
             */
            statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
                @Override
                public String getId() {
                    return "statusChangeListener";
                }
    
                @Override
                public void notify(StatusChangeEvent statusChangeEvent) {
                    if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
                            InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
                        // log at warn level if DOWN was involved
                        logger.warn("Saw local status change event {}", statusChangeEvent);
                    } else {
                        logger.info("Saw local status change event {}", statusChangeEvent);
                    }
                    
                    //使用InstanceInfo复制器 scheduler.submit()一个Runnable任务
                    //后台马上执行 discoveryClient.register()
                    instanceInfoReplicator.onDemandUpdate();
                }
            };
    
            /**
             * 是否关注Instance状态变化,使用后台线程将状态同步到eureka server(默认true)
             * 调用 ApplicationInfoManager#setInstanceStatus(status) 会触发
             * 将 StatusChangeListener 注册到 ApplicationInfoManager
             */
            if (clientConfig.shouldOnDemandUpdateStatusChange()) {
                applicationInfoManager.registerStatusChangeListener(statusChangeListener);
            }
    
            // 启动InstanceInfo复制器
            instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
        } 
        // 当前服务实例不注册到Eureka Server
        else {
            logger.info("Not registering with Eureka server per configuration");
        }
    }
    

    总的来说initScheduledTasks()做了以下几件事:

    • 如果shouldFetchRegistry=true,即要从Eureka Server获取服务列表
      • 启动刷新服务列表定时线程(DiscoveryClient-CacheRefreshExecutor-%d),默认registryFetchIntervalSeconds=30s执行一次,任务为CacheRefreshThread,即从Eureka Server获取服务列表,也刷新客户端缓存
    • 如果shouldRegisterWithEureka=true,即要注册到Eureka Server
      • 启动heartbeat心跳定时线程(DiscoveryClient-HeartbeatExecutor-%d),默认renewalIntervalInSecs=30s续约一次,任务为HeartbeatThread,即客户端向Eureka Server发送心跳
      • 启动InstanceInfo复制器定时线程(DiscoveryClient-InstanceInfoReplicator-%d),开启定时线程检查当前Instance的DataCenterInfo、LeaseInfo、InstanceStatus,如果发现变更就执行discoveryClient.register(),将实例信息同步到Server端

    Eureka Client复制InstanceInfo,发起注册

    由创建DiscoveryClient的过程可知,创建了很多定时执行线程,如定时从Server端刷新服务列表的CacheRefreshThread,定时报心跳续约的HeartbeatThread,还有用于更新并复制本地实例状态到Server端的InstanceInfo复制器定时线程,而正是InstanceInfoReplicator#run()中的discoveryClient.register()发起了注册

    那么怎么可以触发注册行为呢?

    // InstanceInfoReplicator#run()
    public void run() {
        try {
            /**
             * 刷新 InstanceInfo
             * 1、刷新 DataCenterInfo
             * 2、刷新 LeaseInfo 租约信息
             * 3、根据HealthCheckHandler获取InstanceStatus,并更新,如果状态发生变化会触发所有StatusChangeListener
             */
            discoveryClient.refreshInstanceInfo();
    
            // 如果isInstanceInfoDirty=true,返回dirtyTimestamp,否则是null
            Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
            if (dirtyTimestamp != null) {
                discoveryClient.register();  //发起注册
                instanceInfo.unsetIsDirty(dirtyTimestamp);  //isInstanceInfoDirty置为false
            }
        } 
        catch (Throwable t) {
            logger.warn("There was a problem with the instance info replicator", t);
        } 
        finally { // 继续下次任务
            Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
            scheduledPeriodicRef.set(next);
        }
    }
    

    如上,先刷新InstanceInfo,刷新后如果发现有脏数据,即实例发生了变更,还未同步给Server的数据,就发起注册

    那么在Eureka Client启动的这种场景下,怎样会触发有脏数据下的注册?

    • 由InstanceInfoReplicator复制器的自动定时任务在刷新InstanceInfo时发现有脏数据,并更新
    • InstanceInfoReplicator复制器提供onDemandUpdate()按需更新方法,一旦调用,马上会submit()任务,其中会cancel自动更新任务,马上执行InstanceInfoReplicator#run()

    InstanceInfoReplicator复制器自动定时更新

    InstanceInfoReplicator复制器在启动创建DiscoveryClient时被创建并start()启动

    // InstanceInfoReplicator#start()
    public void start(int initialDelayMs) { // 默认40s
        if (started.compareAndSet(false, true)) {
            instanceInfo.setIsDirty();  // for initial register 初始化时会将instanceInfo设置为dirty
            Future next = scheduler.schedule(this, initialDelayMs, TimeUnit.SECONDS);
            scheduledPeriodicRef.set(next);
        }
    }
    

    所以当自动更新启动时会设置InstanceInfo为脏数据,因为要触发第一次向Server同步,那么在40s后会调用InstanceInfoReplicator#run(),假设InstanceInfo并没有其它变更,那么也会发起discoveryClient.register()

    注意:

    正常情况下是不会由延迟40s的第一次执行定时任务发起注册,而是下面的onDemandUpdate() 主动按需更新发起注册

    如果设置@EnableDiscoveryClient(autoRegister = false) 或者 spring.cloud.service-registry.auto-registration.enabled=false,即放弃自动注册,并在EurekaClientAutoConfiguration的如下方法打断点

    @Bean
    public DiscoveryClient discoveryClient(EurekaInstanceConfig config, EurekaClient client) {
      return new EurekaDiscoveryClient(config, client);
    }
    

    会在断点生效时触发EurekaClient的实例化,而此EurekaClient就是一个DiscoveryClient,会启动InstanceInfoReplicator自动定时更新线程,但由于new InstanceInfoFactory().create(config)时本地实例状态为STARTING,所以注册到Server端的状态也是STARTING

    onDemandUpdate() 主动按需更新

    目前只有在ApplicationInfoManager#setInstanceStatus()更新实例状态,且实例状态真的发生变更,触发StatusChangeListener状态变更监听器时,会调用onDemandUpdate马上submit任务执行InstanceInfoReplicator#run(),再发起注册

    由于Spring Cloud默认是启用服务自动注册AutoServiceRegistration的,所以在EurekaClientAutoConfiguration自动配置时会注册服务自动注册相关组件(EurekaRegistration、EurekaServiceRegistry、EurekaAutoServiceRegistration),其中EurekaAutoServiceRegistration实现了Spring的SmartLifecycle接口,会在Spring容器refresh要完毕时触发生命周期方法start(),其中会使用EurekaServiceRegistry服务注册器注册EurekaRegistration这个本地实例信息

    // EurekaServiceRegistry#register()
    public void register(EurekaRegistration reg) {
        maybeInitializeClient(reg);
    
        if (log.isInfoEnabled()) {
            log.info("Registering application " + reg.getInstanceConfig().getAppname()
                    + " with eureka with status "
                    + reg.getInstanceConfig().getInitialStatus());
        }
    
        // 设置初始化状态
        reg.getApplicationInfoManager()
                .setInstanceStatus(reg.getInstanceConfig().getInitialStatus());
    
        if (reg.getHealthCheckHandler() != null) {
            reg.getEurekaClient().registerHealthCheck(reg.getHealthCheckHandler());
        }
    }
    

    主要是设置初始化状态步骤,而EurekaInstanceConfigBean本地实例信息的initialStatus初始化状态为 InstanceStatus.UP,所以状态与new InstanceInfo()时的STARTING不同,发生了状态变更,触发在创建DiscoveryClient时设置的StatusChangeListener

    ...省略
    
    statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
        @Override
        public String getId() {
            return "statusChangeListener";
        }
    
        @Override
        public void notify(StatusChangeEvent statusChangeEvent) {
            if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
                    InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
                // log at warn level if DOWN was involved
                logger.warn("Saw local status change event {}", statusChangeEvent);
            } else {
                logger.info("Saw local status change event {}", statusChangeEvent);
            }
            instanceInfoReplicator.onDemandUpdate();
        }
    };
    
    if (clientConfig.shouldOnDemandUpdateStatusChange()) {
        applicationInfoManager.registerStatusChangeListener(statusChangeListener);
    }
    
    ...省略
    

    其中会调用 InstanceInfoReplicator.onDemandUpdate() 实例信息复制器做按需更新,马上将UP状态更新/注册到Server端

    所以,以我判断,Eureka Client启动时的自动注册大多数应该是Spring Cloud的服务自动注册机制,在Spring容器基本启动完毕时,触发服务自动注册操作,其中会使用ApplicationInfoManager更新实例状态为初始状态UP,一旦实例状态变更会被马上监听到,执行复制器的InstanceInfoReplicator.onDemandUpdate()按需更新,马上执行一次discoveryClient.register()操作

    所以,下面就是分析 discoveryClient.register() 是怎么注册服务的

    DiscoveryClient#register() 注册

    // DiscoveryClient#register()
    boolean register() throws Throwable {
        logger.info(PREFIX + appPathIdentifier + ": registering service...");
        EurekaHttpResponse<Void> httpResponse;
        try {
            httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
        } catch (Exception e) {
            logger.warn("{} - registration failed {}", PREFIX + appPathIdentifier, e.getMessage(), e);
            throw e;
        }
        if (logger.isInfoEnabled()) {
            logger.info("{} - registration status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());
        }
        return httpResponse.getStatusCode() == 204;
    }
    

    如上,注册方法使用eurekaTransport的注册客户端registrationClient调用了register(instanceinfo)方法

    EurekaTransportDiscoveryClient的内部类,其中包含

    • registrationClient 和 registrationClientFactory: 负责注册、续约相关工作的EurekaHttpClientEurekaHttpClientFactory的实现类
    • queryClient 和 queryClientFactory: 负责获取Server端服务列表的EurekaHttpClientEurekaHttpClientFactory的实现类
    • TransportClientFactory: 负责传输消息的客户端工厂(底层用于和Server交互的http框架是 Jersey,此处的工厂就和Jersey相关)

    那么EurekaTransport的相关组件,尤其是registrationClient注册客户端是如何初始化的呢?

    registrationClient - 服务注册相关的EurekaHttpClient

    初始化是在DiscoveryClient的构造方法中

    eurekaTransport = new EurekaTransport();
    scheduleServerEndpointTask(eurekaTransport, args);
    

    主要是scheduleServerEndpointTask()方法

    private void scheduleServerEndpointTask(EurekaTransport eurekaTransport,
                                            AbstractDiscoveryClientOptionalArgs args) {
    
        Collection<?> additionalFilters = args == null
                ? Collections.emptyList()
                : args.additionalFilters;
    
        EurekaJerseyClient providedJerseyClient = args == null
                ? null
                : args.eurekaJerseyClient;
        
        TransportClientFactories argsTransportClientFactories = null;
        if (args != null && args.getTransportClientFactories() != null) {
            argsTransportClientFactories = args.getTransportClientFactories();
        }
        
        // Ignore the raw types warnings since the client filter interface changed between jersey 1/2
        @SuppressWarnings("rawtypes")
        TransportClientFactories transportClientFactories = argsTransportClientFactories == null
                ? new Jersey1TransportClientFactories()
                : argsTransportClientFactories;
    
        // If the transport factory was not supplied with args, assume they are using jersey 1 for passivity
        // 1、参数中是否提供了transportClientFactory的实现,没有就使用Jersey1TransportClientFactories
        eurekaTransport.transportClientFactory = providedJerseyClient == null
                ? transportClientFactories.newTransportClientFactory(clientConfig, additionalFilters, applicationInfoManager.getInfo())
                : transportClientFactories.newTransportClientFactory(additionalFilters, providedJerseyClient);
    
        ApplicationsResolver.ApplicationsSource applicationsSource = new ApplicationsResolver.ApplicationsSource() {
            @Override
            public Applications getApplications(int stalenessThreshold, TimeUnit timeUnit) {
                long thresholdInMs = TimeUnit.MILLISECONDS.convert(stalenessThreshold, timeUnit);
                long delay = getLastSuccessfulRegistryFetchTimePeriod();
                if (delay > thresholdInMs) {
                    logger.info("Local registry is too stale for local lookup. Threshold:{}, actual:{}",
                            thresholdInMs, delay);
                    return null;
                } else {
                    return localRegionApps.get();
                }
            }
        };
    
        eurekaTransport.bootstrapResolver = EurekaHttpClients.newBootstrapResolver(
                clientConfig,
                transportConfig,
                eurekaTransport.transportClientFactory,
                applicationInfoManager.getInfo(),
                applicationsSource
        );
    
        /**
         * 是否要想Eureka Server注册
         * 2、创建RegistrationClient用于注册的客户端及工厂,并设置到eurekaTransport
         */ 
        if (clientConfig.shouldRegisterWithEureka()) {
            EurekaHttpClientFactory newRegistrationClientFactory = null;
            EurekaHttpClient newRegistrationClient = null;
            try {
                newRegistrationClientFactory = EurekaHttpClients.registrationClientFactory(
                        eurekaTransport.bootstrapResolver,
                        eurekaTransport.transportClientFactory,
                        transportConfig
                );
                newRegistrationClient = newRegistrationClientFactory.newClient();
            } catch (Exception e) {
                logger.warn("Transport initialization failure", e);
            }
            eurekaTransport.registrationClientFactory = newRegistrationClientFactory;
            eurekaTransport.registrationClient = newRegistrationClient;
        }
    
        // 
        /**
         * 是否要从Server端获取服务列表
         * 3、创建QueryClient用于查询服务列表的客户端及工厂,并设置到eurekaTransport
         */ 
        // new method (resolve from primary servers for read)
        // Configure new transport layer (candidate for injecting in the future)
        if (clientConfig.shouldFetchRegistry()) {
            EurekaHttpClientFactory newQueryClientFactory = null;
            EurekaHttpClient newQueryClient = null;
            try {
                newQueryClientFactory = EurekaHttpClients.queryClientFactory(
                        eurekaTransport.bootstrapResolver,
                        eurekaTransport.transportClientFactory,
                        clientConfig,
                        transportConfig,
                        applicationInfoManager.getInfo(),
                        applicationsSource
                );
                newQueryClient = newQueryClientFactory.newClient();
            } catch (Exception e) {
                logger.warn("Transport initialization failure", e);
            }
            eurekaTransport.queryClientFactory = newQueryClientFactory;
            eurekaTransport.queryClient = newQueryClient;
        }
    }
    

    所以,下面就是逐层深入分析RegistrationClient用于注册的客户端及工厂是如何创建的?

    由于RegistrationClient其实是一种EurekaHttpClient,而EurekaHttpClient是接口,其实现类很多

    EurekaHttpClient实现类

    查看源码发现,Netflix采用的是 Factory工厂 + 代理 的模式,从最外层创建的EurekaHttpClient工厂包含一个成员变量是另一个代理的EurekaHttpClient工厂,每个工厂生成的EurekaHttpClient功能不一样,在从外层执行一个操作时,最外层的工厂执行其相关功能后,使用代理的工厂新建EurekaHttpClient实例,再调用其相同的方法,也实现这个EurekaHttpClient的相关功能,就这样逐层深入,各司其职后,最后使用Jersey发送POST请求到Eureka Server发起注册,而这些EurekaHttpClient都是在com.netflix.discovery.shared.transport.decoratorEurekaHttpClient的包装类的包下的,由外到内大致是:

    • SessionedEurekaHttpClient: 强制在一定时间间隔后重连EurekaHttpClient,防止永远只连接特定Eureka Server,反过来保证了在Server端集群拓扑发生变化时的负载重分配
      • RetryableEurekaHttpClient: 带有重试功能,默认最多3次,在配置的所有候选Server地址中尝试请求,成功重用,失败会重试另一Server,并维护隔离清单,下次跳过,当隔离数量达到阈值,清空隔离清单,重新开始
        • RedirectingEurekaHttpClient: Server端返回302重定向时,客户端shutdown原EurekaHttpClient,根据response header中的Location新建EurekaHttpClient
          • MetricsCollectingEurekaHttpClient: 统计收集Metrics信息
            • JerseyApplicationClient: AbstractJerseyEurekaHttpClient的子类
              • AbstractJerseyEurekaHttpClient: 底层实现通过Jersery注册、发心跳等的核心类
                • jerseyClient: Jersery客户端

    SessionedEurekaHttpClient - 定时重连

    // SessionedEurekaHttpClient#execute()
    @Override
    protected <R> EurekaHttpResponse<R> execute(RequestExecutor<R> requestExecutor) {
        long now = System.currentTimeMillis();
        long delay = now - lastReconnectTimeStamp;
        // 如果上次重连时间到现在已经超过了currentSessionDurationMs,关闭当前EurekaHttpClient
        if (delay >= currentSessionDurationMs) {
            logger.debug("Ending a session and starting anew");
            lastReconnectTimeStamp = now;
            currentSessionDurationMs = randomizeSessionDuration(sessionDurationMs);
            TransportUtils.shutdown(eurekaHttpClientRef.getAndSet(null));
        }
    
        // 如果EurekaHttpClient为空,clientFactory.newClient()重建
        EurekaHttpClient eurekaHttpClient = eurekaHttpClientRef.get();
        if (eurekaHttpClient == null) {
            eurekaHttpClient = TransportUtils.getOrSetAnotherClient(eurekaHttpClientRef, clientFactory.newClient());
        }
        
        // 继续执行后续
        return requestExecutor.execute(eurekaHttpClient);
    }
    

    RetryableEurekaHttpClient - 候选范围内失败重试

    // RetryableEurekaHttpClient#execute()
    @Override
    protected <R> EurekaHttpResponse<R> execute(RequestExecutor<R> requestExecutor) {
        List<EurekaEndpoint> candidateHosts = null;
        int endpointIdx = 0;
        
        // 最多重试numberOfRetries(默认:3)
        for (int retry = 0; retry < numberOfRetries; retry++) {
            EurekaHttpClient currentHttpClient = delegate.get();//从AtomicReference<EurekaHttpClient>获取当前EurekaHttpClient
            EurekaEndpoint currentEndpoint = null;
            if (currentHttpClient == null) {
                if (candidateHosts == null) {
                    candidateHosts = getHostCandidates(); //返回候选集合 排除 已经失败隔离的Host集合
                    if (candidateHosts.isEmpty()) {
                        throw new TransportException("There is no known eureka server; cluster server list is empty");
                    }
                }
                if (endpointIdx >= candidateHosts.size()) {
                    throw new TransportException("Cannot execute request on any known server");
                }
    
                // 根据当前的下标获取Endpoint,并新建 JerseyClient
                currentEndpoint = candidateHosts.get(endpointIdx++);
                currentHttpClient = clientFactory.newClient(currentEndpoint);
            }
    
            try {
                // 继续后续执行
                EurekaHttpResponse<R> response = requestExecutor.execute(currentHttpClient);
                // 如果根据当前操作类型 和 返回状态码,满足状态计算器,记录currentHttpClient可用,下次继续使用
                // 返回状态码是:200、300、302,或者Register、SendHeartBeat情况下是404
                if (serverStatusEvaluator.accept(response.getStatusCode(), requestExecutor.getRequestType())) {
                    delegate.set(currentHttpClient);
                    if (retry > 0) {
                        logger.info("Request execution succeeded on retry #{}", retry);
                    }
                    return response;
                }
                logger.warn("Request execution failure with status code {}; retrying on another server if available", response.getStatusCode());
            } 
            catch (Exception e) {
                logger.warn("Request execution failed with message: {}", e.getMessage());  // just log message as the underlying client should log the stacktrace
            }
    
            // Connection error or 5xx from the server that must be retried on another server
            // 请求失败 或 报5xx错误,将delegate清空,重试另一个Server,并将当前Endpoint放到隔离集合
            delegate.compareAndSet(currentHttpClient, null);
            if (currentEndpoint != null) {
                quarantineSet.add(currentEndpoint);
            }
        }
        
        // 多次重试后仍无法成功返回结果,上抛异常
        throw new TransportException("Retry limit reached; giving up on completing the request");
    }
    
    
    //########## RetryableEurekaHttpClient#getHostCandidates() 
    // 返回 所有候选的Host节点数据 与 隔离集合 的数据差集
    private List<EurekaEndpoint> getHostCandidates() {
        List<EurekaEndpoint> candidateHosts = clusterResolver.getClusterEndpoints(); //所有候选节点数据
        quarantineSet.retainAll(candidateHosts); //确保quarantineSet隔离集合中的数据都在candidateHosts中
                                                 //当candidateHosts发生变化时也能及时清理quarantineSet隔离集合
    
        // If enough hosts are bad, we have no choice but start over again
        // 默认:0.66百分比
        int threshold = (int) (candidateHosts.size() * transportConfig.getRetryableClientQuarantineRefreshPercentage());
        
        // 隔离集合为空
        if (quarantineSet.isEmpty()) {
            // no-op
        } 
        // 隔离数据已经大于阀值,不得已要重新开始,清空隔离集合
        else if (quarantineSet.size() >= threshold) { 
            logger.debug("Clearing quarantined list of size {}", quarantineSet.size());
            quarantineSet.clear();
        } 
        // 隔离集合不为空,也不大于阀值,排除隔离集合中的Endpoint后返回
        else { 
            List<EurekaEndpoint> remainingHosts = new ArrayList<>(candidateHosts.size());
            for (EurekaEndpoint endpoint : candidateHosts) {
                if (!quarantineSet.contains(endpoint)) {
                    remainingHosts.add(endpoint);
                }
            }
            candidateHosts = remainingHosts;
        }
    
        return candidateHosts;
    }
    
    
    //########## ServerStatusEvaluators#LEGACY_EVALUATOR
    // Eureka Server返回状态的计算器,计算不同场景下的不同状态码是否代表成功
    private static final ServerStatusEvaluator LEGACY_EVALUATOR = new ServerStatusEvaluator() {
        @Override
        public boolean accept(int statusCode, RequestType requestType) {
            if (statusCode >= 200 && statusCode < 300 || statusCode == 302) {
                return true;
            } else if (requestType == RequestType.Register && statusCode == 404) {
                return true;
            } else if (requestType == RequestType.SendHeartBeat && statusCode == 404) {
                return true;
            } else if (requestType == RequestType.Cancel) {  // cancel is best effort
                return true;
            } else if (requestType == RequestType.GetDelta && (statusCode == 403 || statusCode == 404)) {
                return true;
            }
            return false;
        }
    };
    

    RedirectingEurekaHttpClient - 按Server端要求重定向到新Server

    //########## RedirectingEurekaHttpClient#executeOnNewServer
    // Server端返回302重定向时,客户端shutdown原EurekaHttpClient,根据response header中的Location新建EurekaHttpClient
    private <R> EurekaHttpResponse<R> executeOnNewServer(RequestExecutor<R> requestExecutor,
                                                    AtomicReference<EurekaHttpClient> currentHttpClientRef) {
        URI targetUrl = null;
        // 最多重定向默认10次
        for (int followRedirectCount = 0; followRedirectCount < MAX_FOLLOWED_REDIRECTS; followRedirectCount++) {
            EurekaHttpResponse<R> httpResponse = requestExecutor.execute(currentHttpClientRef.get());
            
            // 如果返回的不是302重定向,返回response
            if (httpResponse.getStatusCode() != 302) {
                if (followRedirectCount == 0) {
                    logger.debug("Pinning to endpoint {}", targetUrl);
                } else {
                    logger.info("Pinning to endpoint {}, after {} redirect(s)", targetUrl, followRedirectCount);
                }
                return httpResponse;
            }
    
            // 从response中获取Location,用于重建EurekaHttpClient
            targetUrl = getRedirectBaseUri(httpResponse.getLocation());
            if (targetUrl == null) {
                throw new TransportException("Invalid redirect URL " + httpResponse.getLocation());
            }
    
            currentHttpClientRef.getAndSet(null).shutdown();
            currentHttpClientRef.set(factory.newClient(new DefaultEndpoint(targetUrl.toString())));
        }
        String message = "Follow redirect limit crossed for URI " + serviceEndpoint.getServiceUrl();
        logger.warn(message);
        throw new TransportException(message);
    }
    

    MetricsCollectingEurekaHttpClient - 统计收集执行情况

    // MetricsCollectingEurekaHttpClient#execute()
    protected <R> EurekaHttpResponse<R> execute(RequestExecutor<R> requestExecutor) {
        EurekaHttpClientRequestMetrics requestMetrics = metricsByRequestType.get(requestExecutor.getRequestType());
        Stopwatch stopwatch = requestMetrics.latencyTimer.start(); //统计执行延时
        try {
            EurekaHttpResponse<R> httpResponse = requestExecutor.execute(delegate);
            requestMetrics.countersByStatus.get(mappedStatus(httpResponse)).increment(); //按状态统计
            return httpResponse;
        } catch (Exception e) {
            requestMetrics.connectionErrors.increment(); //统计错误
            exceptionsMetric.count(e); //按异常名统计
            throw e;
        } finally {
            stopwatch.stop();
        }
    }
    

    AbstractJerseyEurekaHttpClient - 底层通过Jersery发送注册、心跳请求

    public abstract class AbstractJerseyEurekaHttpClient implements EurekaHttpClient {
        protected final Client jerseyClient; //真正处理请求的Jersery客户端
        protected final String serviceUrl; //连接的Server端地址
        
        protected AbstractJerseyEurekaHttpClient(Client jerseyClient, String serviceUrl) {
            this.jerseyClient = jerseyClient;
            this.serviceUrl = serviceUrl;
            logger.debug("Created client for url: {}", serviceUrl);
        }
        
        /**
         * 注册方法
         */
        @Override
        public EurekaHttpResponse<Void> register(InstanceInfo info) {
            String urlPath = "apps/" + info.getAppName(); //请求Eureka Server的【/apps/应用名】接口地址
            ClientResponse response = null;
            try {
                Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder();
                addExtraHeaders(resourceBuilder);
                response = resourceBuilder
                        .header("Accept-Encoding", "gzip")
                        .type(MediaType.APPLICATION_JSON_TYPE)
                        .accept(MediaType.APPLICATION_JSON)
                        .post(ClientResponse.class, info);  //实例InstanceInfo数据,通过Post请求body发过去
                return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
            } 
            finally {
                if (logger.isDebugEnabled()) {
                    logger.debug("Jersey HTTP POST {}/{} with instance {}; statusCode={}", serviceUrl, urlPath, info.getId(),
                            response == null ? "N/A" : response.getStatus());
                }
                if (response != null) {
                    response.close();
                }
            }
        }
        
        ...省略
    

    Eureka Server接收到的注册请求详情

    经过上面的步骤,客户端已经可以通过Jersery发送Http请求给Eureka Server端注册,具体请求如下:

    POST /eureka/apps/应用名 HTTP/1.1
    Accept-Encoding: gzip
    Content-Type: application/json
    Accept: application/json
    DiscoveryIdentity-Name: DefaultClient
    DiscoveryIdentity-Version: 1.4
    DiscoveryIdentity-Id: 192.168.70.132
    Transfer-Encoding: chunked
    Host: localhost:8001
    Connection: Keep-Alive
    User-Agent: Java-EurekaClient/v1.6.2

    1a0
    {"instance":{
    ​ "instanceId":"192.168.70.132:应用名:10001",
    ​ "hostName":"192.168.70.132",
    ​ "app":"应用名",
    ​ "ipAddr":"192.168.70.132",
    ​ "status":"UP",
    ​ "overriddenstatus":"UNKNOWN",
    ​ "port": { "":10001, "@enabled" : "true" }, ​ "securePort": { "":443, "@enabled" : "false"},
    ​ "countryId":1,
    ​ "dataCenterInfo":{"@class":"com.netflix.appinfo.InstanceInfo$DefaultDataCenterInfo",
    ​ "name":"MyOwn"
    }

    Eureka Client注册流程总结

    大体来说,Eureka Client的注册是由Spring Cloud的AutoServiceRegistration自动注册发起,在设置应用实例Instance初始状态为UP时,触发了InstanceInfoReplicator#onDemandUpdate()按需更新方法,将实例Instance信息通过DiscoveryClient注册到Eureka Server,期间经过了一些EurekaHttpClient的装饰类,实现了诸如定期重连失败重试注册重定向、统计收集Metrics信息等功能,最后由JerseryClient发送POST请求调用Eureka Server的【/eureka/apps/应用名】端点,请求体携带InstanceInfo实例信息,完成注册

    • EurekaAutoServiceRegistration#start(): 实现Spring的SmartLifecycle,在Spring容器refresh()最后一步finishRefresh()会调用生命周期的start()方法

      • EurekaServiceRegistry#register(EurekaRegistration): 使用服务注册器注册服务信息

        • ApplicationInfoManager#setInstanceStatus(初始状态): 应用实例信息管理器更新初始状态为 UP

          • StatusChangeListener: 触发实例状态监听(此Listener是在DiscoveryClient#initScheduledTasks()方法中设置的)

            • InstanceInfoReplicator.onDemandUpdate(): 实例状态复制器执行按需状态更新

              • DiscoveryClient#register(): DiscoveryClient发起注册实例信息

                • EurekaHttpClientDecorator#execute(): 执行EurekaHttpClient的装饰类,实现其各自功能

                  SessionedEurekaHttpClient: 定时重连

                  RetryableEurekaHttpClient: 候选范围内失败重试

                  RedirectingEurekaHttpClient: 按Eureka Server端要求重定向到新Server

                  MetricsCollectingEurekaHttpClient: 统计收集执行情况

                  • JerseyApplicationClient#register(): 封装注册请求数据
                    • JerseyClient发送Post注册请求

    相关文章

      网友评论

        本文标题:【SpringCloud Eureka源码】从Eureka Cl

        本文链接:https://www.haomeiwen.com/subject/zkchrqtx.html