Spring Cloud Eureka Source Code Analysis: Service Access

Author: 想起个帅气的头像 | Published: 2021-01-30 01:26

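    I. Preface

    This article explains why Eureka clients and the Eureka server observe instance changes with a delay, walks through the source code behind each delay point, and shows how to make sure callers perceive instances going online and offline smoothly.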

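    II. Service Going Online

    1. The service registers with the server (immediately after startup): 0s
    2. Server-side sync from the read-write cache to the read-only cache: responseCacheUpdateIntervalMs=30s
    3. Client-side registry fetch interval: registryFetchIntervalSeconds=30s
    4. Client-side Ribbon serverList cache refresh interval: ServerListRefreshInterval=30s

    With the defaults, a client may need up to 90s (30s + 30s + 30s) to see a newly started instance.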

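    III. Service Going Offline

    Normal shutdown

    1. The service sends a cancel request to the server (immediately before it stops): 0s
    2. Server-side sync from the read-write cache to the read-only cache: responseCacheUpdateIntervalMs=30s
    3. Client-side registry fetch interval: registryFetchIntervalSeconds=30s
    4. Client-side Ribbon serverList cache refresh interval: ServerListRefreshInterval=30s

    A client may need up to 90s to notice a normal shutdown; the delay points are exactly the same as for going online.

    Abnormal shutdown

    An abnormal shutdown is one where the service goes away without sending a cancel request, for example after kill -9 or a host crash.

    1. Execution interval of the server-side eviction task: evictionIntervalTimerInMs=60s
    2. The eviction task removes instances that have not renewed their lease within 90s: leaseExpirationDurationInSeconds=90s
    3. Server-side sync from the read-write cache to the read-only cache: responseCacheUpdateIntervalMs=30s
    4. Client-side registry fetch interval: registryFetchIntervalSeconds=30s
    5. Client-side Ribbon serverList cache refresh interval: ServerListRefreshInterval=30s

    A client may need up to 240s (60s + 90s + 30s + 30s + 30s) to notice an abnormal shutdown.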
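
    The totals above are plain sums of the per-stage intervals. A minimal sketch of that arithmetic follows; the class and method names are made up for illustration, and the values are the defaults quoted in this article, expressed in seconds.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class WorstCaseDelay {

    /** Sums the per-stage delays (in seconds) into a worst-case propagation time. */
    public static int total(Map<String, Integer> stages) {
        return stages.values().stream().mapToInt(Integer::intValue).sum();
    }

    /** Stages shared by going online and by a normal shutdown. */
    public static Map<String, Integer> propagationStages() {
        Map<String, Integer> stages = new LinkedHashMap<>();
        stages.put("responseCacheUpdateIntervalMs", 30);
        stages.put("registryFetchIntervalSeconds", 30);
        stages.put("ServerListRefreshInterval", 30);
        return stages;
    }

    /** An abnormal shutdown adds the eviction interval and the lease expiration. */
    public static Map<String, Integer> abnormalShutdownStages() {
        Map<String, Integer> stages = new LinkedHashMap<>();
        stages.put("evictionIntervalTimerInMs", 60);
        stages.put("leaseExpirationDurationInSeconds", 90);
        stages.putAll(propagationStages());
        return stages;
    }

    public static void main(String[] args) {
        System.out.println("online / normal shutdown: " + total(propagationStages()) + "s");
        System.out.println("abnormal shutdown: " + total(abnormalShutdownStages()) + "s");
    }
}
```

    Each stage is a polling interval, so the sum is an upper bound; the typical delay is lower because a change rarely lands right after every poll.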

    The following sections locate each step in the source code and explain how it is implemented.

    IV. Source Code Analysis

    Server-side read-only / read-write cache sync

    ResponseCacheImpl

      // responseCacheUpdateIntervalMs defaults to 30s
      if (shouldUseReadOnlyResponseCache) {
          timer.schedule(getCacheUpdateTask(),
                  new Date(((System.currentTimeMillis() / responseCacheUpdateIntervalMs) * responseCacheUpdateIntervalMs)
                          + responseCacheUpdateIntervalMs),
                  responseCacheUpdateIntervalMs);
      }
    
      private TimerTask getCacheUpdateTask() {
          return new TimerTask() {
              @Override
              public void run() {
                  for (Key key : readOnlyCacheMap.keySet()) {
                      try {
                          CurrentRequestVersion.set(key.getVersion());
                          Value cacheValue = readWriteCacheMap.get(key);
                          Value currentCacheValue = readOnlyCacheMap.get(key);
                          if (cacheValue != currentCacheValue) {
                              readOnlyCacheMap.put(key, cacheValue);
                          }
                      } catch (Throwable th) {
                          logger.error("Error while updating the client cache from response cache for key {}", key.toStringCompact(), th);
                      }
                  }
              }
          };
      }
    

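    The task returned by getCacheUpdateTask runs every 30s. It iterates over every key in readOnlyCacheMap, reads the latest value from readWriteCacheMap, and writes it into readOnlyCacheMap if the two differ.

    For a new key that readOnlyCacheMap has not cached yet, getValue fills readOnlyCacheMap on first access.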

    Value getValue(final Key key, boolean useReadOnlyCache) {
        Value payload = null;
        try {
            if (useReadOnlyCache) {
                final Value currentPayload = readOnlyCacheMap.get(key);
                if (currentPayload != null) {
                    payload = currentPayload;
                } else {
                    payload = readWriteCacheMap.get(key);
                    readOnlyCacheMap.put(key, payload);
                }
            } else {
                payload = readWriteCacheMap.get(key);
            }
        } catch (Throwable t) {
            logger.error("Cannot get value for key : {}", key, t);
        }
        return payload;
     }
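
    To make the staleness window concrete, here is a small stand-alone simulation of the two cache levels. It is only a sketch: TwoLevelCacheDemo and its methods are hypothetical names, plain maps stand in for Eureka's caches, and sync() is called by hand instead of by a timer.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class TwoLevelCacheDemo {
    private final Map<String, String> readWrite = new ConcurrentHashMap<>();
    private final Map<String, String> readOnly = new ConcurrentHashMap<>();

    /** A registry change only reaches the read-write level. */
    public void publish(String key, String value) {
        readWrite.put(key, value);
    }

    /** Same shape as getValue(key, true): serve from read-only, fill it on a miss. */
    public String get(String key) {
        String current = readOnly.get(key);
        if (current != null) {
            return current;
        }
        String fresh = readWrite.get(key);
        if (fresh != null) {
            readOnly.put(key, fresh);
        }
        return fresh;
    }

    /** Same shape as the periodic task: refresh keys already present in read-only. */
    public void sync() {
        for (String key : readOnly.keySet()) {
            String latest = readWrite.get(key);
            if (latest != null && !latest.equals(readOnly.get(key))) {
                readOnly.put(key, latest);
            }
        }
    }

    public static void main(String[] args) {
        TwoLevelCacheDemo cache = new TwoLevelCacheDemo();
        cache.publish("apps", "v1");
        System.out.println(cache.get("apps")); // first read fills the read-only level
        cache.publish("apps", "v2");
        System.out.println(cache.get("apps")); // still the old value until sync() runs
        cache.sync();
        System.out.println(cache.get("apps")); // now the new value
    }
}
```

    Readers keep seeing the old value between publish and the next sync, which is the up-to-30s delay attributed to responseCacheUpdateIntervalMs.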
    

    Client-side registry fetch task

    DiscoveryClient

    /**
     * Initializes all scheduled tasks.
     */
    // registryFetchIntervalSeconds defaults to 30s
    private void initScheduledTasks() {
        if (clientConfig.shouldFetchRegistry()) {
            // registry cache refresh timer
            int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
            int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
            scheduler.schedule(
                    new TimedSupervisorTask(
                            "cacheRefresh",
                            scheduler,
                            cacheRefreshExecutor,
                            registryFetchIntervalSeconds,
                            TimeUnit.SECONDS,
                            expBackOffBound,
                            new CacheRefreshThread()
                    ),
                    registryFetchIntervalSeconds, TimeUnit.SECONDS);
        }
        ...
    }
    

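    During client startup, the DiscoveryClient constructor initializes several scheduled tasks. One of them periodically fetches the registry from the server, every 30s by default. The task body is new CacheRefreshThread(), and the fetched registry is stored in the local cache localRegionApps.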
    AtomicReference<Applications> localRegionApps = new AtomicReference<Applications>()
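
    The expBackOffBound argument passed to TimedSupervisorTask is not explained above. To the best of our understanding, the supervisor doubles the delay before the next run whenever a run times out, caps it at interval * expBackOffBound, and resets it after a successful run. The sketch below models only that arithmetic; BackoffSketch is a hypothetical class, not Eureka code.

```java
public class BackoffSketch {
    private final long baseDelayMs;
    private final long maxDelayMs;
    private long currentDelayMs;

    public BackoffSketch(long intervalSeconds, int expBackOffBound) {
        this.baseDelayMs = intervalSeconds * 1000L;
        this.maxDelayMs = baseDelayMs * expBackOffBound;
        this.currentDelayMs = baseDelayMs;
    }

    /** Returns the delay before the next run, given whether the last run timed out. */
    public long nextDelayMs(boolean timedOut) {
        currentDelayMs = timedOut
                ? Math.min(maxDelayMs, currentDelayMs * 2) // back off, up to the cap
                : baseDelayMs;                             // success: back to the base interval
        return currentDelayMs;
    }

    public static void main(String[] args) {
        BackoffSketch backoff = new BackoffSketch(30, 10);
        for (int i = 0; i < 5; i++) {
            System.out.println("after timeout " + (i + 1) + ": " + backoff.nextDelayMs(true) + "ms");
        }
        System.out.println("after success: " + backoff.nextDelayMs(false) + "ms");
    }
}
```

    If this reading is right, the 30s fetch interval is only the healthy-case figure: when the server responds slowly, the effective interval can grow well beyond it.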

    Client-side Ribbon serverList cache refresh

    PollingServerListUpdater

    @Override
    public synchronized void start(final UpdateAction updateAction) {
        if (isActive.compareAndSet(false, true)) {
            final Runnable wrapperRunnable = new Runnable() {
                @Override
                public void run() {
                    if (!isActive.get()) {
                        if (scheduledFuture != null) {
                            scheduledFuture.cancel(true);
                        }
                        return;
                    }
                    try {
                        updateAction.doUpdate();
                        lastUpdated = System.currentTimeMillis();
                    } catch (Exception e) {
                        logger.warn("Failed one update cycle", e);
                    }
                }
            };
            // initialDelayMs defaults to 1s
            // refreshIntervalMs defaults to 30s
            scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(
                    wrapperRunnable,
                    initialDelayMs,
                    refreshIntervalMs,
                    TimeUnit.MILLISECONDS
            );
        } else {
            logger.info("Already active, no-op");
        }
    }
    

    During Ribbon startup, the DynamicServerListLoadBalancer constructor calls serverListUpdater.start(updateAction), which starts the periodic serverList refresh task, executed every 30s.
    The task it runs is doUpdate():

      protected final ServerListUpdater.UpdateAction updateAction = new ServerListUpdater.UpdateAction() {
          @Override
          public void doUpdate() {
              updateListOfServers();
          }
      };
    
      public void updateListOfServers() {
          List<T> servers = new ArrayList<T>();
          if (serverListImpl != null) {
              servers = serverListImpl.getUpdatedListOfServers();
              
              if (filter != null) {
                  servers = filter.getFilteredListOfServers(servers);
              }
          }
          updateAllServerList(servers);
       }
    

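    Note that when Ribbon fetches the latest server list here, it reads the values Eureka has already cached (localRegionApps and remoteRegionVsApps); it does not issue a remote registry fetch. The refreshed serverList is then cached in the parent class BaseLoadBalancer: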

       @Monitor(name = PREFIX + "AllServerList", type = DataSourceType.INFORMATIONAL)
        protected volatile List<Server> allServerList = Collections
                .synchronizedList(new ArrayList<Server>());
        @Monitor(name = PREFIX + "UpServerList", type = DataSourceType.INFORMATIONAL)
        protected volatile List<Server> upServerList = Collections
                .synchronizedList(new ArrayList<Server>());
    

    When writing a custom Ribbon rule, extend AbstractLoadBalancerRule and call getLoadBalancer() to reach the current serverList directly.

    Server-side eviction task for expired leases

    AbstractInstanceRegistry

    class EvictionTask extends TimerTask {
    
        private final AtomicLong lastExecutionNanosRef = new AtomicLong(0l);
    
        @Override
        public void run() {
            try {
                long compensationTimeMs = getCompensationTimeMs();
                logger.info("Running the evict task with compensationTime {}ms", compensationTimeMs);
                evict(compensationTimeMs);
            } catch (Throwable e) {
                logger.error("Could not run the evict task", e);
            }
        }
    }
    
    protected void postInit() {
        renewsLastMin.start();
        if (evictionTaskRef.get() != null) {
            evictionTaskRef.get().cancel();
        }
        evictionTaskRef.set(new EvictionTask());
        // EvictionIntervalTimerInMs defaults to 60s
        evictionTimer.schedule(evictionTaskRef.get(), serverConfig.getEvictionIntervalTimerInMs(), serverConfig.getEvictionIntervalTimerInMs());
    }
    

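    While EurekaBootStrap initializes the Eureka context, it calls AbstractInstanceRegistry.postInit, which starts EvictionTask to run every 60s. Each run computes a compensation time (compensationTimeMs) so that timing errors caused by GC pauses, clock adjustments, and similar factors do not distort the expiry check.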
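
    getCompensationTimeMs is not shown above. As a hedged sketch of the idea, the compensation is how much later than the configured interval the task actually ran, never less than zero. CompensationSketch is a hypothetical stand-alone class, and the current time is passed in as a parameter so the behavior is easy to follow.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class CompensationSketch {
    private final long evictionIntervalMs;
    // 0 means "the task has not run yet"
    private final AtomicLong lastExecutionNanos = new AtomicLong(0L);

    public CompensationSketch(long evictionIntervalMs) {
        this.evictionIntervalMs = evictionIntervalMs;
    }

    /** Extra milliseconds, beyond the configured interval, since the previous run. */
    public long compensationMs(long nowNanos) {
        long last = lastExecutionNanos.getAndSet(nowNanos);
        if (last == 0L) {
            return 0L; // first run: nothing to compare against
        }
        long elapsedMs = TimeUnit.NANOSECONDS.toMillis(nowNanos - last);
        return Math.max(0L, elapsedMs - evictionIntervalMs);
    }

    public static void main(String[] args) {
        CompensationSketch sketch = new CompensationSketch(60_000L);
        long t = TimeUnit.MILLISECONDS.toNanos(1_000L);
        System.out.println(sketch.compensationMs(t)); // first run
        t += TimeUnit.MILLISECONDS.toNanos(60_000L);
        System.out.println(sketch.compensationMs(t)); // ran on schedule
        t += TimeUnit.MILLISECONDS.toNanos(65_000L);
        System.out.println(sketch.compensationMs(t)); // ran 5s late
    }
}
```

    When the task runs late, every lease gets that much extra time before it counts as expired, so a paused server does not evict instances that were unable to renew during the pause.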

    The eviction task removes instances that have not renewed within 90s

    AbstractInstanceRegistry

    public void evict(long additionalLeaseMs) {
        // ... 
        List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>();
        for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) {
            Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue();
            if (leaseMap != null) {
                for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) {
                    Lease<InstanceInfo> lease = leaseEntry.getValue();
                    // This is where expiry is decided
                    if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) {
                        expiredLeases.add(lease);
                    }
                }
            }
        }
        // ... a random subset of the expired instances is then evicted, capped at 15% of all instances.
    }
    
    // Expiry check; additionalLeaseMs is the compensation time
    public boolean isExpired(long additionalLeaseMs) {
        return (evictionTimestamp > 0 || System.currentTimeMillis() > (lastUpdateTimestamp + duration + additionalLeaseMs));
    }
    

    The expiry check shows that the gap between two renewals must not exceed (duration + additionalLeaseMs). Ignoring compensation, the gap must stay within duration.
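
    A worked example of that check, as a sketch. LeaseSketch is a hypothetical class that reuses the isExpired formula shown above with an injected clock. Its renew method simply records the renewal time, which is an assumption made for illustration; the real Lease.renew is not shown in this article and may behave differently.

```java
public class LeaseSketch {
    private final long durationMs;
    private long lastUpdateTimestamp;
    private long evictionTimestamp; // stays 0 until the lease is cancelled

    public LeaseSketch(int durationInSecs, long nowMs) {
        this.durationMs = durationInSecs * 1000L;
        this.lastUpdateTimestamp = nowMs;
    }

    /** Simplified renewal: remember when the last heartbeat arrived. */
    public void renew(long nowMs) {
        lastUpdateTimestamp = nowMs;
    }

    /** Same formula as isExpired above, with the current time passed in. */
    public boolean isExpired(long nowMs, long additionalLeaseMs) {
        return evictionTimestamp > 0
                || nowMs > lastUpdateTimestamp + durationMs + additionalLeaseMs;
    }

    public static void main(String[] args) {
        LeaseSketch lease = new LeaseSketch(90, 0L);
        System.out.println(lease.isExpired(90_000L, 0L));     // exactly 90s: not yet expired
        System.out.println(lease.isExpired(90_001L, 0L));     // just past 90s: expired
        System.out.println(lease.isExpired(93_000L, 5_000L)); // 5s of compensation: not expired
        lease.renew(60_000L);
        System.out.println(lease.isExpired(120_000L, 0L));    // renewed at 60s: not expired
    }
}
```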

    Next, let's see where duration comes from.

    public Lease(T r, int durationInSecs) {
        holder = r;
        registrationTimestamp = System.currentTimeMillis();
        lastUpdateTimestamp = registrationTimestamp;
        duration = (durationInSecs * 1000);
    }
    

    duration is assigned in the Lease constructor. The Lease objects are read from the registry map, and that map is populated by the instance registration methods.

    @Override
    public void register(final InstanceInfo info, final boolean isReplication) {
        int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
        if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
            leaseDuration = info.getLeaseInfo().getDurationInSecs();
        }
        super.register(info, leaseDuration, isReplication);
        replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
    }
    
    public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
        //...
        Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
        if (existingLease != null) {
            lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
        }
        gMap.put(registrant.getId(), lease);
        //...
    }
    

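    The registration code shows that the expiry duration comes from the InstanceInfo; if it is not set, the default Lease.DEFAULT_DURATION_IN_SECS (90s) is used.

    InstanceInfo is sent by the client as part of registration, so let's look at how the client builds it.

    The client-side registration logic lives mainly in DiscoveryClient.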

    InstanceInfo myInfo = applicationInfoManager.getInfo();
    // ...
    /**
     * Register with the eureka service by making the appropriate REST call.
     */
    boolean register() throws Throwable {
        EurekaHttpResponse<Void> httpResponse;
        try {
            httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
        } catch (Exception e) {
            logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e);
            throw e;
        }
        return httpResponse.getStatusCode() == Status.NO_CONTENT.getStatusCode();
    }
    

    EurekaClientAutoConfiguration

    @Bean
    @ConditionalOnMissingBean(value = ApplicationInfoManager.class, search = SearchStrategy.CURRENT)
    public ApplicationInfoManager eurekaApplicationInfoManager(EurekaInstanceConfig config) {
        InstanceInfo instanceInfo = new InstanceInfoFactory().create(config);
        return new ApplicationInfoManager(config, instanceInfo);
    }
    

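    The instanceInfo passed to register is stored in ApplicationInfoManager during bean wiring. When the ApplicationInfoManager bean is created, InstanceInfoFactory builds the InstanceInfo instance, and duration is set inside the factory's create method.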

    public class InstanceInfoFactory {
    
        public InstanceInfo create(EurekaInstanceConfig config) {
            LeaseInfo.Builder leaseInfoBuilder = LeaseInfo.Builder.newBuilder()
                    .setRenewalIntervalInSecs(config.getLeaseRenewalIntervalInSeconds())
                    .setDurationInSecs(config.getLeaseExpirationDurationInSeconds());
    
            // Builder the instance information to be registered with eureka server
            InstanceInfo.Builder builder = InstanceInfo.Builder.newBuilder();
            // ... set the remaining builder properties
            
            InstanceInfo instanceInfo = builder.build();
            instanceInfo.setLeaseInfo(leaseInfoBuilder.build());
            return instanceInfo;
        }
        // ...
    }
    

    As shown, config.getLeaseExpirationDurationInSeconds() supplies the value of duration. It is configured in EurekaInstanceConfigBean and defaults to 90s:

    private int leaseExpirationDurationInSeconds = 90;
    

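    We can now conclude that, by default, the client registers with a lease expiration time of 90s, which the server uses for eviction.

    V. Configuration Tuning

    The analysis above covers every delay point in each scenario and how each one works. On that basis we can tune the Eureka parameters for real deployments.

    There are two main problems to solve:

    1. Reduce the time clients need to notice a change
    2. Make sure requests to an instance that has already been taken offline from Eureka still succeed, rather than failing because caches have not been refreshed yet

    1. Reduce the time clients need to notice a change

    Eureka server configuration

    Shorten the read-only / read-write cache sync interval to 10s. The sync only copies between two in-memory maps, so the interval can be cut substantially (30s -> 10s).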

    eureka.server.response-cache-update-interval-ms=10000
    

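    Eureka client configuration

    Shorten the registry fetch interval to 10s. The client only pulls incremental changes on each fetch, so this interval can be reduced as well (30s -> 10s).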

    eureka.client.registry-fetch-interval-seconds=10
    

    Shorten the Ribbon serverList refresh interval to 5s. Ribbon's refresh only syncs in-memory data, so the interval can be cut substantially (30s -> 5s).

    ribbon.ServerListRefreshInterval=5000
    

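    With these settings, a newly started instance is noticed within at most 25s (10s + 10s + 5s).

    2. Keep the old instance available after it goes offline from the Eureka server (smooth restart)

    To keep calls fast and highly available, Eureka adds several cache levels (Ribbon included). As a result, if a client has not yet dropped the address of an instance that went offline, requests can still be routed to that instance and fail.

    Based on the analysis above, a smooth restart requires the following:

    1. Before the old instance shuts down, the Eureka client must detect the upcoming shutdown and promptly tell the Eureka server that the instance is going offline
    2. Callers must notice the old instance's status change as quickly as possible
    3. The window between sending the offline notification and the actual shutdown must be stretched, so that requests hitting this instance are still served until clients refresh their caches

    Let's look at how each of these three steps can be implemented, again starting from the source code.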

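    2.1 The Eureka client detects the upcoming shutdown

    Spring registers a shutdown hook during startup; before the instance shuts down, it publishes a ContextClosedEvent.

    For details on how Spring handles shutdown, see "Spring Source Code Analysis: Graceful Service Shutdown".

    Eureka listens for ContextClosedEvent and uses it to notify the server that the instance is going offline.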

    EurekaAutoServiceRegistration

    public class EurekaAutoServiceRegistration implements AutoServiceRegistration, SmartLifecycle, Ordered, SmartApplicationListener {
        @Override
        public void onApplicationEvent(ApplicationEvent event) {
            if (event instanceof WebServerInitializedEvent) {
                onApplicationEvent((WebServerInitializedEvent) event);
            }
            else if (event instanceof ContextClosedEvent) {
                onApplicationEvent((ContextClosedEvent) event);
            }
        }
        public void onApplicationEvent(ContextClosedEvent event) {
            if (event.getApplicationContext() == context) {
                stop();
            }
        }
    
        @Override
        public void stop() {
            this.serviceRegistry.deregister(this.registration);
            this.running.set(false);
        }
    }
    

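    EurekaAutoServiceRegistration implements SmartApplicationListener to listen for ContextClosedEvent, which eventually calls deregister.

    In addition, as analyzed in "Spring Source Code Analysis: Graceful Service Shutdown", Spring's shutdown hook not only publishes ContextClosedEvent but also calls stop on every Lifecycle bean. The stop method here is therefore invoked again after the event has been handled, and it calls deregister again as well.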

    @Override
    public void deregister(EurekaRegistration reg) {
        if (reg.getApplicationInfoManager().getInfo() != null) {
            if (log.isInfoEnabled()) {
                log.info("Unregistering application " + reg.getApplicationInfoManager().getInfo().getAppName() + " with eureka with status DOWN");
            }
            reg.getApplicationInfoManager().setInstanceStatus(InstanceInfo.InstanceStatus.DOWN);
        }
    }
    

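    deregister sets the instance status held by ApplicationInfoManager to DOWN. Eureka listens for status change events internally and forwards the change to the Eureka server, so the server can promptly mark the instance as DOWN.

    2.2 Callers notice the new instance status as quickly as possible

    As described above, a caller has two cache levels: the Eureka client's localRegionApps cache holding the registry fetched from the server, and Ribbon's serverList cache. That is why we changed their refresh intervals to 10s and 5s respectively: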

    eureka.client.registry-fetch-interval-seconds=10
    
    ribbon.ServerListRefreshInterval=5000
    

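    The client then needs at most 15s to notice the change. We must make sure that requests to the old instance do not fail during those 15s, so the old instance's shutdown has to be delayed.

    2.3 Delaying the old instance's shutdown

    Delaying the shutdown is simple: a sleep will do. But there is a precondition: the sleep is only useful at a point where the server already shows the instance as DOWN and the servlet container has not been stopped yet.

    Finding that point requires a closer look at Eureka's implementation.

    Choosing when to sleep

    As mentioned earlier, EurekaAutoServiceRegistration implements both SmartLifecycle and SmartApplicationListener, so it has two entry points for shutdown: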

    @Override
    public void stop(Runnable callback) {
        stop();
        callback.run();
    }
    
    @Override
    public void onApplicationEvent(ApplicationEvent event) {
        if (event instanceof WebServerInitializedEvent) {
            onApplicationEvent((WebServerInitializedEvent) event);
        }
        else if (event instanceof ContextClosedEvent) {
            onApplicationEvent((ContextClosedEvent) event);
        }
    }
    
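    In what order are these two entry points invoked?

    As noted in "Spring Source Code Analysis: Graceful Service Shutdown", AbstractApplicationContext#doClose() first publishes ContextClosedEvent and then calls stop on every Lifecycle bean through lifecycleProcessor.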

    AbstractApplicationContext

    protected void doClose() {
        // ...
        try {
            // Publish shutdown event.
            // This publishes the ContextClosedEvent
            publishEvent(new ContextClosedEvent(this));
        }
        catch (Throwable ex) {
            logger.warn("Exception thrown from ApplicationListener handling ContextClosedEvent", ex);
        }
    
        // Stop all Lifecycle beans, to avoid delays during individual destruction.
        // Calls the stop method of every Lifecycle bean
        if (this.lifecycleProcessor != null) {
            try {
                this.lifecycleProcessor.onClose();
            }
            catch (Throwable ex) {
                logger.warn("Exception thrown from LifecycleProcessor on context close", ex);
            }
        }
        // ...
    }
    

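    So listeners are handled before Lifecycle beans.

    Now that we know when Eureka stops, we also need to know when the servlet container stops.

    When is the servlet container (Tomcat) stopped?

    Looking at the class hierarchy, besides EurekaAutoServiceRegistration there are two WebServer beans, WebServerGracefulShutdownLifecycle and WebServerStartStopLifecycle, that also implement SmartLifecycle.

    WebServerStartStopLifecycle starts and stops the webServer
    WebServerGracefulShutdownLifecycle only handles graceful shutdown of the webServer (not performed by default)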

    class WebServerStartStopLifecycle implements SmartLifecycle {
    
        private final ServletWebServerApplicationContext applicationContext;
    
        private final WebServer webServer;
    
        private volatile boolean running;
    
        WebServerStartStopLifecycle(ServletWebServerApplicationContext applicationContext, WebServer webServer) {
            this.applicationContext = applicationContext;
            this.webServer = webServer;
        }
    
        @Override
        public void start() {
            this.webServer.start();
            this.running = true;
            this.applicationContext
                    .publishEvent(new ServletWebServerInitializedEvent(this.webServer, this.applicationContext));
        }
    
        @Override
        public void stop() {
            this.webServer.stop();
        }
    
        @Override
        public boolean isRunning() {
            return this.running;
        }
    
        @Override
        public int getPhase() {
            return Integer.MAX_VALUE - 1;
        }
    }
    
    class WebServerGracefulShutdownLifecycle implements SmartLifecycle {
    
        private final WebServer webServer;
    
        private volatile boolean running;
    
        WebServerGracefulShutdownLifecycle(WebServer webServer) {
            this.webServer = webServer;
        }
    
        @Override
        public void start() {
            this.running = true;
        }
    
        @Override
        public void stop() {
            throw new UnsupportedOperationException("Stop must not be invoked directly");
        }
    
        @Override
        public void stop(Runnable callback) {
            this.running = false;
            this.webServer.shutDownGracefully((result) -> callback.run());
        }
    
        @Override
        public boolean isRunning() {
            return this.running;
        }
    }
    
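    In what order do SmartLifecycle implementations run?

    The order is determined by getPhase. During startup Spring runs them in ascending phase order; during shutdown it runs them in descending order (note that it is reversed). The implementation is in DefaultLifecycleProcessor.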

    SmartLifecycle                        phase
    EurekaAutoServiceRegistration         0
    WebServerStartStopLifecycle           Integer.MAX_VALUE - 1
    WebServerGracefulShutdownLifecycle    Integer.MAX_VALUE

    Start order: EurekaAutoServiceRegistration -> WebServerStartStopLifecycle -> WebServerGracefulShutdownLifecycle
    Stop order: WebServerGracefulShutdownLifecycle -> WebServerStartStopLifecycle -> EurekaAutoServiceRegistration
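
    The ordering rule can be reproduced with a few lines of plain Java. This is only an illustration of "ascending phase on start, descending on stop"; PhaseOrderSketch is a hypothetical class and does not use Spring's DefaultLifecycleProcessor.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class PhaseOrderSketch {

    /** The three beans from the table, with their phase values. */
    public static Map<String, Integer> phases() {
        Map<String, Integer> phases = new LinkedHashMap<>();
        phases.put("WebServerGracefulShutdownLifecycle", Integer.MAX_VALUE);
        phases.put("EurekaAutoServiceRegistration", 0);
        phases.put("WebServerStartStopLifecycle", Integer.MAX_VALUE - 1);
        return phases;
    }

    /** Start: lowest phase first. */
    public static List<String> startOrder(Map<String, Integer> phases) {
        return phases.entrySet().stream()
                .sorted(Map.Entry.comparingByValue())
                .map(Map.Entry::getKey)
                .collect(Collectors.toList());
    }

    /** Stop: the exact reverse of the start order. */
    public static List<String> stopOrder(Map<String, Integer> phases) {
        List<String> order = new ArrayList<>(startOrder(phases));
        Collections.reverse(order);
        return order;
    }

    public static void main(String[] args) {
        System.out.println("start: " + startOrder(phases()));
        System.out.println("stop:  " + stopOrder(phases()));
    }
}
```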

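    Putting the analysis together, the full shutdown sequence is as follows:

    [Figure: shutdown execution order]

    This leaves two places to put the sleep: insertion point 1, a ContextClosedEvent listener that runs right after Eureka's own listener, and insertion point 2, a SmartLifecycle that is stopped before the web server.

    However, WebServerGracefulShutdownLifecycle already has the highest phase. If graceful shutdown is not enabled (the default), we can implement a SmartLifecycle at insertion point 2 with the highest phase; otherwise, to be safe and to keep things extensible, insertion point 1 is the better choice.

    Insertion point 1: SmartApplicationListener implementation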
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.context.ApplicationEvent;
    import org.springframework.context.event.ContextClosedEvent;
    import org.springframework.context.event.SmartApplicationListener;

    @Slf4j
    public class UnawareBootListener implements SmartApplicationListener {
    
        // Sync interval of the server's read-only / read-write caches, in seconds
        public static final int EUREKA_READ_WRITE_CACHE_SYNC_INTERVAL_SECONDS = 10;
        // Fallbacks when a setting is missing or invalid (Eureka and Ribbon both default to 30s)
        private static final int DEFAULT_FETCH_INTERVAL_SECONDS = 30;
        private static final int DEFAULT_SERVER_LIST_REFRESH_INTERVAL_MS = 30_000;
        // eureka.client settings (the author's own config bean, not shown here)
        private final EurekaClientOptimizeConfigBean eurekaConfig;
        // ribbon settings (the author's own config bean, not shown here)
        private final RibbonOptimizeConfigBean ribbonConfig;
    
        public UnawareBootListener(EurekaClientOptimizeConfigBean eurekaConfig, RibbonOptimizeConfigBean ribbonConfig) {
            this.eurekaConfig = eurekaConfig;
            this.ribbonConfig = ribbonConfig;
        }
    
        @Override
        public boolean supportsEventType(Class<? extends ApplicationEvent> eventType) {
            return ContextClosedEvent.class.isAssignableFrom(eventType);
        }
    
        // Eureka's listener has order 0; use 1 so this listener runs right after it
        @Override
        public int getOrder() {
            return 1;
        }
    
        @Override
        public void onApplicationEvent(ApplicationEvent event) {
            Integer registryFetchIntervalSeconds = eurekaConfig.getRegistryFetchIntervalSeconds();
            if (registryFetchIntervalSeconds == null || registryFetchIntervalSeconds <= 0) {
                registryFetchIntervalSeconds = DEFAULT_FETCH_INTERVAL_SECONDS;
            }
            // ribbon.ServerListRefreshInterval is configured in milliseconds
            Integer serverListRefreshIntervalMs = ribbonConfig.getServerListRefreshInterval();
            if (serverListRefreshIntervalMs == null || serverListRefreshIntervalMs <= 0) {
                serverListRefreshIntervalMs = DEFAULT_SERVER_LIST_REFRESH_INTERVAL_MS;
            }
            int shutDownWaitTime = registryFetchIntervalSeconds + (serverListRefreshIntervalMs / 1000)
                    + EUREKA_READ_WRITE_CACHE_SYNC_INTERVAL_SECONDS;
            try {
                Thread.sleep(shutDownWaitTime * 1000L);
            } catch (InterruptedException e) {
                log.warn("UnawareBootListener wait to shutdown interrupted");
                // Restore the interrupt flag so the rest of the shutdown can see it
                Thread.currentThread().interrupt();
            }
            log.info("UnawareBootListener wait to shutdown seconds: {}s finish", shutDownWaitTime);
        }
    }
    
    Insertion point 2: SmartLifecycle implementation
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.context.SmartLifecycle;

    @Slf4j
    public class UnawareBoot implements SmartLifecycle {
    
        // Sync interval of the server's read-only / read-write caches, in seconds
        public static final int EUREKA_READ_WRITE_CACHE_SYNC_INTERVAL_SECONDS = 10;
        // Fallbacks when a setting is missing or invalid (Eureka and Ribbon both default to 30s)
        private static final int DEFAULT_FETCH_INTERVAL_SECONDS = 30;
        private static final int DEFAULT_SERVER_LIST_REFRESH_INTERVAL_MS = 30_000;
        // eureka.client settings (the author's own config bean, not shown here)
        private final EurekaClientOptimizeConfigBean eurekaConfig;
        // ribbon settings (the author's own config bean, not shown here)
        private final RibbonOptimizeConfigBean ribbonConfig;
        private volatile boolean running = false;
    
        public UnawareBoot(EurekaClientOptimizeConfigBean eurekaConfig, RibbonOptimizeConfigBean ribbonConfig) {
            this.eurekaConfig = eurekaConfig;
            this.ribbonConfig = ribbonConfig;
        }
    
        @Override
        public void start() {
            running = true;
        }
    
        @Override
        public void stop() {
            running = false;
            Integer registryFetchIntervalSeconds = eurekaConfig.getRegistryFetchIntervalSeconds();
            if (registryFetchIntervalSeconds == null || registryFetchIntervalSeconds <= 0) {
                registryFetchIntervalSeconds = DEFAULT_FETCH_INTERVAL_SECONDS;
            }
            // ribbon.ServerListRefreshInterval is configured in milliseconds
            Integer serverListRefreshIntervalMs = ribbonConfig.getServerListRefreshInterval();
            if (serverListRefreshIntervalMs == null || serverListRefreshIntervalMs <= 0) {
                serverListRefreshIntervalMs = DEFAULT_SERVER_LIST_REFRESH_INTERVAL_MS;
            }
            int shutDownWaitTime = registryFetchIntervalSeconds + (serverListRefreshIntervalMs / 1000)
                    + EUREKA_READ_WRITE_CACHE_SYNC_INTERVAL_SECONDS;
            try {
                Thread.sleep(shutDownWaitTime * 1000L);
            } catch (InterruptedException e) {
                log.warn("UnawareBoot wait to shutdown interrupted");
                // Restore the interrupt flag so the rest of the shutdown can see it
                Thread.currentThread().interrupt();
            }
            log.info("UnawareBoot wait to shutdown seconds: {}s finish", shutDownWaitTime);
        }
    
        /**
         * Use the highest phase so that this bean is stopped first and blocks the rest of the shutdown.
         */
        @Override
        public int getPhase() {
            return Integer.MAX_VALUE;
        }
    
        @Override
        public boolean isRunning() {
            return running;
        }
    }
    

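    This looks complete, and the approach does solve the smooth restart problem. One thing is missing, though: the Eureka client's periodic renewal tasks. If the client syncs its state to the server again before or during the sleep, the status on the server goes from DOWN back to UP.

    Fixing the flaw

    How the periodic renewal task works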

    InstanceInfoReplicator

     public void run() {
        try {
            // Refresh the instance status; this is the call that turns the earlier DOWN back into UP
            discoveryClient.refreshInstanceInfo();
    
            Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
            if (dirtyTimestamp != null) {
                discoveryClient.register();
                instanceInfo.unsetIsDirty(dirtyTimestamp);
            }
        } catch (Throwable t) {
            logger.warn("There was a problem with the instance info replicator", t);
        } finally {
            Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
            scheduledPeriodicRef.set(next);
        }
    }
    

    DiscoveryClient

    void refreshInstanceInfo() {
        applicationInfoManager.refreshDataCenterInfoIfRequired();
        applicationInfoManager.refreshLeaseInfoIfRequired();
    
        InstanceStatus status;
        try {
            status = getHealthCheckHandler().getStatus(instanceInfo.getStatus());
        } catch (Exception e) {
            logger.warn("Exception from healthcheckHandler.getStatus, setting status to DOWN", e);
            status = InstanceStatus.DOWN;
        }
    
        if (null != status) {
            applicationInfoManager.setInstanceStatus(status);
        }
    }
    

    InstanceInfoReplicator runs every replicationIntervalSeconds (30s by default) and syncs the current state to the server; before syncing, it recomputes the latest status. The status is computed by DiscoveryClient.getHealthCheckHandler().getStatus().

    EurekaHealthCheckHandler

    @Override
    public InstanceStatus getStatus(InstanceStatus instanceStatus) {
        return getHealthStatus();
    }
    
    protected InstanceStatus getHealthStatus() {
        final Status status;
        // statusAggregator is injected during initialization by default
        if (statusAggregator != null) {
            status = getStatus(statusAggregator);
        }
        else {
            status = getStatus(getHealthIndicator());
        }
        return mapToInstanceStatus(status);
    }
    
    protected Status getStatus(StatusAggregator statusAggregator) {
        Status status;
    
        Set<Status> statusSet = new HashSet<>();
        if (healthIndicators != null) {
            statusSet.addAll(
                    healthIndicators.values().stream().map(HealthIndicator::health)
                                .map(Health::getStatus).collect(Collectors.toSet()));
        }
    
        if (reactiveHealthIndicators != null) {
            statusSet.addAll(reactiveHealthIndicators.values().stream()
                    .map(ReactiveHealthIndicator::health).map(Mono::block)
                    .filter(Objects::nonNull).map(Health::getStatus)
                    .collect(Collectors.toSet()));
        }
        // Orders the statuses in the set and returns the one with the lowest order
        status = statusAggregator.getAggregateStatus(statusSet);
        return status;
    }
    

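    This getStatus computes the current status by iterating over all healthIndicators. Each one derives a Status from some aspect of the instance (its state, parameters, database health, and so on), and together they form a Set<Status>.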

    SimpleStatusAggregator

    @Override
    public Status getAggregateStatus(Set<Status> statuses) {
        return statuses.stream().filter(this::contains).min(this.comparator).orElse(Status.UNKNOWN);
    }
    

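    SimpleStatusAggregator orders the statuses in the set and returns the one with the lowest order. The default order, from lowest to highest, is
    DOWN -> OUT_OF_SERVICE -> UP -> UNKNOWN

    If everything is healthy, the result is UP. It is written back to InstanceInfo, the resulting change event sends the update to the server, and the instance status on the server becomes UP again.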
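
    The aggregation rule is easy to check in isolation. The sketch below re-implements "pick the status with the lowest order" using a local enum instead of Spring Boot's Status class; StatusAggregatorSketch and its members are hypothetical names.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.EnumSet;
import java.util.List;
import java.util.Set;

public class StatusAggregatorSketch {

    public enum Status { DOWN, OUT_OF_SERVICE, UP, UNKNOWN }

    // Default order quoted above, from lowest to highest
    private static final List<Status> ORDER =
            Arrays.asList(Status.DOWN, Status.OUT_OF_SERVICE, Status.UP, Status.UNKNOWN);

    /** Returns the lowest-ordered status in the set, or UNKNOWN for an empty set. */
    public static Status aggregate(Set<Status> statuses) {
        return statuses.stream()
                .filter(ORDER::contains)
                .min(Comparator.comparingInt(ORDER::indexOf))
                .orElse(Status.UNKNOWN);
    }

    public static void main(String[] args) {
        // Every health indicator reports UP, so the aggregate is UP
        System.out.println(aggregate(EnumSet.of(Status.UP)));
        // A single DOWN indicator wins over any number of UP indicators
        System.out.println(aggregate(EnumSet.of(Status.UP, Status.DOWN)));
        // No indicators at all
        System.out.println(aggregate(EnumSet.noneOf(Status.class)));
    }
}
```

    The result depends only on what the health indicators report. The DOWN that deregister wrote into InstanceInfo is not one of the inputs, which is why a healthy instance is reported as UP again.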

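    With the renewal flow explained, it is clear that relying on the change event alone to keep the instance DOWN on the server is not reliable. The Eureka client has to be shut down completely.

    Shutting down the Eureka client

    First, let's see how Eureka itself implements shutdown.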

    EurekaClientAutoConfiguration

    @Bean(destroyMethod = "shutdown")
    @ConditionalOnMissingBean(value = EurekaClient.class, search = SearchStrategy.CURRENT)
    @org.springframework.cloud.context.config.annotation.RefreshScope
    @Lazy
    public EurekaClient eurekaClient(ApplicationInfoManager manager,
            EurekaClientConfig config, EurekaInstanceConfig instance,
            @Autowired(required = false) HealthCheckHandler healthCheckHandler) {
                
        ApplicationInfoManager appManager;
        if (AopUtils.isAopProxy(manager)) {
            appManager = ProxyUtils.getTargetObject(manager);
        }
        else {
            appManager = manager;
        }
        CloudEurekaClient cloudEurekaClient = new CloudEurekaClient(appManager, config, this.optionalArgs, this.context);
        cloudEurekaClient.registerHealthCheck(healthCheckHandler);
        return cloudEurekaClient;
    }
    

    The EurekaClient bean definition sets the destroyMethod attribute, so this method is called when the bean is destroyed.
    DiscoveryClient

    @PreDestroy
    @Override
    public synchronized void shutdown() {
        if (isShutdown.compareAndSet(false, true)) {
            logger.info("Shutting down DiscoveryClient ...");
    
            if (statusChangeListener != null && applicationInfoManager != null) {
                // Remove the status change listener from applicationInfoManager
                applicationInfoManager.unregisterStatusChangeListener(statusChangeListener.getId());
            }
            // Cancel all scheduled tasks
            cancelScheduledTasks();
    
            // If APPINFO was registered
            if (applicationInfoManager != null && clientConfig.shouldRegisterWithEureka() && clientConfig.shouldUnregisterOnShutdown()) {
                // Set the status to DOWN and send the cancel request to the server directly, no longer relying on the listener
                applicationInfoManager.setInstanceStatus(InstanceStatus.DOWN);
                unregister();
            }
            // Shut down the transport client
            if (eurekaTransport != null) {
                eurekaTransport.shutdown();
            }
            // Shut down the monitors
            heartbeatStalenessMonitor.shutdown();
            registryStalenessMonitor.shutdown();
    
            Monitors.unregisterObject(this);
    
            logger.info("Completed shut down of DiscoveryClient");
        }
    }
    
    private void cancelScheduledTasks() {
        // Stop the instance info replicator
        if (instanceInfoReplicator != null) {
            instanceInfoReplicator.stop();
        }
        // Shut down the heartbeat executor
        if (heartbeatExecutor != null) {
            heartbeatExecutor.shutdownNow();
        }
        // Shut down the registry fetch (cache refresh) executor
        if (cacheRefreshExecutor != null) {
            cacheRefreshExecutor.shutdownNow();
        }
        // Shut down the ScheduledExecutorService
        if (scheduler != null) {
            scheduler.shutdownNow();
        }
        // Cancel the registry fetch task
        if (cacheRefreshTask != null) {
            cacheRefreshTask.cancel();
        }
        // Cancel the heartbeat task
        if (heartbeatTask != null) {
            heartbeatTask.cancel();
        }
    }
    

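    shutdown does the following:

    1. Removes the status change listener
    2. Cancels all scheduled tasks
    3. Sets the instance status to DOWN
    4. Sends the cancel request
    5. Shuts down the transport client

    With the whole shutdown process understood, making sure the instance stays out of service in the server's registry only requires calling DiscoveryClient.shutdown() ourselves.
    Strictly speaking, after shutdown the server's registry no longer contains the instance at all, rather than showing it as DOWN.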
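
    Calling shutdown() by hand means it runs a second time when Spring destroys the bean through destroyMethod = "shutdown". The compareAndSet guard at the top of the method turns that second call into a no-op. A minimal sketch of the guard follows; IdempotentShutdownSketch is a hypothetical class, and a counter stands in for the real cleanup work.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class IdempotentShutdownSketch {
    private final AtomicBoolean isShutdown = new AtomicBoolean(false);
    private final AtomicInteger cleanupRuns = new AtomicInteger();

    /** Only the first caller flips the flag and performs the cleanup. */
    public void shutdown() {
        if (isShutdown.compareAndSet(false, true)) {
            // Stands in for cancelScheduledTasks(), unregister(), and so on
            cleanupRuns.incrementAndGet();
        }
    }

    public int cleanupRuns() {
        return cleanupRuns.get();
    }

    public static void main(String[] args) {
        IdempotentShutdownSketch client = new IdempotentShutdownSketch();
        client.shutdown(); // manual call from our listener
        client.shutdown(); // later call from the container's destroy callback
        System.out.println("cleanup ran " + client.cleanupRuns() + " time(s)");
    }
}
```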

    VI. Complete Smooth Restart Implementation

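    import com.netflix.discovery.DiscoveryClient;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.BeansException;
    import org.springframework.context.ApplicationContext;
    import org.springframework.context.ApplicationContextAware;
    import org.springframework.context.ApplicationEvent;
    import org.springframework.context.event.ContextClosedEvent;
    import org.springframework.context.event.SmartApplicationListener;

    @Slf4j
    public class UnawareBootListener implements SmartApplicationListener, ApplicationContextAware {
    
        // Sync interval of the server's read-only / read-write caches, in seconds
        public static final int EUREKA_READ_WRITE_CACHE_SYNC_INTERVAL_SECONDS = 10;
        // Fallbacks when a setting is missing or invalid (Eureka and Ribbon both default to 30s)
        private static final int DEFAULT_FETCH_INTERVAL_SECONDS = 30;
        private static final int DEFAULT_SERVER_LIST_REFRESH_INTERVAL_MS = 30_000;
        // eureka.client settings (the author's own config bean, not shown here)
        private final EurekaClientOptimizeConfigBean eurekaConfig;
        // ribbon settings (the author's own config bean, not shown here)
        private final RibbonOptimizeConfigBean ribbonConfig;
        private ApplicationContext applicationContext;
    
        public UnawareBootListener(EurekaClientOptimizeConfigBean eurekaConfig, RibbonOptimizeConfigBean ribbonConfig) {
            this.eurekaConfig = eurekaConfig;
            this.ribbonConfig = ribbonConfig;
        }
        
        @Override
        public boolean supportsEventType(Class<? extends ApplicationEvent> eventType) {
            return ContextClosedEvent.class.isAssignableFrom(eventType);
        }
    
        // The explicit order is no longer needed: we shut the client down ourselves, so listener order does not matter
    
        @Override
        public void onApplicationEvent(ApplicationEvent event) {
            // com.netflix.discovery.DiscoveryClient, not Spring Cloud's DiscoveryClient interface
            DiscoveryClient discoveryClient = applicationContext.getBean(DiscoveryClient.class);
            // Shut down the Eureka client explicitly
            discoveryClient.shutdown();
            Integer registryFetchIntervalSeconds = eurekaConfig.getRegistryFetchIntervalSeconds();
            if (registryFetchIntervalSeconds == null || registryFetchIntervalSeconds <= 0) {
                registryFetchIntervalSeconds = DEFAULT_FETCH_INTERVAL_SECONDS;
            }
            // ribbon.ServerListRefreshInterval is configured in milliseconds
            Integer serverListRefreshIntervalMs = ribbonConfig.getServerListRefreshInterval();
            if (serverListRefreshIntervalMs == null || serverListRefreshIntervalMs <= 0) {
                serverListRefreshIntervalMs = DEFAULT_SERVER_LIST_REFRESH_INTERVAL_MS;
            }
            int shutDownWaitTime = registryFetchIntervalSeconds + (serverListRefreshIntervalMs / 1000)
                    + EUREKA_READ_WRITE_CACHE_SYNC_INTERVAL_SECONDS;
            try {
                Thread.sleep(shutDownWaitTime * 1000L);
            } catch (InterruptedException e) {
                log.warn("UnawareBootListener wait to shutdown interrupted");
                // Restore the interrupt flag so the rest of the shutdown can see it
                Thread.currentThread().interrupt();
            }
            log.info("UnawareBootListener wait to shutdown seconds: {}s finish", shutDownWaitTime);
        }
    
        @Override
        public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
            this.applicationContext = applicationContext;
        }
    }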
    
    

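    VII. Summary

    This article examined, at the source code level, how Eureka notices instances going online and offline and how it refreshes its caches, and presented a practical way to achieve smooth restarts.

    Closing note:

    The final code is only a few dozen lines, but writing it required understanding roughly a hundred times as much code: how things start, how they refresh, how they stop, and how the surrounding components depend on each other. It took about a month of calendar time and a dozen or so hours of work to arrive at these few dozen lines.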


          Article link: https://www.haomeiwen.com/subject/rcnyzktx.html