美文网首页
Spring Cloud之Eureka源码分析

Spring Cloud之Eureka源码分析

作者: loveFXX | 来源:发表于2020-03-05 21:21 被阅读0次

    本章主要介绍Eureka server端源码分析。
    在分析eureka源码源码之前,需要首先了解eureka的作用。
    主要包括服务注册与发现,心跳续约,自动保护机制,服务剔除,集群同步等功能。

    @EnableEurekaServer注解

    Eureka的服务端开启是通过这个注解。


    image.png

    这个注解创建了EurekaServerMarkerConfiguration类


    image.png
    EurekaServerMarkerConfiguration类声明这个类是个配置类,并通过@Bean创建一个Marker类用来进行标记。在spring自动装配过程中,将会对EurekaServerAutoConfiguration类进行装配条件判断
    image.png

    EurekaServerAutoConfiguration

    加载条件是判断检查标记类Marker是否存在。


    image.png

    如果存在,就表明加了@EnableEurekaServer注解。

    EurekaServerAutoConfiguration类的作用是:

    1、导入EurekaServerInitializerConfiguration类。
    服务剔除、自我保护机制、初始化自我保护机制阈值
    2、创建类并注入spring容器中
    EurekaServerAutoConfiguration配置类需要创建的类包括PeerAwareInstanceRegistry、PeerEurekaNodes、EurekaServerContext、EurekaServerBootstrap、FilterRegistrationBean、javax.ws.rs.core.Application
    其中FilterRegistrationBean和Application:查找资源文件并设置到Jersey过滤器中。
    Jersey过滤器的作用是在"com.netflix.discovery","com.netflix.eureka" 两个包下找到所有加了Path或Provider的注解,并返回的资源为Application实例对象。


    image.png
    image.png

    FilterRegistrationBean设置的过滤器对象是Application实例


    image.png
    所以,在eureka client与server端交互就是通过这些资源文件Application方法调用的。这些资源访问与MVC模型中Controller层的访问原理类似。

    服务注册

    由客户端发起Application资源ApplicationResource类的调用

    1、addInstance

    com.netflix.eureka.resources.ApplicationResource#addInstance方法
    首先检查添加实例信息是否完整,然后调用registry.register方法进行实例注册

       @POST
        @Consumes({"application/json", "application/xml"})
        public Response addInstance(InstanceInfo info,
                                    @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
            logger.debug("Registering instance {} (replication={})", info.getId(), isReplication);
            // validate that the instanceinfo contains all the necessary required fields
           //验证一些实例数据是否完整....
            // handle cases where clients may be registering with bad DataCenterInfo with missing data
    
            registry.register(info, "true".equals(isReplication));
            return Response.status(204).build();  // 204 to be backwards compatible
        }
    

    在查看添加实例具体的调用链之前,需要了解下InstanceRegistry类的继承关系


    InstanceRegistry.png
    2、register

    org.springframework.cloud.netflix.eureka.server.InstanceRegistry#register(com.netflix.appinfo.InstanceInfo, int, boolean)


    register.png

    2.1、首先通过handleRegistration方法发布EurekaInstanceRegisteredEvent事件


    image.png
    2.2、调用父类register方法
    3、register

    com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl#register
    注册后进行同步,通过isReplication参数控制。如果注册是从其他复制节点进行复制,则不同步复制


    image.png
    4、register

    com.netflix.eureka.registry.AbstractInstanceRegistry#register

     public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
            try {
                read.lock();
                Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
                REGISTER.increment(isReplication);
                if (gMap == null) {
                    final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
                    gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
                    if (gMap == null) {
                        gMap = gNewMap;
                    }
                }
                Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
                // Retain the last dirty timestamp without overwriting it, if there is already a lease
                if (existingLease != null && (existingLease.getHolder() != null)) {
                    Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
                    Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
                    logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
                    if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
                        logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +
                                " than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
                        logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
                        registrant = existingLease.getHolder();
                    }
                } else {
                    // The lease does not exist and hence it is a new registration
                    synchronized (lock) {
                        if (this.expectedNumberOfRenewsPerMin > 0) {
                            // Since the client wants to cancel it, reduce the threshold
                            // (1
                            // for 30 seconds, 2 for a minute)
                            this.expectedNumberOfRenewsPerMin = this.expectedNumberOfRenewsPerMin + 2;
                            this.numberOfRenewsPerMinThreshold =
                                    (int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
                        }
                    }
                    logger.debug("No previous lease information found; it is new registration");
                }
                Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
                if (existingLease != null) {
                    lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
                }
                gMap.put(registrant.getId(), lease);
                synchronized (recentRegisteredQueue) {
                    recentRegisteredQueue.add(new Pair<Long, String>(
                            System.currentTimeMillis(),
                            registrant.getAppName() + "(" + registrant.getId() + ")"));
                }
                // This is where the initial state transfer of overridden status happens
                if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
                    logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "
                                    + "overrides", registrant.getOverriddenStatus(), registrant.getId());
                    if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
                        logger.info("Not found overridden id {} and hence adding it", registrant.getId());
                        overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
                    }
                }
                InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
                if (overriddenStatusFromMap != null) {
                    logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
                    registrant.setOverriddenStatus(overriddenStatusFromMap);
                }
    
                // Set the status based on the overridden status rules
                InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
                registrant.setStatusWithoutDirty(overriddenInstanceStatus);
    
                // If the lease is registered with UP status, set lease service up timestamp
                if (InstanceStatus.UP.equals(registrant.getStatus())) {
                    lease.serviceUp();
                }
                registrant.setActionType(ActionType.ADDED);
                recentlyChangedQueue.add(new RecentlyChangedItem(lease));
                registrant.setLastUpdatedTimestamp();
                invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
                logger.info("Registered instance {}/{} with status {} (replication={})",
                        registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
            } finally {
                read.unlock();
            }
        }
    

    在这个方法中:
    使用ReentrantReadWriteLock的读锁进行加锁,所以在客户端调用资源进行注册时,可能不止一个线程调用到这里
    注册信息registry使用ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> 结构,表示的意义是对于一个高可用微服务集群。如下图对于服务a有a1、a2、a3相同的微服务。则ConcurrentHashMap中的String表示微服务组名称a,Map<String, Lease<InstanceInfo>>>是三个微服务实例,其中Map中的String表示a1或a2或a3微服务名称,InstanceInfo表示三个服务实例信息


    image.png

    根据instanceId查找existingLease实例对象,跟当前registrant注册实例比较。使用时间戳较大的实例对registrant变量赋值。并设置实例的serviceUpTimestamp服务上线时间戳。
    放入到当前微服务集合中gMap.put(registrant.getId(), lease);到这里则完成服务注册
    总之,执行注册实例的结果就是把appName的微服务加入到ConcurrentHashMap集合中。

    心跳续约

    客户端更新续约通过put方法。通过客户端发起请求
    com.netflix.eureka.resources.InstanceResource#renewLease


    image.png

    com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl#renew
    调用父类续约,续约成功则同步集群


    image.png
    com.netflix.eureka.registry.AbstractInstanceRegistry#renew
    public boolean renew(String appName, String id, boolean isReplication) {
            RENEW.increment(isReplication);
            Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
            Lease<InstanceInfo> leaseToRenew = null;
            if (gMap != null) {
                leaseToRenew = gMap.get(id);
            }
            if (leaseToRenew == null) {
                RENEW_NOT_FOUND.increment(isReplication);
                logger.warn("DS: Registry: lease doesn't exist, registering resource: {} - {}", appName, id);
                return false;
            } else {
                InstanceInfo instanceInfo = leaseToRenew.getHolder();
                if (instanceInfo != null) {
                    // touchASGCache(instanceInfo.getASGName());
                    InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(
                            instanceInfo, leaseToRenew, isReplication);
                    if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) {
                        logger.info("Instance status UNKNOWN possibly due to deleted override for instance {}"
                                + "; re-register required", instanceInfo.getId());
                        RENEW_NOT_FOUND.increment(isReplication);
                        return false;
                    }
                    if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) {
                        Object[] args = {
                                instanceInfo.getStatus().name(),
                                instanceInfo.getOverriddenStatus().name(),
                                instanceInfo.getId()
                        };
                        logger.info(
                                "The instance status {} is different from overridden instance status {} for instance {}. "
                                        + "Hence setting the status to overridden status", args);
                        instanceInfo.setStatus(overriddenInstanceStatus);
                    }
                }
                renewsLastMin.increment();
                leaseToRenew.renew();
                return true;
            }
        }
    

    获取注册信息中的具体实例租债器对象,执行租债器的renew()方法
    com.netflix.eureka.lease.Lease#renew


    image.png

    续约成功,则更改lastUpdateTimestamp的值是当前系统时间戳与duration的和

    服务下架

    服务剔除是eureka服务端对过期的服务(长时间没有心跳的服务)进行定时清除。是服务端开启的定时任务,具体源码实现是在导入EurekaServerInitializerConfiguration类中实现。服务下架是客户端主动下架
    EurekaServerInitializerConfiguration实现了ServletContextAware, SmartLifecycle, Ordered接口。

    start方法执行时机

    SmartLifecycle的start方法调用链(在spring的生命周期中):
    AbstractApplicationContext#refresh
    AbstractApplicationContext#finishRefresh
    DefaultLifecycleProcessor#onRefresh

    1、start

    EurekaServerInitializerConfiguration#start
    在一个线程内执行这个方法,原因是执行eureka上下文环境不会影响spring boot正常启动过程。
    这个方法功能是eureka服务端上下文的初始化,发布了EurekaRegistryAvailableEvent和EurekaServerStartedEvent事件。


    start.png
    2、contextInitialized

    org.springframework.cloud.netflix.eureka.server.EurekaServerBootstrap#contextInitialized


    contextInitialized.png
    3、initEurekaServerContext

    EurekaServerBootstrap#initEurekaServerContext
    包括从邻近节点复制注册表


    initEurekaServerContext.png
    4、openForTraffic

    org.springframework.cloud.netflix.eureka.server.InstanceRegistry#openForTraffic


    openForTraffic.png
    5、openForTraffic

    com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl#openForTraffic


    openForTraffic.png
    6、postInit

    com.netflix.eureka.registry.AbstractInstanceRegistry#postInit


    postInit.png
    7、EvictionTask

    com.netflix.eureka.registry.AbstractInstanceRegistry.EvictionTask
    创建EvictionTask类,EvictionTask继承了TimerTask定时器任务类


    EvictionTask.png
    8、evict

    com.netflix.eureka.registry.AbstractInstanceRegistry#evict(long)

    public void evict(long additionalLeaseMs) {
            logger.debug("Running the evict task");
    
            if (!isLeaseExpirationEnabled()) {
                logger.debug("DS: lease expiration is currently disabled.");
                return;
            }
    
            // We collect first all expired items, to evict them in random order. For large eviction sets,
            // if we do not that, we might wipe out whole apps before self preservation kicks in. By randomizing it,
            // the impact should be evenly distributed across all applications.
            List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>();
            for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) {
                Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue();
                if (leaseMap != null) {
                    for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) {
                        Lease<InstanceInfo> lease = leaseEntry.getValue();
                        if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) {
                            expiredLeases.add(lease);
                        }
                    }
                }
            }
    
            // To compensate for GC pauses or drifting local time, we need to use current registry size as a base for
            // triggering self-preservation. Without that we would wipe out full registry.
            int registrySize = (int) getLocalRegistrySize();
            int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold());
            int evictionLimit = registrySize - registrySizeThreshold;
    
            int toEvict = Math.min(expiredLeases.size(), evictionLimit);
            if (toEvict > 0) {
                logger.info("Evicting {} items (expired={}, evictionLimit={})", toEvict, expiredLeases.size(), evictionLimit);
    
                Random random = new Random(System.currentTimeMillis());
                for (int i = 0; i < toEvict; i++) {
                    // Pick a random item (Knuth shuffle algorithm)
                    int next = i + random.nextInt(expiredLeases.size() - i);
                    Collections.swap(expiredLeases, i, next);
                    Lease<InstanceInfo> lease = expiredLeases.get(i);
    
                    String appName = lease.getHolder().getAppName();
                    String id = lease.getHolder().getId();
                    EXPIRED.increment();
                    logger.warn("DS: Registry: expired lease for {}/{}", appName, id);
                    internalCancel(appName, id, false);
                }
            }
        }
    

    首先判断是否开启了自我保护机制,遍历注册表类并判断每个租债器实例调用isExpired方法判断是否过期。对应过期的实例加入到expiredLeases集合中。isExpired方法根据当前时间戳与最后更新时间与duration和additionalLeaseMs比较得到。lastUpdateTimestamp最后更新时间是上次系统时间与duration 的和,最后更新时间注册、心跳续约、服务下架都会对这个参数进行更新。

    public boolean isExpired(long additionalLeaseMs) {
            return (evictionTimestamp > 0 || System.currentTimeMillis() > (lastUpdateTimestamp + duration + additionalLeaseMs));
        }
    

    对于需要剔除的expiredLeases实例集合,并不会一下把所有的都会删除。会根据registrySize和RenewalPercentThreshold计算出一个evictionLimit剔除最大数量。例如一共有registrySize=100。RenewalPercentThreshold=0.85,则evictionLimit=100-100*0.85=15个。如果expiredLeases超过15个则会任意选出15个进行剔除,对于剩下的下次剔除任务中删除。如果下次还会超过则会触发自动保护机制isLeaseExpirationEnabled,直接返回。

    集群同步

    调用方法replicateToPeers
    com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl#replicateToPeers

    /**
         * Replicates all eureka actions to peer eureka nodes except for replication
         * traffic to this node.
         *
         */
        private void replicateToPeers(Action action, String appName, String id,
                                      InstanceInfo info /* optional */,
                                      InstanceStatus newStatus /* optional */, boolean isReplication) {
            Stopwatch tracer = action.getTimer().start();
            try {
                if (isReplication) {
                    numberOfReplicationsLastMin.increment();
                }
                // If it is a replication already, do not replicate again as this will create a poison replication
                if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
                    return;
                }
    
                for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
                    // If the url represents this host, do not replicate to yourself.
                    if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
                        continue;
                    }
                    replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
                }
            } finally {
                tracer.stop();
            }
        }
    private void replicateInstanceActionsToPeers(Action action, String appName,
                                                     String id, InstanceInfo info, InstanceStatus newStatus,
                                                     PeerEurekaNode node) {
            try {
                InstanceInfo infoFromRegistry = null;
                CurrentRequestVersion.set(Version.V2);
                switch (action) {
                    case Cancel:
                        node.cancel(appName, id);
                        break;
                    case Heartbeat:
                        InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
                        infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                        node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
                        break;
                    case Register:
                        node.register(info);
                        break;
                    case StatusUpdate:
                        infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                        node.statusUpdate(appName, id, newStatus, infoFromRegistry);
                        break;
                    case DeleteStatusOverride:
                        infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                        node.deleteStatusOverride(appName, id, infoFromRegistry);
                        break;
                }
            } catch (Throwable t) {
                logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);
            }
        }
    

    当前实例的行为包括Heartbeat, Register, Cancel, StatusUpdate, DeleteStatusOverride;对当前实例的这些行为操作成功后,则需要把这些状态行为的操作信息实例同步到邻近的eureka服务节点,需不需要集群同步主要看isReplication参数,默认是false需要同步,能够执行到replicateInstanceActionsToPeers方法。

    同步操作

    1、注册后同步register


    register.png

    2、续约后同步renew


    renew.png
    3、状态更新后同步statusUpdate
    statusUpdate.png

    4、下架后同步cancel


    cancel.png
    5、删除状态后DeleteStatusOverride
    DeleteStatusOverride.png
    syncUp

    从邻近的节点复制节点信息
    com.netflix.eureka.registry.PeerAwareInstanceRegistry#syncUp


    syncUp.png
    public int syncUp() {
            // Copy entire entry from neighboring DS node
            int count = 0;
    
            for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++) {
                if (i > 0) {
                    try {
                        Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs());
                    } catch (InterruptedException e) {
                        logger.warn("Interrupted during registry transfer..");
                        break;
                    }
                }
                Applications apps = eurekaClient.getApplications();
                for (Application app : apps.getRegisteredApplications()) {
                    for (InstanceInfo instance : app.getInstances()) {
                        try {
                            if (isRegisterable(instance)) {
                                register(instance, instance.getLeaseInfo().getDurationInSecs(), true);
                                count++;
                            }
                        } catch (Throwable t) {
                            logger.error("During DS init copy", t);
                        }
                    }
                }
            }
            return count;
        }
    

    在注册同步重试次数RegistrySyncRetries和注册同步等待时间RegistrySyncRetryWaitMs内,对eureka集群的其他对等节点注册的eurekaclient实例同步注册到当前节点。

    自我保护机制

    在服务剔除时,会检查是否开启自我保护机制及是否超过阈值


    evict.png

    PeerAwareInstanceRegistryImpl#isLeaseExpirationEnabled
    这个方法首先检查enableSelfPreservation参数是否开启(默认是true 开启的),然后判断当前得到的心跳数与自我保护阈值比较。


    isLeaseExpirationEnabled.png
    自我保护阈值计算

    numberOfRenewsPerMinThreshold =expectedNumberOfRenewsPerMin*
    renewalPercentThreshold
    自我保护阈值=每分钟期望续约的心跳数(所有注册上的实例*每分钟触发的心跳连接次数)*自我保护机制的触发百分比(85%)

    假如一共有100个实例,服务端在默认情况下每分钟连接刷新时间(expectedClientRenewalIntervalSeconds)是30s,所以一分钟有2次。
    numberOfRenewsPerMinThreshold = 100*2*0.85=170个

    所以,触发保护机制 当前得到的心跳连接数小于85%时,会触发
    另一种说法,在服务剔除的时候当剔除的数量大于15%时,会触发
    如果客户端续约刷新间隔与服务端续约刷新间隔不一致,则会造成自我保护机制不能正常工作

    阈值更改时机

    1、15分钟自动更改
    com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl#scheduleRenewalThresholdUpdateTask


    scheduleRenewalThresholdUpdateTask.png
    updateRenewalThreshold.png

    2、注册


    image.png
    3、服务下架
    com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl#cancel
    image.png
    4、服务初始化

    com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl#openForTraffic


    image.png
    为什么初始化和15分钟自动更改是数量的2倍,注册和下架是+2或-2。因为默认情况下服务端一次刷新心跳是30s,所以在60s内是2次。注册和下架是一个一个操作的

    服务发现

    查询服务时,涉及服务端的三层缓存架构。
    1、只读缓存层readOnlyCacheMap(ConcurrentMap类型)
    2、读写缓存层readWriteCacheMap(LoadingCache类型,使用google的guava框架)
    3、真实数据层
    com.netflix.eureka.resources.ApplicationResource#getApplication

     @GET
        public Response getApplication(@PathParam("version") String version,
                                       @HeaderParam("Accept") final String acceptHeader,
                                       @HeaderParam(EurekaAccept.HTTP_X_EUREKA_ACCEPT) String eurekaAccept) {
            if (!registry.shouldAllowAccess(false)) {
                return Response.status(Status.FORBIDDEN).build();
            }
    
            EurekaMonitors.GET_APPLICATION.increment();
    
            CurrentRequestVersion.set(Version.toEnum(version));
            KeyType keyType = Key.KeyType.JSON;
            if (acceptHeader == null || !acceptHeader.contains("json")) {
                keyType = Key.KeyType.XML;
            }
    
            Key cacheKey = new Key(
                    Key.EntityType.Application,
                    appName,
                    keyType,
                    CurrentRequestVersion.get(),
                    EurekaAccept.fromString(eurekaAccept)
            );
    
            String payLoad = responseCache.get(cacheKey);
    
            if (payLoad != null) {
                logger.debug("Found: {}", appName);
                return Response.ok(payLoad).build();
            } else {
                logger.debug("Not Found: {}", appName);
                return Response.status(Status.NOT_FOUND).build();
            }
        }
    

    com.netflix.eureka.registry.ResponseCacheImpl#get(com.netflix.eureka.registry.Key)

    public String get(final Key key) {
            return get(key, shouldUseReadOnlyResponseCache);
        }
    
        @VisibleForTesting
        String get(final Key key, boolean useReadOnlyCache) {
            Value payload = getValue(key, useReadOnlyCache);
            if (payload == null || payload.getPayload().equals(EMPTY_PAYLOAD)) {
                return null;
            } else {
                return payload.getPayload();
            }
        }
    
    @VisibleForTesting
        Value getValue(final Key key, boolean useReadOnlyCache) {
            Value payload = null;
            try {
                if (useReadOnlyCache) {
                    final Value currentPayload = readOnlyCacheMap.get(key);
                    if (currentPayload != null) {
                        payload = currentPayload;
                    } else {
                        payload = readWriteCacheMap.get(key);
                        readOnlyCacheMap.put(key, payload);
                    }
                } else {
                    payload = readWriteCacheMap.get(key);
                }
            } catch (Throwable t) {
                logger.error("Cannot get value for key :" + key, t);
            }
            return payload;
        }
    

    首先查看只读缓存层是否打开shouldUseReadOnlyResponseCache,与之对应的是getValue方法的useReadOnlyCache变量值。
    如果打开,则先从只读缓存readOnlyCacheMap查询,如果查询不到则从读写缓存是readWriteCacheMap查询,并放入到只读缓存

    缓存更新原理

    1、只读缓存

    只读缓存没有提供API更新,只能通过定时任务或查询时从读写缓存写到只读缓存
    在创建ResponseCacheImpl类时,开启定时任务


    shouldUseReadOnlyResponseCache.png
    getCacheUpdateTask.png

    如果从读写缓存得到的cacheValue与从只读缓存得到的currentCacheValue不相同,则把cacheValue替换到只读缓存中

    2、读写缓存

    2.1、构建ResponseCacheImpl类


    readWriteCacheMap.png

    如果缓存不存在,则通过generatePayload方法重新加载

    /*
         * Generate pay load for the given key.
         */
        private Value generatePayload(Key key) {
            Stopwatch tracer = null;
            try {
                String payload;
                switch (key.getEntityType()) {
                    case Application:
                        boolean isRemoteRegionRequested = key.hasRegions();
    
                        if (ALL_APPS.equals(key.getName())) {
                            if (isRemoteRegionRequested) {
                                tracer = serializeAllAppsWithRemoteRegionTimer.start();
                                payload = getPayLoad(key, registry.getApplicationsFromMultipleRegions(key.getRegions()));
                            } else {
                                tracer = serializeAllAppsTimer.start();
                                payload = getPayLoad(key, registry.getApplications());
                            }
                        } else if (ALL_APPS_DELTA.equals(key.getName())) {
                            if (isRemoteRegionRequested) {
                                tracer = serializeDeltaAppsWithRemoteRegionTimer.start();
                                versionDeltaWithRegions.incrementAndGet();
                                versionDeltaWithRegionsLegacy.incrementAndGet();
                                payload = getPayLoad(key,
                                        registry.getApplicationDeltasFromMultipleRegions(key.getRegions()));
                            } else {
                                tracer = serializeDeltaAppsTimer.start();
                                versionDelta.incrementAndGet();
                                versionDeltaLegacy.incrementAndGet();
                                payload = getPayLoad(key, registry.getApplicationDeltas());
                            }
                        } else {
                            tracer = serializeOneApptimer.start();
                            payload = getPayLoad(key, registry.getApplication(key.getName()));
                        }
                        break;
                    case VIP:
                    case SVIP:
                        tracer = serializeViptimer.start();
                        payload = getPayLoad(key, getApplicationsForVip(key, registry));
                        break;
                    default:
                        logger.error("Unidentified entity type: " + key.getEntityType() + " found in the cache key.");
                        payload = "";
                        break;
                }
                return new Value(payload);
            } finally {
                if (tracer != null) {
                    tracer.stop();
                }
            }
        }
    
    private String getPayLoad(Key key, Applications apps) {
            EncoderWrapper encoderWrapper = serverCodecs.getEncoder(key.getType(), key.getEurekaAccept());
            String result;
            try {
                result = encoderWrapper.encode(apps);
            } catch (Exception e) {
                logger.error("Failed to encode the payload for all apps", e);
                return "";
            }
            if(logger.isDebugEnabled()) {
                logger.debug("New application cache entry {} with apps hashcode {}", key.toStringCompact(), apps.getAppsHashCode());
            }
            return result;
        }
    

    2.2、调用invalidateCache方法,使缓存失效
    com.netflix.eureka.registry.AbstractInstanceRegistry#invalidateCache


    invalidateCache.png

    com.netflix.eureka.registry.ResponseCacheImpl#invalidate


    invalidate.png
    com.netflix.eureka.registry.ResponseCacheImpl#invalidate(com.netflix.eureka.registry.Key...)
    invalidate.png
    调用invalidateCache方法时机

    1、注册后更新缓存
    com.netflix.eureka.registry.AbstractInstanceRegistry#register
    2、服务下架
    com.netflix.eureka.registry.AbstractInstanceRegistry#cancel
    3、statusUpdate
    com.netflix.eureka.registry.AbstractInstanceRegistry#statusUpdate
    4、deleteStatusOverride
    com.netflix.eureka.registry.AbstractInstanceRegistry#deleteStatusOverride

    总结:

    主要就是服务端使用的框架调用原理和Eureka Server端涉及的功能
    那些功能使用客户端调用(有注册,续约,下架)
    服务下架与服务剔除的区别
    eureka服务端涉及的服务同步,自我保护阈值
    服务查询的三级缓存使用原理

    相关文章

      网友评论

          本文标题:Spring Cloud之Eureka源码分析

          本文链接:https://www.haomeiwen.com/subject/gakkrhtx.html