Spring Cloud Eureka Source Code Analysis: Clie

Author: 想起个帅气的头像 | Published 2021-01-09 22:21

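    I. Introduction

    The Eureka client is mainly responsible for three things:

    1. Registering the service instance
    2. Renewing the service instance's lease
    3. Fetching the registry from the server

    A few key classes in the source code deserve attention: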

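    | Class | Description |
    | --- | --- |
    | EurekaClientConfigBean | Configuration bean that holds all eureka.client.xxx properties |
    | EurekaInstanceConfigBean | Configuration bean that holds all eureka.instance.xxx properties |
    | InstanceInfo | Wrapper class for the service instance information |
    | ApplicationInfoManager | Wraps the application information registered with Eureka (instance info, configuration, instance status) and manages operations on the instance |
    | CloudEurekaClient | Client class that initiates operations such as register, fetch, and heartbeat |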

    II. Source Code Analysis

    1. Injecting the configuration beans at startup

    EurekaClientAutoConfiguration

    EurekaClientConfigBean
        @Bean
        @ConditionalOnMissingBean(value = EurekaClientConfig.class, search = SearchStrategy.CURRENT)
        public EurekaClientConfigBean eurekaClientConfigBean(ConfigurableEnvironment env) {
            return new EurekaClientConfigBean();
        }
    

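    EurekaClientConfigBean is bound to the configuration prefix eureka.client; settings read from the configuration file are stored in this bean.

    Common settings:

    | Property | Description |
    | --- | --- |
    | defaultZone (under serviceUrl) | Eureka server address for the default zone |
    | transport | Transport (communication) settings |
    | registryFetchIntervalSeconds | Interval between registry fetches from the Eureka server (default 30s) |
    | instanceInfoReplicationIntervalSeconds | Interval at which instance changes are replicated to the Eureka server (default 30s) |
    | initialInstanceInfoReplicationIntervalSeconds | Initial delay before instance info is first replicated to the Eureka server (default 40s) |
    | eurekaServiceUrlPollIntervalSeconds | Interval for polling changes to the Eureka server addresses (default 300s) |
    | registerWithEureka | Whether this instance registers with the Eureka server |
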
    EurekaInstanceConfigBean
        @Bean
        @ConditionalOnMissingBean(value = EurekaInstanceConfig.class, search = SearchStrategy.CURRENT)
        public EurekaInstanceConfigBean eurekaInstanceConfigBean(InetUtils inetUtils,
                ManagementMetadataProvider managementMetadataProvider) {
            String hostname = getProperty("eureka.instance.hostname");
            boolean preferIpAddress = Boolean.parseBoolean(getProperty("eureka.instance.prefer-ip-address"));
            String ipAddress = getProperty("eureka.instance.ip-address");
            boolean isSecurePortEnabled = Boolean.parseBoolean(getProperty("eureka.instance.secure-port-enabled"));
    
            String serverContextPath = env.getProperty("server.servlet.context-path", "/");
            int serverPort = Integer.parseInt(env.getProperty("server.port", env.getProperty("port", "8080")));
    
            Integer managementPort = env.getProperty("management.server.port", Integer.class);
            String managementContextPath = env.getProperty("management.server.servlet.context-path");
            Integer jmxPort = env.getProperty("com.sun.management.jmxremote.port", Integer.class);
            EurekaInstanceConfigBean instance = new EurekaInstanceConfigBean(inetUtils);
    
            instance.setNonSecurePort(serverPort);
            instance.setInstanceId(getDefaultInstanceId(env));
            instance.setPreferIpAddress(preferIpAddress);
            instance.setSecurePortEnabled(isSecurePortEnabled);
            if (StringUtils.hasText(ipAddress)) {
                instance.setIpAddress(ipAddress);
            }
    
            if (isSecurePortEnabled) {
                instance.setSecurePort(serverPort);
            }
    
            if (StringUtils.hasText(hostname)) {
                instance.setHostname(hostname);
            }
            String statusPageUrlPath = getProperty("eureka.instance.status-page-url-path");
            String healthCheckUrlPath = getProperty("eureka.instance.health-check-url-path");
    
            if (StringUtils.hasText(statusPageUrlPath)) {
                instance.setStatusPageUrlPath(statusPageUrlPath);
            }
            if (StringUtils.hasText(healthCheckUrlPath)) {
                instance.setHealthCheckUrlPath(healthCheckUrlPath);
            }
    
            ManagementMetadata metadata = managementMetadataProvider.get(instance, serverPort, serverContextPath,
                    managementContextPath, managementPort);
    
            if (metadata != null) {
                instance.setStatusPageUrl(metadata.getStatusPageUrl());
                instance.setHealthCheckUrl(metadata.getHealthCheckUrl());
                if (instance.isSecurePortEnabled()) {
                    instance.setSecureHealthCheckUrl(metadata.getSecureHealthCheckUrl());
                }
                Map<String, String> metadataMap = instance.getMetadataMap();
                metadataMap.computeIfAbsent("management.port", k -> String.valueOf(metadata.getManagementPort()));
            }
            else {
                // without the metadata the status and health check URLs will not be set
                // and the status page and health check url paths will not include the
                // context path so set them here
                if (StringUtils.hasText(managementContextPath)) {
                    instance.setHealthCheckUrlPath(managementContextPath + instance.getHealthCheckUrlPath());
                    instance.setStatusPageUrlPath(managementContextPath + instance.getStatusPageUrlPath());
                }
            }
    
            setupJmxPort(instance, jmxPort);
            return instance;
        }
    

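    EurekaInstanceConfigBean is bound to the configuration prefix eureka.instance; instance settings read from the configuration file are stored in this bean.

    Common settings:

    | Property | Description |
    | --- | --- |
    | instanceId | Instance ID registered with the Eureka server; defaults to hostname:application name:port |
    | hostname | Hostname to register; by default the machine's hostname is used |
    | preferIpAddress | Whether to register by IP address instead of hostname |
    | ipAddress | IP address to register |
    | leaseRenewalIntervalInSeconds | Interval between lease renewals (heartbeats) sent to the Eureka server (default 30s) |
    | leaseExpirationDurationInSeconds | Lease expiration duration reported to the Eureka server (default 90s); if no renewal arrives within this window, the server removes the instance |
    | initialStatus | Initial status registered with the Eureka server; defaults to UP |
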
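
    To make the two lease settings concrete, here is a minimal sketch of how a lease expiry check could work. LeaseSketch and its methods are hypothetical names used only for illustration; this is not Eureka's actual lease implementation, and the numbers are simply the defaults from the table above.

```java
import java.time.Duration;

// Hypothetical illustration only; not Eureka's real lease class.
class LeaseSketch {
    private final Duration expiration;   // plays the role of leaseExpirationDurationInSeconds
    private long lastRenewalMillis;

    LeaseSketch(Duration expiration, long nowMillis) {
        this.expiration = expiration;
        this.lastRenewalMillis = nowMillis;
    }

    // Called whenever a heartbeat (renewal) arrives.
    void renew(long nowMillis) {
        lastRenewalMillis = nowMillis;
    }

    // The lease counts as expired once no renewal arrived within the expiration window.
    boolean isExpired(long nowMillis) {
        return nowMillis - lastRenewalMillis > expiration.toMillis();
    }

    // Number of renewal intervals that fit into the expiration window, e.g. 90s / 30s = 3.
    static long renewalsPerExpirationWindow(Duration renewalInterval, Duration expiration) {
        return expiration.getSeconds() / renewalInterval.getSeconds();
    }

    public static void main(String[] args) {
        LeaseSketch lease = new LeaseSketch(Duration.ofSeconds(90), 0L);
        System.out.println(lease.isExpired(60_000L));   // false
        System.out.println(lease.isExpired(91_000L));   // true
        System.out.println(renewalsPerExpirationWindow(
                Duration.ofSeconds(30), Duration.ofSeconds(90))); // 3
    }
}
```

    With the defaults, an instance can miss roughly three consecutive heartbeats before the expiration window elapses.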
    EurekaServiceRegistry
        @Bean
        public EurekaServiceRegistry eurekaServiceRegistry() {
            return new EurekaServiceRegistry();
        }
    

    EurekaServiceRegistry implements ServiceRegistry and is the concrete class for instance registration; its register method performs the actions that trigger service registration.

    EurekaRegistration
        @Bean
        @ConditionalOnBean(AutoServiceRegistrationProperties.class)
        @ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled", matchIfMissing = true)
        public EurekaRegistration eurekaRegistration(EurekaClient eurekaClient,
                CloudEurekaInstanceConfig instanceConfig, ApplicationInfoManager applicationInfoManager,
                @Autowired(required = false) ObjectProvider<HealthCheckHandler> healthCheckHandler) {
            return EurekaRegistration.builder(instanceConfig).with(applicationInfoManager).with(eurekaClient)
                        .with(healthCheckHandler).build();
        }
    

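    EurekaRegistration is the object that actually gets registered. It wraps all of the client's registration information and implements org.springframework.cloud.client.serviceregistry.Registration, which extends the standard ServiceInstance interface.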

    ApplicationInfoManager
        @Bean
        @ConditionalOnMissingBean(value = ApplicationInfoManager.class, search = SearchStrategy.CURRENT)
        public ApplicationInfoManager eurekaApplicationInfoManager(EurekaInstanceConfig config) {
            InstanceInfo instanceInfo = new InstanceInfoFactory().create(config);
            return new ApplicationInfoManager(config, instanceInfo);
        }
    
        // InstanceInfoFactory.create(), called by the bean method above
        public InstanceInfo create(EurekaInstanceConfig config) {
            LeaseInfo.Builder leaseInfoBuilder = LeaseInfo.Builder.newBuilder()
                    .setRenewalIntervalInSecs(config.getLeaseRenewalIntervalInSeconds())
                    .setDurationInSecs(config.getLeaseExpirationDurationInSeconds());
    
            // Builder the instance information to be registered with eureka
            // server
            InstanceInfo.Builder builder = InstanceInfo.Builder.newBuilder();
    
            String namespace = config.getNamespace();
            if (!namespace.endsWith(".")) {
                namespace = namespace + ".";
            }
            builder.setNamespace(namespace).setAppName(config.getAppname()).setInstanceId(config.getInstanceId())
                    .setAppGroupName(config.getAppGroupName()).setDataCenterInfo(config.getDataCenterInfo())
                    .setIPAddr(config.getIpAddress()).setHostName(config.getHostName(false))
                    .setPort(config.getNonSecurePort())
                    .enablePort(InstanceInfo.PortType.UNSECURE, config.isNonSecurePortEnabled())
                    .setSecurePort(config.getSecurePort())
                    .enablePort(InstanceInfo.PortType.SECURE, config.getSecurePortEnabled())
                    .setVIPAddress(config.getVirtualHostName()).setSecureVIPAddress(config.getSecureVirtualHostName())
                    .setHomePageUrl(config.getHomePageUrlPath(), config.getHomePageUrl())
                    .setStatusPageUrl(config.getStatusPageUrlPath(), config.getStatusPageUrl())
                    .setHealthCheckUrls(config.getHealthCheckUrlPath(), config.getHealthCheckUrl(),
                            config.getSecureHealthCheckUrl())
                    .setASGName(config.getASGName());
    
            // Start off with the STARTING state to avoid traffic
            if (!config.isInstanceEnabledOnit()) {
                InstanceInfo.InstanceStatus initialStatus = InstanceInfo.InstanceStatus.STARTING;
                if (log.isInfoEnabled()) {
                    log.info("Setting initial instance status as: " + initialStatus);
                }
                builder.setStatus(initialStatus);
            }
            else {
                if (log.isInfoEnabled()) {
                    log.info("Setting initial instance status as: " + InstanceInfo.InstanceStatus.UP
                            + ". This may be too early for the instance to advertise itself as available. "
                            + "You would instead want to control this via a healthcheck handler.");
                }
            }
    
            // Add any user-specific metadata information
            for (Map.Entry<String, String> mapEntry : config.getMetadataMap().entrySet()) {
                String key = mapEntry.getKey();
                String value = mapEntry.getValue();
                // only add the metadata if the value is present
                if (value != null && !value.isEmpty()) {
                    builder.add(key, value);
                }
            }
    
            InstanceInfo instanceInfo = builder.build();
            instanceInfo.setLeaseInfo(leaseInfoBuilder.build());
            return instanceInfo;
        }
    

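    First, an InstanceInfo in its initial state is created, and many of its properties are populated from the configuration.
    The ApplicationInfoManager that is then constructed wraps the application information registered with Eureka, including instance information, configuration parameters, and instance status.
    The InstanceInfo object is also set on the ApplicationInfoManager.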
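
    Two decisions inside create() are easy to miss: the initial status and the metadata filter. The following is a minimal sketch of both, assuming a simplified two-value status enum. InitialStatusSketch and its method names are hypothetical and only mirror the branches shown in the factory code above.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch; the names here are illustrative, not Eureka's API.
class InitialStatusSketch {
    enum Status { STARTING, UP }

    // Mirrors the branch on isInstanceEnabledOnit(): the instance starts as STARTING
    // (to avoid traffic) unless it is explicitly enabled on init.
    static Status initialStatus(boolean instanceEnabledOnInit) {
        return instanceEnabledOnInit ? Status.UP : Status.STARTING;
    }

    // Mirrors the metadata loop: only entries with a non-empty value are copied.
    static Map<String, String> nonEmptyMetadata(Map<String, String> source) {
        Map<String, String> result = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : source.entrySet()) {
            String value = entry.getValue();
            if (value != null && !value.isEmpty()) {
                result.put(entry.getKey(), value);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> raw = new LinkedHashMap<>();
        raw.put("zone", "zone-a");
        raw.put("empty", "");
        raw.put("missing", null);
        System.out.println(initialStatus(false));   // STARTING
        System.out.println(nonEmptyMetadata(raw));  // {zone=zone-a}
    }
}
```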

    CloudEurekaClient
            @Bean(destroyMethod = "shutdown")
            @ConditionalOnMissingBean(value = EurekaClient.class, search = SearchStrategy.CURRENT)
            @org.springframework.cloud.context.config.annotation.RefreshScope
            @Lazy
            public EurekaClient eurekaClient(ApplicationInfoManager manager, EurekaClientConfig config,
                    EurekaInstanceConfig instance, @Autowired(required = false) HealthCheckHandler healthCheckHandler) {
                // If we use the proxy of the ApplicationInfoManager we could run into a problem
                // when shutdown is called on the CloudEurekaClient where the ApplicationInfoManager bean is
                // requested but wont be allowed because we are shutting down. To avoid this
                // we use the object directly.
                ApplicationInfoManager appManager;
                if (AopUtils.isAopProxy(manager)) {
                    appManager = ProxyUtils.getTargetObject(manager);
                }
                else {
                    appManager = manager;
                }
                CloudEurekaClient cloudEurekaClient = new CloudEurekaClient(appManager, config, this.optionalArgs,
                        this.context);
                cloudEurekaClient.registerHealthCheck(healthCheckHandler);
                return cloudEurekaClient;
            }
    

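    This bean is very important: it defines almost all client-side operations and is the client class that initiates them. Most of the work happens in its constructor, which is analyzed in detail below.

    2. The SmartLifecycle entry point

    The Eureka client's registration is triggered through SmartLifecycle; the implementing class is EurekaAutoServiceRegistration.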

    public class EurekaAutoServiceRegistration implements AutoServiceRegistration, SmartLifecycle, Ordered, SmartApplicationListener {
    
        private static final Log log = LogFactory.getLog(EurekaAutoServiceRegistration.class);
    
        private AtomicBoolean running = new AtomicBoolean(false);
    
        private int order = 0;
    
        private AtomicInteger port = new AtomicInteger(0);
    
        private ApplicationContext context;
    
        private EurekaServiceRegistry serviceRegistry;
    
        private EurekaRegistration registration;
    
        public EurekaAutoServiceRegistration(ApplicationContext context, EurekaServiceRegistry serviceRegistry,
                EurekaRegistration registration) {
            this.context = context;
            this.serviceRegistry = serviceRegistry;
            this.registration = registration;
        }
    
        @Override
        public void start() {
            ...
            // only initialize if nonSecurePort is greater than 0 and it isn't already running
            // because of containerPortInitializer below
            // getNonSecurePort() is the port the application starts on
            if (!this.running.get() && this.registration.getNonSecurePort() > 0) {
    
                this.serviceRegistry.register(this.registration);
    
                this.context.publishEvent(new InstanceRegisteredEvent<>(this, this.registration.getInstanceConfig()));
                this.running.set(true);
            }
        }
    

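    Look directly at the start method. With the initial state (not yet running, and a port greater than 0), execution goes straight into the if block. It contains two key calls: register(), which begins registration, and publishEvent(), which publishes an event announcing that the current instance has been registered.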
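
    The guard in start() can be sketched in isolation as follows. StartGuardSketch is a hypothetical stand-in: it only counts registrations instead of calling a real ServiceRegistry, and it assumes, as in the code above, that a port of 0 means the port is not yet known.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the "register once, and only with a known port" guard.
class StartGuardSketch {
    private final AtomicBoolean running = new AtomicBoolean(false);
    private final AtomicInteger registrations = new AtomicInteger(0);
    private final int nonSecurePort;

    StartGuardSketch(int nonSecurePort) {
        this.nonSecurePort = nonSecurePort;
    }

    void start() {
        // Same condition as in the start() method above.
        if (!running.get() && nonSecurePort > 0) {
            registrations.incrementAndGet(); // stand-in for serviceRegistry.register(...)
            running.set(true);
        }
    }

    boolean isRunning() {
        return running.get();
    }

    int registrationCount() {
        return registrations.get();
    }

    public static void main(String[] args) {
        StartGuardSketch known = new StartGuardSketch(8080);
        known.start();
        known.start(); // second call is a no-op
        System.out.println(known.registrationCount()); // 1

        StartGuardSketch unknown = new StartGuardSketch(0);
        unknown.start();
        System.out.println(unknown.isRunning()); // false
    }
}
```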

    EurekaServiceRegistry

    public class EurekaServiceRegistry implements ServiceRegistry<EurekaRegistration> {
    
        private static final Log log = LogFactory.getLog(EurekaServiceRegistry.class);
    
        @Override
        public void register(EurekaRegistration reg) {
            maybeInitializeClient(reg);
    
            if (log.isInfoEnabled()) {
                log.info("Registering application " + reg.getApplicationInfoManager().getInfo().getAppName()
                        + " with eureka with status " + reg.getInstanceConfig().getInitialStatus());
            }
    
            reg.getApplicationInfoManager().setInstanceStatus(reg.getInstanceConfig().getInitialStatus());
    
            reg.getHealthCheckHandler()
                    .ifAvailable(healthCheckHandler -> reg.getEurekaClient().registerHealthCheck(healthCheckHandler));
        }
    
        // In practice this method touches ApplicationInfoManager and CloudEurekaClient so that the beans get created; both beans are lazy
        private void maybeInitializeClient(EurekaRegistration reg) {
            // force initialization of possibly scoped proxies
            reg.getApplicationInfoManager().getInfo();
            reg.getEurekaClient().getApplications();
        }
    

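    maybeInitializeClient triggers the instantiation of two important beans: ApplicationInfoManager and EurekaClient.
    ApplicationInfoManager holds the InstanceInfo and all of the InstanceConfig settings.
    EurekaClient (CloudEurekaClient), during construction, performs the client's registration and data synchronization with the server, and defines and starts the scheduled tasks.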
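
    The idea that a lazy bean is only built when something first touches it can be illustrated with a memoizing supplier. This is a simplified analogy, not how Spring implements @Lazy or scoped proxies; LazyBeanSketch is a hypothetical name.

```java
import java.util.function.Supplier;

// Hypothetical analogy for a lazily created bean: the factory runs on first use only.
class LazyBeanSketch<T> implements Supplier<T> {
    private final Supplier<T> factory;
    private T instance;
    private int creations;

    LazyBeanSketch(Supplier<T> factory) {
        this.factory = factory;
    }

    @Override
    public synchronized T get() {
        if (instance == null) {
            instance = factory.get(); // the expensive construction happens here
            creations++;
        }
        return instance;
    }

    synchronized int creations() {
        return creations;
    }

    public static void main(String[] args) {
        LazyBeanSketch<String> client = new LazyBeanSketch<>(() -> "client");
        System.out.println(client.creations()); // 0
        client.get();
        client.get();
        System.out.println(client.creations()); // 1
    }
}
```

    In this analogy, the two calls in maybeInitializeClient play the role of the first get().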

    The creation of these two classes is described in turn below.

    new ApplicationInfoManager()

    The bean is defined in the AutoConfiguration shown above.

        @Inject
        public ApplicationInfoManager(EurekaInstanceConfig config, InstanceInfo instanceInfo, OptionalArgs optionalArgs) {
            this.config = config;
            this.instanceInfo = instanceInfo;
            this.listeners = new ConcurrentHashMap<String, StatusChangeListener>();
            if (optionalArgs != null) {
                this.instanceStatusMapper = optionalArgs.getInstanceStatusMapper();
            } else {
                this.instanceStatusMapper = NO_OP_MAPPER;
            }
    
            // Hack to allow for getInstance() to use the DI'd ApplicationInfoManager
            instance = this;
        }
    

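    Before this constructor runs, the bean method creates an InstanceInfo through the factory; create() copies the settings in EurekaInstanceConfig into the InstanceInfo using the builder pattern.
    The constructor itself then initializes two fields, listeners and instanceStatusMapper.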

    new CloudEurekaClient()

    The bean is defined in the AutoConfiguration shown above.

        public CloudEurekaClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config,
                AbstractDiscoveryClientOptionalArgs<?> args, ApplicationEventPublisher publisher) {
            super(applicationInfoManager, config, args);
            this.applicationInfoManager = applicationInfoManager;
            this.publisher = publisher;
            this.eurekaTransportField = ReflectionUtils.findField(DiscoveryClient.class, "eurekaTransport");
            ReflectionUtils.makeAccessible(this.eurekaTransportField);
        }
    

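    The constructor first calls the superclass (DiscoveryClient) constructor, which in turn reaches the following overload: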

        public DiscoveryClient(ApplicationInfoManager applicationInfoManager, final EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args, EndpointRandomizer randomizer) {
            this(applicationInfoManager, config, args, new Provider<BackupRegistry>() {
                private volatile BackupRegistry backupRegistryInstance;
    
                @Override
                public synchronized BackupRegistry get() {
                    if (backupRegistryInstance == null) {
                        String backupRegistryClassName = config.getBackupRegistryImpl();
                        if (null != backupRegistryClassName) {
                            try {
                                backupRegistryInstance = (BackupRegistry) Class.forName(backupRegistryClassName).newInstance();
                                logger.info("Enabled backup registry of type {}", backupRegistryInstance.getClass());
                            } catch (InstantiationException e) {
                                logger.error("Error instantiating BackupRegistry.", e);
                            } catch (IllegalAccessException e) {
                                logger.error("Error instantiating BackupRegistry.", e);
                            } catch (ClassNotFoundException e) {
                                logger.error("Error instantiating BackupRegistry.", e);
                            }
                        }
    
                        if (backupRegistryInstance == null) {
                            logger.warn("Using default backup registry implementation which does not do anything.");
                            backupRegistryInstance = new NotImplementedRegistryImpl();
                        }
                    }
    
                    return backupRegistryInstance;
                }
            }, randomizer);
        }
    

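    Before the main constructor logic runs, a provider for the BackupRegistry is set up. The backup registry is used when none of the Eureka server URLs can be reached: the custom class named by backupRegistryImpl is instantiated, and if none is configured, the default NotImplementedRegistryImpl is used, which does nothing.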
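
    The provider pattern above (lazy creation, reflection by class name, fallback to a no-op) can be reduced to a small self-contained sketch. Backup, NoOpBackup, and CustomBackup are hypothetical types standing in for BackupRegistry and its implementations, and Supplier stands in for the injection Provider.

```java
import java.util.function.Supplier;

// Hypothetical sketch of a lazily resolved, reflection-based backup with a no-op default.
class BackupProviderSketch {
    interface Backup {
        String name();
    }

    static class NoOpBackup implements Backup {
        public String name() { return "no-op"; }
    }

    static class CustomBackup implements Backup {
        public String name() { return "custom"; }
    }

    static Supplier<Backup> provider(String className) {
        return new Supplier<Backup>() {
            private volatile Backup instance;

            @Override
            public synchronized Backup get() {
                if (instance == null) {
                    if (className != null) {
                        try {
                            instance = (Backup) Class.forName(className)
                                    .getDeclaredConstructor().newInstance();
                        } catch (ReflectiveOperationException | ClassCastException e) {
                            // Could not instantiate; fall through to the default below.
                        }
                    }
                    if (instance == null) {
                        instance = new NoOpBackup();
                    }
                }
                return instance;
            }
        };
    }

    public static void main(String[] args) {
        System.out.println(provider(null).get().name());                         // no-op
        System.out.println(provider(CustomBackup.class.getName()).get().name()); // custom
        System.out.println(provider("does.not.Exist").get().name());             // no-op
    }
}
```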

        @Inject
        DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
                        Provider<BackupRegistry> backupRegistryProvider, EndpointRandomizer endpointRandomizer) {
            // Override with the custom optional arguments, if provided
            if (args != null) {
                this.healthCheckHandlerProvider = args.healthCheckHandlerProvider;
                this.healthCheckCallbackProvider = args.healthCheckCallbackProvider;
                this.eventListeners.addAll(args.getEventListeners());
                this.preRegistrationHandler = args.preRegistrationHandler;
            } else {
                this.healthCheckCallbackProvider = null;
                this.healthCheckHandlerProvider = null;
                this.preRegistrationHandler = null;
            }
            
            // ------------- begin: assign constructor arguments to fields -----------------
            this.applicationInfoManager = applicationInfoManager;
            InstanceInfo myInfo = applicationInfoManager.getInfo();
    
            clientConfig = config;
            staticClientConfig = clientConfig;
            transportConfig = config.getTransportConfig();
            instanceInfo = myInfo;
            if (myInfo != null) {
                appPathIdentifier = instanceInfo.getAppName() + "/" + instanceInfo.getId();
            } else {
                logger.warn("Setting instanceInfo to a passed in null value");
            }
    
            this.backupRegistryProvider = backupRegistryProvider;
            this.endpointRandomizer = endpointRandomizer;
            this.urlRandomizer = new EndpointUtils.InstanceInfoBasedUrlRandomizer(instanceInfo);
            localRegionApps.set(new Applications());
    
            fetchRegistryGeneration = new AtomicLong(0);
    
            remoteRegionsToFetch = new AtomicReference<String>(clientConfig.fetchRegistryForRemoteRegions());
            remoteRegionsRef = new AtomicReference<>(remoteRegionsToFetch.get() == null ? null : remoteRegionsToFetch.get().split(","));
            // ------------- end: assign constructor arguments to fields -----------------
    
            // If registry fetching is enabled, create a ThresholdLevelsMetric (exposed via JMX) so registry staleness can be queried
            if (config.shouldFetchRegistry()) {
                this.registryStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRY_PREFIX + "lastUpdateSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
            } else {
                this.registryStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
            }
            
            // If registering with Eureka is enabled, create a ThresholdLevelsMetric (exposed via JMX) so heartbeat staleness can be queried
            if (config.shouldRegisterWithEureka()) {
                this.heartbeatStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRATION_PREFIX + "lastHeartbeatSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
            } else {
                this.heartbeatStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
            }
    
            logger.info("Initializing Eureka in region {}", clientConfig.getRegion());
    
            // If the client neither registers nor fetches the registry, finish initialization here
            if (!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()) {
                logger.info("Client configured to neither register nor query for data.");
                scheduler = null;
                heartbeatExecutor = null;
                cacheRefreshExecutor = null;
                eurekaTransport = null;
                instanceRegionChecker = new InstanceRegionChecker(new PropertyBasedAzToRegionMapper(config), clientConfig.getRegion());
    
                // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
                // to work with DI'd DiscoveryClient
                DiscoveryManager.getInstance().setDiscoveryClient(this);
                DiscoveryManager.getInstance().setEurekaClientConfig(config);
    
                initTimestampMs = System.currentTimeMillis();
                initRegistrySize = this.getApplications().size();
                registrySize = initRegistrySize;
                logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
                        initTimestampMs, initRegistrySize);
    
                return;  // no need to setup up an network tasks and we are done
            }
    
            // Otherwise, carry out the full client initialization.
            try {
                // default size of 2 - 1 each for heartbeat and cacheRefresh
                // Scheduler pool with two threads: one for heartbeats, one for cache refresh
                scheduler = Executors.newScheduledThreadPool(2,
                        new ThreadFactoryBuilder()
                                .setNameFormat("DiscoveryClient-%d")
                                .setDaemon(true)
                                .build());
    
                // Thread pool that executes the heartbeat task
                heartbeatExecutor = new ThreadPoolExecutor(
                        1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                        new SynchronousQueue<Runnable>(),
                        new ThreadFactoryBuilder()
                                .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
                                .setDaemon(true)
                                .build()
                );  // use direct handoff
                 
                // Thread pool that executes the cache refresh task
                cacheRefreshExecutor = new ThreadPoolExecutor(
                        1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                        new SynchronousQueue<Runnable>(),
                        new ThreadFactoryBuilder()
                                .setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
                                .setDaemon(true)
                                .build()
                );  // use direct handoff
    
                // Create eurekaTransport and initialize its internal fields
                eurekaTransport = new EurekaTransport();
                scheduleServerEndpointTask(eurekaTransport, args);
    
                // Initialize the default mapping between availability zones and regions
                AzToRegionMapper azToRegionMapper;
                if (clientConfig.shouldUseDnsForFetchingServiceUrls()) {
                    azToRegionMapper = new DNSBasedAzToRegionMapper(clientConfig);
                } else {
                    azToRegionMapper = new PropertyBasedAzToRegionMapper(clientConfig);
                }
                if (null != remoteRegionsToFetch.get()) {
                    azToRegionMapper.setRegionsToFetch(remoteRegionsToFetch.get().split(","));
                }
                // Build instanceRegionChecker, used to look up the region of an InstanceInfo and to check whether it is the local region
                instanceRegionChecker = new InstanceRegionChecker(azToRegionMapper, clientConfig.getRegion());
            } catch (Throwable e) {
                throw new RuntimeException("Failed to initialize DiscoveryClient!", e);
            }
    
            if (clientConfig.shouldFetchRegistry()) {
                try {
                    // Key method; see the fetchRegistry analysis below
                    boolean primaryFetchRegistryResult = fetchRegistry(false);
                    // If the fetch fails, the following message is logged
                    if (!primaryFetchRegistryResult) {
                        logger.info("Initial registry fetch from primary servers failed");
                    }
                    boolean backupFetchRegistryResult = true;
                    // If the primary fetch failed, fall back to the backup registry implementation
                    if (!primaryFetchRegistryResult && !fetchRegistryFromBackup()) {
                        backupFetchRegistryResult = false;
                        logger.info("Initial registry fetch from backup servers failed");
                    }
                    if (!primaryFetchRegistryResult && !backupFetchRegistryResult && clientConfig.shouldEnforceFetchRegistryAtInit()) {
                        throw new IllegalStateException("Fetch registry error at startup. Initial fetch failed.");
                    }
                } catch (Throwable th) {
                    logger.error("Fetch registry error at startup: {}", th.getMessage());
                    throw new IllegalStateException(th);
                }
            }
    
            // call and execute the pre registration handler before all background tasks (inc registration) is started
            // Extension point for work that must happen before registration starts.
            if (this.preRegistrationHandler != null) {
                this.preRegistrationHandler.beforeRegistration();
            }
    
            // Force registration during initialization; shouldEnforceRegistrationAtInit() defaults to false. If registration fails, an exception is thrown.
            if (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) {
                try {
                    if (!register() ) {
                        throw new IllegalStateException("Registration error at startup. Invalid server response.");
                    }
                } catch (Throwable th) {
                    logger.error("Registration error at startup: {}", th.getMessage());
                    throw new IllegalStateException(th);
                }
            }
    
            // finally, init the schedule tasks (e.g. cluster resolvers, heartbeat, instanceInfo replicator, fetch
            // Key method: initializes the scheduled tasks (heartbeat, registry fetch, and so on); see the initScheduledTasks analysis
            initScheduledTasks();
    
            try {
                Monitors.registerObject(this);
            } catch (Throwable e) {
                logger.warn("Cannot register timers", e);
            }
    
            // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
            // to work with DI'd DiscoveryClient
            DiscoveryManager.getInstance().setDiscoveryClient(this);
            DiscoveryManager.getInstance().setEurekaClientConfig(config);
    
            initTimestampMs = System.currentTimeMillis();
            initRegistrySize = this.getApplications().size();
            registrySize = initRegistrySize;
            logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
                    initTimestampMs, initRegistrySize);
        }
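
    The primary/backup fallback during the initial fetch in the constructor above is compact but easy to misread. Below is a minimal sketch of just that decision, with the two fetches replaced by BooleanSupplier placeholders. InitialFetchSketch is a hypothetical helper, not part of DiscoveryClient.

```java
import java.util.function.BooleanSupplier;

// Hypothetical sketch of the initial-fetch fallback logic in the constructor.
class InitialFetchSketch {
    // Returns true if either the primary or the backup fetch succeeded.
    // Throws only when both fail and enforcement at init is requested.
    static boolean initialFetch(BooleanSupplier primary, BooleanSupplier backup, boolean enforceAtInit) {
        boolean primaryOk = primary.getAsBoolean();
        boolean backupOk = true;
        // The backup is consulted only when the primary fetch failed.
        if (!primaryOk && !backup.getAsBoolean()) {
            backupOk = false;
        }
        if (!primaryOk && !backupOk && enforceAtInit) {
            throw new IllegalStateException("Fetch registry error at startup. Initial fetch failed.");
        }
        return primaryOk || backupOk;
    }

    public static void main(String[] args) {
        System.out.println(initialFetch(() -> true, () -> false, true));   // true
        System.out.println(initialFetch(() -> false, () -> true, true));   // true
        System.out.println(initialFetch(() -> false, () -> false, false)); // false
    }
}
```

    When both fetches fail and enforcement is off, construction still continues; the scheduled refresh task is then expected to retry later.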
    
    fetchRegistry()
        private boolean fetchRegistry(boolean forceFullRegistryFetch) {
            Stopwatch tracer = FETCH_REGISTRY_TIMER.start();
    
            try {
                // If the delta is disabled or if it is the first time, get all
                // applications
                // Applications wraps all registry information returned by the Eureka server; it is empty on first startup and is updated by each full or delta fetch
                Applications applications = getApplications();
    
                if (clientConfig.shouldDisableDelta()
                        || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
                        || forceFullRegistryFetch
                        || (applications == null)
                        || (applications.getRegisteredApplications().size() == 0)
                        || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
                {
                    logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta());
                    logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress());
                    logger.info("Force full registry fetch : {}", forceFullRegistryFetch);
                    logger.info("Application is null : {}", (applications == null));
                    logger.info("Registered Applications size is zero : {}",
                            (applications.getRegisteredApplications().size() == 0));
                    logger.info("Application version is -1: {}", (applications.getVersion() == -1));
                    // Fetch the full registry
                    getAndStoreFullRegistry();
                } else {
                    // Fetch only the delta (incremental changes)
                    getAndUpdateDelta(applications);
                }
                // The hash code is used to tell whether anything changed compared with the previous fetch
                applications.setAppsHashCode(applications.getReconcileHashCode());
                logTotalInstances();
            } catch (Throwable e) {
                logger.info(PREFIX + "{} - was unable to refresh its cache! This periodic background refresh will be retried in {} seconds. status = {} stacktrace = {}",
                        appPathIdentifier, clientConfig.getRegistryFetchIntervalSeconds(), e.getMessage(), ExceptionUtils.getStackTrace(e));
                return false;
            } finally {
                if (tracer != null) {
                    tracer.stop();
                }
            }
    
            // Notify about cache refresh before updating the instance remote status
            onCacheRefreshed();
    
            // Update remote status based on refreshed data held in the cache
            updateInstanceRemoteStatus();
    
            // registry was fetched successfully, so return true
            return true;
        }
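
    The long condition in fetchRegistry that chooses between a full fetch and a delta fetch can be isolated as a pure function. This is a sketch under simplifying assumptions: FetchModeSketch is hypothetical, and a null registeredAppCount stands in for applications == null.

```java
// Hypothetical sketch of the full-versus-delta decision in fetchRegistry.
class FetchModeSketch {
    enum Mode { FULL, DELTA }

    static Mode chooseMode(boolean deltaDisabled, String singleVipAddress, boolean forceFull,
                           Integer registeredAppCount, long version) {
        boolean hasSingleVip = singleVipAddress != null && !singleVipAddress.isEmpty();
        if (deltaDisabled
                || hasSingleVip
                || forceFull
                || registeredAppCount == null   // stands in for applications == null
                || registeredAppCount == 0      // nothing cached yet, e.g. at startup
                || version == -1) {             // cached data does not support delta
            return Mode.FULL;
        }
        return Mode.DELTA;
    }

    public static void main(String[] args) {
        System.out.println(chooseMode(false, null, false, 0, 1L));  // FULL
        System.out.println(chooseMode(false, null, false, 5, 1L));  // DELTA
        System.out.println(chooseMode(true, null, false, 5, 1L));   // FULL
    }
}
```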
    

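    fetchRegistry is called at startup and again each time the scheduled task refreshes the client cache. It mainly does the following:

    1. At startup (or whenever a delta fetch does not apply), the client pulls the full registry from Eureka by calling /eureka/apps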
      private void getAndStoreFullRegistry() throws Throwable {
            ...
            Applications apps = null;
            EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
                    ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
                    : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
            if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
                apps = httpResponse.getEntity();
            }
            ...
    

    Here apps holds the full registry fetched from the Eureka server (figure omitted).


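    2. When invoked from the scheduled task, it fetches incrementally by calling getAndUpdateDelta(), which issues an HTTP request to /eureka/apps/delta. Here delta is the incremental registry returned by the Eureka server.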
       private void getAndUpdateDelta(Applications applications) throws Throwable {
            long currentUpdateGeneration = fetchRegistryGeneration.get();
    
            Applications delta = null;
            EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get());
            if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
                delta = httpResponse.getEntity();
            }
            if (delta == null) {
                getAndStoreFullRegistry();
            } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
                String reconcileHashCode = "";
                if (fetchRegistryUpdateLock.tryLock()) {
                    try {
                        // After receiving the delta, iterate over each application and update the local cache
                        updateDelta(delta);
                        reconcileHashCode = getReconcileHashCode(applications);
                    } finally {
                        fetchRegistryUpdateLock.unlock();
                    }
                } 
        ...
    
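    3. Call onCacheRefreshed() to signal that the client cache has been refreshed. The subclass CloudEurekaClient overrides onCacheRefreshed: the registered eventListeners handle the CacheRefreshedEvent, and a HeartbeatEvent is then published.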
        @Override
        protected void onCacheRefreshed() {
            super.onCacheRefreshed();
    
            if (this.cacheRefreshedCount != null) { // might be called during construction and
                // will be null
                long newCount = this.cacheRefreshedCount.incrementAndGet();
                log.trace("onCacheRefreshed called with count: " + newCount);
                this.publisher.publishEvent(new HeartbeatEvent(this, newCount));
            }
        }
    
        protected void onCacheRefreshed() {
            fireEvent(new CacheRefreshedEvent());
        }
    
        protected void fireEvent(final EurekaEvent event) {
            for (EurekaEventListener listener : eventListeners) {
                try {
                    listener.onEvent(event);
                } catch (Exception e) {
                    logger.info("Event {} throw an exception for listener {}", event, listener, e.getMessage());
                }
            }
        }
    
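    4. Call updateInstanceRemoteStatus to update this instance's remote status, that is, its status as recorded on the Eureka server, from the registry that was just fetched.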
      private synchronized void updateInstanceRemoteStatus() {
            // Determine this instance's status for this app and set to UNKNOWN if not found
            InstanceInfo.InstanceStatus currentRemoteInstanceStatus = null;
            if (instanceInfo.getAppName() != null) {
                Application app = getApplication(instanceInfo.getAppName());
                if (app != null) {
                    InstanceInfo remoteInstanceInfo = app.getByInstanceId(instanceInfo.getId());
                    if (remoteInstanceInfo != null) {
                        // Status of this instance as seen in the current fetch
                        currentRemoteInstanceStatus = remoteInstanceInfo.getStatus();
                    }
                }
            }
            if (currentRemoteInstanceStatus == null) {
                currentRemoteInstanceStatus = InstanceInfo.InstanceStatus.UNKNOWN;
            }
    
            // Notify if status changed
            // The status from this fetch differs from the status from the previous fetch
            if (lastRemoteInstanceStatus != currentRemoteInstanceStatus) {
                onRemoteStatusChanged(lastRemoteInstanceStatus, currentRemoteInstanceStatus);
                lastRemoteInstanceStatus = currentRemoteInstanceStatus;
            }
        }
    
      protected void onRemoteStatusChanged(InstanceInfo.InstanceStatus oldStatus, InstanceInfo.InstanceStatus newStatus) {
            fireEvent(new StatusChangeEvent(oldStatus, newStatus));
        }
    

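    The instance's current remote status is compared with the status from the previous fetch. If they match, nothing happens; if they differ, a StatusChangeEvent is fired.

    If the registry was fetched from the Eureka server successfully, fetchRegistry returns true.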
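The delta path can be illustrated with a small model. The following is a minimal sketch, not the Eureka implementation: DeltaFetchSketch, Change, and needsFullFetch are hypothetical names, the cache is reduced to a map from instance id to status, and the "STATUS_count_" hash layout and the fall-back to a full fetch on a hash mismatch are assumptions about the part of getAndUpdateDelta that the excerpt elides.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, simplified model: the local cache maps instanceId -> status.
class DeltaFetchSketch {
    enum Action { ADDED, MODIFIED, DELETED }

    static final class Change {
        final Action action;
        final String instanceId;
        final String status;

        Change(Action action, String instanceId, String status) {
            this.action = action;
            this.instanceId = instanceId;
            this.status = status;
        }
    }

    // Apply every change in the delta to the local cache.
    static void updateDelta(Map<String, String> cache, List<Change> delta) {
        for (Change c : delta) {
            switch (c.action) {
                case ADDED:
                case MODIFIED:
                    cache.put(c.instanceId, c.status);
                    break;
                case DELETED:
                    cache.remove(c.instanceId);
                    break;
            }
        }
    }

    // Summarize the cache as "STATUS_count_" pairs in sorted status order (assumed layout).
    static String reconcileHashCode(Map<String, String> cache) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String status : cache.values()) {
            counts.merge(status, 1, Integer::sum);
        }
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, Integer> e : counts.entrySet()) {
            sb.append(e.getKey()).append('_').append(e.getValue()).append('_');
        }
        return sb.toString();
    }

    // A mismatch means the local cache has drifted, so a full fetch is needed.
    static boolean needsFullFetch(String serverHash, Map<String, String> cache) {
        return !serverHash.equals(reconcileHashCode(cache));
    }

    public static void main(String[] args) {
        Map<String, String> cache = new HashMap<>();
        cache.put("a", "UP");
        cache.put("b", "UP");

        List<Change> delta = new ArrayList<>();
        delta.add(new Change(Action.ADDED, "c", "UP"));
        delta.add(new Change(Action.DELETED, "a", null));
        delta.add(new Change(Action.MODIFIED, "b", "DOWN"));

        updateDelta(cache, delta);
        System.out.println(reconcileHashCode(cache));       // DOWN_1_UP_1_
        System.out.println(needsFullFetch("UP_3_", cache)); // true
    }
}
```

Comparing a compact summary instead of the whole registry keeps the consistency check cheap on every refresh; the cost is that two different registries with the same per-status counts would not be told apart.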

    initScheduledTasks()
        // Initialize all scheduled tasks
        private void initScheduledTasks() {
            if (clientConfig.shouldFetchRegistry()) {
                // registry cache refresh timer
                int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
                int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
                // Define cacheRefreshTask; the scheduler refreshes the client cache every registryFetchIntervalSeconds
                cacheRefreshTask = new TimedSupervisorTask("cacheRefresh", scheduler, cacheRefreshExecutor, registryFetchIntervalSeconds, TimeUnit.SECONDS, expBackOffBound, new CacheRefreshThread());
                scheduler.schedule(cacheRefreshTask, registryFetchIntervalSeconds, TimeUnit.SECONDS);
            }
    
            if (clientConfig.shouldRegisterWithEureka()) {
                int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
                int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
                logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);
    
                // Heartbeat timer
                // Define heartbeatTask; the scheduler sends a heartbeat (lease renewal) every renewalIntervalInSecs
                heartbeatTask = new TimedSupervisorTask("heartbeat", scheduler, heartbeatExecutor, renewalIntervalInSecs, TimeUnit.SECONDS, expBackOffBound, new HeartbeatThread());
                scheduler.schedule(heartbeatTask, renewalIntervalInSecs, TimeUnit.SECONDS);
    
                // InstanceInfo replicator
                // Replicator for instance info; it has its own schedule and syncs to the Eureka server
                instanceInfoReplicator = new InstanceInfoReplicator(this, instanceInfo, clientConfig.getInstanceInfoReplicationIntervalSeconds(), 2); // burstSize
    
                // Define the status-change listener and register it with applicationInfoManager
                statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
                    @Override
                    public String getId() {
                        return "statusChangeListener";
                    }
    
                    @Override
                    public void notify(StatusChangeEvent statusChangeEvent) {
                        logger.info("Saw local status change event {}", statusChangeEvent);
                        instanceInfoReplicator.onDemandUpdate();
                    }
                };
    
                if (clientConfig.shouldOnDemandUpdateStatusChange()) {
                    applicationInfoManager.registerStatusChangeListener(statusChangeListener);
                }
                // Start the scheduled task
                instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
            } else {
                logger.info("Not registering with Eureka server per configuration");
            }
        }
    
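    1. First, two TimedSupervisorTask instances are created, one for the cache refresh and one for the heartbeat (the class is shown below). The scheduler runs each task at its configured interval.
    2. An instanceInfoReplicator is created. It replicates instance-info changes: when it detects a change, it calls register internally to sync the new information to the Eureka server.
    3. A statusChangeListener is created. When a status-change callback fires, it calls instanceInfoReplicator.onDemandUpdate(). The listener is registered with applicationInfoManager if shouldOnDemandUpdateStatusChange() is enabled.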
    public TimedSupervisorTask(String name, ScheduledExecutorService scheduler, ThreadPoolExecutor executor,
                                   int timeout, TimeUnit timeUnit, int expBackOffBound, Runnable task) {
            this.name = name;
            this.scheduler = scheduler;
            this.executor = executor;
            this.timeoutMillis = timeUnit.toMillis(timeout);
            this.task = task;
            this.delay = new AtomicLong(timeoutMillis);
            this.maxDelay = timeoutMillis * expBackOffBound;
            ...
        }
    
    @Override
        public void run() {
            Future<?> future = null;
            try {
                future = executor.submit(task);
                threadPoolLevelGauge.set((long) executor.getActiveCount());
                future.get(timeoutMillis, TimeUnit.MILLISECONDS);  // block until done or timeout
                delay.set(timeoutMillis);
                threadPoolLevelGauge.set((long) executor.getActiveCount());
                successCounter.increment();
            } catch (TimeoutException e) {
                logger.warn("task supervisor timed out", e);
                timeoutCounter.increment();
    
                long currentDelay = delay.get();
                long newDelay = Math.min(maxDelay, currentDelay * 2);
                delay.compareAndSet(currentDelay, newDelay);
            } finally {
                if (future != null) {
                    future.cancel(true);
                }
    
                if (!scheduler.isShutdown()) {
                    scheduler.schedule(this, delay.get(), TimeUnit.MILLISECONDS);
                }
            }
        }
    

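    The implementation of run deserves a closer look.
    When scheduler.schedule dispatches the task, it is given only a delay, not a period. The effective period changes dynamically, because the task re-submits itself through scheduler.schedule each time it finishes.

    In the try block, delay is reset to the base timeout after a successful run. If the run times out, the delay is doubled, capped at maxDelay (timeoutMillis * expBackOffBound). The finally block then schedules the next run with the current delay. This is a way to implement a schedule with a dynamic period.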
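To make the delay arithmetic concrete, here is a minimal sketch that isolates it from the scheduling itself. BackoffDelay is a hypothetical helper, not a Eureka class, and the 30 second interval and bound of 10 are illustrative values only.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical helper that mirrors only the delay bookkeeping of TimedSupervisorTask.
class BackoffDelay {
    private final long timeoutMillis;
    private final long maxDelay;
    private final AtomicLong delay;

    BackoffDelay(long timeoutMillis, int expBackOffBound) {
        this.timeoutMillis = timeoutMillis;
        this.maxDelay = timeoutMillis * expBackOffBound;
        this.delay = new AtomicLong(timeoutMillis);
    }

    // A run finished in time: fall back to the base interval.
    long onSuccess() {
        delay.set(timeoutMillis);
        return delay.get();
    }

    // A run timed out: double the delay, but never exceed maxDelay.
    long onTimeout() {
        long current = delay.get();
        delay.compareAndSet(current, Math.min(maxDelay, current * 2));
        return delay.get();
    }

    public static void main(String[] args) {
        BackoffDelay d = new BackoffDelay(30_000L, 10);
        for (int i = 1; i <= 5; i++) {
            System.out.println("after timeout " + i + ": " + d.onTimeout() + " ms");
        }
        System.out.println("after success: " + d.onSuccess() + " ms");
    }
}
```

With these values the delays after five consecutive timeouts are 60000, 120000, 240000, 300000 and 300000 ms, and a single success brings the delay back to 30000 ms.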

    cacheRefreshTask
    class CacheRefreshThread implements Runnable {
            public void run() {
                refreshRegistry();
            }
        }
    
        @VisibleForTesting
        void refreshRegistry() {
            try {
                ...
                // remoteRegionsModified: the elided code above checks whether remoteRegions changed,
                // which decides between a full fetch and a delta fetch
                boolean success = fetchRegistry(remoteRegionsModified);
                if (success) {
                    // localRegionApps is the local cache of the fetched registry
                    registrySize = localRegionApps.get().size();
                    lastSuccessfulRegistryFetchTimestamp = System.currentTimeMillis();
                }
                ...
            } catch (Throwable e) {
                logger.error("Cannot fetch registry from server", e);
            }
        }
    

    This task simply calls fetchRegistry to pull the latest registry and refresh the local cache.

    heartbeatTask
        private class HeartbeatThread implements Runnable {
    
            public void run() {
                if (renew()) {
                    lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
                }
            }
        }
    
        boolean renew() {
            EurekaHttpResponse<InstanceInfo> httpResponse;
            try {
                httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
                ... // On a 404 response, send a register request again to sync the instance info
                return httpResponse.getStatusCode() == Status.OK.getStatusCode();
            } catch (Throwable e) {
                logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);
                return false;
            }
        }
    

    This task issues an HTTP request by calling sendHeartBeat, which performs one heartbeat (lease renewal).
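The 404 branch that the excerpt elides can be sketched as follows. This is an illustration under stated assumptions, not the Eureka code: Transport and ForgetfulServer are hypothetical types, and treating 204 as the success status of a registration is an assumption of this sketch.

```java
// Hypothetical sketch of "heartbeat, and re-register if the server no longer knows us".
class HeartbeatSketch {
    // Stand-in for the registration transport; both methods return an HTTP status code.
    interface Transport {
        int sendHeartBeat();
        int register();
    }

    static boolean renew(Transport transport) {
        try {
            int status = transport.sendHeartBeat();
            if (status == 404) {
                // The server has no lease for this instance: register again.
                return transport.register() == 204;
            }
            return status == 200;
        } catch (RuntimeException e) {
            // Any transport failure counts as a failed renewal.
            return false;
        }
    }

    // Fake server that does not know the instance until it registers.
    static final class ForgetfulServer implements Transport {
        boolean known = false;
        int registerCalls = 0;

        public int sendHeartBeat() {
            return known ? 200 : 404;
        }

        public int register() {
            registerCalls++;
            known = true;
            return 204;
        }
    }

    public static void main(String[] args) {
        ForgetfulServer server = new ForgetfulServer();
        System.out.println(renew(server));        // true, after re-registering on 404
        System.out.println(renew(server));        // true, plain heartbeat
        System.out.println(server.registerCalls); // 1
    }
}
```

The point of the pattern is that a client whose lease was evicted (for example after a long pause) heals itself on the next heartbeat instead of staying invisible.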

    InstanceInfoReplicator
       public void start(int initialDelayMs) {
            if (started.compareAndSet(false, true)) {
                instanceInfo.setIsDirty();  // for initial register
                Future next = scheduler.schedule(this, initialDelayMs, TimeUnit.SECONDS);
                scheduledPeriodicRef.set(next);
            }
        }
    
         public boolean onDemandUpdate() {
            if (rateLimiter.acquire(burstSize, allowedRatePerMinute)) {
                if (!scheduler.isShutdown()) {
                    scheduler.submit(new Runnable() {
                        @Override
                        public void run() {
                            logger.debug("Executing on-demand update of local InstanceInfo");
        
                            Future latestPeriodic = scheduledPeriodicRef.get();
                            if (latestPeriodic != null && !latestPeriodic.isDone()) {
                                logger.debug("Canceling the latest scheduled update, it will be rescheduled at the end of on demand update");
                                latestPeriodic.cancel(false);
                            }
        
                            InstanceInfoReplicator.this.run();
                        }
                    });
                    return true;
                } else {
                    logger.warn("Ignoring onDemand update due to stopped scheduler");
                    return false;
                }
            } else {
                logger.warn("Ignoring onDemand update due to rate limiter");
                return false;
            }
        }
    
        public void run() {
            try {
                discoveryClient.refreshInstanceInfo();
    
                Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
                if (dirtyTimestamp != null) {
                    discoveryClient.register();
                    instanceInfo.unsetIsDirty(dirtyTimestamp);
                }
            } catch (Throwable t) {
                logger.warn("There was a problem with the instance info replicator", t);
            } finally {
                Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
                scheduledPeriodicRef.set(next);
            }
        }
    

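    InstanceInfoReplicator synchronizes instance information: whenever something changes (status, address, IP, configuration values, and so on), it sends the new InstanceInfo to the Eureka server.
    It has three methods:
    start(): called during initialization to start the scheduled task, which invokes run() at a fixed interval to check for changes.
    onDemandUpdate(): invoked from the listener when a status change fires; it also calls run() to check for changes.
    run(): checks whether the current InstanceInfo differs from the previous one and, if so, calls discoveryClient.register() to sync the new information to the Eureka server. The check is whether instanceInfo is marked dirty (isDirty).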
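The interplay of the three methods can be sketched with a plain ScheduledExecutorService. This is a simplified illustration, not the Eureka class: ReplicatorSketch, markDirty, registerCalls, and await are hypothetical names, the rate limiter is left out, and the dirty state is reduced to a boolean.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch: a self-rescheduling task plus an on-demand trigger.
class ReplicatorSketch implements Runnable {
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private final AtomicReference<Future<?>> scheduledPeriodicRef = new AtomicReference<>();
    private final AtomicBoolean dirty = new AtomicBoolean(false);
    final AtomicInteger registerCalls = new AtomicInteger();
    private final long intervalSeconds;

    ReplicatorSketch(long intervalSeconds) {
        this.intervalSeconds = intervalSeconds;
    }

    void start(long initialDelaySeconds) {
        dirty.set(true); // force the initial registration
        scheduledPeriodicRef.set(scheduler.schedule(this, initialDelaySeconds, TimeUnit.SECONDS));
    }

    void markDirty() {
        dirty.set(true);
    }

    // Cancel the pending periodic run, then run immediately; run() reschedules itself.
    Future<?> onDemandUpdate() {
        return scheduler.submit(() -> {
            Future<?> latest = scheduledPeriodicRef.get();
            if (latest != null && !latest.isDone()) {
                latest.cancel(false);
            }
            run();
        });
    }

    @Override
    public void run() {
        try {
            if (dirty.compareAndSet(true, false)) {
                registerCalls.incrementAndGet(); // stands in for discoveryClient.register()
            }
        } finally {
            if (!scheduler.isShutdown()) {
                scheduledPeriodicRef.set(scheduler.schedule(this, intervalSeconds, TimeUnit.SECONDS));
            }
        }
    }

    void shutdown() {
        scheduler.shutdownNow();
    }

    // Block until the given task finishes, wrapping checked exceptions.
    static void await(Future<?> f) {
        try {
            f.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        ReplicatorSketch r = new ReplicatorSketch(3600);
        r.start(3600);
        await(r.onDemandUpdate());
        System.out.println("register calls: " + r.registerCalls.get()); // register calls: 1
        r.shutdown();
    }
}
```

Cancelling the pending periodic run before the on-demand run keeps a single chain of scheduled executions alive, so an on-demand update shifts the schedule rather than adding a second one.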

    The marking itself happens in discoveryClient.refreshInstanceInfo().

    /**
         * Refresh the current local instanceInfo. Note that after a valid refresh where changes are observed, the
         * isDirty flag on the instanceInfo is set to true
         */
        void refreshInstanceInfo() {
            // Check whether the address, IP, or the data center the instance is deployed in has changed
            applicationInfoManager.refreshDataCenterInfoIfRequired();
            // Check whether any lease configuration value (LeaseInfo) has changed
            applicationInfoManager.refreshLeaseInfoIfRequired();
    
            InstanceStatus status;
            try {
                status = getHealthCheckHandler().getStatus(instanceInfo.getStatus());
            } catch (Exception e) {
                logger.warn("Exception from healthcheckHandler.getStatus, setting status to DOWN", e);
                status = InstanceStatus.DOWN;
            }
    
            if (null != status) {
                applicationInfoManager.setInstanceStatus(status);
            }
        }
    

    The two refresh methods check for changes first; if anything changed, they call instanceInfo.setIsDirty() internally to mark the instance as dirty.

    public synchronized void setIsDirty() {
            isInstanceInfoDirty = true;
            lastDirtyTimestamp = System.currentTimeMillis();
        }
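
The timestamp stored next to the flag matters because a change can arrive while a registration is in flight. The sketch below illustrates the idea with explicit timestamps; DirtyFlagSketch is a hypothetical class, and the rule that the flag is cleared only when no newer change exists is an assumption about how unsetIsDirty(dirtyTimestamp) uses its argument.

```java
// Hypothetical sketch of a dirty flag that is cleared only if nothing changed in the meantime.
class DirtyFlagSketch {
    private boolean dirty = false;
    private long lastDirtyTimestamp = 0L;

    // Mark the state as changed at the given time.
    synchronized void setIsDirty(long now) {
        dirty = true;
        lastDirtyTimestamp = now;
    }

    // Returns the time of the latest change, or null when nothing is pending.
    synchronized Long isDirtyWithTime() {
        return dirty ? lastDirtyTimestamp : null;
    }

    // Clear the flag only if no change happened after the timestamp that was replicated.
    synchronized void unsetIsDirty(long unsetDirtyTimestamp) {
        if (lastDirtyTimestamp <= unsetDirtyTimestamp) {
            dirty = false;
        }
    }

    public static void main(String[] args) {
        DirtyFlagSketch flag = new DirtyFlagSketch();
        flag.setIsDirty(100L);
        Long seen = flag.isDirtyWithTime(); // the replicator starts registering state 100
        flag.setIsDirty(200L);              // a second change arrives during the registration
        flag.unsetIsDirty(seen);            // too old, the flag stays set
        System.out.println(flag.isDirtyWithTime()); // 200
        flag.unsetIsDirty(200L);
        System.out.println(flag.isDirtyWithTime()); // null
    }
}
```

Without the timestamp comparison, the second change would be silently dropped when the first registration completes.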
    

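    This completes the analysis of initScheduledTasks. It starts three scheduled tasks: one refreshes the local cache with the latest registry, one sends heartbeat renewals, and one sends the client's latest instance information.

    Back in the DiscoveryClient constructor: after initScheduledTasks finishes, it registers the JMX beans and sets some global properties.

    After CloudEurekaClient is constructed, if a healthCheckHandler is configured (the default EurekaHealthCheckHandler implementation requires Spring Boot Actuator), the health status is synced to the Eureka server once.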

        @Override
        public void registerHealthCheck(HealthCheckHandler healthCheckHandler) {
            if (instanceInfo == null) {
                logger.error("Cannot register a healthcheck handler when instance info is null!");
            }
            if (healthCheckHandler != null) {
                this.healthCheckHandlerRef.set(healthCheckHandler);
                // schedule an onDemand update of the instanceInfo when a new healthcheck handler is registered
                if (instanceInfoReplicator != null) {
                    instanceInfoReplicator.onDemandUpdate();
                }
            }
        }
    

    This covers everything that happens while CloudEurekaClient is being constructed.

    Now back to the register method of EurekaServiceRegistry.

      public void register(EurekaRegistration reg) {
            maybeInitializeClient(reg);
    
            if (log.isInfoEnabled()) {
                log.info("Registering application " + reg.getApplicationInfoManager().getInfo().getAppName()
                        + " with eureka with status " + reg.getInstanceConfig().getInitialStatus());
            }
    
            reg.getApplicationInfoManager().setInstanceStatus(reg.getInstanceConfig().getInitialStatus());
    
            reg.getHealthCheckHandler()
                    .ifAvailable(healthCheckHandler -> reg.getEurekaClient().registerHealthCheck(healthCheckHandler));
        }
    

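    Once everything above has been set up, setInstanceStatus is called first to set the instance status; after a successful start this is UP.
    Then, if a health check is configured, an on-demand update is triggered to sync the health status.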

    public synchronized void setInstanceStatus(InstanceStatus status) {
            InstanceStatus next = instanceStatusMapper.map(status);
            if (next == null) {
                return;
            }
    
            InstanceStatus prev = instanceInfo.setStatus(next);
            if (prev != null) {
                for (StatusChangeListener listener : listeners.values()) {
                    try {
                        listener.notify(new StatusChangeEvent(prev, next));
                    } catch (Exception e) {
                        logger.warn("failed to notify listener: {}", listener.getId(), e);
                    }
                }
            }
        }
    

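    When the status actually changes, setInstanceStatus also emits a StatusChangeEvent, notifying the listeners so that they sync the change to the Eureka server.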
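The "notify only on a real change" behaviour can be sketched in a few lines. This is a simplified illustration, not the Eureka code: StatusManagerSketch, Listener, and RecordingListener are hypothetical, and the swap helper encodes the assumption that setting the same status again yields null, which is what the prev != null check above relies on.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of status changes fanned out to registered listeners.
class StatusManagerSketch {
    enum Status { STARTING, UP, DOWN }

    interface Listener {
        String getId();
        void notify(Status prev, Status next);
    }

    // Listener that records every transition it is told about.
    static final class RecordingListener implements Listener {
        final List<String> events = new ArrayList<>();

        public String getId() {
            return "recording";
        }

        public void notify(Status prev, Status next) {
            events.add(prev + "->" + next);
        }
    }

    private Status status = Status.STARTING;
    private final Map<String, Listener> listeners = new ConcurrentHashMap<>();

    void registerStatusChangeListener(Listener listener) {
        listeners.put(listener.getId(), listener);
    }

    // Returns the previous status if it changed, or null if it did not.
    private Status swap(Status next) {
        if (status == next) {
            return null;
        }
        Status prev = status;
        status = next;
        return prev;
    }

    synchronized void setInstanceStatus(Status next) {
        Status prev = swap(next);
        if (prev != null) {
            for (Listener listener : listeners.values()) {
                listener.notify(prev, next);
            }
        }
    }

    public static void main(String[] args) {
        StatusManagerSketch manager = new StatusManagerSketch();
        RecordingListener listener = new RecordingListener();
        manager.registerStatusChangeListener(listener);
        manager.setInstanceStatus(Status.UP);
        manager.setInstanceStatus(Status.UP);   // no change, no event
        manager.setInstanceStatus(Status.DOWN);
        System.out.println(listener.events);    // [STARTING->UP, UP->DOWN]
    }
}
```

In the client, the listener's reaction is the on-demand update of the replicator, so both the startup transition to UP and the shutdown transition to DOWN travel through the same path.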

    This completes the analysis of register.

    Now back to the start method of EurekaAutoServiceRegistration (a SmartLifecycle).

    @Override
        public void start() {
            // only set the port if the nonSecurePort or securePort is 0 and this.port != 0
            if (this.port.get() != 0) {
                if (this.registration.getNonSecurePort() == 0) {
                    this.registration.setNonSecurePort(this.port.get());
                }
    
                if (this.registration.getSecurePort() == 0 && this.registration.isSecure()) {
                    this.registration.setSecurePort(this.port.get());
                }
            }
    
            // only initialize if nonSecurePort is greater than 0 and it isn't already running
            // because of containerPortInitializer below
            if (!this.running.get() && this.registration.getNonSecurePort() > 0) {
    
                this.serviceRegistry.register(this.registration);
    
                this.context.publishEvent(new InstanceRegisteredEvent<>(this, this.registration.getInstanceConfig()));
                this.running.set(true);
            }
        }
    

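    After register completes, the context publishes an InstanceRegisteredEvent. This is not a Eureka event but a Spring ApplicationEvent. Its purpose is to provide an extension point that signals the Eureka instance has been registered; users can handle it with a custom listener.

    This completes the analysis of the Eureka client's startup flow and registration mechanism.

    3. Deregistration analysis

    Per the SmartLifecycle contract, stop() is called before the instance shuts down, and this is what triggers Eureka's deregistration flow.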

        @Override
        public void stop() {
            this.serviceRegistry.deregister(this.registration);
            this.running.set(false);
        }
    
    @Override
        public void deregister(EurekaRegistration reg) {
            if (reg.getApplicationInfoManager().getInfo() != null) {
    
                if (log.isInfoEnabled()) {
                    log.info("Unregistering application " + reg.getApplicationInfoManager().getInfo().getAppName()
                            + " with eureka with status DOWN");
                }
    
                reg.getApplicationInfoManager().setInstanceStatus(InstanceInfo.InstanceStatus.DOWN);
    
                // shutdown of eureka client should happen with EurekaRegistration.close()
                // auto registration will create a bean which will be properly disposed
                // manual registrations will need to call close()
            }
        }
    

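    As shown, the deregistration flow is very simple: it sets instanceStatus in ApplicationInfoManager to DOWN. Setting DOWN also triggers a StatusChangeEvent, and statusChangeListener then sends the new status to the Eureka server to announce that the instance is going offline.

    Summary

    1. The Eureka client uses SmartLifecycle as its entry point, which kicks off the injection of all the Eureka beans;
    2. All the core logic runs in the constructor of CloudEurekaClient's parent class (DiscoveryClient):
      a. cacheRefreshTask is defined to fetch the registry periodically
      b. heartbeatTask is defined to send lease renewal requests periodically
      c. instanceInfoReplicator is defined to send instance information to the Eureka server
      d. statusChangeListener is defined to receive instance change events and send register requests.
    3. Whenever instance information changes, a StatusChangeEvent is sent. Conditions that count as a change include changes to InstanceInfo attributes and changes to the InstanceConfig configuration.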

          Article link: https://www.haomeiwen.com/subject/ccqaoktx.html