Apollo

作者: 追风还是少年 | 来源:发表于2023-08-27 00:21 被阅读0次

    Apollo组成

    image.png
    • Config Service
      提供配置的读取、推送等功能,服务对象是Apollo客户端
      提供配置更新推送接口(基于Http long polling)
      (1)服务端使用Spring DeferredResult实现异步化,从而大大增加长连接数量
      (2)目前使用的tomcat embed默认配置是最多10000个连接(可以调整),使用了4C8G的虚拟机实测可以支撑10000个连接,所以满足需求(一个应用实例只会发起一个长连接)。

    • Admin Service
      提供配置的修改、发布等功能,服务对象是Apollo Portal(管理界面)

    • Meta Server
      Portal通过域名访问Meta Server获取Admin Service服务列表(IP+Port)
      Client通过域名访问Meta Server获取Config Service服务列表(IP+Port)
      Meta Server从Eureka获取Config Service和Admin Service的服务信息,相当于是一个Eureka Client
      增设一个Meta Server的角色主要是为了封装服务发现的细节,对Portal和Client而言,永远通过一个Http接口获取Admin Service和Config Service的服务信息,而不需要关心背后实际的服务注册和发现组件
      Meta Server只是一个逻辑角色,在部署时和Config Service是在一个JVM进程中的,所以IP、端口和Config Service一致

    • Eureka
      基于Eureka和Spring Cloud Netflix提供服务注册和发现
      Config Service和Admin Service会向Eureka注册服务,并保持心跳
      为了简单起见,目前Eureka在部署时和Config Service是在一个JVM进程中的(通过Spring Cloud Netflix)

    • Portal
      提供Web界面供用户管理配置
      通过Meta Server获取Admin Service服务列表(IP+Port),通过IP+Port访问服务
      在Portal侧做load balance、错误重试

    • Client
      Apollo提供的客户端程序,为应用提供配置获取、实时更新等功能
      通过Meta Server获取Config Service服务列表(IP+Port),通过IP+Port访问服务
      在Client侧做load balance、错误重试

    Apollo运行

    image.png
    • 用户在配置中心对配置进行修改并发布
    • 配置中心通知Apollo客户端有配置更新
    • Apollo客户端从配置中心拉取最新的配置、更新本地配置并通知到应用

    服务端设计

    • 配置发布后的实时推送设计
      在配置中心中,一个重要的功能就是配置发布后实时推送到客户端。下面我们简要看一下这块是怎么设计实现的。


      image.png

    配置发布过程:
    (1)用户在Portal操作配置发布
    (2)Portal调用Admin Service的接口操作发布
    (3)Admin Service发布配置后,发送ReleaseMessage给各个Config Service
    (4)Config Service收到ReleaseMessage后,通知对应的客户端

    • 发送ReleaseMessage的实现方式
      Admin Service在配置发布后,需要通知所有的Config Service有配置发布,从而Config Service可以通知对应的客户端来拉取最新的配置。

    从概念上来看,这是一个典型的消息使用场景,Admin Service作为producer发出消息,各个Config Service作为consumer消费消息。通过一个消息组件(Message Queue)就能很好的实现Admin Service和Config Service的解耦。

    在实现上,考虑到Apollo的实际使用场景,以及为了尽可能减少外部依赖,我们没有采用外部的消息中间件,而是通过数据库实现了一个简单的消息队列。

    实现方式如下:

    1. Admin Service在配置发布后会往ReleaseMessage表插入一条消息记录,消息内容就是配置发布的AppId+Cluster+Namespace,参见DatabaseMessageSender
    2. Config Service有一个线程会每秒扫描一次ReleaseMessage表,看看是否有新的消息记录,参见ReleaseMessageScanner
    3. Config Service如果发现有新的消息记录,那么就会通知到所有的消息监听器(ReleaseMessageListener),如NotificationControllerV2,消息监听器的注册过程参见ConfigServiceAutoConfiguration
    4. NotificationControllerV2得到配置发布的AppId+Cluster+Namespace后,会通知对应的客户端
    image.png

    ReleaseMessageScanner:

    public class ReleaseMessageScanner implements InitializingBean {
      @Override
      public void afterPropertiesSet() throws Exception {
        databaseScanInterval = bizConfig.releaseMessageScanIntervalInMilli();
        maxIdScanned = loadLargestMessageId();
        executorService.scheduleWithFixedDelay((Runnable) () -> {
          Transaction transaction = Tracer.newTransaction("Apollo.ReleaseMessageScanner", "scanMessage");
          try {
            scanMessages();
            transaction.setStatus(Transaction.SUCCESS);
          } catch (Throwable ex) {
            transaction.setStatus(ex);
            logger.error("Scan and send message failed", ex);
          } finally {
            transaction.complete();
          }
        }, databaseScanInterval, databaseScanInterval, TimeUnit.MILLISECONDS);
    
      }
      private void scanMessages() {
        boolean hasMoreMessages = true;
        while (hasMoreMessages && !Thread.currentThread().isInterrupted()) {
          hasMoreMessages = scanAndSendMessages();
        }
      }
    
      /**
       * scan messages and send
       *
       * @return whether there are more messages
       */
      private boolean scanAndSendMessages() {
        //current batch is 500
        List<ReleaseMessage> releaseMessages =
            releaseMessageRepository.findFirst500ByIdGreaterThanOrderByIdAsc(maxIdScanned);
        if (CollectionUtils.isEmpty(releaseMessages)) {
          return false;
        }
        // 触发通知监听器处理“发布配置“消息
        fireMessageScanned(releaseMessages);
        int messageScanned = releaseMessages.size();
        maxIdScanned = releaseMessages.get(messageScanned - 1).getId();
        return messageScanned == 500;
      }
      public void addMessageListener(ReleaseMessageListener listener) {
        if (!listeners.contains(listener)) {
          listeners.add(listener);
        }
      }
      private void fireMessageScanned(List<ReleaseMessage> messages) {
        for (ReleaseMessage message : messages) {
          for (ReleaseMessageListener listener : listeners) {
            try {
              listener.handleMessage(message, Topics.APOLLO_RELEASE_TOPIC);
            } catch (Throwable ex) {
              Tracer.logError(ex);
              logger.error("Failed to invoke message listener {}", listener.getClass(), ex);
            }
          }
        }
      }
    }
    

    ConfigServiceAutoConfiguration:

        public ReleaseMessageScanner releaseMessageScanner() {
          ReleaseMessageScanner releaseMessageScanner = new ReleaseMessageScanner();
          //0. handle release message cache
          releaseMessageScanner.addMessageListener(releaseMessageServiceWithCache);
          //1. handle gray release rule
          releaseMessageScanner.addMessageListener(grayReleaseRulesHolder);
          //2. handle server cache
          releaseMessageScanner.addMessageListener(configService);
          releaseMessageScanner.addMessageListener(configFileController);
          //3. notify clients,把notificationControllerV2到releaseMessageScanner消息监听器
         releaseMessageScanner.addMessageListener(notificationControllerV2);
          releaseMessageScanner.addMessageListener(notificationController);
          return releaseMessageScanner;
        }
    
    @RestController
    @RequestMapping("/notifications/v2")
    public class NotificationControllerV2 implements ReleaseMessageListener {
      @GetMapping
      public DeferredResult<ResponseEntity<List<ApolloConfigNotification>>> pollNotification(
          @RequestParam(value = "appId") String appId,
          @RequestParam(value = "cluster") String cluster,
          @RequestParam(value = "notifications") String notificationsAsString,
          @RequestParam(value = "dataCenter", required = false) String dataCenter,
          @RequestParam(value = "ip", required = false) String clientIp) {
        List<ApolloConfigNotification> notifications = null;
    
        try {
          notifications =
              gson.fromJson(notificationsAsString, notificationsTypeReference);
        } catch (Throwable ex) {
          Tracer.logError(ex);
        }
    
        if (CollectionUtils.isEmpty(notifications)) {
          throw new BadRequestException("Invalid format of notifications: " + notificationsAsString);
        }
    
        DeferredResultWrapper deferredResultWrapper = new DeferredResultWrapper(bizConfig.longPollingTimeoutInMilli());
        Set<String> namespaces = Sets.newHashSet();
        Map<String, Long> clientSideNotifications = Maps.newHashMap();
        Map<String, ApolloConfigNotification> filteredNotifications = filterNotifications(appId, notifications);
    
        for (Map.Entry<String, ApolloConfigNotification> notificationEntry : filteredNotifications.entrySet()) {
          String normalizedNamespace = notificationEntry.getKey();
          ApolloConfigNotification notification = notificationEntry.getValue();
          namespaces.add(normalizedNamespace);
          clientSideNotifications.put(normalizedNamespace, notification.getNotificationId());
          if (!Objects.equals(notification.getNamespaceName(), normalizedNamespace)) {
            deferredResultWrapper.recordNamespaceNameNormalizedResult(notification.getNamespaceName(), normalizedNamespace);
          }
        }
    
        if (CollectionUtils.isEmpty(namespaces)) {
          throw new BadRequestException("Invalid format of notifications: " + notificationsAsString);
        }
    
        Multimap<String, String> watchedKeysMap =
            watchKeysUtil.assembleAllWatchKeys(appId, cluster, namespaces, dataCenter);
    
        Set<String> watchedKeys = Sets.newHashSet(watchedKeysMap.values());
    
        /**
         * 1、set deferredResult before the check, for avoid more waiting
         * If the check before setting deferredResult,it may receive a notification the next time
         * when method handleMessage is executed between check and set deferredResult.
         */
        deferredResultWrapper
              .onTimeout(() -> logWatchedKeys(watchedKeys, "Apollo.LongPoll.TimeOutKeys"));
    
        deferredResultWrapper.onCompletion(() -> {
          //unregister all keys
          for (String key : watchedKeys) {
            deferredResults.remove(key, deferredResultWrapper);
          }
          logWatchedKeys(watchedKeys, "Apollo.LongPoll.CompletedKeys");
        });
    
        //register all keys
        for (String key : watchedKeys) {
          this.deferredResults.put(key, deferredResultWrapper);
        }
    
        logWatchedKeys(watchedKeys, "Apollo.LongPoll.RegisteredKeys");
        logger.debug("Listening {} from appId: {}, cluster: {}, namespace: {}, datacenter: {}",
            watchedKeys, appId, cluster, namespaces, dataCenter);
    
        /**
         * 2、check new release
         */
        List<ReleaseMessage> latestReleaseMessages =
            releaseMessageService.findLatestReleaseMessagesGroupByMessages(watchedKeys);
    
        /**
         * Manually close the entity manager.
         * Since for async request, Spring won't do so until the request is finished,
         * which is unacceptable since we are doing long polling - means the db connection would be hold
         * for a very long time
         */
        entityManagerUtil.closeEntityManager();
    
        List<ApolloConfigNotification> newNotifications =
            getApolloConfigNotifications(namespaces, clientSideNotifications, watchedKeysMap,
                latestReleaseMessages);
    
        if (!CollectionUtils.isEmpty(newNotifications)) {
          deferredResultWrapper.setResult(newNotifications);
        }
    
        return deferredResultWrapper.getResult();
      }
      @Override
      public void handleMessage(ReleaseMessage message, String channel) {
        logger.info("message received - channel: {}, message: {}", channel, message);
    
        String content = message.getMessage();
        Tracer.logEvent("Apollo.LongPoll.Messages", content);
        if (!Topics.APOLLO_RELEASE_TOPIC.equals(channel) || Strings.isNullOrEmpty(content)) {
          return;
        }
    
        String changedNamespace = retrieveNamespaceFromReleaseMessage.apply(content);
    
        if (Strings.isNullOrEmpty(changedNamespace)) {
          logger.error("message format invalid - {}", content);
          return;
        }
    
        if (!deferredResults.containsKey(content)) {
          return;
        }
    
        //create a new list to avoid ConcurrentModificationException
        List<DeferredResultWrapper> results = Lists.newArrayList(deferredResults.get(content));
    
        ApolloConfigNotification configNotification = new ApolloConfigNotification(changedNamespace, message.getId());
        configNotification.addMessage(content, message.getId());
    
        //do async notification if too many clients
        if (results.size() > bizConfig.releaseMessageNotificationBatch()) {
          largeNotificationBatchExecutorService.submit(() -> {
            logger.debug("Async notify {} clients for key {} with batch {}", results.size(), content,
                bizConfig.releaseMessageNotificationBatch());
            for (int i = 0; i < results.size(); i++) {
              if (i > 0 && i % bizConfig.releaseMessageNotificationBatch() == 0) {
                try {
                  TimeUnit.MILLISECONDS.sleep(bizConfig.releaseMessageNotificationBatchIntervalInMilli());
                } catch (InterruptedException e) {
                  //ignore
                }
              }
              logger.debug("Async notify {}", results.get(i));
              // 设置NotificationControllerV2  HTTP接口/notifications/v2的返回结果DeferredResult
              results.get(i).setResult(configNotification);
            }
          });
          return;
        }
    
        logger.debug("Notify {} clients for key {}", results.size(), content);
    
        for (DeferredResultWrapper result : results) {
         // 设置NotificationControllerV2  HTTP接口/notifications/v2的返回结果DeferredResult
          result.setResult(configNotification);
        }
        logger.debug("Notification completed");
      }
    }
    
    • Config Service通知客户端的实现方式
      NotificationControllerV2在得知有配置发布后是如何通知到客户端的呢?
      实现方式如下:
      (1)客户端会发起一个Http请求到Config Service的notifications/v2接口,也就是NotificationControllerV2,参见RemoteConfigLongPollService
      (2)NotificationControllerV2不会立即返回结果,而是通过Spring DeferredResult把请求挂起
      (3)如果在60秒内没有该客户端关心的配置发布,那么会返回Http状态码304给客户端
      (4)如果有该客户端关心的配置发布,NotificationControllerV2会调用DeferredResult的setResult方法,传入有配置变化的namespace信息,同时该请求会立即返回。客户端从返回的结果中获取到配置变化的namespace后,会立即请求Config Service获取该namespace的最新配置。

    DeferredResult
    当一个请求到达API接口,如果该API接口的return返回值是DeferredResult,在没有超时或者DeferredResult对象设置setResult时,接口不会返回,但是Servlet容器线程会结束,DeferredResult另起线程来进行结果处理(即这种操作提升了服务短时间的吞吐能力),并setResult,如此以来这个请求不会占用服务连接池太久,如果超时或设置setResult,接口会立即返回。

    客户端设计

    Apollo客户端的实现原理:

    • 客户端和服务端保持了一个长连接,从而能第一时间获得配置更新的推送。(通过Http Long Polling实现)
    • 客户端还会定时从Apollo配置中心服务端拉取应用的最新配置。
      (1)这是一个fallback机制,为了防止推送机制失效导致配置不更新
      (2)客户端定时拉取会上报本地版本,所以一般情况下,对于定时拉取的操作,服务端都会返回304 - Not Modified
      (3)定时频率默认为每5分钟拉取一次,客户端也可以通过在运行时指定System Property: apollo.refreshInterval来覆盖,单位为分钟。
    • 客户端从Apollo配置中心服务端获取到应用的最新配置后,会保存在内存中
    • 客户端会把从服务端获取到的配置在本地文件系统缓存一份
      在遇到服务不可用,或网络不通的时候,依然能从本地恢复配置
    • 应用程序可以从Apollo客户端获取最新的配置、订阅配置更新通知

    RemoteConfigRepository:

      public RemoteConfigRepository(String namespace) {
        m_namespace = namespace;
        m_configCache = new AtomicReference<>();
        m_configUtil = ApolloInjector.getInstance(ConfigUtil.class);
        m_httpUtil = ApolloInjector.getInstance(HttpUtil.class);
        m_serviceLocator = ApolloInjector.getInstance(ConfigServiceLocator.class);
        remoteConfigLongPollService = ApolloInjector.getInstance(RemoteConfigLongPollService.class);
        m_longPollServiceDto = new AtomicReference<>();
        m_remoteMessages = new AtomicReference<>();
        m_loadConfigRateLimiter = RateLimiter.create(m_configUtil.getLoadConfigQPS());
        m_configNeedForceRefresh = new AtomicBoolean(true);
        m_loadConfigFailSchedulePolicy = new ExponentialSchedulePolicy(m_configUtil.getOnErrorRetryInterval(),
            m_configUtil.getOnErrorRetryInterval() * 8);
        gson = new Gson();
        this.trySync();
        // 开启定时从服务端获取最新配置
        this.schedulePeriodicRefresh();
       // 调用RemoteConfigLongPollService开启长轮询
        this.scheduleLongPollingRefresh();
      }
    
      private void schedulePeriodicRefresh() {
        logger.debug("Schedule periodic refresh with interval: {} {}",
            m_configUtil.getRefreshInterval(), m_configUtil.getRefreshIntervalTimeUnit());
        m_executorService.scheduleAtFixedRate(
            new Runnable() {
              @Override
              public void run() {
                Tracer.logEvent("Apollo.ConfigService", String.format("periodicRefresh: %s", m_namespace));
                logger.debug("refresh config for namespace: {}", m_namespace);
                // 调用Config Service的HTTP接口 /configs获取最新配置
                trySync();
                Tracer.logEvent("Apollo.Client.Version", Apollo.VERSION);
              }
            }, m_configUtil.getRefreshInterval(), m_configUtil.getRefreshInterval(),
            m_configUtil.getRefreshIntervalTimeUnit());
      }
    
      private void scheduleLongPollingRefresh() {
        remoteConfigLongPollService.submit(m_namespace, this);
      }
       //接收长轮询的通知
      public void onLongPollNotified(ServiceDTO longPollNotifiedServiceDto, ApolloNotificationMessages remoteMessages) {
        m_longPollServiceDto.set(longPollNotifiedServiceDto);
        m_remoteMessages.set(remoteMessages);
        m_executorService.submit(new Runnable() {
          @Override
          public void run() {
            m_configNeedForceRefresh.set(true);
            trySync();
          }
        });
      }
    

    RemoteConfigLongPollService:

      public boolean submit(String namespace, RemoteConfigRepository remoteConfigRepository) {
        boolean added = m_longPollNamespaces.put(namespace, remoteConfigRepository);
        m_notifications.putIfAbsent(namespace, INIT_NOTIFICATION_ID);
        if (!m_longPollStarted.get()) {
          startLongPolling();
        }
        return added;
      }
    
      private void startLongPolling() {
        if (!m_longPollStarted.compareAndSet(false, true)) {
          //already started
          return;
        }
        try {
          final String appId = m_configUtil.getAppId();
          final String cluster = m_configUtil.getCluster();
          final String dataCenter = m_configUtil.getDataCenter();
          final String secret = m_configUtil.getAccessKeySecret();
          final long longPollingInitialDelayInMills = m_configUtil.getLongPollingInitialDelayInMills();
          m_longPollingService.submit(new Runnable() {
            @Override
            public void run() {
              if (longPollingInitialDelayInMills > 0) {
                try {
                  logger.debug("Long polling will start in {} ms.", longPollingInitialDelayInMills);
                  TimeUnit.MILLISECONDS.sleep(longPollingInitialDelayInMills);
                } catch (InterruptedException e) {
                  //ignore
                }
              }
              // 调用Config Service的HTTP接口 /notifications/v2获取有变更的配置
             // 如果有变更的配置,通知RemoteConfigRepository拉取最新配置
              doLongPollingRefresh(appId, cluster, dataCenter, secret);
            }
          });
        } catch (Throwable ex) {
          m_longPollStarted.set(false);
          ApolloConfigException exception =
              new ApolloConfigException("Schedule long polling refresh failed", ex);
          Tracer.logError(exception);
          logger.warn(ExceptionUtil.getDetailMessage(exception));
        }
      }
    

    相关文章

      网友评论

          本文标题:Apollo

          本文链接:https://www.haomeiwen.com/subject/lfwrmdtx.html