美文网首页
spring-cloud-eureka-client功能简要分析

spring-cloud-eureka-client功能简要分析

作者: 云海_54d4 | 来源:发表于2018-04-28 15:48 被阅读0次

      spring cloud默认使用eureka来做服务治理。作为client的核心功能有两个:服务发现和服务注册。
      通过查看spring-cloud-netflix-eureka-client下的类发现功能核心类是CloudEurekaClient,他继承了netflix的DiscoveryClient。
    查看DiscoveryClient源码,注释中写明了DiscoveryClient的四个功能:


    image.png

    下面一个个来看这些功能是如何实现的。
    1.d)服务发现getApplications()

        @Override
        public Applications getApplications() {
            return localRegionApps.get();
        }
    
    

    localRegionApps是一个成员变量

        private final AtomicReference<Applications> localRegionApps = new AtomicReference<Applications>();
    
    

    所以一定是有其他地方先set了Applications,getApplications() 方法才能正确返回。通过搜索代码,发现为localRegionApps设值的地方代码基本相似:

    EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
                    ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
                    : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
    Applications serverApps = httpResponse.getEntity();
    localRegionApps.set(this.filterAndShuffle(serverApps));
    

    可以看出localRegionApps的来源,通过eurekaTransport.queryClient获得一个EurekaHttpResponse<Applications>,再进过过滤和乱序处理存入localRegionApps。
      那现在就有两个问题:1.这段代码是由谁执行的,2.eurekaTransport.queryClient是如何工作的。
      通过call Hierarchy发现调用链的最开始:

     class CacheRefreshThread implements Runnable {
            public void run() {
                refreshRegistry();
            }
        }
    

      查询CacheRefreshThread被使用的地方:

    private void initScheduledTasks(){...}
    

    而initScheduledTasks()被调用的地方是DiscoveryClient的构造函数,所以这一切在DiscoveryClient被创建时就开始了。
    查看initScheduledTasks()代码:

    scheduler.schedule(
                        new TimedSupervisorTask(
                                "cacheRefresh",
                                scheduler,
                                cacheRefreshExecutor,
                                registryFetchIntervalSeconds,
                                TimeUnit.SECONDS,
                                expBackOffBound,
                                new CacheRefreshThread()
                        ),
                        registryFetchIntervalSeconds, TimeUnit.SECONDS);
    ...
    scheduler.schedule(
                        new TimedSupervisorTask(
                                "heartbeat",
                                scheduler,
                                heartbeatExecutor,
                                renewalIntervalInSecs,
                                TimeUnit.SECONDS,
                                expBackOffBound,
                                new HeartbeatThread()
                        ),
                        renewalIntervalInSecs, TimeUnit.SECONDS);
    

    可以看出这是由线程池周期执行的任务,那现在就有两个地方:1.执行的线程池scheduler、cacheRefreshExecutor、cacheRefreshExecutor 2.被执行的回调:HeartbeatThread、CacheRefreshThread。CacheRefreshThread上面知道了是用来设置localRegionApps的,HeartbeatThread从注释来看是用来b)服务续约的:

      /**
         * The heartbeat task that renews the lease in the given intervals.
         */
        private class HeartbeatThread implements Runnable {
    
            public void run() {
                if (renew()) {
                    lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
                }
            }
        }
    

    找到三个线程池的初始化代码:

     scheduler = Executors.newScheduledThreadPool(2,
                        new ThreadFactoryBuilder()
                                .setNameFormat("DiscoveryClient-%d")
                                .setDaemon(true)
                                .build());
    
                heartbeatExecutor = new ThreadPoolExecutor(
                        1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                        new SynchronousQueue<Runnable>(),
                        new ThreadFactoryBuilder()
                                .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
                                .setDaemon(true)
                                .build()
                );  // use direct handoff
    
                cacheRefreshExecutor = new ThreadPoolExecutor(
                        1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                        new SynchronousQueue<Runnable>(),
                        new ThreadFactoryBuilder()
                                .setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
                                .setDaemon(true)
                                .build()
                );
    

    可以看出eureka默认用了两个线程来做任务调度,两个任务各有一个专属线程独自负责任务的实际执行。这种依靠线程池的隔离策略,在netflix组件中用到的地方有很多。
        找到了CacheRefreshThread是被谁调用的,接着分析eurekaTransport.queryClient是如何工作的。eurekaTransport.queryClient的服务发现方法:getApplications()

    @Override
        public EurekaHttpResponse<Applications> getApplications(String... regions) {
            return getApplicationsInternal("apps/", regions);
        }
    
        private EurekaHttpResponse<Applications> getApplicationsInternal(String urlPath,
                String[] regions) {
            String url = serviceUrl + urlPath;
    
            if (regions != null && regions.length > 0)
                urlPath = (urlPath.contains("?") ? "&" : "?") + "regions="
                        + StringUtil.join(regions);
    
            ResponseEntity<EurekaApplications> response = restTemplate.exchange(url,
                    HttpMethod.GET, null, EurekaApplications.class);
    
            return anEurekaHttpResponse(response.getStatusCodeValue(),
                    response.getStatusCode().value() == HttpStatus.OK.value()
                            && response.hasBody() ? (Applications) response.getBody() : null)
                                    .headers(headersOf(response)).build();
        }
    

    可以看出是向serviceUrl进行http请求获得相关数据的。而serviceUrl是DiscoveryClient在初始化eurekaTransport时传入的。DiscoveryClient初始化时主要有三个参数:ApplicationInfoManager、EurekaClientConfig、ApplicationEventPublisher,通过查看配置类EurekaClientAutoConfiguration可以看到,在SpringCloud中创建的其实是DiscoveryClient的子类CloudEurekaClient:

            @Bean(destroyMethod = "shutdown")
            @ConditionalOnMissingBean(value = EurekaClient.class, search = SearchStrategy.CURRENT)
            public EurekaClient eurekaClient(ApplicationInfoManager manager, EurekaClientConfig config) {
                return new CloudEurekaClient(manager, config, this.optionalArgs,
                        this.context);
            }
    

    ApplicationInfoManager、EurekaClientConfig是自动注入的ApplicationInfoManager manager, EurekaClientConfig config而ApplicationEventPublisher是容器上下文ApplicationContext。继续查找代码可以发现:

            @Bean
            @ConditionalOnMissingBean(value = ApplicationInfoManager.class, search = SearchStrategy.CURRENT)
            public ApplicationInfoManager eurekaApplicationInfoManager(
                    EurekaInstanceConfig config) {
                InstanceInfo instanceInfo = new InstanceInfoFactory().create(config);
                return new ApplicationInfoManager(config, instanceInfo);
            }
    
            @ConfigurationProperties(EurekaClientConfigBean.PREFIX)
            public class EurekaClientConfigBean implements EurekaClientConfig {...}
    

        ApplicationInfoManager来源于EurekaInstanceConfig ,而EurekaInstanceConfig 其实是由spring创建的EurekaInstanceConfigBean,通过@ConfigurationProperties("eureka.instance")收集了配置文件中的eureka.instance前缀的配置。
        EurekaClientConfig其实是由spring实现并创建的EurekaClientConfigBean,通过@ConfigurationProperties("eureka.client")收集了配置文件中的eureka.client前缀的配置。
        到这可以看出来springcloud做的工作其实就是收集配置并用来初始化DiscoveryClient。其实spring本质上作为一个beanfactory,工作就是创建并管理bean的生命周期。

    2.ab)注册和续约
    注册方法:register(),查看调用发现在DiscoveryClient的构造函数和renew()函数中被调用了。查看代码:

        /**
         * Register with the eureka service by making the appropriate REST call.
         */
        boolean register() throws Throwable {
            logger.info(PREFIX + "{}: registering service...", appPathIdentifier);
            EurekaHttpResponse<Void> httpResponse;
            try {
                httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
            } catch (Exception e) {
                logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e);
                throw e;
            }
            if (logger.isInfoEnabled()) {
                logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode());
            }
            return httpResponse.getStatusCode() == 204;
        }
    

    其实是通过eurekaTransport.registrationClient.register(instanceInfo)和server进行了一次rest-http调用。instanceInfo是这个客户端本身。
    续约方法:renew(),上面说到了续约方法被放入new HeartbeatThread()中,被线程池周期执行。

        /**
         * Renew with the eureka service by making the appropriate REST call
         */
        boolean renew() {
            EurekaHttpResponse<InstanceInfo> httpResponse;
            try {
                httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
                logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode());
                if (httpResponse.getStatusCode() == 404) {
                    REREGISTER_COUNTER.increment();
                    logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName());
                    long timestamp = instanceInfo.setIsDirtyWithTime();
                    boolean success = register();
                    if (success) {
                        instanceInfo.unsetIsDirty(timestamp);
                    }
                    return success;
                }
                return httpResponse.getStatusCode() == 200;
            } catch (Throwable e) {
                logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);
                return false;
            }
        }
    

    可以看出也是通过 eurekaTransport.registrationClient进行一次http请求,如果请求失败,则调用一次register()从新注册,如果都失败则返回false。
    3.c)注销

    @PreDestroy
        @Override
        public synchronized void shutdown() {
            if (isShutdown.compareAndSet(false, true)) {
                logger.info("Shutting down DiscoveryClient ...");
    
                if (statusChangeListener != null && applicationInfoManager != null) {
                    applicationInfoManager.unregisterStatusChangeListener(statusChangeListener.getId());
                }
    
                cancelScheduledTasks(); //取消上面所说的两个周期任务,并关闭线程池
    
                // If APPINFO was registered
                if (applicationInfoManager != null
                        && clientConfig.shouldRegisterWithEureka()
                        && clientConfig.shouldUnregisterOnShutdown()) {
                    applicationInfoManager.setInstanceStatus(InstanceStatus.DOWN);
                    unregister();
                }
    
                if (eurekaTransport != null) {
                    eurekaTransport.shutdown();
                }
    
                heartbeatStalenessMonitor.shutdown();
                registryStalenessMonitor.shutdown();
    
                logger.info("Completed shut down of DiscoveryClient");
            }
        }
    

    可以看出主要工作有两部分,1.回收资源 2.调用unregister()通知eureka-server注销。查看unregister()代码:

    void unregister() {
            // It can be null if shouldRegisterWithEureka == false
            if(eurekaTransport != null && eurekaTransport.registrationClient != null) {
                try {
                    logger.info("Unregistering ...");
                    EurekaHttpResponse<Void> httpResponse = eurekaTransport.registrationClient.cancel(instanceInfo.getAppName(), instanceInfo.getId());
                    logger.info(PREFIX + "{} - deregister  status: {}", appPathIdentifier, httpResponse.getStatusCode());
                } catch (Exception e) {
                    logger.error(PREFIX + "{} - de-registration failed{}", appPathIdentifier, e.getMessage(), e);
                }
            }
        }
    

    可以看出这里也是通过eurekaTransport.registrationClient进行了一次http请求。

    综上所述,eureka的主要靠独立的线程池执行周期性任务来执行http请求来进行服务发现的更新和服务续约。而spring所扮演的角色只是DiscoveryClient的创建和管理者,并没有改变eureka的内部功能。我们也可以通过自己创建和管理DiscoveryClient在非springcloud项目中独立地使用eureka,eureka功能完备,自己集成相对简单。总之,就是从server通过http请求获得服务数据而已,可以自己通过浏览器访问:http://localhost:8761/eureka/apps看看数据

    相关文章

      网友评论

          本文标题:spring-cloud-eureka-client功能简要分析

          本文链接:https://www.haomeiwen.com/subject/kseglftx.html