在上篇中我们分析了配置数据同步中HttpLongPolling
,soul-bootstrap
端的源码分析。在这一篇中,我们会分析soul-admin
端的源码。
进入正题。。。
找切入点
-
soul-bootstrap
端在长轮询中调用了soul-admin
的两个接口:
# 拉取特定类型的配置
/configs/fetch
# 配置变更的监听
/configs/listener
- 全局搜
/configs
是怎么提供的服务
search-cibfugs - 我们定位到
org.dromara.soul.admin.controller.ConfigController
ConfigController
拉取配置fetchConfigs
分析
org.dromara.soul.admin.listener.AbstractDataChangedListener
public ConfigData<?> fetchConfig(final ConfigGroupEnum groupKey) {
// 配置数据的缓存
ConfigDataCache config = CACHE.get(groupKey.name());
// 不同类型则传入对应类型,返回configData
switch (groupKey) {
case APP_AUTH:
List<AppAuthData> appAuthList = GsonUtils.getGson().fromJson(config.getJson(), new TypeToken<List<AppAuthData>>() {
}.getType());
// 对于每次的数据更新都有记录cache的md5值,最后更新时间
return new ConfigData<>(config.getMd5(), config.getLastModifyTime(), appAuthList);
case PLUGIN:
List<PluginData> pluginList = GsonUtils.getGson().fromJson(config.getJson(), new TypeToken<List<PluginData>>() {
}.getType());
return new ConfigData<>(config.getMd5(), config.getLastModifyTime(), pluginList);
case RULE:
List<RuleData> ruleList = GsonUtils.getGson().fromJson(config.getJson(), new TypeToken<List<RuleData>>() {
}.getType());
return new ConfigData<>(config.getMd5(), config.getLastModifyTime(), ruleList);
case SELECTOR:
List<SelectorData> selectorList = GsonUtils.getGson().fromJson(config.getJson(), new TypeToken<List<SelectorData>>() {
}.getType());
return new ConfigData<>(config.getMd5(), config.getLastModifyTime(), selectorList);
case META_DATA:
List<MetaData> metaList = GsonUtils.getGson().fromJson(config.getJson(), new TypeToken<List<MetaData>>() {
}.getType());
return new ConfigData<>(config.getMd5(), config.getLastModifyTime(), metaList);
default:
throw new IllegalStateException("Unexpected groupKey: " + groupKey);
}
}
- 这里注意到,对应配置数据是直接从内存cache中拿的,那什么时候将配置数据放到内存cache的?
- 先来寻找
cache
的使用情况
AbstractDataChangedListener.cache - 找到
updateCache
protected <T> void updateCache(final ConfigGroupEnum group, final List<T> data) {
String json = GsonUtils.getInstance().toJson(data);
ConfigDataCache newVal = new ConfigDataCache(group.name(), json, Md5Utils.md5(json), System.currentTimeMillis());
ConfigDataCache oldVal = CACHE.put(newVal.getGroup(), newVal);
log.info("update config cache[{}], old: {}, updated: {}", group, oldVal, newVal);
}
- 看起来这里没啥东西,没有找到出处;继续找
updateCache
的使用之处
updateCache.usage - 点进去看一个,到
updateSelectorCache
,再继续往上找onSelectorChanged
,再到org.dromara.soul.admin.listener.DataChangedEventDispatcher
DataChangedEventDispatcher -
DataChangedEventDispatcher
使用了spring的内存应用事件机制,为事件消费端,再找下事件发布端
DataChangedEventDispatcher.event - 查找关键字
DataChangedEvent
,看下事件发布的地方
- 差不多可以了,找到了源头的地方,下面总结一下
总结
-
ConfigController
提供接口配置获取/configs/fetch
,供soul-bootstrap
调用 - http长轮询数据变更监听器
HttpLongPollingDataChangedListener
,提供fetchConfig
方法,其中,所有配置数据是存放在其成员变量cache
中的;拉取特定类型的配置,只需要从cache
中取出来就行了 - 关于配置数据的存放,则是用户在
soul-admin
的web界面,对配置数据更新时,会通过spring
的应用事件机制,将变更的数据发布出来,事件为DataChangedEvent
;而监听器端则监听DataChangedEvent
事件,实现对应数据变更的存放 - 上述是增量数据的处理;
- 全量数据是如何加载到
cache
中的? - 仔细看
HttpLongPollingDataChangedListener
,发现在实例化的过程中,会创建一个定时任务线程池,其提供一个后台守护线程,默认情况下会每隔5min钟会从数据库中拉取配置数据加载到内存。
/**
* Instantiates a new Http long polling data changed listener.
* @param httpSyncProperties the HttpSyncProperties
*/
public HttpLongPollingDataChangedListener(final HttpSyncProperties httpSyncProperties) {
this.clients = new ArrayBlockingQueue<>(1024);
// 后台定期reload数据库配置数据的线程池
this.scheduler = new ScheduledThreadPoolExecutor(1,
SoulThreadFactory.create("long-polling", true));
this.httpSyncProperties = httpSyncProperties;
}
@Override
protected void afterInitialize() {
long syncInterval = httpSyncProperties.getRefreshInterval().toMillis();
// Periodically check the data for changes and update the cache
// 启动这个定时任务线程池,用于reload数据库配置到本地缓存
scheduler.scheduleWithFixedDelay(() -> {
log.info("http sync strategy refresh config start.");
try {
this.refreshLocalCache();
log.info("http sync strategy refresh config success.");
} catch (Exception e) {
log.error("http sync strategy refresh config error!", e);
}
}, syncInterval, syncInterval, TimeUnit.MILLISECONDS);
log.info("http sync strategy refresh interval: {}ms", syncInterval);
}
private void refreshLocalCache() {
this.updateAppAuthCache();
this.updatePluginCache();
this.updateRuleCache();
this.updateSelectorCache();
this.updateMetaDataCache();
}
- 该操作只会
reload
,并不会生成update
的事件,通知给soul-bootstrap
现在就只剩下一个问题了,当本地缓存数据有更新时,是如何通知到soul-bootstrap
的呢?下面我们来分析这个问题。
配置变更的监听与响应
分析
- 我们知道
soul-bootstrap
是通过回调长轮询的方式完成配置的监听,那实际上我们只要跟踪监听的接口逻辑就行 - 监听接口
/config/listener
中调用HttpLongPollingDataChangedListener.doLongPolling
public void doLongPolling(final HttpServletRequest request, final HttpServletResponse response) {
// compare group md5
// 根据监听传入的md5与更新时间戳找到变化的配置数据
List<ConfigGroupEnum> changedGroup = compareChangedGroup(request);
String clientIp = getRemoteIp(request);
// response immediately.
// 如果此次存在变化的配置数据,则直接响应请求,将变化的配置类型返回给soul-bootstrap
if (CollectionUtils.isNotEmpty(changedGroup)) {
this.generateResponse(response, changedGroup);
log.info("send response with the changed group, ip={}, group={}", clientIp, changedGroup);
return;
}
// listen for configuration changed.
// 否则将当前请求异步化
final AsyncContext asyncContext = request.startAsync();
// AsyncContext.settimeout() does not timeout properly, so you have to control it yourself
// 不设置超时
asyncContext.setTimeout(0L);
// block client's thread.
// 通过调度线程池去执行监听长轮询任务,这里的execute是立即执行的
scheduler.execute(new LongPollingClient(asyncContext, clientIp, HttpConstants.SERVER_MAX_HOLD_TIMEOUT));
}
- 接入的请求会开启异步(
servelet3.0
支持),并将其封装成长轮询客户端LongPollingClient
后丢给调度线程池,并立即执行 -
LongPollingClient
中的run
方法有点精巧,里边的执行逻辑并没有立即执行,而是先丢给调度线程池,并延迟60s执行;同时LongPollingClient
会添加到长轮询队列clients
中
public void run() {
// 这里并没有立即执行,会将其丢到调度线程池,延迟60s执行
this.asyncTimeoutFuture = scheduler.schedule(() -> {
// 执行时,先将当前长轮询的client从长轮询队列队列中移除
clients.remove(LongPollingClient.this);
// 检查是否存在变更的配置
List<ConfigGroupEnum> changedGroups = compareChangedGroup((HttpServletRequest) asyncContext.getRequest());
// 返回结果
sendResponse(changedGroups);
}, timeoutTime, TimeUnit.MILLISECONDS);
// 将当前长轮询的client放入长轮询队列中
clients.add(this);
}
- 上述做法的目的是,在这延迟的60s中,如果有配置变更产生,则会由配置变更的任务
DataChangeTask
,遍历现有的长轮询队列clients
,依次移除,并完成LongPollingClient
的返回结果设置,将异步化的请求操作完结掉;
public void run() {
// 遍历当前所有正在长轮询的client,将变更的数据作为此次轮询响应的结果返回给长轮询的client
//TODO question 这里是否会存在配置丢失的情况?
// 如果两次间隔很近的配置变更过来,第一次配置变更还在返回给client,此时的client并没有重新轮询进来,
// 则会导致第二次配置变更没有通知到第一次已通知的client,从而使得某些client节点丢失配置
// 在admin是集群的情况下,该数据同步机制可能更不可靠
for (Iterator<LongPollingClient> iter = clients.iterator(); iter.hasNext();) {
// 从长轮询队列中移除client
LongPollingClient client = iter.next();
iter.remove();
// 并将变更的数据返回给长轮询client
client.sendResponse(Collections.singletonList(groupKey));
log.info("send response with the changed group,ip={}, group={}, changeTime={}", client.ip, groupKey, changeTime);
}
}
-
LongPollingClient
的返回结果设置
void sendResponse(final List<ConfigGroupEnum> changedGroups) {
// cancel scheduler
// 如果在延迟60s的窗口中,存在配置变更的数据,则会提前结束,把变更的数据给到长轮询client;
// 这里的asyncTimeoutFuture便会为空,从而可以取消当前延迟执行的任务
if (null != asyncTimeoutFuture) {
asyncTimeoutFuture.cancel(false);
}
generateResponse((HttpServletResponse) asyncContext.getResponse(), changedGroups);
asyncContext.complete();
}
- 分析结束,做下总结
总结
-
soul-bootstrap
在HttpLongPollingTask
中采用请求回调轮询的方式,去轮询soul-admin
中的配置监听接口/configs/listener
,其中每次请求的超时时间为90s
-
soul-admin
中通过HttpLongPollingDataChangedListener.doLongPolling
方法开启请求的异步支持request.startAsync()
,避免阻塞住soul-admin
端的请求Acceptor
线程; - 将异步化请求
AsyncContext
封装成长轮询客户端任务LongPollingClient
,通过调度线程池scheduler
执行。 - 长轮询客户端任务
LongPollingClient
的run
方法,将自身逻辑丢给调度线程池scheduler
延迟60s
执行,这样就实现请求/configs/listener
至少会保持60s
- 长轮询客户端任务
LongPollingClient
还会将自身加入到长轮询客户端队列clients
中 - 如果在请求保持的
60s
中,存在有配置变更产生(产生来源是用户在配置web界面操作,包括对插件、选择器、规则的增删改,此类操作会自动触发配置变更事件;还有web端提供的一些强制同步功能,如各个插件中的同步、插件管理的同步,也会产生配置变更事件),则会由数据变更任务DataChangeTask
,遍历现有的长轮询队列clients
,依次移除,并完成长轮询客户端任务LongPollingClient
的返回结果设置,将异步化的请求操作完结掉 - 上述就是
soul-boostrap
与soul-admin
之间,http
长轮询HttpLongPolling
同步方式的配置监听与响应的大致流程。
网友评论