美文网首页spring cloud
eureka-客户端服务发现-06

eureka-客户端服务发现-06

作者: 愤怒的奶牛 | 来源:发表于2019-08-17 18:04 被阅读0次

    前面分析了客户端发送 http 请求到 server 端注册服务信息。现在我们来分析客户端是如何发现服务的,也就是客户端是如何拿到注册中心的服务列表的。首先我们来看一个 eureka client api:

    package com.example.cloudcustomer.controller;
    
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.cloud.client.ServiceInstance;
    import org.springframework.cloud.client.discovery.DiscoveryClient;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.PathVariable;
    import org.springframework.web.bind.annotation.RestController;
    
    import java.util.List;
    import java.util.stream.Collectors;
    
    /**
     * REST endpoints that expose what the injected discovery client reports about services.
     */
    @RestController
    public class DiscoveryClientApi {

        // This is Spring Cloud's org.springframework.cloud.client.discovery.DiscoveryClient.
        // Netflix has a different class with the same simple name:
        // com.netflix.discovery.DiscoveryClient
        @Autowired
        private DiscoveryClient discoveryClient;

        /**
         * Lists the service names.
         *
         * @return whatever {@code discoveryClient.getServices()} returns
         */
        @GetMapping("services")
        public List<String> getList() {
            return discoveryClient.getServices();
        }

        /**
         * Lists the instance IDs of one service.
         *
         * @param serverId service ID taken from the request path
         * @return the instance ID of every instance returned for {@code serverId}
         */
        @GetMapping("/instance/{serverId}")
        public List<String> getInstanceInfo(@PathVariable String serverId) {
            return discoveryClient.getInstances(serverId)
                    .stream()
                    .map((ServiceInstance instance) -> instance.getInstanceId())
                    .collect(Collectors.toList());
        }

    }
    
    • List<String> services = discoveryClient.getServices();

    我们就从这里开始。discoveryClient.getServices();

    • org.springframework.cloud.client.discovery.DiscoveryClient;
    package org.springframework.cloud.client.discovery;
    import java.util.List;
    import org.springframework.cloud.client.ServiceInstance;
    import org.springframework.core.Ordered;
    /**
     * Represents read operations commonly available to discovery services such as Netflix
     * Eureka or consul.io.
     *
     * <p>The interface extends {@link Ordered}; unless an implementation overrides
     * {@link #getOrder()}, the order is {@link #DEFAULT_ORDER}.
     *
     * @author Spencer Gibb
     * @author Olga Maciaszek-Sharma
     */
    public interface DiscoveryClient extends Ordered {
    
        /**
         * Default order of the discovery client, returned by the default {@link #getOrder()}.
         */
        int DEFAULT_ORDER = 0;
    
        /**
         * A human-readable description of the implementation, used in HealthIndicator.
         * @return The description.
         */
        String description();
    
        /**
         * Gets all ServiceInstances associated with a particular serviceId.
         * @param serviceId The serviceId to query.
         * @return A List of ServiceInstance.
         */
        List<ServiceInstance> getInstances(String serviceId);
    
        /**
         * Lists the IDs of the services this client knows about.
         * @return All known service IDs.
         */
        List<String> getServices();
    
        /**
         * Default implementation for getting order of discovery clients.
         * @return order, which is {@link #DEFAULT_ORDER} in this default implementation
         */
        @Override
        default int getOrder() {
            return DEFAULT_ORDER;
        }
    
    }
    
    

    这个就是一个顶层的接口,我们来看一下实现类:EurekaDiscoveryClient

    • EurekaDiscoveryClient
    package org.springframework.cloud.netflix.eureka;
    
    import java.net.URI;
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    import java.util.Map;
    
    import com.netflix.appinfo.EurekaInstanceConfig;
    import com.netflix.appinfo.InstanceInfo;
    import com.netflix.discovery.EurekaClient;
    import com.netflix.discovery.EurekaClientConfig;
    import com.netflix.discovery.shared.Application;
    import com.netflix.discovery.shared.Applications;
    
    import org.springframework.cloud.client.DefaultServiceInstance;
    import org.springframework.cloud.client.ServiceInstance;
    import org.springframework.cloud.client.discovery.DiscoveryClient;
    import org.springframework.core.Ordered;
    import org.springframework.util.Assert;
    
    import static com.netflix.appinfo.InstanceInfo.PortType.SECURE;
    
    /**
     * A {@link DiscoveryClient} implementation for Eureka.
     *
     * @author Spencer Gibb
     * @author Tim Ysewyn
     */
    public class EurekaDiscoveryClient implements DiscoveryClient {
    
        /**
         * Client description {@link String}.
         */
        public static final String DESCRIPTION = "Spring Cloud Eureka Discovery Client";
    
    // 这个就是 Netflix 的 EurekaClient 客户端
        private final EurekaClient eurekaClient;
    
        private final EurekaClientConfig clientConfig;
    
        /**
         * Creates a discovery client by delegating to
         * {@link #EurekaDiscoveryClient(EurekaClient, EurekaClientConfig)}, taking the client
         * configuration from {@code eurekaClient.getEurekaClientConfig()}.
         *
         * @param config not read by this constructor
         * @param eurekaClient the Netflix Eureka client to delegate lookups to
         * @deprecated this constructor only delegates to the two-argument
         *             {@code (EurekaClient, EurekaClientConfig)} constructor; presumably callers
         *             should use that one directly (confirm against the project's release notes)
         */
        @Deprecated
        public EurekaDiscoveryClient(EurekaInstanceConfig config, EurekaClient eurekaClient) {
            this(eurekaClient, eurekaClient.getEurekaClientConfig());
        }
    
        /**
         * Creates a discovery client backed by the given Eureka client and configuration.
         * Both references are stored as passed in; nothing is validated here.
         *
         * @param eurekaClient the Netflix Eureka client used for all lookups
         * @param clientConfig the Eureka client configuration to keep alongside it
         */
        public EurekaDiscoveryClient(EurekaClient eurekaClient, EurekaClientConfig clientConfig) {
            this.eurekaClient = eurekaClient;
            this.clientConfig = clientConfig;
        }
    
        /**
         * Describes this implementation.
         *
         * @return the constant {@link #DESCRIPTION}
         */
        @Override
        public String description() {
            return EurekaDiscoveryClient.DESCRIPTION;
        }
    
        /**
         * Looks up the instances for a service ID and wraps each one as a
         * {@link ServiceInstance}.
         *
         * @param serviceId passed to the Eureka client as the VIP address to look up
         * @return a new mutable list with one {@code EurekaServiceInstance} per returned
         *         {@code InstanceInfo}, in the order the Eureka client returned them
         */
        @Override
        public List<ServiceInstance> getInstances(String serviceId) {
            // NOTE(review): the second argument is presumably the "secure" flag of the
            // Eureka lookup API (false = non-secure VIP) — confirm against EurekaClient.
            final List<InstanceInfo> found =
                    this.eurekaClient.getInstancesByVipAddress(serviceId, false);

            final List<ServiceInstance> wrapped = new ArrayList<>();
            for (final InstanceInfo instanceInfo : found) {
                final ServiceInstance adapted = new EurekaServiceInstance(instanceInfo);
                wrapped.add(adapted);
            }
            return wrapped;
        }
    
    // 这个就是获取 服务列表
        /**
         * Lists the names of all registered applications that have at least one instance.
         *
         * <p>Names are lower-cased with {@link java.util.Locale#ROOT} so the result does not
         * depend on the JVM's default locale (for example, under a Turkish default locale a
         * plain {@code toLowerCase()} maps {@code "I"} to a dotless {@code "ı"}).
         *
         * @return a new list of lower-cased application names, in the order the registered
         *         applications are reported; an empty list when the Eureka client returns no
         *         {@link Applications}
         */
        @Override
        public List<String> getServices() {
            // Ask the Eureka client for the application registry it currently holds.
            Applications applications = this.eurekaClient.getApplications();
            if (applications == null) {
                return Collections.emptyList();
            }
            List<Application> registered = applications.getRegisteredApplications();
            List<String> names = new ArrayList<>();
            for (Application app : registered) {
                // Applications without any instance are not reported as services.
                if (app.getInstances().isEmpty()) {
                    continue;
                }
                names.add(app.getName().toLowerCase(java.util.Locale.ROOT));
            }
            return names;
        }
    。。。。
    }
    

    Applications applications = this.eurekaClient.getApplications(); 看一下这个方法。

    • DiscoveryClient 默认的实现
      private final AtomicReference<Applications> localRegionApps = new AtomicReference<Applications>();
    .....
    // 看到这里我们只知道 客户端获取 服务信息 是 从一个 原子类中获取的 Applications 对象。这个对象里面肯定保存了 具体的服务信息。
        /**
         * Returns the applications currently held in the local {@code localRegionApps}
         * reference. No remote call is made here.
         *
         * @return the value currently stored in {@code localRegionApps}
         */
        @Override
        public Applications getApplications() {
            final Applications cached = localRegionApps.get();
            return cached;
        }
    ....
    

    问题来了,这个 Applications 是在什么时候设置到 AtomicReference 里面的?按常规思路,服务列表信息应该是从 eureka server 去查询,拿到列表再返回才对。但是这里是从本地获取的,只能说明 eureka 在这里做了优化:如果每次获取服务列表的时候都要发送一次 http 请求,会非常消耗资源;而且服务列表的特点是一旦注册成功后变化不频繁,就算有服务挂掉,也不是实时变化的。所以 eureka 就把服务中心的服务列表在本地缓存(AtomicReference)里保存了一份,客户端只需要去本地查询就 ok。那么服务列表要如何去获取并更新呢?很容易就会想到用后台线程来干这个事情,比如开个定时任务,定时去轮询服务列表信息,并更新到本地缓存即可。事实上 eureka 也就是这样做的。
    我们再去看一下 客户端的代码。

    • DiscoveryClient
    // 构造器
    /**
     * Wires up the Eureka client: stores the supplied collaborators, seeds the local registry
     * cache with an empty {@code Applications}, creates the scheduler and executors, performs
     * the initial registry fetch and (when enforced) the initial registration, then starts the
     * scheduled background tasks.
     *
     * <p>When the configuration says to neither register nor fetch, no scheduler, executors or
     * transport are created and the constructor returns early.
     *
     * @param applicationInfoManager source of this instance's {@code InstanceInfo}
     * @param config client configuration; also stored in {@code staticClientConfig}
     * @param args optional arguments; may be {@code null}, in which case the health-check
     *             providers and the pre-registration handler are set to {@code null}
     * @param backupRegistryProvider stored in a field; its use is not visible in this constructor
     * @param endpointRandomizer stored in a field; its use is not visible in this constructor
     * @throws RuntimeException if creating the scheduler, executors, transport or region
     *                          checker throws
     * @throws IllegalStateException if registration at init is enforced and fails
     */
    @Inject
        DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
                        Provider<BackupRegistry> backupRegistryProvider, EndpointRandomizer endpointRandomizer) {
            // Copy the optional collaborators from args; without args they are all null.
            if (args != null) {
                this.healthCheckHandlerProvider = args.healthCheckHandlerProvider;
                this.healthCheckCallbackProvider = args.healthCheckCallbackProvider;
                this.eventListeners.addAll(args.getEventListeners());
                this.preRegistrationHandler = args.preRegistrationHandler;
            } else {
                this.healthCheckCallbackProvider = null;
                this.healthCheckHandlerProvider = null;
                this.preRegistrationHandler = null;
            }
            
            this.applicationInfoManager = applicationInfoManager;
            InstanceInfo myInfo = applicationInfoManager.getInfo();
    
            clientConfig = config;
            staticClientConfig = clientConfig;
            transportConfig = config.getTransportConfig();
            instanceInfo = myInfo;
            if (myInfo != null) {
                appPathIdentifier = instanceInfo.getAppName() + "/" + instanceInfo.getId();
            } else {
                logger.warn("Setting instanceInfo to a passed in null value");
            }
    
            this.backupRegistryProvider = backupRegistryProvider;
            this.endpointRandomizer = endpointRandomizer;
            this.urlRandomizer = new EndpointUtils.InstanceInfoBasedUrlRandomizer(instanceInfo);
            // Start with an empty registry so the reference is non-null before the first fetch.
            localRegionApps.set(new Applications());
    
            fetchRegistryGeneration = new AtomicLong(0);
    
            // Remote regions are kept both as the raw comma-separated string and as a split array.
            remoteRegionsToFetch = new AtomicReference<String>(clientConfig.fetchRegistryForRemoteRegions());
            remoteRegionsRef = new AtomicReference<>(remoteRegionsToFetch.get() == null ? null : remoteRegionsToFetch.get().split(","));
    
            // Staleness monitors are only real metrics when the matching feature is enabled.
            if (config.shouldFetchRegistry()) {
                this.registryStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRY_PREFIX + "lastUpdateSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
            } else {
                this.registryStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
            }
    
            if (config.shouldRegisterWithEureka()) {
                this.heartbeatStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRATION_PREFIX + "lastHeartbeatSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
            } else {
                this.heartbeatStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
            }
    
            logger.info("Initializing Eureka in region {}", clientConfig.getRegion());
    
            // Neither registering nor fetching: skip all network set-up and return early.
            if (!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()) {
                logger.info("Client configured to neither register nor query for data.");
                scheduler = null;
                heartbeatExecutor = null;
                cacheRefreshExecutor = null;
                eurekaTransport = null;
                instanceRegionChecker = new InstanceRegionChecker(new PropertyBasedAzToRegionMapper(config), clientConfig.getRegion());
    
                // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
                // to work with DI'd DiscoveryClient
                DiscoveryManager.getInstance().setDiscoveryClient(this);
                DiscoveryManager.getInstance().setEurekaClientConfig(config);
    
                initTimestampMs = System.currentTimeMillis();
                logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
                        initTimestampMs, this.getApplications().size());
    
                return;  // no need to setup up an network tasks and we are done
            }
    
            try {
                // default size of 2 - 1 each for heartbeat and cacheRefresh
                scheduler = Executors.newScheduledThreadPool(2,
                        new ThreadFactoryBuilder()
                                .setNameFormat("DiscoveryClient-%d")
                                .setDaemon(true)
                                .build());
    
                heartbeatExecutor = new ThreadPoolExecutor(
                        1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                        new SynchronousQueue<Runnable>(),
                        new ThreadFactoryBuilder()
                                .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
                                .setDaemon(true)
                                .build()
                );  // use direct handoff
    
                cacheRefreshExecutor = new ThreadPoolExecutor(
                        1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                        new SynchronousQueue<Runnable>(),
                        new ThreadFactoryBuilder()
                                .setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
                                .setDaemon(true)
                                .build()
                );  // use direct handoff
    
                eurekaTransport = new EurekaTransport();
                scheduleServerEndpointTask(eurekaTransport, args);
    
                // Pick the AZ-to-region mapper implementation based on configuration.
                AzToRegionMapper azToRegionMapper;
                if (clientConfig.shouldUseDnsForFetchingServiceUrls()) {
                    azToRegionMapper = new DNSBasedAzToRegionMapper(clientConfig);
                } else {
                    azToRegionMapper = new PropertyBasedAzToRegionMapper(clientConfig);
                }
                if (null != remoteRegionsToFetch.get()) {
                    azToRegionMapper.setRegionsToFetch(remoteRegionsToFetch.get().split(","));
                }
                instanceRegionChecker = new InstanceRegionChecker(azToRegionMapper, clientConfig.getRegion());
            } catch (Throwable e) {
                throw new RuntimeException("Failed to initialize DiscoveryClient!", e);
            }
    
            // Initial registry fetch; when it reports failure (false), fall back to
            // fetchRegistryFromBackup().
            if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
                fetchRegistryFromBackup();
            }
    
            // call and execute the pre registration handler before all background tasks (inc registration) is started
            if (this.preRegistrationHandler != null) {
                this.preRegistrationHandler.beforeRegistration();
            }
    
            // Optional synchronous registration at start-up; any failure aborts construction.
            if (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) {
                try {
                    if (!register() ) {
                        throw new IllegalStateException("Registration error at startup. Invalid server response.");
                    }
                } catch (Throwable th) {
                    logger.error("Registration error at startup: {}", th.getMessage());
                    throw new IllegalStateException(th);
                }
            }
    
            // finally, init the schedule tasks (e.g. cluster resolvers, heartbeat, instanceInfo replicator, fetch
            initScheduledTasks();
    
            // Monitor registration is best-effort: a failure is only logged.
            try {
                Monitors.registerObject(this);
            } catch (Throwable e) {
                logger.warn("Cannot register timers", e);
            }
    
            // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
            // to work with DI'd DiscoveryClient
            DiscoveryManager.getInstance().setDiscoveryClient(this);
            DiscoveryManager.getInstance().setEurekaClientConfig(config);
    
            initTimestampMs = System.currentTimeMillis();
            logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
                    initTimestampMs, this.getApplications().size());
        }
    

    找到以下代码:

       // finally, init the schedule tasks (e.g. cluster resolvers, heartbeat, instanceInfo replicator, fetch
            initScheduledTasks();
    
    
    • initScheduledTasks();
     /**
         * Initializes all scheduled tasks.
         *
         * <p>When registry fetching is enabled, schedules the "cacheRefresh" supervisor task.
         * When registration is enabled, schedules the "heartbeat" supervisor task, creates the
         * instance-info replicator and the status-change listener, and starts the replicator;
         * otherwise only logs that the client is not registering.
         */
        private void initScheduledTasks() {
            if (clientConfig.shouldFetchRegistry()) {
                // Registry cache refresh timer: periodically updates the local cache.
                final int fetchIntervalSecs = clientConfig.getRegistryFetchIntervalSeconds();
                final int fetchBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
                final TimedSupervisorTask cacheRefreshTask = new TimedSupervisorTask(
                        "cacheRefresh",
                        scheduler,
                        cacheRefreshExecutor,
                        fetchIntervalSecs,
                        TimeUnit.SECONDS,
                        fetchBackOffBound,
                        new CacheRefreshThread()); // the Runnable that performs the refresh
                scheduler.schedule(cacheRefreshTask, fetchIntervalSecs, TimeUnit.SECONDS);
            }

            if (!clientConfig.shouldRegisterWithEureka()) {
                logger.info("Not registering with Eureka server per configuration");
                return;
            }

            final int heartbeatIntervalSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
            final int heartbeatBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
            logger.info("Starting heartbeat executor: " + "renew interval is: {}", heartbeatIntervalSecs);

            // Heartbeat timer
            final TimedSupervisorTask heartbeatTask = new TimedSupervisorTask(
                    "heartbeat",
                    scheduler,
                    heartbeatExecutor,
                    heartbeatIntervalSecs,
                    TimeUnit.SECONDS,
                    heartbeatBackOffBound,
                    new HeartbeatThread());
            scheduler.schedule(heartbeatTask, heartbeatIntervalSecs, TimeUnit.SECONDS);

            // InstanceInfo replicator (last argument is the burst size)
            instanceInfoReplicator = new InstanceInfoReplicator(
                    this,
                    instanceInfo,
                    clientConfig.getInstanceInfoReplicationIntervalSeconds(),
                    2);

            statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
                @Override
                public String getId() {
                    return "statusChangeListener";
                }

                @Override
                public void notify(StatusChangeEvent statusChangeEvent) {
                    // Log at warn level if DOWN was involved, at info level otherwise.
                    final boolean downInvolved =
                            InstanceStatus.DOWN == statusChangeEvent.getStatus()
                                    || InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus();
                    if (downInvolved) {
                        logger.warn("Saw local status change event {}", statusChangeEvent);
                    } else {
                        logger.info("Saw local status change event {}", statusChangeEvent);
                    }
                    instanceInfoReplicator.onDemandUpdate();
                }
            };

            // The listener is only registered when on-demand status updates are enabled.
            if (clientConfig.shouldOnDemandUpdateStatusChange()) {
                applicationInfoManager.registerStatusChangeListener(statusChangeListener);
            }

            instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
        }
    
    • new CacheRefreshThread() 看下这个线程。
     /**
         * The task that refreshes the registry cache; each run simply delegates to
         * {@code refreshRegistry()} on the enclosing client.
         */
        class CacheRefreshThread implements Runnable {
            /** Refreshes the locally cached registry data. */
            @Override
            public void run() {
                refreshRegistry();
            }
        }
    
    • refreshRegistry();
       /**
        * Runs one registry cache refresh: picks up any change to the configured remote regions,
        * calls {@code fetchRegistry}, and on success records the registry size and the fetch
        * timestamp. Any {@code Throwable} is logged and not rethrown.
        */
       @VisibleForTesting
        void refreshRegistry() {
            try {
                boolean isFetchingRemoteRegionRegistries = isFetchingRemoteRegionRegistries();
    
                boolean remoteRegionsModified = false;
                // This makes sure that a dynamic change to remote regions to fetch is honored.
                String latestRemoteRegions = clientConfig.fetchRegistryForRemoteRegions();
                if (null != latestRemoteRegions) {
                    String currentRemoteRegions = remoteRegionsToFetch.get();
                    if (!latestRemoteRegions.equals(currentRemoteRegions)) {
                        // Both remoteRegionsToFetch and AzToRegionMapper.regionsToFetch need to be in sync
                        synchronized (instanceRegionChecker.getAzToRegionMapper()) {
                            // Only the thread that wins the compare-and-set applies the new regions.
                            if (remoteRegionsToFetch.compareAndSet(currentRemoteRegions, latestRemoteRegions)) {
                                String[] remoteRegions = latestRemoteRegions.split(",");
                                remoteRegionsRef.set(remoteRegions);
                                instanceRegionChecker.getAzToRegionMapper().setRegionsToFetch(remoteRegions);
                                remoteRegionsModified = true;
                            } else {
                                logger.info("Remote regions to fetch modified concurrently," +
                                        " ignoring change from {} to {}", currentRemoteRegions, latestRemoteRegions);
                            }
                        }
                    } else {
                        // Just refresh mapping to reflect any DNS/Property change
                        instanceRegionChecker.getAzToRegionMapper().refreshMapping();
                    }
                }
    // The details above can be skipped; as its name says, fetchRegistry pulls the registry
    // information, so look inside it next. remoteRegionsModified is passed as its
    // forceFullRegistryFetch argument.
                boolean success = fetchRegistry(remoteRegionsModified);
                if (success) {
                    registrySize = localRegionApps.get().size();
                    lastSuccessfulRegistryFetchTimestamp = System.currentTimeMillis();
                }
    
                // Debug-only summary of the hash codes of the local and remote-region registries.
                if (logger.isDebugEnabled()) {
                    StringBuilder allAppsHashCodes = new StringBuilder();
                    allAppsHashCodes.append("Local region apps hashcode: ");
                    allAppsHashCodes.append(localRegionApps.get().getAppsHashCode());
                    allAppsHashCodes.append(", is fetching remote regions? ");
                    allAppsHashCodes.append(isFetchingRemoteRegionRegistries);
                    for (Map.Entry<String, Applications> entry : remoteRegionVsApps.entrySet()) {
                        allAppsHashCodes.append(", Remote region: ");
                        allAppsHashCodes.append(entry.getKey());
                        allAppsHashCodes.append(" , apps hashcode: ");
                        allAppsHashCodes.append(entry.getValue().getAppsHashCode());
                    }
                    logger.debug("Completed cache refresh task for discovery. All Apps hash code is {} ",
                            allAppsHashCodes);
                }
            } catch (Throwable e) {
                logger.error("Cannot fetch registry from server", e);
            }
        }
    
    • boolean success = fetchRegistry(remoteRegionsModified);
      /**
         * Refreshes the local registry cache from the Eureka server.
         *
         * <p>A full fetch ({@code getAndStoreFullRegistry()}) is done when delta is disabled, a
         * single-VIP refresh address is configured, the caller forces it, or the local registry
         * is missing, empty, or has version -1. Otherwise only a delta is applied
         * ({@code getAndUpdateDelta}).
         *
         * <p>A missing ({@code null}) local registry is handled explicitly: the diagnostic logging
         * and the hash-code update no longer dereference it, so the full fetch is actually reached
         * instead of failing with a {@code NullPointerException}.
         *
         * @param forceFullRegistryFetch {@code true} to force a full fetch instead of a delta
         * @return {@code true} when the refresh completed; {@code false} when any
         *         {@code Throwable} was raised (it is logged, not rethrown)
         */
        private boolean fetchRegistry(boolean forceFullRegistryFetch) {
            Stopwatch tracer = FETCH_REGISTRY_TIMER.start();

            try {
                // If the delta is disabled or if it is the first time, get all
                // applications
                Applications applications = getApplications();
                final boolean noLocalRegistry = (applications == null);

                if (clientConfig.shouldDisableDelta()
                        || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
                        || forceFullRegistryFetch
                        || noLocalRegistry
                        || (applications.getRegisteredApplications().size() == 0)
                        || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
                {
                    logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta());
                    logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress());
                    logger.info("Force full registry fetch : {}", forceFullRegistryFetch);
                    logger.info("Application is null : {}", noLocalRegistry);
                    // The two checks below are only evaluated when a local registry exists.
                    logger.info("Registered Applications size is zero : {}",
                            (!noLocalRegistry && applications.getRegisteredApplications().size() == 0));
                    logger.info("Application version is -1: {}",
                            (!noLocalRegistry && applications.getVersion() == -1));
                    // Fetch and store the complete registry.
                    getAndStoreFullRegistry();
                } else {
                    // Fetch and apply only the changes.
                    getAndUpdateDelta(applications);
                }
                // NOTE(review): after a full fetch localRegionApps holds a new object while
                // `applications` is still the one read at the top of this method — confirm that
                // updating the hash code on it is intended.
                if (!noLocalRegistry) {
                    applications.setAppsHashCode(applications.getReconcileHashCode());
                }
                logTotalInstances();
            } catch (Throwable e) {
                logger.error(PREFIX + "{} - was unable to refresh its cache! status = {}", appPathIdentifier, e.getMessage(), e);
                return false;
            } finally {
                if (tracer != null) {
                    tracer.stop();
                }
            }

            // Notify about cache refresh before updating the instance remote status
            onCacheRefreshed();

            // Update remote status based on refreshed data held in the cache
            updateInstanceRemoteStatus();

            // registry was fetched successfully, so return true
            return true;
        }
    
    • getAndStoreFullRegistry();
    /**
         * Fetches the full registry from the Eureka server over HTTP and, if the fetch generation
         * has not moved in the meantime, stores the filtered and shuffled result in
         * {@code localRegionApps}.
         *
         * @throws Throwable whatever the query client or the filtering step throws
         */
        private void getAndStoreFullRegistry() throws Throwable {
            final long generationAtStart = fetchRegistryGeneration.get();

            logger.info("Getting all instance registry info from the eureka server");

            // HTTP request: query either every application or only the configured single VIP.
            final EurekaHttpResponse<Applications> response;
            if (clientConfig.getRegistryRefreshSingleVipAddress() == null) {
                response = eurekaTransport.queryClient.getApplications(remoteRegionsRef.get());
            } else {
                response = eurekaTransport.queryClient.getVip(
                        clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
            }

            // Only an OK response contributes a registry; anything else leaves it null.
            final Applications fetched =
                    (response.getStatusCode() == Status.OK.getStatusCode()) ? response.getEntity() : null;
            logger.info("The response status is {}", response.getStatusCode());

            if (fetched == null) {
                logger.error("The application is null for some reason. Not storing this information");
                return;
            }
            if (!fetchRegistryGeneration.compareAndSet(generationAtStart, generationAtStart + 1)) {
                logger.warn("Not updating applications as another thread is updating it already");
                return;
            }

            // This is the statement that saves the service information into the local cache.
            localRegionApps.set(this.filterAndShuffle(fetched));
            logger.debug("Got full registry with apps hashcode {}", fetched.getAppsHashCode());
        }
    

    到这里我们就明白了,原来 eureka client 的服务发现本质上还是去远程服务查询的,只是优化成了由后台线程定时去获取并更新到本地缓存,客户端直接去本地缓存取,而不是每次都发送 http 请求去查询。

    相关文章

      网友评论

        本文标题:eureka-客户端服务发现-06

        本文链接:https://www.haomeiwen.com/subject/evsksctx.html