美文网首页
spring cloud是怎样通过一个注解将eureka ser

spring cloud是怎样通过一个注解将eureka ser

作者: zacone | 来源:发表于2021-11-09 01:49 被阅读0次

    eureka server源代码目结构

    ├── spring-cloud-netflix-eureka-server
    │   ├── pom.xml
    │   ├── src
    │   │   ├── main
    │   │   │   ├── java
    │   │   │   │   └── org
    │   │   │   │       └── springframework
    │   │   │   │           └── cloud
    │   │   │   │               └── netflix
    │   │   │   │                   └── eureka
    │   │   │   │                       └── server
    │   │   │   │                           ├── CloudJacksonJson.java
    │   │   │   │                           ├── EnableEurekaServer.java
    │   │   │   │                           ├── EurekaController.java
    │   │   │   │                           ├── EurekaDashboardProperties.java
    │   │   │   │                           ├── EurekaServerAutoConfiguration.java
    │   │   │   │                           ├── EurekaServerBootstrap.java
    │   │   │   │                           ├── EurekaServerConfigBean.java
    │   │   │   │                           ├── EurekaServerInitializerConfiguration.java
    │   │   │   │                           ├── EurekaServerMarkerConfiguration.java
    │   │   │   │                           ├── InstanceRegistry.java
    │   │   │   │                           ├── InstanceRegistryProperties.java
    │   │   │   │                           ├── ReplicationClientAdditionalFilters.java
    │   │   │   │                           └── event
    │   │   │   │                               ├── EurekaInstanceCanceledEvent.java
    │   │   │   │                               ├── EurekaInstanceRegisteredEvent.java
    │   │   │   │                               ├── EurekaInstanceRenewedEvent.java
    │   │   │   │                               ├── EurekaRegistryAvailableEvent.java
    │   │   │   │                               └── EurekaServerStartedEvent.java
    │   │   │   ├── resources
    │   │   │   │   ├── META-INF
    │   │   │   │   │   └── spring.factories
    │   │   │   │   ├── eureka
    │   │   │   │   │   └── server.properties
    

    启动过程分析

    1. @EnableEurekaServer

    在spring-cloud项目中,启动eureka server需要添加一个@EnableEurekaServer注解。

    2.@ConditionalOnBean

    spring-boot基于spring-framework的@Conditional提供了提一套条件注解,其中@ConditionalOnBean会根据当前BeanFactory中是否存在某个类的bean实例决定是否创建注解标识的类实例,而spring-cloud-netflix-eureka-server的判断条件则是@EnableEurekaServer中通过@Import导入的EurekaServerMarkerConfiguration内部类Marker。启动时spring扫描到Marker类的bean后,EurekaServerMarkerConfiguration中的eureka所需的bean便开始创建了。

    3.创建eureka-server所需的类实例

    3.1 eureka相关配置

    • 控制面板配置
        @Bean
        @ConditionalOnProperty(prefix = "eureka.dashboard", name = "enabled", matchIfMissing = true)
        public EurekaController eurekaController() {
            return new EurekaController(this.applicationInfoManager);
        }
    

    用户启用了dashboard后,会创建EurekaController的bean,并注入ApplicationInfoManager实例供控制面板所需功能使用。

    • 编码器配置
        @Bean
        public ServerCodecs serverCodecs() {
            return new CloudServerCodecs(this.eurekaServerConfig);
        }
    

    spring cloud的CloudServerCodecs重写了eureka编码器DefaultServerCodecs的构造函数,目的是替换eureka的json序列化实现。相类似的,json序列化是通过spring cloud的CloudJacksonCodec重写eureka的EurekaJacksonCodec的构造函数。

        CloudJacksonCodec() {
                super();
    
                ObjectMapper mapper = new ObjectMapper();
                mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);
    
                SimpleModule module = new SimpleModule("eureka1.x", VERSION);
                module.addSerializer(DataCenterInfo.class, new DataCenterInfoSerializer());
                module.addSerializer(InstanceInfo.class, new CloudInstanceInfoSerializer());
                module.addSerializer(Application.class, new ApplicationSerializer());
                module.addSerializer(Applications.class,
                        new ApplicationsSerializer(this.getVersionDeltaKey(), this.getAppHashCodeKey()));
    
                // TODO: Watch if this causes problems
                // module.addDeserializer(DataCenterInfo.class,
                // new DataCenterInfoDeserializer());
                module.addDeserializer(LeaseInfo.class, new LeaseInfoDeserializer());
                module.addDeserializer(InstanceInfo.class, new CloudInstanceInfoDeserializer(mapper));
                module.addDeserializer(Application.class, new ApplicationDeserializer(mapper));
                module.addDeserializer(Applications.class,
                        new ApplicationsDeserializer(mapper, this.getVersionDeltaKey(), this.getAppHashCodeKey()));
    
                mapper.registerModule(module);
    
                HashMap<Class<?>, Supplier<ObjectReader>> readers = new HashMap<>();
                readers.put(InstanceInfo.class,
                        () -> mapper.reader().withType(InstanceInfo.class).withRootName("instance"));
                readers.put(Application.class,
                        () -> mapper.reader().withType(Application.class).withRootName("application"));
                readers.put(Applications.class,
                        () -> mapper.reader().withType(Applications.class).withRootName("applications"));
                setField("objectReaderByClass", readers);
    
                HashMap<Class<?>, ObjectWriter> writers = new HashMap<>();
                writers.put(InstanceInfo.class, mapper.writer().withType(InstanceInfo.class).withRootName("instance"));
                writers.put(Application.class, mapper.writer().withType(Application.class).withRootName("application"));
                writers.put(Applications.class, mapper.writer().withType(Applications.class).withRootName("applications"));
                setField("objectWriterByClass", writers);
    
                setField("mapper", mapper);
            }
    

    核心是使用反射能力,替换父类成员变量中的jackson实现。

        void setField(String name, Object value) {
            Field field = ReflectionUtils.findField(EurekaJacksonCodec.class, name);
            ReflectionUtils.makeAccessible(field);
            ReflectionUtils.setField(field, this, value);
        }
    
    • 注册中心实例创建
        @Bean
        public PeerAwareInstanceRegistry peerAwareInstanceRegistry(ServerCodecs serverCodecs) {
            this.eurekaClient.getApplications(); // force initialization
            return new InstanceRegistry(this.eurekaServerConfig, this.eurekaClientConfig, serverCodecs, this.eurekaClient,
                    this.instanceRegistryProperties.getExpectedNumberOfClientsSendingRenews(),
                    this.instanceRegistryProperties.getDefaultOpenForTrafficCount());
        }
    

    spring cloud的InstanceRegistry继承了eureka的PeerAwareInstanceRegistryImpl,并接入了spring的ApplicationContextAware,将spring的context注入到了注册中心。

        @Override
        public void register(InstanceInfo info, int leaseDuration, boolean isReplication) {
            handleRegistration(info, leaseDuration, isReplication);
            super.register(info, leaseDuration, isReplication);
        }
    
        @Override
        public void register(final InstanceInfo info, final boolean isReplication) {
            handleRegistration(info, resolveInstanceLeaseDuration(info), isReplication);
            super.register(info, isReplication);
        }
    
        @Override
        public boolean cancel(String appName, String serverId, boolean isReplication) {
            handleCancelation(appName, serverId, isReplication);
            return super.cancel(appName, serverId, isReplication);
        }
    
        @Override
        public boolean renew(final String appName, final String serverId, boolean isReplication) {
            log("renew " + appName + " serverId " + serverId + ", isReplication {}" + isReplication);
            List<Application> applications = getSortedApplications();
            for (Application input : applications) {
                if (input.getName().equals(appName)) {
                    InstanceInfo instance = null;
                    for (InstanceInfo info : input.getInstances()) {
                        if (info.getId().equals(serverId)) {
                            instance = info;
                            break;
                        }
                    }
                    publishEvent(new EurekaInstanceRenewedEvent(this, appName, serverId, instance, isReplication));
                    break;
                }
            }
            return super.renew(appName, serverId, isReplication);
        }
    
        @Override
        protected boolean internalCancel(String appName, String id, boolean isReplication) {
            handleCancelation(appName, id, isReplication);
            return super.internalCancel(appName, id, isReplication);
        }
    

    通过重写PeerAwareInstanceRegistryImpl,提供了eureka-client的注册、取消、续约事件订阅功能。

    • eureka的context创建
        @Bean
        @ConditionalOnMissingBean
        public EurekaServerContext eurekaServerContext(ServerCodecs serverCodecs, PeerAwareInstanceRegistry registry,
                PeerEurekaNodes peerEurekaNodes) {
            return new DefaultEurekaServerContext(this.eurekaServerConfig, serverCodecs, registry, peerEurekaNodes,
                    this.applicationInfoManager);
        }
    

    此处就是eureka默认的实现,未做修改。

    • eureka启动器配置
        @Bean
        public EurekaServerBootstrap eurekaServerBootstrap(PeerAwareInstanceRegistry registry,
                EurekaServerContext serverContext) {
            return new EurekaServerBootstrap(this.applicationInfoManager, this.eurekaClientConfig, this.eurekaServerConfig,
                    registry, serverContext);
        }
    
    • jersey过滤器配置
        @Bean
        @ConditionalOnMissingBean
        public ReplicationClientAdditionalFilters replicationClientAdditionalFilters() {
            return new ReplicationClientAdditionalFilters(Collections.emptySet());
        }
    
    • jersey过滤器注册器配置
        /**
         * Register the Jersey filter.
         * @param eurekaJerseyApp an {@link Application} for the filter to be registered
         * @return a jersey {@link FilterRegistrationBean}
         */
        @Bean
        public FilterRegistrationBean<?> jerseyFilterRegistration(javax.ws.rs.core.Application eurekaJerseyApp) {
            FilterRegistrationBean<Filter> bean = new FilterRegistrationBean<Filter>();
            bean.setFilter(new ServletContainer(eurekaJerseyApp));
            bean.setOrder(Ordered.LOWEST_PRECEDENCE);
            bean.setUrlPatterns(Collections.singletonList(EurekaConstants.DEFAULT_PREFIX + "/*"));
    
            return bean;
        }
    
    • eureka-server节点实例配置
        @Bean
        @ConditionalOnMissingBean
        public PeerEurekaNodes peerEurekaNodes(PeerAwareInstanceRegistry registry, ServerCodecs serverCodecs,
                ReplicationClientAdditionalFilters replicationClientAdditionalFilters) {
            return new RefreshablePeerEurekaNodes(registry, this.eurekaServerConfig, this.eurekaClientConfig, serverCodecs,
                    this.applicationInfoManager, replicationClientAdditionalFilters);
        }
    

    spring-cloud的RefreshablePeerEurekaNodes继承了eureka的PeerEurekaNodes,新增了一个构造方法,将ReplicationClientAdditionalFilters注入到当前实例成员变量中。

            private ReplicationClientAdditionalFilters replicationClientAdditionalFilters;
    
            RefreshablePeerEurekaNodes(final PeerAwareInstanceRegistry registry, final EurekaServerConfig serverConfig,
                    final EurekaClientConfig clientConfig, final ServerCodecs serverCodecs,
                    final ApplicationInfoManager applicationInfoManager,
                    final ReplicationClientAdditionalFilters replicationClientAdditionalFilters) {
                super(registry, serverConfig, clientConfig, serverCodecs, applicationInfoManager);
                this.replicationClientAdditionalFilters = replicationClientAdditionalFilters;
            }
    
            @Override
            protected PeerEurekaNode createPeerEurekaNode(String peerEurekaNodeUrl) {
                JerseyReplicationClient replicationClient = JerseyReplicationClient.createReplicationClient(serverConfig,
                        serverCodecs, peerEurekaNodeUrl);
    
                this.replicationClientAdditionalFilters.getFilters().forEach(replicationClient::addReplicationClientFilter);
    
                String targetHost = hostFromUrl(peerEurekaNodeUrl);
                if (targetHost == null) {
                    targetHost = "host";
                }
                return new PeerEurekaNode(registry, targetHost, peerEurekaNodeUrl, replicationClient, serverConfig);
            }
    

    作用是通过重写eureka节点创建方法(createPeerEurekaNode),将spring-cloud中配置的replicationClientAdditionalFilters(jersey的ClientFilter集)过滤器添加到jersey的client实例中(eureka接口基于JAX-RS,所以用的jersey)。

            @Override
            public void onApplicationEvent(final EnvironmentChangeEvent event) {
                if (shouldUpdate(event.getKeys())) {
                    updatePeerEurekaNodes(resolvePeerUrls());
                }
            }
    

    另外RefreshablePeerEurekaNodes实现了spring的ApplicationListener<EnvironmentChangeEvent>接口,在收到spring事件回调时,会触发eureka-server节点更新;

    • eureka配置类实例
        @Configuration(proxyBeanMethods = false)
        protected static class EurekaServerConfigBeanConfiguration {
    
            @Bean
            @ConditionalOnMissingBean
            public EurekaServerConfig eurekaServerConfig(EurekaClientConfig clientConfig) {
                EurekaServerConfigBean server = new EurekaServerConfigBean();
                if (clientConfig.shouldRegisterWithEureka()) {
                    // Set a sensible default if we are supposed to replicate
                    server.setRegistrySyncRetries(5);
                }
                return server;
            }
    
        }
    

    此处利用spring-boot的@ConfigurationProperties注解,将配置文件(application.properties)中的eureka配置信息同步到EurekaServerConfigBean类实例中,因EurekaServerConfigBean实现了EurekaServerConfig接口,故可供eureka-server启动时读取。

    3.2 spring相关配置

    • spring-boot端点配置
        @Bean
        public HasFeatures eurekaServerFeature() {
            return HasFeatures.namedFeature("Eureka Server", EurekaServerAutoConfiguration.class);
        }
    
    • JAX-RS配置
        /**
         * Construct a Jersey {@link javax.ws.rs.core.Application} with all the resources
         * required by the Eureka server.
         * @param environment an {@link Environment} instance to retrieve classpath resources
         * @param resourceLoader a {@link ResourceLoader} instance to get classloader from
         * @return created {@link Application} object
         */
        @Bean
        public javax.ws.rs.core.Application jerseyApplication(Environment environment, ResourceLoader resourceLoader) {
    
            ClassPathScanningCandidateComponentProvider provider = new ClassPathScanningCandidateComponentProvider(false,
                    environment);
    
            // Filter to include only classes that have a particular annotation.
            //
            provider.addIncludeFilter(new AnnotationTypeFilter(Path.class));
            provider.addIncludeFilter(new AnnotationTypeFilter(Provider.class));
    
            // Find classes in Eureka packages (or subpackages)
            //
            Set<Class<?>> classes = new HashSet<>();
            for (String basePackage : EUREKA_PACKAGES) {
                Set<BeanDefinition> beans = provider.findCandidateComponents(basePackage);
                for (BeanDefinition bd : beans) {
                    Class<?> cls = ClassUtils.resolveClassName(bd.getBeanClassName(), resourceLoader.getClassLoader());
                    classes.add(cls);
                }
            }
    
            // Construct the Jersey ResourceConfig
            Map<String, Object> propsAndFeatures = new HashMap<>();
            propsAndFeatures.put(
                    // Skip static content used by the webapp
                    ServletContainer.PROPERTY_WEB_PAGE_CONTENT_REGEX,
                    EurekaConstants.DEFAULT_PREFIX + "/(fonts|images|css|js)/.*");
    
            DefaultResourceConfig rc = new DefaultResourceConfig(classes);
            rc.setPropertiesAndFeatures(propsAndFeatures);
    
            return rc;
        }
    
    • servlet过滤器配置
        @Bean
        @ConditionalOnBean(name = "httpTraceFilter")
        public FilterRegistrationBean<?> traceFilterRegistration(@Qualifier("httpTraceFilter") Filter filter) {
            FilterRegistrationBean<Filter> bean = new FilterRegistrationBean<Filter>();
            bean.setFilter(filter);
            bean.setOrder(Ordered.LOWEST_PRECEDENCE - 10);
            return bean;
        }
    

    4. 启动eureka-server

    EurekaServerInitializerConfiguration类实现了spring的SmartLifecycle接口,并且order值为1(bean创建优先级高),在spring生命周期开始时,就开始启动eureka-server。

        @Autowired
        private EurekaServerBootstrap eurekaServerBootstrap;
    
        @Override
        public void start() {
            new Thread(() -> {
                try {
                    // TODO: is this class even needed now?
                    eurekaServerBootstrap.contextInitialized(EurekaServerInitializerConfiguration.this.servletContext);
                    log.info("Started Eureka Server");
    
                    publish(new EurekaRegistryAvailableEvent(getEurekaServerConfig()));
                    EurekaServerInitializerConfiguration.this.running = true;
                    publish(new EurekaServerStartedEvent(getEurekaServerConfig()));
                }
                catch (Exception ex) {
                    // Help!
                    log.error("Could not initialize Eureka servlet context", ex);
                }
            }).start();
        }
    

    创建一个新的线程,调用注入进来的成员变量eurekaServerBootstrap的contextInitialized方法开始启动,启动成功后发布eureka允许注册事件和eureka-server已启动事件。

    相关文章

      网友评论

          本文标题:spring cloud是怎样通过一个注解将eureka ser

          本文链接:https://www.haomeiwen.com/subject/brwpzltx.html