美文网首页springcloudkafka
Spring Cloud Eureka源码分析之三级缓存的设计原

Spring Cloud Eureka源码分析之三级缓存的设计原

作者: 跟着Mic学架构 | 来源:发表于2021-12-16 17:43 被阅读0次
    Eureka源码

    Eureka Server 为了提供响应效率,提供了两层的缓存结构,将 Eureka Client 所需要的注册信息,直接存储在缓存结构中,实现原理如下图所示。

    image-20211121172504080

    第一层缓存:readOnlyCacheMap,本质上是 ConcurrentHashMap,依赖定时从 readWriteCacheMap 同步数据,默认时间为 30 秒。

    readOnlyCacheMap : 是一个 CurrentHashMap 只读缓存,这个主要是为了供客户端获取注册信息时使用,其缓存更新,依赖于定时器的更新,通过和 readWriteCacheMap 的值做对比,如果数据不一致,则以 readWriteCacheMap 的数据为准。

    第二层缓存:readWriteCacheMap,本质上是 Guava 缓存。

    readWriteCacheMap:readWriteCacheMap 的数据主要同步于存储层。当获取缓存时判断缓存中是否没有数据,如果不存在此数据,则通过 CacheLoader 的 load 方法去加载,加载成功之后将数据放入缓存,同时返回数据。

    readWriteCacheMap 缓存过期时间,默认为 180 秒,当服务下线、过期、注册、状态变更,都会来清除此缓存中的数据。

    Eureka Client 获取全量或者增量的数据时,会先从一级缓存中获取;如果一级缓存中不存在,再从二级缓存中获取;如果二级缓存也不存在,这时候先将存储层的数据同步到缓存中,再从缓存中获取。

    通过 Eureka Server 的二层缓存机制,可以非常有效地提升 Eureka Server 的响应时间,通过数据存储层和缓存层的数据切割,根据使用场景来提供不同的数据支持。

    多级缓存的意义

    这里为什么要设计多级缓存呢?原因很简单,就是当存在大规模的服务注册和更新时,如果只是修改一个ConcurrentHashMap数据,那么势必因为锁的存在导致竞争,影响性能。

    而Eureka又是AP模型,只需要满足最终可用就行。所以它在这里用到多级缓存来实现读写分离。注册方法写的时候直接写内存注册表,写完表之后主动失效读写缓存。

    获取注册信息接口先从只读缓存取,只读缓存没有再去读写缓存取,读写缓存没有再去内存注册表里取(不只是取,此处较复杂)。并且,读写缓存会更新回写只读缓存

    • responseCacheUpdateIntervalMs : readOnlyCacheMap 缓存更新的定时器时间间隔,默认为30秒
    • responseCacheAutoExpirationInSeconds : readWriteCacheMap 缓存过期时间,默认为 180 秒 。

    缓存初始化

    readWriteCacheMap使用的是LoadingCache对象,它是guava中提供的用来实现内存缓存的一个api。创建方式如下

    LoadingCache<Long, String> cache = CacheBuilder.newBuilder()
        //缓存池大小,在缓存项接近该大小时, Guava开始回收旧的缓存项
        .maximumSize(10000)
        //设置时间对象没有被读/写访问则对象从内存中删除(在另外的线程里面不定期维护)
        .expireAfterAccess(10, TimeUnit.MINUTES)
        //移除监听器,缓存项被移除时会触发
        .removalListener(new RemovalListener <Long, String>() {
            @Override
            public void onRemoval(RemovalNotification<Long, String> rn) {
                //执行逻辑操作
            }
        })
        .recordStats()//开启Guava Cache的统计功能
        .build(new CacheLoader<String, Object>() {
            @Override
            public Object load(String key) {
                //从 SQL或者NoSql 获取对象
            }
        });//CacheLoader类 实现自动加载
    

    其中,CacheLoader是用来实现缓存自动加载的功能,当触发readWriteCacheMap.get(key)方法时,就会回调CacheLoader.load方法,根据key去服务注册信息中去查找实例数据进行缓存

    ResponseCacheImpl(EurekaServerConfig serverConfig, ServerCodecs serverCodecs, AbstractInstanceRegistry registry) {
        this.serverConfig = serverConfig;
        this.serverCodecs = serverCodecs;
        this.shouldUseReadOnlyResponseCache = serverConfig.shouldUseReadOnlyResponseCache();
        this.registry = registry;
    
        long responseCacheUpdateIntervalMs = serverConfig.getResponseCacheUpdateIntervalMs();
        this.readWriteCacheMap =
            CacheBuilder.newBuilder().initialCapacity(serverConfig.getInitialCapacityOfResponseCache())
            .expireAfterWrite(serverConfig.getResponseCacheAutoExpirationInSeconds(), TimeUnit.SECONDS)
            .removalListener(new RemovalListener<Key, Value>() {
                @Override
                public void onRemoval(RemovalNotification<Key, Value> notification) {
                    Key removedKey = notification.getKey();
                    if (removedKey.hasRegions()) {
                        Key cloneWithNoRegions = removedKey.cloneWithoutRegions();
                        regionSpecificKeys.remove(cloneWithNoRegions, removedKey);
                    }
                }
            })
            .build(new CacheLoader<Key, Value>() {
                @Override
                public Value load(Key key) throws Exception {
                    if (key.hasRegions()) {
                        Key cloneWithNoRegions = key.cloneWithoutRegions();
                        regionSpecificKeys.put(cloneWithNoRegions, key);
                    }
                    Value value = generatePayload(key);  //注意这里
                    return value;
                }
            });
    

    而缓存的加载,是基于generatePayload方法完成的,代码如下。

    private Value generatePayload(Key key) {
        Stopwatch tracer = null;
        try {
            String payload;
            switch (key.getEntityType()) {
                case Application:
                    boolean isRemoteRegionRequested = key.hasRegions();
    
                    if (ALL_APPS.equals(key.getName())) {
                        if (isRemoteRegionRequested) {
                            tracer = serializeAllAppsWithRemoteRegionTimer.start();
                            payload = getPayLoad(key, registry.getApplicationsFromMultipleRegions(key.getRegions()));
                        } else {
                            tracer = serializeAllAppsTimer.start();
                            payload = getPayLoad(key, registry.getApplications());
                        }
                    } else if (ALL_APPS_DELTA.equals(key.getName())) {
                        if (isRemoteRegionRequested) {
                            tracer = serializeDeltaAppsWithRemoteRegionTimer.start();
                            versionDeltaWithRegions.incrementAndGet();
                            versionDeltaWithRegionsLegacy.incrementAndGet();
                            payload = getPayLoad(key,
                                                 registry.getApplicationDeltasFromMultipleRegions(key.getRegions()));
                        } else {
                            tracer = serializeDeltaAppsTimer.start();
                            versionDelta.incrementAndGet();
                            versionDeltaLegacy.incrementAndGet();
                            payload = getPayLoad(key, registry.getApplicationDeltas());
                        }
                    } else {
                        tracer = serializeOneApptimer.start();
                        payload = getPayLoad(key, registry.getApplication(key.getName()));
                    }
                    break;
                case VIP:
                case SVIP:
                    tracer = serializeViptimer.start();
                    payload = getPayLoad(key, getApplicationsForVip(key, registry));
                    break;
                default:
                    logger.error("Unidentified entity type: {} found in the cache key.", key.getEntityType());
                    payload = "";
                    break;
            }
            return new Value(payload);
        } finally {
            if (tracer != null) {
                tracer.stop();
            }
        }
    }
    

    此方法接受一个 Key 类型的参数,返回一个 Value 类型。 其中 Key 中重要的字段有:

    • KeyType ,表示payload文本格式,有 JSONXML 两种值。
    • EntityType ,表示缓存的类型,有 Application , VIP , SVIP 三种值。
    • entityName ,表示缓存的名称,可能是单个应用名,也可能是 ALL_APPSALL_APPS_DELTA

    Value 则有一个 String 类型的payload和一个 byte 数组,表示gzip压缩后的字节。

    缓存同步

    ResponseCacheImpl这个类的构造实现中,初始化了一个定时任务,这个定时任务每个

    ResponseCacheImpl(EurekaServerConfig serverConfig, ServerCodecs serverCodecs, AbstractInstanceRegistry registry) {
        //省略...
        if (shouldUseReadOnlyResponseCache) {
            timer.schedule(getCacheUpdateTask(),
                           new Date(((System.currentTimeMillis() / responseCacheUpdateIntervalMs) * responseCacheUpdateIntervalMs)
                                    + responseCacheUpdateIntervalMs),
                           responseCacheUpdateIntervalMs);
        }
    }
    

    默认每30s从readWriteCacheMap更新有差异的数据同步到readOnlyCacheMap中

    private TimerTask getCacheUpdateTask() {
        return new TimerTask() {
            @Override
            public void run() {
                logger.debug("Updating the client cache from response cache");
                for (Key key : readOnlyCacheMap.keySet()) { //遍历只读集合
                    if (logger.isDebugEnabled()) {
                        logger.debug("Updating the client cache from response cache for key : {} {} {} {}",
                                     key.getEntityType(), key.getName(), key.getVersion(), key.getType());
                    }
                    try {
                        CurrentRequestVersion.set(key.getVersion());
                        Value cacheValue = readWriteCacheMap.get(key);
                        Value currentCacheValue = readOnlyCacheMap.get(key);
                        if (cacheValue != currentCacheValue) { //判断差异信息,如果有差异,则更新
                            readOnlyCacheMap.put(key, cacheValue);
                        }
                    } catch (Throwable th) {
                        logger.error("Error while updating the client cache from response cache for key {}", key.toStringCompact(), th);
                    } finally {
                        CurrentRequestVersion.remove();
                    }
                }
            }
        };
    }
    

    缓存失效

    在AbstractInstanceRegistry.register这个方法中,当完成服务信息保存后,会调用invalidateCache失效缓存

    public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
        //....
         invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
        //....
    }
    

    最终调用ResponseCacheImpl.invalidate方法,完成缓存的失效机制

    public void invalidate(Key... keys) {
        for (Key key : keys) {
            logger.debug("Invalidating the response cache key : {} {} {} {}, {}",
                         key.getEntityType(), key.getName(), key.getVersion(), key.getType(), key.getEurekaAccept());
    
            readWriteCacheMap.invalidate(key);
            Collection<Key> keysWithRegions = regionSpecificKeys.get(key);
            if (null != keysWithRegions && !keysWithRegions.isEmpty()) {
                for (Key keysWithRegion : keysWithRegions) {
                    logger.debug("Invalidating the response cache key : {} {} {} {} {}",
                                 key.getEntityType(), key.getName(), key.getVersion(), key.getType(), key.getEurekaAccept());
                    readWriteCacheMap.invalidate(keysWithRegion);
                }
            }
        }
    }
    

    相关文章

      网友评论

        本文标题:Spring Cloud Eureka源码分析之三级缓存的设计原

        本文链接:https://www.haomeiwen.com/subject/axvvfrtx.html