之前我们分析了eureka client的配置,那么现在我们再来分析一下EurekaClient。DiscoveryClient,实现 EurekaClient 接口,用于与 Eureka-Server 交互。实现如下方法:
向 Eureka-Server 注册自身服务
向 Eureka-Server 续约自身服务
向 Eureka-Server 取消自身服务,当关闭时
从 Eureka-Server 查询应用集合和应用实例信息
我们来看一下DiscoveryClient的构造方法
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
Provider<BackupRegistry> backupRegistryProvider) {
// 【3.2.1】赋值 AbstractDiscoveryClientOptionalArgs
if (args != null) {
this.healthCheckHandlerProvider = args.healthCheckHandlerProvider;
this.healthCheckCallbackProvider = args.healthCheckCallbackProvider;
this.eventListeners.addAll(args.getEventListeners());
this.preRegistrationHandler = args.preRegistrationHandler;
} else {
this.healthCheckCallbackProvider = null;
this.healthCheckHandlerProvider = null;
this.preRegistrationHandler = null;
}
// 【3.2.2】赋值 ApplicationInfoManager、EurekaClientConfig
this.applicationInfoManager = applicationInfoManager;
InstanceInfo myInfo = applicationInfoManager.getInfo();
clientConfig = config;
staticClientConfig = clientConfig;
transportConfig = config.getTransportConfig();
instanceInfo = myInfo;
if (myInfo != null) {
appPathIdentifier = instanceInfo.getAppName() + "/" + instanceInfo.getId(); // 无实际业务用途,用于打 logger
} else {
logger.warn("Setting instanceInfo to a passed in null value");
}
// 【3.2.3】赋值 BackupRegistry
this.backupRegistryProvider = backupRegistryProvider;
// 【3.2.4】初始化 InstanceInfoBasedUrlRandomizer TODO 芋艿:什么用途
this.urlRandomizer = new EndpointUtils.InstanceInfoBasedUrlRandomizer(instanceInfo);
// 【3.2.5】初始化应用集合在本地的缓存
localRegionApps.set(new Applications());
fetchRegistryGeneration = new AtomicLong(0);
// 【3.2.6】获取哪些 Region 集合的注册信息
remoteRegionsToFetch = new AtomicReference<>(clientConfig.fetchRegistryForRemoteRegions());
remoteRegionsRef = new AtomicReference<>(remoteRegionsToFetch.get() == null ? null : remoteRegionsToFetch.get().split(","));
// 【3.2.7】初始化拉取、心跳的监控
if (config.shouldFetchRegistry()) {
this.registryStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRY_PREFIX + "lastUpdateSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
} else {
this.registryStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
}
if (config.shouldRegisterWithEureka()) {
this.heartbeatStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRATION_PREFIX + "lastHeartbeatSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
} else {
this.heartbeatStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
}
logger.info("Initializing Eureka in region {}", clientConfig.getRegion());
// 【3.2.8】结束初始化,当无需和 Eureka-Server 交互
if (!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()) {
logger.info("Client configured to neither register nor query for data.");
scheduler = null;
heartbeatExecutor = null;
cacheRefreshExecutor = null;
eurekaTransport = null;
instanceRegionChecker = new InstanceRegionChecker(new PropertyBasedAzToRegionMapper(config), clientConfig.getRegion());
// This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
// to work with DI'd DiscoveryClient
DiscoveryManager.getInstance().setDiscoveryClient(this);
DiscoveryManager.getInstance().setEurekaClientConfig(config);
initTimestampMs = System.currentTimeMillis();
logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
initTimestampMs, this.getApplications().size());
return; // no need to setup up an network tasks and we are done
}
try {
// 【3.2.9】初始化线程池
// default size of 2 - 1 each for heartbeat and cacheRefresh
scheduler = Executors.newScheduledThreadPool(2,
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-%d")
.setDaemon(true)
.build());
heartbeatExecutor = new ThreadPoolExecutor(
1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
.setDaemon(true)
.build()
); // use direct handoff
cacheRefreshExecutor = new ThreadPoolExecutor(
1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
.setDaemon(true)
.build()
); // use direct handoff
// 【3.2.10】初始化 Eureka 网络通信相关
eurekaTransport = new EurekaTransport();
scheduleServerEndpointTask(eurekaTransport, args);
// 【3.2.11】初始化 InstanceRegionChecker
AzToRegionMapper azToRegionMapper;
if (clientConfig.shouldUseDnsForFetchingServiceUrls()) {
azToRegionMapper = new DNSBasedAzToRegionMapper(clientConfig);
} else {
azToRegionMapper = new PropertyBasedAzToRegionMapper(clientConfig);
}
if (null != remoteRegionsToFetch.get()) {
azToRegionMapper.setRegionsToFetch(remoteRegionsToFetch.get().split(","));
}
instanceRegionChecker = new InstanceRegionChecker(azToRegionMapper, clientConfig.getRegion());
} catch (Throwable e) {
throw new RuntimeException("Failed to initialize DiscoveryClient!", e);
}
// 【3.2.12】从 Eureka-Server 拉取注册信息
if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
fetchRegistryFromBackup();
}
// 【3.2.13】执行向 Eureka-Server 注册之前的处理器
// call and execute the pre registration handler before all background tasks (inc registration) is started
if (this.preRegistrationHandler != null) {
this.preRegistrationHandler.beforeRegistration();
}
// 【3.2.14】初始化定时任务
initScheduledTasks();
// 【3.2.15】向 Servo 注册监控
try {
Monitors.registerObject(this);
} catch (Throwable e) {
logger.warn("Cannot register timers", e);
}
// 【3.2.16】初始化完成
// This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
// to work with DI'd DiscoveryClient
DiscoveryManager.getInstance().setDiscoveryClient(this);
DiscoveryManager.getInstance().setEurekaClientConfig(config);
initTimestampMs = System.currentTimeMillis();
logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
initTimestampMs, this.getApplications().size());
}
在DiscoveryClient的构造方法里完成了整个初始化的过程。BackupRegistry,备份注册中心接口。当 Eureka-Client 启动时,无法从 Eureka-Server 读取注册信息,从备份注册中心读取注册信息。
public class NotImplementedRegistryImpl implements BackupRegistry {
@Override
public Applications fetchRegistry() {
return null;
}
@Override
public Applications fetchRegistry(String[] includeRemoteRegions) {
return null;
}
}
从 NotImplementedRegistryImpl可以看出,目前 Eureka-Client 未提供合适的默认实现。
TransportClientFactories,生成 Jersey 客户端工厂的工厂接口。目前有 Jersey1TransportClientFactories 、Jersey2TransportClientFactories 两个实现。TransportClientFactories有一个newTransportClientFactory方法
TransportClientFactory newTransportClientFactory(final EurekaClientConfig clientConfig,
final Collection<F> additionalFilters,
final InstanceInfo myInstanceInfo);
通过该方法生产TransportClientFactory。
EurekaEventListener,Eureka 事件监听器。代码为
public interface EurekaEventListener {
/**
* Notification of an event within the {@link EurekaClient}.
*
* {@link EurekaEventListener#onEvent} is called from the context of an internal eureka thread
* and must therefore return as quickly as possible without blocking.
*
* @param event
*/
void onEvent(EurekaEvent event);
}
我们来看一下DiscoveryClient初始化定时任务的方法
private void initScheduledTasks() {
// 从 Eureka-Server 拉取注册信息执行器
if (clientConfig.shouldFetchRegistry()) {
// registry cache refresh timer
int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
scheduler.schedule(
new TimedSupervisorTask(
"cacheRefresh",
scheduler,
cacheRefreshExecutor,
registryFetchIntervalSeconds,
TimeUnit.SECONDS,
expBackOffBound,
new CacheRefreshThread()
),
registryFetchIntervalSeconds, TimeUnit.SECONDS);
}
// 向 Eureka-Server 心跳(续租)执行器
if (clientConfig.shouldRegisterWithEureka()) {
int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs(); // 续租频率
int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound(); // 系数
logger.info("Starting heartbeat executor: " + "renew interval is: " + renewalIntervalInSecs);
// Heartbeat timer
scheduler.schedule(
new TimedSupervisorTask(
"heartbeat",
scheduler,
heartbeatExecutor,
renewalIntervalInSecs,
TimeUnit.SECONDS,
expBackOffBound,
new HeartbeatThread()
),
renewalIntervalInSecs, TimeUnit.SECONDS);
// 创建 应用实例信息复制器
// InstanceInfo replicator
instanceInfoReplicator = new InstanceInfoReplicator(
this,
instanceInfo,
clientConfig.getInstanceInfoReplicationIntervalSeconds(),
2); // burstSize
// 创建 应用实例状态变更监听器
statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
@Override
public String getId() {
return "statusChangeListener";
}
@Override
public void notify(StatusChangeEvent statusChangeEvent) {
if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
// log at warn level if DOWN was involved
logger.warn("Saw local status change event {}", statusChangeEvent);
} else {
logger.info("Saw local status change event {}", statusChangeEvent);
}
instanceInfoReplicator.onDemandUpdate();
}
};
// 注册 应用实例状态变更监听器
if (clientConfig.shouldOnDemandUpdateStatusChange()) {
applicationInfoManager.registerStatusChangeListener(statusChangeListener);
}
// 开启 应用实例信息复制器
instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
} else {
logger.info("Not registering with Eureka server per configuration");
}
}
向 Servo 注册监控
try {
Monitors.registerObject(this);
} catch (Throwable e) {
logger.warn("Cannot register timers", e);
}
初始化完成
DiscoveryManager.getInstance().setDiscoveryClient(this);
DiscoveryManager.getInstance().setEurekaClientConfig(config);
initTimestampMs = System.currentTimeMillis();
logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
initTimestampMs, this.getApplications().size());
DiscoveryClient的分析就到这里了。
网友评论