美文网首页
Spring Cloud Eureka 源码分析 — Serve

Spring Cloud Eureka 源码分析 — Serve

作者: 想起个帅气的头像 | 来源:发表于2021-01-18 20:25 被阅读0次

    一. 前言

    本文主要分析eureka server端启动流程,包括启动入口、bean注入过程、初始化过程等。

    client端注册逻辑请参见Spring Cloud Eureka 源码分析 —— Client端
    server端服务治理请参见Spring Cloud Eureka 源码分析 —— Server端(服务治理)

    二. 源码分析

    1. 入口

    我们知道开启eureka server端要加注解@EnableEurekaServer

    @Target(ElementType.TYPE)
    @Retention(RetentionPolicy.RUNTIME)
    @Documented
    @Import(EurekaServerMarkerConfiguration.class)
    public @interface EnableEurekaServer {
    
    }
    

    这个enable注解引入了EurekaServerMarkerConfiguration的import,内部只是一个Marker的标记,用于激活EurekaServerAutoConfiguration

    2. 启动配置bean注入

    EurekaServerAutoConfiguration

    介绍下内部主要的bean定义

    EurekaController
        @Bean
        @ConditionalOnProperty(prefix = "eureka.dashboard", name = "enabled",
                matchIfMissing = true)
        public EurekaController eurekaController() {
            return new EurekaController(this.applicationInfoManager);
        }
    

    定义了eureka看板的controller

    PeerAwareInstanceRegistry
        @Bean
        public PeerAwareInstanceRegistry peerAwareInstanceRegistry(
                ServerCodecs serverCodecs) {
            this.eurekaClient.getApplications(); // force initialization
            return new InstanceRegistry(this.eurekaServerConfig, this.eurekaClientConfig,
                    serverCodecs, this.eurekaClient,
                    this.instanceRegistryProperties.getExpectedNumberOfClientsSendingRenews(),
                    this.instanceRegistryProperties.getDefaultOpenForTrafficCount());
        }
    

    eureka server端完成服务注册、续约、下线的核心处理类,所有的业务实现逻辑都在此类完成。

    其中代码注释中写到force initialization,原因是eurekaClient是lazy的bean,eureka server虽然是服务端,但其也一个客户端进行自我注册。在初始化过程中,完成一系列的客户端侧的注册、拉取等。详情参考Spring Cloud Eureka 源码分析 —— Client端

    PeerEurekaNodes
        @Bean
        @ConditionalOnMissingBean
        public PeerEurekaNodes peerEurekaNodes(PeerAwareInstanceRegistry registry,
                ServerCodecs serverCodecs,
                ReplicationClientAdditionalFilters replicationClientAdditionalFilters) {
            return new RefreshablePeerEurekaNodes(registry, this.eurekaServerConfig,
                    this.eurekaClientConfig, serverCodecs, this.applicationInfoManager,
                    replicationClientAdditionalFilters);
        }
    

    这个bean的字面意思是平级的eureka server端node,内部会封装eureka各个server节点的信息。

    EurekaServerContext
        @Bean
        public EurekaServerContext eurekaServerContext(ServerCodecs serverCodecs,
                PeerAwareInstanceRegistry registry, PeerEurekaNodes peerEurekaNodes) {
            return new DefaultEurekaServerContext(this.eurekaServerConfig, serverCodecs,
                    registry, peerEurekaNodes, this.applicationInfoManager);
        }
    

    eureka上下文,封装了eureka的各种bean对象。

    EurekaServerBootstrap
        @Bean
        public EurekaServerBootstrap eurekaServerBootstrap(PeerAwareInstanceRegistry registry,
                EurekaServerContext serverContext) {
            return new EurekaServerBootstrap(this.applicationInfoManager,
                    this.eurekaClientConfig, this.eurekaServerConfig, registry,
                    serverContext);
        }
    

    eureka server端的启动类,封装了server端的启动、销毁实现。在完成bean初始化后调用。

    FilterRegistrationBean
        @Bean
        @ConditionalOnBean(name = "httpTraceFilter")
        public FilterRegistrationBean<?> traceFilterRegistration(
                @Qualifier("httpTraceFilter") Filter filter) {
            FilterRegistrationBean<Filter> bean = new FilterRegistrationBean<Filter>();
            bean.setFilter(filter);
            bean.setOrder(Ordered.LOWEST_PRECEDENCE - 10);
            return bean;
        }
    

    提供了指定bean名为httpTraceFilter过滤器的Registration,如果有这个bean会加入到FilterRegistrationBean中,在springboot actuator中有默认实现,也可以自定义bean的实现。

    EurekaServerConfig
            @Bean
            @ConditionalOnMissingBean
            public EurekaServerConfig eurekaServerConfig(EurekaClientConfig clientConfig) {
                EurekaServerConfigBean server = new EurekaServerConfigBean();
                if (clientConfig.shouldRegisterWithEureka()) {
                    // Set a sensible default if we are supposed to replicate
                    server.setRegistrySyncRetries(5);
                }
                return server;
            }
    

    保存server端配置属性的bean,前缀是eureka.server.xxx

    常用配置:
    参数 说明
    enableSelfPreservation 是否允许开启自我保护,默认true
    renewalPercentThreshold 开启自我保护的阈值乘数百分比,默认0.85
    renewalThresholdUpdateIntervalMs 定时任务重新计算自我保护阈值的时间周期,默认15min
    responseCacheUpdateIntervalMs readOnlyCacheMap每次从readWriteCacheMap同步最新数据的时间周期,默认30s
    responseCacheAutoExpirationInSeconds 注册表readWriteCacheMap的过期时间,默认180s
    useReadOnlyResponseCache 是否开启注册表的readOnlyCacheMap,默认true
    expectedClientRenewalIntervalSeconds 期望客户端发送续约的时间周期,默认30s
    leaseExpirationDurationInSeconds 服务端接收到最后一次续约后,超过此时间未再次续约可被剔除,默认90s
    evictionIntervalTimerInMs 剔除任务的执行周期,默认60s
    jerseyApplication
        @Bean
        public javax.ws.rs.core.Application jerseyApplication(Environment environment,
                ResourceLoader resourceLoader) {
    
            ClassPathScanningCandidateComponentProvider provider = new ClassPathScanningCandidateComponentProvider(
                    false, environment);
    
            // Filter to include only classes that have a particular annotation.
            //
            provider.addIncludeFilter(new AnnotationTypeFilter(Path.class));
            provider.addIncludeFilter(new AnnotationTypeFilter(Provider.class));
    
            // Find classes in Eureka packages (or subpackages)
            //
            Set<Class<?>> classes = new HashSet<>();
            for (String basePackage : EUREKA_PACKAGES) {
                Set<BeanDefinition> beans = provider.findCandidateComponents(basePackage);
                for (BeanDefinition bd : beans) {
                    Class<?> cls = ClassUtils.resolveClassName(bd.getBeanClassName(),
                            resourceLoader.getClassLoader());
                    classes.add(cls);
                }
            }
    
            // Construct the Jersey ResourceConfig
            Map<String, Object> propsAndFeatures = new HashMap<>();
            propsAndFeatures.put(
                    // Skip static content used by the webapp
                    ServletContainer.PROPERTY_WEB_PAGE_CONTENT_REGEX,
                    EurekaConstants.DEFAULT_PREFIX + "/(fonts|images|css|js)/.*");
    
            DefaultResourceConfig rc = new DefaultResourceConfig(classes);
            rc.setPropertiesAndFeatures(propsAndFeatures);
    
            return rc;
        }
    
    扫描了一系列的Resource的bean,具体列表如下图:
        @Bean
        public FilterRegistrationBean<?> jerseyFilterRegistration(
                javax.ws.rs.core.Application eurekaJerseyApp) {
            FilterRegistrationBean<Filter> bean = new FilterRegistrationBean<Filter>();
            bean.setFilter(new ServletContainer(eurekaJerseyApp));
            bean.setOrder(Ordered.LOWEST_PRECEDENCE);
            bean.setUrlPatterns(Collections.singletonList(EurekaConstants.DEFAULT_PREFIX + "/*"));
            return bean;
        }
    

    注册jerseyFilter的bean,所有的/eureka/*的请求都会经过这个filter,ServletContainer作为servlet来处理请求,而已经的请求资源规则就是上面的这些resource。

    3. 启动流程

    EurekaServerContext

    eureka server端的上下文,涵盖了server端的各种bean的对象,并在内部完成。。。的初始化

    @PostConstruct
        @Override
        public void initialize() {
            logger.info("Initializing ...");
            peerEurekaNodes.start();
            try {
                registry.init(peerEurekaNodes);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
            logger.info("Initialized");
        }
    
    @PreDestroy
        @Override
        public void shutdown() {
            logger.info("Shutting down ...");
            registry.shutdown();
            peerEurekaNodes.shutdown();
            logger.info("Shut down");
        }
    

    在bean完成构造之后,会基于@PostConstruct注解调用initialize方法。首先会调用peerEurekaNodes.start();

        public void start() {
            taskExecutor = Executors.newSingleThreadScheduledExecutor(
                    new ThreadFactory() {
                        @Override
                        public Thread newThread(Runnable r) {
                            Thread thread = new Thread(r, "Eureka-PeerNodesUpdater");
                            thread.setDaemon(true);
                            return thread;
                        }
                    }
            );
            try {
                updatePeerEurekaNodes(resolvePeerUrls());
                Runnable peersUpdateTask = new Runnable() {
                    @Override
                    public void run() {
                        try {
                            updatePeerEurekaNodes(resolvePeerUrls());
                        } catch (Throwable e) {
                            logger.error("Cannot update the replica Nodes", e);
                        }
    
                    }
                };
                taskExecutor.scheduleWithFixedDelay(
                        peersUpdateTask,
                        serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
                        serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
                        TimeUnit.MILLISECONDS
                );
            } catch (Exception e) {
                throw new IllegalStateException(e);
            }
            for (PeerEurekaNode node : peerEurekaNodes) {
                logger.info("Replica node URL:  {}", node.getServiceUrl());
            }
        }
    
     /**
         * Resolve peer URLs.
         * 返回配置的service-url,如果配置了myUrl,则需要过滤一下,如 http://localhost:8761/eureka/,http://localhost:8762/eureka/
         */
        protected List<String> resolvePeerUrls() {
            InstanceInfo myInfo = applicationInfoManager.getInfo();
            String zone = InstanceInfo.getZone(clientConfig.getAvailabilityZones(clientConfig.getRegion()), myInfo);
            List<String> replicaUrls = EndpointUtils
                    .getDiscoveryServiceUrls(clientConfig, zone, new EndpointUtils.InstanceInfoBasedUrlRandomizer(myInfo));
    
            int idx = 0;
            while (idx < replicaUrls.size()) {
                if (isThisMyUrl(replicaUrls.get(idx))) {
                    replicaUrls.remove(idx);
                } else {
                    idx++;
                }
            }
            return replicaUrls;
        }
    
    1. 首先创建了一个task线程池;
    2. 根据给定的service-urls来更新PeerEurekaNodesPeerEurekaNode,删除或创建新的节点;
    3. 开启了一个定时任务,每个一段事件,同样刷新PeerEurekaNodes

    介绍下更新PeerEurekaNodes的过程:

    updatePeerEurekaNodes
      protected void updatePeerEurekaNodes(List<String> newPeerUrls) {
            if (newPeerUrls.isEmpty()) {
                logger.warn("The replica size seems to be empty. Check the route 53 DNS Registry");
                return;
            }
            // 一种两个集合取差集的写法
            Set<String> toShutdown = new HashSet<>(peerEurekaNodeUrls);
            toShutdown.removeAll(newPeerUrls);
            Set<String> toAdd = new HashSet<>(newPeerUrls);
            toAdd.removeAll(peerEurekaNodeUrls);
    
            if (toShutdown.isEmpty() && toAdd.isEmpty()) { // No change
                return;
            }
    
            // Remove peers no long available
            List<PeerEurekaNode> newNodeList = new ArrayList<>(peerEurekaNodes);
            // 将不存在的url从集合中移除,并调用node.shutdown();
            if (!toShutdown.isEmpty()) {
                logger.info("Removing no longer available peer nodes {}", toShutdown);
                int i = 0;
                while (i < newNodeList.size()) {
                    PeerEurekaNode eurekaNode = newNodeList.get(i);
                    if (toShutdown.contains(eurekaNode.getServiceUrl())) {
                        newNodeList.remove(i);
                        eurekaNode.shutDown();
                    } else {
                        i++;
                    }
                }
            }
    
            // Add new peers
            // 创建新的PeerEurekaNode节点,这里createPeerEurekaNode会使用RefreshablePeerEurekaNodes实现类的方法,
            //并在PeerEurekaNode创建了batchingDispatcher和nonBatchingDispatcher
            if (!toAdd.isEmpty()) {
                logger.info("Adding new peer nodes {}", toAdd);
                for (String peerUrl : toAdd) {
                    newNodeList.add(createPeerEurekaNode(peerUrl));
                }
            }
    
            this.peerEurekaNodes = newNodeList;
            this.peerEurekaNodeUrls = new HashSet<>(newPeerUrls);
        }
    
        /**
         * Shuts down all resources used for peer replication.
         */
        public void shutDown() {
            batchingDispatcher.shutdown();
            nonBatchingDispatcher.shutdown();
        }
    
        protected PeerEurekaNode createPeerEurekaNode(String peerEurekaNodeUrl) {
            HttpReplicationClient replicationClient = JerseyReplicationClient.createReplicationClient(serverConfig, serverCodecs, peerEurekaNodeUrl);
            String targetHost = hostFromUrl(peerEurekaNodeUrl);
            if (targetHost == null) {
                targetHost = "host";
            }
            return new PeerEurekaNode(registry, targetHost, peerEurekaNodeUrl, replicationClient, serverConfig);
        }
    
    1. 比较当前url的集合和之前集合的差异,来决定删除还是新建
    2. 新建会给每个service-url创建一个PeerEurekaNode对象,封装了这个node的各种信息。
    3. 删除会调用node.shutDown, 来关闭任务执行器。

    再回到EurekaServerContextinitialize方法,继续执行registry.init(peerEurekaNodes);

        @Override
        public void init(PeerEurekaNodes peerEurekaNodes) throws Exception {
            // 开启一个计数器
            this.numberOfReplicationsLastMin.start();
            this.peerEurekaNodes = peerEurekaNodes;
            // 初始化注册表的缓存
            initializedResponseCache();
            // 设置定时任务,用来更新自我保护的阈值
            scheduleRenewalThresholdUpdateTask();
            // 初始化远程region的Registry
            initRemoteRegionRegistry();
    
            try {
                // JMX监控
                Monitors.registerObject(this);
            } catch (Throwable e) {
                logger.warn("Cannot register the JMX monitor for the InstanceRegistry :", e);
            }
        }
    

    这个方法主要完成eureka server的上下文关键功能的初始化。

    initializedResponseCache
    @Override
        public synchronized void initializedResponseCache() {
            if (responseCache == null) {
                responseCache = new ResponseCacheImpl(serverConfig, serverCodecs, this);
            }
        }
    
          // 只读cache
          private final ConcurrentMap<Key, Value> readOnlyCacheMap = new ConcurrentHashMap<Key, Value>();
          // 读写cache
          private final LoadingCache<Key, Value> readWriteCacheMap;
    
        ResponseCacheImpl(EurekaServerConfig serverConfig, ServerCodecs serverCodecs, AbstractInstanceRegistry registry) {
            this.serverConfig = serverConfig;
            this.serverCodecs = serverCodecs;
            this.shouldUseReadOnlyResponseCache = serverConfig.shouldUseReadOnlyResponseCache();
            this.registry = registry;
            // 默认值30s
            long responseCacheUpdateIntervalMs = serverConfig.getResponseCacheUpdateIntervalMs();
            // 初始化读写cache
            this.readWriteCacheMap =
                    CacheBuilder.newBuilder().initialCapacity(serverConfig.getInitialCapacityOfResponseCache())
                            // getResponseCacheAutoExpirationInSeconds 默认值180s
                            .expireAfterWrite(serverConfig.getResponseCacheAutoExpirationInSeconds(), TimeUnit.SECONDS)
                            .removalListener(new RemovalListener<Key, Value>() {
                                @Override
                                public void onRemoval(RemovalNotification<Key, Value> notification) {
                                    Key removedKey = notification.getKey();
                                    if (removedKey.hasRegions()) {
                                        Key cloneWithNoRegions = removedKey.cloneWithoutRegions();
                                        regionSpecificKeys.remove(cloneWithNoRegions, removedKey);
                                    }
                                }
                            })
                            .build(new CacheLoader<Key, Value>() {
                                @Override
                                public Value load(Key key) throws Exception {
                                    if (key.hasRegions()) {
                                        Key cloneWithNoRegions = key.cloneWithoutRegions();
                                        regionSpecificKeys.put(cloneWithNoRegions, key);
                                    }
                                    Value value = generatePayload(key);
                                    return value;
                                }
                            });
            // 这里的shouldUseReadOnlyResponseCache 默认值是true
            if (shouldUseReadOnlyResponseCache) {
                timer.schedule(getCacheUpdateTask(),
                        new Date(((System.currentTimeMillis() / responseCacheUpdateIntervalMs) * responseCacheUpdateIntervalMs)
                                + responseCacheUpdateIntervalMs),
                        responseCacheUpdateIntervalMs);
            }
    
            try {
                Monitors.registerObject(this);
            } catch (Throwable e) {
                logger.warn("Cannot register the JMX monitor for the InstanceRegistry", e);
            }
        }
    

    ResponseCacheImpl 内部定义了两个内存级的cache,在构造方法中会初始化readWriteCacheMap,并默认开启只读缓存刷新任务。默认每隔30s,将读写cache中的数据同步到只读cache中,实现读写分离。

    scheduleRenewalThresholdUpdateTask

    开启一个schedule,用于更新续约的阈值,根据每分钟实际续约情况来决定server端是否开启自我保护。
    自我保护的目的是防止因为server端和client端的各类连接问题,导致大量有效client端被下线,从而影响到client之间本身的相互调用。

    /**
         * Schedule the task that updates <em>renewal threshold</em> periodically.
         * The renewal threshold would be used to determine if the renewals drop
         * dramatically because of network partition and to protect expiring too
         * many instances at a time.
         *
         */
        // renewalThresholdUpdateIntervalMs 默认值15分钟
        private void scheduleRenewalThresholdUpdateTask() {
            timer.schedule(new TimerTask() {@Override public void run() {
                                   updateRenewalThreshold(); }
                           }, serverConfig.getRenewalThresholdUpdateIntervalMs(),
                    serverConfig.getRenewalThresholdUpdateIntervalMs());
        }
    
        private void updateRenewalThreshold() {
            try {
                Applications apps = eurekaClient.getApplications();
                int count = 0;
                // 获取当前所有客户端的总实例数
                for (Application app : apps.getRegisteredApplications()) {
                    for (InstanceInfo instance : app.getInstances()) {
                        if (this.isRegisterable(instance)) {
                            ++count;
                        }
                    }
                }
                synchronized (lock) {
                    // Update threshold only if the threshold is greater than the
                    // current expected threshold or if self preservation is disabled.
                    // renewalPercentThreshold 默认值0.85
                    // 如果当前总实例数 > 预计客户端总续约数 * 0.85 
                    // 或者自我保护处于关闭状态
                    // 就更新阈值
                    if ((count) > (serverConfig.getRenewalPercentThreshold() * expectedNumberOfClientsSendingRenews)
                            || (!this.isSelfPreservationModeEnabled())) {
                        this.expectedNumberOfClientsSendingRenews = count;
                        updateRenewsPerMinThreshold();
                    }
                }
                logger.info("Current renewal threshold is : {}", numberOfRenewsPerMinThreshold);
            } catch (Throwable e) {
                logger.error("Cannot update renewal threshold", e);
            }
        }
    
        protected void updateRenewsPerMinThreshold() {
            this.numberOfRenewsPerMinThreshold = (int) (this.expectedNumberOfClientsSendingRenews
                    * (60.0 / serverConfig.getExpectedClientRenewalIntervalSeconds())
                    * serverConfig.getRenewalPercentThreshold());
        }
    

    比较当前客户端的总实例数和(预计续约的总次数*0.85) ,以及自我保护当前的状态来决定是否更新阈值。
    更新阈值的公式:
    每分钟续约阈值 = 预计客户端续约数(客户端实例数) * 每分钟续约的次数 * 阈值百分比

    initRemoteRegionRegistry
    
        protected Map<String, RemoteRegionRegistry> regionNameVSRemoteRegistry = new HashMap<String, RemoteRegionRegistry>();
    
        protected void initRemoteRegionRegistry() throws MalformedURLException {
            Map<String, String> remoteRegionUrlsWithName = serverConfig.getRemoteRegionUrlsWithName();
            if (!remoteRegionUrlsWithName.isEmpty()) {
                allKnownRemoteRegions = new String[remoteRegionUrlsWithName.size()];
                int remoteRegionArrayIndex = 0;
                for (Map.Entry<String, String> remoteRegionUrlWithName : remoteRegionUrlsWithName.entrySet()) {
                    RemoteRegionRegistry remoteRegionRegistry = new RemoteRegionRegistry(
                            serverConfig,
                            clientConfig,
                            serverCodecs,
                            remoteRegionUrlWithName.getKey(),
                            new URL(remoteRegionUrlWithName.getValue()));
                    regionNameVSRemoteRegistry.put(remoteRegionUrlWithName.getKey(), remoteRegionRegistry);
                    allKnownRemoteRegions[remoteRegionArrayIndex++] = remoteRegionUrlWithName.getKey();
                }
            }
            logger.info("Finished initializing remote region registries. All known remote regions: {}",
                    (Object) allKnownRemoteRegions);
        }
    

    如果配置了远程region,则会将远程registry进行初始化。

    至此EurekaServerContext的初始化过程介绍完成。

    EurekaServerInitializerConfiguration

    EurekaServerAutoConfiguration中的主要bean都初始化完成后,可以看到在类上有个Import的注解,注入EurekaServerInitializerConfiguration进行eureka server的启动。

    public class EurekaServerInitializerConfiguration implements ServletContextAware, SmartLifecycle, Ordered { ... }
    

    跟Client端起始点一样,都是以SmartLifecycle作为入口。直接看start()方法的实现。

        @Override
        public void start() {
            new Thread(() -> {
                try {
                    // TODO: is this class even needed now?
                    eurekaServerBootstrap.contextInitialized(
                            EurekaServerInitializerConfiguration.this.servletContext);
                    log.info("Started Eureka Server");
    
                    publish(new EurekaRegistryAvailableEvent(getEurekaServerConfig()));
                    EurekaServerInitializerConfiguration.this.running = true;
                    publish(new EurekaServerStartedEvent(getEurekaServerConfig()));
                }
                catch (Exception ex) {
                    // Help!
                    log.error("Could not initialize Eureka servlet context", ex);
                }
            }).start();
        }
    

    可以看到这个start完成如下几件事情:

    1. contextInitialized:通过启动类来完成上下文的初始化
    2. 发送EurekaRegistryAvailableEvent事件。
    3. 发送EurekaServerStartedEvent 事件。
    eurekaServerBootstrap.
        public void contextInitialized(ServletContext context) {
            try {
                initEurekaEnvironment();
                initEurekaServerContext();
    
                context.setAttribute(EurekaServerContext.class.getName(), this.serverContext);
            }
            catch (Throwable e) {
                log.error("Cannot bootstrap eureka server :", e);
                throw new RuntimeException("Cannot bootstrap eureka server :", e);
            }
        }
    

    只有两个方法,分别是initEurekaEnvironment,initEurekaServerContext, 具体展开。

    initEurekaEnvironment
        protected void initEurekaEnvironment() throws Exception {
            log.info("Setting the eureka configuration..");
    
            // 配置数据中心
            String dataCenter = ConfigurationManager.getConfigInstance()
                    .getString(EUREKA_DATACENTER);
            if (dataCenter == null) {
                log.info(
                        "Eureka data center value eureka.datacenter is not set, defaulting to default");
                ConfigurationManager.getConfigInstance()
                        .setProperty(ARCHAIUS_DEPLOYMENT_DATACENTER, DEFAULT);
            }
            else {
                ConfigurationManager.getConfigInstance()
                        .setProperty(ARCHAIUS_DEPLOYMENT_DATACENTER, dataCenter);
            }
            // 指定读取配置环境
            String environment = ConfigurationManager.getConfigInstance()
                    .getString(EUREKA_ENVIRONMENT);
            if (environment == null) {
                ConfigurationManager.getConfigInstance()
                        .setProperty(ARCHAIUS_DEPLOYMENT_ENVIRONMENT, TEST);
                log.info(
                        "Eureka environment value eureka.environment is not set, defaulting to test");
            }
            else {
                ConfigurationManager.getConfigInstance()
                        .setProperty(ARCHAIUS_DEPLOYMENT_ENVIRONMENT, environment);
            }
        }
    

    设置数据中心和部署环境,默认是default和test

    initEurekaServerContext
        protected void initEurekaServerContext() throws Exception {
            // For backward compatibility
            JsonXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(),
                    XStream.PRIORITY_VERY_HIGH);
            XmlXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(),
                    XStream.PRIORITY_VERY_HIGH);
    
            if (isAws(this.applicationInfoManager.getInfo())) {
                this.awsBinder = new AwsBinderDelegate(this.eurekaServerConfig,
                        this.eurekaClientConfig, this.registry, this.applicationInfoManager);
                this.awsBinder.start();
            }
            // 设置一个可以用于获取上下文的Holder
            EurekaServerContextHolder.initialize(this.serverContext);
    
            log.info("Initialized server context");
    
            // Copy registry from neighboring eureka node
            int registryCount = this.registry.syncUp();
            this.registry.openForTraffic(this.applicationInfoManager, registryCount);
    
            // Register all monitoring statistics.
            EurekaMonitors.registerAllStats();
        }
    

    这个方法主要完成两件事,通过syncUp将注册表的信息同步到其他的eureka server节点上,openForTraffic刷新自我保护阈值,并开启驱逐任务。

    syncUp
        public int syncUp() {
            // Copy entire entry from neighboring DS node
            int count = 0;
    
            for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++) {
                if (i > 0) {
                    // 重试逻辑等待时间,一旦有一个server同步成功,后续就不再重试
                    try {
                        Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs());
                    } catch (InterruptedException e) {
                        logger.warn("Interrupted during registry transfer..");
                        break;
                    }
                }
                Applications apps = eurekaClient.getApplications();
                for (Application app : apps.getRegisteredApplications()) {
                    for (InstanceInfo instance : app.getInstances()) {
                        try {
                            if (isRegisterable(instance)) {
                                register(instance, instance.getLeaseInfo().getDurationInSecs(), true);
                                count++;
                            }
                        } catch (Throwable t) {
                            logger.error("During DS init copy", t);
                        }
                    }
                }
            }
            return count;
        }
    

    通过eurekaClient.getApplications()获取到client的注册表,遍历每个实例,并判断是否可在当前region中注册,满足则发起register

    InstanceRegistry.register
        // org.springframework.cloud.netflix.eureka.server.InstanceRegistry
        @Override
        public void register(InstanceInfo info, int leaseDuration, boolean isReplication) {
            handleRegistration(info, leaseDuration, isReplication);
            super.register(info, leaseDuration, isReplication);
        }
    
        private void handleRegistration(InstanceInfo info, int leaseDuration, boolean isReplication) {
            log("register " + info.getAppName() + ", vip " + info.getVIPAddress() + ", leaseDuration " + leaseDuration + ", isReplication " + isReplication);
            publishEvent(new EurekaInstanceRegisteredEvent(this, info, leaseDuration, isReplication));
        }
    

    先由子类InstanceRegistry发送EurekaInstanceRegisteredEvent事件,再由父类完成真正的register逻辑。

    AbstractInstanceRegistry.register
        //com.netflix.eureka.registry.AbstractInstanceRegistry
        /**
         * Registers a new instance with a given duration.
         *
         * @see com.netflix.eureka.lease.LeaseManager#register(java.lang.Object, int, boolean)
         */
        public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
            try {
                read.lock();
                // registry缓存了实例注册的信息,注册实现本质上就是将实例信息添加到register属性中。
                Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
                // 计数器+1
                REGISTER.increment(isReplication);
                //初始化gMap
                if (gMap == null) {
                    final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
                    gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
                    if (gMap == null) {
                        gMap = gNewMap;
                    }
                }
                Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
                // Retain the last dirty timestamp without overwriting it, if there is already a lease
                if (existingLease != null && (existingLease.getHolder() != null)) {
                    // 如果之前有缓存续约信息,比较两个对象的时间差
                    Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
                    Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
                    logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
    
                    // this is a > instead of a >= because if the timestamps are equal, we still take the remote transmitted
                    // InstanceInfo instead of the server local copy.
                    if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
                        // 如果本次注册是更老的实例信息,就还使用上次缓存的对象
                        logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +
                                " than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
                        logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
                        registrant = existingLease.getHolder();
                    }
                } else {
                    // The lease does not exist and hence it is a new registration
                    // 如果本次是第一次注册,expectedNumberOfClientsSendingRenews+1,并更新自我保护阈值
                    synchronized (lock) {
                        if (this.expectedNumberOfClientsSendingRenews > 0) {
                            // Since the client wants to register it, increase the number of clients sending renews
                            this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews + 1;
                            updateRenewsPerMinThreshold();
                        }
                    }
                    logger.debug("No previous lease information found; it is new registration");
                }
                Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
                if (existingLease != null) {
                    // 使用之前的服务开启时间
                    lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
                }
                gMap.put(registrant.getId(), lease);
                synchronized (recentRegisteredQueue) {
                    // recentRegisteredQueue是一个记录注册操作的队列,key是注册时间,value是客户端实例id,主要用于debug或统计使用
                    recentRegisteredQueue.add(new Pair<Long, String>(
                            System.currentTimeMillis(),
                            registrant.getAppName() + "(" + registrant.getId() + ")"));
                }
                // This is where the initial state transfer of overridden status happens
                if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
                    logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "
                                    + "overrides", registrant.getOverriddenStatus(), registrant.getId());
                    if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
                        logger.info("Not found overridden id {} and hence adding it", registrant.getId());
                        overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
                    }
                }
                InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
                if (overriddenStatusFromMap != null) {
                    logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
                    registrant.setOverriddenStatus(overriddenStatusFromMap);
                }
    
                // Set the status based on the overridden status rules
                InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
                registrant.setStatusWithoutDirty(overriddenInstanceStatus);
    
                // If the lease is registered with UP status, set lease service up timestamp
                if (InstanceStatus.UP.equals(registrant.getStatus())) {
                    lease.serviceUp();
                }
                registrant.setActionType(ActionType.ADDED);
                recentlyChangedQueue.add(new RecentlyChangedItem(lease));
                registrant.setLastUpdatedTimestamp();
                invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
                logger.info("Registered instance {}/{} with status {} (replication={})",
                        registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
            } finally {
                read.unlock();
            }
        }
    

    client端注册到server端的核心实现逻辑:
    三个参数分别是注册的实例信息、续约周期、是否是其他server的复制(true表示复制,false表示client发起的注册)
    首先从register属性中根据appName来获取之前的续约信息,如果是第一次注册,则expectedNumberOfClientsSendingRenews加1;否则比较一下参数和缓存的instanceInfo哪个是最新的,按照最新的进行后续操作。

    创建一个新的lease对象,保存到gMap中,并添加到recentRegisteredQueue中,(注册队列)用于debugging或者统计。

    根据overridden status规则来设置registrant的status,是否接受流量

    更新各种registrant的属性,将本次变更加入到recentlyChangedQueue,失效readWriteCache

    public void invalidate(String appName, @Nullable String vipAddress, @Nullable String secureVipAddress) {
            for (Key.KeyType type : Key.KeyType.values()) {
                for (Version v : Version.values()) {
                    invalidate(
                            new Key(Key.EntityType.Application, appName, type, v, EurekaAccept.full),
                            new Key(Key.EntityType.Application, appName, type, v, EurekaAccept.compact),
                            new Key(Key.EntityType.Application, ALL_APPS, type, v, EurekaAccept.full),
                            new Key(Key.EntityType.Application, ALL_APPS, type, v, EurekaAccept.compact),
                            new Key(Key.EntityType.Application, ALL_APPS_DELTA, type, v, EurekaAccept.full),
                            new Key(Key.EntityType.Application, ALL_APPS_DELTA, type, v, EurekaAccept.compact)
                    );
                    if (null != vipAddress) {
                        invalidate(new Key(Key.EntityType.VIP, vipAddress, type, v, EurekaAccept.full));
                    }
                    if (null != secureVipAddress) {
                        invalidate(new Key(Key.EntityType.SVIP, secureVipAddress, type, v, EurekaAccept.full));
                    }
                }
            }
        }
    

    分别失效appName自身的cache和全局cache和增量cache。

    注册完成之后,回到EurekaServerBootstrapinitEurekaServerContext,调用openForTraffic

    openForTraffic
        @Override
        public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
            // Renewals happen every 30 seconds and for a minute it should be a factor of 2.
            this.expectedNumberOfClientsSendingRenews = count;
            updateRenewsPerMinThreshold();
            logger.info("Got {} instances from neighboring DS node", count);
            logger.info("Renew threshold is: {}", numberOfRenewsPerMinThreshold);
            this.startupTime = System.currentTimeMillis();
            if (count > 0) {
                this.peerInstancesTransferEmptyOnStartup = false;
            }
            DataCenterInfo.Name selfName = applicationInfoManager.getInfo().getDataCenterInfo().getName();
            boolean isAws = Name.Amazon == selfName;
            if (isAws && serverConfig.shouldPrimeAwsReplicaConnections()) {
                logger.info("Priming AWS connections for all replicas..");
                primeAwsReplicas(applicationInfoManager);
            }
            logger.info("Changing status to UP");
            applicationInfoManager.setInstanceStatus(InstanceStatus.UP);
            super.postInit();
        }
    

    这个方法先重新计算了自我保护的阈值,设置实例状态为UP。

    调用父类的postInit()完成驱逐任务的开启。

    protected void postInit() {
            renewsLastMin.start();
            if (evictionTaskRef.get() != null) {
                evictionTaskRef.get().cancel();
            }
            evictionTaskRef.set(new EvictionTask());
            evictionTimer.schedule(evictionTaskRef.get(),
                    serverConfig.getEvictionIntervalTimerInMs(),
                    serverConfig.getEvictionIntervalTimerInMs());  // 默认值60s
        }
    
    class EvictionTask extends TimerTask {
    
            private final AtomicLong lastExecutionNanosRef = new AtomicLong(0l);
    
            @Override
            public void run() {
                try {
                    long compensationTimeMs = getCompensationTimeMs();
                    logger.info("Running the evict task with compensationTime {}ms", compensationTimeMs);
                    evict(compensationTimeMs);
                } catch (Throwable e) {
                    logger.error("Could not run the evict task", e);
                }
            }
    
    }
    

    驱逐的具体分析请参见Spring Cloud Eureka 源码分析 — Server端(服务治理)

    至此,server端完整启动流程分析完毕。

    相关文章

      网友评论

          本文标题:Spring Cloud Eureka 源码分析 — Serve

          本文链接:https://www.haomeiwen.com/subject/txeyaktx.html