Eureka源码分析(2.1.4.Release)
首先源码切忌一行一行debug,需先了解eureka主要功能后,再分析其功能如何实现。
由于上一篇文章提及到客户端如何从注册中心中获取服务列表eureka(一)-功能介绍与客户端之服务获取
客户端之服务续约
服务续约任务与服务列表获取一样,都是通过TimedSupervisorTask实现动态时间间隔执行任务。TimedSupervisorTask是如何实现动态间隔时间执行任务的,可看上文提到的文章。
继续从上文提到的initScheduledTasks出发:
com.netflix.discovery.DiscoveryClient#initScheduledTasks:
……上部分代码为客户端获取服务列表任务……
if (clientConfig.shouldRegisterWithEureka()) {
//eureka.instance.leaseRenewallIntervalInSeconds=30,默认每30秒执行心跳续约
int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
//eureka.client.heartbeatExecutorExponentialBackOffBound=10,如果发生超时的重试延迟的最大乘数。
int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);
//定义个名为heartbeat的线程,执行心跳续约任务
scheduler.schedule(
new TimedSupervisorTask(
"heartbeat",
scheduler,
heartbeatExecutor,
renewalIntervalInSecs,
TimeUnit.SECONDS,
expBackOffBound,
new HeartbeatThread()
),
renewalIntervalInSecs, TimeUnit.SECONDS);
……下部分代码为实例更新并通知eureka注册中心任务……
由于TimedSupervisorTask在上篇文章已有介绍,现在则直接查看续约线程逻辑
com.netflix.discovery.DiscoveryClient.HeartbeatThread:
image.pngcom.netflix.discovery.DiscoveryClient.HeartbeatThread#run:
public void run() {
if (renew()) {
lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
}
}
com.netflix.discovery.DiscoveryClient#renew:
boolean renew() {
EurekaHttpResponse<InstanceInfo> httpResponse;
try {
//发起heartbeat请求
httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode());
//如果返回的是无法找到,则重新注册
if (httpResponse.getStatusCode() == Status.NOT_FOUND.getStatusCode()) {
REREGISTER_COUNTER.increment();
logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName());
//设置该实例的dirty标志位true,并更新dirty时间
long timestamp = instanceInfo.setIsDirtyWithTime();
//重新注册
boolean success = register();
//如果成功,则将实例ditry的标志改为false
if (success) {
instanceInfo.unsetIsDirty(timestamp);
}
return success;
}
return httpResponse.getStatusCode() == Status.OK.getStatusCode();
} catch (Throwable e) {
logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);
return false;
}
}
实例的dirty标志isInstanceInfoDirty的作用:在执行服务注册任务时,如果dirty标志为true,则将实例重新注册到远程eureka注册中心。
客户端之服务注册
还是从上文提到的initScheduledTasks出发:
com.netflix.discovery.DiscoveryClient#initScheduledTasks:
if (clientConfig.shouldRegisterWithEureka()) {
……忽略心跳续约代码……
// InstanceInfo replicator
//实例更新后,需要将更新信息发送到远程eureka中心的线程,其实现了Runnable接口
instanceInfoReplicator = new InstanceInfoReplicator(
this,
instanceInfo,
clientConfig.getInstanceInfoReplicationIntervalSeconds(),//instanceInfoReplicationIntervalSeconds=30,默认每30秒检查是否有更新,并注册到注册中心
2); //当前时间允许的最大请求数
//状态监听器
statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
@Override
public String getId() {
return "statusChangeListener";
}
@Override
public void notify(StatusChangeEvent statusChangeEvent) {
if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
logger.warn("Saw local status change event {}", statusChangeEvent);
} else {
logger.info("Saw local status change event {}", statusChangeEvent);
}
//状态更新后,立即更新到eureka注册中心(这里涉及到令牌限流算法)
instanceInfoReplicator.onDemandUpdate();
}
};
//通过ApplicationInfoManager进行的本地状态更新将立即触发对远程eureka服务器的注册/更新(速率受限)。默认为true
if (clientConfig.shouldOnDemandUpdateStatusChange()) {
applicationInfoManager.registerStatusChangeListener(statusChangeListener);
}
//eureka.client.initialInstanceInfoReplicationIntervalSeconds=40s,默认延时40s,实例更新或注册线程启动
instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
} else {
logger.info("Not registering with Eureka server per configuration");
}
从上面的代码可以知道,instanceInfoReplicator的run方法实现了具体的逻辑代码
com.netflix.discovery.InstanceInfoReplicator#start:
public void start(int initialDelayMs) {
if (started.compareAndSet(false, true)) {
//第一次启动:设置dirty标志为true,触发第一次注册。
instanceInfo.setIsDirty();
Future next = scheduler.schedule(this, initialDelayMs, TimeUnit.SECONDS);
scheduledPeriodicRef.set(next);
}
}
com.netflix.discovery.InstanceInfoReplicator#run:
public void run() {
try {
//检查实例是否需要更新,如果需要更新,设置dirty标志为true,更新dirty时间。
discoveryClient.refreshInstanceInfo();
//如果dirty标志为true,则返回dirty时间,如果标志为false则返回null
Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
if (dirtyTimestamp != null) {
//dirty时间不为null,则需要更新到注册中心,重新注册
discoveryClient.register();
//设置dirty标志为false
instanceInfo.unsetIsDirty(dirtyTimestamp);
}
} catch (Throwable t) {
logger.warn("There was a problem with the instance info replicator", t);
} finally {
//replicationIntervalSeconds秒后(即30秒),又重新执行该方法。
Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
scheduledPeriodicRef.set(next);
}
}
com.netflix.discovery.DiscoveryClient#refreshInstanceInfo:
/**
* Refresh the current local instanceInfo. Note that after a valid refresh where changes are observed, the
* isDirty flag on the instanceInfo is set to true
*/
void refreshInstanceInfo() {
applicationInfoManager.refreshDataCenterInfoIfRequired();
applicationInfoManager.refreshLeaseInfoIfRequired();
InstanceStatus status;
try {
status = getHealthCheckHandler().getStatus(instanceInfo.getStatus());
} catch (Exception e) {
logger.warn("Exception from healthcheckHandler.getStatus, setting status to DOWN", e);
status = InstanceStatus.DOWN;
}
if (null != status) {
applicationInfoManager.setInstanceStatus(status);
}
}
refreshInstanceInfo方法具体实现流程图如下:
image.png
服务注册任务整体流程如下:
image.png
附加算法介绍:令牌桶限流算法(简要)
instanceInfoReplicator.onDemandUpdate():实例状态变更后,马上更新到远程eureka远程中心。
该方法用到了RateLimiter来实现限流。RateLimiter实现了令牌桶限流算法。为了防止网络拥塞,需限制流出网络的流量,使流量以比较均匀的速度向外发送。令牌桶算法就实现了这个功能,可控制发送到网络上数据的数目,并允许突发数据的发送。
image.png
网友评论