一. 前言
eureka的client端主要完成几件事情:
- 服务实例的注册
- 服务实例的续约
- 拉取server端的注册表
整个源码有几个重点类值得关注:
类名 | 说明 |
---|---|
EurekaClientConfigBean | 配置参数的bean,保存eureka.client.xxx的所有参数 |
EurekaInstanceConfigBean | 配置参数的bean,保存eureka.instance.xxx的所有参数 |
InstanceInfo | 服务实例信息的封装类 |
ApplicationInfoManager | 封装注册到eureka的应用信息,包括实例信息,参数配置,实例状态等,并管理实例的各种操作 |
CloudEurekaClient | 发起操作的客户端类,如register、fetch、heartbeat |
二. 源码分析
1. 启动配置bean注入
EurekaClientAutoConfiguration
EurekaClientConfigBean
@Bean
@ConditionalOnMissingBean(value = EurekaClientConfig.class, search = SearchStrategy.CURRENT)
public EurekaClientConfigBean eurekaClientConfigBean(ConfigurableEnvironment env) {
return new EurekaClientConfigBean();
}
EurekaClientConfigBean 对应的配置前缀是eureka.client,从配置文件读取的配置会保存到此bean中
常用配置:
参数 | 说明 |
---|---|
defaultZone | 默认zone的地址 |
transport | 各种通信配置 |
registryFetchIntervalSeconds | 每次从eureka服务端拉取服务列表的时间间隔(默认30s) |
instanceInfoReplicationIntervalSeconds | 复制实例变化信息到eureka服务端的时间间隔(默认30s) |
initialInstanceInfoReplicationIntervalSeconds | 初始时复制实例信息到eureka服务端所需的时间(默认40s) |
eurekaServiceUrlPollIntervalSeconds | 拉取eureka服务端地址更改的时间间隔(默认300s) |
registerWithEureka | 本实例是否注册到eureka服务端 |
EurekaInstanceConfigBean
@Bean
@ConditionalOnMissingBean(value = EurekaInstanceConfig.class, search = SearchStrategy.CURRENT)
public EurekaInstanceConfigBean eurekaInstanceConfigBean(InetUtils inetUtils,
ManagementMetadataProvider managementMetadataProvider) {
String hostname = getProperty("eureka.instance.hostname");
boolean preferIpAddress = Boolean.parseBoolean(getProperty("eureka.instance.prefer-ip-address"));
String ipAddress = getProperty("eureka.instance.ip-address");
boolean isSecurePortEnabled = Boolean.parseBoolean(getProperty("eureka.instance.secure-port-enabled"));
String serverContextPath = env.getProperty("server.servlet.context-path", "/");
int serverPort = Integer.parseInt(env.getProperty("server.port", env.getProperty("port", "8080")));
Integer managementPort = env.getProperty("management.server.port", Integer.class);
String managementContextPath = env.getProperty("management.server.servlet.context-path");
Integer jmxPort = env.getProperty("com.sun.management.jmxremote.port", Integer.class);
EurekaInstanceConfigBean instance = new EurekaInstanceConfigBean(inetUtils);
instance.setNonSecurePort(serverPort);
instance.setInstanceId(getDefaultInstanceId(env));
instance.setPreferIpAddress(preferIpAddress);
instance.setSecurePortEnabled(isSecurePortEnabled);
if (StringUtils.hasText(ipAddress)) {
instance.setIpAddress(ipAddress);
}
if (isSecurePortEnabled) {
instance.setSecurePort(serverPort);
}
if (StringUtils.hasText(hostname)) {
instance.setHostname(hostname);
}
String statusPageUrlPath = getProperty("eureka.instance.status-page-url-path");
String healthCheckUrlPath = getProperty("eureka.instance.health-check-url-path");
if (StringUtils.hasText(statusPageUrlPath)) {
instance.setStatusPageUrlPath(statusPageUrlPath);
}
if (StringUtils.hasText(healthCheckUrlPath)) {
instance.setHealthCheckUrlPath(healthCheckUrlPath);
}
ManagementMetadata metadata = managementMetadataProvider.get(instance, serverPort, serverContextPath,
managementContextPath, managementPort);
if (metadata != null) {
instance.setStatusPageUrl(metadata.getStatusPageUrl());
instance.setHealthCheckUrl(metadata.getHealthCheckUrl());
if (instance.isSecurePortEnabled()) {
instance.setSecureHealthCheckUrl(metadata.getSecureHealthCheckUrl());
}
Map<String, String> metadataMap = instance.getMetadataMap();
metadataMap.computeIfAbsent("management.port", k -> String.valueOf(metadata.getManagementPort()));
}
else {
// without the metadata the status and health check URLs will not be set
// and the status page and health check url paths will not include the
// context path so set them here
if (StringUtils.hasText(managementContextPath)) {
instance.setHealthCheckUrlPath(managementContextPath + instance.getHealthCheckUrlPath());
instance.setStatusPageUrlPath(managementContextPath + instance.getStatusPageUrlPath());
}
}
setupJmxPort(instance, jmxPort);
return instance;
}
EurekaInstanceConfigBean 对应的配置前缀是eureka.instance,从配置文件读取的实例信息配置会保存到此bean中
常用配置:
参数 | 说明 |
---|---|
instanceId | 注册到eureka服务端实例id,默认是ip:服务名:端口 |
hostname | 注册的主机名, 默认使用主机名注册 |
preferIpAddress | 是否开启ip注册 |
ipAddress | 注册的ip地址 |
leaseRenewalIntervalInSeconds | 向eureka服务端续约的时间间隔(默认30s) |
leaseExpirationDurationInSeconds | 向eureka服务端设置续约到期的时间间隔(默认90s),超过此间隔服务端会下掉该服务 |
initialStatus | 注册到eureka服务端的初始化状态,默认UP |
EurekaServiceRegistry
@Bean
public EurekaServiceRegistry eurekaServiceRegistry() {
return new EurekaServiceRegistry();
}
EurekaServiceRegistry 实现了ServiceRegistry,是实例注册的具体实现类,内部通过register完成服务注册事件的发送动作。
EurekaRegistration
@Bean
@ConditionalOnBean(AutoServiceRegistrationProperties.class)
@ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled", matchIfMissing = true)
public EurekaRegistration eurekaRegistration(EurekaClient eurekaClient,
CloudEurekaInstanceConfig instanceConfig, ApplicationInfoManager applicationInfoManager,
@Autowired(required = false) ObjectProvider<HealthCheckHandler> healthCheckHandler) {
return EurekaRegistration.builder(instanceConfig).with(applicationInfoManager).with(eurekaClient)
.with(healthCheckHandler).build();
}
EurekaRegistration 是真正被注册的对象,内部封装了客户端所有的注册信息,是org.springframework.cloud.client.serviceregistry.ServiceInstance此标准的具体实现类
ApplicationInfoManager
@Bean
@ConditionalOnMissingBean(value = ApplicationInfoManager.class, search = SearchStrategy.CURRENT)
public ApplicationInfoManager eurekaApplicationInfoManager(EurekaInstanceConfig config) {
InstanceInfo instanceInfo = new InstanceInfoFactory().create(config);
return new ApplicationInfoManager(config, instanceInfo);
}
public InstanceInfo create(EurekaInstanceConfig config) {
LeaseInfo.Builder leaseInfoBuilder = LeaseInfo.Builder.newBuilder()
.setRenewalIntervalInSecs(config.getLeaseRenewalIntervalInSeconds())
.setDurationInSecs(config.getLeaseExpirationDurationInSeconds());
// Builder the instance information to be registered with eureka
// server
InstanceInfo.Builder builder = InstanceInfo.Builder.newBuilder();
String namespace = config.getNamespace();
if (!namespace.endsWith(".")) {
namespace = namespace + ".";
}
builder.setNamespace(namespace).setAppName(config.getAppname()).setInstanceId(config.getInstanceId())
.setAppGroupName(config.getAppGroupName()).setDataCenterInfo(config.getDataCenterInfo())
.setIPAddr(config.getIpAddress()).setHostName(config.getHostName(false))
.setPort(config.getNonSecurePort())
.enablePort(InstanceInfo.PortType.UNSECURE, config.isNonSecurePortEnabled())
.setSecurePort(config.getSecurePort())
.enablePort(InstanceInfo.PortType.SECURE, config.getSecurePortEnabled())
.setVIPAddress(config.getVirtualHostName()).setSecureVIPAddress(config.getSecureVirtualHostName())
.setHomePageUrl(config.getHomePageUrlPath(), config.getHomePageUrl())
.setStatusPageUrl(config.getStatusPageUrlPath(), config.getStatusPageUrl())
.setHealthCheckUrls(config.getHealthCheckUrlPath(), config.getHealthCheckUrl(),
config.getSecureHealthCheckUrl())
.setASGName(config.getASGName());
// Start off with the STARTING state to avoid traffic
if (!config.isInstanceEnabledOnit()) {
InstanceInfo.InstanceStatus initialStatus = InstanceInfo.InstanceStatus.STARTING;
if (log.isInfoEnabled()) {
log.info("Setting initial instance status as: " + initialStatus);
}
builder.setStatus(initialStatus);
}
else {
if (log.isInfoEnabled()) {
log.info("Setting initial instance status as: " + InstanceInfo.InstanceStatus.UP
+ ". This may be too early for the instance to advertise itself as available. "
+ "You would instead want to control this via a healthcheck handler.");
}
}
// Add any user-specific metadata information
for (Map.Entry<String, String> mapEntry : config.getMetadataMap().entrySet()) {
String key = mapEntry.getKey();
String value = mapEntry.getValue();
// only add the metadata if the value is present
if (value != null && !value.isEmpty()) {
builder.add(key, value);
}
}
InstanceInfo instanceInfo = builder.build();
instanceInfo.setLeaseInfo(leaseInfoBuilder.build());
return instanceInfo;
}
首先创建了初始化状态的InstanceInfo,根据配置对InstanceInfo的大量属性赋值。
构建ApplicationInfoManager内部封装了注册到eureka的应用信息,包括实例信息,参数配置,实例状态等。
InstanceInfo对象也会被设置到到ApplicationInfoManager中。
CloudEurekaClient
@Bean(destroyMethod = "shutdown")
@ConditionalOnMissingBean(value = EurekaClient.class, search = SearchStrategy.CURRENT)
@org.springframework.cloud.context.config.annotation.RefreshScope
@Lazy
public EurekaClient eurekaClient(ApplicationInfoManager manager, EurekaClientConfig config,
EurekaInstanceConfig instance, @Autowired(required = false) HealthCheckHandler healthCheckHandler) {
// If we use the proxy of the ApplicationInfoManager we could run into a problem
// when shutdown is called on the CloudEurekaClient where the ApplicationInfoManager bean is
// requested but wont be allowed because we are shutting down. To avoid this
// we use the object directly.
ApplicationInfoManager appManager;
if (AopUtils.isAopProxy(manager)) {
appManager = ProxyUtils.getTargetObject(manager);
}
else {
appManager = manager;
}
CloudEurekaClient cloudEurekaClient = new CloudEurekaClient(appManager, config, this.optionalArgs,
this.context);
cloudEurekaClient.registerHealthCheck(healthCheckHandler);
return cloudEurekaClient;
}
这个bean非常重要,在内部完成了几乎所有客户端操作的定义,是发起操作的客户端类。重点在构造方法中实现,下文会具体分析。
2. SmartLifecycle的启动点
eureka客户端的注册动作,通过SmartLifecycle来触发,对应的实现类是EurekaAutoServiceRegistration
public class EurekaAutoServiceRegistration implements AutoServiceRegistration, SmartLifecycle, Ordered, SmartApplicationListener {
private static final Log log = LogFactory.getLog(EurekaAutoServiceRegistration.class);
private AtomicBoolean running = new AtomicBoolean(false);
private int order = 0;
private AtomicInteger port = new AtomicInteger(0);
private ApplicationContext context;
private EurekaServiceRegistry serviceRegistry;
private EurekaRegistration registration;
public EurekaAutoServiceRegistration(ApplicationContext context, EurekaServiceRegistry serviceRegistry,
EurekaRegistration registration) {
this.context = context;
this.serviceRegistry = serviceRegistry;
this.registration = registration;
}
@Override
public void start() {
...
// only initialize if nonSecurePort is greater than 0 and it isn't already running
// because of containerPortInitializer below
// getNonSecurePort() 就是项目的启动端口
if (!this.running.get() && this.registration.getNonSecurePort() > 0) {
this.serviceRegistry.register(this.registration);
this.context.publishEvent(new InstanceRegisteredEvent<>(this, this.registration.getInstanceConfig()));
this.running.set(true);
}
}
直接看start方法,根据初始化参数,直接进入if方法。有两个关键方法,分别是register()开始注册, publishEvent 发布当前实例注册的事件。
EurekaServiceRegistry
public class EurekaServiceRegistry implements ServiceRegistry<EurekaRegistration> {
private static final Log log = LogFactory.getLog(EurekaServiceRegistry.class);
@Override
public void register(EurekaRegistration reg) {
maybeInitializeClient(reg);
if (log.isInfoEnabled()) {
log.info("Registering application " + reg.getApplicationInfoManager().getInfo().getAppName()
+ " with eureka with status " + reg.getInstanceConfig().getInitialStatus());
}
reg.getApplicationInfoManager().setInstanceStatus(reg.getInstanceConfig().getInitialStatus());
reg.getHealthCheckHandler()
.ifAvailable(healthCheckHandler -> reg.getEurekaClient().registerHealthCheck(healthCheckHandler));
}
// 这个方法的实际作用是让ApplicationInfoManager和CloudEurekaClient被使用,来触发bean注入,这两个bean都是lazy的
private void maybeInitializeClient(EurekaRegistration reg) {
// force initialization of possibly scoped proxies
reg.getApplicationInfoManager().getInfo();
reg.getEurekaClient().getApplications();
}
maybeInitializeClient 触发了后续两个重要的bean:ApplicationInfoManager
和EurekaClient
的实例化过程。
ApplicationInfoManager
存放了InstanceInfo的信息和InstanceConfig的所有配置信息。
EurekaClient
(CloudEurekaClient) 在构造过程中,完成了整个客户端的注册、向服务端进行数据同步,schedule任务的定义和开启。
依次介绍这两个类的创建过程。
new ApplicationInfoManager()
bean的定义在上面的AutoConfiguration中
@Inject
public ApplicationInfoManager(EurekaInstanceConfig config, InstanceInfo instanceInfo, OptionalArgs optionalArgs) {
this.config = config;
this.instanceInfo = instanceInfo;
this.listeners = new ConcurrentHashMap<String, StatusChangeListener>();
if (optionalArgs != null) {
this.instanceStatusMapper = optionalArgs.getInstanceStatusMapper();
} else {
this.instanceStatusMapper = NO_OP_MAPPER;
}
// Hack to allow for getInstance() to use the DI'd ApplicationInfoManager
instance = this;
}
这里首先通过工厂创建了一个InstanceInfo的对象,create方法将EurekaInstanceConfig里的所有配置通过builder模式赋值到InstanceInfo中。
并在内部初始化了两个属性,listeners和instanceStatusMapper。
new CloudEurekaClient()
bean的定义在上面的AutoConfiguration中
public CloudEurekaClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config,
AbstractDiscoveryClientOptionalArgs<?> args, ApplicationEventPublisher publisher) {
super(applicationInfoManager, config, args);
this.applicationInfoManager = applicationInfoManager;
this.publisher = publisher;
this.eurekaTransportField = ReflectionUtils.findField(DiscoveryClient.class, "eurekaTransport");
ReflectionUtils.makeAccessible(this.eurekaTransportField);
}
先调用父类的super构造方法
public DiscoveryClient(ApplicationInfoManager applicationInfoManager, final EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args, EndpointRandomizer randomizer) {
this(applicationInfoManager, config, args, new Provider<BackupRegistry>() {
private volatile BackupRegistry backupRegistryInstance;
@Override
public synchronized BackupRegistry get() {
if (backupRegistryInstance == null) {
String backupRegistryClassName = config.getBackupRegistryImpl();
if (null != backupRegistryClassName) {
try {
backupRegistryInstance = (BackupRegistry) Class.forName(backupRegistryClassName).newInstance();
logger.info("Enabled backup registry of type {}", backupRegistryInstance.getClass());
} catch (InstantiationException e) {
logger.error("Error instantiating BackupRegistry.", e);
} catch (IllegalAccessException e) {
logger.error("Error instantiating BackupRegistry.", e);
} catch (ClassNotFoundException e) {
logger.error("Error instantiating BackupRegistry.", e);
}
}
if (backupRegistryInstance == null) {
logger.warn("Using default backup registry implementation which does not do anything.");
backupRegistryInstance = new NotImplementedRegistryImpl();
}
}
return backupRegistryInstance;
}
}, randomizer);
}
在构造方法的初始逻辑前,先初始化了BackupRegistry的对象,这个是当eureka所有的url都无法连接时,会按照backupRegistryImpl指定的自定义类来实现,默认使用NotImplementedRegistryImpl,内部什么都不处理。
@Inject
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
Provider<BackupRegistry> backupRegistryProvider, EndpointRandomizer endpointRandomizer) {
// 是否用自定义参数来覆盖
if (args != null) {
this.healthCheckHandlerProvider = args.healthCheckHandlerProvider;
this.healthCheckCallbackProvider = args.healthCheckCallbackProvider;
this.eventListeners.addAll(args.getEventListeners());
this.preRegistrationHandler = args.preRegistrationHandler;
} else {
this.healthCheckCallbackProvider = null;
this.healthCheckHandlerProvider = null;
this.preRegistrationHandler = null;
}
// ------------- 将构造方法的参数 给全局参数赋值 -----------------
this.applicationInfoManager = applicationInfoManager;
InstanceInfo myInfo = applicationInfoManager.getInfo();
clientConfig = config;
staticClientConfig = clientConfig;
transportConfig = config.getTransportConfig();
instanceInfo = myInfo;
if (myInfo != null) {
appPathIdentifier = instanceInfo.getAppName() + "/" + instanceInfo.getId();
} else {
logger.warn("Setting instanceInfo to a passed in null value");
}
this.backupRegistryProvider = backupRegistryProvider;
this.endpointRandomizer = endpointRandomizer;
this.urlRandomizer = new EndpointUtils.InstanceInfoBasedUrlRandomizer(instanceInfo);
localRegionApps.set(new Applications());
fetchRegistryGeneration = new AtomicLong(0);
remoteRegionsToFetch = new AtomicReference<String>(clientConfig.fetchRegistryForRemoteRegions());
remoteRegionsRef = new AtomicReference<>(remoteRegionsToFetch.get() == null ? null : remoteRegionsToFetch.get().split(","));
// ------------- 将构造方法的参数 给全局参数赋值 -----------------
// 如果开启了拉取注册表,就在ThresholdLevelsMetric中注册JXM,提供metric的查询
if (config.shouldFetchRegistry()) {
this.registryStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRY_PREFIX + "lastUpdateSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
} else {
this.registryStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
}
// 如果开启了注册到eurkea,就在ThresholdLevelsMetric中注册JXM,提供metric的查询
if (config.shouldRegisterWithEureka()) {
this.heartbeatStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRATION_PREFIX + "lastHeartbeatSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
} else {
this.heartbeatStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
}
logger.info("Initializing Eureka in region {}", clientConfig.getRegion());
//如果既不需要拉取,又不需要注,则直接结束
if (!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()) {
logger.info("Client configured to neither register nor query for data.");
scheduler = null;
heartbeatExecutor = null;
cacheRefreshExecutor = null;
eurekaTransport = null;
instanceRegionChecker = new InstanceRegionChecker(new PropertyBasedAzToRegionMapper(config), clientConfig.getRegion());
// This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
// to work with DI'd DiscoveryClient
DiscoveryManager.getInstance().setDiscoveryClient(this);
DiscoveryManager.getInstance().setEurekaClientConfig(config);
initTimestampMs = System.currentTimeMillis();
initRegistrySize = this.getApplications().size();
registrySize = initRegistrySize;
logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
initTimestampMs, initRegistrySize);
return; // no need to setup up an network tasks and we are done
}
// 否则就完成整个client端初始化的动作。
try {
// default size of 2 - 1 each for heartbeat and cacheRefresh
// 线程池创建两个线程,分别给心跳和刷新cache使用
scheduler = Executors.newScheduledThreadPool(2,
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-%d")
.setDaemon(true)
.build());
// 用于执行心跳任务的线程池
heartbeatExecutor = new ThreadPoolExecutor(
1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
.setDaemon(true)
.build()
); // use direct handoff
// 用于执行刷新缓存任务的线程池
cacheRefreshExecutor = new ThreadPoolExecutor(
1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
.setDaemon(true)
.build()
); // use direct handoff
// 完成eurekaTransport的创建和内部属性的初始化
eurekaTransport = new EurekaTransport();
scheduleServerEndpointTask(eurekaTransport, args);
// 初始化默认的az和region的对应关系
AzToRegionMapper azToRegionMapper;
if (clientConfig.shouldUseDnsForFetchingServiceUrls()) {
azToRegionMapper = new DNSBasedAzToRegionMapper(clientConfig);
} else {
azToRegionMapper = new PropertyBasedAzToRegionMapper(clientConfig);
}
if (null != remoteRegionsToFetch.get()) {
azToRegionMapper.setRegionsToFetch(remoteRegionsToFetch.get().split(","));
}
// 构建instanceRegionChecker 用于查询InstanceInfo对应的region,判断是否localRegion等
instanceRegionChecker = new InstanceRegionChecker(azToRegionMapper, clientConfig.getRegion());
} catch (Throwable e) {
throw new RuntimeException("Failed to initialize DiscoveryClient!", e);
}
if (clientConfig.shouldFetchRegistry()) {
try {
// 重要方法,具体说明看fetchRegistry的分析
boolean primaryFetchRegistryResult = fetchRegistry(false);
// 如果拉取失败,则会输出如下异常
if (!primaryFetchRegistryResult) {
logger.info("Initial registry fetch from primary servers failed");
}
boolean backupFetchRegistryResult = true;
// 拉取失败则调用backup的实现类
if (!primaryFetchRegistryResult && !fetchRegistryFromBackup()) {
backupFetchRegistryResult = false;
logger.info("Initial registry fetch from backup servers failed");
}
if (!primaryFetchRegistryResult && !backupFetchRegistryResult && clientConfig.shouldEnforceFetchRegistryAtInit()) {
throw new IllegalStateException("Fetch registry error at startup. Initial fetch failed.");
}
} catch (Throwable th) {
logger.error("Fetch registry error at startup: {}", th.getMessage());
throw new IllegalStateException(th);
}
}
// call and execute the pre registration handler before all background tasks (inc registration) is started
// 提供了一个扩展点,需要在发起注册之前,进行一些操作。
if (this.preRegistrationHandler != null) {
this.preRegistrationHandler.beforeRegistration();
}
// 在初始化阶段强制注册,这里shouldEnforceRegistrationAtInit()默认值是false,如果注册失败会抛出异常
if (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) {
try {
if (!register() ) {
throw new IllegalStateException("Registration error at startup. Invalid server response.");
}
} catch (Throwable th) {
logger.error("Registration error at startup: {}", th.getMessage());
throw new IllegalStateException(th);
}
}
// finally, init the schedule tasks (e.g. cluster resolvers, heartbeat, instanceInfo replicator, fetch
// 关键方法,初始化各种schedule任务,包括心跳、拉取注册表等,具体说明看initScheduledTasks的分析
initScheduledTasks();
try {
Monitors.registerObject(this);
} catch (Throwable e) {
logger.warn("Cannot register timers", e);
}
// This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
// to work with DI'd DiscoveryClient
DiscoveryManager.getInstance().setDiscoveryClient(this);
DiscoveryManager.getInstance().setEurekaClientConfig(config);
initTimestampMs = System.currentTimeMillis();
initRegistrySize = this.getApplications().size();
registrySize = initRegistrySize;
logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
initTimestampMs, initRegistrySize);
}
fetchRegistry()
private boolean fetchRegistry(boolean forceFullRegistryFetch) {
Stopwatch tracer = FETCH_REGISTRY_TIMER.start();
try {
// If the delta is disabled or if it is the first time, get all
// applications
// Application 内部封装了从eureka服务端返回的所有注册表信息,第一次启动时这里时空的,在拉取全量或增量时更新
Applications applications = getApplications();
if (clientConfig.shouldDisableDelta()
|| (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
|| forceFullRegistryFetch
|| (applications == null)
|| (applications.getRegisteredApplications().size() == 0)
|| (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
{
logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta());
logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress());
logger.info("Force full registry fetch : {}", forceFullRegistryFetch);
logger.info("Application is null : {}", (applications == null));
logger.info("Registered Applications size is zero : {}",
(applications.getRegisteredApplications().size() == 0));
logger.info("Application version is -1: {}", (applications.getVersion() == -1));
// 全量拉取注册表信息
getAndStoreFullRegistry();
} else {
// 增量拉取注册表信息
getAndUpdateDelta(applications);
}
// 这里的hashCode用来判断拉取跟上次相比是否发生过变更
applications.setAppsHashCode(applications.getReconcileHashCode());
logTotalInstances();
} catch (Throwable e) {
logger.info(PREFIX + "{} - was unable to refresh its cache! This periodic background refresh will be retried in {} seconds. status = {} stacktrace = {}",
appPathIdentifier, clientConfig.getRegistryFetchIntervalSeconds(), e.getMessage(), ExceptionUtils.getStackTrace(e));
return false;
} finally {
if (tracer != null) {
tracer.stop();
}
}
// Notify about cache refresh before updating the instance remote status
onCacheRefreshed();
// Update remote status based on refreshed data held in the cache
updateInstanceRemoteStatus();
// registry was fetched successfully, so return true
return true;
这个fetchRegistry方法会在启动时调用,以及schedule任务执行时,每次刷新客户端缓存时调用。主要做了几件事情。
- 如果是启动时,会全量拉取eureka的注册信息,调用/eureka/apps
private void getAndStoreFullRegistry() throws Throwable {
...
Applications apps = null;
EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
: eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
apps = httpResponse.getEntity();
}
...
这里的apps就是从eureka服务端拉取的全量注册表信息,如下图
- 如果是schedule任务会增量拉取,调用getAndUpdateDelta(),内部通过http请求/eureka/apps/delta,这里的delta就是从eureka服务端拉取的增量注册表信息。
private void getAndUpdateDelta(Applications applications) throws Throwable {
long currentUpdateGeneration = fetchRegistryGeneration.get();
Applications delta = null;
EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get());
if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
delta = httpResponse.getEntity();
}
if (delta == null) {
getAndStoreFullRegistry();
} else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
String reconcileHashCode = "";
if (fetchRegistryUpdateLock.tryLock()) {
try {
// 获取到增量列表后,依次遍历每一个application,更新到本地cache中
updateDelta(delta);
reconcileHashCode = getReconcileHashCode(applications);
} finally {
fetchRegistryUpdateLock.unlock();
}
}
...
- 调用onCacheRefreshed(),刷新客户端缓存。子类CloudEurekaClient重写了onCacheRefresh,由eventListeners来处理CacheRefreshedEvent,并发出HeartbeatEvent事件。
@Override
protected void onCacheRefreshed() {
super.onCacheRefreshed();
if (this.cacheRefreshedCount != null) { // might be called during construction and
// will be null
long newCount = this.cacheRefreshedCount.incrementAndGet();
log.trace("onCacheRefreshed called with count: " + newCount);
this.publisher.publishEvent(new HeartbeatEvent(this, newCount));
}
}
protected void onCacheRefreshed() {
fireEvent(new CacheRefreshedEvent());
}
protected void fireEvent(final EurekaEvent event) {
for (EurekaEventListener listener : eventListeners) {
try {
listener.onEvent(event);
} catch (Exception e) {
logger.info("Event {} throw an exception for listener {}", event, listener, e.getMessage());
}
}
}
- 调用updateInstanceRemoteStatus将当前实例的状态更新到eureka服务端。
private synchronized void updateInstanceRemoteStatus() {
// Determine this instance's status for this app and set to UNKNOWN if not found
InstanceInfo.InstanceStatus currentRemoteInstanceStatus = null;
if (instanceInfo.getAppName() != null) {
Application app = getApplication(instanceInfo.getAppName());
if (app != null) {
InstanceInfo remoteInstanceInfo = app.getByInstanceId(instanceInfo.getId());
if (remoteInstanceInfo != null) {
// 本次拉取的服务状态
currentRemoteInstanceStatus = remoteInstanceInfo.getStatus();
}
}
}
if (currentRemoteInstanceStatus == null) {
currentRemoteInstanceStatus = InstanceInfo.InstanceStatus.UNKNOWN;
}
// Notify if status changed
// 本次拉取的服务状态 和 上次拉取的服务状态不一致
if (lastRemoteInstanceStatus != currentRemoteInstanceStatus) {
onRemoteStatusChanged(lastRemoteInstanceStatus, currentRemoteInstanceStatus);
lastRemoteInstanceStatus = currentRemoteInstanceStatus;
}
}
protected void onRemoteStatusChanged(InstanceInfo.InstanceStatus oldStatus, InstanceInfo.InstanceStatus newStatus) {
fireEvent(new StatusChangeEvent(oldStatus, newStatus));
}
先将服务远程的当前状态跟上一次拉取的状态进行比较,如果一致,则结束,不一致则发出一个StatusChangeEvent事件。
如果成功从eureka 服务端拉取注册表成功,则fetchRegistry返回true。
initScheduledTasks()
// 初始化所有的schedule任务
private void initScheduledTasks() {
if (clientConfig.shouldFetchRegistry()) {
// registry cache refresh timer
int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
// 定义cacheRefreshTask,schedule每隔registryFetchIntervalSeconds来刷新客户端缓存
cacheRefreshTask = new TimedSupervisorTask("cacheRefresh", scheduler, cacheRefreshExecutor, registryFetchIntervalSeconds, TimeUnit.SECONDS, expBackOffBound, new CacheRefreshThread());
scheduler.schedule(cacheRefreshTask, registryFetchIntervalSeconds, TimeUnit.SECONDS);
}
if (clientConfig.shouldRegisterWithEureka()) {
int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);
// Heartbeat timer
// 定义heartbeatTask,schedule每隔renewalIntervalInSecs来发起心跳(续约)
heartbeatTask = new TimedSupervisorTask("heartbeat", scheduler, heartbeatExecutor, renewalIntervalInSecs, TimeUnit.SECONDS, expBackOffBound, new HeartbeatThread());
scheduler.schedule(heartbeatTask, renewalIntervalInSecs, TimeUnit.SECONDS);
// InstanceInfo replicator
// 实例信息的复制器,内部包含schedule,同步到eureka服务端
instanceInfoReplicator = new InstanceInfoReplicator(this, instanceInfo, clientConfig.getInstanceInfoReplicationIntervalSeconds(), 2); // burstSize
// 定义了状态变更的listener,并注册到applicationInfoManager中
statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
@Override
public String getId() {
return "statusChangeListener";
}
@Override
public void notify(StatusChangeEvent statusChangeEvent) {
logger.info("Saw local status change event {}", statusChangeEvent);
instanceInfoReplicator.onDemandUpdate();
}
};
if (clientConfig.shouldOnDemandUpdateStatusChange()) {
applicationInfoManager.registerStatusChangeListener(statusChangeListener);
}
// 开启schedule任务
instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
} else {
logger.info("Not registering with Eureka server per configuration");
}
}
- 首先给cache和heartbeat创建了两个TimedSupervisorTask,定义如下,scheduler会每隔一定时间来执行task。
- 创建了instanceInfoReplicator,是实例信息变化的复制器,当发现出现变化时,内部通过register方法将新的信息同步到eureka服务端。
- 创建了statusChangeListener,当触发状态变更回调时,调用instanceInfoReplicator.onDemandUpdate(); 并将其添加到applicationInfoManager中。
public TimedSupervisorTask(String name, ScheduledExecutorService scheduler, ThreadPoolExecutor executor,
int timeout, TimeUnit timeUnit, int expBackOffBound, Runnable task) {
this.name = name;
this.scheduler = scheduler;
this.executor = executor;
this.timeoutMillis = timeUnit.toMillis(timeout);
this.task = task;
this.delay = new AtomicLong(timeoutMillis);
this.maxDelay = timeoutMillis * expBackOffBound;
...
}
@Override
public void run() {
Future<?> future = null;
try {
future = executor.submit(task);
threadPoolLevelGauge.set((long) executor.getActiveCount());
future.get(timeoutMillis, TimeUnit.MILLISECONDS); // block until done or timeout
delay.set(timeoutMillis);
threadPoolLevelGauge.set((long) executor.getActiveCount());
successCounter.increment();
} catch (TimeoutException e) {
logger.warn("task supervisor timed out", e);
timeoutCounter.increment();
long currentDelay = delay.get();
long newDelay = Math.min(maxDelay, currentDelay * 2);
delay.compareAndSet(currentDelay, newDelay);
} finally {
if (future != null) {
future.cancel(true);
}
if (!scheduler.isShutdown()) {
scheduler.schedule(this, delay.get(), TimeUnit.MILLISECONDS);
}
}
}
这里还需要 重点说明下run的实现:
scheduler.schedule在调度任务时,只传了delay的时间,并没有传周期时间,而执行周期其实是会动态变化的,这里的动态变化是通过scheduler.schedule的递归来完成。
可以看到在try块中,delay的值会被重置,并且如果出现超时,则将newDelay可能设置成2倍的时间,再在finally中再次执行schedule,延迟的时间就是2倍的delay,这是一种动态周期的schedule实现方案。
cacheRefreshTask
class CacheRefreshThread implements Runnable {
public void run() {
refreshRegistry();
}
}
@VisibleForTesting
void refreshRegistry() {
try {
...
//remoteRegionsModified
// 前文判断根据remoteRegions是否发生变化来决定是全量拉取还是增量拉取
boolean success = fetchRegistry(remoteRegionsModified);
if (success) {
// 这里的localRegionApps是拉取注册表信息内容的本地缓存
registrySize = localRegionApps.get().size();
lastSuccessfulRegistryFetchTimestamp = System.currentTimeMillis();
}
...
} catch (Throwable e) {
logger.error("Cannot fetch registry from server", e);
}
}
这个task其实就是调用了fetchRegistry,拉取最新的注册表,刷新本地cache。
heartbeatTask
private class HeartbeatThread implements Runnable {
public void run() {
if (renew()) {
lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
}
}
}
boolean renew() {
EurekaHttpResponse<InstanceInfo> httpResponse;
try {
httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
... //这里判断如果是404的请求,则再发起一次注册请求,同步实例信息
return httpResponse.getStatusCode() == Status.OK.getStatusCode();
} catch (Throwable e) {
logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);
return false;
}
}
这个task就是发起了一个http请求,调用了sendHearBeat方法,发起了一次心跳续约。
InstanceInfoReplicator
public void start(int initialDelayMs) {
if (started.compareAndSet(false, true)) {
instanceInfo.setIsDirty(); // for initial register
Future next = scheduler.schedule(this, initialDelayMs, TimeUnit.SECONDS);
scheduledPeriodicRef.set(next);
}
}
public boolean onDemandUpdate() {
if (rateLimiter.acquire(burstSize, allowedRatePerMinute)) {
if (!scheduler.isShutdown()) {
scheduler.submit(new Runnable() {
@Override
public void run() {
logger.debug("Executing on-demand update of local InstanceInfo");
Future latestPeriodic = scheduledPeriodicRef.get();
if (latestPeriodic != null && !latestPeriodic.isDone()) {
logger.debug("Canceling the latest scheduled update, it will be rescheduled at the end of on demand update");
latestPeriodic.cancel(false);
}
InstanceInfoReplicator.this.run();
}
});
return true;
} else {
logger.warn("Ignoring onDemand update due to stopped scheduler");
return false;
}
} else {
logger.warn("Ignoring onDemand update due to rate limiter");
return false;
}
}
public void run() {
try {
discoveryClient.refreshInstanceInfo();
Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
if (dirtyTimestamp != null) {
discoveryClient.register();
instanceInfo.unsetIsDirty(dirtyTimestamp);
}
} catch (Throwable t) {
logger.warn("There was a problem with the instance info replicator", t);
} finally {
Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
scheduledPeriodicRef.set(next);
}
}
InstanceInfoReplicator 是实例信息的同步器,作用是当发生变更时(状态、地址、ip、配置参数值等),将新的InstanceInfo同步给eureka服务端。
这里的三个方法:
start()
:在初始化时,调用start开启schedule任务,每隔一段时间来调用run()检测变更。
onDemandUpdate()
:在触发状态变更时,作为listener来调用,也会调用run()检测变更。
run()
:检测当前InstanceInfo是否跟上次相同,如果出现变更,则调用discoveryClient.register();
将新的信息同步到eureka服务端。依据则是instanceInfo是否被标记为isDirty
而标记的依据在discoveryClient.refreshInstanceInfo();
中实现。
/**
* Refresh the current local instanceInfo. Note that after a valid refresh where changes are observed, the
* isDirty flag on the instanceInfo is set to true
*/
void refreshInstanceInfo() {
// 判断地址、ip、实例所在的部署节点是否变更
applicationInfoManager.refreshDataCenterInfoIfRequired();
// 判断各种续约配置值(LeaseInfo) 是否变更
applicationInfoManager.refreshLeaseInfoIfRequired();
InstanceStatus status;
try {
status = getHealthCheckHandler().getStatus(instanceInfo.getStatus());
} catch (Exception e) {
logger.warn("Exception from healthcheckHandler.getStatus, setting status to DOWN", e);
status = InstanceStatus.DOWN;
}
if (null != status) {
applicationInfoManager.setInstanceStatus(status);
}
}
先根据两个refresh方法来判断是否变更,如果变更,内部会调用instanceInfo.setIsDirty();
来进行标记。
public synchronized void setIsDirty() {
isInstanceInfoDirty = true;
lastDirtyTimestamp = System.currentTimeMillis();
}
至此,initScheduledTasks分析完成。内部分别启动了三个schedule,分别用于将新的注册表刷新本地cache、发送心跳续约、发送客户端最新的实例信息。
再回到DiscoveryClient的构造方法,在initScheduledTasks执行完成后,就是注册JMX bean,设置一些全局属性。
CloudEurekaClient 在构造完成后,会根据是否配置了healthCheckHandler(默认的EurekaHealthCheckHandler实现需要配置springboot actuator),如果配置了会将健康状态同步到eureka 服务端一次。
@Override
public void registerHealthCheck(HealthCheckHandler healthCheckHandler) {
if (instanceInfo == null) {
logger.error("Cannot register a healthcheck handler when instance info is null!");
}
if (healthCheckHandler != null) {
this.healthCheckHandlerRef.set(healthCheckHandler);
// schedule an onDemand update of the instanceInfo when a new healthcheck handler is registered
if (instanceInfoReplicator != null) {
instanceInfoReplicator.onDemandUpdate();
}
}
}
至此,CloudEurekaClient的整个构建过程完成的所有内容也分析完成。
再回到EurekaServiceRegistry的register方法
public void register(EurekaRegistration reg) {
maybeInitializeClient(reg);
if (log.isInfoEnabled()) {
log.info("Registering application " + reg.getApplicationInfoManager().getInfo().getAppName()
+ " with eureka with status " + reg.getInstanceConfig().getInitialStatus());
}
reg.getApplicationInfoManager().setInstanceStatus(reg.getInstanceConfig().getInitialStatus());
reg.getHealthCheckHandler()
.ifAvailable(healthCheckHandler -> reg.getEurekaClient().registerHealthCheck(healthCheckHandler));
}
在完成整个过程的定义之后,先调用setInstanceStatus
设置Instance的状态,启动成功这里就是UP。
接着,如果配置了健康检查,还会触发一个发送健康检查同步的事件。
public synchronized void setInstanceStatus(InstanceStatus status) {
InstanceStatus next = instanceStatusMapper.map(status);
if (next == null) {
return;
}
InstanceStatus prev = instanceInfo.setStatus(next);
if (prev != null) {
for (StatusChangeListener listener : listeners.values()) {
try {
listener.notify(new StatusChangeEvent(prev, next));
} catch (Exception e) {
logger.warn("failed to notify listener: {}", listener.getId(), e);
}
}
}
}
还会发出一个StatusChangeEvent,通知listener来向eureka服务端做同步。
register至此分析完成。
再回到EurekaAutoServiceRegistration(SmartLifecycle)的start方法。
@Override
public void start() {
// only set the port if the nonSecurePort or securePort is 0 and this.port != 0
if (this.port.get() != 0) {
if (this.registration.getNonSecurePort() == 0) {
this.registration.setNonSecurePort(this.port.get());
}
if (this.registration.getSecurePort() == 0 && this.registration.isSecure()) {
this.registration.setSecurePort(this.port.get());
}
}
// only initialize if nonSecurePort is greater than 0 and it isn't already running
// because of containerPortInitializer below
if (!this.running.get() && this.registration.getNonSecurePort() > 0) {
this.serviceRegistry.register(this.registration);
this.context.publishEvent(new InstanceRegisteredEvent<>(this, this.registration.getInstanceConfig()));
this.running.set(true);
}
}
register完成后,context发出了一个InstanceRegisteredEvent事件,这个事件不是eureka的事件,而是一个ApplicationEvent。目的是提供一个扩展点,来通知eureka实例注册完成。用户可以自定义listener来处理。
至此,整个eureka客户端的启动流程,注册原理分析完成。
3. 下线分析
在SmartLifecycle的定义中,当实例停止前,会调用stop方法,eureka也因此触发下线流程。
@Override
public void stop() {
this.serviceRegistry.deregister(this.registration);
this.running.set(false);
}
@Override
public void deregister(EurekaRegistration reg) {
if (reg.getApplicationInfoManager().getInfo() != null) {
if (log.isInfoEnabled()) {
log.info("Unregistering application " + reg.getApplicationInfoManager().getInfo().getAppName()
+ " with eureka with status DOWN");
}
reg.getApplicationInfoManager().setInstanceStatus(InstanceInfo.InstanceStatus.DOWN);
// shutdown of eureka client should happen with EurekaRegistration.close()
// auto registration will create a bean which will be properly disposed
// manual registrations will need to call close()
}
}
可以看到下线的处理流程非常简单,就是设置了ApplicationInfoManager
中的instanceStatus
为DOWN的状态,设置down的同时,会触发StatusChangeEvent
事件的发送,由statusChangeListener
来向eureka服务端发送新的下线通知。
总结
- eureka的客户端通过
SmartLifecycle
作为入口点,开启整个eureka的bean注入; - 所有的核心代码都在
CloudEurekaClient
的父类构造方法中完成。
a. 定义refreshCacheTask来定时拉取注册表
b. 定义heartbeatTask来定时发送续约请求
c. 定义instanceInfoReplicator来向eureka服务端发送实例信息
d. 定义statusChangeListener 来接受实例变化事件,发送register请求。 - 每当实例信息发生变更时,就发送StatusChangeEvent,判定变更的条件包括InstanceInfo实例属性变化、InstanceConfig配置发生变更。
网友评论