Spring Cloud: Eureka Client Startup Flow Analysis

Author: 小波同学 | Published 2021-09-05 19:18

    Eureka Client startup flow analysis

    What the @EnableDiscoveryClient annotation does

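    • If autoRegister() returns true (the default), the service is registered with the registry automatically; if you set it to false, it is not registered automatically.
    • It imports the EnableDiscoveryClientImportSelector class.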
    @Target(ElementType.TYPE)
    @Retention(RetentionPolicy.RUNTIME)
    @Documented
    @Inherited
    @Import(EnableDiscoveryClientImportSelector.class)
    public @interface EnableDiscoveryClient {
    
        boolean autoRegister() default true;
    
    }
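
    To make the role of the autoRegister flag concrete, here is a minimal, framework-free sketch. EnableDiscovery, DefaultApp, ManualApp, and AutoRegisterDemo are hypothetical names invented for illustration; Spring Cloud's import selector reads the attribute through Spring's annotation metadata, which this sketch does not try to reproduce.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

class AutoRegisterDemo {

    // Returns true only when the annotation is present and autoRegister is true.
    static boolean shouldAutoRegister(Class<?> appClass) {
        EnableDiscovery annotation = appClass.getAnnotation(EnableDiscovery.class);
        return annotation != null && annotation.autoRegister();
    }

    public static void main(String[] args) {
        System.out.println("DefaultApp: " + shouldAutoRegister(DefaultApp.class));
        System.out.println("ManualApp:  " + shouldAutoRegister(ManualApp.class));
    }
}

// Hypothetical stand-in for @EnableDiscoveryClient (not the Spring Cloud type).
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@interface EnableDiscovery {
    boolean autoRegister() default true;
}

// Relies on the default value of autoRegister.
@EnableDiscovery
class DefaultApp { }

// Explicitly opts out of automatic registration.
@EnableDiscovery(autoRegister = false)
class ManualApp { }
```

    In a real application the equivalent opt-out would be written as @EnableDiscoveryClient(autoRegister = false) on a configuration class.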
    

    Core auto-configuration classes

    Following the auto-configuration convention, the spring.factories file can be found under the META-INF directory of spring-cloud-netflix-eureka-client-2.2.5.RELEASE.jar:


    org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
    org.springframework.cloud.netflix.eureka.config.EurekaClientConfigServerAutoConfiguration,\
    org.springframework.cloud.netflix.eureka.config.DiscoveryClientOptionalArgsConfiguration,\
    org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration,\
    org.springframework.cloud.netflix.ribbon.eureka.RibbonEurekaAutoConfiguration,\
    org.springframework.cloud.netflix.eureka.EurekaDiscoveryClientConfiguration,\
    org.springframework.cloud.netflix.eureka.reactive.EurekaReactiveDiscoveryClientConfiguration,\
    org.springframework.cloud.netflix.eureka.loadbalancer.LoadBalancerEurekaAutoConfiguration
    
    org.springframework.cloud.bootstrap.BootstrapConfiguration=\
    org.springframework.cloud.netflix.eureka.config.EurekaConfigServerBootstrapConfiguration
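
    The file above is in standard Java properties syntax: a trailing backslash continues a value onto the next line, and the value is a comma-separated list of class names. The sketch below reads such content with java.util.Properties to show how one key maps to a list of classes. FactoriesSketch and classNames are hypothetical names; Spring's own loader does more than this (for example, it merges the files found in every jar on the classpath).

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;

class FactoriesSketch {

    // Parses properties-style content and returns the class names listed under a key.
    static List<String> classNames(String content, String key) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(content));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        String value = props.getProperty(key, "");
        return Arrays.stream(value.split(","))
                .map(String::trim)
                .filter(name -> !name.isEmpty())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Two of the entries from the listing, with the same line continuations.
        String content =
                "org.springframework.boot.autoconfigure.EnableAutoConfiguration=\\\n"
              + "org.springframework.cloud.netflix.eureka.config.DiscoveryClientOptionalArgsConfiguration,\\\n"
              + "org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration\n";
        classNames(content, "org.springframework.boot.autoconfigure.EnableAutoConfiguration")
                .forEach(System.out::println);
    }
}
```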
    

    EurekaConfigServerBootstrapConfiguration

    @ConditionalOnClass(ConfigServicePropertySourceLocator.class)
    @ConditionalOnProperty(value = "spring.cloud.config.discovery.enabled",
            matchIfMissing = false)
    @Configuration(proxyBeanMethods = false)
    @EnableConfigurationProperties
    public class EurekaConfigServerBootstrapConfiguration {
    
    }
    

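    The first two annotations above are the conditions that decide whether this configuration class is enabled. We won't go into them here; let's move straight on to the configuration classes it brings in.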

    EurekaDiscoveryClientConfiguration

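    • 1. Attentive readers may notice that yet another Marker class is registered here; presumably it also acts as a switch somewhere.
    • 2. EurekaClientConfigurationRefresher: as the name suggests, this is a handler for the case where configuration is refreshed dynamically. We won't expand on it here either.
    • 3. EurekaHealthCheckHandlerConfiguration registers a handler class for Eureka health checks.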

    EurekaClientAutoConfiguration

    Everything in this class matters; it is the core of this article.

    @Configuration(proxyBeanMethods = false)
    @EnableConfigurationProperties
    @ConditionalOnClass(EurekaClientConfig.class)
    @ConditionalOnProperty(value = "eureka.client.enabled", matchIfMissing = true)
    @ConditionalOnDiscoveryEnabled
    @AutoConfigureBefore({ NoopDiscoveryClientAutoConfiguration.class,
            CommonsClientAutoConfiguration.class, ServiceRegistryAutoConfiguration.class })
    @AutoConfigureAfter(name = {
            "org.springframework.cloud.netflix.eureka.config.DiscoveryClientOptionalArgsConfiguration",
            "org.springframework.cloud.autoconfigure.RefreshAutoConfiguration",
            "org.springframework.cloud.netflix.eureka.EurekaDiscoveryClientConfiguration",
            "org.springframework.cloud.client.serviceregistry.AutoServiceRegistrationAutoConfiguration" })
    public class EurekaClientAutoConfiguration {
    
    }
    

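    First, these are the annotations on the class. Let's go through the more important ones.

    • @Import(DiscoveryClientOptionalArgsConfiguration.class) brings in two beans, RestTemplateDiscoveryClientOptionalArgs and MutableDiscoveryClientOptionalArgs; their purpose is left aside for now. (In the listing above, DiscoveryClientOptionalArgsConfiguration is referenced through @AutoConfigureAfter rather than @Import.)

    • @ConditionalOnBean(EurekaDiscoveryClientConfiguration.Marker.class) is where the Marker class mentioned earlier comes into play. (This annotation does not appear in the listing above.)

    • @AutoConfigureBefore
      EurekaClientAutoConfiguration is auto-configured before the three classes named here, not after them. They are still worth a look, so let's examine them.

    • NoopDiscoveryClientAutoConfiguration
      As the name suggests, this class provides a no-op DiscoveryClient for service discovery. Let's focus on a few of its methods.

    • 1. The init() method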

    @Configuration(proxyBeanMethods = false)
    @EnableConfigurationProperties
    @ConditionalOnMissingBean(DiscoveryClient.class)
    @Deprecated
    public class NoopDiscoveryClientAutoConfiguration
            implements ApplicationListener<ContextRefreshedEvent> {
    
        @PostConstruct
        public void init() {
            String host = "localhost";
            try {
                host = InetAddress.getLocalHost().getHostName();
            }
            catch (UnknownHostException e) {
                this.log.warn("Cannot get host info: (" + e.getMessage() + ")");
            }
            int port = findPort();
            this.serviceInstance = new DefaultServiceInstance(
                    this.environment.getProperty("spring.application.name", "application"),
                    host, port, false);
        }
    }   
    

    A DefaultServiceInstance is constructed here; it holds the current application's host name, port, and application name.

    • 2. Registering the NoopDiscoveryClient bean
    @Bean
    public DiscoveryClient discoveryClient() {
        return new NoopDiscoveryClient(this.serviceInstance);
    }
    
    @Deprecated
    public class NoopDiscoveryClient implements DiscoveryClient {
    
        public NoopDiscoveryClient(ServiceInstance instance) {
        }
    
        @Override
        public String description() {
            return "Spring Cloud No-op DiscoveryClient";
        }
    
        @Override
        public List<ServiceInstance> getInstances(String serviceId) {
            return Collections.emptyList();
        }
    
        @Override
        public List<String> getServices() {
            return Collections.emptyList();
        }
    
    }
    

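    This class implements the methods for looking up instances and services, but both simply return empty lists. In real use, the client that takes effect is EurekaDiscoveryClient.

    EurekaClientAutoConfiguration is an auto-configuration class, so let's see what it mainly configures.

    Configuring the current instance's information

    The instance configuration covers many settings, but the core ones are the name, unique identifier, IP address, port, and so on.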

    @Bean
    @ConditionalOnMissingBean(value = EurekaInstanceConfig.class,
            search = SearchStrategy.CURRENT)
    public EurekaInstanceConfigBean eurekaInstanceConfigBean(InetUtils inetUtils,
            ManagementMetadataProvider managementMetadataProvider) {
        String hostname = getProperty("eureka.instance.hostname");
        boolean preferIpAddress = Boolean
                .parseBoolean(getProperty("eureka.instance.prefer-ip-address"));
        String ipAddress = getProperty("eureka.instance.ip-address");
        boolean isSecurePortEnabled = Boolean
                .parseBoolean(getProperty("eureka.instance.secure-port-enabled"));
    
        String serverContextPath = env.getProperty("server.servlet.context-path", "/");
        int serverPort = Integer.parseInt(
                env.getProperty("server.port", env.getProperty("port", "8080")));
    
        Integer managementPort = env.getProperty("management.server.port", Integer.class);
        String managementContextPath = env
                .getProperty("management.server.servlet.context-path");
        Integer jmxPort = env.getProperty("com.sun.management.jmxremote.port",
                Integer.class);
        EurekaInstanceConfigBean instance = new EurekaInstanceConfigBean(inetUtils);
    
        instance.setNonSecurePort(serverPort);
        instance.setInstanceId(getDefaultInstanceId(env));
        instance.setPreferIpAddress(preferIpAddress);
        instance.setSecurePortEnabled(isSecurePortEnabled);
        if (StringUtils.hasText(ipAddress)) {
            instance.setIpAddress(ipAddress);
        }
    
        if (isSecurePortEnabled) {
            instance.setSecurePort(serverPort);
        }
    
        if (StringUtils.hasText(hostname)) {
            instance.setHostname(hostname);
        }
        String statusPageUrlPath = getProperty("eureka.instance.status-page-url-path");
        String healthCheckUrlPath = getProperty("eureka.instance.health-check-url-path");
    
        if (StringUtils.hasText(statusPageUrlPath)) {
            instance.setStatusPageUrlPath(statusPageUrlPath);
        }
        if (StringUtils.hasText(healthCheckUrlPath)) {
            instance.setHealthCheckUrlPath(healthCheckUrlPath);
        }
    
        ManagementMetadata metadata = managementMetadataProvider.get(instance, serverPort,
                serverContextPath, managementContextPath, managementPort);
    
        if (metadata != null) {
            instance.setStatusPageUrl(metadata.getStatusPageUrl());
            instance.setHealthCheckUrl(metadata.getHealthCheckUrl());
            if (instance.isSecurePortEnabled()) {
                instance.setSecureHealthCheckUrl(metadata.getSecureHealthCheckUrl());
            }
            Map<String, String> metadataMap = instance.getMetadataMap();
            metadataMap.computeIfAbsent("management.port",
                    k -> String.valueOf(metadata.getManagementPort()));
        }
        else {
            // without the metadata the status and health check URLs will not be set
            // and the status page and health check url paths will not include the
            // context path so set them here
            if (StringUtils.hasText(managementContextPath)) {
                instance.setHealthCheckUrlPath(
                        managementContextPath + instance.getHealthCheckUrlPath());
                instance.setStatusPageUrlPath(
                        managementContextPath + instance.getStatusPageUrlPath());
            }
        }
    
        setupJmxPort(instance, jmxPort);
        return instance;
    }
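
    One detail in the method above is easy to miss: the server port is resolved through a nested default, server.port first, then port, then 8080. The sketch below reproduces only that lookup order, with a plain Map standing in for Spring's Environment. PortResolutionSketch and resolveServerPort are hypothetical names used for illustration.

```java
import java.util.HashMap;
import java.util.Map;

class PortResolutionSketch {

    // Mirrors env.getProperty("server.port", env.getProperty("port", "8080")).
    static int resolveServerPort(Map<String, String> env) {
        String fallback = env.getOrDefault("port", "8080");
        return Integer.parseInt(env.getOrDefault("server.port", fallback));
    }

    public static void main(String[] args) {
        Map<String, String> env = new HashMap<>();
        System.out.println("nothing set:     " + resolveServerPort(env));

        env.put("port", "9000");
        System.out.println("port set:        " + resolveServerPort(env));

        env.put("server.port", "8081");
        System.out.println("server.port set: " + resolveServerPort(env));
    }
}
```

    Note that in the original method the same serverPort value is also passed to setSecurePort when eureka.instance.secure-port-enabled is true.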
    

    The bean responsible for registration

    @Bean
    public EurekaServiceRegistry eurekaServiceRegistry() {
        return new EurekaServiceRegistry();
    }
    

    The bean that triggers auto-registration

    @Bean
    @ConditionalOnBean(AutoServiceRegistrationProperties.class)
    @ConditionalOnProperty(
            value = "spring.cloud.service-registry.auto-registration.enabled",
            matchIfMissing = true)
    public EurekaAutoServiceRegistration eurekaAutoServiceRegistration(
            ApplicationContext context, EurekaServiceRegistry registry,
            EurekaRegistration registration) {
        return new EurekaAutoServiceRegistration(context, registry, registration);
    }
    

    The object to be registered with Eureka

    This object wraps the eurekaInstanceConfigBean shown above.

    @Bean
    @ConditionalOnBean(AutoServiceRegistrationProperties.class)
    @ConditionalOnProperty(
            value = "spring.cloud.service-registry.auto-registration.enabled",
            matchIfMissing = true)
    public EurekaRegistration eurekaRegistration(EurekaClient eurekaClient,
            CloudEurekaInstanceConfig instanceConfig,
            ApplicationInfoManager applicationInfoManager, @Autowired(
                    required = false) ObjectProvider<HealthCheckHandler> healthCheckHandler) {
        return EurekaRegistration.builder(instanceConfig).with(applicationInfoManager)
                .with(eurekaClient).with(healthCheckHandler).build();
    }
    

    Eureka client configuration

    @Bean
    @ConditionalOnMissingBean(value = EurekaClientConfig.class,
            search = SearchStrategy.CURRENT)
    public EurekaClientConfigBean eurekaClientConfigBean(ConfigurableEnvironment env) {
        return new EurekaClientConfigBean();
    }
    

    EurekaClient

    @Bean(destroyMethod = "shutdown")
    @ConditionalOnMissingBean(value = EurekaClient.class,
            search = SearchStrategy.CURRENT)
    public EurekaClient eurekaClient(ApplicationInfoManager manager,
            EurekaClientConfig config) {
        return new CloudEurekaClient(manager, config, this.optionalArgs,
                this.context);
    }
    

    CloudEurekaClient is a subclass of DiscoveryClient, and DiscoveryClient is the core class of the Eureka client.

    new CloudEurekaClient(...) invokes the constructor of its parent class, DiscoveryClient:

    public class CloudEurekaClient extends DiscoveryClient {
    
        public CloudEurekaClient(ApplicationInfoManager applicationInfoManager,
                EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs<?> args,
                ApplicationEventPublisher publisher) {
            // This calls the constructor of the parent class, DiscoveryClient
            super(applicationInfoManager, config, args);
            this.applicationInfoManager = applicationInfoManager;
            this.publisher = publisher;
            this.eurekaTransportField = ReflectionUtils.findField(DiscoveryClient.class,
                    "eurekaTransport");
            ReflectionUtils.makeAccessible(this.eurekaTransportField);
        }
    }
    

    Inside DiscoveryClient, the call eventually reaches the constructor annotated with @Inject:

    @Singleton
    public class DiscoveryClient implements EurekaClient {
    
        @Inject
        DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
                        Provider<BackupRegistry> backupRegistryProvider, EndpointRandomizer endpointRandomizer) {
            if (args != null) {
                this.healthCheckHandlerProvider = args.healthCheckHandlerProvider;
                this.healthCheckCallbackProvider = args.healthCheckCallbackProvider;
                this.eventListeners.addAll(args.getEventListeners());
                this.preRegistrationHandler = args.preRegistrationHandler;
            } else {
                this.healthCheckCallbackProvider = null;
                this.healthCheckHandlerProvider = null;
                this.preRegistrationHandler = null;
            }
            
            this.applicationInfoManager = applicationInfoManager;
            InstanceInfo myInfo = applicationInfoManager.getInfo();
    
            clientConfig = config;
            staticClientConfig = clientConfig;
            transportConfig = config.getTransportConfig();
            instanceInfo = myInfo;
            if (myInfo != null) {
                appPathIdentifier = instanceInfo.getAppName() + "/" + instanceInfo.getId();
            } else {
                logger.warn("Setting instanceInfo to a passed in null value");
            }
    
            this.backupRegistryProvider = backupRegistryProvider;
            this.endpointRandomizer = endpointRandomizer;
            this.urlRandomizer = new EndpointUtils.InstanceInfoBasedUrlRandomizer(instanceInfo);
            localRegionApps.set(new Applications());
    
            fetchRegistryGeneration = new AtomicLong(0);
    
            remoteRegionsToFetch = new AtomicReference<String>(clientConfig.fetchRegistryForRemoteRegions());
            remoteRegionsRef = new AtomicReference<>(remoteRegionsToFetch.get() == null ? null : remoteRegionsToFetch.get().split(","));
    
            if (config.shouldFetchRegistry()) {
                this.registryStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRY_PREFIX + "lastUpdateSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
            } else {
                this.registryStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
            }
    
            if (config.shouldRegisterWithEureka()) {
                this.heartbeatStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRATION_PREFIX + "lastHeartbeatSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
            } else {
                this.heartbeatStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
            }
    
            logger.info("Initializing Eureka in region {}", clientConfig.getRegion());
    
            if (!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()) {
                logger.info("Client configured to neither register nor query for data.");
                scheduler = null;
                heartbeatExecutor = null;
                cacheRefreshExecutor = null;
                eurekaTransport = null;
                instanceRegionChecker = new InstanceRegionChecker(new PropertyBasedAzToRegionMapper(config), clientConfig.getRegion());
    
                // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
                // to work with DI'd DiscoveryClient
                DiscoveryManager.getInstance().setDiscoveryClient(this);
                DiscoveryManager.getInstance().setEurekaClientConfig(config);
    
                initTimestampMs = System.currentTimeMillis();
                initRegistrySize = this.getApplications().size();
                registrySize = initRegistrySize;
                logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
                        initTimestampMs, initRegistrySize);
    
                return;  // no need to setup up an network tasks and we are done
            }
    
            try {
                // default size of 2 - 1 each for heartbeat and cacheRefresh
                scheduler = Executors.newScheduledThreadPool(2,
                        new ThreadFactoryBuilder()
                                .setNameFormat("DiscoveryClient-%d")
                                .setDaemon(true)
                                .build());
    
                heartbeatExecutor = new ThreadPoolExecutor(
                        1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                        new SynchronousQueue<Runnable>(),
                        new ThreadFactoryBuilder()
                                .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
                                .setDaemon(true)
                                .build()
                );  // use direct handoff
    
                cacheRefreshExecutor = new ThreadPoolExecutor(
                        1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                        new SynchronousQueue<Runnable>(),
                        new ThreadFactoryBuilder()
                                .setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
                                .setDaemon(true)
                                .build()
                );  // use direct handoff
    
                eurekaTransport = new EurekaTransport();
                scheduleServerEndpointTask(eurekaTransport, args);
    
                AzToRegionMapper azToRegionMapper;
                if (clientConfig.shouldUseDnsForFetchingServiceUrls()) {
                    azToRegionMapper = new DNSBasedAzToRegionMapper(clientConfig);
                } else {
                    azToRegionMapper = new PropertyBasedAzToRegionMapper(clientConfig);
                }
                if (null != remoteRegionsToFetch.get()) {
                    azToRegionMapper.setRegionsToFetch(remoteRegionsToFetch.get().split(","));
                }
                instanceRegionChecker = new InstanceRegionChecker(azToRegionMapper, clientConfig.getRegion());
            } catch (Throwable e) {
                throw new RuntimeException("Failed to initialize DiscoveryClient!", e);
            }
    
            if (clientConfig.shouldFetchRegistry()) {
                try {
                    boolean primaryFetchRegistryResult = fetchRegistry(false);
                    if (!primaryFetchRegistryResult) {
                        logger.info("Initial registry fetch from primary servers failed");
                    }
                    boolean backupFetchRegistryResult = true;
                    if (!primaryFetchRegistryResult && !fetchRegistryFromBackup()) {
                        backupFetchRegistryResult = false;
                        logger.info("Initial registry fetch from backup servers failed");
                    }
                    if (!primaryFetchRegistryResult && !backupFetchRegistryResult && clientConfig.shouldEnforceFetchRegistryAtInit()) {
                        throw new IllegalStateException("Fetch registry error at startup. Initial fetch failed.");
                    }
                } catch (Throwable th) {
                    logger.error("Fetch registry error at startup: {}", th.getMessage());
                    throw new IllegalStateException(th);
                }
            }
    
            // call and execute the pre registration handler before all background tasks (inc registration) is started
            if (this.preRegistrationHandler != null) {
                this.preRegistrationHandler.beforeRegistration();
            }
    
            if (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) {
                try {
                    if (!register() ) {
                        throw new IllegalStateException("Registration error at startup. Invalid server response.");
                    }
                } catch (Throwable th) {
                    logger.error("Registration error at startup: {}", th.getMessage());
                    throw new IllegalStateException(th);
                }
            }
    
            // finally, init the schedule tasks (e.g. cluster resolvers, heartbeat, instanceInfo replicator, fetch
            // Initialize the scheduled tasks (e.g. cluster resolvers, heartbeat, instance info replication, registry fetch)
            initScheduledTasks();
    
            try {
                Monitors.registerObject(this);
            } catch (Throwable e) {
                logger.warn("Cannot register timers", e);
            }
    
            // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
            // to work with DI'd DiscoveryClient
            DiscoveryManager.getInstance().setDiscoveryClient(this);
            DiscoveryManager.getInstance().setEurekaClientConfig(config);
    
            initTimestampMs = System.currentTimeMillis();
            initRegistrySize = this.getApplications().size();
            registrySize = initRegistrySize;
            logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
                    initTimestampMs, initRegistrySize);
        }
    }
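
    The constructor builds heartbeatExecutor and cacheRefreshExecutor on a SynchronousQueue, which the source comments call "direct handoff": there is no queue capacity, so a submitted task either gets a thread immediately or is rejected. The sketch below demonstrates that behavior with the JDK classes only. DirectHandoffSketch and secondTaskRejected are hypothetical names, and the pool sizes are chosen for illustration rather than taken from Eureka's defaults.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class DirectHandoffSketch {

    // Returns true if a second task is rejected while the first one is still running.
    static boolean secondTaskRejected(int maxThreads) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                1, maxThreads, 0, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        try {
            // The first task occupies one worker until it is released.
            executor.execute(() -> {
                started.countDown();
                awaitQuietly(release);
            });
            awaitQuietly(started);
            try {
                // No queue capacity: this task needs a free or new thread right now.
                executor.execute(() -> { });
                return false;
            } catch (RejectedExecutionException e) {
                return true;
            }
        } finally {
            release.countDown();
            executor.shutdown();
        }
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("max=1, second task rejected: " + secondTaskRejected(1));
        System.out.println("max=2, second task rejected: " + secondTaskRejected(2));
    }
}
```

    With this design a slow heartbeat or cache refresh cannot pile up a backlog of queued runs; extra work is bounded by the maximum pool size.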
    

    initScheduledTasks()

    Initializes the scheduled tasks (e.g. cluster resolvers, heartbeat, instance info replication, registry fetch).

    @Singleton
    public class DiscoveryClient implements EurekaClient {
    
        /**
         * Initializes all scheduled tasks.
         */
        private void initScheduledTasks() {
            // Scheduled task that fetches the registry
            if (clientConfig.shouldFetchRegistry()) {
                // registry cache refresh timer
                int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
                int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
                cacheRefreshTask = new TimedSupervisorTask(
                        "cacheRefresh",
                        scheduler,
                        cacheRefreshExecutor,
                        registryFetchIntervalSeconds,
                        TimeUnit.SECONDS,
                        expBackOffBound,
                        new CacheRefreshThread()
                );
                scheduler.schedule(
                        cacheRefreshTask,
                        registryFetchIntervalSeconds, TimeUnit.SECONDS);
            }
    
            if (clientConfig.shouldRegisterWithEureka()) {
                int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
                int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
                logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);
    
                // Scheduled heartbeat task
                // Heartbeat timer
                heartbeatTask = new TimedSupervisorTask(
                        "heartbeat",
                        scheduler,
                        heartbeatExecutor,
                        renewalIntervalInSecs,
                        TimeUnit.SECONDS,
                        expBackOffBound,
                        new HeartbeatThread()
                );
                scheduler.schedule(
                        heartbeatTask,
                        renewalIntervalInSecs, TimeUnit.SECONDS);
    
                // Scheduled task that syncs this instance's info to the server
                // InstanceInfo replicator
                instanceInfoReplicator = new InstanceInfoReplicator(
                        this,
                        instanceInfo,
                        clientConfig.getInstanceInfoReplicationIntervalSeconds(),
                        2); // burstSize
    
                statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
                    @Override
                    public String getId() {
                        return "statusChangeListener";
                    }
    
                    @Override
                    public void notify(StatusChangeEvent statusChangeEvent) {
                        if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
                                InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
                            // log at warn level if DOWN was involved
                            logger.warn("Saw local status change event {}", statusChangeEvent);
                        } else {
                            logger.info("Saw local status change event {}", statusChangeEvent);
                        }
                        instanceInfoReplicator.onDemandUpdate();
                    }
                };
    
                if (clientConfig.shouldOnDemandUpdateStatusChange()) {
                    applicationInfoManager.registerStatusChangeListener(statusChangeListener);
                }
    
                // Start the scheduled registration (instance info replication) task
                instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
            } else {
                logger.info("Not registering with Eureka server per configuration");
            }
        }
    }
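
    Both TimedSupervisorTask instances receive an expBackOffBound argument, which suggests that the interval is stretched when a run times out and capped at some multiple of the base interval. The sketch below shows one plausible bounded exponential backoff policy of that kind. It is an assumption for illustration, not a copy of TimedSupervisorTask; BackoffSketch, nextDelayAfterTimeout, and the bound of 10 are hypothetical.

```java
class BackoffSketch {

    // Next delay after a timed-out run: double it, but never exceed base * bound.
    static long nextDelayAfterTimeout(long currentDelayMs, long baseDelayMs, int expBackOffBound) {
        long maxDelayMs = baseDelayMs * expBackOffBound;
        return Math.min(maxDelayMs, currentDelayMs * 2);
    }

    public static void main(String[] args) {
        long baseDelayMs = 30_000L; // e.g. a 30 second fetch interval
        int bound = 10;             // assumed bound, for illustration only

        long delayMs = baseDelayMs;
        for (int timeout = 1; timeout <= 5; timeout++) {
            delayMs = nextDelayAfterTimeout(delayMs, baseDelayMs, bound);
            System.out.println("after timeout " + timeout + ": next run in " + delayMs + " ms");
        }

        // In this sketch, a successful run resets the delay to the base interval.
        delayMs = baseDelayMs;
        System.out.println("after success: next run in " + delayMs + " ms");
    }
}
```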
    

    CacheRefreshThread: periodically refreshes the service registry information

    @Singleton
    public class DiscoveryClient implements EurekaClient {
    
        class CacheRefreshThread implements Runnable {
            public void run() {
                // Refresh the cache (refreshRegistry)
                refreshRegistry();
            }
        }
    }
    
    Cache refresh (refreshRegistry)
    • By default, the locally stored registry is refreshed every 30 seconds.
    @Singleton
    public class DiscoveryClient implements EurekaClient {
        
        // Cache refresh
        @VisibleForTesting
        void refreshRegistry() {
            try {
                boolean isFetchingRemoteRegionRegistries = isFetchingRemoteRegionRegistries();
    
                boolean remoteRegionsModified = false;
                // This makes sure that a dynamic change to remote regions to fetch is honored.
                String latestRemoteRegions = clientConfig.fetchRegistryForRemoteRegions();
                if (null != latestRemoteRegions) {
                    String currentRemoteRegions = remoteRegionsToFetch.get();
                    if (!latestRemoteRegions.equals(currentRemoteRegions)) {
                        // Both remoteRegionsToFetch and AzToRegionMapper.regionsToFetch need to be in sync
                        synchronized (instanceRegionChecker.getAzToRegionMapper()) {
                            if (remoteRegionsToFetch.compareAndSet(currentRemoteRegions, latestRemoteRegions)) {
                                String[] remoteRegions = latestRemoteRegions.split(",");
                                remoteRegionsRef.set(remoteRegions);
                                instanceRegionChecker.getAzToRegionMapper().setRegionsToFetch(remoteRegions);
                                remoteRegionsModified = true;
                            } else {
                                logger.info("Remote regions to fetch modified concurrently," +
                                        " ignoring change from {} to {}", currentRemoteRegions, latestRemoteRegions);
                            }
                        }
                    } else {
                        // Just refresh mapping to reflect any DNS/Property change
                        instanceRegionChecker.getAzToRegionMapper().refreshMapping();
                    }
                }
    
                boolean success = fetchRegistry(remoteRegionsModified);
                if (success) {
                    registrySize = localRegionApps.get().size();
                    lastSuccessfulRegistryFetchTimestamp = System.currentTimeMillis();
                }
    
                if (logger.isDebugEnabled()) {
                    StringBuilder allAppsHashCodes = new StringBuilder();
                    allAppsHashCodes.append("Local region apps hashcode: ");
                    allAppsHashCodes.append(localRegionApps.get().getAppsHashCode());
                    allAppsHashCodes.append(", is fetching remote regions? ");
                    allAppsHashCodes.append(isFetchingRemoteRegionRegistries);
                    for (Map.Entry<String, Applications> entry : remoteRegionVsApps.entrySet()) {
                        allAppsHashCodes.append(", Remote region: ");
                        allAppsHashCodes.append(entry.getKey());
                        allAppsHashCodes.append(" , apps hashcode: ");
                        allAppsHashCodes.append(entry.getValue().getAppsHashCode());
                    }
                    logger.debug("Completed cache refresh task for discovery. All Apps hash code is {} ",
                            allAppsHashCodes);
                }
            } catch (Throwable e) {
                logger.error("Cannot fetch registry from server", e);
            }
        }   
    }
    
    Fetching the registry: fetchRegistry
    @Singleton
    public class DiscoveryClient implements EurekaClient {
        
        private boolean fetchRegistry(boolean forceFullRegistryFetch) {
            // Use a Stopwatch to measure how long the fetch takes
            Stopwatch tracer = FETCH_REGISTRY_TIMER.start();
    
            try {
                // Take the previously fetched service list from the local cache
                Applications applications = getApplications();
    
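                // Several conditions decide whether to do a full fetch; any one of them is enough:
                // 1. delta fetching is disabled;
                // 2. a single VIP address is configured for registry refresh;
                // 3. the caller forces a full fetch through the method argument;
                // 4. no valid service list has been cached locally yet (null, empty, or version -1).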
                if (clientConfig.shouldDisableDelta()
                        || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
                        || forceFullRegistryFetch
                        || (applications == null)
                        || (applications.getRegisteredApplications().size() == 0)
                        || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
                {
                    // These detailed logs show why a full fetch was triggered
                    logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta());
                    logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress());
                    logger.info("Force full registry fetch : {}", forceFullRegistryFetch);
                    logger.info("Application is null : {}", (applications == null));
                    logger.info("Registered Applications size is zero : {}",
                            (applications.getRegisteredApplications().size() == 0));
                    logger.info("Application version is -1: {}", (applications.getVersion() == -1));
                    // Full fetch
                    getAndStoreFullRegistry();
                } else {
                    // Delta (incremental) fetch
                    getAndUpdateDelta(applications);
                }
                // Recompute and set the reconcile hash code
                applications.setAppsHashCode(applications.getReconcileHashCode());
                // Log the total number of instances across all applications
                logTotalInstances();
            } catch (Throwable e) {
                logger.error(PREFIX + "{} - was unable to refresh its cache! status = {}", appPathIdentifier, e.getMessage(), e);
                return false;
            } finally {
                if (tracer != null) {
                    tracer.stop();
                }
            }
    
            // Broadcast the cache-refreshed event to all registered listeners; note that CloudEurekaClient overrides this method
            onCacheRefreshed();
    
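            // The freshly updated cache holds the service list from the Eureka server, which includes this application's own status.
            // The member variable lastRemoteInstanceStatus records the most recently seen remote status of this application.
            // updateInstanceRemoteStatus compares the two; if they differ, it updates lastRemoteInstanceStatus and broadcasts the corresponding event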
            updateInstanceRemoteStatus();
    
            // registry was fetched successfully, so return true
            return true;
        }
    }
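
    The branch at the top of fetchRegistry can be read as a single predicate. The sketch below restates it as a pure function so the individual triggers are easy to test in isolation. FetchModeSketch and needsFullFetch are hypothetical names, and the cached Applications object is reduced to three plain parameters for illustration.

```java
class FetchModeSketch {

    // Returns true when a full registry fetch is needed instead of a delta fetch.
    static boolean needsFullFetch(boolean disableDelta,
                                  String singleVipAddress,
                                  boolean forceFullRegistryFetch,
                                  boolean cachePresent,
                                  int cachedAppCount,
                                  long cachedVersion) {
        return disableDelta
                || (singleVipAddress != null && !singleVipAddress.isEmpty())
                || forceFullRegistryFetch
                || !cachePresent
                || cachedAppCount == 0
                || cachedVersion == -1;
    }

    public static void main(String[] args) {
        // A warm cache with default settings takes the delta path.
        System.out.println("warm cache:      " + needsFullFetch(false, null, false, true, 3, 1));
        // The first fetch after startup sees an empty cache and takes the full path.
        System.out.println("empty cache:     " + needsFullFetch(false, null, false, true, 0, 1));
        // A configured single VIP address always takes the full path.
        System.out.println("single VIP set:  " + needsFullFetch(false, "my-vip", false, true, 3, 1));
    }
}
```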
    

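    Full update of the locally cached service list

    • The getAndStoreFullRegistry method performs the full update. The code is shown below, and the logic is straightforward: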
    @Singleton
    public class DiscoveryClient implements EurekaClient {
        
        private void getAndStoreFullRegistry() throws Throwable {
            long currentUpdateGeneration = fetchRegistryGeneration.get();
    
            logger.info("Getting all instance registry info from the eureka server");
    
            Applications apps = null;
            // When no single VIP address is configured (the default case),
            // eurekaTransport.queryClient.getApplications is called to fetch the service list from the server
            EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
                    ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
                    : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
            if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
                // The response entity is the service list
                apps = httpResponse.getEntity();
            }
            logger.info("The response status is {}", httpResponse.getStatusCode());
    
            if (apps == null) {
                logger.error("The application is null for some reason. Not storing this information");
                
            // For thread safety, only the thread whose CAS succeeds replaces the local cache with the data it fetched from the Eureka server
            } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
                // localRegionApps is the local cache, an AtomicReference instance
                localRegionApps.set(this.filterAndShuffle(apps));
                logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode());
            } else {
                logger.warn("Not updating applications as another thread is updating it already");
            }
        }
    }
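
    The generation counter plus compareAndSet is what stops a slow, stale response from overwriting newer data. A minimal sketch of the same idea using only java.util.concurrent.atomic follows; GenerationGuardedCache and its methods are hypothetical names, and a String stands in for the Applications object.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

final class GenerationGuardedCache {
    private final AtomicLong generation = new AtomicLong(0);
    private final AtomicReference<String> cache = new AtomicReference<>("empty");

    /** Snapshot the generation before starting a (possibly slow) remote fetch. */
    long beginFetch() {
        return generation.get();
    }

    /**
     * Stores the fetched value only if no other thread has advanced the
     * generation since beginFetch(); otherwise the stale response is dropped.
     */
    boolean store(long generationAtStart, String fetched) {
        if (generation.compareAndSet(generationAtStart, generationAtStart + 1)) {
            cache.set(fetched);
            return true;
        }
        return false;
    }

    String current() {
        return cache.get();
    }

    public static void main(String[] args) {
        GenerationGuardedCache c = new GenerationGuardedCache();
        long slow = c.beginFetch(); // thread A starts fetching
        long fast = c.beginFetch(); // thread B starts fetching
        System.out.println(c.store(fast, "from-B")); // B finishes first
        System.out.println(c.store(slow, "from-A")); // A arrives with an outdated generation
        System.out.println(c.current());
    }
}
```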
    

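    getAndStoreFullRegistry contains no complex logic. Only the call eurekaTransport.queryClient.getApplications(remoteRegionsRef.get()) needs a closer look, because all interaction with the Eureka server happens inside it. The getApplications method is implemented in the EurekaHttpClientDecorator class: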

    public abstract class EurekaHttpClientDecorator implements EurekaHttpClient {
    
        @Override
        public EurekaHttpResponse<Applications> getApplications(final String... regions) {
            return execute(new RequestExecutor<Applications>() {
                @Override
                public EurekaHttpResponse<Applications> execute(EurekaHttpClient delegate) {
                    return delegate.getApplications(regions);
                }
    
                @Override
                public RequestType getRequestType() {
                    // Type of this request to the Eureka server: fetch the service list
                    return RequestType.GetApplications;
                }
            });
        }
    }
    

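    As its name suggests, EurekaHttpClientDecorator implements the decorator pattern. Its other methods show that every remote operation is wrapped as an API here, for example registration: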

    @Override
    public EurekaHttpResponse<Void> register(final InstanceInfo info) {
        return execute(new RequestExecutor<Void>() {
            @Override
            public EurekaHttpResponse<Void> execute(EurekaHttpClient delegate) {
                return delegate.register(info);
            }
    
            @Override
            public RequestType getRequestType() {
                return RequestType.Register;
            }
        });
    }
    

    And lease renewal:

    @Override
    public EurekaHttpResponse<InstanceInfo> sendHeartBeat(final String appName,
                                                          final String id,
                                                          final InstanceInfo info,
                                                          final InstanceStatus overriddenStatus) {
        return execute(new RequestExecutor<InstanceInfo>() {
            @Override
            public EurekaHttpResponse<InstanceInfo> execute(EurekaHttpClient delegate) {
                return delegate.sendHeartBeat(appName, id, info, overriddenStatus);
            }
    
            @Override
            public RequestType getRequestType() {
                return RequestType.SendHeartBeat;
            }
        });
    }
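
    The shape shared by these methods can be reduced to a small stand-alone sketch: each API method only packages the call as a request object, and a single execute method decides how to run it against a delegate. Everything below (RegistryClient, RegistryClientDecorator, RecordingClient, FakeTransportClient, and this cut-down RequestExecutor) is hypothetical and much simpler than the Eureka classes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, cut-down client interface with two operations.
interface RegistryClient {
    String getApplications();
    String register(String instanceId);
}

// Describes one request: how to run it against a delegate, and its type.
interface RequestExecutor<R> {
    R execute(RegistryClient delegate);
    String requestType();
}

// Decorator base: API methods package the call; subclasses decide how to run it.
abstract class RegistryClientDecorator implements RegistryClient {
    protected abstract <R> R execute(RequestExecutor<R> request);

    @Override
    public String getApplications() {
        return execute(new RequestExecutor<String>() {
            @Override public String execute(RegistryClient delegate) { return delegate.getApplications(); }
            @Override public String requestType() { return "GetApplications"; }
        });
    }

    @Override
    public String register(final String instanceId) {
        return execute(new RequestExecutor<String>() {
            @Override public String execute(RegistryClient delegate) { return delegate.register(instanceId); }
            @Override public String requestType() { return "Register"; }
        });
    }
}

// One possible decorator: records the request type, then forwards the call.
final class RecordingClient extends RegistryClientDecorator {
    final List<String> seen = new ArrayList<>();
    private final RegistryClient delegate;

    RecordingClient(RegistryClient delegate) { this.delegate = delegate; }

    @Override
    protected <R> R execute(RequestExecutor<R> request) {
        seen.add(request.requestType());
        return request.execute(delegate);
    }

    public static void main(String[] args) {
        RecordingClient client = new RecordingClient(new FakeTransportClient());
        System.out.println(client.register("i-1"));
        System.out.println(client.seen);
    }
}

// Stand-in for the class that would really talk to the server.
final class FakeTransportClient implements RegistryClient {
    @Override public String getApplications() { return "apps"; }
    @Override public String register(String instanceId) { return "registered:" + instanceId; }
}
```

    Because every call passes through execute, cross-cutting behaviour such as retries or metrics can be added in one place without touching the individual API methods.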
    

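    Tracing delegate.register(info) further leads to the AbstractJerseyEurekaHttpClient class, which holds the concrete implementations of the network requests. The requests behind getApplications, register, sendHeartBeat and the other methods of EurekaHttpClientDecorator are all implemented in AbstractJerseyEurekaHttpClient. To keep this short, we look only at getApplications: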

    public abstract class AbstractJerseyEurekaHttpClient implements EurekaHttpClient {
    
        @Override
        public EurekaHttpResponse<Applications> getApplications(String... regions) {
            // The path for the full registry is "apps/"
            return getApplicationsInternal("apps/", regions);
        }
    
        @Override
        public EurekaHttpResponse<Applications> getDelta(String... regions) {
            // The path for the delta is "apps/delta"
            return getApplicationsInternal("apps/delta", regions);
        }
        
        // All request and response handling happens in this method
        private EurekaHttpResponse<Applications> getApplicationsInternal(String urlPath, String[] regions) {
            ClientResponse response = null;
            String regionsParamValue = null;
            try {
                // The names jersey and resource indicate that this is a RESTful request
                WebResource webResource = jerseyClient.resource(serviceUrl).path(urlPath);
                if (regions != null && regions.length > 0) {
                    regionsParamValue = StringUtil.join(regions);
                    webResource = webResource.queryParam("regions", regionsParamValue);
                }
                Builder requestBuilder = webResource.getRequestBuilder();
                addExtraHeaders(requestBuilder);
                // Send the request and wrap the response in a ClientResponse instance
                response = requestBuilder.accept(MediaType.APPLICATION_JSON_TYPE).get(ClientResponse.class);
    
                Applications applications = null;
                if (response.getStatus() == Status.OK.getStatusCode() && response.hasEntity()) {
                    // Read the application data from the response (full or delta, depending on the path)
                    applications = response.getEntity(Applications.class);
                }
                return anEurekaHttpResponse(response.getStatus(), Applications.class)
                        .headers(headersOf(response))
                        .entity(applications)
                        .build();
            } finally {
                if (logger.isDebugEnabled()) {
                    logger.debug("Jersey HTTP GET {}/{}?{}; statusCode={}",
                            serviceUrl, urlPath,
                            regionsParamValue == null ? "" : "regions=" + regionsParamValue,
                            response == null ? "N/A" : response.getStatus()
                    );
                }
                if (response != null) {
                    response.close();
                }
            }
        }   
    }
    

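    The code above uses the jersey-client API to send a RESTful request to the Eureka server and wraps the response data in an EurekaHttpResponse instance.

    Summary: the full registry is fetched with a RESTful request sent through the jersey-client API, and the returned service list is stored in a member variable that serves as the local cache.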
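
    To make the request concrete, here is a small sketch of the URL that such a fetch ends up calling. RegistryUrls and fetchUrl are hypothetical names, the service URL in the usage is only an example, and the sketch assumes that the regions are sent as one comma-separated "regions" query parameter.

```java
final class RegistryUrls {
    /** Builds the GET URL for a full ("apps/") or delta ("apps/delta") fetch. */
    static String fetchUrl(String serviceUrl, boolean delta, String... regions) {
        StringBuilder url = new StringBuilder(serviceUrl);
        if (!serviceUrl.endsWith("/")) {
            url.append('/');
        }
        url.append(delta ? "apps/delta" : "apps/");
        if (regions != null && regions.length > 0) {
            // Assumption: regions travel as one comma-separated query parameter.
            url.append("?regions=").append(String.join(",", regions));
        }
        return url.toString();
    }

    public static void main(String[] args) {
        System.out.println(fetchUrl("http://localhost:8761/eureka/", false));
        System.out.println(fetchUrl("http://localhost:8761/eureka", true, "us-east-1", "us-west-2"));
    }
}
```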

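    Fetching incremental (delta) updates of the service list

    Delta updates of the service list are fetched by the getAndUpdateDelta method. See the comments in the code below for details: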

    @Singleton
    public class DiscoveryClient implements EurekaClient {
    
        private void getAndUpdateDelta(Applications applications) throws Throwable {
            long currentUpdateGeneration = fetchRegistryGeneration.get();
    
            Applications delta = null;
            // The delta is fetched via eurekaTransport.queryClient.getDelta
            EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get());
            if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
                // delta holds the incremental update returned by the Eureka server
                delta = httpResponse.getEntity();
            }
    
            if (delta == null) {
                logger.warn("The server does not allow the delta revision to be applied because it is not safe. "
                        + "Hence got the full registry.");
                // If no delta was returned, fall back to a full fetch right away
                getAndStoreFullRegistry();
            } 
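            // For thread safety, the CAS verifies that no other thread has updated the registry since this request started.
            // If fetchRegistryGeneration changed in the meantime, another thread did a similar update, so this response is discarded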
            else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
                logger.debug("Got delta update with apps hashcode {}", delta.getAppsHashCode());
                String reconcileHashCode = "";
                if (fetchRegistryUpdateLock.tryLock()) {
                    try {
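                        // Merge the delta returned by Eureka into the local data; this method is covered in detail later
                        updateDelta(delta);
                        // Compute the reconcile hash code from the local data after the merge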
                        reconcileHashCode = getReconcileHashCode(applications);
                    } finally {
                        fetchRegistryUpdateLock.unlock();
                    }
                } else {
                    logger.warn("Cannot acquire update lock, aborting getAndUpdateDelta");
                }
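                // Along with the delta, the Eureka server returns its own reconcile hash code.
                // In theory, after any number of delta updates the hash code computed from the local cache should equal the server's.
                // A mismatch means the locally cached service list has diverged from the Eureka server, and a full fetch is needed
                if (!reconcileHashCode.equals(delta.getAppsHashCode()) || clientConfig.shouldLogDeltaDiff()) {
                    // Hash codes differ (or delta-diff logging is enabled): reconcileAndLogDifference does a full fetch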
                    reconcileAndLogDifference(delta, reconcileHashCode);  // this makes a remoteCall
                }
            } else {
                logger.warn("Not updating application delta as another thread is updating it already");
                logger.debug("Ignoring delta update with apps hashcode {}, as another thread is updating it already", delta.getAppsHashCode());
            }
        }
    }
    

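    A few points in the code above deserve attention:

    • a. The delta is fetched with eurekaTransport.queryClient.getDelta(remoteRegionsRef.get());
    • b. The delta is merged into the local cache with updateDelta(delta);
    • c. Checking the reconcile hash code after each delta update tells whether the local service list still matches the one on the Eureka server. If it does not, a full fetch is performed by calling reconcileAndLogDifference.

    The three points a, b and c are expanded in turn below:

    • 1. The logic for sending the request to the Eureka server is much like the full fetch and is again split between EurekaHttpClientDecorator and AbstractJerseyEurekaHttpClient. First, the EurekaHttpClientDecorator part: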
    @Override
    public EurekaHttpResponse<Applications> getDelta(final String... regions) {
        return execute(new RequestExecutor<Applications>() {
            @Override
            public EurekaHttpResponse<Applications> execute(EurekaHttpClient delegate) {
                return delegate.getDelta(regions);
            }
    
            @Override
            public RequestType getRequestType() {
                return RequestType.GetDelta;
            }
        });
    }
    
    • 2. Next, the getDelta method in AbstractJerseyEurekaHttpClient. It calls the same getApplicationsInternal method as the full fetch; only the URL path argument differs:
    @Override
    public EurekaHttpResponse<Applications> getDelta(String... regions) {
        return getApplicationsInternal("apps/delta", regions);
    }
    

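    As the code shows, fetching a delta from the Eureka server differs a little from common approaches:

    • a. A typical incremental update adds a timestamp, or the tag of the last update, to the request, and the server uses that parameter to work out which data the client lacks;

    • b. The Eureka client sends no such parameter. This makes sense in light of the statement from the official documentation mentioned earlier, that Eureka keeps update data for three minutes: Eureka retains recent changes for three minutes, and during that window every Eureka client that requests a delta receives the same cached data. As long as a client makes at least one request every three minutes, its data stays consistent with the Eureka server;

    • c. If something goes wrong on the client and more than three minutes pass between delta requests, the client and server data may diverge. A way of detecting the divergence is then needed, and when it is detected the client does a full fetch. That check is the reconcile hash code.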
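
    The retention window described in point b can be modelled with a simple queue. The sketch below is only an illustration built from the three-minute statement above; it is not taken from the Eureka server source, RecentChangeQueue is a hypothetical name, and the class is not thread-safe.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

final class RecentChangeQueue {
    private static final class Change {
        final long timestampMs;
        final String description;

        Change(long timestampMs, String description) {
            this.timestampMs = timestampMs;
            this.description = description;
        }
    }

    private final long retentionMs;
    private final Deque<Change> changes = new ArrayDeque<>();

    RecentChangeQueue(long retentionMs) {
        this.retentionMs = retentionMs;
    }

    void record(long nowMs, String description) {
        changes.addLast(new Change(nowMs, description));
    }

    /** Every caller gets the same answer: all changes still inside the window. */
    List<String> delta(long nowMs) {
        while (!changes.isEmpty() && nowMs - changes.peekFirst().timestampMs > retentionMs) {
            changes.removeFirst(); // expired: no client can receive this change any more
        }
        List<String> result = new ArrayList<>();
        for (Change c : changes) {
            result.add(c.description);
        }
        return result;
    }

    public static void main(String[] args) {
        RecentChangeQueue queue = new RecentChangeQueue(180_000L); // three minutes
        queue.record(0L, "ADDED a-1");
        queue.record(60_000L, "DELETED b-2");
        System.out.println(queue.delta(100_000L));
        System.out.println(queue.delta(200_000L));
    }
}
```

    A client that polls less often than the retention period can miss an expired change, which is exactly the gap the reconcile hash code is meant to catch.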

    • 3. After fetching the delta, the Eureka client merges it into the local data with the updateDelta method:

    @Singleton
    public class DiscoveryClient implements EurekaClient {
    
        private void updateDelta(Applications delta) {
            int deltaCount = 0;
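            // Iterate over all applications in the delta
            for (Application app : delta.getRegisteredApplications()) {
                // Iterate over all instances of the current application
                for (InstanceInfo instance : app.getInstances()) {
                    // Start with the cached local-region service list as the merge target
                    Applications applications = getApplications();
                    String instanceRegion = instanceRegionChecker.getInstanceRegion(instance);
                    // Check whether this instance is in the same region as the current application
                    if (!instanceRegionChecker.isLocalRegion(instanceRegion)) {
                        // Different region: merge into the cache kept for that remote region instead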
                        Applications remoteApps = remoteRegionVsApps.get(instanceRegion);
                        if (null == remoteApps) {
                            remoteApps = new Applications();
                            remoteRegionVsApps.put(instanceRegion, remoteApps);
                        }
                        applications = remoteApps;
                    }
    
                    ++deltaCount;
                    // Handle added instances
                    if (ActionType.ADDED.equals(instance.getActionType())) {
                        Application existingApp = applications.getRegisteredApplications(instance.getAppName());
                        if (existingApp == null) {
                            applications.addApplication(app);
                        }
                        logger.debug("Added instance {} to the existing apps in region {}", instance.getId(), instanceRegion);
                        applications.getRegisteredApplications(instance.getAppName()).addInstance(instance);
                    } 
                    // Handle modified instances
                    else if (ActionType.MODIFIED.equals(instance.getActionType())) {
                        Application existingApp = applications.getRegisteredApplications(instance.getAppName());
                        if (existingApp == null) {
                            applications.addApplication(app);
                        }
                        logger.debug("Modified instance {} to the existing apps ", instance.getId());
    
                        applications.getRegisteredApplications(instance.getAppName()).addInstance(instance);
    
                    } 
                    // Handle deleted instances
                    else if (ActionType.DELETED.equals(instance.getActionType())) {
                        Application existingApp = applications.getRegisteredApplications(instance.getAppName());
                        if (existingApp != null) {
                            logger.debug("Deleted instance {} to the existing apps ", instance.getId());
                            existingApp.removeInstance(instance);
                            /*
                             * We find all instance list from application(The status of instance status is not only the status is UP but also other status)
                             * if instance list is empty, we remove the application.
                             */
                            if (existingApp.getInstancesAsIsFromEureka().isEmpty()) {
                                applications.removeApplication(existingApp);
                            }
                        }
                    }
                }
            }
            logger.debug("The total number of instances fetched by the delta processor : {}", deltaCount);
    
            getApplications().setVersion(delta.getVersion());
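            // Shuffle the instances (keeping only UP ones if configured) so they are not always returned in the same order
            getApplications().shuffleInstances(clientConfig.shouldFilterOnlyUpInstances());
            // Do the same for the applications cached from other regions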
            for (Applications applications : remoteRegionVsApps.values()) {
                applications.setVersion(delta.getVersion());
                applications.shuffleInstances(clientConfig.shouldFilterOnlyUpInstances());
            }
        }
    }
    

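    A few points about the code above:

    • a. The region of each instance is checked; instances from another region are merged into a separate cache dedicated to remote-region services;
    • b. Within the delta, instance changes under each application fall into three kinds: added, modified and deleted. Merging means handling each kind differently in the local cache;
    • c. After the merge, the cached instances are shuffled (and, if configured, filtered to UP instances only), so callers do not always receive them in the same order.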
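
    The three kinds of change in point b can be shown with plain maps. This is a simplified sketch rather than the Eureka data model: DeltaMerge, Action and Change are hypothetical names, the registry is a map from application name to a map of instance id to status, and region handling is left out.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

final class DeltaMerge {
    enum Action { ADDED, MODIFIED, DELETED }

    static final class Change {
        final Action action;
        final String appName;
        final String instanceId;
        final String status;

        Change(Action action, String appName, String instanceId, String status) {
            this.action = action;
            this.appName = appName;
            this.instanceId = instanceId;
            this.status = status;
        }
    }

    /** Applies the changes in place to registry: appName -> (instanceId -> status). */
    static void apply(Map<String, Map<String, String>> registry, Iterable<Change> delta) {
        for (Change c : delta) {
            switch (c.action) {
                case ADDED:
                case MODIFIED: {
                    // Both cases upsert: create the application if needed, then add or overwrite the instance.
                    registry.computeIfAbsent(c.appName, k -> new HashMap<>()).put(c.instanceId, c.status);
                    break;
                }
                case DELETED: {
                    Map<String, String> instances = registry.get(c.appName);
                    if (instances != null) {
                        instances.remove(c.instanceId);
                        if (instances.isEmpty()) {
                            registry.remove(c.appName); // drop applications with no instances left
                        }
                    }
                    break;
                }
            }
        }
    }

    public static void main(String[] args) {
        Map<String, Map<String, String>> registry = new HashMap<>();
        apply(registry, Arrays.asList(
                new Change(Action.ADDED, "ORDERS", "o-1", "UP"),
                new Change(Action.MODIFIED, "ORDERS", "o-1", "DOWN")));
        System.out.println(registry);
    }
}
```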

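    As mentioned earlier, if the Eureka client does not fetch deltas in time, it may miss changes on the Eureka server, and the two service lists diverge. The reconcile hash code comparison detects this. What happens next? Look at the inline comment on the reconcileAndLogDifference call in getAndUpdateDelta: "// this makes a remoteCall". The comment is short but important.


    It warns that a remote call happens here: when the service lists held by the Eureka server and the Eureka client differ, the client sends another request to the Eureka server. The reconcileAndLogDifference method shows the details: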

    @Singleton
    public class DiscoveryClient implements EurekaClient {
    
        private void reconcileAndLogDifference(Applications delta, String reconcileHashCode) throws Throwable {
            logger.debug("The Reconcile hashcodes do not match, client : {}, server : {}. Getting the full registry",
                    reconcileHashCode, delta.getAppsHashCode());
    
            RECONCILE_HASH_CODES_MISMATCH.increment();
    
            long currentUpdateGeneration = fetchRegistryGeneration.get();
            // Fetch the full registry from the Eureka server
            EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
                    ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
                    : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
            Applications serverApps = httpResponse.getEntity();
    
            if (serverApps == null) {
                logger.warn("Cannot fetch full registry from the server; reconciliation failure");
                return;
            }
            // If the CAS succeeds, store the full data in the local cache
            if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
                localRegionApps.set(this.filterAndShuffle(serverApps));
                getApplications().setVersion(delta.getVersion());
                logger.debug(
                        "The Reconcile hashcodes after complete sync up, client : {}, server : {}.",
                        getApplications().getReconcileHashCode(),
                        delta.getAppsHashCode());
            } else {
                logger.warn("Not setting the applications map as another thread has advanced the update generation");
            }
        }
    }
    

    The code above is simple: fetch the full registry from the Eureka server, attempt the CAS, and update the local cache if it succeeds.
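
    The comparison that triggers this reconciliation can be sketched as follows. The hash format used here (each status in sorted order followed by its instance count, for example "DOWN_1_UP_2_") is an assumption based on my reading of Eureka's Applications class and should be checked against the version you use; ReconcileHash is a hypothetical name.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class ReconcileHash {
    /** Builds the hash string: statuses in sorted order, each followed by its count. */
    static String of(List<String> instanceStatuses) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String status : instanceStatuses) {
            counts.merge(status, 1, Integer::sum);
        }
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, Integer> e : counts.entrySet()) {
            sb.append(e.getKey()).append('_').append(e.getValue()).append('_');
        }
        return sb.toString();
    }

    /** True when the locally computed hash differs from the one the server sent. */
    static boolean needsFullFetch(List<String> localStatuses, String serverHash) {
        return !of(localStatuses).equals(serverHash);
    }

    public static void main(String[] args) {
        System.out.println(of(Arrays.asList("UP", "DOWN", "UP")));
        System.out.println(needsFullFetch(Arrays.asList("UP", "UP"), "UP_3_"));
    }
}
```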

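    That covers the source for both full and delta updates. Next come the two broadcasts made after the data is refreshed: cache refreshed, and status changed (broadcast only when there is a change).

    Broadcast: cache refreshed

    The cache-refreshed broadcast is made in the onCacheRefreshed method, which the subclass CloudEurekaClient overrides: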

    public class CloudEurekaClient extends DiscoveryClient {
    
        @Override
        protected void onCacheRefreshed() {
            super.onCacheRefreshed();
    
            if (this.cacheRefreshedCount != null) { // might be called during construction and
                // will be null
                long newCount = this.cacheRefreshedCount.incrementAndGet();
                log.trace("onCacheRefreshed called with count: " + newCount);
                
                // Broadcast inside the Spring container
                this.publisher.publishEvent(new HeartbeatEvent(this, newCount));
            }
        }
    }
    

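    As the code shows, this is a broadcast inside the Spring container; this.publisher is of type ApplicationEventPublisher.

    Broadcast: local status change

    The service list fetched from the Eureka server naturally includes this application's own entry. The status from that entry is kept in the member variable lastRemoteInstanceStatus. After every cache refresh, the status in the cache is compared with lastRemoteInstanceStatus; if they differ, the status recorded for this application on the Eureka server has changed, and a broadcast is made: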

    @Singleton
    public class DiscoveryClient implements EurekaClient {
    
        private synchronized void updateInstanceRemoteStatus() {
            // Determine this instance's status for this app and set to UNKNOWN if not found
            InstanceInfo.InstanceStatus currentRemoteInstanceStatus = null;
            if (instanceInfo.getAppName() != null) {
                Application app = getApplication(instanceInfo.getAppName());
                if (app != null) {
                    InstanceInfo remoteInstanceInfo = app.getByInstanceId(instanceInfo.getId());
                    if (remoteInstanceInfo != null) {
                        currentRemoteInstanceStatus = remoteInstanceInfo.getStatus();
                    }
                }
            }
            if (currentRemoteInstanceStatus == null) {
                currentRemoteInstanceStatus = InstanceInfo.InstanceStatus.UNKNOWN;
            }
    
            // Notify if status changed
            if (lastRemoteInstanceStatus != currentRemoteInstanceStatus) {
                // The broadcast is made here
                onRemoteStatusChanged(lastRemoteInstanceStatus, currentRemoteInstanceStatus);
                lastRemoteInstanceStatus = currentRemoteInstanceStatus;
            }
        }
    }
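
    The compare-then-notify step can be isolated in a few lines. The sketch below is a hypothetical stand-alone version (RemoteStatusTracker, Status and Listener are illustrative names); it assumes the tracked status starts as UNKNOWN and treats a missing instance as UNKNOWN, as the code above does.

```java
import java.util.ArrayList;
import java.util.List;

final class RemoteStatusTracker {
    enum Status { UP, DOWN, STARTING, OUT_OF_SERVICE, UNKNOWN }

    interface Listener {
        void onChange(Status previous, Status current);
    }

    private final List<Listener> listeners = new ArrayList<>();
    private Status last = Status.UNKNOWN; // assumption: nothing is known before the first refresh

    synchronized void addListener(Listener listener) {
        listeners.add(listener);
    }

    /** Call after every cache refresh; statusInCache is null when the instance is absent. */
    synchronized void update(Status statusInCache) {
        Status current = (statusInCache == null) ? Status.UNKNOWN : statusInCache;
        if (current != last) {
            for (Listener listener : listeners) {
                listener.onChange(last, current); // notify only on a real change
            }
            last = current;
        }
    }

    public static void main(String[] args) {
        RemoteStatusTracker tracker = new RemoteStatusTracker();
        tracker.addListener((prev, cur) -> System.out.println(prev + " -> " + cur));
        tracker.update(Status.UP);
        tracker.update(Status.UP); // unchanged, so no notification
        tracker.update(null);
    }
}
```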
    

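    The most accurate description is in the official Netflix documentation: https://github.com/Netflix/eureka/wiki/Understanding-eureka-client-server-communication#fetch-registry

    Summary:

    • 1. The official documentation summarizes the whole process accurately. Reading the code with that summary in mind is far more efficient, because it keeps you heading in the right direction instead of being pulled off the main line by details;
    • 2. In Eureka's registry design, polling by many clients does add load on the server. However, delta updates combined with the server caching three minutes of changes reduce the data volume and the related computation, and the reconcile hash code covers the weakness of delta updates, so both performance and completeness are protected. Delta updates also need no client timestamp, which saves work and simplifies the implementation. The design is worth learning from.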

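    HeartbeatThread: the periodic lease renewal thread

    HeartbeatThread renews the lease by calling the renew method, shown below. The code comment states that this is done with a RESTful request. The response from the Eureka server, httpResponse, is used only for its status code: 404 triggers re-registration and 200 means success. That makes sense: with a heartbeat every 30 seconds, both request and response should be as small as possible to reduce server and network load: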

    @Singleton
    public class DiscoveryClient implements EurekaClient {
    
        private class HeartbeatThread implements Runnable {
    
            public void run() {
                if (renew()) {
                    lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
                }
            }
        }
        
        // Lease renewal
        boolean renew() {
            EurekaHttpResponse<InstanceInfo> httpResponse;
            try {
                // Send the RESTful request, i.e. the heartbeat
                httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
                logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode());
                // A 404 response triggers re-registration
                if (httpResponse.getStatusCode() == Status.NOT_FOUND.getStatusCode()) {
                    REREGISTER_COUNTER.increment();
                    logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName());
                    long timestamp = instanceInfo.setIsDirtyWithTime();
                    boolean success = register();
                    if (success) {
                        instanceInfo.unsetIsDirty(timestamp);
                    }
                    return success;
                }
                // Status code 200 means the heartbeat succeeded
                return httpResponse.getStatusCode() == Status.OK.getStatusCode();
            } catch (Throwable e) {
                logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);
                return false;
            }
        }   
    }
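
    The renew-or-re-register flow can be exercised against an in-memory stand-in for the server. In this sketch LeaseRenewer, Registry and InMemoryRegistry are hypothetical names, and treating 204 as a successful registration is an assumption about the server's reply.

```java
import java.util.HashSet;
import java.util.Set;

final class LeaseRenewer {
    // Hypothetical minimal view of the server: both calls return an HTTP status code.
    interface Registry {
        int heartbeat(String instanceId);
        int register(String instanceId);
    }

    /** Returns true if the lease is in place after this call. */
    static boolean renew(Registry registry, String instanceId) {
        try {
            int status = registry.heartbeat(instanceId);
            if (status == 404) {
                // The server does not know this instance: register again.
                return registry.register(instanceId) == 204; // assumption: 204 means registered
            }
            return status == 200;
        } catch (RuntimeException e) {
            return false; // failure is reported; the next scheduled run retries
        }
    }

    public static void main(String[] args) {
        InMemoryRegistry registry = new InMemoryRegistry();
        System.out.println(renew(registry, "i-1")); // unknown instance, so it is registered
        System.out.println(renew(registry, "i-1")); // plain heartbeat
    }
}

// In-memory stand-in for the server, for demonstration only.
final class InMemoryRegistry implements LeaseRenewer.Registry {
    private final Set<String> leases = new HashSet<>();
    int registerCalls = 0;

    @Override
    public int heartbeat(String instanceId) {
        return leases.contains(instanceId) ? 200 : 404;
    }

    @Override
    public int register(String instanceId) {
        registerCalls++;
        leases.add(instanceId);
        return 204;
    }
}
```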
    

    Expanding the eurekaTransport.registrationClient.sendHeartBeat call from the code above, the source is in the EurekaHttpClientDecorator class:

    @Override
    public EurekaHttpResponse<InstanceInfo> sendHeartBeat(final String appName,
                                                          final String id,
                                                          final InstanceInfo info,
                                                          final InstanceStatus overriddenStatus) {
        return execute(new RequestExecutor<InstanceInfo>() {
            @Override
            public EurekaHttpResponse<InstanceInfo> execute(EurekaHttpClient delegate) {
                // Network handling is handed to the delegate
                return delegate.sendHeartBeat(appName, id, info, overriddenStatus);
            }
    
            @Override
            public RequestType getRequestType() {
                // Request type: heartbeat
                return RequestType.SendHeartBeat;
            }
        });
    }
    

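    Expanding delegate.sendHeartBeat through several layers of calls, the work is finally done by JerseyApplicationClient, with the source in its parent class AbstractJerseyEurekaHttpClient, shown below. It uses the Jersey RESTful API to PUT the instance's own information to the Eureka server. Note: this is neither POST nor GET, but PUT: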

    @Override
    public EurekaHttpResponse<InstanceInfo> sendHeartBeat(String appName, String id, InstanceInfo info, InstanceStatus overriddenStatus) {
        String urlPath = "apps/" + appName + '/' + id;
        ClientResponse response = null;
        try {
            // Two request parameters: this client's status, and the time its key information (status, metadata, etc.) last changed
            WebResource webResource = jerseyClient.resource(serviceUrl)
                    .path(urlPath)
                    .queryParam("status", info.getStatus().toString())
                    .queryParam("lastDirtyTimestamp", info.getLastDirtyTimestamp().toString());
            if (overriddenStatus != null) {
                webResource = webResource.queryParam("overriddenstatus", overriddenStatus.name());
            }
            Builder requestBuilder = webResource.getRequestBuilder();
            addExtraHeaders(requestBuilder);
            // Note: this is neither POST nor GET, but PUT
            response = requestBuilder.put(ClientResponse.class);
            EurekaHttpResponseBuilder<InstanceInfo> eurekaResponseBuilder = anEurekaHttpResponse(response.getStatus(), InstanceInfo.class).headers(headersOf(response));
            if (response.hasEntity() &&
                    !HTML.equals(response.getType().getSubtype())) { //don't try and deserialize random html errors from the server
                eurekaResponseBuilder.entity(response.getEntity(InstanceInfo.class));
            }
            return eurekaResponseBuilder.build();
        } finally {
            if (logger.isDebugEnabled()) {
                logger.debug("Jersey HTTP PUT {}/{}; statusCode={}", serviceUrl, urlPath, response == null ? "N/A" : response.getStatus());
            }
            if (response != null) {
                response.close();
            }
        }
    }
    

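    That completes the analysis of how the Eureka client renews its lease. The process is fairly simple: DiscoveryClient, TimedSupervisorTask, JerseyApplicationClient and the other instances each do their part, periodically sending a PUT request to the Eureka server.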
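
    For readers who want to see the request on its own, the sketch below builds an equivalent PUT with the JDK's java.net.http API (Java 11 or later) instead of Jersey. HeartbeatRequests is a hypothetical helper, the server URL and identifiers in the usage are made-up examples, and values are assumed to need no URL encoding.

```java
import java.net.URI;
import java.net.http.HttpRequest;

final class HeartbeatRequests {
    /** Builds, without sending, a PUT to apps/{appName}/{instanceId} with the two query parameters. */
    static HttpRequest build(String serviceUrl, String appName, String instanceId,
                             String status, long lastDirtyTimestamp) {
        String base = serviceUrl.endsWith("/") ? serviceUrl : serviceUrl + "/";
        URI uri = URI.create(base + "apps/" + appName + "/" + instanceId
                + "?status=" + status
                + "&lastDirtyTimestamp=" + lastDirtyTimestamp);
        return HttpRequest.newBuilder(uri)
                .PUT(HttpRequest.BodyPublishers.noBody()) // the heartbeat carries no body here
                .build();
    }

    public static void main(String[] args) {
        HttpRequest request = build("http://localhost:8761/eureka", "ORDERS", "orders-1", "UP", 1630840680000L);
        System.out.println(request.method() + " " + request.uri());
    }
}
```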

    The most accurate description is in the official Netflix documentation: https://github.com/Netflix/eureka/wiki/Understanding-eureka-client-server-communication#renew
