美文网首页springcloud
EurekaClient如何更新注册信息

EurekaClient如何更新注册信息

作者: go4it | 来源:发表于2017-07-01 00:53 被阅读98次

    Eureka Server 通过定时任务判断 lease 是否过期,设置该 lease 对应 instance 的状态,并把结果更新到 ResponseCache,供客户端拉取刷新。本文讲解 client 端在刷新注册信息的过程中是如何剔除实例的。

    client更新task

    eureka-client-1.4.12-sources.jar!/com/netflix/discovery/DiscoveryClient.java

    if (clientConfig.shouldFetchRegistry()) {
                // Registry cache refresh timer: wrap a CacheRefreshThread in a supervisor task
                // named "cacheRefresh" and schedule its first run one fetch interval from now.
                final int fetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
                final int backOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
                final TimedSupervisorTask cacheRefreshTask = new TimedSupervisorTask(
                        "cacheRefresh",
                        scheduler,
                        cacheRefreshExecutor,
                        fetchIntervalSeconds,
                        TimeUnit.SECONDS,
                        backOffBound,
                        new CacheRefreshThread());
                scheduler.schedule(cacheRefreshTask, fetchIntervalSeconds, TimeUnit.SECONDS);
            }
    

    CacheRefreshThread

    /**
         * Runnable used for the periodic registry refresh; every run delegates to
         * {@code refreshRegistry()} of the enclosing client.
         */
        class CacheRefreshThread implements Runnable {
            @Override
            public void run() {
                refreshRegistry();
            }
        }
    

    refreshRegistry

    /**
     * Refreshes the local registry cache.
     *
     * <p>
     * First picks up any change to the configured remote regions, then fetches the
     * registry (forcing a full fetch when the remote regions changed) and, on success,
     * records the local registry size and the time of the fetch. Any {@link Throwable}
     * raised along the way is logged and not propagated to the caller.
     * </p>
     */
    @VisibleForTesting
        void refreshRegistry() {
            try {
                // Captured before the fetch; only used in the debug summary below.
                boolean isFetchingRemoteRegionRegistries = isFetchingRemoteRegionRegistries();
    
                boolean remoteRegionsModified = false;
                // This makes sure that a dynamic change to remote regions to fetch is honored.
                String latestRemoteRegions = clientConfig.fetchRegistryForRemoteRegions();
                if (null != latestRemoteRegions) {
                    String currentRemoteRegions = remoteRegionsToFetch.get();
                    if (!latestRemoteRegions.equals(currentRemoteRegions)) {
                        // Both remoteRegionsToFetch and AzToRegionMapper.regionsToFetch need to be in sync
                        synchronized (instanceRegionChecker.getAzToRegionMapper()) {
                            // The compare-and-set only succeeds if nobody replaced the value read above;
                            // otherwise the change is left to whoever won the race.
                            if (remoteRegionsToFetch.compareAndSet(currentRemoteRegions, latestRemoteRegions)) {
                                // The configured value is a comma-separated list of region names.
                                String[] remoteRegions = latestRemoteRegions.split(",");
                                remoteRegionsRef.set(remoteRegions);
                                instanceRegionChecker.getAzToRegionMapper().setRegionsToFetch(remoteRegions);
                                remoteRegionsModified = true;
                            } else {
                                logger.info("Remote regions to fetch modified concurrently," +
                                        " ignoring change from {} to {}", currentRemoteRegions, latestRemoteRegions);
                            }
                        }
                    } else {
                        // Just refresh mapping to reflect any DNS/Property change
                        instanceRegionChecker.getAzToRegionMapper().refreshMapping();
                    }
                }
    
                // The flag is passed as forceFullRegistryFetch: a change of remote regions forces a full fetch.
                boolean success = fetchRegistry(remoteRegionsModified);
                if (success) {
                    registrySize = localRegionApps.get().size();
                    lastSuccessfulRegistryFetchTimestamp = System.currentTimeMillis();
                }
    
                // Build the per-region hash code summary only when debug logging is on.
                if (logger.isDebugEnabled()) {
                    StringBuilder allAppsHashCodes = new StringBuilder();
                    allAppsHashCodes.append("Local region apps hashcode: ");
                    allAppsHashCodes.append(localRegionApps.get().getAppsHashCode());
                    allAppsHashCodes.append(", is fetching remote regions? ");
                    allAppsHashCodes.append(isFetchingRemoteRegionRegistries);
                    for (Map.Entry<String, Applications> entry : remoteRegionVsApps.entrySet()) {
                        allAppsHashCodes.append(", Remote region: ");
                        allAppsHashCodes.append(entry.getKey());
                        allAppsHashCodes.append(" , apps hashcode: ");
                        allAppsHashCodes.append(entry.getValue().getAppsHashCode());
                    }
                    logger.debug("Completed cache refresh task for discovery. All Apps hash code is {} ",
                            allAppsHashCodes.toString());
                }
            } catch (Throwable e) {
                // Deliberately broad: a failed refresh is logged and never escapes this method.
                logger.error("Cannot fetch registry from server", e);
            }        
        }
    

    fetchRegistry

    /**
         * Fetches the registry information.
         *
         * <p>
         * This method tries to get only deltas after the first fetch unless there
         * is an issue in reconciling eureka server and client registry information.
         * A full fetch is done instead when delta is disabled, a single-VIP refresh
         * address is configured, the caller forces it, or the local applications
         * are null, empty, or at version -1.
         * </p>
         *
         * <p>
         * Failures are logged and reported through the return value; nothing is
         * rethrown. The cache-refreshed notification and the remote-status update
         * only run after a successful fetch.
         * </p>
         *
         * @param forceFullRegistryFetch Forces a full registry fetch.
         *
         * @return true if the registry was fetched, false if any error occurred
         */
        private boolean fetchRegistry(boolean forceFullRegistryFetch) {
            Stopwatch tracer = FETCH_REGISTRY_TIMER.start();
    
            try {
                // If the delta is disabled or if it is the first time, get all
                // applications
                Applications applications = getApplications();
                boolean applicationsMissing = (applications == null);
    
                if (clientConfig.shouldDisableDelta()
                        || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
                        || forceFullRegistryFetch
                        || applicationsMissing
                        || (applications.getRegisteredApplications().size() == 0)
                        || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
                {
                    logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta());
                    logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress());
                    logger.info("Force full registry fetch : {}", forceFullRegistryFetch);
                    logger.info("Application is null : {}", applicationsMissing);
                    // This branch is also taken when the local copy is null, so the diagnostics
                    // below must not dereference it; otherwise the NPE would be caught further
                    // down and the full fetch would never run.
                    logger.info("Registered Applications size is zero : {}",
                            (!applicationsMissing && applications.getRegisteredApplications().size() == 0));
                    logger.info("Application version is -1: {}",
                            (!applicationsMissing && applications.getVersion() == -1));
                    getAndStoreFullRegistry();
                } else {
                    getAndUpdateDelta(applications);
                }
                // NOTE(review): 'applications' was read before the fetch; if getAndStoreFullRegistry()
                // swaps in a new local copy, this updates the previous object - confirm that is intended.
                if (!applicationsMissing) {
                    applications.setAppsHashCode(applications.getReconcileHashCode());
                }
                logTotalInstances();
            } catch (Throwable e) {
                logger.error(PREFIX + appPathIdentifier + " - was unable to refresh its cache! status = " + e.getMessage(), e);
                return false;
            } finally {
                // Stop the timer on both the success and the failure path.
                if (tracer != null) {
                    tracer.stop();
                }
            }
    
            // Notify about cache refresh before updating the instance remote status
            onCacheRefreshed();
    
            // Update remote status based on refreshed data held in the cache
            updateInstanceRemoteStatus();
    
            // registry was fetched successfully, so return true
            return true;
        }
    

    getAndUpdateDelta

    /**
         * Gets the delta registry information from the eureka server and applies it locally.
         * When applying the delta, the following flow is observed:
         *
         * if (update generation has not advanced (due to another thread))
         *   try, without blocking, to take the update lock; under the lock apply the
         *   delta and compute reconcileHashCode
         *   do reconciliation if reconcileHashCode differs from the delta's apps hash code
         *   (or if delta-diff logging is enabled)
         * else
         *   ignore the delta
         * fi
         *
         * If no delta could be obtained (non-OK status, or no entity in the response),
         * the full registry is fetched and stored instead.
         *
         * @param applications the local applications whose reconcile hash code is computed
         *                     after the delta has been applied
         * @throws Throwable on error
         */
        private void getAndUpdateDelta(Applications applications) throws Throwable {
            // Snapshot taken before the remote call, so the compare-and-set below can tell
            // whether another thread advanced the generation in the meantime.
            long currentUpdateGeneration = fetchRegistryGeneration.get();
    
            Applications delta = null;
            EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get());
            // Only an OK response is used; for any other status 'delta' stays null.
            if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
                delta = httpResponse.getEntity();
            }
    
            if (delta == null) {
                logger.warn("The server does not allow the delta revision to be applied because it is not safe. "
                        + "Hence got the full registry.");
                getAndStoreFullRegistry();
            } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
                logger.debug("Got delta update with apps hashcode {}", delta.getAppsHashCode());
                String reconcileHashCode = "";
                // Non-blocking attempt: if the lock is held elsewhere the delta is not applied here.
                if (fetchRegistryUpdateLock.tryLock()) {
                    try {
                        updateDelta(delta);
                        reconcileHashCode = getReconcileHashCode(applications);
                    } finally {
                        fetchRegistryUpdateLock.unlock();
                    }
                } else {
                    logger.warn("Cannot acquire update lock, aborting getAndUpdateDelta");
                    // NOTE(review): despite the "aborting" message, execution continues below with
                    // reconcileHashCode still "", so the comparison normally triggers
                    // reconcileAndLogDifference - confirm this fall-through is intended.
                }
                // There is a diff in number of instances for some reason
                if (!reconcileHashCode.equals(delta.getAppsHashCode()) || clientConfig.shouldLogDeltaDiff()) {
                    reconcileAndLogDifference(delta, reconcileHashCode);  // this makes a remoteCall
                }
            } else {
                // The generation moved since the snapshot: drop this delta.
                logger.warn("Not updating application delta as another thread is updating it already");
                logger.debug("Ignoring delta update with apps hashcode {}, as another thread is updating it already", delta.getAppsHashCode());
            }
        }
    

    updateDelta

    /**
         * Applies the delta received from the eureka server to the locally cached
         * applications.
         *
         * <p>
         * Each instance in the delta is routed to the local-region applications or to
         * the applications of its remote region (created on first use), and is then
         * added, replaced or removed according to its action type. Afterwards the
         * delta's version is stamped on the local and all remote-region applications
         * and their instances are shuffled.
         * </p>
         *
         * @param delta
         *            the delta information received from eureka server in the last
         *            poll cycle.
         */
        private void updateDelta(Applications delta) {
            int processedInstances = 0;
            for (Application deltaApp : delta.getRegisteredApplications()) {
                for (InstanceInfo instance : deltaApp.getInstances()) {
                    // Default target is the local region; switch to the per-region holder otherwise.
                    Applications target = getApplications();
                    String region = instanceRegionChecker.getInstanceRegion(instance);
                    if (!instanceRegionChecker.isLocalRegion(region)) {
                        target = remoteRegionVsApps.get(region);
                        if (target == null) {
                            target = new Applications();
                            remoteRegionVsApps.put(region, target);
                        }
                    }
    
                    processedInstances++;
                    ActionType action = instance.getActionType();
                    if (ActionType.ADDED.equals(action)) {
                        registerDeltaAppIfAbsent(target, deltaApp, instance);
                        logger.debug("Added instance {} to the existing apps in region {}", instance.getId(), region);
                        target.getRegisteredApplications(instance.getAppName()).addInstance(instance);
                    } else if (ActionType.MODIFIED.equals(action)) {
                        registerDeltaAppIfAbsent(target, deltaApp, instance);
                        logger.debug("Modified instance {} to the existing apps ", instance.getId());
                        target.getRegisteredApplications(instance.getAppName()).addInstance(instance);
                    } else if (ActionType.DELETED.equals(action)) {
                        // NOTE(review): for an app unknown locally, the delta's application is
                        // registered first and the instance is then removed from it - confirm intended.
                        registerDeltaAppIfAbsent(target, deltaApp, instance);
                        logger.debug("Deleted instance {} to the existing apps ", instance.getId());
                        target.getRegisteredApplications(instance.getAppName()).removeInstance(instance);
                    }
                }
            }
            logger.debug("The total number of instances fetched by the delta processor : {}", processedInstances);
    
            getApplications().setVersion(delta.getVersion());
            getApplications().shuffleInstances(clientConfig.shouldFilterOnlyUpInstances());
    
            for (Applications remoteApps : remoteRegionVsApps.values()) {
                remoteApps.setVersion(delta.getVersion());
                remoteApps.shuffleInstances(clientConfig.shouldFilterOnlyUpInstances());
            }
        }
    
        /**
         * Registers the delta's application in {@code target} when {@code target} has no
         * application under the instance's app name yet.
         */
        private void registerDeltaAppIfAbsent(Applications target, Application deltaApp, InstanceInfo instance) {
            if (target.getRegisteredApplications(instance.getAppName()) == null) {
                target.addApplication(deltaApp);
            }
        }
    

    对 delta 中的每个实例,先获取其所属的 application 信息,再判断其 actionType 是否为 DELETED;如果是,则从本地缓存的 application 中移除该实例

    if (ActionType.DELETED.equals(instance.getActionType())) {
                        Application existingApp = applications.getRegisteredApplications(instance.getAppName());
                        if (existingApp == null) {
                            applications.addApplication(app);
                        }
                        logger.debug("Deleted instance {} to the existing apps ", instance.getId());
                        applications.getRegisteredApplications(instance.getAppName()).removeInstance(instance);
                    }
    

    doc

    相关文章

      网友评论

        本文标题:EurekaClient如何更新注册信息

        本文链接:https://www.haomeiwen.com/subject/wohwcxtx.html