美文网首页
soul网关学习11-配置数据同步1-HttpLongPolli

soul网关学习11-配置数据同步1-HttpLongPolli

作者: niuxin | 来源:发表于2021-01-24 22:40 被阅读0次

    在上篇中我们分析了配置数据同步中HttpLongPollingsoul-bootstrap端的源码分析。在这一篇中,我们会分析soul-admin端的源码。
    进入正题。。。

    找切入点

    • soul-bootstrap端在长轮询中调用了soul-admin的两个接口:
    # 拉取特定类型的配置
    /configs/fetch
    # 配置变更的监听
    /configs/listener
    
    • 全局搜/configs是怎么提供的服务
      search-cibfugs
    • 我们定位到org.dromara.soul.admin.controller.ConfigController
      ConfigController

    拉取配置fetchConfigs

    分析

    • org.dromara.soul.admin.listener.AbstractDataChangedListener
        public ConfigData<?> fetchConfig(final ConfigGroupEnum groupKey) {
            // 配置数据的缓存
            ConfigDataCache config = CACHE.get(groupKey.name());
            // 不同类型则传入对应类型,返回configData
            switch (groupKey) {
                case APP_AUTH:
                    List<AppAuthData> appAuthList = GsonUtils.getGson().fromJson(config.getJson(), new TypeToken<List<AppAuthData>>() {
                    }.getType());
                    // 对于每次的数据更新都有记录cache的md5值,最后更新时间
                    return new ConfigData<>(config.getMd5(), config.getLastModifyTime(), appAuthList);
                case PLUGIN:
                    List<PluginData> pluginList = GsonUtils.getGson().fromJson(config.getJson(), new TypeToken<List<PluginData>>() {
                    }.getType());
                    return new ConfigData<>(config.getMd5(), config.getLastModifyTime(), pluginList);
                case RULE:
                    List<RuleData> ruleList = GsonUtils.getGson().fromJson(config.getJson(), new TypeToken<List<RuleData>>() {
                    }.getType());
                    return new ConfigData<>(config.getMd5(), config.getLastModifyTime(), ruleList);
                case SELECTOR:
                    List<SelectorData> selectorList = GsonUtils.getGson().fromJson(config.getJson(), new TypeToken<List<SelectorData>>() {
                    }.getType());
                    return new ConfigData<>(config.getMd5(), config.getLastModifyTime(), selectorList);
                case META_DATA:
                    List<MetaData> metaList = GsonUtils.getGson().fromJson(config.getJson(), new TypeToken<List<MetaData>>() {
                    }.getType());
                    return new ConfigData<>(config.getMd5(), config.getLastModifyTime(), metaList);
                default:
                    throw new IllegalStateException("Unexpected groupKey: " + groupKey);
            }
        }
    
    • 这里注意到,对应配置数据是直接从内存cache中拿的,那什么时候将配置数据放到内存cache的?
    • 先来寻找cache的使用情况
      AbstractDataChangedListener.cache
    • 找到updateCache
        protected <T> void updateCache(final ConfigGroupEnum group, final List<T> data) {
            String json = GsonUtils.getInstance().toJson(data);
            ConfigDataCache newVal = new ConfigDataCache(group.name(), json, Md5Utils.md5(json), System.currentTimeMillis());
            ConfigDataCache oldVal = CACHE.put(newVal.getGroup(), newVal);
            log.info("update config cache[{}], old: {}, updated: {}", group, oldVal, newVal);
        }
    
    • 看起来这里没啥东西,没有找到出处;继续找updateCache的使用之处
      updateCache.usage
    • 点进去看一个,到updateSelectorCache,再继续往上找onSelectorChanged,再到org.dromara.soul.admin.listener.DataChangedEventDispatcher
      DataChangedEventDispatcher
    • DataChangedEventDispatcher使用了spring的内存应用事件机制,为事件消费端,再找下事件发布端
      DataChangedEventDispatcher.event
    • 查找关键字DataChangedEvent,看下事件发布的地方
    • 差不多可以了,找到了源头的地方,下面总结一下

    总结

    1. ConfigController提供接口配置获取/configs/fetch,供soul-bootstrap调用
    2. http长轮询数据变更监听器HttpLongPollingDataChangedListener,提供fetchConfig方法,其中,所有配置数据是存放在其成员变量cache中的;拉取特定类型的配置,只需要从cache中取出来就行了
    3. 关于配置数据的存放,则是用户在soul-admin的web界面,对配置数据更新时,会通过spring的应用事件机制,将变更的数据发布出来,事件为DataChangedEvent;而监听器端则监听DataChangedEvent事件,实现对应数据变更的存放
    4. 上述是增量数据的处理;
    5. 全量数据是如何加载到cache中的?
    6. 仔细看HttpLongPollingDataChangedListener,发现在实例化的过程中,会创建一个定时任务线程池,其提供一个后台守护线程,默认情况下会每隔5min钟会从数据库中拉取配置数据加载到内存。
       /**
        * Instantiates a new Http long polling data changed listener.
        * @param httpSyncProperties the HttpSyncProperties
        */
       public HttpLongPollingDataChangedListener(final HttpSyncProperties httpSyncProperties) {
           this.clients = new ArrayBlockingQueue<>(1024);
           // 后台定期reload数据库配置数据的线程池
           this.scheduler = new ScheduledThreadPoolExecutor(1,
                   SoulThreadFactory.create("long-polling", true));
           this.httpSyncProperties = httpSyncProperties;
       }
    
       @Override
       protected void afterInitialize() {
           long syncInterval = httpSyncProperties.getRefreshInterval().toMillis();
           // Periodically check the data for changes and update the cache
           // 启动这个定时任务线程池,用于reload数据库配置到本地缓存
           scheduler.scheduleWithFixedDelay(() -> {
               log.info("http sync strategy refresh config start.");
               try {
                   this.refreshLocalCache();
                   log.info("http sync strategy refresh config success.");
               } catch (Exception e) {
                   log.error("http sync strategy refresh config error!", e);
               }
           }, syncInterval, syncInterval, TimeUnit.MILLISECONDS);
           log.info("http sync strategy refresh interval: {}ms", syncInterval);
       }
    
       private void refreshLocalCache() {
           this.updateAppAuthCache();
           this.updatePluginCache();
           this.updateRuleCache();
           this.updateSelectorCache();
           this.updateMetaDataCache();
       }
    
    • 该操作只会reload,并不会生成update的事件,通知给soul-bootstrap
      现在就只剩下一个问题了,当本地缓存数据有更新时,是如何通知到soul-bootstrap的呢?下面我们来分析这个问题。

    配置变更的监听与响应

    分析

    • 我们知道soul-bootstrap是通过回调长轮询的方式完成配置的监听,那实际上我们只要跟踪监听的接口逻辑就行
    • 监听接口/config/listener中调用HttpLongPollingDataChangedListener.doLongPolling
    public void doLongPolling(final HttpServletRequest request, final HttpServletResponse response) {
    
            // compare group md5
            // 根据监听传入的md5与更新时间戳找到变化的配置数据
            List<ConfigGroupEnum> changedGroup = compareChangedGroup(request);
            String clientIp = getRemoteIp(request);
    
            // response immediately.
            // 如果此次存在变化的配置数据,则直接响应请求,将变化的配置类型返回给soul-bootstrap
            if (CollectionUtils.isNotEmpty(changedGroup)) {
                this.generateResponse(response, changedGroup);
                log.info("send response with the changed group, ip={}, group={}", clientIp, changedGroup);
                return;
            }
    
            // listen for configuration changed.
            // 否则将当前请求异步化
            final AsyncContext asyncContext = request.startAsync();
    
            // AsyncContext.settimeout() does not timeout properly, so you have to control it yourself
            // 不设置超时
            asyncContext.setTimeout(0L);
    
            // block client's thread.
            // 通过调度线程池去执行监听长轮询任务,这里的execute是立即执行的
            scheduler.execute(new LongPollingClient(asyncContext, clientIp, HttpConstants.SERVER_MAX_HOLD_TIMEOUT));
        }
    
    • 接入的请求会开启异步(servelet3.0支持),并将其封装成长轮询客户端LongPollingClient后丢给调度线程池,并立即执行
    • LongPollingClient中的run方法有点精巧,里边的执行逻辑并没有立即执行,而是先丢给调度线程池,并延迟60s执行;同时LongPollingClient会添加到长轮询队列clients
           public void run() {
                // 这里并没有立即执行,会将其丢到调度线程池,延迟60s执行
                this.asyncTimeoutFuture = scheduler.schedule(() -> {
                    // 执行时,先将当前长轮询的client从长轮询队列队列中移除
                    clients.remove(LongPollingClient.this);
                    // 检查是否存在变更的配置
                    List<ConfigGroupEnum> changedGroups = compareChangedGroup((HttpServletRequest) asyncContext.getRequest());
                    // 返回结果
                    sendResponse(changedGroups);
                }, timeoutTime, TimeUnit.MILLISECONDS);
                // 将当前长轮询的client放入长轮询队列中
                clients.add(this);
            }
    
    • 上述做法的目的是,在这延迟的60s中,如果有配置变更产生,则会由配置变更的任务DataChangeTask,遍历现有的长轮询队列clients,依次移除,并完成LongPollingClient的返回结果设置,将异步化的请求操作完结掉;
            public void run() {
                // 遍历当前所有正在长轮询的client,将变更的数据作为此次轮询响应的结果返回给长轮询的client
                //TODO question 这里是否会存在配置丢失的情况?
                // 如果两次间隔很近的配置变更过来,第一次配置变更还在返回给client,此时的client并没有重新轮询进来,
                // 则会导致第二次配置变更没有通知到第一次已通知的client,从而使得某些client节点丢失配置
                // 在admin是集群的情况下,该数据同步机制可能更不可靠
                for (Iterator<LongPollingClient> iter = clients.iterator(); iter.hasNext();) {
                    // 从长轮询队列中移除client
                    LongPollingClient client = iter.next();
                    iter.remove();
                    // 并将变更的数据返回给长轮询client
                    client.sendResponse(Collections.singletonList(groupKey));
                    log.info("send response with the changed group,ip={}, group={}, changeTime={}", client.ip, groupKey, changeTime);
                }
            }
    
    • LongPollingClient 的返回结果设置
           void sendResponse(final List<ConfigGroupEnum> changedGroups) {
                // cancel scheduler
                // 如果在延迟60s的窗口中,存在配置变更的数据,则会提前结束,把变更的数据给到长轮询client;
                // 这里的asyncTimeoutFuture便会为空,从而可以取消当前延迟执行的任务
                if (null != asyncTimeoutFuture) {
                    asyncTimeoutFuture.cancel(false);
                }
                generateResponse((HttpServletResponse) asyncContext.getResponse(), changedGroups);
                asyncContext.complete();
            }
    
    • 分析结束,做下总结

    总结

    1. soul-bootstrapHttpLongPollingTask中采用请求回调轮询的方式,去轮询soul-admin中的配置监听接口/configs/listener,其中每次请求的超时时间为90s
    2. soul-admin中通过HttpLongPollingDataChangedListener.doLongPolling方法开启请求的异步支持request.startAsync(),避免阻塞住soul-admin端的请求Acceptor线程;
    3. 将异步化请求AsyncContext封装成长轮询客户端任务LongPollingClient,通过调度线程池scheduler执行。
    4. 长轮询客户端任务LongPollingClientrun方法,将自身逻辑丢给调度线程池scheduler延迟60s执行,这样就实现请求/configs/listener至少会保持60s
    5. 长轮询客户端任务LongPollingClient还会将自身加入到长轮询客户端队列clients
    6. 如果在请求保持的60s中,存在有配置变更产生(产生来源是用户在配置web界面操作,包括对插件、选择器、规则的增删改,此类操作会自动触发配置变更事件;还有web端提供的一些强制同步功能,如各个插件中的同步、插件管理的同步,也会产生配置变更事件),则会由数据变更任务DataChangeTask,遍历现有的长轮询队列clients,依次移除,并完成长轮询客户端任务LongPollingClient的返回结果设置,将异步化的请求操作完结掉
    7. 上述就是soul-boostrapsoul-admin之间,http长轮询HttpLongPolling同步方式的配置监听与响应的大致流程。

    相关文章

      网友评论

          本文标题:soul网关学习11-配置数据同步1-HttpLongPolli

          本文链接:https://www.haomeiwen.com/subject/sjjmzktx.html