美文网首页
十一、soul源码学习-http长轮训数据同步机制详解-2

十一、soul源码学习-http长轮训数据同步机制详解-2

作者: 风洛洛 | 来源:发表于2021-01-28 07:31 被阅读0次

    上一节我们讲解了HttpSyncDataService在初始化后,start的fetch流程,接下来,我们在看下他的定时监听流程。

    private void start() {
      // It could be initialized multiple times, so you need to control that.
      if (RUNNING.compareAndSet(false, true)) {
        // fetch all group configs.
        this.fetchGroupConfig(ConfigGroupEnum.values());
        int threadSize = serverList.size();
        //初始化一个线程池
        this.executor = new ThreadPoolExecutor(threadSize, threadSize, 60L, TimeUnit.SECONDS,
                                               new LinkedBlockingQueue<>(),
                                               SoulThreadFactory.create("http-long-polling", true));
        // 循环所有的server地址,创建HttpLongPollingTask异步执行长轮训任务
        this.serverList.forEach(server -> this.executor.execute(new HttpLongPollingTask(server)));
      } else {
        log.info("soul http long polling was started, executor=[{}]", executor);
      }
    }
    

    我们看下HttpLongPollingTask,主要是循环的根据RUNNING状态判断是否需要进行调用

    //org.dromara.soul.sync.data.http.HttpSyncDataService.HttpLongPollingTask
    class HttpLongPollingTask implements Runnable {
    
      private String server;
    
      private final int retryTimes = 3;
    
      HttpLongPollingTask(final String server) {
        this.server = server;
      }
    
      @Override
      public void run() {
        while (RUNNING.get()) {
          for (int time = 1; time <= retryTimes; time++) {
            try {
              doLongPolling(server);
            } catch (Exception e) {
              // 如果拉取服务器配置的时候报错,则重试,最多重试3次,并等待5秒钟
              if (time < retryTimes) {
                log.warn("Long polling failed, tried {} times, {} times left, will be suspended for a while! {}",
                         time, retryTimes - time, e.getMessage());
                ThreadUtils.sleep(TimeUnit.SECONDS, 5);
                continue;
              }
              重试超过3次后,报错,并等待5分钟
              log.error("Long polling failed, try again after 5 minutes!", e);
              ThreadUtils.sleep(TimeUnit.MINUTES, 5);
            }
          }
        }
        log.warn("Stop http long polling.");
      }
    }
    

    doLongPolling这个方法用到了我们之前在SoulAdmin看到的/configs/listener

    //org.dromara.soul.sync.data.http.HttpSyncDataService#doLongPolling
    private void doLongPolling(final String server) {
      MultiValueMap<String, String> params = new LinkedMultiValueMap<>(8);
      //循环所有的ConfigGroup,并从之前fetch拉取到的数据中获取本地的配置数据
      for (ConfigGroupEnum group : ConfigGroupEnum.values()) {
        ConfigData<?> cacheConfig = factory.cacheConfigData(group);
        //构造参数value值
        String value = String.join(",", cacheConfig.getMd5(), String.valueOf(cacheConfig.getLastModifyTime()));
        params.put(group.name(), Lists.newArrayList(value));
      }
      HttpHeaders headers = new HttpHeaders();
      headers.setContentType(MediaType.APPLICATION_FORM_URLENCODED);
      HttpEntity httpEntity = new HttpEntity(params, headers);
      //http调用SoulAdmin的configs/listener
      String listenerUrl = server + "/configs/listener";
      log.debug("request listener configs: [{}]", listenerUrl);
      JsonArray groupJson = null;
      try {
        String json = this.httpClient.postForEntity(listenerUrl, httpEntity, String.class).getBody();
        log.debug("listener result: [{}]", json);
        groupJson = GSON.fromJson(json, JsonObject.class).getAsJsonArray("data");
      } catch (RestClientException e) {
        String message = String.format("listener configs fail, server:[%s], %s", server, e.getMessage());
        throw new SoulException(message, e);
      }
      //获取到结果的json
      if (groupJson != null) {
        // 发现哪些是需要变化的group
        ConfigGroupEnum[] changedGroups = GSON.fromJson(groupJson, ConfigGroupEnum[].class);
        if (ArrayUtils.isNotEmpty(changedGroups)) {
          //再通过fetch拉取对应group的最新数据
          log.info("Group config changed: {}", Arrays.toString(changedGroups));
          this.doFetchGroupConfig(server, changedGroups);
        }
      }
    }
    

    到这里我们看到了,HttpSyncDataService在start的时候,刚开始会全量fetch,并启动一个异步线程,对所有的SoulAdmin继续进行异步的listener。

    我们总结一下流程图:

    image.png

    到这里我们将SoulAdmin配置相关同步机制都讲完了

    相关文章

      网友评论

          本文标题:十一、soul源码学习-http长轮训数据同步机制详解-2

          本文链接:https://www.haomeiwen.com/subject/izdozktx.html