美文网首页程序员
apollo客户端探索

apollo客户端探索

作者: yeren108 | 来源:发表于2018-08-15 20:45 被阅读863次

    Config的获取

    我们首先从最简的apollo使用上开始。apollo为我们提供了一个操作类ConfigService,通过这个类我们能获取到配置(Config类)。我们来探究这个类是如何得到的。

    Config config = ConfigService.getAppConfig();
    

    看一下调用链:

    public static Config getAppConfig() {
        return getConfig(ConfigConsts.NAMESPACE_APPLICATION);
      }
    
    public static Config getConfig(String namespace) {
        return s_instance.getManager().getConfig(namespace);
      }
    

    到这里我们可以看到首先要获一个ConfigManager(DefaultConfigManager),它是一个默认的配置管理器,这个配置管理器仅仅提供两个方法:
    一个是用来获取配置的(根据namespace)

    @Override
      public Config getConfig(String namespace) {
        Config config = m_configs.get(namespace);
    
        if (config == null) {
          synchronized (this) {
            config = m_configs.get(namespace);
    
            if (config == null) {
              ConfigFactory factory = m_factoryManager.getFactory(namespace);
    
              config = factory.create(namespace);
              m_configs.put(namespace, config);
            }
          }
        }
    
        return config;
      }
    

    通过这个方法我们可以得知,要获得那个Config必须通过ConfigFactory创建出来。而ConfigFactory又是通过ConfigFactoryManager(DefaultConfigFactoryManager)获得的。从DefaultConfigFactoryManager类中我们看一下ConfigFactory的获取逻辑:

    @Override
      public ConfigFactory getFactory(String namespace) {
        // step 1: check hacked factory
        ConfigFactory factory = m_registry.getFactory(namespace);
    
        if (factory != null) {
          return factory;
        }
    
        // step 2: check cache
        factory = m_factories.get(namespace);
    
        if (factory != null) {
          return factory;
        }
    
        // step 3: check declared config factory
        factory = ApolloInjector.getInstance(ConfigFactory.class, namespace);
    
        if (factory != null) {
          return factory;
        }
    
        // step 4: check default config factory
        factory = ApolloInjector.getInstance(ConfigFactory.class);
    
        m_factories.put(namespace, factory);
    
        // factory should not be null
        return factory;
      }
    

    首先从配置注册器(ConfigRegistry)中获取配置工厂(ConfigFactory),
    如果这个配置工厂(ConfigFactory->DefaultConfigFactory)不存在则从缓存中找,还找不到则创建一个放到一个缓存中,以备下次使用。

    再看看工厂是如何获取配置的,在DefaultConfigFactory的create方法:

    @Override
      public Config create(String namespace) {
        DefaultConfig defaultConfig =
            new DefaultConfig(namespace, createLocalConfigRepository(namespace));
        return defaultConfig;
      }
    

    可以看到新建了一个默认配置(DefaultConfig ),并且创建了一个本地配置库,而在创建本地配置仓库的时候又创建了远程配置仓库。
    远程配置仓库做了哪些事呢?

    public RemoteConfigRepository(String namespace) {
        m_namespace = namespace;
        m_configCache = new AtomicReference<ApolloConfig>();
        m_configUtil = ApolloInjector.getInstance(ConfigUtil.class);
        m_httpUtil = ApolloInjector.getInstance(HttpUtil.class);
        m_serviceLocator = ApolloInjector.getInstance(ConfigServiceLocator.class);
        remoteConfigLongPollService = ApolloInjector.getInstance(RemoteConfigLongPollService.class);
        m_longPollServiceDto = new AtomicReference<ServiceDTO>();
        m_remoteMessages = new AtomicReference<ApolloNotificationMessages>();
        m_loadConfigRateLimiter = RateLimiter.create(m_configUtil.getLoadConfigQPS());
        m_configNeedForceRefresh = new AtomicBoolean(true);
        m_loadConfigFailSchedulePolicy = new ExponentialSchedulePolicy(m_configUtil.getOnErrorRetryInterval(),
            m_configUtil.getOnErrorRetryInterval() * 8);
        gson = new Gson();
        this.trySync();
        this.schedulePeriodicRefresh();
        this.scheduleLongPollingRefresh();
      }
    

    1 this.trySync();//同步远程配置
    2 this.schedulePeriodicRefresh();//添加每5毫秒同步一次的定时任务
    3 this.scheduleLongPollingRefresh();//启动这个定时任务

    配置管理器的另一个方法是用来获取配置文件的
    同样也是通过配置工厂(ConfigFactory->DefaultConfigFactory)来获取配置文件:

    public ConfigFile createConfigFile(String namespace, ConfigFileFormat configFileFormat) {
        ConfigRepository configRepository = createLocalConfigRepository(namespace);
        switch (configFileFormat) {
          case Properties:
            return new PropertiesConfigFile(namespace, configRepository);
          case XML:
            return new XmlConfigFile(namespace, configRepository);
          case JSON:
            return new JsonConfigFile(namespace, configRepository);
          case YAML:
            return new YamlConfigFile(namespace, configRepository);
          case YML:
            return new YmlConfigFile(namespace, configRepository);
        }
    
        return null;
      }
    

    它也会创建一个本地仓库。那么本地仓库又做了哪些事呢?

    public LocalFileConfigRepository(String namespace, ConfigRepository upstream) {
        m_namespace = namespace;
        m_configUtil = ApolloInjector.getInstance(ConfigUtil.class);
        this.setLocalCacheDir(findLocalCacheDir(), false);
        this.setUpstreamRepository(upstream);
        this.trySync();
      }
    

    1 this.setLocalCacheDir(findLocalCacheDir(), false);//设置本地配置缓存路径,并将远程仓库的配置持久化到本地
    2 this.setUpstreamRepository(upstream);//设置仓库来源流(远程仓库),并同步远程配置到本地。
    3 this.trySync();//将远程配置文件持久化到本地,并将本地文件到放到缓存中。


    配置的获取

    看到这里,远程的配置到本地的问题就解决了。那么我们程序是如何获取配置的呢?看一个例子:

    config.getProperty(key, DEFAULT_VALUE);
    
    public String getProperty(String key, String defaultValue) {
        // step 1: check system properties, i.e. -Dkey=value
        String value = System.getProperty(key);
    
        // step 2: check local cached properties file
        if (value == null && m_configProperties.get() != null) {
          value = m_configProperties.get().getProperty(key);
        }
    
        /**
         * step 3: check env variable, i.e. PATH=...
         * normally system environment variables are in UPPERCASE, however there might be exceptions.
         * so the caller should provide the key in the right case
         */
        if (value == null) {
          value = System.getenv(key);
        }
    
        // step 4: check properties file from classpath
        if (value == null && m_resourceProperties != null) {
          value = (String) m_resourceProperties.get(key);
        }
    
        if (value == null && m_configProperties.get() == null && m_warnLogRateLimiter.tryAcquire()) {
          logger.warn("Could not load config for namespace {} from Apollo, please check whether the configs are released in Apollo! Return default value now!", m_namespace);
        }
    
        return value == null ? defaultValue : value;
      }
    

    第一步从系统环境变量取
    第二步从本地缓存文件中取
    第三步检查环境变量
    第四步从类路径中取配置文件中的配置

    从这里我们可以知道配置的获取来源有4个地方。


    配置的监听

    上一个部分又解决了配置获取的问题,那么在配置又是如何监听变化的呢?
    首先我们new一个ConfigChangeListener:

    ConfigChangeListener changeListener = new ConfigChangeListener() {
          @Override
          public void onChange(ConfigChangeEvent changeEvent) {
            logger.info("Changes for namespace {}", changeEvent.getNamespace());
            for (String key : changeEvent.changedKeys()) {
              ConfigChange change = changeEvent.getChange(key);
              logger.info("Change - key: {}, oldValue: {}, newValue: {}, changeType: {}",
                  change.getPropertyName(), change.getOldValue(), change.getNewValue(),
                  change.getChangeType());
            }
          }
        };
    

    然后将这个listener加入到某个配置中(Config):

    config.addChangeListener(changeListener);
    

    那么当我们的配置项发生变化时,又是如何处理的呢?
    我们来看一下远程仓库(RemoteConfigRepository)同步的方法:

    protected synchronized void sync() throws Throwable {
        Transaction transaction = Tracer.newTransaction("Apollo.ConfigService", "syncRemoteConfig");
    
        try {
          ApolloConfig previous = m_configCache.get();
          ApolloConfig current = loadApolloConfig();
    
          //reference equals means HTTP 304
          if (previous != current) {
            logger.debug("Remote Config refreshed!");
            m_configCache.set(current);
            this.fireRepositoryChange(m_namespace, this.getConfig());
          }
    
          if (current != null) {
            Tracer.logEvent(String.format("Apollo.Client.Configs.%s", current.getNamespaceName()),
                current.getReleaseKey());
          }
    
          transaction.setStatus(Transaction.SUCCESS);
        } catch (Throwable ex) {
          transaction.setStatus(ex);
          throw ex;
        } finally {
          transaction.complete();
        }
      }
    

    其中有这么一行

    this.fireRepositoryChange(m_namespace, this.getConfig());
    

    入参分别是namespace和properites(即配置),这一行又干了什么事呢?

    protected void fireRepositoryChange(String namespace, Properties newProperties) {
        for (RepositoryChangeListener listener : m_listeners) {
          try {
            listener.onRepositoryChange(namespace, newProperties);
          } catch (Throwable ex) {
            Tracer.logError(ex);
            logger.error("Failed to invoke repository change listener {}", listener.getClass(), ex);
          }
        }
      }
    

    可以知道,这个方法将新的获取的配置放到某个namespace下的每一个listener中。
    再看一下放进去又干了什么,以下方法在DefaultConfig中:

    public synchronized void onRepositoryChange(String namespace, Properties newProperties) {
        if (newProperties.equals(m_configProperties.get())) {
          return;
        }
        Properties newConfigProperties = new Properties();
        newConfigProperties.putAll(newProperties);
    
        Map<String, ConfigChange> actualChanges = updateAndCalcConfigChanges(newConfigProperties);
    
        //check double checked result
        if (actualChanges.isEmpty()) {
          return;
        }
    
        this.fireConfigChange(new ConfigChangeEvent(m_namespace, actualChanges));
    
        Tracer.logEvent("Apollo.Client.ConfigChanges", m_namespace);
      }
    

    这个方法先根据新老配置得出改变的项,然后将这些改变的项,放到fireConfigChange这个方法中:

    protected void fireConfigChange(final ConfigChangeEvent changeEvent) {
        for (final ConfigChangeListener listener : m_listeners) {
          m_executorService.submit(new Runnable() {
            @Override
            public void run() {
              String listenerName = listener.getClass().getName();
              Transaction transaction = Tracer.newTransaction("Apollo.ConfigChangeListener", listenerName);
              try {
                listener.onChange(changeEvent);
                transaction.setStatus(Transaction.SUCCESS);
              } catch (Throwable ex) {
                transaction.setStatus(ex);
                Tracer.logError(ex);
                logger.error("Failed to invoke config change listener {}", listenerName, ex);
              } finally {
                transaction.complete();
              }
            }
          });
        }
      }
    

    这个方法为每一个listener启动了一个任务线程,将changeEvent传到每一个listener的onChange方法:

    public void onChange(ConfigChangeEvent changeEvent) {
            logger.info("Changes for namespace {}", changeEvent.getNamespace());
            for (String key : changeEvent.changedKeys()) {
              ConfigChange change = changeEvent.getChange(key);
              logger.info("Change - key: {}, oldValue: {}, newValue: {}, changeType: {}",
                  change.getPropertyName(), change.getOldValue(), change.getNewValue(),
                  change.getChangeType());
            }
    }
    

    这样我们的程序就能捕捉到配置的改变了。至此,我们监听的整个流程也已经理清了。


    apollo客户端最常用的两个操作,一个是获取配置,一个是监听配置变化,都已经分析清楚了,当然还有一些细枝末节的东西也值得我们深究,这里限于篇幅就不探讨了。

    相关文章

      网友评论

        本文标题:apollo客户端探索

        本文链接:https://www.haomeiwen.com/subject/ndpvbftx.html