美文网首页
Soul源码阅读 数据同步之WebSocket同步【第七天】

Soul源码阅读 数据同步之WebSocket同步【第七天】

作者: cutieagain | 来源:发表于2021-01-22 01:22 被阅读0次

首先,WebSocket同步使用了WebsocketSyncDataConfiguration的配置

    /**
     * Creates the websocket-based {@link SyncDataService} bean.
     *
     * <p>Each collaborator is resolved lazily through its {@code ObjectProvider}:
     * the websocket config falls back to a freshly constructed {@code WebsocketConfig},
     * the meta-data and auth subscriber lists fall back to an empty list, and the
     * plugin subscriber is passed through as returned by {@code getIfAvailable()}.
     *
     * @param websocketConfig  provider of the websocket configuration
     * @param pluginSubscriber provider of the plugin data subscriber
     * @param metaSubscribers  provider of the meta-data subscribers
     * @param authSubscribers  provider of the auth data subscribers
     * @return a new {@code WebsocketSyncDataService} built from the resolved collaborators
     */
    @Bean
    public SyncDataService websocketSyncDataService(final ObjectProvider<WebsocketConfig> websocketConfig, final ObjectProvider<PluginDataSubscriber> pluginSubscriber,
                                           final ObjectProvider<List<MetaDataSubscriber>> metaSubscribers, final ObjectProvider<List<AuthDataSubscriber>> authSubscribers) {
        log.info("you use websocket sync soul data.......");
        // Resolve every dependency up front, in the same order the constructor takes them.
        final WebsocketConfig resolvedConfig = websocketConfig.getIfAvailable(WebsocketConfig::new);
        final PluginDataSubscriber resolvedPlugin = pluginSubscriber.getIfAvailable();
        final List<MetaDataSubscriber> resolvedMetas = metaSubscribers.getIfAvailable(Collections::emptyList);
        final List<AuthDataSubscriber> resolvedAuths = authSubscribers.getIfAvailable(Collections::emptyList);
        return new WebsocketSyncDataService(resolvedConfig, resolvedPlugin, resolvedMetas, resolvedAuths);
    }

WebsocketSyncDataService 是通过 WebSocket 同步配置信息的服务,其依赖的对象通过 ObjectProvider 获取。构造时会对每个配置的地址发起一次连接(最多等待3秒);之后由定时任务(首次延迟10秒,此后每30秒执行一次)检查连接,若连接已关闭则自动重连,以便继续同步信息。


image.png

SyncDataService:所有数据同步方式都需要实现的接口
AutoCloseable:实现close方法,用于执行关闭操作

public WebsocketSyncDataService(final WebsocketConfig websocketConfig,
                                    final PluginDataSubscriber pluginDataSubscriber,
                                    final List<MetaDataSubscriber> metaDataSubscribers,
                                    final List<AuthDataSubscriber> authDataSubscribers) {
        // 获取注册的url列表
        String[] urls = StringUtils.split(websocketConfig.getUrls(), ",");
        // 线程池
        executor = new ScheduledThreadPoolExecutor(urls.length, SoulThreadFactory.create("websocket-connect", true));
        // 新增插件订阅Websocket订阅者信息
        for (String url : urls) {
            try {
                clients.add(new SoulWebsocketClient(new URI(url), Objects.requireNonNull(pluginDataSubscriber), metaDataSubscribers, authDataSubscribers));
            } catch (URISyntaxException e) {
                log.error("websocket url({}) is error", url, e);
            }
        }
        try {
            for (WebSocketClient client : clients) {
                // Websocket连接,30s
                boolean success = client.connectBlocking(3000, TimeUnit.MILLISECONDS);
                if (success) {
                    log.info("websocket connection is successful.....");
                } else {
                    log.error("websocket connection is error.....");
                }
                // 定时连接同步配置信息
                executor.scheduleAtFixedRate(() -> {
                    try {
                        if (client.isClosed()) {
                            boolean reconnectSuccess = client.reconnectBlocking();
                            if (reconnectSuccess) {
                                log.info("websocket reconnect is successful.....");
                            } else {
                                log.error("websocket reconnection is error.....");
                            }
                        }
                    } catch (InterruptedException e) {
                        log.error("websocket connect is error :{}", e.getMessage());
                    }
                }, 10, 30, TimeUnit.SECONDS);
            }
            /* client.setProxy(new Proxy(Proxy.Type.HTTP, new InetSocketAddress("proxyaddress", 80)));*/
        } catch (InterruptedException e) {
            log.info("websocket connection...exception....", e);
        }

    }

WebSocketPlugin的加载时机是在DividePluginConfiguration

    /**
     * Registers the {@link WebSocketPlugin} bean.
     *
     * @param webSocketClient  websocket client handed to the plugin
     * @param webSocketService websocket service handed to the plugin
     * @return a new plugin instance wrapping the two collaborators
     */
    @Bean
    public WebSocketPlugin webSocketPlugin(final WebSocketClient webSocketClient, final WebSocketService webSocketService) {
        final WebSocketPlugin plugin = new WebSocketPlugin(webSocketClient, webSocketService);
        return plugin;
    }
    /**
     * Forwards a websocket request to one upstream selected for the matched selector.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>look up the upstream list for the selector id and the {@code SoulContext}
     *       stored on the exchange; if either is missing, log and continue the chain;</li>
     *   <li>parse the rule handle and pick one upstream via {@code LoadBalanceUtils.selector},
     *       keyed by the remote host address; if none is picked, answer with the
     *       {@code CANNOT_FIND_URL} error result;</li>
     *   <li>build the target websocket URI and delegate to {@code webSocketService.handleRequest}.</li>
     * </ol>
     *
     * @param exchange the current server exchange
     * @param chain    the plugin chain to continue with when this plugin cannot handle the request
     * @param selector the matched selector
     * @param rule     the matched rule; its handle is parsed as a {@code DivideRuleHandle}
     * @return the completion signal of whichever path was taken
     */
    @Override
    protected Mono<Void> doExecute(final ServerWebExchange exchange, final SoulPluginChain chain, final SelectorData selector, final RuleData rule) {
        // Upstreams currently cached for this selector.
        final List<DivideUpstream> candidates = UpstreamCacheManager.getInstance().findUpstreamListBySelectorId(selector.getId());
        // Request context stored on the exchange under Constants.CONTEXT.
        final SoulContext context = exchange.getAttribute(Constants.CONTEXT);
        // Without upstreams or a context there is nothing to route to: log and pass the request on.
        if (CollectionUtils.isEmpty(candidates) || context == null) {
            log.error("divide upstream configuration error:{}", rule.toString());
            return chain.execute(exchange);
        }

        // The rule handle is JSON describing, among other things, the load-balance strategy.
        final DivideRuleHandle handle = GsonUtils.getInstance().fromJson(rule.getHandle(), DivideRuleHandle.class);
        // Remote host address, passed to the load balancer.
        final String clientIp = Objects.requireNonNull(exchange.getRequest().getRemoteAddress()).getAddress().getHostAddress();
        final DivideUpstream chosen = LoadBalanceUtils.selector(candidates, handle.getLoadBalance(), clientIp);
        if (chosen == null) {
            log.error("websocket has no upstream");
            // No upstream could be selected: reply with the CANNOT_FIND_URL error payload.
            final Object failure = SoulResultWrap.error(SoulResultEnum.CANNOT_FIND_URL.getCode(), SoulResultEnum.CANNOT_FIND_URL.getMsg(), null);
            return WebFluxResultUtils.result(exchange, failure);
        }

        // Target websocket URI built from the chosen upstream and the request context.
        final String realPath = buildWsRealPath(chosen, context);
        final URI target = UriComponentsBuilder.fromUri(URI.create(realPath)).build().toUri();
        log.info("you websocket urlPath is :{}", target.toASCIIString());

        // Incoming headers feed both the filtered header set and the sub-protocol list.
        final HttpHeaders incoming = exchange.getRequest().getHeaders();
        final SoulWebSocketHandler handler = new SoulWebSocketHandler(
                target, this.webSocketClient, filterHeaders(incoming), buildWsProtocols(incoming));
        return this.webSocketService.handleRequest(exchange, handler);
    }
 // Adds a new SoulWebsocketClient for this url to the client list, handing it the plugin,
 // meta-data and auth subscribers; a null pluginDataSubscriber fails fast via Objects.requireNonNull.
clients.add(new SoulWebsocketClient(new URI(url), Objects.requireNonNull(pluginDataSubscriber), metaDataSubscribers, authDataSubscribers));

SoulWebsocketClient中接收到订阅的信息进行处理

/**
 * Handles one message received over the websocket.
 *
 * <p>The raw text is parsed into a {@code WebsocketData}; its group type, event type
 * and data (re-serialized to JSON) are then passed to {@code websocketDataHandler.executor}.
 *
 * @param result the raw JSON text of the received message
 */
@SuppressWarnings("ALL")
    private void handleResult(final String result) {
        // Parse the raw message.
        final WebsocketData message = GsonUtils.getInstance().fromJson(result, WebsocketData.class);
        // Which configuration group the message belongs to.
        final ConfigGroupEnum group = ConfigGroupEnum.acquireByName(message.getGroupType());
        // Event type carried by the message, forwarded to the handler as-is.
        final String event = message.getEventType();
        // The data part is serialized back to JSON so the handler can parse it itself.
        final String payload = GsonUtils.getInstance().toJson(message.getData());
        websocketDataHandler.executor(group, payload, event);
    }

WebsocketDataHandler

    public WebsocketDataHandler(final PluginDataSubscriber pluginDataSubscriber,
                                final List<MetaDataSubscriber> metaDataSubscribers,
                                final List<AuthDataSubscriber> authDataSubscribers) {
        // 存放配置类型
        ENUM_MAP.put(ConfigGroupEnum.PLUGIN, new PluginDataHandler(pluginDataSubscriber));
        ENUM_MAP.put(ConfigGroupEnum.SELECTOR, new SelectorDataHandler(pluginDataSubscriber));
        ENUM_MAP.put(ConfigGroupEnum.RULE, new RuleDataHandler(pluginDataSubscriber));
        ENUM_MAP.put(ConfigGroupEnum.APP_AUTH, new AuthDataHandler(authDataSubscribers));
        ENUM_MAP.put(ConfigGroupEnum.META_DATA, new MetaDataHandler(metaDataSubscribers));
    }

    /**
     * Dispatches a received payload to the handler registered for its configuration group.
     *
     * <p>NOTE(review): no null check is performed on the lookup, so a {@code type}
     * with no entry in {@code ENUM_MAP} would raise a {@code NullPointerException} here.
     * Confirm that callers only pass registered groups.
     *
     * @param type      the configuration group used as the lookup key in {@code ENUM_MAP}
     * @param json      the payload, as JSON text, passed to the handler
     * @param eventType the event type passed to the handler
     */
    public void executor(final ConfigGroupEnum type, final String json, final String eventType) {
        // Look up the handler registered for this group and let it process the payload.
        ENUM_MAP.get(type).handle(json, eventType);
    }

相关文章

网友评论

      本文标题:Soul源码阅读 数据同步之WebSocket同步【第七天】

      本文链接:https://www.haomeiwen.com/subject/stgjzktx.html