美文网首页
Eureka源码学习(三)— Eureka Server源码解析

Eureka源码学习(三)— Eureka Server源码解析

作者: RealityVibe | 来源:发表于2021-03-15 14:42 被阅读0次

    InstanceRegistry

    InstanceRegistry是Eureka Server中注册表管理的核心接口。在根据类图可以发现它实现了LookUpService和LeaseManager接口。

    LeaseManager主要用于维护实例的注册、续租、下线和清理,而LookupService提供对服务实例进行检索的功能。

    实例的居住证Lease

    初始化

    初始化赋值注册时间和上次更新时间

    public Lease(T r, int durationInSecs) {
        holder = r;
        registrationTimestamp = System.currentTimeMillis();
        lastUpdateTimestamp = registrationTimestamp;
        duration = (durationInSecs * 1000);
    }
    

    服务下线

    // 下线实例
    public void cancel() {
      if (evictionTimestamp <= 0) {
        evictionTimestamp = System.currentTimeMillis();
      }
    }
    
    // 判断是否已经清理
    public boolean isExpired() {
        return isExpired(0l);
    }
    
    // 判断是否已经清理
    public boolean isExpired(long additionalLeaseMs) {
            // 在cancel中可以发现evictionTimestamp被设置为大于0的清理时时间戳
                // additionalLeaseMs
                // ① 清理定时任务中入参为0
                // 
            return (evictionTimestamp > 0 || System.currentTimeMillis() > (lastUpdateTimestamp + duration + additionalLeaseMs));
    }
    

    Eureka Server服务器用ConcurrentHashMap维护实例注册信息。

    // ConcurrentHashMap<应用名, Map<实例id, Lease<实例信息>>>
    private final ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry
            = new ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>();
    

    租约更新

    @PUT
    public Response renewLease(
            @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,
            @QueryParam("overriddenstatus") String overriddenStatus,
            @QueryParam("status") String status,
            @QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {
        // 是否是同步节点来的,避免
        boolean isFromReplicaNode = "true".equals(isReplication);
        // 进行续租操作
        boolean isSuccess = registry.renew(app.getName(), id, isFromReplicaNode);
    
        // Not found in the registry, immediately ask for a register
        if (!isSuccess) {
            // 续租失败,该实例未注册,返回404
            logger.warn("Not Found (Renew): {} - {}", app.getName(), id);
            return Response.status(Status.NOT_FOUND).build();
        }
        // Check if we need to sync based on dirty time stamp, the client
        // instance might have changed some value
        Response response;
        // 防止并发冲突
        if (lastDirtyTimestamp != null && serverConfig.shouldSyncWhenTimestampDiffers()) {
            response = this.validateDirtyTimestamp(Long.valueOf(lastDirtyTimestamp), isFromReplicaNode);
            // Store the overridden status since the validation found out the node that replicates wins
            if (response.getStatus() == Response.Status.NOT_FOUND.getStatusCode()
                    && (overriddenStatus != null)
                    && !(InstanceStatus.UNKNOWN.name().equals(overriddenStatus))
                    && isFromReplicaNode) {
                registry.storeOverriddenStatusIfRequired(app.getAppName(), id, InstanceStatus.valueOf(overriddenStatus));
            }
        } else {
            // 成功续租
            response = Response.ok().build();
        }
        logger.debug("Found (Renew): {} - {}; reply status={}", app.getName(), id, response.getStatus());
        return response;
    }
    

    AbstractInstanceRegistry#renew()

    对于续租的方法,不想注册一样需要整个实例信息,只需要实例名称和对应的实例id即可完成续租。

    public boolean renew(String appName, String id, boolean isReplication) {
        RENEW.increment(isReplication);
        Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
        Lease<InstanceInfo> leaseToRenew = null;
        if (gMap != null) {
            leaseToRenew = gMap.get(id);
        }
        if (leaseToRenew == null) {
            RENEW_NOT_FOUND.increment(isReplication);
            logger.warn("DS: Registry: lease doesn't exist, registering resource: {} - {}", appName, id);
            return false;
        } else {
            // 获取实例信息
            InstanceInfo instanceInfo = leaseToRenew.getHolder();
            if (instanceInfo != null) {
                // touchASGCache(instanceInfo.getASGName());
                InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(
                        instanceInfo, leaseToRenew, isReplication);
                //  如果得到的服务实例最后状态是UNKNOW,取消续约直接return
                if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) {
                    logger.info("Instance status UNKNOWN possibly due to deleted override for instance {}; re-register required", instanceInfo.getId());
                    RENEW_NOT_FOUND.increment(isReplication);
                    return false;
                }
                // 服务状态不一致。
                if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) {
                    logger.info(
                            "The instance status {} is different from overridden instance status {} for instance {}. Hence setting the status to overridden status", instanceInfo.getStatus().name(),
                                    instanceInfo.getOverriddenStatus().name(),
                                    instanceInfo.getId());
                    instanceInfo.setStatusWithoutDirty(overriddenInstanceStatus);
    
                }
            }
            // 统计每分钟续租的次数,用于自我保护机制
            renewsLastMin.increment();
            // 更新上次更新时间
            leaseToRenew.renew();
            return true;
        }
    }
    

    Lease#renew()

    // 更新上次更新时间
    public void renew() {
      lastUpdateTimestamp = System.currentTimeMillis() + duration;
    }
    

    过期租约清理

    在AbstractInstanceRegistry中存在EvictionTask用于定时执行exict(0)来服务剔除,默认为60秒一次。但是如果Eureka Server处于自我保护状态,则无法进行清理操作。在进入自我保护状态后,在Eureka Client处,如果向Eureka Server注册失败,将快速超时并尝试与其他的Eureka Server进行通信。“自我保护机制”的设计大大提高了Eureka的可用性。

    public void evict(long additionalLeaseMs) {
        logger.debug("Running the evict task");
            // 判断是否打开了自我保护机制
        // isLeaseExpirationEnabled == true 代表未开启自我保护机制
        if (!isLeaseExpirationEnabled()) {
            logger.debug("DS: lease expiration is currently disabled.");
            return;
        }
    
        // 遍历resgistry保存的注册信息,用expiredLeases收集所有过期的实例信息
        List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>();
        for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) {
            // 对每个应用的实例列表判断是否需要清理
            Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue();
            if (leaseMap != null) {
                for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) {
                    Lease<InstanceInfo> lease = leaseEntry.getValue();
                    if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) {
                        expiredLeases.add(lease);
                    }
                }
            }
        }
    
        // To compensate for GC pauses or drifting local time, we need to use current registry size as a base for
        // triggering self-preservation. Without that we would wipe out full registry.
        int registrySize = (int) getLocalRegistrySize(); // 获取注册的实例数
        int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold()); // 获得阈值 0.85(default)*size
        // 计算清理数量限制 evictionLimit = 
        // 注册实例数 - 注册实例数 * 0.85(default) = 0.15 * 注册实例数
        int evictionLimit = registrySize - registrySizeThreshold; 
    
        // 在limit和需要清理的数量中取较小值
        int toEvict = Math.min(expiredLeases.size(), evictionLimit);
        // 存在需要清理的实例,这里使用随机算法进行删除
        if (toEvict > 0) {
            logger.info("Evicting {} items (expired={}, evictionLimit={})", toEvict, expiredLeases.size(), evictionLimit);
    
            // 用随机算法cancel实例
            // 根据前面的保护机制,最大下线Math.min(expiredLeases.size(), evictionLimit)个实例
            Random random = new Random(System.currentTimeMillis());
            for (int i = 0; i < toEvict; i++) {
                // Pick a random item (Knuth shuffle algorithm)
                int next = i + random.nextInt(expiredLeases.size() - i);
                Collections.swap(expiredLeases, i, next);
                Lease<InstanceInfo> lease = expiredLeases.get(i);
    
                String appName = lease.getHolder().getAppName();
                String id = lease.getHolder().getId();
                EXPIRED.increment();
                logger.warn("DS: Registry: expired lease for {}/{}", appName, id);
                // !! 注意,服务是逐个清理的
                internalCancel(appName, id, false);
            }
        }
    }
    

    相关文章

      网友评论

          本文标题:Eureka源码学习(三)— Eureka Server源码解析

          本文链接:https://www.haomeiwen.com/subject/fqykcltx.html