继续dubbo协议转换未完成的流程分析。
三、soul-bootstrap端接收配置同步的处理
-
pom文件引入配置同步的依赖
sync-data-dependency - 我们这里只分析websocket同步配置的逻辑,相应的自动配置类为
org.dromara.soul.spring.boot.starter.sync.data.websocket.WebsocketSyncDataConfiguration
- 从上图可以看到,
websocket
数据同步在bootstrap端最核心的类就为org.dromara.soul.plugin.sync.data.weboscket.WebsocketSyncDataService#WebsocketSyncDataService
public WebsocketSyncDataService(final WebsocketConfig websocketConfig,
final PluginDataSubscriber pluginDataSubscriber,
final List<MetaDataSubscriber> metaDataSubscribers,
final List<AuthDataSubscriber> authDataSubscribers) {
// 获取soul-admin的地址,多个admin则以,分割
String[] urls = StringUtils.split(websocketConfig.getUrls(), ",");
// 采用定时调度的线程池,其核心线程数为admin的地址数,且创建的线程为守护线程
// 该线程池是为了保证bootstrap与admin之间的websocket能一直连接。
// 因为如果websocket只是在启动的时候建立连接,在运行过程中发生一些异常导致bootstrap与admin之间的连接断开
// 那么是有必要去进行重连的,否则只要一断,就失去联系了,不科学
executor = new ScheduledThreadPoolExecutor(urls.length, SoulThreadFactory.create("websocket-connect", true));
// 为每一个admin都生成一个websocketClient,与之进行连接
for (String url : urls) {
try {
clients.add(new SoulWebsocketClient(new URI(url), Objects.requireNonNull(pluginDataSubscriber), metaDataSubscribers, authDataSubscribers));
} catch (URISyntaxException e) {
log.error("websocket url({}) is error", url, e);
}
}
try {
// webSocketClient启动连接
for (WebSocketClient client : clients) {
boolean success = client.connectBlocking(3000, TimeUnit.MILLISECONDS);
if (success) {
log.info("websocket connection is successful.....");
} else {
log.error("websocket connection is error.....");
}
// 与admin之间的websocket定时重连机制
// 这里是每隔30s就会去检测websocket连接是否还有效
executor.scheduleAtFixedRate(() -> {
try {
// 如果client状态为已关闭,则需要重新连接
if (client.isClosed()) {
boolean reconnectSuccess = client.reconnectBlocking();
if (reconnectSuccess) {
log.info("websocket reconnect is successful.....");
} else {
log.error("websocket reconnection is error.....");
}
}
} catch (InterruptedException e) {
log.error("websocket connect is error :{}", e.getMessage());
}
}, 10, 30, TimeUnit.SECONDS);
}
/* client.setProxy(new Proxy(Proxy.Type.HTTP, new InetSocketAddress("proxyaddress", 80)));*/
} catch (InterruptedException e) {
log.info("websocket connection...exception....", e);
}
}
- 从以上代码得知
WebsocketSyncDataService
还只是解决了websocket的连接机制,至于通过websocket发送的消息,其发送与处理还在org.dromara.soul.plugin.sync.data.weboscket.client.SoulWebsocketClient
中
SoulWebsocketClient - 从上图得知与admin数据同步的处理都在
org.dromara.soul.plugin.sync.data.weboscket.handler.WebsocketDataHandler
public WebsocketDataHandler(final PluginDataSubscriber pluginDataSubscriber,
final List<MetaDataSubscriber> metaDataSubscribers,
final List<AuthDataSubscriber> authDataSubscribers) {
/**
* 对于不同类型的数据,有对应的数据处理器
* PLUGIN-PluginDataHandler
* SELECTOR-SelectorDataHandler
* RULE-RuleDataHandler
* APP_AUTH-AuthDataHandler
* META_DATA-MetaDataHandler
*/
ENUM_MAP.put(ConfigGroupEnum.PLUGIN, new PluginDataHandler(pluginDataSubscriber));
ENUM_MAP.put(ConfigGroupEnum.SELECTOR, new SelectorDataHandler(pluginDataSubscriber));
ENUM_MAP.put(ConfigGroupEnum.RULE, new RuleDataHandler(pluginDataSubscriber));
ENUM_MAP.put(ConfigGroupEnum.APP_AUTH, new AuthDataHandler(authDataSubscribers));
ENUM_MAP.put(ConfigGroupEnum.META_DATA, new MetaDataHandler(metaDataSubscribers));
}
- 根据配置类型用相应的处理器来处理
public void executor(final ConfigGroupEnum type, final String json, final String eventType) {
// 根据配置类型用相应的处理器来处理
ENUM_MAP.get(type).handle(json, eventType);
}
-
数据处理器使用模版方法设计模式
AbstractDataHandler -
数据处理器相关类图
DataHandler-classes - 继续跟踪,发现处理逻辑又来到了
org.dromara.soul.plugin.base.cache.CommonPluginDataSubscriber
private <T> void subscribeDataHandler(final T classData, final DataEventTypeEnum dataType) {
// 当classData不为空时则执行ifPresent里边的逻辑,这种写法规避NPE
Optional.ofNullable(classData).ifPresent(data -> {
if (data instanceof PluginData) {
PluginData pluginData = (PluginData) data;
if (dataType == DataEventTypeEnum.UPDATE) {
BaseDataCache.getInstance().cachePluginData(pluginData);
Optional.ofNullable(handlerMap.get(pluginData.getName())).ifPresent(handler -> handler.handlerPlugin(pluginData));
} else if (dataType == DataEventTypeEnum.DELETE) {
BaseDataCache.getInstance().removePluginData(pluginData);
Optional.ofNullable(handlerMap.get(pluginData.getName())).ifPresent(handler -> handler.removePlugin(pluginData));
}
} else if (data instanceof SelectorData) {
SelectorData selectorData = (SelectorData) data;
if (dataType == DataEventTypeEnum.UPDATE) {
BaseDataCache.getInstance().cacheSelectData(selectorData);
Optional.ofNullable(handlerMap.get(selectorData.getPluginName())).ifPresent(handler -> handler.handlerSelector(selectorData));
} else if (dataType == DataEventTypeEnum.DELETE) {
BaseDataCache.getInstance().removeSelectData(selectorData);
Optional.ofNullable(handlerMap.get(selectorData.getPluginName())).ifPresent(handler -> handler.removeSelector(selectorData));
}
} else if (data instanceof RuleData) {
RuleData ruleData = (RuleData) data;
if (dataType == DataEventTypeEnum.UPDATE) {
BaseDataCache.getInstance().cacheRuleData(ruleData);
Optional.ofNullable(handlerMap.get(ruleData.getPluginName())).ifPresent(handler -> handler.handlerRule(ruleData));
} else if (dataType == DataEventTypeEnum.DELETE) {
BaseDataCache.getInstance().removeRuleData(ruleData);
Optional.ofNullable(handlerMap.get(ruleData.getPluginName())).ifPresent(handler -> handler.removeRule(ruleData));
}
}
});
}
-
当前已有的插件处理类图
已有的插件处理类图
网友评论