首先,WebSocket同步使用了WebsocketSyncDataConfiguration的配置
/**
 * Registers the websocket-based {@link SyncDataService}.
 *
 * <p>The websocket configuration falls back to a freshly constructed {@code WebsocketConfig}
 * when the provider has none, and the metadata / auth subscriber lists fall back to empty
 * lists. The plugin subscriber is passed on exactly as the provider returns it.
 *
 * @param websocketConfig  provider of the websocket configuration
 * @param pluginSubscriber provider of the plugin data subscriber
 * @param metaSubscribers  provider of the metadata subscribers
 * @param authSubscribers  provider of the auth data subscribers
 * @return the websocket sync data service
 */
@Bean
public SyncDataService websocketSyncDataService(final ObjectProvider<WebsocketConfig> websocketConfig, final ObjectProvider<PluginDataSubscriber> pluginSubscriber,
        final ObjectProvider<List<MetaDataSubscriber>> metaSubscribers, final ObjectProvider<List<AuthDataSubscriber>> authSubscribers) {
    log.info("you use websocket sync soul data.......");
    // Resolve each dependency in declaration order before building the service.
    final WebsocketConfig resolvedConfig = websocketConfig.getIfAvailable(WebsocketConfig::new);
    final PluginDataSubscriber resolvedPluginSubscriber = pluginSubscriber.getIfAvailable();
    final List<MetaDataSubscriber> resolvedMetaSubscribers = metaSubscribers.getIfAvailable(Collections::emptyList);
    final List<AuthDataSubscriber> resolvedAuthSubscribers = authSubscribers.getIfAvailable(Collections::emptyList);
    return new WebsocketSyncDataService(resolvedConfig, resolvedPluginSubscriber, resolvedMetaSubscribers, resolvedAuthSubscribers);
}
WebsocketSyncDataService 是通过 WebSocket 同步配置信息的服务,它依赖的对象在上面的配置类中通过 ObjectProvider 获取。构造时会先尝试连接每个地址;之后由定时任务周期性检查,如果连接已关闭则进行重连,以便继续同步配置信息。
image.png
SyncDataService:所有数据同步方式都需要实现的接口
AutoCloseable:通过实现 close 方法来执行关闭操作
public WebsocketSyncDataService(final WebsocketConfig websocketConfig,
final PluginDataSubscriber pluginDataSubscriber,
final List<MetaDataSubscriber> metaDataSubscribers,
final List<AuthDataSubscriber> authDataSubscribers) {
// 获取注册的url列表
String[] urls = StringUtils.split(websocketConfig.getUrls(), ",");
// 线程池
executor = new ScheduledThreadPoolExecutor(urls.length, SoulThreadFactory.create("websocket-connect", true));
// 新增插件订阅Websocket订阅者信息
for (String url : urls) {
try {
clients.add(new SoulWebsocketClient(new URI(url), Objects.requireNonNull(pluginDataSubscriber), metaDataSubscribers, authDataSubscribers));
} catch (URISyntaxException e) {
log.error("websocket url({}) is error", url, e);
}
}
try {
for (WebSocketClient client : clients) {
// Websocket连接,30s
boolean success = client.connectBlocking(3000, TimeUnit.MILLISECONDS);
if (success) {
log.info("websocket connection is successful.....");
} else {
log.error("websocket connection is error.....");
}
// 定时连接同步配置信息
executor.scheduleAtFixedRate(() -> {
try {
if (client.isClosed()) {
boolean reconnectSuccess = client.reconnectBlocking();
if (reconnectSuccess) {
log.info("websocket reconnect is successful.....");
} else {
log.error("websocket reconnection is error.....");
}
}
} catch (InterruptedException e) {
log.error("websocket connect is error :{}", e.getMessage());
}
}, 10, 30, TimeUnit.SECONDS);
}
/* client.setProxy(new Proxy(Proxy.Type.HTTP, new InetSocketAddress("proxyaddress", 80)));*/
} catch (InterruptedException e) {
log.info("websocket connection...exception....", e);
}
}
WebSocketPlugin 是在 DividePluginConfiguration 中作为 Bean 注册(加载)的:
/**
 * Registers the {@link WebSocketPlugin} bean.
 *
 * @param webSocketClient  the websocket client passed to the plugin
 * @param webSocketService the websocket service passed to the plugin
 * @return the websocket plugin
 */
@Bean
public WebSocketPlugin webSocketPlugin(final WebSocketClient webSocketClient, final WebSocketService webSocketService) {
    final WebSocketPlugin plugin = new WebSocketPlugin(webSocketClient, webSocketService);
    return plugin;
}
/**
 * Forwards a websocket request to one upstream selected for the matched selector.
 *
 * <p>Looks up the upstream list for the selector and the {@code SoulContext} stored on the
 * exchange, picks one upstream using the rule's load-balance setting and the caller's IP,
 * builds the target URI and delegates to {@code webSocketService.handleRequest}.
 *
 * @param exchange the current server exchange
 * @param chain    the plugin chain
 * @param selector the matched selector
 * @param rule     the matched rule
 * @return the result of the chain, of the error response, or of the websocket handling
 */
@Override
protected Mono<Void> doExecute(final ServerWebExchange exchange, final SoulPluginChain chain, final SelectorData selector, final RuleData rule) {
    // Upstreams registered for this selector.
    final List<DivideUpstream> candidates = UpstreamCacheManager.getInstance().findUpstreamListBySelectorId(selector.getId());
    // Request context stored on the exchange.
    final SoulContext context = exchange.getAttribute(Constants.CONTEXT);
    // With no upstreams or no context, log the rule and let the chain continue.
    if (CollectionUtils.isEmpty(candidates) || context == null) {
        log.error("divide upstream configuration error:{}", rule.toString());
        return chain.execute(exchange);
    }
    // Rule handle parsed from the rule's JSON handle string.
    final DivideRuleHandle handle = GsonUtils.getInstance().fromJson(rule.getHandle(), DivideRuleHandle.class);
    // Remote address of the caller, used as input to the load balancer.
    final String remoteIp = Objects.requireNonNull(exchange.getRequest().getRemoteAddress()).getAddress().getHostAddress();
    // Pick one upstream according to the configured load-balance setting.
    final DivideUpstream chosen = LoadBalanceUtils.selector(candidates, handle.getLoadBalance(), remoteIp);
    if (chosen == null) {
        log.error("websocket has no upstream");
        // No upstream could be selected: answer with the CANNOT_FIND_URL result.
        final Object failure = SoulResultWrap.error(SoulResultEnum.CANNOT_FIND_URL.getCode(), SoulResultEnum.CANNOT_FIND_URL.getMsg(), null);
        return WebFluxResultUtils.result(exchange, failure);
    }
    // Build the websocket URI of the chosen upstream.
    final String realPath = buildWsRealPath(chosen, context);
    final URI target = UriComponentsBuilder.fromUri(URI.create(realPath)).build().toUri();
    log.info("you websocket urlPath is :{}", target.toASCIIString());
    // Headers of the incoming request, filtered and used to derive the sub-protocols.
    final HttpHeaders requestHeaders = exchange.getRequest().getHeaders();
    // Hand the exchange over to the websocket service with a handler bound to the target URI.
    final SoulWebSocketHandler handler = new SoulWebSocketHandler(
            target, this.webSocketClient, filterHeaders(requestHeaders), buildWsProtocols(requestHeaders));
    return this.webSocketService.handleRequest(exchange, handler);
}
// Register a websocket client for this URL, passing in the plugin, metadata and auth subscribers;
// Objects.requireNonNull rejects a null pluginDataSubscriber.
clients.add(new SoulWebsocketClient(new URI(url), Objects.requireNonNull(pluginDataSubscriber), metaDataSubscribers, authDataSubscribers));
SoulWebsocketClient 接收到订阅的信息后,在 handleResult 方法中进行处理:
/**
 * Handles one message received over the websocket.
 *
 * <p>Parses the text into a {@code WebsocketData}, resolves its config group and event
 * type, serializes the data payload back to JSON and passes all three to
 * {@code websocketDataHandler.executor}.
 *
 * @param result the raw JSON message
 */
@SuppressWarnings("ALL")
private void handleResult(final String result) {
    // Deserialize the incoming message.
    final WebsocketData message = GsonUtils.getInstance().fromJson(result, WebsocketData.class);
    // Config group the message belongs to.
    final ConfigGroupEnum group = ConfigGroupEnum.acquireByName(message.getGroupType());
    // Event type carried by the message, kept as a plain string.
    final String event = message.getEventType();
    // Re-serialize the payload so the handler receives it as JSON text.
    final String payloadJson = GsonUtils.getInstance().toJson(message.getData());
    // Dispatch to the handler registered for the group.
    websocketDataHandler.executor(group, payloadJson, event);
}
WebsocketDataHandler
/**
 * Creates the handler and registers one data handler per config group in ENUM_MAP.
 *
 * @param pluginDataSubscriber subscriber used by the plugin, selector and rule handlers
 * @param metaDataSubscribers subscribers used by the metadata handler
 * @param authDataSubscribers subscribers used by the auth handler
 */
public WebsocketDataHandler(final PluginDataSubscriber pluginDataSubscriber,
final List<MetaDataSubscriber> metaDataSubscribers,
final List<AuthDataSubscriber> authDataSubscribers) {
// Map each config group to its handler.
// NOTE(review): ENUM_MAP's declaration is not visible here; if it is static, every new instance overwrites these entries - confirm.
ENUM_MAP.put(ConfigGroupEnum.PLUGIN, new PluginDataHandler(pluginDataSubscriber));
ENUM_MAP.put(ConfigGroupEnum.SELECTOR, new SelectorDataHandler(pluginDataSubscriber));
ENUM_MAP.put(ConfigGroupEnum.RULE, new RuleDataHandler(pluginDataSubscriber));
ENUM_MAP.put(ConfigGroupEnum.APP_AUTH, new AuthDataHandler(authDataSubscribers));
ENUM_MAP.put(ConfigGroupEnum.META_DATA, new MetaDataHandler(metaDataSubscribers));
}
/**
 * Dispatches a message to the handler registered for its config group.
 *
 * @param type the config group used to look up the handler in ENUM_MAP
 * @param json the data payload as JSON text
 * @param eventType the event type passed through to the handler
 */
public void executor(final ConfigGroupEnum type, final String json, final String eventType) {
// Look up the handler for this group and let it process the payload.
// NOTE(review): a group with no registered handler would make get(type) return null here - confirm all groups are covered.
ENUM_MAP.get(type).handle(json, eventType);
}
网友评论