A Deep Dive into the Eureka Server Startup Source Code (Part 1)

Author: sharedCode | Published: 2018-07-11 10:32

    Starter package configuration

    Package structure of spring-cloud-netflix-eureka-server

    Under META-INF in this package there is a configuration file named spring.factories:

    org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
      org.springframework.cloud.netflix.eureka.server.EurekaServerAutoConfiguration
    
    

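    This relies on Spring Boot's EnableAutoConfiguration mechanism: when a Spring Boot application starts, it automatically loads the EurekaServerAutoConfiguration bean. The rest of this article mainly walks through the source of that bean.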
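
    To make the file format concrete, here is a minimal sketch that reads the entry shown above with plain java.util.Properties. It only illustrates how the key maps to a list of class names; the class SpringFactoriesSketch and its method are hypothetical, and Spring Boot's own loader does considerably more than this.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

// Simplified illustration only; this is not Spring Boot's actual loading code.
class SpringFactoriesSketch {

    static final String ENABLE_AUTO_CONFIGURATION =
            "org.springframework.boot.autoconfigure.EnableAutoConfiguration";

    // Parses spring.factories content and returns the class names listed under the key.
    static List<String> autoConfigurations(String factoriesContent) {
        Properties props = new Properties();
        try {
            // Properties.load joins a line ending in a backslash with the next line
            // and drops the leading whitespace of the continuation line.
            props.load(new StringReader(factoriesContent));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        String value = props.getProperty(ENABLE_AUTO_CONFIGURATION, "");
        return Arrays.asList(value.trim().split("\\s*,\\s*"));
    }

    public static void main(String[] args) {
        String content = ENABLE_AUTO_CONFIGURATION + "=\\\n"
                + "  org.springframework.cloud.netflix.eureka.server.EurekaServerAutoConfiguration\n";
        System.out.println(autoConfigurations(content));
    }
}
```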

    Startup source code analysis

    EurekaServerAutoConfiguration

    Registers the beans that provide the eureka-server functionality in the bean factory.

    @Configuration
    @Import(EurekaServerInitializerConfiguration.class)
    @ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class)
    @EnableConfigurationProperties({ EurekaDashboardProperties.class,
          InstanceRegistryProperties.class })
    @PropertySource("classpath:/eureka/server.properties")
    public class EurekaServerAutoConfiguration extends WebMvcConfigurerAdapter {
    
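        // Most of the code is omitted here; only the key snippets are shown
    
        // Registers EurekaController. Spring Cloud provides some extra endpoints for retrieving information about the Eureka Server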
        @Bean
        @ConditionalOnProperty(prefix = "eureka.dashboard", name = "enabled", matchIfMissing = true)
        public EurekaController eurekaController() {
           return new EurekaController(this.applicationInfoManager);
        }
        // Configures the peer nodes of this Eureka server: when an instance registers with this node,
        // these are the nodes that must be notified (together they form one cluster)
        @Bean
        @ConditionalOnMissingBean
        public PeerEurekaNodes peerEurekaNodes(PeerAwareInstanceRegistry registry,
            ServerCodecs serverCodecs) {
            return new PeerEurekaNodes(registry, this.eurekaServerConfig,
             this.eurekaClientConfig, serverCodecs, this.applicationInfoManager);
        }
        // The EurekaServer context
        @Bean
        public EurekaServerContext eurekaServerContext(ServerCodecs serverCodecs,
          PeerAwareInstanceRegistry registry, PeerEurekaNodes peerEurekaNodes) {
            return new DefaultEurekaServerContext(this.eurekaServerConfig, serverCodecs,
             registry, peerEurekaNodes, this.applicationInfoManager);
        }
        // Glue code between Spring Cloud and native Eureka; the Eureka Server is started through this class.
        // EurekaServerInitializerConfiguration calls it later to start Eureka
        @Bean
        public EurekaServerBootstrap eurekaServerBootstrap(PeerAwareInstanceRegistry registry,
          EurekaServerContext serverContext) {
            return new EurekaServerBootstrap(this.applicationInfoManager,
             this.eurekaClientConfig, this.eurekaServerConfig, registry,
             serverContext);
        }
    // Registers the filter. ServletContainer is the entry point of the Jersey framework, which serves the Eureka Server's external RESTful API
    @Bean
    public FilterRegistrationBean jerseyFilterRegistration(
          javax.ws.rs.core.Application eurekaJerseyApp) {
       FilterRegistrationBean bean = new FilterRegistrationBean();
       bean.setFilter(new ServletContainer(eurekaJerseyApp));
       bean.setOrder(Ordered.LOWEST_PRECEDENCE);
       bean.setUrlPatterns(
             Collections.singletonList(EurekaConstants.DEFAULT_PREFIX + "/*"));
    
       return bean;
    }
    
    // The Jersey Application (resource configuration) handed to the filter above
    @Bean
    public javax.ws.rs.core.Application jerseyApplication(Environment environment,
          ResourceLoader resourceLoader) {
    
       ClassPathScanningCandidateComponentProvider provider = new ClassPathScanningCandidateComponentProvider(
             false, environment);
    
       // Filter to include only classes that have a particular annotation.
       //
       provider.addIncludeFilter(new AnnotationTypeFilter(Path.class));
       provider.addIncludeFilter(new AnnotationTypeFilter(Provider.class));
    
       // Find classes in Eureka packages (or subpackages)
       //
       Set<Class<?>> classes = new HashSet<Class<?>>();
       for (String basePackage : EUREKA_PACKAGES) {
          Set<BeanDefinition> beans = provider.findCandidateComponents(basePackage);
          for (BeanDefinition bd : beans) {
             Class<?> cls = ClassUtils.resolveClassName(bd.getBeanClassName(),
                   resourceLoader.getClassLoader());
             classes.add(cls);
          }
       }
    
       // Construct the Jersey ResourceConfig
       //
       Map<String, Object> propsAndFeatures = new HashMap<String, Object>();
       propsAndFeatures.put(
             // Skip static content used by the webapp
             ServletContainer.PROPERTY_WEB_PAGE_CONTENT_REGEX,
             EurekaConstants.DEFAULT_PREFIX + "/(fonts|images|css|js)/.*");
    
       DefaultResourceConfig rc = new DefaultResourceConfig(classes);
       rc.setPropertiesAndFeatures(propsAndFeatures);
    
       return rc;
    }
    }
    
    

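    1. @Configuration marks this as a configuration class.
    2. @Import(EurekaServerInitializerConfiguration.class) imports the bean that starts the Eureka Server.
    3. @ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class) means the current bean (EurekaServerAutoConfiguration) is loaded only when the Spring container contains a Marker instance. This is the switch that controls whether the Eureka Server is enabled: the @EnableEurekaServer annotation creates a Marker object to signal that the Eureka service has been turned on.
    4. @EnableConfigurationProperties({ EurekaDashboardProperties.class, InstanceRegistryProperties.class }) enables binding of these two configuration properties classes.
    5. @PropertySource("classpath:/eureka/server.properties") loads the configuration file.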
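
    The marker mechanism can be sketched in plain Java. This is only an analogy of the gating idea, not Spring's implementation: the "container" is a simple map, and every class and method name below (MarkerGateSketch, enableEurekaServer, applyAutoConfiguration) is hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java analogy of the marker-bean condition; all names are hypothetical.
class MarkerGateSketch {

    // Stand-in for EurekaServerMarkerConfiguration.Marker: it carries no state.
    static final class Marker { }

    // Stand-in for the beans contributed by EurekaServerAutoConfiguration.
    static final class EurekaServerBeans { }

    private final Map<Class<?>, Object> beans = new HashMap<>();

    // Analogous to using @EnableEurekaServer: a Marker ends up in the container.
    void enableEurekaServer() {
        beans.put(Marker.class, new Marker());
    }

    // The auto-configuration contributes its beans only when the marker is present.
    void applyAutoConfiguration() {
        if (beans.containsKey(Marker.class)) {
            beans.put(EurekaServerBeans.class, new EurekaServerBeans());
        }
    }

    boolean eurekaServerActive() {
        return beans.containsKey(EurekaServerBeans.class);
    }

    public static void main(String[] args) {
        MarkerGateSketch withoutAnnotation = new MarkerGateSketch();
        withoutAnnotation.applyAutoConfiguration();
        System.out.println("without marker: " + withoutAnnotation.eurekaServerActive());

        MarkerGateSketch withAnnotation = new MarkerGateSketch();
        withAnnotation.enableEurekaServer();
        withAnnotation.applyAutoConfiguration();
        System.out.println("with marker: " + withAnnotation.eurekaServerActive());
    }
}
```

    The point of the design is that the jar can sit on the classpath without turning an application into a Eureka Server; only the explicit marker activates the auto-configuration.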

    EurekaServerInitializerConfiguration

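    As the code above shows, EurekaServerAutoConfiguration imports EurekaServerInitializerConfiguration, so once the auto-configuration has been loaded, the start method of EurekaServerInitializerConfiguration is executed. This class implements Spring's SmartLifecycle; a separate chapter will cover it later.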

    /**
     * @author Dave Syer
     */
    @Configuration
    @CommonsLog
    public class EurekaServerInitializerConfiguration
          implements ServletContextAware, SmartLifecycle, Ordered {
    
       @Autowired
       private EurekaServerConfig eurekaServerConfig;
    
       private ServletContext servletContext;
    
       @Autowired
       private ApplicationContext applicationContext;
    
       @Autowired
       private EurekaServerBootstrap eurekaServerBootstrap;
    
       private boolean running;
    
       private int order = 1;
    
       @Override
       public void setServletContext(ServletContext servletContext) {
          this.servletContext = servletContext;
       }
    
       @Override
       public void start() {
          // Start a new thread
          new Thread(new Runnable() {
             @Override
             public void run() {
                try {
                   // Initialize and start the Eureka Server; this part is covered in detail below
                   eurekaServerBootstrap.contextInitialized(EurekaServerInitializerConfiguration.this.servletContext);
                   log.info("Started Eureka Server");
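                   // Publish the event announcing that the Eureka registry is available
                   publish(new EurekaRegistryAvailableEvent(getEurekaServerConfig()));
                   // Set the running flag to true
                   EurekaServerInitializerConfiguration.this.running = true;
                   // Publish the Eureka Server started event. There are other events as well; we can listen for them to implement specific business requirements, as discussed later.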
                   publish(new EurekaServerStartedEvent(getEurekaServerConfig()));
                }
                catch (Exception ex) {
                   // Help!
                   log.error("Could not initialize Eureka servlet context", ex);
                }
             }
          }).start();
       }
    
       private EurekaServerConfig getEurekaServerConfig() {
          return this.eurekaServerConfig;
       }
    
       private void publish(ApplicationEvent event) {
          this.applicationContext.publishEvent(event);
       }
    
       @Override
       public void stop() {
          this.running = false;
          eurekaServerBootstrap.contextDestroyed(this.servletContext);
       }
    
       @Override
       public boolean isRunning() {
          return this.running;
       }
    
       @Override
       public int getPhase() {
          return 0;
       }
    
       @Override
       public boolean isAutoStartup() {
          return true;
       }
    
       @Override
       public void stop(Runnable callback) {
          callback.run();
       }
    
       @Override
       public int getOrder() {
          return this.order;
       }
    
    }
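
    The shape of start() above (return immediately, bootstrap on a separate thread, then flip the running flag and publish events) can be sketched without Spring. This is a simplified illustration: BackgroundStartSketch, its event strings and the awaitStart helper are hypothetical, and the sketch marks the flag volatile for cross-thread visibility, which is a choice of the sketch rather than something taken from the original class.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a lifecycle bean that boots a server in the background.
class BackgroundStartSketch {

    private final List<String> events = new CopyOnWriteArrayList<>();
    private final CountDownLatch finished = new CountDownLatch(1);
    private volatile boolean running;

    // Returns at once; the actual bootstrap happens on a new thread.
    void start() {
        new Thread(() -> {
            try {
                bootstrap();                       // stand-in for contextInitialized(...)
                events.add("registry-available");  // stand-in for EurekaRegistryAvailableEvent
                running = true;
                events.add("server-started");      // stand-in for EurekaServerStartedEvent
            } catch (RuntimeException ex) {
                events.add("failed: " + ex.getMessage());
            } finally {
                finished.countDown();
            }
        }, "eureka-bootstrap-sketch").start();
    }

    // Hypothetical initialization work; does nothing in this sketch.
    void bootstrap() { }

    // Blocks until the background bootstrap has finished or the timeout elapses.
    boolean awaitStart(long timeoutMillis) {
        try {
            return finished.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    boolean isRunning() {
        return running;
    }

    List<String> events() {
        return events;
    }

    public static void main(String[] args) {
        BackgroundStartSketch sketch = new BackgroundStartSketch();
        sketch.start();
        sketch.awaitStart(5000);
        System.out.println(sketch.isRunning() + " " + sketch.events());
    }
}
```

    Because the work runs on its own thread, a caller that needs the server to be ready has to wait for a signal (here the latch; in the original, the published events) instead of assuming start() has completed the bootstrap.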
    
    

    EurekaServerBootstrap

    public void contextInitialized(ServletContext context) {
       try {
          // Initialize Eureka's environment settings
          initEurekaEnvironment();
          // Initialize the Eureka server context
          initEurekaServerContext();
    
          context.setAttribute(EurekaServerContext.class.getName(), this.serverContext);
       }
       catch (Throwable e) {
          log.error("Cannot bootstrap eureka server :", e);
          throw new RuntimeException("Cannot bootstrap eureka server :", e);
       }
    }
    
    
    
    protected void initEurekaEnvironment() throws Exception {
       log.info("Setting the eureka configuration..");
    
       String dataCenter = ConfigurationManager.getConfigInstance()
             .getString(EUREKA_DATACENTER);
       if (dataCenter == null) {
          log.info(
                "Eureka data center value eureka.datacenter is not set, defaulting to default");
          ConfigurationManager.getConfigInstance()
                .setProperty(ARCHAIUS_DEPLOYMENT_DATACENTER, DEFAULT);
       }
       else {
          ConfigurationManager.getConfigInstance()
                .setProperty(ARCHAIUS_DEPLOYMENT_DATACENTER, dataCenter);
       }
       String environment = ConfigurationManager.getConfigInstance()
             .getString(EUREKA_ENVIRONMENT);
       if (environment == null) {
          ConfigurationManager.getConfigInstance()
                .setProperty(ARCHAIUS_DEPLOYMENT_ENVIRONMENT, TEST);
          log.info(
                "Eureka environment value eureka.environment is not set, defaulting to test");
       }
       else {
          ConfigurationManager.getConfigInstance()
                .setProperty(ARCHAIUS_DEPLOYMENT_ENVIRONMENT, environment);
       }
    }
    
    protected void initEurekaServerContext() throws Exception {
       // For backward compatibility
       JsonXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(),
             XStream.PRIORITY_VERY_HIGH);
       XmlXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(),
             XStream.PRIORITY_VERY_HIGH);
    
       if (isAws(this.applicationInfoManager.getInfo())) {
          this.awsBinder = new AwsBinderDelegate(this.eurekaServerConfig,
                this.eurekaClientConfig, this.registry, this.applicationInfoManager);
          this.awsBinder.start();
       }
    
       // Initialize the Eureka server context holder
       EurekaServerContextHolder.initialize(this.serverContext);
    
       log.info("Initialized server context");
    
       // Copy registry from neighboring eureka node
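       int registryCount = this.registry.syncUp();
       // Heartbeats are sent every 30 seconds by default, i.e. twice per minute.
       // Change this node's Eureka status to UP.
       // This call also starts a scheduled task that evicts clients whose lease has expired without a heartbeat (they are taken offline automatically).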
       this.registry.openForTraffic(this.applicationInfoManager, registryCount);
    
       // Register all monitoring statistics.
       EurekaMonitors.registerAllStats();
    }
    public void contextDestroyed(ServletContext context) {
       try {
          log.info("Shutting down Eureka Server..");
          context.removeAttribute(EurekaServerContext.class.getName());
    
          destroyEurekaServerContext();
          destroyEurekaEnvironment();
    
       }
       catch (Throwable e) {
          log.error("Error shutting down eureka", e);
       }
       log.info("Eureka Service is now shutdown...");
    }
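
    The defaulting logic in initEurekaEnvironment() above reduces to two lookups with fallbacks. The sketch below restates it over a plain map instead of Archaius' ConfigurationManager. The source keys and the fallback values "default" and "test" come from the log messages above; the target key strings are an assumption about the values of the ARCHAIUS_DEPLOYMENT_* constants, and the class name is hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Map-based restatement of the defaulting rules; not the real Archaius API.
class EurekaEnvironmentSketch {

    static final String EUREKA_DATACENTER = "eureka.datacenter";
    static final String EUREKA_ENVIRONMENT = "eureka.environment";
    // Assumed values of the constants used in EurekaServerBootstrap.
    static final String ARCHAIUS_DEPLOYMENT_DATACENTER = "archaius.deployment.datacenter";
    static final String ARCHAIUS_DEPLOYMENT_ENVIRONMENT = "archaius.deployment.environment";

    // Returns the deployment settings derived from the given configuration.
    static Map<String, String> resolve(Map<String, String> config) {
        Map<String, String> deployment = new HashMap<>();
        // Data center falls back to "default" when eureka.datacenter is not set.
        deployment.put(ARCHAIUS_DEPLOYMENT_DATACENTER,
                config.getOrDefault(EUREKA_DATACENTER, "default"));
        // Environment falls back to "test" when eureka.environment is not set.
        deployment.put(ARCHAIUS_DEPLOYMENT_ENVIRONMENT,
                config.getOrDefault(EUREKA_ENVIRONMENT, "test"));
        return deployment;
    }

    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        config.put(EUREKA_ENVIRONMENT, "prod");
        System.out.println(resolve(config));
    }
}
```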

    PeerAwareInstanceRegistryImpl
    
    @Override
    public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
        // Renewals happen every 30 seconds and for a minute it should be a factor of 2.
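        // Expected number of renewals per minute (two per instance)
        this.expectedNumberOfRenewsPerMin = count * 2;
        // Minimum number of renewals per minute (the renewal threshold)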
        this.numberOfRenewsPerMinThreshold =
                (int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
        logger.info("Got " + count + " instances from neighboring DS node");
        logger.info("Renew threshold is: " + numberOfRenewsPerMinThreshold);
        this.startupTime = System.currentTimeMillis();
        if (count > 0) {
            this.peerInstancesTransferEmptyOnStartup = false;
        }
        DataCenterInfo.Name selfName = applicationInfoManager.getInfo().getDataCenterInfo().getName();
        boolean isAws = Name.Amazon == selfName;
        if (isAws && serverConfig.shouldPrimeAwsReplicaConnections()) {
            logger.info("Priming AWS connections for all replicas..");
            primeAwsReplicas(applicationInfoManager);
        }
        logger.info("Changing status to UP");
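        // Set the instance status to UP
        applicationInfoManager.setInstanceStatus(InstanceStatus.UP);
        // Start the scheduled eviction task; by default it runs every 60 seconds and removes instances whose lease was not renewed in time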
        super.postInit();
    }
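
    The threshold arithmetic in openForTraffic() is easy to check by hand. The sketch below repeats the two formulas as static methods; the class name is hypothetical, and 0.85 is used as the renewal percent threshold on the assumption that the default configuration is in effect.

```java
// Restates the two formulas from openForTraffic(); the class name is hypothetical.
class RenewalThresholdSketch {

    // Assumed default of serverConfig.getRenewalPercentThreshold().
    static final double DEFAULT_RENEWAL_PERCENT_THRESHOLD = 0.85;

    // Each instance renews every 30 seconds, so two renewals per minute are expected.
    static int expectedRenewsPerMin(int instanceCount) {
        return instanceCount * 2;
    }

    // The cast truncates toward zero, exactly as in the original code.
    static int renewsPerMinThreshold(int instanceCount, double renewalPercentThreshold) {
        return (int) (expectedRenewsPerMin(instanceCount) * renewalPercentThreshold);
    }

    public static void main(String[] args) {
        // 10 instances: 20 expected renewals per minute, threshold 17
        System.out.println(renewsPerMinThreshold(10, DEFAULT_RENEWAL_PERCENT_THRESHOLD));
        // 3 instances: 6 expected renewals per minute, 5.1 truncated to 5
        System.out.println(renewsPerMinThreshold(3, DEFAULT_RENEWAL_PERCENT_THRESHOLD));
    }
}
```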
    
    

    PeerEurekaNodes

    In the EurekaServerAutoConfiguration class above, there is a @Bean method that creates the EurekaServerContext:

    @Bean
    public EurekaServerContext eurekaServerContext(ServerCodecs serverCodecs,
          PeerAwareInstanceRegistry registry, PeerEurekaNodes peerEurekaNodes) {
       return new DefaultEurekaServerContext(this.eurekaServerConfig, serverCodecs,
             registry, peerEurekaNodes, this.applicationInfoManager);
    }
    
    

    The initialize() method of DefaultEurekaServerContext is annotated with @PostConstruct, so it is executed while the application is loading, right after the bean has been constructed:

    public void initialize() throws Exception {
        logger.info("Initializing ...");
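        // Start a thread that reads the information of the other cluster nodes, used later for replication
        peerEurekaNodes.start();
        // Initialize the instance registry with the peer nodes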
        registry.init(peerEurekaNodes);
        logger.info("Initialized");
    }
    
    

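    peerEurekaNodes.start() creates a thread pool with a single thread. The first call refreshes the information about the other cluster nodes right away,
    and then schedules a periodic task that refreshes it again at the interval returned by serverConfig.getPeerEurekaNodesUpdateIntervalMs(). In other words, the peer node configuration can be changed dynamically afterwards (natively supported through Spring Cloud Config).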

    PeerEurekaNodes
    public void start() {
        taskExecutor = Executors.newSingleThreadScheduledExecutor(
                new ThreadFactory() {
                    @Override
                    public Thread newThread(Runnable r) {
                        Thread thread = new Thread(r, "Eureka-PeerNodesUpdater");
                        thread.setDaemon(true);
                        return thread;
                    }
                }
        );
        try {
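            // On the first call, refresh the cluster node information immediately
            updatePeerEurekaNodes(resolvePeerUrls());
            // Build the task that repeats the refresh periodically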
            Runnable peersUpdateTask = new Runnable() {
                @Override
                public void run() {
                    try {
                        updatePeerEurekaNodes(resolvePeerUrls());
                    } catch (Throwable e) {
                        logger.error("Cannot update the replica Nodes", e);
                    }
    
                }
            };
            taskExecutor.scheduleWithFixedDelay(
                    peersUpdateTask,
                    serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
                    serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
                    TimeUnit.MILLISECONDS
            );
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
        for (PeerEurekaNode node : peerEurekaNodes) {
            logger.info("Replica node URL:  " + node.getServiceUrl());
        }
    }
    // Build a PeerEurekaNode from its URL
    protected PeerEurekaNode createPeerEurekaNode(String peerEurekaNodeUrl) {
        HttpReplicationClient replicationClient = JerseyReplicationClient.createReplicationClient(serverConfig, serverCodecs, peerEurekaNodeUrl);
        String targetHost = hostFromUrl(peerEurekaNodeUrl);
        if (targetHost == null) {
            targetHost = "host";
        }
        return new PeerEurekaNode(registry, targetHost, peerEurekaNodeUrl, replicationClient, serverConfig);
    }
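
    The scheduling pattern used by start() above (one daemon thread, an immediate first refresh, then repeated refreshes with a fixed delay) can be reproduced with the JDK alone. In this sketch the class PeerUpdaterSketch, the Supplier that stands in for resolvePeerUrls(), and the awaitUpdates helper are all hypothetical; only the executor calls mirror the original.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical stand-in for the peer list refresh loop.
class PeerUpdaterSketch {

    private final Supplier<List<String>> peerUrlResolver; // stand-in for resolvePeerUrls()
    private final long updateIntervalMs;
    private final AtomicInteger updates = new AtomicInteger();
    private volatile List<String> peerUrls = Collections.emptyList();
    private ScheduledExecutorService taskExecutor;

    PeerUpdaterSketch(Supplier<List<String>> peerUrlResolver, long updateIntervalMs) {
        this.peerUrlResolver = peerUrlResolver;
        this.updateIntervalMs = updateIntervalMs;
    }

    void start() {
        taskExecutor = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread thread = new Thread(r, "PeerNodesUpdater-sketch");
            thread.setDaemon(true); // must not keep the JVM alive
            return thread;
        });
        update(); // first refresh runs synchronously on the caller's thread
        taskExecutor.scheduleWithFixedDelay(() -> {
            try {
                update();
            } catch (Throwable e) {
                // Swallow the error so that later runs are not cancelled.
                System.err.println("Cannot update the peer list: " + e);
            }
        }, updateIntervalMs, updateIntervalMs, TimeUnit.MILLISECONDS);
    }

    void shutdown() {
        taskExecutor.shutdownNow();
    }

    private void update() {
        peerUrls = new ArrayList<>(peerUrlResolver.get());
        updates.incrementAndGet();
    }

    List<String> currentPeers() {
        return peerUrls;
    }

    int updateCount() {
        return updates.get();
    }

    // Polls until at least n refreshes have completed or the timeout elapses.
    boolean awaitUpdates(int n, long timeoutMs) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        while (updates.get() < n) {
            if (System.nanoTime() > deadline) {
                return false;
            }
            try {
                Thread.sleep(5);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        PeerUpdaterSketch updater = new PeerUpdaterSketch(
                () -> Arrays.asList("http://peer1/eureka/", "http://peer2/eureka/"), 50);
        updater.start();
        updater.awaitUpdates(3, 5000);
        System.out.println(updater.currentPeers());
        updater.shutdown();
    }
}
```

    Catching Throwable inside the task matters: with scheduleWithFixedDelay, an exception that escapes one run suppresses all subsequent runs.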
    
    

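    registry.init(peerEurekaNodes) is handled here by PeerAwareInstanceRegistryImpl, the class that manages registration data. Its init method does the following:

    1. Builds a non-null ResponseCache.
    2. Starts a timer that updates Eureka's renewal threshold, by default every 15 minutes. This threshold is what decides whether the self-preservation mechanism kicks in; a separate article will cover it.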
    @Override
    public void init(PeerEurekaNodes peerEurekaNodes) throws Exception {
        this.numberOfReplicationsLastMin.start();
        this.peerEurekaNodes = peerEurekaNodes;
        initializedResponseCache();
        scheduleRenewalThresholdUpdateTask();
        initRemoteRegionRegistry();
    
        try {
            Monitors.registerObject(this);
        } catch (Throwable e) {
            logger.warn("Cannot register the JMX monitor for the InstanceRegistry :", e);
        }
    }
    
    

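    At this point the Eureka Server has finished starting. EurekaServerBootstrap is almost a straight copy of the native EurekaBootStrap. Native Eureka runs as a servlet application, whereas a Spring Cloud application runs inside an embedded web server such as Tomcat, so EurekaServerBootstrap takes its place, which is what ultimately lets Eureka run inside Spring Boot.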
