美文网首页配置中心Apollo
Apollo(3) Portal、Config、Admin Se

Apollo(3) Portal、Config、Admin Se

作者: Oliver_Li | 来源:发表于2022-12-09 21:35 被阅读0次
    • 这几部分以业务为主,主要关心数据流向和长轮询的实现,没贴太多代码


    一、数据库表:
    1. portal DB表:
    • App、AppNamespace
    • 包括普通用户、三方用户、权限、操作记录、收藏等
    • 基础配置,包括生效环境、meta地址、部门、接口超时时间等portal的基础配置
    • 总体来说portalDB主要围绕用户和界面展示相关的
    • protal DB里的App、AppNamespace相关数据都是未生效的,数据放入portal DB后,用spring的发布监听机制,异步把数据同步到各个env的admin DB
    1. config db表:
    • App、Cluster
    • AppNamespace:属于App,1对多,代表用户创建过的配置,不可删除,只用于记录,比如用户创建过叫application和db的AppNamespace,继续往下对比和Namespace的区别
    • Namespace:属于Cluster,1对多,代表正在使用的配置文件,可以删除,在创建新Cluster时下面默认会创建出application、db,创建的依据是上面的AppNamespace表的记录,可以在后台手动试试,但用户想删除某个app cluster下的配置,删的就是Namespace表了
    • commit:提交历史,在执行所有修改操作时增加一条记录,记录修改item操作的增量数据(old and new)
    • item:所有具体配置项,未发布过的也都记录在这
    • release message:下面写
    • server config:一些服务相关配置,和portal类似
    二、一些细节:
    1. cluster不创建就会用default表示,界面上不会显示,但并不是没有
    2. 每创建一个app默认会创建一个叫application的namespace,对应spring项目的application.properties
    3. openApi可以通过在portal界面注册三方用户,用HTTP + Token的方式直接调用portal相关接口,不用登陆portal手动点击配置发布等等操作
    4. namespace公有、私有、关联
    • 私有namespace:app私有的namespace,只能本服务获取到
    • 公有namespace:其他app可以关联app的公有namespace,app私有namespace如果有相同字段,以私有为准
    • 关联namespace:app关联其他公有namespace后会,会在本项目页面显示关联的共有namespace叫“关联namespace”
    1. 关联公有namespace实际是创建了一条新的namespace给相应app
    2. apollo通过appId + clusterName + namespaceName定位一个具体namespace,类似于maven的groupId + artifactId,源码里也叫watchKey
    三、修改、发布:
    1. 配置修改时,item表记录配置项,commit表增加一条修改记录,到这里修改部分就结束了,主要看发布
    2. 配置发布概述,portal向admin service请求是同步,admin service到config service是异步,为了不引入额外组件,admin service写入数据库,config service轮询数据库推送消息的方式代替了消息队列,通过最新数据的ID作为版本号确认是否有配置更新,就是releaseMessage表
    3. 发布时item和commit表没变化,release、releaseHistory和releaseMessage会增加一条记录
    • release表:记录namespace的定位、namespace配置发布的记录
    • releaseHistory表:记录操作日志,上次releaseId、本次releaseId、定位、操作内容(发布、回滚、灰度相关操作)
    • releaseMessage表:模拟MQ推送,根据namespace定位、ID拉取数据
    1. 发送消息时,releaseMessage的ID会存到一个BlockingQueue里,config service会每5s取一次BlockingQueue的数据取完为止,因为只记录定位,同一namespace多次发布releaseMessage表里会有重复数据,需要定时清理,只保留最新一条数据
    2. config service每秒扫描是否有更新,更新时取出遍历所有listener推送Client,这个监听器里触发更新后的一些处理,包括清缓存、通知Client等等,最后更新缓存最大ID记录
      @Override
      public void afterPropertiesSet() throws Exception {
        databaseScanInterval = bizConfig.releaseMessageScanIntervalInMilli();
        maxIdScanned = loadLargestMessageId();
        executorService.scheduleWithFixedDelay(() -> {
          Transaction transaction = Tracer.newTransaction("Apollo.ReleaseMessageScanner", "scanMessage");
          try {
            scanMissingMessages();
            scanMessages();
            transaction.setStatus(Transaction.SUCCESS);
          } catch (Throwable ex) {
            transaction.setStatus(ex);
            logger.error("Scan and send message failed", ex);
          } finally {
            transaction.complete();
          }
        }, databaseScanInterval, databaseScanInterval, TimeUnit.MILLISECONDS);
    
      }
    
    private boolean scanAndSendMessages() {
        // 一次扫500,多了下次再扫
        List<ReleaseMessage> releaseMessages =
            releaseMessageRepository.findFirst500ByIdGreaterThanOrderByIdAsc(maxIdScanned);
        if (CollectionUtils.isEmpty(releaseMessages)) {
          return false;
        }
        // 遍历所有注册上来的监听器,通知client
        fireMessageScanned(releaseMessages);
        int messageScanned = releaseMessages.size();
        long newMaxIdScanned = releaseMessages.get(messageScanned - 1).getId();
        // check id gaps, possible reasons are release message not committed yet or already rolled back
        if (newMaxIdScanned - maxIdScanned > messageScanned) {
          recordMissingReleaseMessageIds(releaseMessages, maxIdScanned);
        }
        maxIdScanned = newMaxIdScanned;
        return messageScanned == 500;
      }
    
    /**
       * 遍历所有listener推送message
       * @param messages
       */
      private void fireMessageScanned(Iterable<ReleaseMessage> messages) {
        for (ReleaseMessage message : messages) {
          for (ReleaseMessageListener listener : listeners) {
            try {
              listener.handleMessage(message, Topics.APOLLO_RELEASE_TOPIC);
            } catch (Throwable ex) {
              Tracer.logError(ex);
              logger.error("Failed to invoke message listener {}", listener.getClass(), ex);
            }
          }
        }
      }
    
        @Bean
        public ReleaseMessageScanner releaseMessageScanner() {
          ReleaseMessageScanner releaseMessageScanner = new ReleaseMessageScanner();
          //0\. handle release message cache
          releaseMessageScanner.addMessageListener(releaseMessageServiceWithCache);
          //1\. handle gray release rule
          releaseMessageScanner.addMessageListener(grayReleaseRulesHolder);
          //2\. handle server cache
          releaseMessageScanner.addMessageListener(configService);
          releaseMessageScanner.addMessageListener(configFileController);
          //3\. notify clients
          releaseMessageScanner.addMessageListener(notificationControllerV2);
          releaseMessageScanner.addMessageListener(notificationController);
          return releaseMessageScanner;
        }
    
    1. 关联Client章节关于RemoteConfigLongPollService的描述,RemoteConfigLongPollService长轮训/notifications/v2这个接口,触发更新时返回http200,超时返回http304,NotificationControllerV2这个对象触发DeferredResult响应的逻辑就包括在上面提到的监听回调中
    2. NotificationControllerV2里面两个主要方法:
    • 接收Client长轮询的Controller (pollNotification)
    • 上面提到的ReleaseMessageListener的回调方法handleMessage
    1. pollNotification:
    • Client通过请求告知要监听的app、cluster、cluster下当前Client所有namespaces的release message表ID
    • 和样例类似,区别只是源码接口一次请求监听app+cluster下的所有namespace,还有一些额外的处理逻辑,比如等待响应期间数据库连接断掉避免资源浪费,controller中还会检查release message ID是否已经更新,如果已更新就直接响应给Client
    • Client请求超时90s,DeferredResult超时60s构成长轮询,Client接收到响应后会立即再次请求,代码见Client包RemoteConfigLongPollService
    四、服务注册、发现:
    1. 回顾:Meta Server = config service + meta service + eureka在同一进程
    2. 启动eureka:
    • 在config service启动类标注了@EnableEurekaServer注解启动eureka
    1. 注册到eureka:
    • biz包的ApolloEurekaClientConfig类重写了获取eureka服务地址的方法,admin和config service都会注册到eureka
    1. 发现:
    • Core包里写过MetaDomainConsts用SPI获取了用户配的Meta Service地址,Client从这里拿到地址,Portal从数据库、配置获取有兴趣可以看看
    • Client和Portal分别有ConfigServiceLocator和AdminServiceAddressLocator会定时从Meta Service发现Config和Admin Service地址,虽然请求发起时也有一些负载策略但Meta Service最好做nginx负载,部署图也展示了
    • meta service一共两个接口,分别获取config和admin服务地址,分别实现了nacos、k8s、consul、eureka(默认)的服务发现,eureka实现比较简单,直接从EurekaClient获取AppName、InstanceId、HomePageUrl返回
    四、ServerConfig基础配置:
    • portal db和config db都有ServerConfig表,和配置文件类似,在表里配置有通用性,不需要每个项目都配置一遍
    1. RefreshablePropertySource(抽象):继承MapPropertySource,提供抽象refresh(),能看出它是一个能刷新的kv数据源
    2. PortalDBPropertySource、BizDBPropertySource继承RefreshablePropertySource,refresh()时会从数据库抓取所有配置,并放入PropertySource,分别给portal和admin、config service使用
    3. RefreshableConfig(抽象):上面是两个代表数据源,这个是代表具体数据配置,对应也有两个实现,这里每60s会刷新数据源
    @PostConstruct
      public void setup() {
        // 获取数据源,因为要区分portal和admin config,所以在实现类提供
        propertySources = getRefreshablePropertySources();
        if (CollectionUtils.isEmpty(propertySources)) {
          throw new IllegalStateException("Property sources can not be empty.");
        }
    
        // 刷新数据源并置入environment
        for (RefreshablePropertySource propertySource : propertySources) {
          propertySource.refresh();
          environment.getPropertySources().addLast(propertySource);
        }
    
        // 定时刷新
        ScheduledExecutorService
            executorService =
            Executors.newScheduledThreadPool(1, ApolloThreadFactory.create("ConfigRefresher", true));
    
        executorService
            .scheduleWithFixedDelay(() -> {
              try {
                propertySources.forEach(RefreshablePropertySource::refresh);
              } catch (Throwable t) {
                logger.error("Refresh configs failed.", t);
                Tracer.logError("Refresh configs failed.", t);
              }
            }, CONFIG_REFRESH_INTERVAL, CONFIG_REFRESH_INTERVAL, TimeUnit.SECONDS);
      }
    
    1. RefreshableConfig对应的两个实现比较简单,根据key获取各种配置
    2. portal service在页面上还提供了对ServerConfig表配置更新的操作,略

    相关文章

      网友评论

        本文标题:Apollo(3) Portal、Config、Admin Se

        本文链接:https://www.haomeiwen.com/subject/hjkofdtx.html