美文网首页
eureka(一)-功能介绍与客户端之服务获取

eureka(一)-功能介绍与客户端之服务获取

作者: Olla_0632 | 来源:发表于2020-03-16 19:21 被阅读0次

    Eureka源码分析(2.1.4.Release)

    首先源码切忌一行一行debug,需先了解eureka主要功能后,再分析其功能如何实现。


    image.png

    主要功能

    • 服务注册:客户端向eureka服务端注册自己的服务,服务端将信息存于多级缓存中。
    • 服务续约:服务注册后,客户端会定时发送心跳包来保持可用性,避免被剔除。每30秒(eureka.instance.leaseRenewallIntervalInSeconds)发送一次心跳来进行服务续约。
    • 服务同步:eureka服务端之间互相进行注册,构建server集群,不同Server之间会进行服务同步,用来保证服务信息的一致性。
    • 服务列表获取:客户端请求eureka server获取注册的服务清单,并缓存到客户端本地。默认间隔30秒。(eureka.client.registryFetchIntervalSeconds)

    fetchRegistry:true 当值为true时,会定期从eureka服务端获取列表,集群中的eureka服务端也会从其他的eureka节点中获取列表)

    • 远程调用:服务调用方在获取到服务清单后,就可以根据自身的一个负载均衡策略找到一个服务url,并对他发起请求。
    • 服务下线:客户端关闭或重启时,需告诉server端该服务已下线。服务端在收到请求后,就会把该服务状态置为下线(DOWN),并把该下线事件传播出去
    • 服务剔除:如果客户端没有正常下线,需要有剔除机制,将不可用的服务删除掉。EurekaServer 在启动时会创建一个定时任务,每隔默认60秒,从当前服务清单中把超时没有续约(默认90秒,eureka.instance.leaseExpirationDurationInSeconds)的服务剔除
    • 自我保护:当短时间内,统计续约失败的比例,如果达到一定阈值,就会触发自我保护机制。在该机制下,Eureka Server不会剔除任何微服务。等到一分钟内最近续约个数大于一分钟内最小续约,再退出自我保护机制。自我保护开关(eureka.server.enable-self-preservation:false)

    Eureka保证AP

    什么是AP?
    著名的CAP理论指出,一个分布式系统不可能同时满足C(一致性)、A(可用性)和P(分区容错性)。
    由于分区容错性在是分布式系统中必须要保证的,因此我们只能在A和C之间进行权衡。Eureka选择了保证AP。

    1. 如一个客户端因网络原因在一个eureka服务端无法获取服务列表时,可以从其他eureka服务端获取数据,即使与之前的eureka服务列表不一致;
    2. eureka服务端有同步机制,保证了服务列表的最终一致性(非强一致性);

    客户端初始化

    这里简要分析一下
    springboot会读取spring-cloud-netflix-eureka-client/2.1.1.RELEASE/spring-cloud-netflix-eureka-client-2.1.1.RELEASE.jar!/META-INF/spring.factories文件.(详情请看上一篇springboot自动装载

    自动初始化EnableAutoConfiguration下的所有配置类,内容如下:


    image.png

    重点看EurekaClientAutoConfiguration类

    //初始化eureka客户端
    @Bean(destroyMethod = "shutdown")
            @ConditionalOnMissingBean(value = EurekaClient.class, search = SearchStrategy.CURRENT)
            public EurekaClient eurekaClient(ApplicationInfoManager manager,
                    EurekaClientConfig config) {
                return new CloudEurekaClient(manager, config, this.optionalArgs,
                        this.context);
            }
    
    image.png

    CloudEurekaClient继承了DiscoveryClient类,该类为原生eureka类,所以springcloud的eureka客户端实际是封装了原生eureka客户端,分析发现,eureka主要功能都在原生类上,所以直接看原生eureka类的初始化代码就行了。
    简要初始化流程如下:


    image.png

    具体初始化代码:com.netflix.discovery.DiscoveryClient#DiscoveryClient(com.netflix.appinfo.ApplicationInfoManager, com.netflix.discovery.EurekaClientConfig, com.netflix.discovery.AbstractDiscoveryClientOptionalArgs, javax.inject.Provider<com.netflix.discovery.BackupRegistry>)
    代码片段:

    ……忽略上部分代码……
    //实例化一个线程池(核心线程:2个,分别对应心跳续约任务和获取服务列表任务)
                scheduler = Executors.newScheduledThreadPool(2,
                        new ThreadFactoryBuilder()
                                .setNameFormat("DiscoveryClient-%d")
                                .setDaemon(true)
                                .build());
    //声明心跳续约执行器(续约)
                heartbeatExecutor = new ThreadPoolExecutor(
                        1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                        new SynchronousQueue<Runnable>(),
                        new ThreadFactoryBuilder()
                                .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
                                .setDaemon(true)
                                .build()
                );  
    //声明更新服务列表缓存执行器(获取任务列表)
                cacheRefreshExecutor = new ThreadPoolExecutor(
                        1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                        new SynchronousQueue<Runnable>(),
                        new ThreadFactoryBuilder()
                                .setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
                                .setDaemon(true)
                                .build()
                ); 
    ……忽略部分代码……
                //重点看这里:初始化定时任务
                initScheduledTasks();
    

    com.netflix.discovery.DiscoveryClient#initScheduledTasks:

     private void initScheduledTasks() {
            //默认为true,只有eureka服务端非集群模式时,该设置可以为false
            if (clientConfig.shouldFetchRegistry()) {
                //默认eureka.client.registryFetchIntervalSeconds=30,延时30秒执行
                int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
                //默认eureka.client.cacheRefreshExecutorExponentialBackOffBound=10,如果发生超时的重试延迟的最大乘数。
                int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
              //其中scheduler和cacheRefreshExecutor为之前提到的声明对象,其实就是初始化了一个cacheRefresh名字的线程执行任务
                scheduler.schedule(
                        new TimedSupervisorTask(
                                "cacheRefresh",
                                scheduler,
                                cacheRefreshExecutor,
                                registryFetchIntervalSeconds,
                                TimeUnit.SECONDS,
                                expBackOffBound,
                                new CacheRefreshThread()
                        ),
                        registryFetchIntervalSeconds, TimeUnit.SECONDS);
            }
    ……忽略下半部分代码(与服务注册任务相关)……
    

    在这里可以看到scheduler.schedule方法就是在定义一个延时任务,30秒后执行。但是按理说应该定义一个定时任务定时获取服务列表才对。为什么要这样定义呢?下面重点看看TimedSupervisorTask

    初始化实际赋值:

            //上文提到的线程池
            this.scheduler = scheduler;
            //上文提到的缓存更新执行器
            this.executor = cacheRefreshExecutor;
            //超时时间:上文提到的默认30秒
            this.timeoutMillis = timeUnit.toMillis(registryFetchIntervalSeconds);
            //执行的真正任务
            this.task = new CacheRefreshThread();
            //延迟时间:上文提到的默认30秒
            this.delay = new AtomicLong(timeoutMillis);
            //最大延迟时间:30秒*10=300秒,有什么用呢?看下面解释
            this.maxDelay = timeoutMillis * expBackOffBound
    

    这里说一下TimedSupervisorTask的执行逻辑,非固定的执行时间,而是采用动态间隔时间,去执行获取服务列表任务。(在平常工作中,也可以效仿此方法实现动态设置任务)

    具体实现流程图如下:


    image.png

    具体实现代码:
    com.netflix.discovery.TimedSupervisorTask#run:

    @Override
        public void run() {
            Future<?> future = null;
            try {
                future = executor.submit(task);
                threadPoolLevelGauge.set((long) executor.getActiveCount());
                //执行任务在默认超时时间内。
                future.get(timeoutMillis, TimeUnit.MILLISECONDS);  
                //如果任务没超时,则把延时时间设置为默认的超时时间30秒
                delay.set(timeoutMillis);
                threadPoolLevelGauge.set((long) executor.getActiveCount());
                successCounter.increment();
            } catch (TimeoutException e) {
                //如果超时
                logger.warn("task supervisor timed out", e);
                timeoutCounter.increment();
                long currentDelay = delay.get();
                //设置最新延时时间:将当前超时时间*2,与最大延时比较,即超时时间不能超过300秒
                long newDelay = Math.min(maxDelay, currentDelay * 2);
                //CAS来控制多线程同步
                delay.compareAndSet(currentDelay, newDelay);
    
            } catch (RejectedExecutionException e) {
                if (executor.isShutdown() || scheduler.isShutdown()) {
                    logger.warn("task supervisor shutting down, reject the task", e);
                } else {
                    logger.warn("task supervisor rejected the task", e);
                }
    
                rejectedCounter.increment();
            } catch (Throwable e) {
                if (executor.isShutdown() || scheduler.isShutdown()) {
                    logger.warn("task supervisor shutting down, can't accept the task");
                } else {
                    logger.warn("task supervisor threw an exception", e);
                }
    
                throwableCounter.increment();
            } finally {
                if (future != null) {
                    future.cancel(true);
                }
                //又调用一个延时任务,延时时间为新设置的延时时间
                if (!scheduler.isShutdown()) {
                    scheduler.schedule(this, delay.get(), TimeUnit.MILLISECONDS);
                }
            }
        }
    

    服务列表获取

    下面再来看看CacheRefreshThread的实现逻辑
    com.netflix.discovery.DiscoveryClient#refreshRegistry->
    com.netflix.discovery.DiscoveryClient#fetchRegistry

        private boolean fetchRegistry(boolean forceFullRegistryFetch) {
            Stopwatch tracer = FETCH_REGISTRY_TIMER.start();
    
            try {
                //获取本地缓存:localRegionApps.get()
                Applications applications = getApplications();
                //判断是全量更新还是增量更新
                if (clientConfig.shouldDisableDelta()
                        || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
                        || forceFullRegistryFetch
                        || (applications == null)
                        || (applications.getRegisteredApplications().size() == 0)
                        || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
                {
                    logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta());
                    logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress());
                    logger.info("Force full registry fetch : {}", forceFullRegistryFetch);
                    logger.info("Application is null : {}", (applications == null));
                    logger.info("Registered Applications size is zero : {}",
                            (applications.getRegisteredApplications().size() == 0));
                    logger.info("Application version is -1: {}", (applications.getVersion() == -1));
                    //全量获取
                    getAndStoreFullRegistry();
                } else {
                    //增量获取
                    getAndUpdateDelta(applications);
                }
                applications.setAppsHashCode(applications.getReconcileHashCode());
                ……忽略下部分代码……
        }
    

    全量获取:
    com.netflix.discovery.DiscoveryClient#getAndStoreFullRegistry->
    eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())->
    com.netflix.discovery.shared.transport.jersey.AbstractJerseyEurekaHttpClient#getApplications->
    com.netflix.discovery.shared.transport.jersey.AbstractJerseyEurekaHttpClient#getApplicationsInternal


    image.png

    增量获取:
    com.netflix.discovery.DiscoveryClient#getAndUpdateDelta:


    image.png

    eueka不是每次都是采用全量获取服务列表,而是默认采用增量方式,获取变化的实例。
    当然为保证获取到的服务列表是最新的,获取增量的同时,eureka服务端也会把最新的服务列表hashcode返回,客户端将此hashcode与更新后的本地缓存hashcode作对比,如果不相同,则重新进行全量获取。

    可以学习的地方:

    1. TimeSupervisorTask是一个动态时间间隔任务,如果遇到执行任务超时,则扩大为原来间隔时间的两倍,一直到延时时间的阈值。一旦新任务不再超时,间隔时间又恢复为初始值。如服务列表获取任务默认为每隔30秒执行,随着执行时间超时扩大为原来的2倍再执行,直到到达阈值。如果执行未超时,则设置为原来的间隔时间执行
    2. 采用CAS来控制多线程同步。
    3. 增量获取数据列表。尽量避免全量获取(当数据量很大的时候)
    4. 服务列表保存在本地缓存,每次远程调用,不需要经过注册中心,提高执行速度。当eureka服务端挂了的时候,eureka客户端还能通过本地缓存的服务列表正常调用远程服务。

    相关文章

      网友评论

          本文标题:eureka(一)-功能介绍与客户端之服务获取

          本文链接:https://www.haomeiwen.com/subject/xlaidhtx.html