本章主要介绍Eureka server端源码分析。
在分析eureka源码源码之前,需要首先了解eureka的作用。
主要包括服务注册与发现,心跳续约,自动保护机制,服务剔除,集群同步等功能。
@EnableEurekaServer注解
Eureka的服务端开启是通过这个注解。
image.png
这个注解创建了EurekaServerMarkerConfiguration类
image.png
EurekaServerMarkerConfiguration类声明这个类是个配置类,并通过@Bean创建一个Marker类用来进行标记。在spring自动装配过程中,将会对EurekaServerAutoConfiguration类进行装配条件判断
image.png
EurekaServerAutoConfiguration
加载条件是判断检查标记类Marker是否存在。
image.png
如果存在,就表明加了@EnableEurekaServer注解。
EurekaServerAutoConfiguration类的作用是:
1、导入EurekaServerInitializerConfiguration类。
服务剔除、自我保护机制、初始化自我保护机制阈值
2、创建类并注入spring容器中
EurekaServerAutoConfiguration配置类需要创建的类包括PeerAwareInstanceRegistry、PeerEurekaNodes、EurekaServerContext、EurekaServerBootstrap、FilterRegistrationBean、javax.ws.rs.core.Application
其中FilterRegistrationBean和Application:查找资源文件并设置到Jersey过滤器中。
Jersey过滤器的作用是在"com.netflix.discovery","com.netflix.eureka" 两个包下找到所有加了Path或Provider的注解,并返回的资源为Application实例对象。
image.png
image.png
FilterRegistrationBean设置的过滤器对象是Application实例
image.png
所以,在eureka client与server端交互就是通过这些资源文件Application方法调用的。这些资源访问与MVC模型中Controller层的访问原理类似。
服务注册
由客户端发起Application资源ApplicationResource类的调用
1、addInstance
com.netflix.eureka.resources.ApplicationResource#addInstance方法
首先检查添加实例信息是否完整,然后调用registry.register方法进行实例注册
@POST
@Consumes({"application/json", "application/xml"})
public Response addInstance(InstanceInfo info,
@HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
logger.debug("Registering instance {} (replication={})", info.getId(), isReplication);
// validate that the instanceinfo contains all the necessary required fields
//验证一些实例数据是否完整....
// handle cases where clients may be registering with bad DataCenterInfo with missing data
registry.register(info, "true".equals(isReplication));
return Response.status(204).build(); // 204 to be backwards compatible
}
在查看添加实例具体的调用链之前,需要了解下InstanceRegistry类的继承关系
InstanceRegistry.png
2、register
org.springframework.cloud.netflix.eureka.server.InstanceRegistry#register(com.netflix.appinfo.InstanceInfo, int, boolean)
register.png
2.1、首先通过handleRegistration方法发布EurekaInstanceRegisteredEvent事件
image.png
2.2、调用父类register方法
3、register
com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl#register
注册后进行同步,通过isReplication参数控制。如果注册是从其他复制节点进行复制,则不同步复制
image.png
4、register
com.netflix.eureka.registry.AbstractInstanceRegistry#register
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
try {
read.lock();
Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
REGISTER.increment(isReplication);
if (gMap == null) {
final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
if (gMap == null) {
gMap = gNewMap;
}
}
Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
// Retain the last dirty timestamp without overwriting it, if there is already a lease
if (existingLease != null && (existingLease.getHolder() != null)) {
Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +
" than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
registrant = existingLease.getHolder();
}
} else {
// The lease does not exist and hence it is a new registration
synchronized (lock) {
if (this.expectedNumberOfRenewsPerMin > 0) {
// Since the client wants to cancel it, reduce the threshold
// (1
// for 30 seconds, 2 for a minute)
this.expectedNumberOfRenewsPerMin = this.expectedNumberOfRenewsPerMin + 2;
this.numberOfRenewsPerMinThreshold =
(int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
}
}
logger.debug("No previous lease information found; it is new registration");
}
Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
if (existingLease != null) {
lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
}
gMap.put(registrant.getId(), lease);
synchronized (recentRegisteredQueue) {
recentRegisteredQueue.add(new Pair<Long, String>(
System.currentTimeMillis(),
registrant.getAppName() + "(" + registrant.getId() + ")"));
}
// This is where the initial state transfer of overridden status happens
if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "
+ "overrides", registrant.getOverriddenStatus(), registrant.getId());
if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
logger.info("Not found overridden id {} and hence adding it", registrant.getId());
overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
}
}
InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
if (overriddenStatusFromMap != null) {
logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
registrant.setOverriddenStatus(overriddenStatusFromMap);
}
// Set the status based on the overridden status rules
InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
registrant.setStatusWithoutDirty(overriddenInstanceStatus);
// If the lease is registered with UP status, set lease service up timestamp
if (InstanceStatus.UP.equals(registrant.getStatus())) {
lease.serviceUp();
}
registrant.setActionType(ActionType.ADDED);
recentlyChangedQueue.add(new RecentlyChangedItem(lease));
registrant.setLastUpdatedTimestamp();
invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
logger.info("Registered instance {}/{} with status {} (replication={})",
registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
} finally {
read.unlock();
}
}
在这个方法中:
使用ReentrantReadWriteLock的读锁进行加锁,所以在客户端调用资源进行注册时,可能不止一个线程调用到这里
注册信息registry使用ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> 结构,表示的意义是对于一个高可用微服务集群。如下图对于服务a有a1、a2、a3相同的微服务。则ConcurrentHashMap中的String表示微服务组名称a,Map<String, Lease<InstanceInfo>>>是三个微服务实例,其中Map中的String表示a1或a2或a3微服务名称,InstanceInfo表示三个服务实例信息
image.png
根据instanceId查找existingLease实例对象,跟当前registrant注册实例比较。使用时间戳较大的实例对registrant变量赋值。并设置实例的serviceUpTimestamp服务上线时间戳。
放入到当前微服务集合中gMap.put(registrant.getId(), lease);到这里则完成服务注册
总之,执行注册实例的结果就是把appName的微服务加入到ConcurrentHashMap集合中。
心跳续约
客户端更新续约通过put方法。通过客户端发起请求
com.netflix.eureka.resources.InstanceResource#renewLease
image.png
com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl#renew
调用父类续约,续约成功则同步集群
image.png
com.netflix.eureka.registry.AbstractInstanceRegistry#renew
public boolean renew(String appName, String id, boolean isReplication) {
RENEW.increment(isReplication);
Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
Lease<InstanceInfo> leaseToRenew = null;
if (gMap != null) {
leaseToRenew = gMap.get(id);
}
if (leaseToRenew == null) {
RENEW_NOT_FOUND.increment(isReplication);
logger.warn("DS: Registry: lease doesn't exist, registering resource: {} - {}", appName, id);
return false;
} else {
InstanceInfo instanceInfo = leaseToRenew.getHolder();
if (instanceInfo != null) {
// touchASGCache(instanceInfo.getASGName());
InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(
instanceInfo, leaseToRenew, isReplication);
if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) {
logger.info("Instance status UNKNOWN possibly due to deleted override for instance {}"
+ "; re-register required", instanceInfo.getId());
RENEW_NOT_FOUND.increment(isReplication);
return false;
}
if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) {
Object[] args = {
instanceInfo.getStatus().name(),
instanceInfo.getOverriddenStatus().name(),
instanceInfo.getId()
};
logger.info(
"The instance status {} is different from overridden instance status {} for instance {}. "
+ "Hence setting the status to overridden status", args);
instanceInfo.setStatus(overriddenInstanceStatus);
}
}
renewsLastMin.increment();
leaseToRenew.renew();
return true;
}
}
获取注册信息中的具体实例租债器对象,执行租债器的renew()方法
com.netflix.eureka.lease.Lease#renew
image.png
续约成功,则更改lastUpdateTimestamp的值是当前系统时间戳与duration的和
服务下架
服务剔除是eureka服务端对过期的服务(长时间没有心跳的服务)进行定时清除。是服务端开启的定时任务,具体源码实现是在导入EurekaServerInitializerConfiguration类中实现。服务下架是客户端主动下架
EurekaServerInitializerConfiguration实现了ServletContextAware, SmartLifecycle, Ordered接口。
start方法执行时机
SmartLifecycle的start方法调用链(在spring的生命周期中):
AbstractApplicationContext#refresh
AbstractApplicationContext#finishRefresh
DefaultLifecycleProcessor#onRefresh
1、start
EurekaServerInitializerConfiguration#start
在一个线程内执行这个方法,原因是执行eureka上下文环境不会影响spring boot正常启动过程。
这个方法功能是eureka服务端上下文的初始化,发布了EurekaRegistryAvailableEvent和EurekaServerStartedEvent事件。
start.png
2、contextInitialized
org.springframework.cloud.netflix.eureka.server.EurekaServerBootstrap#contextInitialized
contextInitialized.png
3、initEurekaServerContext
EurekaServerBootstrap#initEurekaServerContext
包括从邻近节点复制注册表
initEurekaServerContext.png
4、openForTraffic
org.springframework.cloud.netflix.eureka.server.InstanceRegistry#openForTraffic
openForTraffic.png
5、openForTraffic
com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl#openForTraffic
openForTraffic.png
6、postInit
com.netflix.eureka.registry.AbstractInstanceRegistry#postInit
postInit.png
7、EvictionTask
com.netflix.eureka.registry.AbstractInstanceRegistry.EvictionTask
创建EvictionTask类,EvictionTask继承了TimerTask定时器任务类
EvictionTask.png
8、evict
com.netflix.eureka.registry.AbstractInstanceRegistry#evict(long)
public void evict(long additionalLeaseMs) {
logger.debug("Running the evict task");
if (!isLeaseExpirationEnabled()) {
logger.debug("DS: lease expiration is currently disabled.");
return;
}
// We collect first all expired items, to evict them in random order. For large eviction sets,
// if we do not that, we might wipe out whole apps before self preservation kicks in. By randomizing it,
// the impact should be evenly distributed across all applications.
List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>();
for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) {
Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue();
if (leaseMap != null) {
for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) {
Lease<InstanceInfo> lease = leaseEntry.getValue();
if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) {
expiredLeases.add(lease);
}
}
}
}
// To compensate for GC pauses or drifting local time, we need to use current registry size as a base for
// triggering self-preservation. Without that we would wipe out full registry.
int registrySize = (int) getLocalRegistrySize();
int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold());
int evictionLimit = registrySize - registrySizeThreshold;
int toEvict = Math.min(expiredLeases.size(), evictionLimit);
if (toEvict > 0) {
logger.info("Evicting {} items (expired={}, evictionLimit={})", toEvict, expiredLeases.size(), evictionLimit);
Random random = new Random(System.currentTimeMillis());
for (int i = 0; i < toEvict; i++) {
// Pick a random item (Knuth shuffle algorithm)
int next = i + random.nextInt(expiredLeases.size() - i);
Collections.swap(expiredLeases, i, next);
Lease<InstanceInfo> lease = expiredLeases.get(i);
String appName = lease.getHolder().getAppName();
String id = lease.getHolder().getId();
EXPIRED.increment();
logger.warn("DS: Registry: expired lease for {}/{}", appName, id);
internalCancel(appName, id, false);
}
}
}
首先判断是否开启了自我保护机制,遍历注册表类并判断每个租债器实例调用isExpired方法判断是否过期。对应过期的实例加入到expiredLeases集合中。isExpired方法根据当前时间戳与最后更新时间与duration和additionalLeaseMs比较得到。lastUpdateTimestamp最后更新时间是上次系统时间与duration 的和,最后更新时间注册、心跳续约、服务下架都会对这个参数进行更新。
public boolean isExpired(long additionalLeaseMs) {
return (evictionTimestamp > 0 || System.currentTimeMillis() > (lastUpdateTimestamp + duration + additionalLeaseMs));
}
对于需要剔除的expiredLeases实例集合,并不会一下把所有的都会删除。会根据registrySize和RenewalPercentThreshold计算出一个evictionLimit剔除最大数量。例如一共有registrySize=100。RenewalPercentThreshold=0.85,则evictionLimit=100-100*0.85=15个。如果expiredLeases超过15个则会任意选出15个进行剔除,对于剩下的下次剔除任务中删除。如果下次还会超过则会触发自动保护机制isLeaseExpirationEnabled,直接返回。
集群同步
调用方法replicateToPeers
com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl#replicateToPeers
/**
* Replicates all eureka actions to peer eureka nodes except for replication
* traffic to this node.
*
*/
private void replicateToPeers(Action action, String appName, String id,
InstanceInfo info /* optional */,
InstanceStatus newStatus /* optional */, boolean isReplication) {
Stopwatch tracer = action.getTimer().start();
try {
if (isReplication) {
numberOfReplicationsLastMin.increment();
}
// If it is a replication already, do not replicate again as this will create a poison replication
if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
return;
}
for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
// If the url represents this host, do not replicate to yourself.
if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
continue;
}
replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
}
} finally {
tracer.stop();
}
}
private void replicateInstanceActionsToPeers(Action action, String appName,
String id, InstanceInfo info, InstanceStatus newStatus,
PeerEurekaNode node) {
try {
InstanceInfo infoFromRegistry = null;
CurrentRequestVersion.set(Version.V2);
switch (action) {
case Cancel:
node.cancel(appName, id);
break;
case Heartbeat:
InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
infoFromRegistry = getInstanceByAppAndId(appName, id, false);
node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
break;
case Register:
node.register(info);
break;
case StatusUpdate:
infoFromRegistry = getInstanceByAppAndId(appName, id, false);
node.statusUpdate(appName, id, newStatus, infoFromRegistry);
break;
case DeleteStatusOverride:
infoFromRegistry = getInstanceByAppAndId(appName, id, false);
node.deleteStatusOverride(appName, id, infoFromRegistry);
break;
}
} catch (Throwable t) {
logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);
}
}
当前实例的行为包括Heartbeat, Register, Cancel, StatusUpdate, DeleteStatusOverride;对当前实例的这些行为操作成功后,则需要把这些状态行为的操作信息实例同步到邻近的eureka服务节点,需不需要集群同步主要看isReplication参数,默认是false需要同步,能够执行到replicateInstanceActionsToPeers方法。
同步操作
1、注册后同步register
register.png
2、续约后同步renew
renew.png
3、状态更新后同步statusUpdate
statusUpdate.png
4、下架后同步cancel
cancel.png
5、删除状态后DeleteStatusOverride
DeleteStatusOverride.png
syncUp
从邻近的节点复制节点信息
com.netflix.eureka.registry.PeerAwareInstanceRegistry#syncUp
syncUp.png
public int syncUp() {
// Copy entire entry from neighboring DS node
int count = 0;
for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++) {
if (i > 0) {
try {
Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs());
} catch (InterruptedException e) {
logger.warn("Interrupted during registry transfer..");
break;
}
}
Applications apps = eurekaClient.getApplications();
for (Application app : apps.getRegisteredApplications()) {
for (InstanceInfo instance : app.getInstances()) {
try {
if (isRegisterable(instance)) {
register(instance, instance.getLeaseInfo().getDurationInSecs(), true);
count++;
}
} catch (Throwable t) {
logger.error("During DS init copy", t);
}
}
}
}
return count;
}
在注册同步重试次数RegistrySyncRetries和注册同步等待时间RegistrySyncRetryWaitMs内,对eureka集群的其他对等节点注册的eurekaclient实例同步注册到当前节点。
自我保护机制
在服务剔除时,会检查是否开启自我保护机制及是否超过阈值
evict.png
PeerAwareInstanceRegistryImpl#isLeaseExpirationEnabled
这个方法首先检查enableSelfPreservation参数是否开启(默认是true 开启的),然后判断当前得到的心跳数与自我保护阈值比较。
isLeaseExpirationEnabled.png
自我保护阈值计算
numberOfRenewsPerMinThreshold =expectedNumberOfRenewsPerMin*
renewalPercentThreshold
自我保护阈值=每分钟期望续约的心跳数(所有注册上的实例*每分钟触发的心跳连接次数)*自我保护机制的触发百分比(85%)
假如一共有100个实例,服务端在默认情况下每分钟连接刷新时间(expectedClientRenewalIntervalSeconds)是30s,所以一分钟有2次。
numberOfRenewsPerMinThreshold = 100*2*0.85=170个
所以,触发保护机制 当前得到的心跳连接数小于85%时,会触发
另一种说法,在服务剔除的时候当剔除的数量大于15%时,会触发
如果客户端续约刷新间隔与服务端续约刷新间隔不一致,则会造成自我保护机制不能正常工作
阈值更改时机
1、15分钟自动更改
com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl#scheduleRenewalThresholdUpdateTask
scheduleRenewalThresholdUpdateTask.png
updateRenewalThreshold.png
2、注册
image.png
3、服务下架
com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl#cancel
image.png
4、服务初始化
com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl#openForTraffic
image.png
为什么初始化和15分钟自动更改是数量的2倍,注册和下架是+2或-2。因为默认情况下服务端一次刷新心跳是30s,所以在60s内是2次。注册和下架是一个一个操作的
服务发现
查询服务时,涉及服务端的三层缓存架构。
1、只读缓存层readOnlyCacheMap(ConcurrentMap类型)
2、读写缓存层readWriteCacheMap(LoadingCache类型,使用google的guava框架)
3、真实数据层
com.netflix.eureka.resources.ApplicationResource#getApplication
@GET
public Response getApplication(@PathParam("version") String version,
@HeaderParam("Accept") final String acceptHeader,
@HeaderParam(EurekaAccept.HTTP_X_EUREKA_ACCEPT) String eurekaAccept) {
if (!registry.shouldAllowAccess(false)) {
return Response.status(Status.FORBIDDEN).build();
}
EurekaMonitors.GET_APPLICATION.increment();
CurrentRequestVersion.set(Version.toEnum(version));
KeyType keyType = Key.KeyType.JSON;
if (acceptHeader == null || !acceptHeader.contains("json")) {
keyType = Key.KeyType.XML;
}
Key cacheKey = new Key(
Key.EntityType.Application,
appName,
keyType,
CurrentRequestVersion.get(),
EurekaAccept.fromString(eurekaAccept)
);
String payLoad = responseCache.get(cacheKey);
if (payLoad != null) {
logger.debug("Found: {}", appName);
return Response.ok(payLoad).build();
} else {
logger.debug("Not Found: {}", appName);
return Response.status(Status.NOT_FOUND).build();
}
}
com.netflix.eureka.registry.ResponseCacheImpl#get(com.netflix.eureka.registry.Key)
public String get(final Key key) {
return get(key, shouldUseReadOnlyResponseCache);
}
@VisibleForTesting
String get(final Key key, boolean useReadOnlyCache) {
Value payload = getValue(key, useReadOnlyCache);
if (payload == null || payload.getPayload().equals(EMPTY_PAYLOAD)) {
return null;
} else {
return payload.getPayload();
}
}
@VisibleForTesting
Value getValue(final Key key, boolean useReadOnlyCache) {
Value payload = null;
try {
if (useReadOnlyCache) {
final Value currentPayload = readOnlyCacheMap.get(key);
if (currentPayload != null) {
payload = currentPayload;
} else {
payload = readWriteCacheMap.get(key);
readOnlyCacheMap.put(key, payload);
}
} else {
payload = readWriteCacheMap.get(key);
}
} catch (Throwable t) {
logger.error("Cannot get value for key :" + key, t);
}
return payload;
}
首先查看只读缓存层是否打开shouldUseReadOnlyResponseCache,与之对应的是getValue方法的useReadOnlyCache变量值。
如果打开,则先从只读缓存readOnlyCacheMap查询,如果查询不到则从读写缓存是readWriteCacheMap查询,并放入到只读缓存
缓存更新原理
1、只读缓存
只读缓存没有提供API更新,只能通过定时任务或查询时从读写缓存写到只读缓存
在创建ResponseCacheImpl类时,开启定时任务
shouldUseReadOnlyResponseCache.png
getCacheUpdateTask.png
如果从读写缓存得到的cacheValue与从只读缓存得到的currentCacheValue不相同,则把cacheValue替换到只读缓存中
2、读写缓存
2.1、构建ResponseCacheImpl类
readWriteCacheMap.png
如果缓存不存在,则通过generatePayload方法重新加载
/*
* Generate pay load for the given key.
*/
private Value generatePayload(Key key) {
Stopwatch tracer = null;
try {
String payload;
switch (key.getEntityType()) {
case Application:
boolean isRemoteRegionRequested = key.hasRegions();
if (ALL_APPS.equals(key.getName())) {
if (isRemoteRegionRequested) {
tracer = serializeAllAppsWithRemoteRegionTimer.start();
payload = getPayLoad(key, registry.getApplicationsFromMultipleRegions(key.getRegions()));
} else {
tracer = serializeAllAppsTimer.start();
payload = getPayLoad(key, registry.getApplications());
}
} else if (ALL_APPS_DELTA.equals(key.getName())) {
if (isRemoteRegionRequested) {
tracer = serializeDeltaAppsWithRemoteRegionTimer.start();
versionDeltaWithRegions.incrementAndGet();
versionDeltaWithRegionsLegacy.incrementAndGet();
payload = getPayLoad(key,
registry.getApplicationDeltasFromMultipleRegions(key.getRegions()));
} else {
tracer = serializeDeltaAppsTimer.start();
versionDelta.incrementAndGet();
versionDeltaLegacy.incrementAndGet();
payload = getPayLoad(key, registry.getApplicationDeltas());
}
} else {
tracer = serializeOneApptimer.start();
payload = getPayLoad(key, registry.getApplication(key.getName()));
}
break;
case VIP:
case SVIP:
tracer = serializeViptimer.start();
payload = getPayLoad(key, getApplicationsForVip(key, registry));
break;
default:
logger.error("Unidentified entity type: " + key.getEntityType() + " found in the cache key.");
payload = "";
break;
}
return new Value(payload);
} finally {
if (tracer != null) {
tracer.stop();
}
}
}
private String getPayLoad(Key key, Applications apps) {
EncoderWrapper encoderWrapper = serverCodecs.getEncoder(key.getType(), key.getEurekaAccept());
String result;
try {
result = encoderWrapper.encode(apps);
} catch (Exception e) {
logger.error("Failed to encode the payload for all apps", e);
return "";
}
if(logger.isDebugEnabled()) {
logger.debug("New application cache entry {} with apps hashcode {}", key.toStringCompact(), apps.getAppsHashCode());
}
return result;
}
2.2、调用invalidateCache方法,使缓存失效
com.netflix.eureka.registry.AbstractInstanceRegistry#invalidateCache
invalidateCache.png
com.netflix.eureka.registry.ResponseCacheImpl#invalidate
invalidate.png
com.netflix.eureka.registry.ResponseCacheImpl#invalidate(com.netflix.eureka.registry.Key...)
invalidate.png
调用invalidateCache方法时机
1、注册后更新缓存
com.netflix.eureka.registry.AbstractInstanceRegistry#register
2、服务下架
com.netflix.eureka.registry.AbstractInstanceRegistry#cancel
3、statusUpdate
com.netflix.eureka.registry.AbstractInstanceRegistry#statusUpdate
4、deleteStatusOverride
com.netflix.eureka.registry.AbstractInstanceRegistry#deleteStatusOverride
总结:
主要就是服务端使用的框架调用原理和Eureka Server端涉及的功能
那些功能使用客户端调用(有注册,续约,下架)
服务下架与服务剔除的区别
eureka服务端涉及的服务同步,自我保护阈值
服务查询的三级缓存使用原理
网友评论