美文网首页
Eureka源码分析(五) ApplicationInfoMan

Eureka源码分析(五) ApplicationInfoMan

作者: skyguard | 来源:发表于2018-11-10 10:45 被阅读0次

    在分析了EurekaServer启动的流程后,我们来分析一下Eureka注册服务的流程。先来介绍一个类ApplicationInfoManager,这个类是用来管理应用实例信息的。在DiscoveryClient初始化的时候,会向EurekaServer注册服务,

    • 调用refreshInstanceInfo()方法,刷新应用实例信息。

    • 调用register() 方法,Eureka-Client 向 Eureka-Server 注册应用实例

    • 调用 ScheduledExecutorService的schedule方法,再次延迟执行任务,并设置 scheduledPeriodicRef。通过这样的方式,不断循环定时执行任务。
      而在ApplicationInfoManager有一个内部类,其实是一个接口StatusChangeListener,监听注册节点状态的变化,并发送通知。
      业务里,调用 ApplicationInfoManager的setInstanceStatus方法,设置应用实例信息的状态,从而通知 InstanceInfoReplicator的onDemandUpdate方法的调用。看下具体的实现

      public synchronized void setInstanceStatus(InstanceStatus status) {
      InstanceStatus next = instanceStatusMapper.map(status);
      if (next == null) {
          return;
      }
      InstanceStatus prev = instanceInfo.setStatus(next);
      if (prev != null) {
          for (StatusChangeListener listener : listeners.values()) {
              try {
                  listener.notify(new StatusChangeEvent(prev, next));
              } catch (Exception e) {
                  logger.warn("failed to notify listener: {}", listener.getId(), e);
              }
          }
      }
      }
      

    InstanceInfoReplicator的onDemandUpdate方法

    public boolean onDemandUpdate() {
        if (rateLimiter.acquire(burstSize, allowedRatePerMinute)) { // 限流相关,跳过
            scheduler.submit(new Runnable() {
                @Override
                public void run() {
                    logger.debug("Executing on-demand update of local InstanceInfo");
                    // 取消任务
                    Future latestPeriodic = scheduledPeriodicRef.get();
                    if (latestPeriodic != null && !latestPeriodic.isDone()) {
                        logger.debug("Canceling the latest scheduled update, it will be rescheduled at the end of on demand update");
                        latestPeriodic.cancel(false);
                    }
                    // 再次调用
                    InstanceInfoReplicator.this.run();
                }
            });
            return true;
        } else {
            logger.warn("Ignoring onDemand update due to rate limiter");
            return false;
        }
    }
    

    调用 DiscoveryClient的refreshInstanceInfo方法,刷新实例信息。看下具体的实现

    void refreshInstanceInfo() {
        // 刷新 数据中心信息
        applicationInfoManager.refreshDataCenterInfoIfRequired();
        // 刷新 租约信息
        applicationInfoManager.refreshLeaseInfoIfRequired();
        // 健康检查
        InstanceStatus status;
        try {
            status = getHealthCheckHandler().getStatus(instanceInfo.getStatus());
        } catch (Exception e) {
            logger.warn("Exception from healthcheckHandler.getStatus, setting status to DOWN", e);
            status = InstanceStatus.DOWN;
        }
        if (null != status) {
            applicationInfoManager.setInstanceStatus(status);
        }
    }
    

    调用 ApplicationInfoManager的refreshDataCenterInfoIfRequired方法,刷新数据中心相关信息,看下具体的实现

    public void refreshDataCenterInfoIfRequired() {
        // hostname
        String existingAddress = instanceInfo.getHostName();
        String newAddress;
        if (config instanceof RefreshableInstanceConfig) {
            // Refresh data center info, and return up to date address
            newAddress = ((RefreshableInstanceConfig) config).resolveDefaultAddress(true);
        } else {
            newAddress = config.getHostName(true);
        }
        // ip
        String newIp = config.getIpAddress();
        if (newAddress != null && !newAddress.equals(existingAddress)) {
            logger.warn("The address changed from : {} => {}", existingAddress, newAddress);
            // :( in the legacy code here the builder is acting as a mutator.
            // This is hard to fix as this same instanceInfo instance is referenced elsewhere.
            // We will most likely re-write the client at sometime so not fixing for now.
            InstanceInfo.Builder builder = new InstanceInfo.Builder(instanceInfo);
            builder.setHostName(newAddress) // hostname
                    .setIPAddr(newIp) // ip
                    .setDataCenterInfo(config.getDataCenterInfo()); // dataCenterInfo
            instanceInfo.setIsDirty();
        }
    }
    

    调用 ApplicationInfoManager的refreshLeaseInfoIfRequired方法,刷新租约相关信息,看下具体的实现

    public void refreshLeaseInfoIfRequired() {
        LeaseInfo leaseInfo = instanceInfo.getLeaseInfo();
        if (leaseInfo == null) {
            return;
        }
        int currentLeaseDuration = config.getLeaseExpirationDurationInSeconds();
        int currentLeaseRenewal = config.getLeaseRenewalIntervalInSeconds();
        if (leaseInfo.getDurationInSecs() != currentLeaseDuration // 租约过期时间 改变
                || leaseInfo.getRenewalIntervalInSecs() != currentLeaseRenewal) { // 租约续约频率 改变
            LeaseInfo newLeaseInfo = LeaseInfo.Builder.newBuilder()
                    .setRenewalIntervalInSecs(currentLeaseRenewal)
                    .setDurationInSecs(currentLeaseDuration)
                    .build();
            instanceInfo.setLeaseInfo(newLeaseInfo);
            instanceInfo.setIsDirty();
        }
    }
    

    调用 DiscoveryClient的register方法,Eureka-Client 向 Eureka-Server 注册应用实例,看下具体的实现

    boolean register() throws Throwable {
        logger.info(PREFIX + appPathIdentifier + ": registering service...");
        EurekaHttpResponse<Void> httpResponse;
        try {
            httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
        } catch (Exception e) {
            logger.warn("{} - registration failed {}", PREFIX + appPathIdentifier, e.getMessage(), e);
            throw e;
        }
        if (logger.isInfoEnabled()) {
            logger.info("{} - registration status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());
        }
        return httpResponse.getStatusCode() == 204;
    }
    

    调用AbstractJerseyEurekaHttpClient的register方法发送请求

    public EurekaHttpResponse<Void> register(InstanceInfo info) {
        // 设置 请求地址
        String urlPath = "apps/" + info.getAppName();
        ClientResponse response = null;
        try {
            Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder();
            // 设置 请求头
            addExtraHeaders(resourceBuilder);
            // 请求 Eureka-Server
            response = resourceBuilder
                    .header("Accept-Encoding", "gzip") // GZIP
                    .type(MediaType.APPLICATION_JSON_TYPE) // 请求参数格式 JSON
                    .accept(MediaType.APPLICATION_JSON) // 响应结果格式 JSON
                    .post(ClientResponse.class, info); // 请求参数
            // 创建 EurekaHttpResponse
            return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
        } finally {
            if (logger.isDebugEnabled()) {
                logger.debug("Jersey HTTP POST {}/{} with instance {}; statusCode={}", serviceUrl, urlPath, info.getId(),
                        response == null ? "N/A" : response.getStatus());
            }
            if (response != null) {
                response.close();
            }
        }
    }
    

    EurekaServer通过ApplicationResource的addInstance方法注册实例信息的请求

    @POST
    @Consumes({"application/json", "application/xml"})
    public Response addInstance(InstanceInfo info,
                                @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
        // 校验参数是否合法
        logger.debug("Registering instance {} (replication={})", info.getId(), isReplication);
        // validate that the instanceinfo contains all the necessary required fields
        if (isBlank(info.getId())) {
            return Response.status(400).entity("Missing instanceId").build();
        } else if (isBlank(info.getHostName())) {
            return Response.status(400).entity("Missing hostname").build();
        } else if (isBlank(info.getIPAddr())) {
            return Response.status(400).entity("Missing ip address").build();
        } else if (isBlank(info.getAppName())) {
            return Response.status(400).entity("Missing appName").build();
        } else if (!appName.equals(info.getAppName())) {
            return Response.status(400).entity("Mismatched appName, expecting " + appName + " but was " + info.getAppName()).build();
        } else if (info.getDataCenterInfo() == null) {
            return Response.status(400).entity("Missing dataCenterInfo").build();
        } else if (info.getDataCenterInfo().getName() == null) {
            return Response.status(400).entity("Missing dataCenterInfo Name").build();
        }
    
        // AWS 相关,跳过
        // handle cases where clients may be registering with bad DataCenterInfo with missing data
        DataCenterInfo dataCenterInfo = info.getDataCenterInfo();
        if (dataCenterInfo instanceof UniqueIdentifier) {
            String dataCenterInfoId = ((UniqueIdentifier) dataCenterInfo).getId();
            if (isBlank(dataCenterInfoId)) {
                boolean experimental = "true".equalsIgnoreCase(serverConfig.getExperimental("registration.validation.dataCenterInfoId"));
                if (experimental) {
                    String entity = "DataCenterInfo of type " + dataCenterInfo.getClass() + " must contain a valid id";
                    return Response.status(400).entity(entity).build();
                } else if (dataCenterInfo instanceof AmazonInfo) {
                    AmazonInfo amazonInfo = (AmazonInfo) dataCenterInfo;
                    String effectiveId = amazonInfo.get(AmazonInfo.MetaDataKey.instanceId);
                    if (effectiveId == null) {
                        amazonInfo.getMetadata().put(AmazonInfo.MetaDataKey.instanceId.getName(), info.getId());
                    }
                } else {
                    logger.warn("Registering DataCenterInfo of type {} without an appropriate id", dataCenterInfo.getClass());
                }
            }
        }
    
        // 注册应用实例信息
        registry.register(info, "true".equals(isReplication));
    
        // 返回 204 成功
        return Response.status(204).build();  // 204 to be backwards compatible
    }
    

    调用 PeerAwareInstanceRegistryImpl的register方法,注册应用实例信息,看下具体的实现

    public void register(final InstanceInfo info, final boolean isReplication) {
        // 租约过期时间
        int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
        if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
            leaseDuration = info.getLeaseInfo().getDurationInSecs();
        }
        // 注册应用实例信息
        super.register(info, leaseDuration, isReplication);
        // Eureka-Server 复制
        replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
    }
    

    在看具体的注册应用实例信息的逻辑之前,我们先来看下 com.netflix.eureka.lease.Lease,租约。先来看看Lease类都有哪些属性

    /**
     * 操作
     */
    enum Action {
        /**
         * 注册
         */
        Register,
        /**
         * 取消注册
         */
        Cancel,
        /**
         * 续约
         */
        Renew
    };
    
    /**
     * 默认租约持续时长,单位:秒
     */
    public static final int DEFAULT_DURATION_IN_SECS = 90;
    
    /**
     * 实体
     */
    private T holder;
    /**
     * 取消注册时间戳
     */
    private long evictionTimestamp;
    /**
     * 注册时间戳
     */
    private long registrationTimestamp;
    /**
     * 开始服务时间戳
     */
    private long serviceUpTimestamp;
    /**
     * 最后更新时间戳
     */
    // Make it volatile so that the expiration task would see this quicker
    private volatile long lastUpdateTimestamp;
    /**
     * 租约持续时长,单位:毫秒
     */
    private long duration;
    

    调用 AbstractInstanceRegistry的register方法,注册应用实例信息

     public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
        try {
            // 获取读锁
            read.lock();
            Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
            // 增加 注册次数 到 监控
            REGISTER.increment(isReplication);
            // 获得 应用实例信息 对应的 租约
            if (gMap == null) {
                final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
                gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap); // 添加 应用
                if (gMap == null) { // 添加 应用 成功
                    gMap = gNewMap;
                }
            }
            Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
            // Retain the last dirty timestamp without overwriting it, if there is already a lease
            if (existingLease != null && (existingLease.getHolder() != null)) { // 已存在时,使用数据不一致的时间大的应用注册信息为有效的
                Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp(); // Server 注册的 InstanceInfo
                Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp(); // Client 请求的 InstanceInfo
                logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
    
                // this is a > instead of a >= because if the timestamps are equal, we still take the remote transmitted
                // InstanceInfo instead of the server local copy.
                if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
                    logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +
                            " than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
                    logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
                    registrant = existingLease.getHolder();
                }
            } else {
                // The lease does not exist and hence it is a new registration
                // 【自我保护机制】增加 `numberOfRenewsPerMinThreshold` 、`expectedNumberOfRenewsPerMin`
                synchronized (lock) {
                    if (this.expectedNumberOfRenewsPerMin > 0) {
                        // Since the client wants to cancel it, reduce the threshold
                        // (1
                        // for 30 seconds, 2 for a minute)
                        this.expectedNumberOfRenewsPerMin = this.expectedNumberOfRenewsPerMin + 2;
                        this.numberOfRenewsPerMinThreshold =
                                (int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
                    }
                }
                logger.debug("No previous lease information found; it is new registration");
            }
            // 创建 租约
            Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
            if (existingLease != null) { // 若租约已存在,设置 租约的开始服务的时间戳
                lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
            }
            // 添加到 租约映射
            gMap.put(registrant.getId(), lease);
            // 添加到 最近注册的调试队列
            synchronized (recentRegisteredQueue) {
                recentRegisteredQueue.add(new Pair<Long, String>(
                        System.currentTimeMillis(),
                        registrant.getAppName() + "(" + registrant.getId() + ")"));
            }
            // 添加到 应用实例覆盖状态映射(Eureka-Server 初始化使用)
            // This is where the initial state transfer of overridden status happens
            if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
                logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "
                                + "overrides", registrant.getOverriddenStatus(), registrant.getId());
                if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
                    logger.info("Not found overridden id {} and hence adding it", registrant.getId());
                    overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
                }
            }
            // 设置 应用实例覆盖状态
            InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
            if (overriddenStatusFromMap != null) {
                logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
                registrant.setOverriddenStatus(overriddenStatusFromMap);
            }
    
            // 获得 应用实例状态
            // Set the status based on the overridden status rules
            InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
            // 设置 应用实例状态
            registrant.setStatusWithoutDirty(overriddenInstanceStatus);
    
            // 设置 租约的开始服务的时间戳(只有第一次有效)
            // If the lease is registered with UP status, set lease service up timestamp
            if (InstanceStatus.UP.equals(registrant.getStatus())) {
                lease.serviceUp();
            }
            // 设置 应用实例信息的操作类型 为 添加
            registrant.setActionType(ActionType.ADDED);
            // 添加到 最近租约变更记录队列
            recentlyChangedQueue.add(new RecentlyChangedItem(lease));
            // 设置 租约的最后更新时间戳
            registrant.setLastUpdatedTimestamp();
            // 设置 响应缓存 过期
            invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
            logger.info("Registered instance {}/{} with status {} (replication={})",
                    registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
        } finally {
            // 释放锁
            read.unlock();
        }
    }
    

    这里有几个队列,一个是最近注册的调试队列,一个是最近租约变更记录队列。设置租约的最近更新时间,以及响应缓存周期,EurekaServer的注册实例信息就完成了。
    ApplicationInfoManager的分析就到这里了。

    相关文章

      网友评论

          本文标题:Eureka源码分析(五) ApplicationInfoMan

          本文链接:https://www.haomeiwen.com/subject/cppwxqtx.html