Eureka详解

作者: gluawwa | 来源:发表于2018-08-22 23:47 被阅读9次

      在将一个普通的spring boot应用注册到eureka server或是从eureka server中获取服务列表时,主要做了两件事:
        在应用主类配置了@EnableEurekaClient注解
        在application.properties中指定了服务注册中心的位置
      @EnableEurekaClient的源码如下:

    @Target({ElementType.TYPE})
    @Retention(RetentionPolicy.RUNTIME)
    @Documented
    @Inherited
    public @interface EnableEurekaClient {
    }
    

      通过搜索EurekaClient我们找到了一个接口EurekaClient以及接口的实现类DiscoveryClient

    /**
     * The class that is instrumental for interactions with <tt>Eureka Server</tt>.
     *
     * <p>
     * <tt>Eureka Client</tt> is responsible for a) <em>Registering</em> the
     * instance with <tt>Eureka Server</tt> b) <em>Renewal</em>of the lease with
     * <tt>Eureka Server</tt> c) <em>Cancellation</em> of the lease from
     * <tt>Eureka Server</tt> during shutdown
     * <p>
     * d) <em>Querying</em> the list of services/instances registered with
     * <tt>Eureka Server</tt>
     * <p>
     *
     * <p>
     * <tt>Eureka Client</tt> needs a configured list of <tt>Eureka Server</tt>
     * {@link java.net.URL}s to talk to.These {@link java.net.URL}s are typically amazon elastic eips
     * which do not change. All of the functions defined above fail-over to other
     * {@link java.net.URL}s specified in the list in the case of failure.
     * </p>
     *
     * @author Karthik Ranganathan, Greg Kim
     * @author Spencer Gibb
     *
     */
    @Singleton
    public class DiscoveryClient implements EurekaClient {
    ...
    }
    

      通过这个接口的注释,可以了解到DiscoveryClient负责下面的任务:
        向eureka server注册服务实例
        向eureka server注册服务租约
        向eureka server取消租约
        查询eureka server中的服务实例列表
      Eureka Client还要配置一个Eureka Server的url列表
      在具体研究eureka client完成的任务之前,我们先看看哪里对eureka server的url列表进行配置。根据配置的属性名eureka.client.service-url.defaultZone,通过serviceUrl可以找到该属性相关的加载属性,DiscoveryClient中的相关方法被标识为过时的,并@link到了EndPointUtils类,我们可以在该类中找到这个函数:

     /**
         * Get the list of all eureka service urls from properties file for the eureka client to talk to.
         *
         * @param clientConfig the clientConfig to use
         * @param instanceZone The zone in which the client resides
         * @param preferSameZone true if we have to prefer the same zone as the client, false otherwise
         * @return The list of all eureka service urls for the eureka client to talk to
         */
        public static List<String> getServiceUrlsFromConfig(EurekaClientConfig clientConfig, String instanceZone, boolean preferSameZone) {
            List<String> orderedUrls = new ArrayList<String>();
            String region = getRegion(clientConfig);
            String[] availZones = clientConfig.getAvailabilityZones(clientConfig.getRegion());
            if (availZones == null || availZones.length == 0) {
                availZones = new String[1];
                availZones[0] = DEFAULT_ZONE;
            }
            logger.debug("The availability zone for the given region {} are {}", region, availZones);
            int myZoneOffset = getZoneOffset(instanceZone, preferSameZone, availZones);
    
            List<String> serviceUrls = clientConfig.getEurekaServerServiceUrls(availZones[myZoneOffset]);
            if (serviceUrls != null) {
                orderedUrls.addAll(serviceUrls);
            }
            int currentOffset = myZoneOffset == (availZones.length - 1) ? 0 : (myZoneOffset + 1);
            while (currentOffset != myZoneOffset) {
                serviceUrls = clientConfig.getEurekaServerServiceUrls(availZones[currentOffset]);
                if (serviceUrls != null) {
                    orderedUrls.addAll(serviceUrls);
                }
                if (currentOffset == (availZones.length - 1)) {
                    currentOffset = 0;
                } else {
                    currentOffset++;
                }
            }
    
            if (orderedUrls.size() < 1) {
                throw new IllegalArgumentException("DiscoveryClient: invalid serviceUrl specified!");
            }
            return orderedUrls;
        }
    

      在上面的函数中,可以发现,客户端依次加载了两个内容,第一个是region,第二个是zone。
      通过getRegion函数,可以看到它从配置中读取了一个region返回,所以一个微服务应用只可以属于一个region。如果我们要自己设置,可以通过eureka.client.region属性来定义。

    public static String getRegion(EurekaClientConfig clientConfig) {
            String region = clientConfig.getRegion();
            if (region == null) {
                region = DEFAULT_REGION;
            }
            region = region.trim().toLowerCase();
            return region;
        }
    

      通过getAvailabilityZones函数,可以看出region和zone是一对多的关系。若要为应用指定zone,可以通过eureka.client.availableity-zone属性来设置。

    public String[] getAvailabilityZones(String region) {
            String value = this.availabilityZones.get(region);
            if (value == null) {
                value = DEFAULT_ZONE;
            }
            return value.split(",");
        }
    

      在获取region和zone的信息之后,才开始真正加载eureka server的具体地址。它根据传入的参数按一定算法确定加载位于哪一个zone的serviceUrls。具体获取serviceUrls的实现,可以查看EurekaClientConfigBean的getEurekaServerServiceUrls方法:

    @Override
        public List<String> getEurekaServerServiceUrls(String myZone) {
            String serviceUrls = this.serviceUrl.get(myZone);
            if (serviceUrls == null || serviceUrls.isEmpty()) {
                serviceUrls = this.serviceUrl.get(DEFAULT_ZONE);
            }
            if (!StringUtils.isEmpty(serviceUrls)) {
                final String[] serviceUrlsSplit = StringUtils.commaDelimitedListToStringArray(serviceUrls);
                List<String> eurekaServiceUrls = new ArrayList<>(serviceUrlsSplit.length);
                for (String eurekaServiceUrl : serviceUrlsSplit) {
                    if (!endsWithSlash(eurekaServiceUrl)) {
                        eurekaServiceUrl += "/";
                    }
                    eurekaServiceUrls.add(eurekaServiceUrl);
                }
                return eurekaServiceUrls;
            }
    
            return new ArrayList<>();
        }
    

    服务注册

      在理解了多个服务注册中心的加载后,我们再回头看DiscoveryClient类是如何实现服务注册行为的,通过查看它的构造类,可以找到它调用了下面这个函数:

     */
        private void initScheduledTasks() {
            ...
            if (clientConfig.shouldRegisterWithEureka()) {
                ...
                // InstanceInfo replicator
                instanceInfoReplicator = new InstanceInfoReplicator(
                        this,
                        instanceInfo,
                    clientConfig.getInstanceInfoReplicationIntervalSeconds(),
                        2); // burstSize
    
              ...
                instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
            } else {
                logger.info("Not registering with Eureka server per configuration");
            }
        }
    

      从上面的函数中,可以看到一个与服务注册相关的判断语句if (clientConfig.shouldRegisterWithEureka())。在该分支内,创建了一个InstanceInfoReplicator类的实例,它会执行一个定时任务,任务的run函数如下所示:

    public void run() {
            try {
                discoveryClient.refreshInstanceInfo();
    
                Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
                if (dirtyTimestamp != null) {
                    discoveryClient.register();
                    instanceInfo.unsetIsDirty(dirtyTimestamp);
                }
            } catch (Throwable t) {
                logger.warn("There was a problem with the instance info replicator", t);
            } finally {
                Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
                scheduledPeriodicRef.set(next);
            }
        }
    

      run函数中的discoveryClient.register函数真正触发了注册。继续查看register()的实现内容,如下所示:

    boolean register() throws Throwable {
            logger.info(PREFIX + "{}: registering service...", appPathIdentifier);
            EurekaHttpResponse<Void> httpResponse;
            try {
                httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
            } catch (Exception e) {
                logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e);
                throw e;
            }
            if (logger.isInfoEnabled()) {
                logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode());
            }
            return httpResponse.getStatusCode() == 204;
        }
    

      顺着上面的思路,继续看initScheduledTasks函数,不难发现在其中还有两个定时任务,分别是服务获取和服务续约:

    if (clientConfig.shouldFetchRegistry()) {
                // registry cache refresh timer
                int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
                int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
                scheduler.schedule(
                        new TimedSupervisorTask(
                                "cacheRefresh",
                                scheduler,
                                cacheRefreshExecutor,
                                registryFetchIntervalSeconds,
                                TimeUnit.SECONDS,
                                expBackOffBound,
                                new CacheRefreshThread()
                        ),
                        registryFetchIntervalSeconds, TimeUnit.SECONDS);
            }
    ...
     // Heartbeat timer
                scheduler.schedule(
                        new TimedSupervisorTask(
                                "heartbeat",
                                scheduler,
                                heartbeatExecutor,
                                renewalIntervalInSecs,
                                TimeUnit.SECONDS,
                                expBackOffBound,
                                new HeartbeatThread()
                        ),
                        renewalIntervalInSecs, TimeUnit.SECONDS);
    

    服务注册中心处理

      通过上面的源码分析,可以看到所有的交互都是通过rest请求发起的。eureka server对于各类rest请求的定义都位于com.netflix.eureka.resource包下
      以服务注册为例,注册接口在ApplicationResource类中:

    @POST
        @Consumes({"application/json", "application/xml"})
        public Response addInstance(InstanceInfo info, @HeaderParam("x-netflix-discovery-replication") String isReplication) {
            logger.debug("Registering instance {} (replication={})", info.getId(), isReplication);
        ...
                DataCenterInfo dataCenterInfo = info.getDataCenterInfo();
                if (dataCenterInfo instanceof UniqueIdentifier) {
                    String dataCenterInfoId = ((UniqueIdentifier)dataCenterInfo).getId();
                    if (this.isBlank(dataCenterInfoId)) {
                        boolean experimental = "true".equalsIgnoreCase(this.serverConfig.getExperimental("registration.validation.dataCenterInfoId"));
                        if (experimental) {
                            String entity = "DataCenterInfo of type " + dataCenterInfo.getClass() + " must contain a valid id";
                            return Response.status(400).entity(entity).build();
                        }
    
                        if (dataCenterInfo instanceof AmazonInfo) {
                            AmazonInfo amazonInfo = (AmazonInfo)dataCenterInfo;
                            String effectiveId = amazonInfo.get(MetaDataKey.instanceId);
                            if (effectiveId == null) {
                                amazonInfo.getMetadata().put(MetaDataKey.instanceId.getName(), info.getId());
                            }
                        } else {
                            logger.warn("Registering DataCenterInfo of type {} without an appropriate id", dataCenterInfo.getClass());
                        }
                    }
                }
    
                this.registry.register(info, "true".equals(isReplication));
                return Response.status(204).build();
            }
        }
    

      在对注册信息进行校验之后,会调用InstanceResgistry的register函数来进行服务注册:

    public void register(final InstanceInfo info, final boolean isReplication) {
            this.handleRegistration(info, this.resolveInstanceLeaseDuration(info), isReplication);
            super.register(info, isReplication);
        }
    
    private void handleRegistration(InstanceInfo info, int leaseDuration, boolean isReplication) {
            this.log("register " + info.getAppName() + ", vip " + info.getVIPAddress() + ", leaseDuration " + leaseDuration + ", isReplication " + isReplication);
            this.publishEvent(new EurekaInstanceRegisteredEvent(this, info, leaseDuration, isReplication));
        }
    
    

      在注册函数中,先调用publishEvent函数,将服务u 注册信息传播出去,然后调用AbstractInstanceRegistry父类中的注册实现,将InstanceInfo中的元数据信息存储在一个ConcurrentHashMap中。

    相关文章

      网友评论

        本文标题:Eureka详解

        本文链接:https://www.haomeiwen.com/subject/kmkuiftx.html