美文网首页
soul网关学习7-dubbo协议转换2

soul网关学习7-dubbo协议转换2

作者: niuxin | 来源:发表于2021-01-21 00:06 被阅读0次

继续dubbo协议转换未完成的流程分析。

三、soul-bootstrap端接收配置同步的处理

  1. pom文件引入配置同步的依赖


    sync-data-dependency
  2. 我们这里只分析websocket同步配置的逻辑,相应的自动配置类为org.dromara.soul.spring.boot.starter.sync.data.websocket.WebsocketSyncDataConfiguration
  3. 从上图可以看到,websocket数据同步在bootstrap端最核心的类就为org.dromara.soul.plugin.sync.data.weboscket.WebsocketSyncDataService#WebsocketSyncDataService
    public WebsocketSyncDataService(final WebsocketConfig websocketConfig,
                                    final PluginDataSubscriber pluginDataSubscriber,
                                    final List<MetaDataSubscriber> metaDataSubscribers,
                                    final List<AuthDataSubscriber> authDataSubscribers) {
        // 获取soul-admin的地址,多个admin则以,分割
        String[] urls = StringUtils.split(websocketConfig.getUrls(), ",");
        // 采用定时调度的线程池,其核心线程数为admin的地址数,且创建的线程为守护线程
        // 该线程池是为了保证bootstrap与admin之间的websocket能一直连接。
        // 因为如果websocket只是在启动的时候建立连接,在运行过程中发生一些异常导致bootstrap与admin之间的连接断开
        // 那么是有必要去进行重连的,否则只要一断,就失去联系了,不科学
        executor = new ScheduledThreadPoolExecutor(urls.length, SoulThreadFactory.create("websocket-connect", true));
        // 为每一个admin都生成一个websocketClient,与之进行连接
        for (String url : urls) {
            try {
                clients.add(new SoulWebsocketClient(new URI(url), Objects.requireNonNull(pluginDataSubscriber), metaDataSubscribers, authDataSubscribers));
            } catch (URISyntaxException e) {
                log.error("websocket url({}) is error", url, e);
            }
        }
        try {
            // webSocketClient启动连接
            for (WebSocketClient client : clients) {
                boolean success = client.connectBlocking(3000, TimeUnit.MILLISECONDS);
                if (success) {
                    log.info("websocket connection is successful.....");
                } else {
                    log.error("websocket connection is error.....");
                }
                // 与admin之间的websocket定时重连机制
                // 这里是每隔30s就会去检测websocket连接是否还有效
                executor.scheduleAtFixedRate(() -> {
                    try {
                        // 如果client状态为已关闭,则需要重新连接
                        if (client.isClosed()) {
                            boolean reconnectSuccess = client.reconnectBlocking();
                            if (reconnectSuccess) {
                                log.info("websocket reconnect is successful.....");
                            } else {
                                log.error("websocket reconnection is error.....");
                            }
                        }
                    } catch (InterruptedException e) {
                        log.error("websocket connect is error :{}", e.getMessage());
                    }
                }, 10, 30, TimeUnit.SECONDS);
            }
            /* client.setProxy(new Proxy(Proxy.Type.HTTP, new InetSocketAddress("proxyaddress", 80)));*/
        } catch (InterruptedException e) {
            log.info("websocket connection...exception....", e);
        }

    }
  1. 从以上代码得知WebsocketSyncDataService还只是解决了websocket的连接机制,至于通过websocket发送的消息,其发送与处理还在org.dromara.soul.plugin.sync.data.weboscket.client.SoulWebsocketClient
    SoulWebsocketClient
  2. 从上图得知与admin数据同步的处理都在org.dromara.soul.plugin.sync.data.weboscket.handler.WebsocketDataHandler
    public WebsocketDataHandler(final PluginDataSubscriber pluginDataSubscriber,
                                final List<MetaDataSubscriber> metaDataSubscribers,
                                final List<AuthDataSubscriber> authDataSubscribers) {
        /**
         * 对于不同类型的数据,有对应的数据处理器
         * PLUGIN-PluginDataHandler
         * SELECTOR-SelectorDataHandler
         * RULE-RuleDataHandler
         * APP_AUTH-AuthDataHandler
         * META_DATA-MetaDataHandler
         */
        ENUM_MAP.put(ConfigGroupEnum.PLUGIN, new PluginDataHandler(pluginDataSubscriber));
        ENUM_MAP.put(ConfigGroupEnum.SELECTOR, new SelectorDataHandler(pluginDataSubscriber));
        ENUM_MAP.put(ConfigGroupEnum.RULE, new RuleDataHandler(pluginDataSubscriber));
        ENUM_MAP.put(ConfigGroupEnum.APP_AUTH, new AuthDataHandler(authDataSubscribers));
        ENUM_MAP.put(ConfigGroupEnum.META_DATA, new MetaDataHandler(metaDataSubscribers));
    }
  1. 根据配置类型用相应的处理器来处理
    public void executor(final ConfigGroupEnum type, final String json, final String eventType) {
        // 根据配置类型用相应的处理器来处理
        ENUM_MAP.get(type).handle(json, eventType);
    }
  1. 数据处理器使用模版方法设计模式


    AbstractDataHandler
  2. 数据处理器相关类图


    DataHandler-classes
  3. 继续跟踪,发现处理逻辑又来到了org.dromara.soul.plugin.base.cache.CommonPluginDataSubscriber
    
    private <T> void subscribeDataHandler(final T classData, final DataEventTypeEnum dataType) {
        // 当classData不为空时则执行ifPresent里边的逻辑,这种写法规避NPE
        Optional.ofNullable(classData).ifPresent(data -> {
            if (data instanceof PluginData) {
                PluginData pluginData = (PluginData) data;
                if (dataType == DataEventTypeEnum.UPDATE) {
                    BaseDataCache.getInstance().cachePluginData(pluginData);
                    Optional.ofNullable(handlerMap.get(pluginData.getName())).ifPresent(handler -> handler.handlerPlugin(pluginData));
                } else if (dataType == DataEventTypeEnum.DELETE) {
                    BaseDataCache.getInstance().removePluginData(pluginData);
                    Optional.ofNullable(handlerMap.get(pluginData.getName())).ifPresent(handler -> handler.removePlugin(pluginData));
                }
            } else if (data instanceof SelectorData) {
                SelectorData selectorData = (SelectorData) data;
                if (dataType == DataEventTypeEnum.UPDATE) {
                    BaseDataCache.getInstance().cacheSelectData(selectorData);
                    Optional.ofNullable(handlerMap.get(selectorData.getPluginName())).ifPresent(handler -> handler.handlerSelector(selectorData));
                } else if (dataType == DataEventTypeEnum.DELETE) {
                    BaseDataCache.getInstance().removeSelectData(selectorData);
                    Optional.ofNullable(handlerMap.get(selectorData.getPluginName())).ifPresent(handler -> handler.removeSelector(selectorData));
                }
            } else if (data instanceof RuleData) {
                RuleData ruleData = (RuleData) data;
                if (dataType == DataEventTypeEnum.UPDATE) {
                    BaseDataCache.getInstance().cacheRuleData(ruleData);
                    Optional.ofNullable(handlerMap.get(ruleData.getPluginName())).ifPresent(handler -> handler.handlerRule(ruleData));
                } else if (dataType == DataEventTypeEnum.DELETE) {
                    BaseDataCache.getInstance().removeRuleData(ruleData);
                    Optional.ofNullable(handlerMap.get(ruleData.getPluginName())).ifPresent(handler -> handler.removeRule(ruleData));
                }
            }
        });
    }
  1. 当前已有的插件处理类图


    已有的插件处理类图

TODO

四、soul-bootstrap接收Http请求

五、soul-bootstrap解析Http请求并转换成后端dubbo服务调用

相关文章

  • soul网关学习7-dubbo协议转换2

    继续dubbo协议转换未完成的流程分析。 三、soul-bootstrap端接收配置同步的处理 pom文件引入配置...

  • soul网关学习6-dubbo协议转换1

    我们知道协议转换也是API网关常见的一个功能,这次我们看下soul网关是如何实现协议转换的。 请求流图 大致流程:...

  • soul网关学习8-dubbo协议转换3

    继续上篇分析,这里只分析dubbo插件是如何处理配置数据的。 dubbo插件配置数据处理器 回到源码org.dro...

  • soul网关学习9-dubbo协议转换4

    继续上篇 从之前的3篇文章中,我们可以知道,我们的dubbo服务启动,会将服务注册到网关soul-admin,并生...

  • 网关的作用

    网关定义 1.网关又称网间连接器,协议转换器2.网关在网络层以上实现网络互联,是最复杂的网络连接设备3.网关既可以...

  • Soul网关限流插件Sentinel和Resilience4J扫

    Soul网关限流插件Sentinel和Resilience4J扫盲 Soul网关限流插件Sentinel扫盲 首先...

  • Soul网关使用感受

    阶段性 Soul网关使用感受 通过一段时间的Soul网关的接触,总结了一些我认为Soul网关存在的优势 使用上 使...

  • 2020-02-14 网关

    网关的定义 网关(Gateway)又称网间连接器、协议转换器。顾名思义,网关(Gateway)就是一个网络连接到另...

  • 网关调研

    1. 什么是网关? 网关(Gateway)又称网间连接器、协议转换器。网关在传输层上以实现网络互连,是最复杂的网络...

  • Soul网关的探活--基于zookeeper同步数据的解析

    Soul网关的探活--基于zookeeper同步数据的解析 Soul网关的探活,主要分为两部分,第一部分是soul...

网友评论

      本文标题:soul网关学习7-dubbo协议转换2

      本文链接:https://www.haomeiwen.com/subject/rwwkzktx.html