Soul Gateway Study 10 - Config Data Sync 1 - HttpLongPolli


Author: niuxin | Published: 2021-01-23 22:47

    Preface

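    soul-bootstrap is the gateway entry point: it has to carry all incoming traffic while still letting plugin features (routing, rate limiting, circuit breaking) be reconfigured at runtime. Roughly, dynamic configuration works like this:

    1. soul-bootstrap keeps the configuration data in an in-memory HashMap. When a request arrives, it matches the relevant configuration directly from memory and runs the plugin logic.
    2. The soul-admin console offers a visual management UI where developers and operators maintain plugin configuration; this data is stored in soul-admin's database.
    3. A data sync mechanism between soul-admin and soul-bootstrap carries the configuration from soul-admin over to soul-bootstrap.
    4. The overall flow:


      [Figure: config-data-sync]
    5. Source modules:
      [Figure: soul-sync-data-center]

    Next, we analyze the HttpLongPolling sync method.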
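    To make step 1 concrete, here is a minimal sketch of a local config cache that the sync side writes and the request path reads. PluginConfig, LocalConfigCache, onSubscribe and match are hypothetical names, not Soul classes; the article only says soul-bootstrap uses an in-memory HashMap, and the ConcurrentHashMap here is an assumption so that sync threads and request threads can share it safely.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for one plugin's config; real Soul data classes carry many more fields.
final class PluginConfig {
    final String name;
    final boolean enabled;

    PluginConfig(String name, boolean enabled) {
        this.name = name;
        this.enabled = enabled;
    }
}

// Hypothetical local cache: the sync side writes to it, the request path only reads it.
final class LocalConfigCache {
    // Concurrent map so sync threads can update while request threads read.
    private final Map<String, PluginConfig> plugins = new ConcurrentHashMap<>();

    // Called by the data-sync layer when new config arrives from soul-admin.
    void onSubscribe(PluginConfig config) {
        plugins.put(config.name, config);
    }

    // Called per request: a pure in-memory lookup, with no call to soul-admin.
    Optional<PluginConfig> match(String pluginName) {
        return Optional.ofNullable(plugins.get(pluginName)).filter(c -> c.enabled);
    }
}
```

    The point is that a request never calls soul-admin; it only reads local memory, and keeping that memory fresh is the job of the sync mechanism.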

    HttpLongPolling (HTTP long polling)

    Usage

    soul-bootstrap

    • Add the dependencies: bring in the data-sync starters
            <!--soul data sync start use zookeeper-->
            <dependency>
                <groupId>org.dromara</groupId>
                <artifactId>soul-spring-boot-starter-sync-data-zookeeper</artifactId>
                <version>${project.version}</version>
            </dependency>
    
            <!--soul data sync start use websocket-->
            <dependency>
                <groupId>org.dromara</groupId>
                <artifactId>soul-spring-boot-starter-sync-data-websocket</artifactId>
                <version>${project.version}</version>
            </dependency>
    
            <!--soul data sync start use http-->
            <dependency>
                <groupId>org.dromara</groupId>
                <artifactId>soul-spring-boot-starter-sync-data-http</artifactId>
                <version>${project.version}</version>
            </dependency>
    
            <!--soul data sync start use nacos-->
            <dependency>
                <groupId>org.dromara</groupId>
                <artifactId>soul-spring-boot-starter-sync-data-nacos</artifactId>
                <version>${project.version}</version>
            </dependency>
    
    • Configuration


      [Figure: bootstrap application config, sync http section]

    soul-admin

    • Configuration


      [Figure: admin application config, sync http section]

    Notes

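    1. With HTTP long-polling sync, make sure soul-admin is up before starting soul-bootstrap; otherwise soul-bootstrap fails to start.
    2. Whichever sync method you use, soul-admin and soul-bootstrap must be configured with the same one; otherwise synchronization fails.
    3. If soul-bootstrap prints log lines like the following at startup, it is using HttpLongPolling data sync: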
    2021-01-24 08:27:45.086  INFO 5667 --- [           main] .s.s.b.s.s.d.h.HttpSyncDataConfiguration : you use http long pull sync soul data
    2021-01-24 08:27:45.702  INFO 5667 --- [           main] o.d.s.s.data.http.HttpSyncDataService    : request configs: [http://localhost:9095/configs/fetch?groupKeys=APP_AUTH&groupKeys=PLUGIN&groupKeys=RULE&groupKeys=SELECTOR&groupKeys=META_DATA]
    2021-01-24 08:27:46.324  INFO 5667 --- [onPool-worker-1] o.d.s.s.d.h.refresh.AppAuthDataRefresh   : clear all appAuth data cache
    

    Source code analysis

    soul-bootstrap

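    • Add the starter dependencies. Including all four sync starters is harmless, because a starter only takes effect once its corresponding property is configured. See xxxSyncDataConfiguration in each sync-type starter for details.
    // sync type -> configuration class -> property that activates it
    HttpLongPolling -> HttpSyncDataConfiguration -> soul.sync.http.url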
    Websocket -> WebsocketSyncDataConfiguration -> soul.sync.websocket.url
    Zookeeper -> ZookeeperSyncDataConfiguration -> soul.sync.zookeeper.url
    Nacos -> NacosSyncDataConfiguration -> soul.sync.nacos.url
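    The activation rule above can be modeled as "a sync type is on only if its property has a value". The sketch below is a plain-Java simulation of that idea, not Soul's code: the real starters presumably rely on Spring Boot's @ConditionalOnProperty on each xxxSyncDataConfiguration, and treating a blank value as unset is a simplification made only for this sketch. SyncActivation and activeSyncTypes are hypothetical names.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java simulation of property-gated starters (hypothetical, for illustration only).
final class SyncActivation {
    // sync type -> property that must be set (mapping copied from the list above)
    static final Map<String, String> REQUIRED_PROPERTY = new LinkedHashMap<>();

    static {
        REQUIRED_PROPERTY.put("HttpLongPolling", "soul.sync.http.url");
        REQUIRED_PROPERTY.put("Websocket", "soul.sync.websocket.url");
        REQUIRED_PROPERTY.put("Zookeeper", "soul.sync.zookeeper.url");
        REQUIRED_PROPERTY.put("Nacos", "soul.sync.nacos.url");
    }

    // Returns the sync types whose activating property has a non-blank value.
    static List<String> activeSyncTypes(Map<String, String> properties) {
        List<String> active = new ArrayList<>();
        for (Map.Entry<String, String> e : REQUIRED_PROPERTY.entrySet()) {
            String value = properties.get(e.getValue());
            if (value != null && !value.trim().isEmpty()) {
                active.add(e.getKey());
            }
        }
        return active;
    }
}
```

    So having all four starters on the classpath changes nothing until one of these properties is set.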
    
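    • The entry point is org.dromara.soul.sync.data.http.HttpSyncDataService. Its key flow: instantiate HttpSyncDataService -> start -> fetch config (fetchGroupConfig) -> doFetchGroupConfig -> update the in-memory config (updateCache) -> start the long-polling thread pool -> polling task HttpLongPollingTask -> long-polling logic doLongPolling
    • For the finer details, see the comments in the source below.
    • Instantiating HttpSyncDataService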
        public HttpSyncDataService(final HttpConfig httpConfig, final PluginDataSubscriber pluginDataSubscriber,
                                   final List<MetaDataSubscriber> metaDataSubscribers, final List<AuthDataSubscriber> authDataSubscribers) {
            this.factory = new DataRefreshFactory(pluginDataSubscriber, metaDataSubscribers, authDataSubscribers);
            this.httpConfig = httpConfig;
            // multiple admin addresses are separated by ","
            this.serverList = Lists.newArrayList(Splitter.on(",").split(httpConfig.getUrl()));
            this.httpClient = createRestTemplate();
            // the constructor calls start(), so admin must be online at this point; otherwise bootstrap fails to start as well
            this.start();
        }
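    The server list is just the comma-separated soul.sync.http.url value. A minimal sketch of that parsing step in plain Java, assuming you also want to tolerate spaces and a trailing comma; the quoted Guava Splitter.on(",") call does neither, since it neither trims nor omits empty strings. ServerListParser is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

final class ServerListParser {
    // Splits a comma-separated admin URL list; trimming and skipping
    // empty entries are assumptions added for robustness.
    static List<String> parse(String url) {
        List<String> servers = new ArrayList<>();
        for (String part : url.split(",")) {
            String trimmed = part.trim();
            if (!trimmed.isEmpty()) {
                servers.add(trimmed);
            }
        }
        return servers;
    }
}
```

    The size of this list later decides how many long-polling threads start() creates.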
    
    • start
        private void start() {
            // It could be initialized multiple times, so you need to control that.
            if (RUNNING.compareAndSet(false, true)) {
                // fetch all group configs.
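                // fetch once on initial startup
                this.fetchGroupConfig(ConfigGroupEnum.values());
                // TODO question: what if the admin URL points at a load balancer? Would this logic break then?
                // the pool size equals the number of admin servers: one thread per admin, each doing its own long polling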
                int threadSize = serverList.size();
                this.executor = new ThreadPoolExecutor(threadSize, threadSize, 60L, TimeUnit.SECONDS,
                        new LinkedBlockingQueue<>(),
                        SoulThreadFactory.create("http-long-polling", true));
                // start long polling, each server creates a thread to listen for changes.
                // Unlike the startup fetch, where initial config from any one admin is enough,
                // changes must be watched on every admin, because a user's edit may be handled by any node.
                this.serverList.forEach(server -> this.executor.execute(new HttpLongPollingTask(server)));
            } else {
                log.info("soul http long polling was started, executor=[{}]", executor);
            }
        }
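    The two ideas in start(), a compareAndSet guard so the pollers start only once and a fixed-size pool with one thread per admin server, can be sketched in isolation. LongPollingStarter and its methods are hypothetical; the named daemon-thread factory only approximates what SoulThreadFactory.create("http-long-polling", true) appears to do.

```java
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

final class LongPollingStarter {
    private final AtomicBoolean running = new AtomicBoolean(false);
    private ThreadPoolExecutor executor;

    // Returns true only for the call that actually started the pollers.
    boolean start(List<String> servers, Consumer<String> pollTask) {
        if (servers.isEmpty()) {
            throw new IllegalArgumentException("at least one admin server is required");
        }
        if (!running.compareAndSet(false, true)) {
            return false; // already started: later calls are no-ops
        }
        int threadSize = servers.size(); // one thread per admin server
        AtomicInteger seq = new AtomicInteger();
        ThreadFactory daemonFactory = r -> {
            Thread t = new Thread(r, "http-long-polling-" + seq.incrementAndGet());
            t.setDaemon(true); // polling threads must not keep the JVM alive
            return t;
        };
        executor = new ThreadPoolExecutor(threadSize, threadSize, 60L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(), daemonFactory);
        for (String server : servers) {
            executor.execute(() -> pollTask.accept(server));
        }
        return true;
    }

    void stop() {
        running.set(false);
        if (executor != null) {
            executor.shutdownNow();
        }
    }

    ThreadPoolExecutor executor() {
        return executor;
    }
}
```

    Because core and max size are both the number of servers, each polling task keeps its own thread for as long as it runs; the queue would only matter if more tasks than servers were submitted.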
    
    
    • Fetching config: fetchGroupConfig -> doFetchGroupConfig
        private void fetchGroupConfig(final ConfigGroupEnum... groups) throws SoulException {
            // try the configured servers in order; stop as soon as one fetch succeeds
            for (int index = 0; index < this.serverList.size(); index++) {
                String server = serverList.get(index);
                try {
                    this.doFetchGroupConfig(server, groups);
                    break;
                } catch (SoulException e) {
                    // no available server, throw exception.
                    // if even the last server failed, rethrow; this aborts bootstrap startup
                    if (index >= serverList.size() - 1) {
                        throw e;
                    }
                    log.warn("fetch config fail, try another one: {}", serverList.get(index + 1));
                }
            }
        }
    
        private void doFetchGroupConfig(final String server, final ConfigGroupEnum... groups) {
            // all requested groups are fetched in a single request
            StringBuilder params = new StringBuilder();
            for (ConfigGroupEnum groupKey : groups) {
                params.append("groupKeys").append("=").append(groupKey.name()).append("&");
            }
            String url = server + "/configs/fetch?" + StringUtils.removeEnd(params.toString(), "&");
            log.info("request configs: [{}]", url);
            String json = null;
            try {
                // fetch config from admin
                json = this.httpClient.getForObject(url, String.class);
            } catch (RestClientException e) {
                String message = String.format("fetch config fail from server[%s], %s", url, e.getMessage());
                log.warn(message);
                throw new SoulException(message, e);
            }
            // update local cache
            // on success, update the local config cache
            boolean updated = this.updateCacheWithJson(json);
            if (updated) {
                log.info("get latest configs: [{}]", json);
                return;
            }
            // not updated. it is likely that the current config server has not been updated yet. wait a moment.
            log.info("The config of the server[{}] has not been updated or is out of date. Wait for 30s to listen for changes again.", server);
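            // if this server returned nothing newer than the local cache, sleep 30s before returning
            // TODO question: this blocks the caller for 30s. fetchGroupConfig stops at the first server that does not throw, so startup waits at most once per call, but that still delays bootstrap; should the startup fetch be made asynchronous?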
            ThreadUtils.sleep(TimeUnit.SECONDS, 30);
        }
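    Stripped of logging and JSON handling, the two methods boil down to building one URL with a repeated groupKeys parameter and trying the admin servers in order. A minimal sketch, where the fetch function stands in for the RestTemplate call and the 30s wait is left out; GroupConfigFetcher, buildFetchUrl and fetchWithFailover are illustrative names.

```java
import java.util.List;
import java.util.StringJoiner;
import java.util.function.Function;

final class GroupConfigFetcher {
    // Same order as the groupKeys in the startup log shown earlier.
    enum ConfigGroup { APP_AUTH, PLUGIN, RULE, SELECTOR, META_DATA }

    // Every requested group becomes one repeated groupKeys parameter.
    static String buildFetchUrl(String server, ConfigGroup... groups) {
        StringJoiner params = new StringJoiner("&");
        for (ConfigGroup group : groups) {
            params.add("groupKeys=" + group.name());
        }
        return server + "/configs/fetch?" + params;
    }

    // Returns the first successful response; the error propagates only
    // when the last server has failed too.
    static String fetchWithFailover(List<String> servers, Function<String, String> fetch,
                                    ConfigGroup... groups) {
        RuntimeException lastError = new IllegalStateException("no admin server configured");
        for (String server : servers) {
            try {
                return fetch.apply(buildFetchUrl(server, groups));
            } catch (RuntimeException e) {
                lastError = e; // remember it and try the next server
            }
        }
        throw lastError;
    }
}
```

    For all five groups, buildFetchUrl produces the same URL as the request configs line in the startup log.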
    
    • Updating the in-memory config (updateCache): delegate to the data-refresh strategy factory
        /**
         * update local cache.
         * @param json the response from config server.
         * @return true: the local cache was updated. false: not updated.
         */
        private boolean updateCacheWithJson(final String json) {
            JsonObject jsonObject = GSON.fromJson(json, JsonObject.class);
            JsonObject data = jsonObject.getAsJsonObject("data");
            // if the config cache will be updated?
            // factory is the data-refresh strategy factory
            return factory.executor(data);
        }
    
    • Updating the in-memory config (updateCache): DataRefreshFactory, the data-refresh strategy factory, dispatches the update to the refresher for each data type
        public boolean executor(final JsonObject data) {
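            // TODO question: why this odd array? A lambda can only capture effectively final locals,
            // so a one-element array serves as a mutable holder for the result.
            final boolean[] success = {false};
            // refresh each data type; the parallel stream updates them concurrently.
            // Note: every refresher overwrites success[0] without synchronization, so the returned value
            // depends on which refresh happens to write last, not on whether any group changed.
            ENUM_MAP.values().parallelStream().forEach(dataRefresh -> success[0] = dataRefresh.refresh(data));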
            return success[0];
        }
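    If the intent is "return true if any refresher updated its cache", one way to express it without the shared array is a non-short-circuiting reduction. This is a suggested alternative, not what Soul does; Refresher is a hypothetical stand-in for the per-type refreshers, and a String replaces Gson's JsonObject to keep the sketch dependency-free.

```java
import java.util.List;

// Hypothetical stand-in for one per-type refresher (plugin, selector, rule, ...).
interface Refresher {
    boolean refresh(String data); // true if this refresher changed the local cache
}

final class RefreshAggregator {
    // Runs every refresher in parallel and reports whether ANY of them updated the cache.
    // reduce(...) does not short-circuit, so every refresher is still invoked,
    // whereas anyMatch(...) could skip the remaining refreshers after the first true.
    static boolean refreshAll(List<Refresher> refreshers, String data) {
        return refreshers.parallelStream()
                .map(r -> r.refresh(data))
                .reduce(false, Boolean::logicalOr);
    }
}
```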
    
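    • Updating the in-memory config (updateCache): class diagram of the data refreshers


      [Figure: dataRefresh class diagram]
    • Starting the long-polling thread pool: see the second half of the start flow above.
    • The long-polling task HttpLongPollingTask, run by the thread pool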
        public void run() {
            // keep looping only while bootstrap is running, so the polling thread stops instead of spinning pointlessly after shutdown
            while (RUNNING.get()) {
                for (int time = 1; time <= retryTimes; time++) {
                    try {
                        // a failed call with attempts left in this round waits 5s and retries (3 attempts per round)
                        doLongPolling(server);
                    } catch (Exception e) {
                        // print warning log.
                        if (time < retryTimes) {
                            log.warn("Long polling failed, tried {} times, {} times left, will be suspended for a while! {}",
                                    time, retryTimes - time, e.getMessage());
                            ThreadUtils.sleep(TimeUnit.SECONDS, 5);
                            continue;
                        }
                        // print error, then suspended for a while.
                        log.error("Long polling failed, try again after 5 minutes!", e);
                        // the last attempt of the round failed: sleep 5 minutes before polling again
                        ThreadUtils.sleep(TimeUnit.MINUTES, 5);
                    }
                }
            }
            log.warn("Stop http long polling.");
        }
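    As the excerpt reads, a successful doLongPolling call does not leave the for loop, so the retryTimes slots (3, per the comment) are simply consumed in order, and a single failure in the last slot triggers the 5-minute pause even if the calls before it succeeded. If "retryTimes consecutive failures" is the intended rule, a loop like the following expresses it. This is an alternative design, not Soul's code; PollLoop and its parameters are hypothetical, and the pauses are injected so they can be checked without actually sleeping.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.LongConsumer;

// Polling loop that counts CONSECUTIVE failures (an alternative to the quoted loop).
final class PollLoop {
    private final AtomicBoolean running;  // shared "is bootstrap alive" flag, like RUNNING above
    private final Runnable pollOnce;      // one long-polling round trip; throws on failure
    private final LongConsumer sleeper;   // pause in milliseconds, injected for testability
    private final int retryTimes;
    private final long shortPauseMs;
    private final long longPauseMs;

    PollLoop(AtomicBoolean running, Runnable pollOnce, LongConsumer sleeper,
             int retryTimes, long shortPauseMs, long longPauseMs) {
        this.running = running;
        this.pollOnce = pollOnce;
        this.sleeper = sleeper;
        this.retryTimes = retryTimes;
        this.shortPauseMs = shortPauseMs;
        this.longPauseMs = longPauseMs;
    }

    void run() {
        int consecutiveFailures = 0;
        while (running.get()) {
            try {
                pollOnce.run();
                consecutiveFailures = 0; // any success resets the count
            } catch (RuntimeException e) {
                consecutiveFailures++;
                if (consecutiveFailures < retryTimes) {
                    sleeper.accept(shortPauseMs); // brief pause, then retry
                } else {
                    sleeper.accept(longPauseMs);  // too many failures in a row: back off longer
                    consecutiveFailures = 0;
                }
            }
        }
    }
}
```

    In production the sleeper would wrap Thread.sleep; with retryTimes = 3, 5000 ms and 300000 ms it mirrors the 5s and 5-minute pauses above.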
    
    • The long-polling logic: doLongPolling
        private void doLongPolling(final String server) {
            MultiValueMap<String, String> params = new LinkedMultiValueMap<>(8);
            for (ConfigGroupEnum group : ConfigGroupEnum.values()) {
                ConfigData<?> cacheConfig = factory.cacheConfigData(group);
                // send each group's cached md5 and last-modified time to admin
                String value = String.join(",", cacheConfig.getMd5(), String.valueOf(cacheConfig.getLastModifyTime()));
                params.put(group.name(), Lists.newArrayList(value));
            }
            HttpHeaders headers = new HttpHeaders();
            headers.setContentType(MediaType.APPLICATION_FORM_URLENCODED);
            HttpEntity httpEntity = new HttpEntity(params, headers);
            String listenerUrl = server + "/configs/listener";
            log.debug("request listener configs: [{}]", listenerUrl);
            JsonArray groupJson = null;
            try {
                // call admin's config listener endpoint; if config changed on admin, it returns the changed group types to bootstrap
                String json = this.httpClient.postForEntity(listenerUrl, httpEntity, String.class).getBody();
                log.debug("listener result: [{}]", json);
                groupJson = GSON.fromJson(json, JsonObject.class).getAsJsonArray("data");
            } catch (RestClientException e) {
                String message = String.format("listener configs fail, server:[%s], %s", server, e.getMessage());
                throw new SoulException(message, e);
            }
            if (groupJson != null) {
                // fetch group configuration async.
                // changed groups were returned: bootstrap then fetches their config itself (synchronously, on this polling thread)
                ConfigGroupEnum[] changedGroups = GSON.fromJson(groupJson, ConfigGroupEnum[].class);
                if (ArrayUtils.isNotEmpty(changedGroups)) {
                    log.info("Group config changed: {}", Arrays.toString(changedGroups));
                    // fetch config for the changed groups; possible groups are APP_AUTH, PLUGIN, RULE, SELECTOR, META_DATA
                    this.doFetchGroupConfig(server, changedGroups);
                }
            }
        }
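    The listener request itself is a small form POST: one field per group, whose value is the cached md5 and lastModifyTime joined by a comma. The sketch below builds such a body with the JDK only; CachedGroup and ListenerRequest are hypothetical names, and the md5 and timestamp values used to exercise it are made up. How soul-admin holds and answers this request belongs to the admin-side analysis.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical snapshot of what the local cache knows about one group.
final class CachedGroup {
    final String md5;
    final long lastModifyTime;

    CachedGroup(String md5, long lastModifyTime) {
        this.md5 = md5;
        this.lastModifyTime = lastModifyTime;
    }
}

final class ListenerRequest {
    // One field per group, value "md5,lastModifyTime", as in doLongPolling above.
    static Map<String, String> buildParams(Map<String, CachedGroup> cache) {
        Map<String, String> params = new LinkedHashMap<>();
        for (Map.Entry<String, CachedGroup> e : cache.entrySet()) {
            CachedGroup c = e.getValue();
            params.put(e.getKey(), String.join(",", c.md5, String.valueOf(c.lastModifyTime)));
        }
        return params;
    }

    // Encodes the fields as an application/x-www-form-urlencoded body.
    static String toFormBody(Map<String, String> params) {
        StringJoiner body = new StringJoiner("&");
        try {
            for (Map.Entry<String, String> e : params.entrySet()) {
                body.add(URLEncoder.encode(e.getKey(), "UTF-8") + "="
                        + URLEncoder.encode(e.getValue(), "UTF-8"));
            }
        } catch (UnsupportedEncodingException ex) {
            throw new IllegalStateException("UTF-8 is always supported", ex);
        }
        return body.toString();
    }
}
```

    The response only names the groups that changed, which is why doLongPolling needs a second request (doFetchGroupConfig) to get the actual data.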
    

    That completes the analysis of HTTP long polling on the soul-bootstrap side; next, we look at soul-admin.

    To be continued ...


    Article link: https://www.haomeiwen.com/subject/idluzktx.html