美文网首页配置中心Apollo
Apollo(3) Portal、Config、Admin Se

Apollo(3) Portal、Config、Admin Se

作者: Oliver_Li | 来源:发表于2022-12-09 21:35 被阅读0次
  • 这几部分以业务为主,主要关心数据流向和长轮询的实现,没贴太多代码


一、数据库表:
  1. portal DB表:
  • App、AppNamespace
  • 包括普通用户、三方用户、权限、操作记录、收藏等
  • 基础配置,包括生效环境、meta地址、部门、接口超时时间等portal的基础配置
  • 总体来说portalDB主要围绕用户和界面展示相关的
  • protal DB里的App、AppNamespace相关数据都是未生效的,数据放入portal DB后,用spring的发布监听机制,异步把数据同步到各个env的admin DB
  1. config db表:
  • App、Cluster
  • AppNamespace:属于App,1对多,代表用户创建过的配置,不可删除,只用于记录,比如用户创建过叫application和db的AppNamespace,继续往下对比和Namespace的区别
  • Namespace:属于Cluster,1对多,代表正在使用的配置文件,可以删除,在创建新Cluster时下面默认会创建出application、db,创建的依据是上面的AppNamespace表的记录,可以在后台手动试试,但用户想删除某个app cluster下的配置,删的就是Namespace表了
  • commit:提交历史,在执行所有修改操作时增加一条记录,记录修改item操作的增量数据(old and new)
  • item:所有具体配置项,未发布过的也都记录在这
  • release message:下面写
  • server config:一些服务相关配置,和portal类似
二、一些细节:
  1. cluster不创建就会用default表示,界面上不会显示,但并不是没有
  2. 每创建一个app默认会创建一个叫application的namespace,对应spring项目的application.properties
  3. openApi可以通过在portal界面注册三方用户,用HTTP + Token的方式直接调用portal相关接口,不用登陆portal手动点击配置发布等等操作
  4. namespace公有、私有、关联
  • 私有namespace:app私有的namespace,只能本服务获取到
  • 公有namespace:其他app可以关联app的公有namespace,app私有namespace如果有相同字段,以私有为准
  • 关联namespace:app关联其他公有namespace后会,会在本项目页面显示关联的共有namespace叫“关联namespace”
  1. 关联公有namespace实际是创建了一条新的namespace给相应app
  2. apollo通过appId + clusterName + namespaceName定位一个具体namespace,类似于maven的groupId + artifactId,源码里也叫watchKey
三、修改、发布:
  1. 配置修改时,item表记录配置项,commit表增加一条修改记录,到这里修改部分就结束了,主要看发布
  2. 配置发布概述,portal向admin service请求是同步,admin service到config service是异步,为了不引入额外组件,admin service写入数据库,config service轮询数据库推送消息的方式代替了消息队列,通过最新数据的ID作为版本号确认是否有配置更新,就是releaseMessage表
  3. 发布时item和commit表没变化,release、releaseHistory和releaseMessage会增加一条记录
  • release表:记录namespace的定位、namespace配置发布的记录
  • releaseHistory表:记录操作日志,上次releaseId、本次releaseId、定位、操作内容(发布、回滚、灰度相关操作)
  • releaseMessage表:模拟MQ推送,根据namespace定位、ID拉取数据
  1. 发送消息时,releaseMessage的ID会存到一个BlockingQueue里,config service会每5s取一次BlockingQueue的数据取完为止,因为只记录定位,同一namespace多次发布releaseMessage表里会有重复数据,需要定时清理,只保留最新一条数据
  2. config service每秒扫描是否有更新,更新时取出遍历所有listener推送Client,这个监听器里触发更新后的一些处理,包括清缓存、通知Client等等,最后更新缓存最大ID记录
  @Override
  public void afterPropertiesSet() throws Exception {
    databaseScanInterval = bizConfig.releaseMessageScanIntervalInMilli();
    maxIdScanned = loadLargestMessageId();
    executorService.scheduleWithFixedDelay(() -> {
      Transaction transaction = Tracer.newTransaction("Apollo.ReleaseMessageScanner", "scanMessage");
      try {
        scanMissingMessages();
        scanMessages();
        transaction.setStatus(Transaction.SUCCESS);
      } catch (Throwable ex) {
        transaction.setStatus(ex);
        logger.error("Scan and send message failed", ex);
      } finally {
        transaction.complete();
      }
    }, databaseScanInterval, databaseScanInterval, TimeUnit.MILLISECONDS);

  }

private boolean scanAndSendMessages() {
    // 一次扫500,多了下次再扫
    List<ReleaseMessage> releaseMessages =
        releaseMessageRepository.findFirst500ByIdGreaterThanOrderByIdAsc(maxIdScanned);
    if (CollectionUtils.isEmpty(releaseMessages)) {
      return false;
    }
    // 遍历所有注册上来的监听器,通知client
    fireMessageScanned(releaseMessages);
    int messageScanned = releaseMessages.size();
    long newMaxIdScanned = releaseMessages.get(messageScanned - 1).getId();
    // check id gaps, possible reasons are release message not committed yet or already rolled back
    if (newMaxIdScanned - maxIdScanned > messageScanned) {
      recordMissingReleaseMessageIds(releaseMessages, maxIdScanned);
    }
    maxIdScanned = newMaxIdScanned;
    return messageScanned == 500;
  }

/**
   * 遍历所有listener推送message
   * @param messages
   */
  private void fireMessageScanned(Iterable<ReleaseMessage> messages) {
    for (ReleaseMessage message : messages) {
      for (ReleaseMessageListener listener : listeners) {
        try {
          listener.handleMessage(message, Topics.APOLLO_RELEASE_TOPIC);
        } catch (Throwable ex) {
          Tracer.logError(ex);
          logger.error("Failed to invoke message listener {}", listener.getClass(), ex);
        }
      }
    }
  }
    @Bean
    public ReleaseMessageScanner releaseMessageScanner() {
      ReleaseMessageScanner releaseMessageScanner = new ReleaseMessageScanner();
      //0\. handle release message cache
      releaseMessageScanner.addMessageListener(releaseMessageServiceWithCache);
      //1\. handle gray release rule
      releaseMessageScanner.addMessageListener(grayReleaseRulesHolder);
      //2\. handle server cache
      releaseMessageScanner.addMessageListener(configService);
      releaseMessageScanner.addMessageListener(configFileController);
      //3\. notify clients
      releaseMessageScanner.addMessageListener(notificationControllerV2);
      releaseMessageScanner.addMessageListener(notificationController);
      return releaseMessageScanner;
    }
  1. 关联Client章节关于RemoteConfigLongPollService的描述,RemoteConfigLongPollService长轮训/notifications/v2这个接口,触发更新时返回http200,超时返回http304,NotificationControllerV2这个对象触发DeferredResult响应的逻辑就包括在上面提到的监听回调中
  2. NotificationControllerV2里面两个主要方法:
  • 接收Client长轮询的Controller (pollNotification)
  • 上面提到的ReleaseMessageListener的回调方法handleMessage
  1. pollNotification:
  • Client通过请求告知要监听的app、cluster、cluster下当前Client所有namespaces的release message表ID
  • 和样例类似,区别只是源码接口一次请求监听app+cluster下的所有namespace,还有一些额外的处理逻辑,比如等待响应期间数据库连接断掉避免资源浪费,controller中还会检查release message ID是否已经更新,如果已更新就直接响应给Client
  • Client请求超时90s,DeferredResult超时60s构成长轮询,Client接收到响应后会立即再次请求,代码见Client包RemoteConfigLongPollService
四、服务注册、发现:
  1. 回顾:Meta Server = config service + meta service + eureka在同一进程
  2. 启动eureka:
  • 在config service启动类标注了@EnableEurekaServer注解启动eureka
  1. 注册到eureka:
  • biz包的ApolloEurekaClientConfig类重写了获取eureka服务地址的方法,admin和config service都会注册到eureka
  1. 发现:
  • Core包里写过MetaDomainConsts用SPI获取了用户配的Meta Service地址,Client从这里拿到地址,Portal从数据库、配置获取有兴趣可以看看
  • Client和Portal分别有ConfigServiceLocator和AdminServiceAddressLocator会定时从Meta Service发现Config和Admin Service地址,虽然请求发起时也有一些负载策略但Meta Service最好做nginx负载,部署图也展示了
  • meta service一共两个接口,分别获取config和admin服务地址,分别实现了nacos、k8s、consul、eureka(默认)的服务发现,eureka实现比较简单,直接从EurekaClient获取AppName、InstanceId、HomePageUrl返回
四、ServerConfig基础配置:
  • portal db和config db都有ServerConfig表,和配置文件类似,在表里配置有通用性,不需要每个项目都配置一遍
  1. RefreshablePropertySource(抽象):继承MapPropertySource,提供抽象refresh(),能看出它是一个能刷新的kv数据源
  2. PortalDBPropertySource、BizDBPropertySource继承RefreshablePropertySource,refresh()时会从数据库抓取所有配置,并放入PropertySource,分别给portal和admin、config service使用
  3. RefreshableConfig(抽象):上面是两个代表数据源,这个是代表具体数据配置,对应也有两个实现,这里每60s会刷新数据源
@PostConstruct
  public void setup() {
    // 获取数据源,因为要区分portal和admin config,所以在实现类提供
    propertySources = getRefreshablePropertySources();
    if (CollectionUtils.isEmpty(propertySources)) {
      throw new IllegalStateException("Property sources can not be empty.");
    }

    // 刷新数据源并置入environment
    for (RefreshablePropertySource propertySource : propertySources) {
      propertySource.refresh();
      environment.getPropertySources().addLast(propertySource);
    }

    // 定时刷新
    ScheduledExecutorService
        executorService =
        Executors.newScheduledThreadPool(1, ApolloThreadFactory.create("ConfigRefresher", true));

    executorService
        .scheduleWithFixedDelay(() -> {
          try {
            propertySources.forEach(RefreshablePropertySource::refresh);
          } catch (Throwable t) {
            logger.error("Refresh configs failed.", t);
            Tracer.logError("Refresh configs failed.", t);
          }
        }, CONFIG_REFRESH_INTERVAL, CONFIG_REFRESH_INTERVAL, TimeUnit.SECONDS);
  }
  1. RefreshableConfig对应的两个实现比较简单,根据key获取各种配置
  2. portal service在页面上还提供了对ServerConfig表配置更新的操作,略

相关文章

网友评论

    本文标题:Apollo(3) Portal、Config、Admin Se

    本文链接:https://www.haomeiwen.com/subject/hjkofdtx.html