美文网首页
soul从入门到放弃17--长轮询同步(二)

soul从入门到放弃17--长轮询同步(二)

作者: 滴流乱转的小胖子 | 来源:发表于2021-02-04 07:03 被阅读0次

    一、前戏

    上篇以服务端(soul-admin)视角,分析了增量同步(推),数据监听被拉的过程。 本篇以客户端(soul-bootstrap)视角,分析下他是怎么拉数据的。

    二、网关启动拉去数据

    • 启动配置与初始化

    soul-spring-boot-starter-sync-data-http项目下HttpSyncDataConfiguration$httpSyncDataService方法

    • 启动开启线程

    HttpSyncDataService中会调用start方法,会全量拉去信息 + 开启线程池监听数据变化。

    private void start() {
        // 防止启动多次的cas锁操作
        if (RUNNING.compareAndSet(false, true)) {
            // 启动时,全量拉取数据
            this.fetchGroupConfig(ConfigGroupEnum.values());
            int threadSize = serverList.size();
            // 根据soul-admin数量开启相同线程数的线程线程池,轮询监听
            this.executor = new ThreadPoolExecutor(threadSize, threadSize, 60L, TimeUnit.SECONDS,
                    new LinkedBlockingQueue<>(),
                    SoulThreadFactory.create("http-long-polling", true));
            // 开启监听线程,每一个线程监听soul-admin
            this.serverList.forEach(server -> this.executor.execute(new HttpLongPollingTask(server)));
        } else {
            log.info("soul http long polling was started, executor=[{}]", executor);
        }
    }
    
    • 循环向所有soul-admin去拉去数据,具体拉数据的请求逻辑在doFetchGroupConfig
    private void fetchGroupConfig(final ConfigGroupEnum... groups) throws SoulException {
        for (int index = 0; index < this.serverList.size(); index++) {
            String server = serverList.get(index);
            try {
                this.doFetchGroupConfig(server, groups);
                break;
            } catch (SoulException e) {
                // no available server, throw exception.
                if (index >= serverList.size() - 1) {
                    throw e;
                }
                log.warn("fetch config fail, try another one: {}", serverList.get(index + 1));
            }
        }
    }
    
    • doFetchGroupConfig中对单一soul-admin的多组,进行循环调用后台的 /configs/fetch 接口, 拿到某个类型的数据, 并更新缓存。
    • 更新缓存前会检测是否变动, 如果变动则结束, 数据未发生变动、远端数据过期则睡眠30s ,由于是第一次启动, 数据为空的情况下肯定会更新缓存, 所以会直接结束)
    private void doFetchGroupConfig(final String server, final ConfigGroupEnum... groups) {
        StringBuilder params = new StringBuilder();
        // 根据ConfigGroupEnum循环拉取每组的数据,组指的是plugin、rule、selector 等
        for (ConfigGroupEnum groupKey : groups) {
            params.append("groupKeys").append("=").append(groupKey.name()).append("&");
        }
        // 请求后台soul-admin的地址
        String url = server + "/configs/fetch?" + StringUtils.removeEnd(params.toString(), "&");
        log.info("request configs: [{}]", url);
        String json = null;
        try {
            json = this.httpClient.getForObject(url, String.class);
        } catch (RestClientException e) {
            String message = String.format("fetch config fail from server[%s], %s", url, e.getMessage());
            log.warn(message);
            throw new SoulException(message, e);
        }
        // 后台拉取数据成功,更新缓存数据,并返回是否更新成功
        boolean updated = this.updateCacheWithJson(json);
        if (updated) {
            log.info("get latest configs: [{}]", json);
            return;
        }
        // 没有发生更新,或是远端已经过期,则睡30s
        log.info("The config of the server[{}] has not been updated or is out of date. Wait for 30s to listen for changes again.", server);
        ThreadUtils.sleep(TimeUnit.SECONDS, 30);
    }
    
    • updateCacheWithJson反序列化json
    /**
     * update local cache.
     * @param json the response from config server.
     * @return true: the local cache was updated. false: not updated.
     */
    private boolean updateCacheWithJson(final String json) {
        JsonObject jsonObject = GSON.fromJson(json, JsonObject.class);
        JsonObject data = jsonObject.getAsJsonObject("data");
        // 调用更新方法
        return factory.executor(data);
    }
    
    • DataRefreshFactory$executor,调用各类型数据刷新类
    public boolean executor(final JsonObject data) {
        final boolean[] success = {false};
        // 所有数据类型循环的 DataRefresh 全调用
        ENUM_MAP.values().parallelStream().forEach(dataRefresh -> success[0] = dataRefresh.refresh(data));
        return success[0];
      } 
    
    • AbstractDataRefresh$refresh, 调用各AbstractDataRefresh实现类的updateCacheIfNeed判断数据是否有更新,如果有更新调用各自的refresh方法更新数据
    • refresh中调用响应的数据更新事件
    public Boolean refresh(final JsonObject data) {
        boolean updated = false;
        JsonObject jsonObject = convert(data);
        if (null != jsonObject) {
            ConfigData<T> result = fromJson(jsonObject);
            // 判断不同类型数据是否更新
            if (this.updateCacheIfNeed(result)) {
                updated = true;
                // 实际刷新逻辑
                refresh(result.getData());
            }
        }
        return updated;
    }
    
    image image

    三、网关轮询监听变化

    • 网关启动时调用start方法,start中开启线程池调用HttpLongPollingTask
    • 在HttpLongPollingTask$run中循环调用doLongPolling,实现重试3次的调用。每次低啊用失败,则睡五秒再调用。
    public void run() {
        while (RUNNING.get()) {
        // 循环重试测试调用
            for (int time = 1; time <= retryTimes; time++) {
                try {
                    doLongPolling(server);
                } catch (Exception e) {
                    // 调用失败,就沉睡5秒,进行重试再次调用
                    if (time < retryTimes) {
                        log.warn("Long polling failed, tried {} times, {} times left, will be suspended for a while! {}",
                                time, retryTimes - time, e.getMessage());
                        ThreadUtils.sleep(TimeUnit.SECONDS, 5);
                        continue;
                    }
                    // 
                    log.error("Long polling failed, try again after 5 minutes!", e);
                    ThreadUtils.sleep(TimeUnit.MINUTES, 5);
                }
            }
        }
        log.warn("Stop http long polling.");
    }
    
    • HttpLongPollingTask$doLongPolling,发送数据更新签名到soul-admin进行验证,判断数据是否更新,如果已更新,网关得到相应结果,则调用数据拉去方法进行更新
    private void doLongPolling(final String server) {
      // 从缓存中获取数据
      MultiValueMap<String, String> params = new LinkedMultiValueMap<>(8);
      for (ConfigGroupEnum group : ConfigGroupEnum.values()) {
        ConfigData<?> cacheConfig = factory.cacheConfigData(group);
        // 生成本地数据更新签名,用于与远端验证,减少传输数据量
        String value = String.join(",", cacheConfig.getMd5(), String.valueOf(cacheConfig.getLastModifyTime()));
        params.put(group.name(), Lists.newArrayList(value));
      }
      // 构建 http 请求信息
      HttpHeaders headers = new HttpHeaders();
      headers.setContentType(MediaType.APPLICATION_FORM_URLENCODED);
      HttpEntity httpEntity = new HttpEntity(params, headers);
      String listenerUrl = server + "/configs/listener";
      log.debug("request listener configs: [{}]", listenerUrl);
      JsonArray groupJson = null;
      try {
        String json = this.httpClient.postForEntity(listenerUrl, httpEntity, String.class).getBody();
        groupJson = GSON.fromJson(json, JsonObject.class).getAsJsonArray("data");
      } catch (RestClientException e) {
        String message = String.format("listener configs fail, server:[%s], %s", server, e.getMessage());
        throw new SoulException(message, e);
      }
      // 得到变化的类型
      if (groupJson != null) {
        ConfigGroupEnum[] changedGroups = GSON.fromJson(groupJson, ConfigGroupEnum[].class);
        if (ArrayUtils.isNotEmpty(changedGroups)) {
          log.info("Group config changed: {}", Arrays.toString(changedGroups));
          // 拉取后台对应类型的数据
          this.doFetchGroupConfig(server, changedGroups);
        }
      }
    }
    

    四、小结

    • 通过这两篇的分析,发现数据更新主要是三个触发点

    1.网关启动时的全量拉

    2.网关定时检验数据更新,增量拉

    3.后台管理的某类型数据变化,远端推送

    • 日拱一卒,收拾下上班
    • soul的四大块: 数据同步、插件、客户端注解、指标监控,下一个写点啥?

    相关文章

      网友评论

          本文标题:soul从入门到放弃17--长轮询同步(二)

          本文链接:https://www.haomeiwen.com/subject/rxbvtltx.html