美文网首页
eureka服务端源码解读

eureka服务端源码解读

作者: Y了个J | 来源:发表于2021-07-04 19:48 被阅读0次

    剖析eureka服务端启动流程

    服务端启动类-入口处

    /**
     * Entry point of the Eureka server application.
     *
     * <p>{@code @EnableEurekaServer} switches on the Eureka server auto-configuration,
     * and {@code @SpringBootApplication} makes this class the Spring Boot
     * configuration root.
     */
    @EnableEurekaServer
    @SpringBootApplication
    public class EurekaServerApplication {

        /**
         * Builds the Spring application and starts it.
         *
         * @param args command line arguments forwarded to Spring Boot
         */
        public static void main(String[] args) {
            // Constructing the builder alone does nothing; run(args) actually boots the context.
            new SpringApplicationBuilder(EurekaServerApplication.class).run(args);
        }
    }
    
    /**
     * Marks a configuration class as an Eureka server.
     *
     * <p>The annotation carries no attributes; its only effect is to import
     * {@code EurekaServerMarkerConfiguration}.
     */
    @Target(ElementType.TYPE)
    @Retention(RetentionPolicy.RUNTIME)
    @Documented
    @Import(EurekaServerMarkerConfiguration.class)
    public @interface EnableEurekaServer {
    }
    
    /**
     * Configuration whose only job is to publish a {@link Marker} bean.
     */
    @Configuration
    public class EurekaServerMarkerConfiguration {

        /**
         * @return a new marker instance, registered as a bean
         */
        @Bean
        public Marker eurekaServerMarkerBean() {
            return new Marker();
        }

        /** Empty marker type; it carries no state or behaviour. */
        class Marker {
        }
    }
    

    从上面的代码可以看出,eureka服务端的启动是依赖于@EnableEurekaServer这个注解的。这个注解干了什么事情呢?它引入了EurekaServerMarkerConfiguration配置,这是一个标记性质的配置,主要作用是生成一个EurekaServerMarkerConfiguration.Marker对象。那么为什么要生成这个对象呢?请看下面的自动配置类:

    /**
     * Auto-configuration for the Eureka server (excerpt; the remaining members are elided).
     *
     * <p>Imports {@code EurekaServerInitializerConfiguration}, enables the dashboard and
     * instance-registry property classes, and loads
     * {@code classpath:/eureka/server.properties}.
     */
    @Configuration
    @Import(EurekaServerInitializerConfiguration.class)
    // The Eureka server configuration is applied only when an EurekaServerMarkerConfiguration.Marker bean exists
    @ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class)
    @EnableConfigurationProperties({ EurekaDashboardProperties.class,
            InstanceRegistryProperties.class })
    @PropertySource("classpath:/eureka/server.properties")
    public class EurekaServerAutoConfiguration extends WebMvcConfigurerAdapter {
    
        private static String[] EUREKA_PACKAGES = new String[] { "com.netflix.discovery",
                "com.netflix.eureka" };
    
        @Autowired
        private ApplicationInfoManager applicationInfoManager;
    
        @Autowired
        private EurekaServerConfig eurekaServerConfig;
    
        @Autowired
        private EurekaClientConfig eurekaClientConfig;
        ....
    }
    

    从上面的代码可以看出当EurekaServerMarkerConfiguration.Marker对象存在时,才自动初始化eureka服务端

    简单看看eureka server初始化配置有哪些

    /* Packages scanned for the Jersey resources that back the REST API exposed by the Eureka server */
    private static String[] EUREKA_PACKAGES = new String[] { "com.netflix.discovery",
                "com.netflix.eureka" };
    
    //所有以EurekaConstants.DEFAULT_PREFIX为前缀的请求都转到Jersey
         Register the Jersey filter
        
        @Bean
        public FilterRegistrationBean jerseyFilterRegistration(
                javax.ws.rs.core.Application eurekaJerseyApp) {
            FilterRegistrationBean bean = new FilterRegistrationBean();
            bean.setFilter(new ServletContainer(eurekaJerseyApp));
            bean.setOrder(Ordered.LOWEST_PRECEDENCE);
            bean.setUrlPatterns(
                    Collections.singletonList(EurekaConstants.DEFAULT_PREFIX + "/*"));
    
            return bean;
        }
    
        /**
         * Builds the JAX-RS application from the Jersey classes found in the Eureka packages.
         *
         * <p>Classes annotated with {@code Path} or {@code Provider} under each entry of
         * {@code EUREKA_PACKAGES} are collected and wrapped in a {@code DefaultResourceConfig}.
         *
         * @param environment    environment handed to the classpath scanner
         * @param resourceLoader supplies the class loader used to resolve the scanned class names
         * @return the resource configuration holding the discovered classes
         */
        @Bean
        public javax.ws.rs.core.Application jerseyApplication(Environment environment,
                ResourceLoader resourceLoader) {
            // Scanner created with the "false" flag; the two annotation filters below decide what matches.
            final ClassPathScanningCandidateComponentProvider scanner =
                    new ClassPathScanningCandidateComponentProvider(false, environment);
            scanner.addIncludeFilter(new AnnotationTypeFilter(Path.class));
            scanner.addIncludeFilter(new AnnotationTypeFilter(Provider.class));

            // Walk every Eureka package (and its sub-packages) and load the matching classes.
            final Set<Class<?>> jerseyClasses = new HashSet<>();
            for (String eurekaPackage : EUREKA_PACKAGES) {
                for (BeanDefinition candidate : scanner.findCandidateComponents(eurekaPackage)) {
                    jerseyClasses.add(ClassUtils.resolveClassName(candidate.getBeanClassName(),
                            resourceLoader.getClassLoader()));
                }
            }

            // Skip static content used by the webapp
            final String staticContentRegex =
                    EurekaConstants.DEFAULT_PREFIX + "/(fonts|images|css|js)/.*";
            final Map<String, Object> jerseySettings = new HashMap<>();
            jerseySettings.put(ServletContainer.PROPERTY_WEB_PAGE_CONTENT_REGEX, staticContentRegex);

            final DefaultResourceConfig resourceConfig = new DefaultResourceConfig(jerseyClasses);
            resourceConfig.setPropertiesAndFeatures(jerseySettings);
            return resourceConfig;
        }
    

    上面的配置定义了eureka server对外提供的REST服务端点,包含实例的注册、实例的续约、实例的取消、已注册应用列表的获取、变更应用信息的获取,以及其他节点的信息同步等接口。

    配置eureka server的web UI 接口,可控制是否启用web 可视化端点

    /**
     * Creates the dashboard controller.
     *
     * <p>Registered only when {@code eureka.dashboard.enabled} matches; because of
     * {@code matchIfMissing = true} it is also registered when the property is absent.
     *
     * @return a controller built from the injected {@code applicationInfoManager}
     */
    @Bean
    @ConditionalOnProperty(prefix = "eureka.dashboard", name = "enabled", matchIfMissing = true)
       public EurekaController eurekaController() {
          return new EurekaController(this.applicationInfoManager);
       }
    }
    

    实例化服务实例具体操作实现,封装了实例的取消,注册,续约等操作

        /**
         * Creates the {@code InstanceRegistry} bean exposed as {@code PeerAwareInstanceRegistry}.
         *
         * @param serverCodecs codecs passed through to the registry
         * @return a registry built from the server/client config, the Eureka client and the
         *         two values read from {@code instanceRegistryProperties}
         */
        @Bean
        public PeerAwareInstanceRegistry peerAwareInstanceRegistry(
                ServerCodecs serverCodecs) {
            this.eurekaClient.getApplications(); // force initialization
            return new InstanceRegistry(this.eurekaServerConfig, this.eurekaClientConfig,
                    serverCodecs, this.eurekaClient,
                    this.instanceRegistryProperties.getExpectedNumberOfRenewsPerMin(),
                    this.instanceRegistryProperties.getDefaultOpenForTrafficCount());
        }
    

    封装eureka节点信息,以及eureka节点信息更新操作

     /**
      * Creates the {@code PeerEurekaNodes} bean, unless one is already defined
      * ({@code @ConditionalOnMissingBean}).
      *
      * @param registry     the peer-aware instance registry
      * @param serverCodecs codecs passed through to the node holder
      * @return a {@code RefreshablePeerEurekaNodes} built from the registry, both configs,
      *         the codecs and the application info manager
      */
     @Bean
     @ConditionalOnMissingBean
     public PeerEurekaNodes peerEurekaNodes(PeerAwareInstanceRegistry registry, ServerCodecs serverCodecs) {
         return new EurekaServerAutoConfiguration.RefreshablePeerEurekaNodes(registry, 
         this.eurekaServerConfig,
         this.eurekaClientConfig, 
         serverCodecs,
         this.applicationInfoManager);
     }
    

    初始化eureka server环境上下文信息,引导eureka server启动

        /**
         * Creates the {@code EurekaServerContext} bean.
         *
         * @param serverCodecs    codecs passed to the context
         * @param registry        the peer-aware instance registry
         * @param peerEurekaNodes holder of the peer node information
         * @return a {@code DefaultEurekaServerContext} wired with the arguments, the server
         *         config and the application info manager
         */
        @Bean
        public EurekaServerContext eurekaServerContext(ServerCodecs serverCodecs,
                PeerAwareInstanceRegistry registry, PeerEurekaNodes peerEurekaNodes) {
            return new DefaultEurekaServerContext(this.eurekaServerConfig, serverCodecs,
                    registry, peerEurekaNodes, this.applicationInfoManager);
        }
    
        /**
         * Creates the {@code EurekaServerBootstrap} bean.
         *
         * @param registry      the peer-aware instance registry
         * @param serverContext the Eureka server context
         * @return a bootstrap object wired with the application info manager, both configs,
         *         the registry and the server context
         */
        @Bean
        public EurekaServerBootstrap eurekaServerBootstrap(PeerAwareInstanceRegistry registry,
                EurekaServerContext serverContext) {
            return new EurekaServerBootstrap(this.applicationInfoManager,
                    this.eurekaClientConfig, this.eurekaServerConfig, registry,
                    serverContext);
        }
    

    分析一下EurekaServerBootstrap初始化了哪些信息

    在EurekaServerInitializerConfiguration中可以看到实现了SmartLifecycle接口,由spring管理EurekaServer的生命周期

    /**
     * Lifecycle hook that boots and shuts down the Eureka server (excerpt; some
     * members are elided).
     *
     * <p>{@link #start()} runs the bootstrap on a new thread and publishes two events;
     * {@link #stop()} clears the running flag and tears the bootstrap down.
     */
    @Configuration
    @CommonsLog
    public class EurekaServerInitializerConfiguration
            implements ServletContextAware, SmartLifecycle, Ordered {
    
        @Autowired
        private EurekaServerConfig eurekaServerConfig;
    
        private ServletContext servletContext;
    
        @Autowired
        private ApplicationContext applicationContext;
    
        @Autowired
        private EurekaServerBootstrap eurekaServerBootstrap;
    
        // NOTE(review): written from the thread spawned in start() and from stop(); not volatile.
        private boolean running;
    
        private int order = 1;
    
        /** Stores the servlet context that is later handed to the bootstrap. */
        @Override
        public void setServletContext(ServletContext servletContext) {
            this.servletContext = servletContext;
        }
    
        /**
         * Starts the Eureka server on a separate thread. Any exception raised during
         * initialisation is logged and not rethrown.
         */
        @Override
        public void start() {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        //TODO: is this class even needed now?
                        // Initialise the server context
                        eurekaServerBootstrap.contextInitialized(EurekaServerInitializerConfiguration.this.servletContext);
                        log.info("Started Eureka Server");
                        // Publish the "registry available" event
                        publish(new EurekaRegistryAvailableEvent(getEurekaServerConfig()));
                        EurekaServerInitializerConfiguration.this.running = true;
                        publish(new EurekaServerStartedEvent(getEurekaServerConfig()));
                    }
                    catch (Exception ex) {
                        // Help!
                        log.error("Could not initialize Eureka servlet context", ex);
                    }
                }
            }).start();
        }
    
        /** Marks the server as stopped and lets the bootstrap clean up. */
        @Override
        public void stop() {
            // End of the lifecycle: clear the flag, then run the bootstrap's teardown
            this.running = false;
            eurekaServerBootstrap.contextDestroyed(this.servletContext);
        }
    。。。
    }
    

    @see EurekaServerBootstrap.java

    /**
     * Bootstraps the Eureka server and stores the server context as a servlet
     * context attribute keyed by {@code EurekaServerContext.class.getName()}.
     *
     * @param context servlet context that receives the attribute
     * @throws RuntimeException wrapping any {@code Throwable} raised during bootstrap
     */
    public void contextInitialized(ServletContext context) {
            try {
                // Initialise the environment settings first, then the server context
                initEurekaEnvironment();
                initEurekaServerContext();
    
                context.setAttribute(EurekaServerContext.class.getName(), this.serverContext);
            }
            catch (Throwable e) {
                log.error("Cannot bootstrap eureka server :", e);
                throw new RuntimeException("Cannot bootstrap eureka server :", e);
            }
        }
    
        /**
         * Copies the Eureka datacenter and environment settings into the Archaius
         * deployment properties, falling back to {@code DEFAULT} and {@code TEST}
         * respectively when a setting is absent.
         *
         * @throws Exception declared for subclasses; nothing in this body throws a checked exception
         */
        protected void initEurekaEnvironment() throws Exception {
            log.info("Setting the eureka configuration..");

            // Datacenter: prefer the configured value, otherwise log and use DEFAULT.
            final String configuredDataCenter = ConfigurationManager.getConfigInstance()
                    .getString(EUREKA_DATACENTER);
            if (configuredDataCenter != null) {
                ConfigurationManager.getConfigInstance()
                        .setProperty(ARCHAIUS_DEPLOYMENT_DATACENTER, configuredDataCenter);
            }
            else {
                log.info(
                        "Eureka data center value eureka.datacenter is not set, defaulting to default");
                ConfigurationManager.getConfigInstance()
                        .setProperty(ARCHAIUS_DEPLOYMENT_DATACENTER, DEFAULT);
            }

            // Environment: prefer the configured value, otherwise use TEST and log.
            final String configuredEnvironment = ConfigurationManager.getConfigInstance()
                    .getString(EUREKA_ENVIRONMENT);
            if (configuredEnvironment != null) {
                ConfigurationManager.getConfigInstance()
                        .setProperty(ARCHAIUS_DEPLOYMENT_ENVIRONMENT, configuredEnvironment);
            }
            else {
                ConfigurationManager.getConfigInstance()
                        .setProperty(ARCHAIUS_DEPLOYMENT_ENVIRONMENT, TEST);
                log.info(
                        "Eureka environment value eureka.environment is not set, defaulting to test");
            }
        }
    
        /**
         * Initialises the server context: registers the V1-aware converters, starts the
         * AWS binder when running on AWS, publishes the context through
         * {@code EurekaServerContextHolder}, syncs the registry and opens it for traffic.
         *
         * @throws Exception propagated from any of the initialisation steps
         */
        protected void initEurekaServerContext() throws Exception {
            // For backward compatibility
            JsonXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(),
                    XStream.PRIORITY_VERY_HIGH);
            XmlXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(),
                    XStream.PRIORITY_VERY_HIGH);
    
            if (isAws(this.applicationInfoManager.getInfo())) {
                this.awsBinder = new AwsBinderDelegate(this.eurekaServerConfig,
                        this.eurekaClientConfig, this.registry, this.applicationInfoManager);
                this.awsBinder.start();
            }
    
            EurekaServerContextHolder.initialize(this.serverContext);
    
            log.info("Initialized server context");
    
            // Copy the registry information from neighbouring Eureka nodes
            int registryCount = this.registry.syncUp();
            // Open for traffic; this is where the eviction timer task gets scheduled
            this.registry.openForTraffic(this.applicationInfoManager, registryCount);
    
            // Register all monitoring statistics.
            EurekaMonitors.registerAllStats();
        }
    

    @see PeerAwareInstanceRegistryImpl.java

     /**
      * Copies the instances known to the local Eureka client into this registry.
      *
      * <p>The copy is attempted up to {@code serverConfig.getRegistrySyncRetries()} times
      * and stops as soon as at least one instance has been registered. Between attempts
      * the thread sleeps for {@code serverConfig.getRegistrySyncRetryWaitMs()}.
      *
      * <p>If the thread is interrupted while waiting, the interrupt flag is restored and
      * the method returns the count gathered so far.
      *
      * @return the number of instances registered during the sync
      */
        @Override
        public int syncUp() {
            // Copy entire entry from neighboring DS node
            int count = 0;
            // Retry according to the configured retry count, as long as nothing has been copied yet
            for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++) {
                if (i > 0) {
                    try {
                        // Wait for the configured interval before the next attempt
                        Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs());
                    } catch (InterruptedException e) {
                        logger.warn("Interrupted during registry transfer..");
                        // Restore the interrupt status so callers can still observe the interruption
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
                // No request is issued here: this reads whatever the Eureka client currently holds
                Applications apps = eurekaClient.getApplications();
                // Register every registerable instance into the local registry
                for (Application app : apps.getRegisteredApplications()) {
                    for (InstanceInfo instance : app.getInstances()) {
                        try {
                            if (isRegisterable(instance)) {
                                register(instance, instance.getLeaseInfo().getDurationInSecs(), true);
                                count++;
                            }
                        } catch (Throwable t) {
                            logger.error("During DS init copy", t);
                        }
                    }
                }
            }
            return count;
        }
    

    跟着上面发现的问题,继续分析Applications apps = eurekaClient.getApplications();这里的数据来自何处

    @see DiscoveryClient

    /*
     * Refreshes the client's registry cache, either with a full fetch or with a delta.
     * Presumably the remote call to the other Eureka nodes happens inside
     * getAndStoreFullRegistry()/getAndUpdateDelta() (not shown here), which is why
     * getApplications() has data to return during syncUp.
     *
     * Returns true when the refresh succeeded, false when any Throwable was raised.
     */
    private boolean fetchRegistry(boolean forceFullRegistryFetch) {
            Stopwatch tracer = FETCH_REGISTRY_TIMER.start();
    
            try {
                // If the delta is disabled or if it is the first time, get all
                // applications
                Applications applications = getApplications();
    
                if (clientConfig.shouldDisableDelta()
                        || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
                        || forceFullRegistryFetch
                        || (applications == null)
                        || (applications.getRegisteredApplications().size() == 0)
                        || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
                {
                    logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta());
                    logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress());
                    logger.info("Force full registry fetch : {}", forceFullRegistryFetch);
                    logger.info("Application is null : {}", (applications == null));
                    // NOTE(review): the next two log calls dereference applications even though
                    // the condition above allows it to be null - confirm it can never be null here
                    logger.info("Registered Applications size is zero : {}",
                            (applications.getRegisteredApplications().size() == 0));
                    logger.info("Application version is -1: {}", (applications.getVersion() == -1));
                    getAndStoreFullRegistry();
                } else {
                    getAndUpdateDelta(applications);
                }
                applications.setAppsHashCode(applications.getReconcileHashCode());
                logTotalInstances();
            } catch (Throwable e) {
                logger.error(PREFIX + appPathIdentifier + " - was unable to refresh its cache! status = " + e.getMessage(), e);
                return false;
            } finally {
                if (tracer != null) {
                    tracer.stop();
                }
            }
    
            // Notify about cache refresh before updating the instance remote status
            onCacheRefreshed();
    
            // Update remote status based on refreshed data held in the cache
            updateInstanceRemoteStatus();
    
            // registry was fetched successfully, so return true
            return true;
        }
    

    对于过期未续约服务实例节点的处理,定时清理任务是如何处理的
    @see PeerAwareInstanceRegistryImpl.java

      /**
       * Computes the renewal expectations from the synced instance count, marks this
       * instance as UP and runs {@code super.postInit()}.
       *
       * @param applicationInfoManager used to read the datacenter name and to set the instance status
       * @param count                  number of instances obtained from the neighbouring node
       */
      @Override
        public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
            // Renewals happen every 30 seconds and for a minute it should be a factor of 2.
            // Expected renewals per minute. Example: 17 registered instances renewing every 30 s
            // give 34 expected renewals; with a percentage of 0.85 the threshold is (int) (34 * 0.85) = 28
            this.expectedNumberOfRenewsPerMin = count * 2;
            // Renewal threshold per minute = expected renewals * configured threshold percentage
            this.numberOfRenewsPerMinThreshold =
                    (int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
            logger.info("Got " + count + " instances from neighboring DS node");
            logger.info("Renew threshold is: " + numberOfRenewsPerMinThreshold);
            this.startupTime = System.currentTimeMillis();
            if (count > 0) {
                this.peerInstancesTransferEmptyOnStartup = false;
            }
            DataCenterInfo.Name selfName = applicationInfoManager.getInfo().getDataCenterInfo().getName();
            // AWS-specific handling; not covered by this walkthrough
            boolean isAws = Name.Amazon == selfName;
            if (isAws && serverConfig.shouldPrimeAwsReplicaConnections()) {
                logger.info("Priming AWS connections for all replicas..");
                primeAwsReplicas(applicationInfoManager);
            }
            logger.info("Changing status to UP");
            applicationInfoManager.setInstanceStatus(InstanceStatus.UP);
            // postInit() schedules the eviction timer task
            super.postInit();
        }
        
       /**
        * Starts the renewal-rate counter and (re)schedules the eviction task.
        *
        * <p>A previously scheduled task, if any, is cancelled before a new
        * {@code EvictionTask} is stored and scheduled with the configured eviction
        * interval as both initial delay and period.
        */
       protected void postInit() {
            // Start tracking last-minute renewals; the count feeds the self-preservation check
            renewsLastMin.start();

            if (evictionTaskRef.get() != null) {
                evictionTaskRef.get().cancel();
            }

            final EvictionTask freshTask = new EvictionTask();
            evictionTaskRef.set(freshTask);
            evictionTimer.schedule(
                    freshTask,
                    serverConfig.getEvictionIntervalTimerInMs(),
                    serverConfig.getEvictionIntervalTimerInMs());
        }
    

    开始看清理过期节点任务之前,先看一下怎么记录续约次数的

    /**
     * Counts events per sample interval using two buckets: the one currently being
     * filled and the total of the previous interval.
     */
    public class MeasuredRate {
        private static final Logger logger = LoggerFactory.getLogger(MeasuredRate.class);

        /** Total of the most recently completed interval. */
        private final AtomicLong lastBucket = new AtomicLong(0);
        /** Running total of the interval in progress. */
        private final AtomicLong currentBucket = new AtomicLong(0);

        private final long sampleInterval;
        private final Timer timer;

        private volatile boolean isActive;

        /**
         * @param sampleInterval in milliseconds
         */
        public MeasuredRate(long sampleInterval) {
            this.sampleInterval = sampleInterval;
            this.timer = new Timer("Eureka-MeasureRateTimer", true);
            this.isActive = false;
        }

        /**
         * Schedules the periodic bucket rollover; does nothing when already active.
         */
        public synchronized void start() {
            if (isActive) {
                return;
            }
            timer.schedule(new BucketRollover(), sampleInterval, sampleInterval);
            isActive = true;
        }

        /**
         * Cancels the timer; does nothing when not active.
         */
        public synchronized void stop() {
            if (!isActive) {
                return;
            }
            timer.cancel();
            isActive = false;
        }

        /**
         * Returns the count in the last sample interval.
         */
        public long getCount() {
            return lastBucket.get();
        }

        /**
         * Adds one event to the interval in progress.
         */
        public void increment() {
            currentBucket.incrementAndGet();
        }

        /** Moves the current total into the last bucket and restarts counting from zero. */
        private class BucketRollover extends TimerTask {
            @Override
            public void run() {
                try {
                    lastBucket.set(currentBucket.getAndSet(0));
                } catch (Throwable e) {
                    logger.error("Cannot reset the Measured Rate", e);
                }
            }
        }
    }
    

    通过查找调用关系,可以看到increment()方法的调用处是AbstractInstanceRegistry里的renew方法,也就是处理续约的方法


    知晓了续约数量的记录后再往下看过期节点清理任务

    /**
     * Timer task that evicts expired leases, extending the lease time by however
     * long the task itself was delayed beyond the configured eviction interval.
     */
    class EvictionTask extends TimerTask {

        /** Nano timestamp of the previous run; zero until the first run. */
        private final AtomicLong lastExecutionNanosRef = new AtomicLong(0L);

        @Override
        public void run() {
            try {
                // Extra time granted to leases when this task ran late (e.g. GC pause or clock drift)
                final long compensationTimeMs = getCompensationTimeMs();
                logger.info("Running the evict task with compensationTime {}ms", compensationTimeMs);
                evict(compensationTimeMs);
            } catch (Throwable e) {
                logger.error("Could not run the evict task", e);
            }
        }

        /**
         * @return milliseconds by which the gap since the previous run exceeded the
         *         eviction interval; zero on the first run or when there was no overrun
         */
        long getCompensationTimeMs() {
            final long nowNanos = getCurrentTimeNano();
            final long previousNanos = lastExecutionNanosRef.getAndSet(nowNanos);
            if (previousNanos == 0L) {
                return 0L;
            }

            final long elapsedMs = TimeUnit.NANOSECONDS.toMillis(nowNanos - previousNanos);
            final long overrunMs = elapsedMs - serverConfig.getEvictionIntervalTimerInMs();
            return Math.max(0L, overrunMs);
        }

        long getCurrentTimeNano() {  // for testing
            return System.nanoTime();
        }
    }
    
     /*
      * Evicts expired leases. The number removed per run is capped at
      * registrySize - (int) (registrySize * renewalPercentThreshold), and the victims
      * are picked in random order. additionalLeaseMs is passed to Lease.isExpired
      * as extra grace time.
      */
     public void evict(long additionalLeaseMs) {
            logger.debug("Running the evict task");
    
            if (!isLeaseExpirationEnabled()) {
                logger.debug("DS: lease expiration is currently disabled.");
                return;
            }
    
            /*
             * First collect all expired leases, then evict them in random order.
             * Otherwise a large eviction could wipe out whole applications before
             * self-preservation kicks in; randomising spreads the impact evenly
             * across all applications.
             */
            List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>();
            for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) {
                Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue();
                if (leaseMap != null) {
                    for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) {
                        Lease<InstanceInfo> lease = leaseEntry.getValue();
                        // Keep leases that report themselves expired (with the compensation time) and still have a holder
                        if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) {
                            expiredLeases.add(lease);
                        }
                    }
                }
            }
    
            // To compensate for GC pauses or drifting local time, we need to use current registry size as a base for
            // triggering self-preservation. Without that we would wipe out full registry.
            int registrySize = (int) getLocalRegistrySize();
            int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold());
            int evictionLimit = registrySize - registrySizeThreshold;
    
            int toEvict = Math.min(expiredLeases.size(), evictionLimit);
            if (toEvict > 0) {
                logger.info("Evicting {} items (expired={}, evictionLimit={})", toEvict, expiredLeases.size(), evictionLimit);
    
                Random random = new Random(System.currentTimeMillis());
                for (int i = 0; i < toEvict; i++) {
                    // Pick a random item (Knuth shuffle algorithm)
                    int next = i + random.nextInt(expiredLeases.size() - i);
                    Collections.swap(expiredLeases, i, next);
                    Lease<InstanceInfo> lease = expiredLeases.get(i);
    
                    String appName = lease.getHolder().getAppName();
                    String id = lease.getHolder().getId();
                    EXPIRED.increment();
                    logger.warn("DS: Registry: expired lease for {}/{}", appName, id);
                    internalCancel(appName, id, false);
                }
            }
        }
    

    先看是否允许清理
    @see PeerAwareInstanceRegistryImpl.java

    @Override
    public boolean isLeaseExpirationEnabled() {
        #如果自我保护模式不打开,则直接允许清理过期节点
        if (!isSelfPreservationModeEnabled()) {
            // The self preservation mode is disabled, hence allowing the instances to expire.
            return true;
        }
        
        //如果保护模式打开啦,则如果最后一分钟续约的数量大于计算的阈值,则允许,否则不允许清除
        return numberOfRenewsPerMinThreshold > 0 && getNumOfRenewsInLastMin() > numberOfRenewsPerMinThreshold;
    }
    
    /**
     * @return the value of {@code serverConfig.shouldEnableSelfPreservation()}
     */
    @Override
    public boolean isSelfPreservationModeEnabled() {
        return serverConfig.shouldEnableSelfPreservation();
    }
    
    从注册表移除过期节点
    /**
     * Removes the lease of one instance from the registry and records the change.
     *
     * <p>The whole operation runs under the registry's read lock. The lock is acquired
     * before entering {@code try}, so {@code unlock()} is reached only when the lock is
     * actually held.
     *
     * @param appName       name of the application the instance belongs to
     * @param id            instance id
     * @param isReplication whether the call originates from replication; forwarded to the counters
     * @return {@code true} if a lease was found and cancelled, {@code false} otherwise
     */
    protected boolean internalCancel(String appName, String id, boolean isReplication) {
        read.lock();
        try {
            // Statistics
            CANCEL.increment(isReplication);
            Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
            Lease<InstanceInfo> leaseToCancel = null;
            if (gMap != null) {
                // Remove the lease from the registry map
                leaseToCancel = gMap.remove(id);
            }
            synchronized (recentCanceledQueue) {
                recentCanceledQueue.add(new Pair<Long, String>(System.currentTimeMillis(), appName + "(" + id + ")"));
            }
            InstanceStatus instanceStatus = overriddenInstanceStatusMap.remove(id);
            if (instanceStatus != null) {
                logger.debug("Removed instance id {} from the overridden map which has value {}", id, instanceStatus.name());
            }
            if (leaseToCancel == null) {
                CANCEL_NOT_FOUND.increment(isReplication);
                logger.warn("DS: Registry: cancel failed because Lease is not registered for: {}/{}", appName, id);
                return false;
            }

            leaseToCancel.cancel();
            InstanceInfo instanceInfo = leaseToCancel.getHolder();
            String vip = null;
            String svip = null;
            if (instanceInfo != null) {
                instanceInfo.setActionType(ActionType.DELETED);
                // Record the change in recentlyChangedQueue, which backs delta fetches
                recentlyChangedQueue.add(new RecentlyChangedItem(leaseToCancel));
                instanceInfo.setLastUpdatedTimestamp();
                vip = instanceInfo.getVIPAddress();
                svip = instanceInfo.getSecureVipAddress();
            }

            // Invalidate the cached entries for this application and its VIP addresses
            invalidateCache(appName, vip, svip);
            logger.info("Cancelled instance {}/{} (replication={})", appName, id, isReplication);
            return true;
        } finally {
            read.unlock();
        }
    }
    

    为什么要用recentlyChangedQueue保存最近变更的实例信息?

    实例状态相对来说并不会频繁变更。客户端要与eureka服务端的注册信息保持同步,需要定时拉取注册表;
    如果每次都全量获取实例信息,在集群庞大时效率较低,而且存在大量重复的信息,浪费带宽。
    所以只增量获取已变更的实例信息会是一个更好的选择。通过源码可以追溯到,eureka服务端提供了增量获取注册实例变更信息的端点

    @see ApplicationsResource.java

    /**
     * Returns information about all delta changes [register, cancel, status change and
     * expiration]. Changes to the registry are normally rare, so fetching only the delta
     * is more efficient than fetching the full registry. Because the delta information is
     * cached for a period of time, requests may return the same data several times
     * within the configured cache refresh time.
     */
    @Path("delta")
    @GET
    public Response getContainerDifferential(
        @PathParam("version") String version,
        @HeaderParam(HEADER_ACCEPT) String acceptHeader,
        @HeaderParam(HEADER_ACCEPT_ENCODING) String acceptEncoding,
        @HeaderParam(EurekaAccept.HTTP_X_EUREKA_ACCEPT) String eurekaAccept,
        @Context UriInfo uriInfo, @Nullable @QueryParam("regions") String regionsStr) {
    
    boolean isRemoteRegionRequested = null != regionsStr && !regionsStr.isEmpty();
    
    // If the delta flag is disabled in discovery or if the lease expiration
    // has been disabled, redirect clients to get all instances
    // Reject the request when delta fetching is disabled or access is not allowed
    if ((serverConfig.shouldDisableDelta()) || (!registry.shouldAllowAccess(isRemoteRegionRequested))) {
        return Response.status(Status.FORBIDDEN).build();
    }
    
    String[] regions = null;
    if (!isRemoteRegionRequested) {
        EurekaMonitors.GET_ALL_DELTA.increment();
    } else {
        regions = regionsStr.toLowerCase().split(",");
        Arrays.sort(regions); // So we don't have different caches for same regions queried in different order.
        EurekaMonitors.GET_ALL_DELTA_WITH_REMOTE_REGIONS.increment();
    }
    
    CurrentRequestVersion.set(Version.toEnum(version));
    KeyType keyType = Key.KeyType.JSON;
    String returnMediaType = MediaType.APPLICATION_JSON;
    if (acceptHeader == null || !acceptHeader.contains(HEADER_JSON_VALUE)) {
        keyType = Key.KeyType.XML;
        returnMediaType = MediaType.APPLICATION_XML;
    }
    
    Key cacheKey = new Key(Key.EntityType.Application,
            ResponseCacheImpl.ALL_APPS_DELTA,
            keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions
    );
    // Serve the delta from responseCache, gzip-encoded when the client accepts it
    if (acceptEncoding != null
            && acceptEncoding.contains(HEADER_GZIP_VALUE)) {
        return Response.ok(responseCache.getGZIP(cacheKey))
                .header(HEADER_CONTENT_ENCODING, HEADER_GZIP_VALUE)
                .header(HEADER_CONTENT_TYPE, returnMediaType)
                .build();
    } else {
        return Response.ok(responseCache.get(cacheKey))
                .build();
    }
    

    看一下增量信息的来源
    @see AbstractInstanceRegistry.java

    /**
     * Builds the delta view of the registry from {@code recentlyChangedQueue}.
     *
     * <p>Every queued change is grouped by application name. Unless transparent
     * fallback is disabled, deltas of remote regions are added for applications that
     * are not present locally. The reconcile hash code of the full application set is
     * attached to the result.
     *
     * <p>The work runs under the write lock, which is acquired before entering
     * {@code try} so that {@code unlock()} is reached only when the lock is held.
     *
     * @return the applications whose instances changed recently
     */
    public Applications getApplicationDeltas() {
        GET_ALL_CACHE_MISS_DELTA.increment();
        Applications apps = new Applications();
        apps.setVersion(responseCache.getVersionDelta().get());
        Map<String, Application> applicationInstancesMap = new HashMap<String, Application>();
        write.lock();
        try {
            // recentlyChangedQueue is the source of the incremental (delta) registry data
            Iterator<RecentlyChangedItem> iter = this.recentlyChangedQueue.iterator();
            logger.debug("The number of elements in the delta queue is :"
                    + this.recentlyChangedQueue.size());
            while (iter.hasNext()) {
                Lease<InstanceInfo> lease = iter.next().getLeaseInfo();
                InstanceInfo instanceInfo = lease.getHolder();
                Object[] args = {instanceInfo.getId(),
                        instanceInfo.getStatus().name(),
                        instanceInfo.getActionType().name()};
                // SLF4J substitutes "{}" placeholders; printf-style "%s" would be logged verbatim
                logger.debug(
                        "The instance id {} is found with status {} and actiontype {}",
                        args);
                Application app = applicationInstancesMap.get(instanceInfo
                        .getAppName());
                if (app == null) {
                    app = new Application(instanceInfo.getAppName());
                    applicationInstancesMap.put(instanceInfo.getAppName(), app);
                    apps.addApplication(app);
                }
                app.addInstance(decorateInstanceInfo(lease));
            }

            boolean disableTransparentFallback = serverConfig.disableTransparentFallbackToOtherRegion();

            if (!disableTransparentFallback) {
                Applications allAppsInLocalRegion = getApplications(false);

                // Add remote-region deltas only for applications unknown to the local region
                for (RemoteRegionRegistry remoteRegistry : this.regionNameVSRemoteRegistry.values()) {
                    Applications applications = remoteRegistry.getApplicationDeltas();
                    for (Application application : applications.getRegisteredApplications()) {
                        Application appInLocalRegistry =
                                allAppsInLocalRegion.getRegisteredApplications(application.getName());
                        if (appInLocalRegistry == null) {
                            apps.addApplication(application);
                        }
                    }
                }
            }

            Applications allApps = getApplications(!disableTransparentFallback);
            apps.setAppsHashCode(allApps.getReconcileHashCode());
            return apps;
        } finally {
            write.unlock();
        }
    }
    

    responseCache 这个比较简单,直接看ResponseCacheImpl实现基本就明白了,里面会有一个定时任务专门处理
    只读缓存的更新,默认30s更新一次,如果关闭使用只读缓存,则每次拿的都是最新的

    recentlyChangedQueue这个里面的变更信息数据多久会删掉呢
    @see AbstractInstanceRegistry.java

     /**
      * Stores the configuration and codecs, creates the two bounded history queues and
      * the one-minute renewal counter, and schedules the delta-retention task.
      *
      * @param serverConfig server configuration; also supplies the retention timer interval
      * @param clientConfig client configuration
      * @param serverCodecs server codecs
      */
     protected AbstractInstanceRegistry(EurekaServerConfig serverConfig, EurekaClientConfig clientConfig, ServerCodecs serverCodecs) {
            this.serverConfig = serverConfig;
            this.clientConfig = clientConfig;
            this.serverCodecs = serverCodecs;

            // Both history queues hold at most 1000 entries
            this.recentCanceledQueue = new CircularQueue<>(1000);
            this.recentRegisteredQueue = new CircularQueue<>(1000);

            // Renewal counter sampled over one minute (in milliseconds)
            this.renewsLastMin = new MeasuredRate(60 * 1000);

            // Periodically purge expired entries from the recently-changed queue
            this.deltaRetentionTimer.schedule(
                    getDeltaRetentionTask(),
                    serverConfig.getDeltaRetentionTimerIntervalInMs(),
                    serverConfig.getDeltaRetentionTimerIntervalInMs());
        }
        
        /**
         * Creates the task that trims {@code recentlyChangedQueue}.
         *
         * <p>Starting at the head of the queue, entries whose last update time is older
         * than {@code serverConfig.getRetentionTimeInMSInDeltaQueue()} are removed; the
         * scan stops at the first entry that is still within the retention window.
         *
         * @return the retention task
         */
        private TimerTask getDeltaRetentionTask() {
            return new TimerTask() {

                @Override
                public void run() {
                    final Iterator<RecentlyChangedItem> queued = recentlyChangedQueue.iterator();
                    while (queued.hasNext()) {
                        final long updatedAt = queued.next().getLastUpdateTime();
                        final long cutoff =
                                System.currentTimeMillis() - serverConfig.getRetentionTimeInMSInDeltaQueue();
                        if (updatedAt >= cutoff) {
                            // First entry still inside the retention window: stop scanning
                            break;
                        }
                        queued.remove();
                    }
                }
            };
        }
    

    相关文章

      网友评论

          本文标题:eureka服务端源码解读

          本文链接:https://www.haomeiwen.com/subject/yjkvultx.html