美文网首页程序员
SpringCloud 服务注册与发现 源码分析(一)

SpringCloud 服务注册与发现 源码分析(一)

作者: 凡毓不凡 | 来源:发表于2020-06-16 16:00 被阅读0次
    • SpringCloud 版本Hoxton.SR1
    • SpringBoot 版本2.2.1.RELEASE
    • 本文适用于对SpringBoot有一定基础得人,主要讲解Eureka 服务端得相关底层实现,讲解方式:场景驱动
    • 关键词 :服务端源码解析

    SpringCloud 服务端与客户端简单搭建 Eureka 服务端与客户端得简单使用
    SpringCloud 注解之@EnableEurekaServer与@EnableEurekaClient原理 我们对这两个注解的使用及原理做了分析。
    本篇与接下来的连载将会基于服务注册于发现源码层面全面解析

    1. Eureka Server

    首先 spring-cloud-starter-netflix-eureka-server 依赖下的spring.factories中只有一个自动装配类:

    image.png 我们来看一下此类:
    • 类的相关注解部分
      image.png
    1. @Import(EurekaServerInitializerConfiguration.class) : 自动装配该类时会创建 EurekaServerInitializerConfiguration的bean,看名字作用是服务初始化的相关操作。另外该类实现了ServletContextAware与SmartLifecycle接口
      image.png ,其中一个重要得地方是生命周期得start()启动方法 image.png ,看到这行代码eurekaServerBootstrap.contextInitialized(EurekaServerInitializerConfiguration.this.servletContext),是不是感觉似曾相识,没错就是 ServletContextListener接口得default方法,然后我们猜测这个应该是跟 tomcat容器中运行得servlet组件有关系,但是我们都知道boot依赖得是内置得tomcat容器,而且容器得相关操作是在boot得上下文中进行得,可以看一下我之前得一篇文章 SpringBoot启动 源码深度解析(四),放在此地是何意? 仔细搜寻发现有一个叫 EurekaBootStrap得类实现了ServletContextListener接口,此类是在Eureka中提供得而且EurekaServerBootstrap中得contextInitialized方法实现EurekaBootStrap得contextInitialized方法实现几乎如出一辙此时我们可以猜到在Eureka上下文以及环境得初始化与传统得spring运行在tomcat中得方式是一样得,而此处boot照搬了Eureka得初始化代码,通过SmartLifecycle拉起Eureka Server
    2. @ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class) :只有存在EurekaServerMarkerConfiguration.Marker 类型的bean才会自动装配当前类。
    3. @EnableConfigurationProperties({ EurekaDashboardProperties.class,InstanceRegistryProperties.class }) : 发现是@EnableXXX开头的,根据作者之前分析的技巧,查看注解源码: image.png
      关于@EnableConfigurationProperties的原理,感兴趣的可以看一下作者之前的文章 SpringBoot配置属性绑定源码分析
      👍此处可以得知 @EnableConfigurationProperties 可以创建指定的bean定义,但是导入类要带有@ConfigurationProperties注解才可以👍。以当前导入的EurekaDashboardProperties为例: image.png ,配置的属性 eureka.dashboard.path的值会映射到当前类中的path成员变量上,eureka.dashboard.enabled的值会映射到enabled成员变量上。
    • 类的成员变量
      成员变量
    1. ApplicationInfoManager(原生得Eureka类):用来管理实例信息以及Eureka实例配置(在EurekaClientAutoConfiguration中创建,后面说到客户端的时候会说明
    2. EurekaServerConfig(原生得Eureka接口):用来存储Eureka中得服务配置信息,netflix的默认实现类是DefaultEurekaServerConfig, 对应springcloud中得实现为 EurekaServerConfigBean
    3. EurekaClientConfig(原生得Eureka接口):用来存储客户端中得配置信息, 对应springcloud中得实现为 EurekaClientConfigBean(在EurekaClientAutoConfiguration中创建,后面说到客户端的时候会说明)
    4. EurekaClient(原生得Eureka接口):定义了客户端发现得一系列接口协议:① 包含获取实例信息得接口 ② 包含元数据获取得接口 ③ 监控检查得相关方法 ④ Eureka客户端配置信息获取 ⑤ ApplicationInfoManager 单例对象获取对应springcloud中得实现是 CloudEurekaClient(在EurekaClientAutoConfiguration中创建,后面说到客户端的时候会说明)
    5. InstanceRegistryProperties:@EnableConfigurationProperties方式创建的实例配置信息
    • EurekaServerAutoConfiguration自动装配类 创建Bean
    1. 👍创建InstanceRegistry 实例Bean( 在注册实例时会考虑集群情况下其它Node相关操作的注册器)
        @Bean
        public PeerAwareInstanceRegistry peerAwareInstanceRegistry(
                ServerCodecs serverCodecs) {
            // 强制初始化     
            this.eurekaClient.getApplications(); // force initialization
            // 创建InstanceRegistry(是spring cloud的实现),
            // 继承了PeerAwareInstanceRegistryImpl(Eureka得实现),
            // PeerAwareInstanceRegistry接口的实现类
            return new InstanceRegistry(this.eurekaServerConfig, this.eurekaClientConfig,
                    serverCodecs, this.eurekaClient,
                    this.instanceRegistryProperties.getExpectedNumberOfClientsSendingRenews(),
                    this.instanceRegistryProperties.getDefaultOpenForTrafficCount());
        }
    
    1. 👍创建 PeerEurekaNodes(用来管理PeerEurekaNode的生命周期辅助类), 实际创建子类 RefreshablePeerEurekaNodes,该类是当前自动装配类得内部静态类,另外该类还实现了ApplicationListener监听器,监听EnvironmentChangeEvent事件, 触发Eureka节点得更新
        @Bean
        @ConditionalOnMissingBean
        public PeerEurekaNodes peerEurekaNodes(PeerAwareInstanceRegistry registry,
                ServerCodecs serverCodecs,
                ReplicationClientAdditionalFilters replicationClientAdditionalFilters) {
            return new RefreshablePeerEurekaNodes(registry, this.eurekaServerConfig,
                    this.eurekaClientConfig, serverCodecs, this.applicationInfoManager,
                    replicationClientAdditionalFilters);
        }
    
    
    子类RefreshablePeerEurekaNodes中得方法 父类PeerEurekaNodes中得方法

    createPeerEurekaNode方法 :覆写了父类 PeerEurekaNodes,与父类中得实现唯一区别是添加了一个自定义过滤器 ReplicationClientAdditionalFilters
    shouldUpdate方法:当监听到EnvironmentChangeEvent改变事件时,判断是否需要更新,依据是环境资源中是否包含 eureka.client.regioneureka.client.service-urleureka.client.availability-zones. 这些键,若包含则需要更新。
    resolvePeerUrls()方法:返回不包含自己得可用区的服务端url。
    updatePeerEurekaNodes方法,继承自父类,首先从原先得Eureka Server 列表中移除新增得List列表,目的是找出不可用得服务列表,再从新增得列表中移除原先得(peerEurekaNodeUrls 变量中缓存得),目的是找出实际新增得服务列表,紧接着判断若两个集合是否为空,都为空则不更新;若shutdown列表不为空,找到shutdown得节点,调用PeerEurekaNode(具体得node)得shutdown方法关闭副本资源,若新增得服务列表不为空,则调用createPeerEurekaNode方法(前面提到得得方法,被子类覆写)创建新增得 PeerEurekaNode节点,最终将Eureka Server 节点保存在成员变量 this.peerEurekaNodes中;将Eureka Server 节点url保存到成员变量this.peerEurekaNodeUrls中。

    1. 👍👍创建 EurekaServerConfig 实例(Eureka中得类),创建动作交给EurekaServerAutoConfiguration 类得内部类 EurekaServerConfigBeanConfiguration 来完成,实现为 EurekaServerConfigBean( springcloud 中 Eureka Server得核心类),包括一系列得服务端参数配置,服务注册与续约等相关功能都会根据此类中得参数配置进行处理,示例: application.properties
        @Configuration(proxyBeanMethods = false)
        protected static class EurekaServerConfigBeanConfiguration {
    
            @Bean
            @ConditionalOnMissingBean
            public EurekaServerConfig eurekaServerConfig(EurekaClientConfig clientConfig) {
                EurekaServerConfigBean server = new EurekaServerConfigBean();
                if (clientConfig.shouldRegisterWithEureka()) { 
                    // Set a sensible default if we are supposed to replicate
                    server.setRegistrySyncRetries(5);
                }
                return server;
            }
    
        }
    

    我们发现registerWithEureka参数是从 EurekaClientConfigBean(客户端参数配置类,服务端是一个特殊得客户端,在EurekaClientAutoConfiguration中创建,后面说到客户端的时候会说明) 中获取得,若为false,那么重试次数就为0,默认该参数为 true,重试次数为5

    1. 👍👍创建 EurekaServerContext,Eureka Server上下文,实际创建得是默认实现 DefaultEurekaServerContext类,可以看到将上面得众多参数构造器注入进去,其中就有 PeerEurekaNodes和前面创建得相关Bean实例,此处我们重点分析。
        @Bean
        public EurekaServerContext eurekaServerContext(ServerCodecs serverCodecs,
                PeerAwareInstanceRegistry registry, PeerEurekaNodes peerEurekaNodes) {
            return new DefaultEurekaServerContext(this.eurekaServerConfig, serverCodecs,
                    registry, peerEurekaNodes, this.applicationInfoManager);
        }
    

    进入到 DefaultEurekaServerContext 中,源码如下:

    /**
     * Represent the local server context and exposes getters to components of the
     * local server such as the registry.
     *
     * @author David Liu
     */
    @Singleton
    public class DefaultEurekaServerContext implements EurekaServerContext {
        private static final Logger logger = LoggerFactory.getLogger(DefaultEurekaServerContext.class);
    
        private final EurekaServerConfig serverConfig;
        private final ServerCodecs serverCodecs;
        private final PeerAwareInstanceRegistry registry;
        private final PeerEurekaNodes peerEurekaNodes;
        private final ApplicationInfoManager applicationInfoManager;
    
        @Inject
        public DefaultEurekaServerContext(EurekaServerConfig serverConfig,
                                   ServerCodecs serverCodecs,
                                   PeerAwareInstanceRegistry registry,
                                   PeerEurekaNodes peerEurekaNodes,
                                   ApplicationInfoManager applicationInfoManager) {
            this.serverConfig = serverConfig;
            this.serverCodecs = serverCodecs;
            this.registry = registry;
            this.peerEurekaNodes = peerEurekaNodes;
            this.applicationInfoManager = applicationInfoManager;
        }
    
        @PostConstruct
        @Override
        public void initialize() {
            logger.info("Initializing ...");
            peerEurekaNodes.start();
            try {
                registry.init(peerEurekaNodes);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
            logger.info("Initialized");
        }
    
        @PreDestroy
        @Override
        public void shutdown() {
            logger.info("Shutting down ...");
            registry.shutdown();
            peerEurekaNodes.shutdown();
            logger.info("Shut down");
        }
        @Override
        public EurekaServerConfig getServerConfig() {return serverConfig;}
    
        @Override
        public PeerEurekaNodes getPeerEurekaNodes() {return peerEurekaNodes;}
            
        @Override
        public ServerCodecs getServerCodecs() {return serverCodecs;}
            
        @Override
        public PeerAwareInstanceRegistry getRegistry() { return registry;}
    
        @Override
        public ApplicationInfoManager getApplicationInfoManager() {return applicationInfoManager;}
     
    }
    

    initialize()方法:可以看到使用@PostConstruct修饰,表示实例化之后初始化之前 Spring Bean 创建得生命周期,会调用此方法,(1)首先调用 Eureka Server 节点得start()方法:

    peerEurekaNodes.start() , 方法中会创建单个线程得线程池,线程名叫做Eureka-PeerNodesUpdater,然后调用上述提到得 updatePeerEurekaNodes(List<String> newPeerUrls)方法 立即更新一次Eureka Server得节点,然后每隔10分钟定时更新服务列表。(前面还说过监听到事件也可能会更新服务列表)(2)然后通过实例注册器(InstanceRegistry,继承了 PeerAwareInstanceRegistryImpl)调用 registry.init(peerEurekaNodes) 初始化服务节点: PeerAwareInstanceRegistryImpl中得init方法
    this.numberOfReplicationsLastMin.start()会启动一个Timer定时器每隔一分钟记录复制的续约数
    ;赋值成员变量 peerEurekaNodes;👍initializedResponseCache()方法会初始化ResponseCache(缓存得是服务列表)
    // 只读的服务列表
    private final ConcurrentMap<Key, Value> readOnlyCacheMap = new ConcurrentHashMap<Key, Value>();
    // Guava 缓存,读写服务列表
    private final LoadingCache<Key, Value>  readWriteCacheMap;
    
    ResponseCacheImpl(EurekaServerConfig serverConfig, ServerCodecs serverCodecs, AbstractInstanceRegistry registry) {
        this.serverConfig = serverConfig;
        this.serverCodecs = serverCodecs;
        // 根据配置eureka.server.useReadOnlyResponseCache判断,是否使用只读ResponseCache,默认true
        // 由于ResponseCache维护这一个可读可写的readWriteCacheMap,还有一个只读的readOnlyCacheMap
        // 此配置控制在get()应用数据时,是去只读Map读,还是读写Map读
        this.shouldUseReadOnlyResponseCache = serverConfig.shouldUseReadOnlyResponseCache();
        this.registry = registry;
    
        // eureka.server.responseCacheUpdateIntervalMs缓存更新频率,默认30s
        long responseCacheUpdateIntervalMs = serverConfig.getResponseCacheUpdateIntervalMs();
        
        // 创建Cache,com.google.common.cache.LoadingCache
        // 可以设置初始值(默认1000),数据写入过期时间(默认180秒),过期清理等
        this.readWriteCacheMap =  CacheBuilder.newBuilder()
                        .initialCapacity(serverConfig.getInitialCapacityOfResponseCache())
                        .expireAfterWrite(serverConfig.getResponseCacheAutoExpirationInSeconds(), TimeUnit.SECONDS)
                        .removalListener(new RemovalListener<Key, Value>() {
                            @Override
                            public void onRemoval(RemovalNotification<Key, Value> notification) {
                                Key removedKey = notification.getKey();
                                if (removedKey.hasRegions()) {
                                    Key cloneWithNoRegions = removedKey.cloneWithoutRegions();
                                    regionSpecificKeys.remove(cloneWithNoRegions, removedKey);
                                }
                            }
                        })
                        .build(new CacheLoader<Key, Value>() {
                            @Override
                            public Value load(Key key) throws Exception {
                                if (key.hasRegions()) {
                                    Key cloneWithNoRegions = key.cloneWithoutRegions();
                                    regionSpecificKeys.put(cloneWithNoRegions, key);
                                }
                                Value value = generatePayload(key);
                                return value;
                            }
                        });
    
        // 如果启用只读缓存(默认开启),那么每隔responseCacheUpdateIntervalMs=30s,执行缓存更新
        // (从两个缓存中取出值进行对比,若不相等则更新只读缓存)
        if (shouldUseReadOnlyResponseCache) {
            timer.schedule(getCacheUpdateTask(),
                    new Date(((System.currentTimeMillis() / responseCacheUpdateIntervalMs) * responseCacheUpdateIntervalMs)
                            + responseCacheUpdateIntervalMs),
                    responseCacheUpdateIntervalMs);
        }
    
        try {
            Monitors.registerObject(this);
        } catch (Throwable e) {
            logger.warn("Cannot register the JMX monitor for the InstanceRegistry", e);
        }
    } 
    

    ResponseCache 得作用是保存服务列表信息,用于客户端查询。实现上内部维护了两个Map,一个可读可写的readWriteCacheMap,每个操作都会写入,一个只读的readOnlyCacheMap,默认每30s更新一次, 其中缓存以压缩和非压缩形式维护,用于三类请求: all applications,增量更改和单个application,👍例如在ApplicationResource(Eureka暴漏得服务实例接口)中:

    ApplicationResource.getApplication方法 此处查询时,首先去缓存中查找,若 this.shouldUseReadOnlyResponseCache = serverConfig.shouldUseReadOnlyResponseCache(),默认为true,表示查询的时候从readOnlyCacheMap中查找,否则从readWriteCacheMap中查找,而readWriteCacheMap中得数据会每隔30秒同步到readOnlyCacheMap中。

    然后成员变量 timer,名称为 ReplicaAwareInstanceRegistry - RenewalThresholdUpdater 得Time定时器 每隔15分钟调用 updateRenewalThreshold方法 更新续约临界值

        private void scheduleRenewalThresholdUpdateTask() {
            timer.schedule(new TimerTask() {
                               @Override
                               public void run() {
                                   updateRenewalThreshold(); // 更新续约临界值
                               }
                           }, serverConfig.getRenewalThresholdUpdateIntervalMs(),// 15 min
                    serverConfig.getRenewalThresholdUpdateIntervalMs()); // 15 min
        }
        private void updateRenewalThreshold() {
            try {
                Applications apps = eurekaClient.getApplications();
                int count = 0;
                for (Application app : apps.getRegisteredApplications()) {
                    for (InstanceInfo instance : app.getInstances()) {
                        if (this.isRegisterable(instance)) {
                            ++count; //  记录实例总数
                        }
                    }
                }
                synchronized (lock) {
                    //(1)只有当临界值比 (期望续约得数量 乘以 续约临界值百分比默认 0.85)大,才更新,即:服务实例数大于85%才会去更新,小于85%不去更新续约服务
                    //(2)不启用自我保护机制(默认开启)
                    if ((count) > (serverConfig.getRenewalPercentThreshold() * expectedNumberOfClientsSendingRenews)
                            || (!this.isSelfPreservationModeEnabled())) {
                        this.expectedNumberOfClientsSendingRenews = count;
                        updateRenewsPerMinThreshold();
                    }
                }
                logger.info("Current renewal threshold is : {}", numberOfRenewsPerMinThreshold);
            } catch (Throwable e) {
                logger.error("Cannot update renewal threshold", e);
            }
        }
    

    首先获取所有Instance实例个数,默认每个实例30s续约一次(1)如果开启自我保护模式,更新 expectedNumberOfRenewsPerMin预期每分钟续约数 和 numberOfRenewsPerMinThreshold每分钟续约阈值 (2)如果没有开启自我保护模式,只有当本期续约数大于之前的阈值,即当前不处在自我保护模式中(自我保护模式中,不能删除服务列表,阈值自然也不能更新),才可以更新 expectedNumberOfRenewsPerMin 和 numberOfRenewsPerMinThreshold
    最后调用initRemoteRegionRegistry()方法初始化远程Region注册器(若当前Region对应得实例宕机,可以拉取其他Region下面得实例进行调用)
    shutdown方法:标有 @PreDestroy注解,Bean销毁之前调用,用以关闭Eureka Server 节点以及一系统得清除重置操作。

    1. 创建 EurekaServerBootstrap( springcloud得类 )
        @Bean
        public EurekaServerBootstrap eurekaServerBootstrap(PeerAwareInstanceRegistry registry,
                EurekaServerContext serverContext) {
            return new EurekaServerBootstrap(this.applicationInfoManager,
                    this.eurekaClientConfig, this.eurekaServerConfig, registry,
                    serverContext);
        }
    

    上面已经大致分析了EurekaServerBootstrap类得作用,此处中重点分析一些类中得方法实现细节:

    EurekaServerBootstrap
    contextInitialized(ServletContext context)方法,spring容器启动之后会回调生命周期中得start启动方法,拉起 contextInitialized方法初始化Eureka Server。首先调用 initEurekaEnvironment方法 初始化Eureka Server得环境 ( 此处配置就是Netflix Archaius,都是基于Apache Commons 抽象 Configuration来实现得),然后调用initEurekaServerContext 方法初始化Eureka Server上下文,并将上下文缓存到 EurekaServerContextHolder中。 image.png
    有两处重要的逻辑:
    (1) int registryCount = this.registry.syncUp() 表示从相邻的eureka节点拷贝注册列表信息
        @Override
        public int syncUp() {
            // Copy entire entry from neighboring DS node
            int count = 0;
            // 此处重试次数为5
            for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++) {
                if (i > 0) {
                    try {
                        Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs());
                    } catch (InterruptedException e) {
                        logger.warn("Interrupted during registry transfer..");
                        break;
                    }
                }
                // 通过 eurekaClient(根据上面的简介可以知道此处是CloudEurekaClient实现) 获取全部服务
                Applications apps = eurekaClient.getApplications();
                for (Application app : apps.getRegisteredApplications()) {
                    // 循环获取实例列表(注册的时候多个服务实例可以用同一个应用名)
                    for (InstanceInfo instance : app.getInstances()) {
                        try {
                            // 判断是否是可以注册得:正常情况下此方法一定返回 true
                            if (isRegisterable(instance)) {
                                // 注册实例 ( 注册逻辑得具体实现,在AbstractInstanceRegistry类中
                                register(instance, instance.getLeaseInfo().getDurationInSecs(), true);                    
                                // 统计注册数(后续续约的时候会用到)
                                count++;
                            }
                        } catch (Throwable t) {
                            logger.error("During DS init copy", t);
                        }
                    }
                }
            }
            return count;
        }
    

    判断如果可注册,然后接着执行com.netflix.eureka.registry.AbstractInstanceRegistry#register方法执行具体得注册逻辑,可以看出此方法是在抽象父类AbstractInstanceRegistry中,但是InstanceRegistry(springcloud得bean,并且继承了PeerAwareInstanceRegistryImpl类)和PeerAwareInstanceRegistryImpl(Eureka中得注册实例对实现)都覆写了该方法:

    InstanceRegistry得register方法实现 PeerAwareInstanceRegistryImpl中得registry方法
    依次分析,可以看到此处handleRegistration方法中发布了一个EurekaInstanceRegisteredEvent事件,此事件对象会包含InstanceInfo(注册得当前实例信息)、leaseDuration(租约期限)、isReplication(是否是复制来得)另外事件源source就是当前InstanceRegistry对象本身。所以,我们这里可以进行扩展(比如:我们可以监听该事件,从而获得当前注册实例,进行自定义的逻辑处理)。然后调用父类PeerAwareInstanceRegistryImpl得register方法,红框中得方法重点分析首先判断若当前实例中若没有租约时间则使用默认得租约时间90秒,然后再调用抽象父类AbstractInstanceRegistry得register方法 :
        /**
         * Registers a new instance with a given duration.
         *
         * @see com.netflix.eureka.lease.LeaseManager#register(java.lang.Object, int, boolean)
         */
        public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
            try {
                // 获取并发读写锁。
                read.lock();
                // 从缓存中,根据应用名称查看是否包含当前实例得缓存注册信息
                Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
                // 统计计数器+1
                REGISTER.increment(isReplication);
                // 若缓存中不存在当前应用实例信息,创建一个空得CurrentHashMap对象,向缓存中添加这个CurrentHashMap对象
                if (gMap == null) {
                    final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
                    gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
                    if (gMap == null) {
                        gMap = gNewMap;
                    }
                }
                // 获取实例的租约信息
                Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
                // 表示当前实例已经有过租约了
                if (existingLease != null && (existingLease.getHolder() != null)) {
                    Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
                    Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
                    logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
    
                    // 已经有过租约得时间戳比当前要注册得时间戳大,
                    // 那么将替换掉要注册得实例为已经有过租约得实例
                    if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
                        logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +
                                " than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
                        logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
                        registrant = existingLease.getHolder();
                    }
                } else {
                    // 表示新注册得实例不存在租约
                    synchronized (lock) {
                        // 已经有过其他服务实例得租约信息了
                        if (this.expectedNumberOfClientsSendingRenews > 0) {
                            // 将旧的客户端要发送得续约数量得数值加1,默认数值是1;
                            // 即第一次注册的时候,该变量数值为2
                            this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews + 1;
                            // 重新计算每分钟续约得临界值,
                            // 默认情况下是                              
                            // this.expectedNumberOfClientsSendingRenews *(60/30) *  0.85
                            updateRenewsPerMinThreshold();
                        }
                    }
                    logger.debug("No previous lease information found; it is new registration");
                }
                // 重新创建租约信息
                Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
                // 若旧的租约信息存在,将旧的租约服务启动时间复制给新创建得租约信息
                if (existingLease != null) {
                    lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
                }
                gMap.put(registrant.getId(), lease);
                synchronized (recentRegisteredQueue) {
                    recentRegisteredQueue.add(new Pair<Long, String>(
                            System.currentTimeMillis(),
                            registrant.getAppName() + "(" + registrant.getId() + ")"));
                }
                //  如果当前实例已经维护了OverriddenStatus,将其也放到此Eureka Server          
                //  的overriddenInstanceStatusMap中
                if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
                    logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "
                                    + "overrides", registrant.getOverriddenStatus(), registrant.getId());
                    if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
                        logger.info("Not found overridden id {} and hence adding it", registrant.getId());
                        overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
                    }
                }
                InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
                if (overriddenStatusFromMap != null) {
                    logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
                    registrant.setOverriddenStatus(overriddenStatusFromMap);
                }
    
                // 根据实例得overridden状态规则,设置状态
                InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
                registrant.setStatusWithoutDirty(overriddenInstanceStatus);
    
                // 如果注册得租约状态为UP(准备接收请求状态),设置租约服务时间戳为当前时间
                if (InstanceStatus.UP.equals(registrant.getStatus())) {
                    lease.serviceUp();
                }
                registrant.setActionType(ActionType.ADDED);
                recentlyChangedQueue.add(new RecentlyChangedItem(lease));
                registrant.setLastUpdatedTimestamp();
                //  使当前应用的ResponseCache缓存失效
                invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
                logger.info("Registered instance {}/{} with status {} (replication={})",
                        registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
            } finally {
                read.unlock();
            }
        }
    

    调用完父类的注册方法之后,在PeerAwareInstanceRegistryImpl中接着执行replicateToPeers方法:将实例信息复制到集群中其它节点

        private void replicateToPeers(Action action, String appName, String id,
                                      InstanceInfo info /* optional */,
                                      InstanceStatus newStatus /* optional */, boolean isReplication) {
            Stopwatch tracer = action.getTimer().start();
            try {
                // 若是复制来的实例,则将记录的最后一分钟的副本数自增1
                if (isReplication) {
                    numberOfReplicationsLastMin.increment();
                }
                // 一个对象为什么跟一个集合用等号判断 ?  
                // (由于启动时调用的注册传入标志为true,所以会结束流程)
                if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
                    return;
                }
                // 遍历所有得服务节点
                for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
                    // 跳过自身得服务url
                    if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
                        continue;
                    }
                    // 将对应得行为(注册、下线、续约、心跳、状态更新)复制到其他节点  
                    replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
                }
            } finally {
                tracer.stop();
            }
        }
    

    (2) registry.openForTraffic(this.applicationInfoManager, regisretryCount): 允许与客户端的数据传输。其中registryCount为 syncUp方法调用得返回值:可注册的服务实例数(为0表示单机)。通过先调用子类InstanceRegistry,若此时的regisretryCount为0,则取默认值1,然后调用父类的openForTraffic的方法:

        @Override
        public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
            // 设置续约数为注册的数量,默认每30秒续约一次
            this.expectedNumberOfClientsSendingRenews = count;
            // 更新每分钟续约临界值(前面分析过,就是:续约数 * 每分钟次数(2次) * 临界值百分比(0.85))
            updateRenewsPerMinThreshold();
            logger.info("Got {} instances from neighboring DS node", count);
            logger.info("Renew threshold is: {}", numberOfRenewsPerMinThreshold);
            this.startupTime = System.currentTimeMillis();
            if (count > 0) {
                this.peerInstancesTransferEmptyOnStartup = false;
            }
            DataCenterInfo.Name selfName = applicationInfoManager.getInfo().getDataCenterInfo().getName();
            boolean isAws = Name.Amazon == selfName;
            if (isAws && serverConfig.shouldPrimeAwsReplicaConnections()) {
                logger.info("Priming AWS connections for all replicas..");
                primeAwsReplicas(applicationInfoManager);
            }
            logger.info("Changing status to UP");
            // 设置实例信息状态为UP上线,并向监听器StatusChangeListener的实例发送StatusChangeEvent事件,
            // 告知实例状态改变,当然这些监听器取决于clientConfig客户端配置的onDemandUpdateStatusChange
            // 参数,为true的话,会添加监听器用于监听状态改变,否则不通知。
            applicationInfoManager.setInstanceStatus(InstanceStatus.UP);
            super.postInit();
        }
    
    1. 👍👍PeerEurekaNode( Eureka得原生类,真实的服务端节点,会保存服务端的url、服务配置信息、服务端注册器、目标主机、httpClient、批量任务转发器、非批量任务转发器):前面分析过 PeerEurekaNodes#updatePeerEurekaNodes方法时判断出url属于新增的话,则会创建PeerEurekaNode节点。而PeerEurekaNode构造器里面会初始化一系列的成员:
    public PeerEurekaNode(PeerAwareInstanceRegistry registry, String targetHost, String serviceUrl, HttpReplicationClient replicationClient, EurekaServerConfig config) {
            // 最大批次请求250个,最大批量任务转发等待时间为500毫秒
            // 请求异常情况下会隔100毫秒重试,服务不可用等待1000毫秒
            this(registry, targetHost, serviceUrl, replicationClient, config, BATCH_SIZE, MAX_BATCHING_DELAY_MS, RETRY_SLEEP_TIME_MS, SERVER_UNAVAILABLE_SLEEP_TIME_MS);
        }
    
      PeerEurekaNode(PeerAwareInstanceRegistry registry, String targetHost, String serviceUrl,
                                         HttpReplicationClient replicationClient, EurekaServerConfig config,
                                         int batchSize, long maxBatchingDelayMs,
                                         long retrySleepTimeMs, long serverUnavailableSleepTimeMs) {
            this.registry = registry;
            this.targetHost = targetHost;
            this.replicationClient = replicationClient;
    
            this.serviceUrl = serviceUrl;
            this.config = config;
            this.maxProcessingDelayMs = config.getMaxTimeForReplication();
    
            String batcherName = getBatcherName();
            // 复制任务处理器
            ReplicationTaskProcessor taskProcessor = new ReplicationTaskProcessor(targetHost, replicationClient);
            // 批量任务转发器
            this.batchingDispatcher = TaskDispatchers.createBatchingTaskDispatcher(
                    batcherName,
                    config.getMaxElementsInPeerReplicationPool(),
                    batchSize,
                    config.getMaxThreadsForPeerReplication(),
                    maxBatchingDelayMs,
                    serverUnavailableSleepTimeMs,
                    retrySleepTimeMs,
                    taskProcessor
            );
            // 单任务转发器
            this.nonBatchingDispatcher = TaskDispatchers.createNonBatchingTaskDispatcher(
                    targetHost,
                    config.getMaxElementsInStatusReplicationPool(),
                    config.getMaxThreadsForStatusReplication(),
                    maxBatchingDelayMs,
                    serverUnavailableSleepTimeMs,
                    retrySleepTimeMs,
                    taskProcessor
            );
        }
    // 任务转发器工具类,用来创建TaskDispatcher,想象成Executors线程池工具类就行
    public class TaskDispatchers {
       // 创建批量任务转发器
        public static <ID, T> TaskDispatcher<ID, T> createBatchingTaskDispatcher(String id,
                                                                                 int maxBufferSize,
                                                                                 int workloadSize,
                                                                                 int workerCount,
                                                                                 long maxBatchingDelay,
                                                                                 long congestionRetryDelayMs,
                                                                                 long networkFailureRetryMs,
                                                                                 TaskProcessor<T> taskProcessor){
            
            // 创建Acceptor执行器,构造器中会创建线程组并分配AcceptorThread
            // 然后执行AcceptorRunner任务
            final AcceptorExecutor<ID, T> acceptorExecutor = new AcceptorExecutor<>(
                    id, maxBufferSize, workloadSize, maxBatchingDelay, congestionRetryDelayMs, networkFailureRetryMs
            );
            // 创建TaskExecutors并启动工作线程WorkerRunnable(后面详细看工作线程如何工作的)
            final TaskExecutors<ID, T> taskExecutor = TaskExecutors.batchExecutors(id, workerCount, taskProcessor, acceptorExecutor);
            return new TaskDispatcher<ID, T>() {
                @Override
                // 任务转发器的处理方法,批量任务转发器调用统一入口
                public void process(ID id, T task, long expiryTime) {
                    // 将任务添加到AcceptorExecutor类中的acceptorQueue队列里
                    // 并将接收任务数+1
                    acceptorExecutor.process(id, task, expiryTime);
                }
    
                @Override
                public void shutdown() {
                    // 将AcceptorExecutor类中的isShutdown状态设置为true
                    // 并中断AcceptorThread线程
                    acceptorExecutor.shutdown();
                    // 将TaskExecutors类中的isShutdown状态也设置为true
                    // 并循环中断所有的工作线程
                    taskExecutor.shutdown();
                }
            };
        }
    }
    // Acceptor线程的执行器
    class AcceptorExecutor<ID, T> {
        AcceptorExecutor(String id,
                         int maxBufferSize,
                         int maxBatchingSize,
                         long maxBatchingDelay,
                         long congestionRetryDelayMs,
                         long networkFailureRetryMs) {
            this.maxBufferSize = maxBufferSize;
            this.maxBatchingSize = maxBatchingSize;
            this.maxBatchingDelay = maxBatchingDelay;
            this.trafficShaper = new TrafficShaper(congestionRetryDelayMs, networkFailureRetryMs);
            // 创建名为eurekaTaskExecutors的线程组 
            ThreadGroup threadGroup = new ThreadGroup("eurekaTaskExecutors");
            // 创建线程并指定线程组,接收AcceptorRunner任务
            // id:target_主机名;所以线程名称:TaskAcceptor-target_主机名
            this.acceptorThread = new Thread(threadGroup, new AcceptorRunner(), "TaskAcceptor-" + id);
            this.acceptorThread.setDaemon(true);
            // 然后启动线程
            this.acceptorThread.start();
            // 记录指标
            final double[] percentiles = {50.0, 95.0, 99.0, 99.5};
            final StatsConfig statsConfig = new StatsConfig.Builder()
                    .withSampleSize(1000)
                    .withPercentiles(percentiles)
                    .withPublishStdDev(true)
                    .build();
            final MonitorConfig config = MonitorConfig.builder(METRIC_REPLICATION_PREFIX + "batchSize").build();
            this.batchSizeMetric = new StatsTimer(config, statsConfig);
            try {
                Monitors.registerObject(id, this);
            } catch (Throwable e) {
                logger.warn("Cannot register servo monitor for this object", e);
            }
        }
    }
    // Acceptor的任务类型
    class AcceptorRunner implements Runnable {
            @Override
            public void run() {
                long scheduleTime = 0;
                // 任务没有关闭
                while (!isShutdown.get()) {
                    try {
                        // 从队列中取出所有的任务并执行
                        drainInputQueues();
    
                        int totalItems = processingOrder.size();
    
                        long now = System.currentTimeMillis();
                        if (scheduleTime < now) {
                            scheduleTime = now + trafficShaper.transmissionDelay();
                        }
                        if (scheduleTime <= now) {
                            assignBatchWork();
                            assignSingleItemWork();
                        }
    
                        // If no worker is requesting data or there is a delay injected by the traffic shaper,
                        // sleep for some time to avoid tight loop.
                        if (totalItems == processingOrder.size()) {
                            Thread.sleep(10);
                        }
                    } catch (InterruptedException ex) {
                        // Ignore
                    } catch (Throwable e) {
                        // Safe-guard, so we never exit this loop in an uncontrolled way.
                        logger.warn("Discovery AcceptorThread error", e);
                    }
                }
            }
    
            private boolean isFull() {
                return pendingTasks.size() >= maxBufferSize;
            }
    
            private void drainInputQueues() throws InterruptedException {
                do {
                    // 从重试队列队尾循环取出任务,判断任务改怎么处理
                    drainReprocessQueue();
                    // 当调用PeerEurekaNode中创建的批量任务转发器 batchingDispatcher的                              
                    // process方法,会将任务放入到AcceptorExecutor类中的acceptorQueue  
                    // 队列里
                    // 此处是取出acceptorQueue队列中得任务,处理方式跟上述差不多
                    drainAcceptorQueue();
                    
                    if (!isShutdown.get()) {
                        // If all queues are empty, block for a while on the acceptor queue
                        if (reprocessQueue.isEmpty() && acceptorQueue.isEmpty() && pendingTasks.isEmpty()) {
                            TaskHolder<ID, T> taskHolder = acceptorQueue.poll(10, TimeUnit.MILLISECONDS);
                            if (taskHolder != null) {
                                appendTaskHolder(taskHolder);
                            }
                        }
                    }
                } while (!reprocessQueue.isEmpty() || !acceptorQueue.isEmpty() || pendingTasks.isEmpty());
            }
            
            private void drainAcceptorQueue() {
                while (!acceptorQueue.isEmpty()) {
                    appendTaskHolder(acceptorQueue.poll());
                }
            }
    
            private void drainReprocessQueue() {
                long now = System.currentTimeMillis();
                while (!reprocessQueue.isEmpty() && !isFull()) {
                    TaskHolder<ID, T> taskHolder = reprocessQueue.pollLast();
                    ID id = taskHolder.getId();
                    if (taskHolder.getExpiryTime() <= now) {
                        expiredTasks++;
                    } else if (pendingTasks.containsKey(id)) {
                        overriddenTasks++;
                    } else {
                        pendingTasks.put(id, taskHolder);
                        processingOrder.addFirst(id);
                    }
                }
                if (isFull()) {
                    queueOverflows += reprocessQueue.size();
                    reprocessQueue.clear();
                }
            }
            
            private void appendTaskHolder(TaskHolder<ID, T> taskHolder) {
                if (isFull()) {
                    pendingTasks.remove(processingOrder.poll());
                    queueOverflows++;
                }
                TaskHolder<ID, T> previousTask = pendingTasks.put(taskHolder.getId(), taskHolder);
                if (previousTask == null) {
                    processingOrder.add(taskHolder.getId());
                } else {
                    overriddenTasks++;
                }
            }
    
            void assignSingleItemWork() {
                if (!processingOrder.isEmpty()) {
                    if (singleItemWorkRequests.tryAcquire(1)) {
                        long now = System.currentTimeMillis();
                        while (!processingOrder.isEmpty()) {
                            ID id = processingOrder.poll();
                            TaskHolder<ID, T> holder = pendingTasks.remove(id);
                            if (holder.getExpiryTime() > now) {
                                singleItemWorkQueue.add(holder);
                                return;
                            }
                            expiredTasks++;
                        }
                        singleItemWorkRequests.release();
                    }
                }
            }
    
            void assignBatchWork() {
                if (hasEnoughTasksForNextBatch()) {
                    if (batchWorkRequests.tryAcquire(1)) {
                        long now = System.currentTimeMillis();
                        int len = Math.min(maxBatchingSize, processingOrder.size());
                        List<TaskHolder<ID, T>> holders = new ArrayList<>(len);
                        while (holders.size() < len && !processingOrder.isEmpty()) {
                            ID id = processingOrder.poll();
                            TaskHolder<ID, T> holder = pendingTasks.remove(id);
                            if (holder.getExpiryTime() > now) {
                                holders.add(holder);
                            } else {
                                expiredTasks++;
                            }
                        }
                        if (holders.isEmpty()) {
                            batchWorkRequests.release();
                        } else {
                            batchSizeMetric.record(holders.size(), TimeUnit.MILLISECONDS);
                            batchWorkQueue.add(holders);
                        }
                    }
                }
            }
    
            private boolean hasEnoughTasksForNextBatch() {
                if (processingOrder.isEmpty()) {
                    return false;
                }
                if (pendingTasks.size() >= maxBufferSize) {
                    return true;
                }
    
                TaskHolder<ID, T> nextHolder = pendingTasks.get(processingOrder.peek());
                long delay = System.currentTimeMillis() - nextHolder.getSubmitTimestamp();
                return delay >= maxBatchingDelay;
            }
        }
    // 批量任务执行器工具类
    class TaskExecutors<ID, T> {
        // 下面的方法会最终调用构造器创建批量任务执行器工具类TaskExecutors对象
        TaskExecutors(WorkerRunnableFactory<ID, T> workerRunnableFactory, int workerCount, AtomicBoolean isShutdown) {
            this.isShutdown = isShutdown;
            this.workerThreads = new ArrayList<>();
            // 指定线程组名为:eurekaTaskExecutors
            ThreadGroup threadGroup = new ThreadGroup("eurekaTaskExecutors");
            // 此处的workerCount可在EurekaServerConfig中配置,默认工作线程20个
            for (int i = 0; i < workerCount; i++) {
                // 回调线程创建实现
                WorkerRunnable<ID, T> runnable = workerRunnableFactory.create(i);
                // 创建线程
                Thread workerThread = new Thread(threadGroup, runnable, runnable.getWorkerName());
                workerThreads.add(workerThread);
                workerThread.setDaemon(true);
                // 启动线程
                workerThread.start();
            }
        }
    
    
        // 创建批量任务执行器工具类
        static <ID, T> TaskExecutors<ID, T> batchExecutors(final String name,
                                                           int workerCount,
                                                           final TaskProcessor<T> processor,
                                                           final AcceptorExecutor<ID, T> acceptorExecutor) {
            final AtomicBoolean isShutdown = new AtomicBoolean();
            // 任务执行指标 
            final TaskExecutorMetrics metrics = new TaskExecutorMetrics(name);
            // 创建TaskExecutors对象,并创建线程工厂WorkerRunnableFactory。创建指定的工作线程BatchWorkerRunnable
            return new TaskExecutors<>(new WorkerRunnableFactory<ID, T>() {
                // 线程创建的实现 
                @Override
                public WorkerRunnable<ID, T> create(int idx) {
                    // 线程名:TaskBatchingWorker-target_主机名-索引下标
                    return new BatchWorkerRunnable<>("TaskBatchingWorker-" +name + '-' + idx, isShutdown, metrics, processor, acceptorExecutor);
                }
            }, workerCount, isShutdown);
        }
    
    

    👍👍小结:
    SpringCloud 会接管 Eureka Server端得一系列组件创建过程,包括其中重要得组件:EurekaServerContext、EurekaServerBootstrap、PeerEurekaNodes、ResponseCache、EurekaServerConfig、PeerAwareInstanceRegistry、ApplicationInfoManager等。
    扩展了PeerEurekaNodes得实现 RefreshablePeerEurekaNodes(可刷新得Eureka 节点,即当环境参数变化时(监听到 EnvironmentChangeEvent事件)就会触发Eureka Server的更新服务列表逻辑)。然后默认的实现DefaultEurekaServerContext 会启动一个每隔10分钟得定时任务,也会去更新服务列表
    通过EurekaServerBootstrap类来管理Eureka Server节点 环境和上下文的生命周期
    通过PeerAwareInstanceRegistry来实现Eureka Server节点的相关操作(注册、取消、续约、节点复制、更新等)
    通过EurekaServerConfig来管理Eureka Server端节点的属性配置
    通过PeerEurekaNodes类来管理节点的生命周期(启动、关闭、更新、新增)
    通过ResponseCache来缓存服务实例(readWriteCacheMap、readOnlyCacheMap),默认30秒会定时从readWriteCacheMap中将数据同步到readOnlyCacheMap中
    通过PeerEurekaNode服务端节点,执行peer节点得复制、批量发送等操作。封装了一系列得服务端交互接口:包括服务注册、心跳检测、服务下线、状态更新。其中,批量任务处理器得创建又会引入两个线程:BatchWorkerRunnable和AcceptorRunner,这两个线程通过生产者消费者模型(batchWorkQueue)实现任务处理与处理结果获取。还实现了一些异常处理机制(失败重试等):reprocessQueue、acceptorQueue、pendingTasks。(这块得批量任务处理方式很值得我们学习,另外Kafka得批量消息处理也很值得我们借鉴与吸收

    • 到此,Eureka服务端已经大致分析完毕,下篇将会针对于Eureka客户端进行剖析。
    1. ☛ 文章要是勘误或者知识点说的不正确,欢迎评论,毕竟这也是作者通过阅读源码获得的知识,难免会有疏忽!
    2. 要是感觉文章对你有所帮助,不妨点个关注,或者移驾看一下作者的其他文集,也都是干活多多哦,文章也在全力更新中。
    3. 著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处!

    相关文章

      网友评论

        本文标题:SpringCloud 服务注册与发现 源码分析(一)

        本文链接:https://www.haomeiwen.com/subject/amzhnhtx.html