Spring Cloud Eureka Source Code Walkthrough


Author: 黄大海 | Published 2018-05-09 18:59

    Design structure

    spring-cloud-commons is the abstraction that Spring defines for service registration and discovery. Concrete implementations can adapt Eureka, Consul, ZooKeeper, or etcd.

    [Figure: spring-eureka design structure]
    Framework   CAP   Protocol                  Notes
    Eureka      AP    Simple peer replication   Simple; tight Spring integration
    Consul      CP    Raft                      Supports multiple data centers
    ZooKeeper   CP    ZAB (Paxos-like)          Long-established framework
    etcd        CP    Raft                      Raft is simpler than Paxos

    How it starts

    • @EnableDiscoveryClient
    @Import(EnableDiscoveryClientImportSelector.class)
    public @interface EnableDiscoveryClient {
        ...
    }
    
    • EnableDiscoveryClientImportSelector#selectImports
    public String[] selectImports(AnnotationMetadata metadata) {
        ...
        SpringFactoriesLoader.loadFactoryNames(EnableDiscoveryClient.class)
        ...
    }
    
    • spring-cloud-netflix-eureka-client-${version}.jar
      |- META-INF
         |- spring.factories
    org.springframework.cloud.client.discovery.EnableDiscoveryClient=\
    org.springframework.cloud.netflix.eureka.EurekaDiscoveryClientConfiguration
    
    • EurekaDiscoveryClientConfiguration.Marker
    public class EurekaDiscoveryClientConfiguration {
    
        class Marker {}
    
        @Bean
        public Marker eurekaDiscoverClientMarker() {
            return new Marker();
        }
        
        ...
    }
    
    
    • EurekaClientAutoConfiguration
    @Configuration
    ...
    @ConditionalOnBean(EurekaDiscoveryClientConfiguration.Marker.class)
    ...
    public class EurekaClientAutoConfiguration {
    }
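
The chain above comes down to a marker bean acting as a switch: the import triggered by the annotation registers the Marker, and the auto-configuration only activates when the Marker is present. Below is a minimal plain-Java sketch of that gating idea. `MiniContext`, `FakeEurekaClient`, and `autoConfigure` are hypothetical names made up for illustration, not Spring APIs; the real check is done by Spring's @ConditionalOnBean.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for an application context: a map from bean type to instance.
class MiniContext {
    private final Map<Class<?>, Object> beans = new HashMap<>();

    <T> void register(Class<T> type, T bean) {
        beans.put(type, bean);
    }

    boolean contains(Class<?> type) {
        return beans.containsKey(type);
    }
}

class MarkerGateSketch {
    // Empty marker type: its presence is the only information it carries.
    static class Marker {}

    // Hypothetical stand-in for the client that auto-configuration would create.
    static class FakeEurekaClient {}

    // Mimics the effect of @ConditionalOnBean(Marker.class):
    // nothing is configured unless the marker bean exists.
    static boolean autoConfigure(MiniContext ctx) {
        if (!ctx.contains(Marker.class)) {
            return false; // condition not met, auto-configuration is skipped
        }
        ctx.register(FakeEurekaClient.class, new FakeEurekaClient());
        return true;
    }
}
```

Calling `autoConfigure` on an empty `MiniContext` does nothing, while registering a `Marker` first lets the client bean be created. This mirrors how leaving out the enabling annotation keeps the Eureka client configuration inactive.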
    
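    • At this point, most of the related classes are initialized in EurekaClientAutoConfiguration. The EurekaClient constructor in turn initializes the registration and discovery functionality.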
    @ConditionalOnMissingBean(value = EurekaClient.class, search = SearchStrategy.CURRENT)
    public EurekaClient eurekaClient(ApplicationInfoManager manager, EurekaClientConfig config) {
        return new CloudEurekaClient(manager, config, this.optionalArgs, this.context);
    }
    
    • DiscoveryClient#initScheduledTasks does three things:
      • Registry fetch task (CacheRefreshThread)
      • Heartbeat task (HeartbeatThread)
      • Registration task (InstanceInfoReplicator)
    private void initScheduledTasks() {
        ...
        if (clientConfig.shouldFetchRegistry()) {
            ...
            // Registry cache refresh timer
            scheduler.schedule(... new CacheRefreshThread() ...);
            ...
        }
        ...
        if (clientConfig.shouldRegisterWithEureka()) {
            ...
            // Heartbeat timer
            scheduler.schedule(... new HeartbeatThread() ...);
            ...
            // InstanceInfo replicator
            instanceInfoReplicator.start(...);
        }
    }
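
As a rough illustration of the scheduling pattern above, the sketch below runs two periodic tasks on a shared ScheduledExecutorService and waits until each has fired a few times. It is a simplification under stated assumptions: the class and method names are hypothetical, the tasks only count down latches, and `scheduleWithFixedDelay` stands in for the self-rescheduling `scheduler.schedule(...)` calls in the original.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class ScheduledTasksSketch {
    // Starts a "registry fetch" task and a "heartbeat" task, then waits until
    // each has run `rounds` times. Returns false on timeout or interruption.
    static boolean runTasks(int rounds, long periodMs) {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
        CountDownLatch fetches = new CountDownLatch(rounds);
        CountDownLatch heartbeats = new CountDownLatch(rounds);
        try {
            // Stand-in for CacheRefreshThread
            scheduler.scheduleWithFixedDelay(fetches::countDown, 0, periodMs, TimeUnit.MILLISECONDS);
            // Stand-in for HeartbeatThread
            scheduler.scheduleWithFixedDelay(heartbeats::countDown, 0, periodMs, TimeUnit.MILLISECONDS);
            return fetches.await(5, TimeUnit.SECONDS) && heartbeats.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            scheduler.shutdownNow(); // stop both periodic tasks
        }
    }
}
```

For example, `ScheduledTasksSketch.runTasks(3, 10)` returns true once both tasks have fired three times with a 10 ms delay between runs.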
    
    

    How registration works

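    • InstanceInfoReplicator#start; the first run is delayed by 40 seconds by default
    public void start(int initialDelayMs) {
        ...
        // despite the parameter name, the delay is interpreted in seconds
        Future next = scheduler.schedule(this, initialDelayMs, TimeUnit.SECONDS);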
        ... 
    }
    
    • InstanceInfoReplicator#run; between scheduled runs, event notifications can also cause this method to be invoked again.
    public void run() {
        ...
        discoveryClient.register();
        ...
    }
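
The combination of a periodic run and an event-triggered run can be sketched as follows. This is a hypothetical simplification, not the Eureka implementation: it assumes that a pass registers only when the local instance info has been marked dirty, and it replaces the actual register call with a counter.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

class ReplicatorSketch {
    // Dirty at start, so the very first pass performs the initial registration.
    private final AtomicBoolean dirty = new AtomicBoolean(true);
    private final AtomicInteger registerCalls = new AtomicInteger();

    // One replication pass: register only if the instance info changed since the last pass.
    void run() {
        if (dirty.compareAndSet(true, false)) {
            registerCalls.incrementAndGet(); // stands in for discoveryClient.register()
        }
    }

    // Event-driven path: a status change marks the info dirty and runs a pass right away.
    void onStatusChange() {
        dirty.set(true);
        run();
    }

    int registerCalls() {
        return registerCalls.get();
    }
}
```

With this sketch, two consecutive `run()` calls register once, and a later `onStatusChange()` registers a second time without waiting for the next scheduled pass.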
    
    • DiscoveryClient#register
    boolean register() throws Throwable {
        EurekaHttpResponse<Void> httpResponse =
                eurekaTransport.registrationClient.register(instanceInfo);
        ...
        return httpResponse.getStatusCode() == 204; // 204 No Content: registration accepted
    }
    
    • AbstractJerseyEurekaHttpClient#register; in the end the Jersey REST client issues the HTTP request
    public EurekaHttpResponse<Void> register(InstanceInfo info) {
        String urlPath = "apps/" + info.getAppName();
        ClientResponse response = null;
        try {
            Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder();
            addExtraHeaders(resourceBuilder);
            response = resourceBuilder
                    .header("Accept-Encoding", "gzip")
                    .type(MediaType.APPLICATION_JSON_TYPE)
                    .accept(MediaType.APPLICATION_JSON)
                    .post(ClientResponse.class, info);
            return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
        } finally {
            if (logger.isDebugEnabled()) {
                logger.debug("Jersey HTTP POST {}/{} with instance {}; statusCode={}", serviceUrl, urlPath, info.getId(),
                        response == null ? "N/A" : response.getStatus());
            }
            if (response != null) {
                response.close();
            }
        }
    }
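
To make the shape of that request concrete without pulling in Jersey, here is a sketch that builds (but does not send) an equivalent POST with the JDK's java.net.http API (Java 11+). The class name, the method name, and the JSON body passed in are assumptions for illustration; only the `apps/{appName}` path and the headers come from the code above.

```java
import java.net.URI;
import java.net.http.HttpRequest;

class RegisterRequestSketch {
    // Builds a POST to {serviceUrl}/apps/{appName} carrying the instance as JSON.
    static HttpRequest buildRegisterRequest(String serviceUrl, String appName, String instanceJson) {
        // Make sure exactly one slash separates the base URL from the path.
        String base = serviceUrl.endsWith("/") ? serviceUrl : serviceUrl + "/";
        return HttpRequest.newBuilder(URI.create(base + "apps/" + appName))
                .header("Accept-Encoding", "gzip")
                .header("Content-Type", "application/json")
                .header("Accept", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(instanceJson))
                .build();
    }
}
```

For instance, `buildRegisterRequest("http://localhost:8761/eureka", "DEMO-SERVICE", "{}")` targets `http://localhost:8761/eureka/apps/DEMO-SERVICE`. The URL and the empty JSON body are placeholders; a real payload would be the serialized InstanceInfo.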
    

    How the service list is fetched

    • CacheRefreshThread#run, which runs every 30 seconds by default
    • DiscoveryClient#fetchRegistry, which performs either a full update or an incremental (delta) update
    private boolean fetchRegistry(boolean forceFullRegistryFetch) {
        ...
        if (...){
            getAndStoreFullRegistry();
        } else {
            getAndUpdateDelta(applications);
        }
       ...
    }
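
The difference between the two branches is that a full fetch replaces the cache, while a delta fetch applies individual changes to it. A minimal sketch of applying a delta follows, assuming a cache that maps instance id to status and change records tagged ADDED, MODIFIED, or DELETED. The `Change` type and `applyDelta` method are hypothetical and much simpler than the Applications/InstanceInfo model used by Eureka.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class DeltaSketch {
    enum Action { ADDED, MODIFIED, DELETED }

    // One change record: which instance, what happened, and its new status (unused for DELETED).
    static final class Change {
        final String instanceId;
        final Action action;
        final String status;

        Change(String instanceId, Action action, String status) {
            this.instanceId = instanceId;
            this.action = action;
            this.status = status;
        }
    }

    // Returns a new cache with the delta applied; the input cache is left untouched.
    static Map<String, String> applyDelta(Map<String, String> cache, List<Change> delta) {
        Map<String, String> updated = new TreeMap<>(cache);
        for (Change change : delta) {
            switch (change.action) {
                case ADDED:
                case MODIFIED:
                    updated.put(change.instanceId, change.status);
                    break;
                case DELETED:
                    updated.remove(change.instanceId);
                    break;
            }
        }
        return updated;
    }
}
```

Starting from a cache holding `a` and `b`, a delta that modifies `a`, deletes `b`, and adds `c` yields a cache holding only `a` and `c`.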
    
    • DiscoveryClient#getAndStoreFullRegistry; localRegionApps is the local cache of application information
    private void getAndStoreFullRegistry() throws Throwable {
        ...
        EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
             ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
             : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
        ...
        Applications apps = httpResponse.getEntity(); // simplified: the original first checks for a 200 status
        ...
        localRegionApps.set(this.filterAndShuffle(apps));
        ...
    }
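
The cache update above follows a common pattern: build a new immutable snapshot, then publish it with a single atomic reference swap so readers never see a half-updated list. The sketch below illustrates that pattern. It assumes that filtering keeps only instances whose status is UP; the `Instance` type and all method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.concurrent.atomic.AtomicReference;

class RegistryCacheSketch {
    static final class Instance {
        final String id;
        final String status;

        Instance(String id, String status) {
            this.id = id;
            this.status = status;
        }
    }

    // Readers always see a complete snapshot; writers replace it in one step.
    private final AtomicReference<List<Instance>> localApps =
            new AtomicReference<>(Collections.emptyList());

    // Keeps only UP instances, then shuffles so that clients do not all pick the same first instance.
    static List<Instance> filterAndShuffle(List<Instance> fetched, Random random) {
        List<Instance> up = new ArrayList<>();
        for (Instance instance : fetched) {
            if ("UP".equals(instance.status)) {
                up.add(instance);
            }
        }
        Collections.shuffle(up, random);
        return Collections.unmodifiableList(up);
    }

    void storeFullRegistry(List<Instance> fetched) {
        localApps.set(filterAndShuffle(fetched, new Random()));
    }

    List<Instance> snapshot() {
        return localApps.get();
    }
}
```

Storing two UP instances and one DOWN instance leaves a snapshot of two entries in random order.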
    

    How the server replicates

    • @EnableEurekaServer -> EurekaServerMarkerConfiguration.Marker -> EurekaServerAutoConfiguration. Most of the server-side configuration is concentrated here.

    • EurekaServerAutoConfiguration#peerAwareInstanceRegistry

    @Bean
    public PeerAwareInstanceRegistry peerAwareInstanceRegistry(
                ServerCodecs serverCodecs) {
        this.eurekaClient.getApplications(); // force initialization
        return new InstanceRegistry(this.eurekaServerConfig, this.eurekaClientConfig,
            serverCodecs, this.eurekaClient,
            this.instanceRegistryProperties.getExpectedNumberOfRenewsPerMin(),
            this.instanceRegistryProperties.getDefaultOpenForTrafficCount());
    }
    
    • PeerAwareInstanceRegistryImpl#register
      • Runs the parent class's registration logic via super#register
      • Replicates to the other nodes via replicateToPeers
    @Override
    public void register(final InstanceInfo info, final boolean isReplication) {
        ...
        super.register(info, leaseDuration, isReplication);
        replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
    }
    
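    • PeerAwareInstanceRegistryImpl#replicateToPeers
      • Checks whether the data is itself a replication sent by another server. If so, processing stops, so replicated data is never replicated again.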
    private void replicateToPeers(Action action, String appName, String id,
                                      InstanceInfo info /* optional */,
                                      InstanceStatus newStatus /* optional */, boolean isReplication) {
        ...
        // If it is a replication already, do not replicate again as this will create a poison replication
        if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
            return;
        }
    
        for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
            // If the url represents this host, do not replicate to yourself.
            if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
                continue;
            }
            replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
        }
    }
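
The two guards in this method (do not forward something that is already a replication, and do not send to yourself) can be sketched in isolation. The class below is a hypothetical model in which peers are plain URL strings and an outgoing replication is just recorded in a list instead of being sent over HTTP.

```java
import java.util.ArrayList;
import java.util.List;

class ReplicationSketch {
    private final String myUrl;
    private final List<String> peerUrls;
    // Records one entry per outgoing replication, as "peerUrl <- instanceId".
    final List<String> sent = new ArrayList<>();

    ReplicationSketch(String myUrl, List<String> peerUrls) {
        this.myUrl = myUrl;
        this.peerUrls = peerUrls;
    }

    void register(String instanceId, boolean isReplication) {
        // Local registration would happen here (super.register in the original).
        replicateToPeers(instanceId, isReplication);
    }

    private void replicateToPeers(String instanceId, boolean isReplication) {
        // A request that is already a replication must not be forwarded again,
        // otherwise peers would keep bouncing it back and forth.
        if (peerUrls.isEmpty() || isReplication) {
            return;
        }
        for (String peer : peerUrls) {
            if (peer.equals(myUrl)) {
                continue; // do not replicate to yourself
            }
            sent.add(peer + " <- " + instanceId);
        }
    }
}
```

With three configured URLs, one of which is the node itself, a client registration produces two outgoing replications, while a registration flagged as a replication produces none.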
    
    • Finally, Jersey issues the HTTP request. The key point here is JerseyReplicationClient#addExtraHeaders: it adds a special HTTP header that marks the request as a replication request.
     webResource.header("x-netflix-discovery-replication", "true");
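
On the receiving side, this header is what lets a server turn an incoming request into the isReplication flag used earlier. A minimal sketch of that check follows, assuming the headers are available as a simple map with lower-case keys. The class and method names are hypothetical, and a real server would look headers up case-insensitively.

```java
import java.util.Map;

class ReplicationHeaderSketch {
    static final String HEADER_REPLICATION = "x-netflix-discovery-replication";

    // Treats the request as a replication only when the header is exactly "true".
    static boolean isReplication(Map<String, String> headers) {
        return "true".equals(headers.get(HEADER_REPLICATION));
    }
}
```

A request without the header, such as a normal client registration, evaluates to false and is therefore replicated to the peers.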
    
