美文网首页
eureka 源码跟踪

eureka 源码跟踪

作者: NirvanalI | 来源:发表于2020-04-09 12:11 被阅读0次

    首先看@EnableDiscoveryClient 源码:

    @Target({ElementType.TYPE})
    @Retention(RetentionPolicy.RUNTIME)
    @Documented
    @Inherited
    // 往容器中注册 EnableDiscoveryClientImportSelector
    @Import({EnableDiscoveryClientImportSelector.class})
    public @interface EnableDiscoveryClient {
        boolean autoRegister() default true;
    }
    

    先看EnableDiscoveryClientImportSelector 继承关系:

    image.png

    从上图可以发现EnableDiscoveryClientImportSelector 实现了ImportSelector 接口。
    接着看EnableDiscoveryClientImportSelector 的代码:

    public String[] selectImports(AnnotationMetadata metadata) {
        String[] imports = super.selectImports(metadata);
        AnnotationAttributes attributes = AnnotationAttributes.fromMap(metadata.getAnnotationAttributes(this.getAnnotationClass().getName(), true));
        boolean autoRegister = attributes.getBoolean("autoRegister");
        // 如果是自动注册添加 org.springframework.cloud.client.serviceregistry.AutoServiceRegistrationConfiguration
        // AutoServiceRegistrationConfiguration 这个貌似就是一个空实现类。
        if (autoRegister) {
            List<String> importsList = new ArrayList(Arrays.asList(imports));
            importsList.add("org.springframework.cloud.client.serviceregistry.AutoServiceRegistrationConfiguration");
            imports = (String[])importsList.toArray(new String[0]);
        } else {
            Environment env = this.getEnvironment();
            if (ConfigurableEnvironment.class.isInstance(env)) {
                ConfigurableEnvironment configEnv = (ConfigurableEnvironment)env;
                LinkedHashMap<String, Object> map = new LinkedHashMap();
                map.put("spring.cloud.service-registry.auto-registration.enabled", false);
                MapPropertySource propertySource = new MapPropertySource("springCloudDiscoveryClient", map);
                configEnv.getPropertySources().addLast(propertySource);
            }
        }
        return imports;
    }
    

    spring-cloud-commons包下 /META-INF/spring.factories 可以发现, 自动注入的类:

    org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
    org.springframework.cloud.client.CommonsClientAutoConfiguration,\
    org.springframework.cloud.client.discovery.composite.CompositeDiscoveryClientAutoConfiguration,\
    // NoopDiscoveryClientAutoConfiguration 已经过时
    org.springframework.cloud.client.discovery.noop.NoopDiscoveryClientAutoConfiguration,\
    org.springframework.cloud.client.discovery.simple.SimpleDiscoveryClientAutoConfiguration,\
    

    插一段: 注册EurekaDiscoveryClient
    查看spring-cloud-netflix-eureka-client 下的 /META-INF/spring.factories 内容

    org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
    org.springframework.cloud.netflix.eureka.config.EurekaClientConfigServerAutoConfiguration,\
    org.springframework.cloud.netflix.eureka.config.EurekaDiscoveryClientConfigServiceAutoConfiguration,\
    org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration,\
    org.springframework.cloud.netflix.ribbon.eureka.RibbonEurekaAutoConfiguration,\
    org.springframework.cloud.netflix.eureka.EurekaDiscoveryClientConfiguration
    
    org.springframework.cloud.bootstrap.BootstrapConfiguration=\
    org.springframework.cloud.netflix.eureka.config.EurekaDiscoveryClientConfigServiceBootstrapConfiguration
    

    看上面spring.factories 标红的配置类:
    EurekaClientAutoConfiguration:

    public class EurekaClientAutoConfiguration {
        
        // 往容器中注入EurekaDiscoveryClient 类。
        @Bean
        public DiscoveryClient discoveryClient(EurekaClient client, EurekaClientConfig clientConfig) {
            return new EurekaDiscoveryClient(client, clientConfig);
        }
    }
    

    首先看看SimpleDiscoveryClientAutoConfiguration 配置类代码:

    @Configuration
    @AutoConfigureBefore({NoopDiscoveryClientAutoConfiguration.class})
    public class SimpleDiscoveryClientAutoConfiguration {
        @Autowired(
            required = false
        )
        private ServerProperties server;
        @Autowired
        private ApplicationContext context;
        // 配置文件中设置的 spring.application.name
        @Value("${spring.application.name:application}")
        private String serviceId;
        @Autowired
        private InetUtils inet;
        
        public SimpleDiscoveryClientAutoConfiguration() {
        }
        
        // 将配置好的服务信息, serviceId, hostname, port 等信息包装在SimpleDiscoveryProperties 中返回
        @Bean
        public SimpleDiscoveryProperties simpleDiscoveryProperties() {
            SimpleDiscoveryProperties simple = new SimpleDiscoveryProperties();
            simple.getLocal().setServiceId(this.serviceId);
            simple.getLocal().setUri(URI.create("http://" + this.inet.findFirstNonLoopbackHostInfo().getHostname() + ":" + this.findPort()));
            return simple;
        }
        
        // 根据simpleDiscoveryProperties() 方法新增DiscoveryClient 的实现SimpleDiscoveryClient 到容器中
        @Bean
        @Order(2147483647)
        public DiscoveryClient simpleDiscoveryClient() {
            return new SimpleDiscoveryClient(this.simpleDiscoveryProperties());
        }
        
        // ....
    }
    

    再看看CompositeDiscoveryClientAutoConfiguration 配置类代码:

    @Configuration
    // 加载SimpleDiscoveryClientAutoConfiguration 前加载该类
    @AutoConfigureBefore({SimpleDiscoveryClientAutoConfiguration.class})
    public class CompositeDiscoveryClientAutoConfiguration {
        public CompositeDiscoveryClientAutoConfiguration() {
            
        }
        
        // 新增DiscoveryClient 实现类CompositeDiscoveryClient. 该类中包含容器中现有的discoveryClients 集合
        // 在项目中注入 DiscoveryClient  默认注入的就是该实现类[CompositeDiscoveryClient]
        // 自动注入discoveryClients 会发现该集合中包含 EurekaDiscoveryClient和 SimpleDiscoveryClient 两个
        @Bean
        @Primary
        public CompositeDiscoveryClient compositeDiscoveryClient(List<DiscoveryClient> discoveryClients) {
            return new CompositeDiscoveryClient(discoveryClients);
        }
    }
    

    我们重点看下 EurekaDiscoveryClient 中的代码实现:
    EurekaDiscoveryClient 中的代码:

    public class EurekaDiscoveryClient implements DiscoveryClient {
        public static final String DESCRIPTION = "Spring Cloud Eureka Discovery Client";
        private final EurekaClient eurekaClient;
        private final EurekaClientConfig clientConfig;
        
        /** @deprecated */
        @Deprecated
        public EurekaDiscoveryClient(EurekaInstanceConfig config, EurekaClient eurekaClient) {
            this(eurekaClient, eurekaClient.getEurekaClientConfig());
        }
        
        public EurekaDiscoveryClient(EurekaClient eurekaClient, EurekaClientConfig clientConfig) {
            this.clientConfig = clientConfig;
            this.eurekaClient = eurekaClient;
        }
        
        public String description() {
            return "Spring Cloud Eureka Discovery Client";
        }
        // 根据serviceId 获取ServiceInstance 的集合
        public List<ServiceInstance> getInstances(String serviceId) {
            List<InstanceInfo> infos = this.eurekaClient.getInstancesByVipAddress(serviceId, false);
            List<ServiceInstance> instances = new ArrayList();
            Iterator var4 = infos.iterator();
            while(var4.hasNext()) {
                InstanceInfo info = (InstanceInfo)var4.next();
                instances.add(new EurekaDiscoveryClient.EurekaServiceInstance(info));
            }
            
            return instances;
        }
        // 获取服务名集合
        public List<String> getServices() {
            Applications applications = this.eurekaClient.getApplications();
            if (applications == null) {
                return Collections.emptyList();
            } else {
                List<Application> registered = applications.getRegisteredApplications();
                List<String> names = new ArrayList();
                Iterator var4 = registered.iterator();
                
                while(var4.hasNext()) {
                    Application app = (Application)var4.next();
                    if (!app.getInstances().isEmpty()) {
                        names.add(app.getName().toLowerCase());
                    }
                }
                
                return names;
            }
        }
    }
    

    我们从上面代码可以发现, 获取服务的方法最后的实现都丢给了EurekaClient
    EurekaClient的实现:
    -DiscoveryClient[重点debug实现]
    -CloudEurekaClient

    DiscoveryClient的代码查看:

    @Inject
    DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args, Provider<BackupRegistry> backupRegistryProvider, EndpointRandomizer endpointRandomizer) {
        
        this.applicationInfoManager = applicationInfoManager;
        InstanceInfo myInfo = applicationInfoManager.getInfo();
        this.clientConfig = config;
        staticClientConfig = this.clientConfig;
        this.transportConfig = config.getTransportConfig();
        this.instanceInfo = myInfo;
        if (myInfo != null) {
            this.appPathIdentifier = this.instanceInfo.getAppName() + "/" + this.instanceInfo.getId();
        } else {
            logger.warn("Setting instanceInfo to a passed in null value");
        }
    
    
        this.backupRegistryProvider = backupRegistryProvider;
        this.endpointRandomizer = endpointRandomizer;
        this.urlRandomizer = new InstanceInfoBasedUrlRandomizer(this.instanceInfo);
        this.localRegionApps.set(new Applications());
        this.fetchRegistryGeneration = new AtomicLong(0L);
        this.remoteRegionsToFetch = new AtomicReference(this.clientConfig.fetchRegistryForRemoteRegions());
        this.remoteRegionsRef = new AtomicReference(this.remoteRegionsToFetch.get() == null ? null : ((String)this.remoteRegionsToFetch.get()).split(","));
        if (config.shouldFetchRegistry()) {
            this.registryStalenessMonitor = new ThresholdLevelsMetric(this, "eurekaClient.registry.lastUpdateSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
        } else {
            this.registryStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
        }
    
    
        if (config.shouldRegisterWithEureka()) {
            this.heartbeatStalenessMonitor = new ThresholdLevelsMetric(this, "eurekaClient.registration.lastHeartbeatSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
        } else {
            this.heartbeatStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
        }
    
    
        logger.info("Initializing Eureka in region {}", this.clientConfig.getRegion());
        if (!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()) {
            logger.info("Client configured to neither register nor query for data.");
            this.scheduler = null;
            this.heartbeatExecutor = null;
            this.cacheRefreshExecutor = null;
            this.eurekaTransport = null;
            this.instanceRegionChecker = new InstanceRegionChecker(new PropertyBasedAzToRegionMapper(config), this.clientConfig.getRegion());
            DiscoveryManager.getInstance().setDiscoveryClient(this);
            DiscoveryManager.getInstance().setEurekaClientConfig(config);
            this.initTimestampMs = System.currentTimeMillis();
            logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}", this.initTimestampMs, this.getApplications().size());
        } else {
            try {
                this.scheduler = Executors.newScheduledThreadPool(2, 
                        (new ThreadFactoryBuilder())
                        .setNameFormat("DiscoveryClient-%d")
                        .setDaemon(true)
                        .build());
                this.heartbeatExecutor = new ThreadPoolExecutor(1, 
                        this.clientConfig.getHeartbeatExecutorThreadPoolSize(), 
                        0L, 
                        TimeUnit.SECONDS, 
                        new SynchronousQueue(), 
                        (new ThreadFactoryBuilder())
                        .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
                        .setDaemon(true)
                        .build());
                this.cacheRefreshExecutor = new ThreadPoolExecutor(1, 
                        this.clientConfig.getCacheRefreshExecutorThreadPoolSize(), 
                        0L, 
                        TimeUnit.SECONDS, 
                        new SynchronousQueue(), 
                        (new ThreadFactoryBuilder())
                        .setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
                        .setDaemon(true)
                        .build());
                this.eurekaTransport = new DiscoveryClient.EurekaTransport();
                this.scheduleServerEndpointTask(this.eurekaTransport, args);
                
                this.instanceRegionChecker = new InstanceRegionChecker((AzToRegionMapper)azToRegionMapper, this.clientConfig.getRegion());
            } catch (Throwable var10) {
                throw new RuntimeException("Failed to initialize DiscoveryClient!", var10);
            }
            // ...
            this.initScheduledTasks();
            // ...
        }
    }
    

    上面新建了一个心跳线程池[heartbeatExecutor]和一个注册信息列表刷新缓存池[cacheRefreshExecutor]

    initScheduledTasks() 方法:

    private void initScheduledTasks() {
        int renewalIntervalInSecs;
        int expBackOffBound;
        if (this.clientConfig.shouldFetchRegistry()) {
            renewalIntervalInSecs = this.clientConfig.getRegistryFetchIntervalSeconds();
            expBackOffBound = this.clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
            this.scheduler.schedule(new TimedSupervisorTask("cacheRefresh", 
                this.scheduler, 
                this.cacheRefreshExecutor, 
                renewalIntervalInSecs, 
                TimeUnit.SECONDS, 
                expBackOffBound, 
                // 刷新服务列表缓存服务
                new DiscoveryClient.CacheRefreshThread()), 
                (long)renewalIntervalInSecs, 
                TimeUnit.SECONDS);
        }
        
        if (this.clientConfig.shouldRegisterWithEureka()) {
            renewalIntervalInSecs = this.instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
            expBackOffBound = this.clientConfig.getHeartbeatExecutorExponentialBackOffBound();
            logger.info("Starting heartbeat executor: renew interval is: {}", renewalIntervalInSecs);
            this.scheduler.schedule(new TimedSupervisorTask("heartbeat", 
                this.scheduler, 
                this.heartbeatExecutor, 
                renewalIntervalInSecs, 
                TimeUnit.SECONDS, 
                expBackOffBound, 
                // 续租服务
                new DiscoveryClient.HeartbeatThread()), 
                (long)renewalIntervalInSecs, 
                TimeUnit.SECONDS);
            this.instanceInfoReplicator = new InstanceInfoReplicator(this, 
                this.instanceInfo, 
                this.clientConfig.getInstanceInfoReplicationIntervalSeconds(), 
                2);
            // ....
            this.instanceInfoReplicator.start(this.clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
        } else {
            logger.info("Not registering with Eureka server per configuration");
        }
    }
    

    接着进入instanceInfoReplicator[实现了Runnable接口] 的run()方法:

    public void run() {
        boolean var6 = false;
        ScheduledFuture next;
        label53: {
            try {
                var6 = true;
                this.discoveryClient.refreshInstanceInfo();
                Long dirtyTimestamp = this.instanceInfo.isDirtyWithTime();
                if (dirtyTimestamp != null) {
                    this.discoveryClient.register();
                    this.instanceInfo.unsetIsDirty(dirtyTimestamp);
                    var6 = false;
                } else {
                    var6 = false;
                }
                break label53;
            } catch (Throwable var7) {
                logger.warn("There was a problem with the instance info replicator", var7);
                var6 = false;
            } finally {
                if (var6) {
                    ScheduledFuture next = this.scheduler.schedule(this, (long)this.replicationIntervalSeconds, TimeUnit.SECONDS);
                    this.scheduledPeriodicRef.set(next);
                }
            }
            next = this.scheduler.schedule(this, (long)this.replicationIntervalSeconds, TimeUnit.SECONDS);
            this.scheduledPeriodicRef.set(next);
            return;
        }
        next = this.scheduler.schedule(this, (long)this.replicationIntervalSeconds, TimeUnit.SECONDS);
        this.scheduledPeriodicRef.set(next);
    }
    

    服务注册方法最终回调用到DiscoveryClientregister() 方法:

    boolean register() throws Throwable {
        logger.info("DiscoveryClient_{}: registering service...", this.appPathIdentifier);
        EurekaHttpResponse httpResponse;
        try {
            httpResponse = this.eurekaTransport.registrationClient.register(this.instanceInfo);
        } catch (Exception var3) {
            logger.warn("DiscoveryClient_{} - registration failed {}", new Object[]{this.appPathIdentifier, var3.getMessage(), var3});
            throw var3;
        }
        
        return httpResponse.getStatusCode() == Status.NO_CONTENT.getStatusCode();
    }
    

    [eureka-clienteureka-server 注册的最终提交代码]
    [服务注册]
    register() 继续调用 AbstractJerseyEurekaHttpClient.register() 方法:

    public EurekaHttpResponse<Void> register(InstanceInfo info) {
        String urlPath = "apps/" + info.getAppName();
        ClientResponse response = null;
        
        EurekaHttpResponse var5;
        try {
            Builder resourceBuilder = this.jerseyClient.resource(this.serviceUrl).path(urlPath).getRequestBuilder();
            // resourceBuilder[WebResource实例] 中的url: http://localhost:8761/eureka/apps/EUREKA-CLIENT
            // resourceBuilder 中CopyOnWriteHashMap<String, Object> properties 属性: 
            // "http.protocol.handle-redirects" -> "false"
            // "com.sun.jersey.client.property.followRedirects" -> "false"
            // "com.sun.jersey.impl.client.httpclient.connectionManager" ->
            // "http.useragent" -> "Java-EurekaClient/v1.9.12"
            this.addExtraHeaders(resourceBuilder);
            // 向eureka-server 执行注册请求[并将自身信息 info提交过去]
            response = (ClientResponse)((Builder)((Builder)((Builder)resourceBuilder.header("Accept-Encoding", "gzip"))
                .type(MediaType.APPLICATION_JSON_TYPE))
                .accept(new String[]{"application/json"}))
                .post(ClientResponse.class, info);
            var5 = EurekaHttpResponse.anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
        } finally {
            if (response != null) {
                response.close();
            }
        }
        return var5;
    }
    

    [续订服务续约]

    private class HeartbeatThread implements Runnable {
        private HeartbeatThread() {
        }
        public void run() {
            if (DiscoveryClient.this.renew()) {
                DiscoveryClient.this.lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
            }
        }
    }
    

    DiscoveryClient.this.renew()代码:

    boolean renew() {
        try {
            EurekaHttpResponse<InstanceInfo> httpResponse = this.eurekaTransport.registrationClient
                .sendHeartBeat(this.instanceInfo.getAppName(), 
                this.instanceInfo.getId(), 
                this.instanceInfo, 
                (InstanceStatus)null);
            logger.debug("DiscoveryClient_{} - Heartbeat status: {}", this.appPathIdentifier, httpResponse.getStatusCode());
            if (httpResponse.getStatusCode() == Status.NOT_FOUND.getStatusCode()) {
                this.REREGISTER_COUNTER.increment();
                logger.info("DiscoveryClient_{} - Re-registering apps/{}", this.appPathIdentifier, this.instanceInfo.getAppName());
                long timestamp = this.instanceInfo.setIsDirtyWithTime();
                boolean success = this.register();
                if (success) {
                    this.instanceInfo.unsetIsDirty(timestamp);
                }
                return success;
            } else {
                return httpResponse.getStatusCode() == Status.OK.getStatusCode();
            }
        } catch (Throwable var5) {
            logger.error("DiscoveryClient_{} - was unable to send heartbeat!", this.appPathIdentifier, var5);
            return false;
        }
    }
    

    最终调用com.netflix.discovery.shared.transport.jersey.AbstractJerseyEurekaHttpClient#sendHeartBeat方法:

    public EurekaHttpResponse<InstanceInfo> sendHeartBeat(String appName, String id, InstanceInfo info, InstanceStatus overriddenStatus) {
        String urlPath = "apps/" + appName + '/' + id;
        ClientResponse response = null;
        EurekaHttpResponse var10;
        try {
            // 最后请求的链接: 
            // http://localhost:8761/eureka/apps/EUREKA-CLIENT/localhost:eureka-client:8762?status=UP&lastDirtyTimestamp=1583157097771
            WebResource webResource = this.jerseyClient.resource(this.serviceUrl).path(urlPath).queryParam("status", info.getStatus().toString()).queryParam("lastDirtyTimestamp", info.getLastDirtyTimestamp().toString());
            if (overriddenStatus != null) {
                webResource = webResource.queryParam("overriddenstatus", overriddenStatus.name());
            }
            Builder requestBuilder = webResource.getRequestBuilder();
            this.addExtraHeaders(requestBuilder);
            // 执行请求操作
            // 如果请求返回404, 说明不存在, 需要重新走注册流程
            response = (ClientResponse)requestBuilder.put(ClientResponse.class);
            EurekaHttpResponseBuilder<InstanceInfo> eurekaResponseBuilder = EurekaHttpResponse.anEurekaHttpResponse(response.getStatus(), InstanceInfo.class).headers(headersOf(response));
            if (response.hasEntity()) {
                eurekaResponseBuilder.entity(response.getEntity(InstanceInfo.class));
            }
            var10 = eurekaResponseBuilder.build();
        } finally {
            if (response != null) {
                response.close();
            }
        }
        return var10;
    }
    

    [服务下线取消]:

    @PreDestroy
    public synchronized void shutdown() {
        if (this.isShutdown.compareAndSet(false, true)) {
            logger.info("Shutting down DiscoveryClient ...");
            if (this.statusChangeListener != null && this.applicationInfoManager != null) {
                this.applicationInfoManager.unregisterStatusChangeListener(this.statusChangeListener.getId());
            }
            this.cancelScheduledTasks();
            if (this.applicationInfoManager != null && this.clientConfig.shouldRegisterWithEureka() && this.clientConfig.shouldUnregisterOnShutdown()) {
                this.applicationInfoManager.setInstanceStatus(InstanceStatus.DOWN);
                // 服务下线操作
                this.unregister();
            }
            if (this.eurekaTransport != null) {
                this.eurekaTransport.shutdown();
            }
            this.heartbeatStalenessMonitor.shutdown();
            this.registryStalenessMonitor.shutdown();
            logger.info("Completed shut down of DiscoveryClient");
        }
    }
    

    进入同类方法unregister() 方法中:

    void unregister() {
        if (this.eurekaTransport != null && this.eurekaTransport.registrationClient != null) {
            try {
                logger.info("Unregistering ...");
                EurekaHttpResponse<Void> httpResponse = this.eurekaTransport.registrationClient.cancel(this.instanceInfo.getAppName(), this.instanceInfo.getId());
                logger.info("DiscoveryClient_{} - deregister  status: {}", this.appPathIdentifier, httpResponse.getStatusCode());
            } catch (Exception var2) {
                logger.error("DiscoveryClient_{} - de-registration failed{}", new Object[]{this.appPathIdentifier, var2.getMessage(), var2});
            }
        }
    }
    

    cancel() 最终执行com.netflix.discovery.shared.transport.jersey.AbstractJerseyEurekaHttpClient#cancel:

    public EurekaHttpResponse<Void> cancel(String appName, String id) {
        String urlPath = "apps/" + appName + '/' + id;
        ClientResponse response = null;
        EurekaHttpResponse var6;
        try {
            Builder resourceBuilder = this.jerseyClient.resource(this.serviceUrl).path(urlPath).getRequestBuilder();
            this.addExtraHeaders(resourceBuilder);
            response = (ClientResponse)resourceBuilder.delete(ClientResponse.class);
            var6 = EurekaHttpResponse.anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
        } finally {
            if (response != null) {
                response.close();
            }
        }
        return var6;
    }
    

    相关文章

      网友评论

          本文标题:eureka 源码跟踪

          本文链接:https://www.haomeiwen.com/subject/eeipphtx.html