美文网首页
soul网关学习7-dubbo协议转换2

soul网关学习7-dubbo协议转换2

作者: niuxin | 来源:发表于2021-01-21 00:06 被阅读0次

    继续dubbo协议转换未完成的流程分析。

    三、soul-bootstrap端接收配置同步的处理

    1. pom文件引入配置同步的依赖


      sync-data-dependency
    2. 我们这里只分析websocket同步配置的逻辑,相应的自动配置类为org.dromara.soul.spring.boot.starter.sync.data.websocket.WebsocketSyncDataConfiguration
    3. 从上图可以看到,websocket数据同步在bootstrap端最核心的类就为org.dromara.soul.plugin.sync.data.weboscket.WebsocketSyncDataService#WebsocketSyncDataService
        public WebsocketSyncDataService(final WebsocketConfig websocketConfig,
                                        final PluginDataSubscriber pluginDataSubscriber,
                                        final List<MetaDataSubscriber> metaDataSubscribers,
                                        final List<AuthDataSubscriber> authDataSubscribers) {
            // 获取soul-admin的地址,多个admin则以,分割
            String[] urls = StringUtils.split(websocketConfig.getUrls(), ",");
            // 采用定时调度的线程池,其核心线程数为admin的地址数,且创建的线程为守护线程
            // 该线程池是为了保证bootstrap与admin之间的websocket能一直连接。
            // 因为如果websocket只是在启动的时候建立连接,在运行过程中发生一些异常导致bootstrap与admin之间的连接断开
            // 那么是有必要去进行重连的,否则只要一断,就失去联系了,不科学
            executor = new ScheduledThreadPoolExecutor(urls.length, SoulThreadFactory.create("websocket-connect", true));
            // 为每一个admin都生成一个websocketClient,与之进行连接
            for (String url : urls) {
                try {
                    clients.add(new SoulWebsocketClient(new URI(url), Objects.requireNonNull(pluginDataSubscriber), metaDataSubscribers, authDataSubscribers));
                } catch (URISyntaxException e) {
                    log.error("websocket url({}) is error", url, e);
                }
            }
            try {
                // webSocketClient启动连接
                for (WebSocketClient client : clients) {
                    boolean success = client.connectBlocking(3000, TimeUnit.MILLISECONDS);
                    if (success) {
                        log.info("websocket connection is successful.....");
                    } else {
                        log.error("websocket connection is error.....");
                    }
                    // 与admin之间的websocket定时重连机制
                    // 这里是每隔30s就会去检测websocket连接是否还有效
                    executor.scheduleAtFixedRate(() -> {
                        try {
                            // 如果client状态为已关闭,则需要重新连接
                            if (client.isClosed()) {
                                boolean reconnectSuccess = client.reconnectBlocking();
                                if (reconnectSuccess) {
                                    log.info("websocket reconnect is successful.....");
                                } else {
                                    log.error("websocket reconnection is error.....");
                                }
                            }
                        } catch (InterruptedException e) {
                            log.error("websocket connect is error :{}", e.getMessage());
                        }
                    }, 10, 30, TimeUnit.SECONDS);
                }
                /* client.setProxy(new Proxy(Proxy.Type.HTTP, new InetSocketAddress("proxyaddress", 80)));*/
            } catch (InterruptedException e) {
                log.info("websocket connection...exception....", e);
            }
    
        }
    
    1. 从以上代码得知WebsocketSyncDataService还只是解决了websocket的连接机制,至于通过websocket发送的消息,其发送与处理还在org.dromara.soul.plugin.sync.data.weboscket.client.SoulWebsocketClient
      SoulWebsocketClient
    2. 从上图得知与admin数据同步的处理都在org.dromara.soul.plugin.sync.data.weboscket.handler.WebsocketDataHandler
        public WebsocketDataHandler(final PluginDataSubscriber pluginDataSubscriber,
                                    final List<MetaDataSubscriber> metaDataSubscribers,
                                    final List<AuthDataSubscriber> authDataSubscribers) {
            /**
             * 对于不同类型的数据,有对应的数据处理器
             * PLUGIN-PluginDataHandler
             * SELECTOR-SelectorDataHandler
             * RULE-RuleDataHandler
             * APP_AUTH-AuthDataHandler
             * META_DATA-MetaDataHandler
             */
            ENUM_MAP.put(ConfigGroupEnum.PLUGIN, new PluginDataHandler(pluginDataSubscriber));
            ENUM_MAP.put(ConfigGroupEnum.SELECTOR, new SelectorDataHandler(pluginDataSubscriber));
            ENUM_MAP.put(ConfigGroupEnum.RULE, new RuleDataHandler(pluginDataSubscriber));
            ENUM_MAP.put(ConfigGroupEnum.APP_AUTH, new AuthDataHandler(authDataSubscribers));
            ENUM_MAP.put(ConfigGroupEnum.META_DATA, new MetaDataHandler(metaDataSubscribers));
        }
    
    1. 根据配置类型用相应的处理器来处理
        public void executor(final ConfigGroupEnum type, final String json, final String eventType) {
            // 根据配置类型用相应的处理器来处理
            ENUM_MAP.get(type).handle(json, eventType);
        }
    
    1. 数据处理器使用模版方法设计模式


      AbstractDataHandler
    2. 数据处理器相关类图


      DataHandler-classes
    3. 继续跟踪,发现处理逻辑又来到了org.dromara.soul.plugin.base.cache.CommonPluginDataSubscriber
        
        private <T> void subscribeDataHandler(final T classData, final DataEventTypeEnum dataType) {
            // 当classData不为空时则执行ifPresent里边的逻辑,这种写法规避NPE
            Optional.ofNullable(classData).ifPresent(data -> {
                if (data instanceof PluginData) {
                    PluginData pluginData = (PluginData) data;
                    if (dataType == DataEventTypeEnum.UPDATE) {
                        BaseDataCache.getInstance().cachePluginData(pluginData);
                        Optional.ofNullable(handlerMap.get(pluginData.getName())).ifPresent(handler -> handler.handlerPlugin(pluginData));
                    } else if (dataType == DataEventTypeEnum.DELETE) {
                        BaseDataCache.getInstance().removePluginData(pluginData);
                        Optional.ofNullable(handlerMap.get(pluginData.getName())).ifPresent(handler -> handler.removePlugin(pluginData));
                    }
                } else if (data instanceof SelectorData) {
                    SelectorData selectorData = (SelectorData) data;
                    if (dataType == DataEventTypeEnum.UPDATE) {
                        BaseDataCache.getInstance().cacheSelectData(selectorData);
                        Optional.ofNullable(handlerMap.get(selectorData.getPluginName())).ifPresent(handler -> handler.handlerSelector(selectorData));
                    } else if (dataType == DataEventTypeEnum.DELETE) {
                        BaseDataCache.getInstance().removeSelectData(selectorData);
                        Optional.ofNullable(handlerMap.get(selectorData.getPluginName())).ifPresent(handler -> handler.removeSelector(selectorData));
                    }
                } else if (data instanceof RuleData) {
                    RuleData ruleData = (RuleData) data;
                    if (dataType == DataEventTypeEnum.UPDATE) {
                        BaseDataCache.getInstance().cacheRuleData(ruleData);
                        Optional.ofNullable(handlerMap.get(ruleData.getPluginName())).ifPresent(handler -> handler.handlerRule(ruleData));
                    } else if (dataType == DataEventTypeEnum.DELETE) {
                        BaseDataCache.getInstance().removeRuleData(ruleData);
                        Optional.ofNullable(handlerMap.get(ruleData.getPluginName())).ifPresent(handler -> handler.removeRule(ruleData));
                    }
                }
            });
        }
    
    1. 当前已有的插件处理类图


      已有的插件处理类图

    TODO

    四、soul-bootstrap接收Http请求

    五、soul-bootstrap解析Http请求并转换成后端dubbo服务调用

    相关文章

      网友评论

          本文标题:soul网关学习7-dubbo协议转换2

          本文链接:https://www.haomeiwen.com/subject/rwwkzktx.html