Before we get to the three-level cache itself, we need to introduce a couple of structures.
ResponseCache
As you probably know, an eureka-client by default periodically pulls the registry from the registration center.
So how does eureka-server handle these registry-fetch requests?
It serves them through an implementation of the ResponseCache interface: the cached data is looked up and returned.
The interface itself is simple. The methods to pay attention to are invalidate and get; we will look at them in detail in the implementation class. Put bluntly, invalidate evicts certain entries from the cache, and get looks data up by key.
public interface ResponseCache {
    // evict the cache entries related to the given application / VIP addresses
    void invalidate(String appName, @Nullable String vipAddress, @Nullable String secureVipAddress);

    AtomicLong getVersionDelta();
    AtomicLong getVersionDeltaWithRegions();

    // look up the (possibly cached) payload for the given key
    String get(Key key);
    // same as get, but the payload is GZIP-compressed
    byte[] getGZIP(Key key);
}
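As for invalidate, it is called whenever the registry content changes (register, cancel, status update). The sketch below is a paraphrase of that path, not the verbatim Eureka source: the method bodies are condensed, and keysAffectedBy is a hypothetical helper standing in for the key-building code.

public abstract class AbstractInstanceRegistry implements InstanceRegistry {
    // Paraphrased sketch: called after register()/cancel()/statusUpdate() mutate the registry map.
    private void invalidateCache(String appName, @Nullable String vipAddress, @Nullable String secureVipAddress) {
        // evict the stale entries from the response cache
        responseCache.invalidate(appName, vipAddress, secureVipAddress);
    }
}

public class ResponseCacheImpl implements ResponseCache {
    @Override
    public void invalidate(String appName, @Nullable String vipAddress, @Nullable String secureVipAddress) {
        // Paraphrased sketch: build every Key that could contain this application
        // (the single app, ALL_APPS, ALL_APPS_DELTA, the VIP/SVIP entries, for each format and version)
        // and evict them from the read-write cache. The read-only map is NOT touched here;
        // it only catches up on its next scheduled refresh.
        for (Key key : keysAffectedBy(appName, vipAddress, secureVipAddress)) { // keysAffectedBy: hypothetical helper
            readWriteCacheMap.invalidate(key); // Guava Cache#invalidate
        }
    }
}

In other words, a write to the registry immediately invalidates the second level (readWriteCacheMap), while the first level (readOnlyCacheMap) is only refreshed by a timer, which we will see below.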
Next, let's take a look at ResponseCacheImpl, the implementation of ResponseCache.
ResponseCacheImpl
public class ResponseCacheImpl implements ResponseCache {
    // map used for reads only
    private final ConcurrentMap<Key, Value> readOnlyCacheMap = new ConcurrentHashMap<Key, Value>();
    // map used for updates (a Guava LoadingCache)
    private final LoadingCache<Key, Value> readWriteCacheMap;
    // switch that controls whether the read-only map is used
    private final boolean shouldUseReadOnlyResponseCache;
    // the instance registry
    private final AbstractInstanceRegistry registry;
    // holds the eureka-server configuration
    private final EurekaServerConfig serverConfig;
}
ResponseCacheImpl only contains one map for reading and one for writing, so why do we say Eureka has a three-level cache?
Because there is also the registry itself, inside AbstractInstanceRegistry.
The registry plus the read map and the write map together form the three-level cache.
So how do these three levels work?
Let's first look at where each of the three levels is used.
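Before going through them one by one, here is the read path in a nutshell. This is a conceptual sketch only, not the actual Eureka code (the real implementation, getValue, is shown further down):

// Conceptual sketch of how a registry-fetch request is served (not the actual Eureka code)
Value serve(Key key) {
    Value v = readOnlyCacheMap.get(key);    // level 1: read-only map, refreshed from level 2 every 30 s
    if (v == null) {
        v = readWriteCacheMap.get(key);     // level 2: Guava LoadingCache, entries expire 180 s after write
                                            // level 3: on a miss, its CacheLoader rebuilds the payload from the registry
        readOnlyCacheMap.put(key, v);       // backfill level 1
    }
    return v;
}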
The registry
It lives at com.netflix.eureka.registry.AbstractInstanceRegistry.registry.
It was covered in an earlier article (how the registry works), so we won't expand on it here.
readOnlyCacheMap
This map stores read-only data. Why does Eureka keep one map for reading and another for writing? Essentially it is read/write separation: updating the write map does not block reads, which keeps reads fast.
Let's first look at how readOnlyCacheMap works on its own.
The place I set a breakpoint is where the server returns registry data to the client:
@Path("/{version}/apps")
@Produces({"application/xml", "application/json"})
public class ApplicationsResource {
@GET
public Response getContainers(@PathParam("version") String version,
@HeaderParam(HEADER_ACCEPT) String acceptHeader,
@HeaderParam(HEADER_ACCEPT_ENCODING) String acceptEncoding,
@HeaderParam(EurekaAccept.HTTP_X_EUREKA_ACCEPT) String eurekaAccept,
@Context UriInfo uriInfo,
@Nullable @QueryParam("regions") String regionsStr) {
// ... some code omitted; the business data in the response is always fetched via responseCache.get / getGZIP
Response response;
if (acceptEncoding != null && acceptEncoding.contains(HEADER_GZIP_VALUE)) {
response = Response.ok(responseCache.getGZIP(cacheKey))
.header(HEADER_CONTENT_ENCODING, HEADER_GZIP_VALUE)
.header(HEADER_CONTENT_TYPE, returnMediaType)
.build();
} else {
response = Response.ok(responseCache.get(cacheKey))
.build();
}
return response;
}
}
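The cacheKey in that handler identifies what is being requested: the entity type, full registry vs. delta, the response format, the API version and the requested regions. The omitted part builds it roughly like this (paraphrased from the Eureka 1.x source; the exact constructor arguments can differ between versions, so treat this as an illustration rather than the verbatim code):

// Paraphrased sketch: the cache key encodes "which data, in which format, for which regions"
Key cacheKey = new Key(Key.EntityType.Application,
        ResponseCacheImpl.ALL_APPS,            // ALL_APPS -> the full registry (the delta endpoint uses ALL_APPS_DELTA)
        keyType,                               // JSON or XML, derived from the Accept header
        CurrentRequestVersion.get(),           // API version
        EurekaAccept.fromString(eurekaAccept),
        regions                                // remote regions requested, if any
);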
Now let's look at what responseCache.get(cacheKey) does. The work happens in getValue:
public class ResponseCacheImpl implements ResponseCache {
@VisibleForTesting
Value getValue(final Key key, boolean useReadOnlyCache) {
Value payload = null;
// if the read-only cache is enabled, read from readOnlyCacheMap first; on a miss, load from readWriteCacheMap and backfill readOnlyCacheMap
try {
if (useReadOnlyCache) {
final Value currentPayload = readOnlyCacheMap.get(key);
if (currentPayload != null) {
payload = currentPayload;
} else {
payload = readWriteCacheMap.get(key);
readOnlyCacheMap.put(key, payload);
}
} else {
payload = readWriteCacheMap.get(key);
}
} catch (Throwable t) {
logger.error("Cannot get value for key : {}", key, t);
}
return payload;
}
}
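getValue itself is invoked from the public get(Key) method, which simply passes in the shouldUseReadOnlyResponseCache flag read from configuration. Roughly (paraphrased from the Eureka source):

public String get(final Key key) {
    // whether to go through the read-only map is decided by configuration
    return get(key, shouldUseReadOnlyResponseCache);
}

@VisibleForTesting
String get(final Key key, boolean useReadOnlyCache) {
    Value payload = getValue(key, useReadOnlyCache);
    if (payload == null || payload.getPayload().equals(EMPTY_PAYLOAD)) {
        return null;
    } else {
        return payload.getPayload();
    }
}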
So where does the data in readOnlyCacheMap come from?
Let's go back to the constructor of ResponseCacheImpl:
public class ResponseCacheImpl implements ResponseCache {
ResponseCacheImpl(EurekaServerConfig serverConfig, ServerCodecs serverCodecs, AbstractInstanceRegistry registry) {
this.serverConfig = serverConfig;
this.serverCodecs = serverCodecs;
this.shouldUseReadOnlyResponseCache = serverConfig.shouldUseReadOnlyResponseCache();
this.registry = registry;
//interval of the timer that refreshes the read-only cache; defaults to 30 seconds and can be adjusted via configuration
long responseCacheUpdateIntervalMs = serverConfig.getResponseCacheUpdateIntervalMs();
this.readWriteCacheMap =
CacheBuilder.newBuilder().initialCapacity(1000)
.expireAfterWrite(serverConfig.getResponseCacheAutoExpirationInSeconds(), TimeUnit.SECONDS)
.removalListener(new RemovalListener<Key, Value>() {
@Override
public void onRemoval(RemovalNotification<Key, Value> notification) {
Key removedKey = notification.getKey();
if (removedKey.hasRegions()) {
Key cloneWithNoRegions = removedKey.cloneWithoutRegions();
regionSpecificKeys.remove(cloneWithNoRegions, removedKey);
}
}
})
.build(new CacheLoader<Key, Value>() {
@Override
public Value load(Key key) throws Exception {
if (key.hasRegions()) {
Key cloneWithNoRegions = key.cloneWithoutRegions();
regionSpecificKeys.put(cloneWithNoRegions, key);
}
Value value = generatePayload(key);
return value;
}
});
//if the read-only cache is enabled, schedule the refresh task
if (shouldUseReadOnlyResponseCache) {
timer.schedule(getCacheUpdateTask(),
new Date(((System.currentTimeMillis() / responseCacheUpdateIntervalMs) * responseCacheUpdateIntervalMs)
+ responseCacheUpdateIntervalMs),
responseCacheUpdateIntervalMs);
}
// ... remaining code omitted ...
}
}
From the code above: if the read-only cache is enabled (shouldUseReadOnlyResponseCache = true), a timer task is scheduled.
Its period is responseCacheUpdateIntervalMs, 30 seconds by default, and it can be changed via configuration.
The key part is what the task actually does, and it is simple: it reads entries from readWriteCacheMap and overwrites the corresponding entries in readOnlyCacheMap.
public class ResponseCacheImpl implements ResponseCache {
private TimerTask getCacheUpdateTask() {
return new TimerTask() {
@Override
public void run() {
logger.debug("Updating the client cache from response cache");
for (Key key : readOnlyCacheMap.keySet()) {
if (logger.isDebugEnabled()) {
logger.debug("Updating the client cache from response cache for key : {} {} {} {}",
key.getEntityType(), key.getName(), key.getVersion(), key.getType());
}
try {
CurrentRequestVersion.set(key.getVersion());
Value cacheValue = readWriteCacheMap.get(key);
Value currentCacheValue = readOnlyCacheMap.get(key);
if (cacheValue != currentCacheValue) {
readOnlyCacheMap.put(key, cacheValue);
}
} catch (Throwable th) {
logger.error("Error while updating the client cache from response cache for key {}", key.toStringCompact(), th);
}
}
}
};
}
}
So now we roughly know what readOnlyCacheMap is for: when the read-only cache is enabled, reads hit readOnlyCacheMap first and only fall back to readWriteCacheMap on a miss.
readOnlyCacheMap, in turn, is synchronized with readWriteCacheMap by a timer task every 30 seconds. (Note that cacheValue != currentCacheValue is a reference comparison; it works because the Guava cache produces a new Value object whenever an entry is reloaded, so a changed reference means the payload was regenerated.)
That answers where readOnlyCacheMap's data comes from.
Then where does readWriteCacheMap's data come from, and how does readWriteCacheMap itself work?
readWriteCacheMap
Again, start from the constructor of ResponseCacheImpl.
public class ResponseCacheImpl implements ResponseCache {
ResponseCacheImpl(EurekaServerConfig serverConfig, ServerCodecs serverCodecs, AbstractInstanceRegistry registry) {
this.serverConfig = serverConfig;
this.serverCodecs = serverCodecs;
//read the read-only-cache switch from configuration
this.shouldUseReadOnlyResponseCache = serverConfig.shouldUseReadOnlyResponseCache();
this.registry = registry;
//interval of the timer that refreshes the read-only cache; defaults to 30 seconds and can be adjusted via configuration
long responseCacheUpdateIntervalMs = serverConfig.getResponseCacheUpdateIntervalMs();
//initialize the write cache; serverConfig.getResponseCacheAutoExpirationInSeconds() defaults to 180 seconds, so an entry expires 180 seconds after it was written
this.readWriteCacheMap =
CacheBuilder.newBuilder().initialCapacity(1000)
.expireAfterWrite(serverConfig.getResponseCacheAutoExpirationInSeconds(), TimeUnit.SECONDS)
.removalListener(new RemovalListener<Key, Value>() {
@Override
public void onRemoval(RemovalNotification<Key, Value> notification) {
Key removedKey = notification.getKey();
if (removedKey.hasRegions()) {
Key cloneWithNoRegions = removedKey.cloneWithoutRegions();
regionSpecificKeys.remove(cloneWithNoRegions, removedKey);
}
}
})
.build(new CacheLoader<Key, Value>() {
@Override
public Value load(Key key) throws Exception {
// on a cache miss, the payload is generated from the registry
if (key.hasRegions()) {
Key cloneWithNoRegions = key.cloneWithoutRegions();
regionSpecificKeys.put(cloneWithNoRegions, key);
}
Value value = generatePayload(key);
return value;
}
});
// ... remaining code omitted ...
}
}
That is a lot of code, but the core is the CacheLoader's load method, which delegates to generatePayload(key):
public class ResponseCacheImpl implements ResponseCache {
private Value generatePayload(Key key) {
Stopwatch tracer = null;
try {
String payload;
switch (key.getEntityType()) {
case Application:
boolean isRemoteRegionRequested = key.hasRegions();
if (ALL_APPS.equals(key.getName())) {
if (isRemoteRegionRequested) {
tracer = serializeAllAppsWithRemoteRegionTimer.start();
// as the code shows, the apps are always fetched via the registry
payload = getPayLoad(key, registry.getApplicationsFromMultipleRegions(key.getRegions()));
} else {
tracer = serializeAllAppsTimer.start();
// again fetched via the registry
payload = getPayLoad(key, registry.getApplications());
}
} else if (ALL_APPS_DELTA.equals(key.getName())) {
if (isRemoteRegionRequested) {
tracer = serializeDeltaAppsWithRemoteRegionTimer.start();
versionDeltaWithRegions.incrementAndGet();
versionDeltaWithRegionsLegacy.incrementAndGet();
// again fetched via the registry
payload = getPayLoad(key,
registry.getApplicationDeltasFromMultipleRegions(key.getRegions()));
} else {
tracer = serializeDeltaAppsTimer.start();
versionDelta.incrementAndGet();
versionDeltaLegacy.incrementAndGet();
payload = getPayLoad(key, registry.getApplicationDeltas());
}
} else {
tracer = serializeOneApptimer.start();
// again fetched via the registry
payload = getPayLoad(key, registry.getApplication(key.getName()));
}
break;
case VIP:
case SVIP:
tracer = serializeViptimer.start();
payload = getPayLoad(key, getApplicationsForVip(key, registry));
break;
default:
logger.error("Unidentified entity type: {} found in the cache key.", key.getEntityType());
payload = "";
break;
}
return new Value(payload);
} finally {
if (tracer != null) {
tracer.stop();
}
}
}
private String getPayLoad(Key key, Applications apps) {
EncoderWrapper encoderWrapper = serverCodecs.getEncoder(key.getType(), key.getEurekaAccept());
String result;
try {
result = encoderWrapper.encode(apps);
} catch (Exception e) {
logger.error("Failed to encode the payload for all apps", e);
return "";
}
if(logger.isDebugEnabled()) {
logger.debug("New application cache entry {} with apps hashcode {}", key.toStringCompact(), apps.getAppsHashCode());
}
return result;
}
}
All of those registry.getApplicationXxx calls end up in AbstractInstanceRegistry, which reads directly from its in-memory registry map. Take getApplicationsFromMultipleRegions as an example:
public abstract class AbstractInstanceRegistry implements InstanceRegistry {
private final ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry
= new ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>();
public Applications getApplicationsFromMultipleRegions(String[] remoteRegions) {
boolean includeRemoteRegion = null != remoteRegions && remoteRegions.length != 0;
logger.debug("Fetching applications registry with remote regions: {}, Regions argument {}",
includeRemoteRegion, remoteRegions);
if (includeRemoteRegion) {
GET_ALL_WITH_REMOTE_REGIONS_CACHE_MISS.increment();
} else {
GET_ALL_CACHE_MISS.increment();
}
Applications apps = new Applications();
apps.setVersion(1L);
// the Applications payload is built by iterating over the entries of the registry map
for (Entry<String, Map<String, Lease<InstanceInfo>>> entry : registry.entrySet()) {
Application app = null;
if (entry.getValue() != null) {
for (Entry<String, Lease<InstanceInfo>> stringLeaseEntry : entry.getValue().entrySet()) {
Lease<InstanceInfo> lease = stringLeaseEntry.getValue();
if (app == null) {
app = new Application(lease.getHolder().getAppName());
}
app.addInstance(decorateInstanceInfo(lease));
}
}
if (app != null) {
apps.addApplication(app);
}
}
if (includeRemoteRegion) {
for (String remoteRegion : remoteRegions) {
RemoteRegionRegistry remoteRegistry = regionNameVSRemoteRegistry.get(remoteRegion);
if (null != remoteRegistry) {
Applications remoteApps = remoteRegistry.getApplications();
for (Application application : remoteApps.getRegisteredApplications()) {
if (shouldFetchFromRemoteRegistry(application.getName(), remoteRegion)) {
logger.info("Application {} fetched from the remote region {}",
application.getName(), remoteRegion);
Application appInstanceTillNow = apps.getRegisteredApplications(application.getName());
if (appInstanceTillNow == null) {
appInstanceTillNow = new Application(application.getName());
apps.addApplication(appInstanceTillNow);
}
for (InstanceInfo instanceInfo : application.getInstances()) {
appInstanceTillNow.addInstance(instanceInfo);
}
} else {
logger.debug("Application {} not fetched from the remote region {} as there exists a "
+ "whitelist and this app is not in the whitelist.",
application.getName(), remoteRegion);
}
}
} else {
logger.warn("No remote registry available for the remote region {}", remoteRegion);
}
}
}
apps.setAppsHashCode(apps.getReconcileHashCode());
return apps;
}
}
The code above is a bit long, but it shows that readWriteCacheMap ultimately gets its data from the registry (com.netflix.eureka.registry.AbstractInstanceRegistry.registry) and then stores it in the cache.
To summarize the whole flow:
- When a client registers (or cancels, or changes status), the registry map in AbstractInstanceRegistry is updated and the affected entries in readWriteCacheMap are invalidated via ResponseCache.invalidate.
- When a client fetches the registry, the server reads readOnlyCacheMap first (if enabled) and falls back to readWriteCacheMap; on a miss there, the CacheLoader regenerates the payload from the registry.
- A timer task copies readWriteCacheMap entries into readOnlyCacheMap every 30 seconds (responseCacheUpdateIntervalMs), and readWriteCacheMap entries expire 180 seconds after they were written.
(Diagram: the flow of client registration and registry fetching)
(Diagram: synchronization of the three cache levels)
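One practical consequence of this design: with the default settings, a newly registered instance can take a while to become visible to other clients. Here is a back-of-the-envelope estimate; the 30-second client registry-fetch interval is an assumption about the client's configuration (eureka.client.registry-fetch-interval-seconds), not something shown in this article.

// Illustration only, not Eureka source code: worst-case time for another client to see a new instance
public class PropagationDelayEstimate {
    public static void main(String[] args) {
        // server side: readOnlyCacheMap only catches up with readWriteCacheMap
        // every response-cache-update-interval-ms (default 30 000 ms)
        long readOnlyCacheRefreshMs = 30_000;
        // client side: assumed default registry fetch interval of 30 s
        long clientFetchIntervalMs = 30_000;

        // readWriteCacheMap is invalidated immediately when an instance registers,
        // so the dominant delays are the read-only refresh plus the client's next fetch.
        long worstCaseMs = readOnlyCacheRefreshMs + clientFetchIntervalMs;
        System.out.println("worst-case visibility delay is roughly " + (worstCaseMs / 1000) + " s");
    }
}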
Configuration
eureka:
  client:
    register-with-eureka: false
    fetch-registry: false
  server:
    use-read-only-response-cache: true #enable the read-only cache (default true)
    response-cache-auto-expiration-in-seconds: 180 #expiry of the write cache, default 180 seconds
    response-cache-update-interval-ms: 30000 #sync interval between the read and write caches, in milliseconds; default 30000 (30 seconds)