美文网首页
[Soul 源码之旅] 1.4 Soul数据同步

[Soul 源码之旅] 1.4 Soul数据同步

作者: AndyWei123 | 来源:发表于2021-01-18 21:56 被阅读0次

    1.4.1 数据同步方式

    我们知道Soul数据同步可以配置为 Websocket,Http 长轮询,Zookeeper 和 Nacos 这四种方式,我们从更新Selector的接口作为切入点,根据调用链一步步分析各种数据同步方式。

    1.4.1 通用处理链路

    在决定使用那种方式发送前都会经过一个通用的分配流程,我们从新Selector的接口出发,SelectorController 的 updateSelector 方法会被端调用进行服务更新。这里调用了 SelectorServiceImpl服务进行更新。


    image.png

    这里先执行数据库操作,将对应的Selector更新,然后将旧的rule删除,然后再逐条插入新的规则,最终调用到了eventPubliser的publishEvent方法,这里使用了SpringBoot的消息发布机制,最终发布了一个DataChangedEvent类型的消息,我们接着看消息消费者 DataChangedEventDispatcher 。


    image.png
    DataChangedEventDispatcher 实现了 ApplicationListener<DataChangedEvent> 这个接口,它重写了OnApplicationEvent方法,该方法就是接收消息的方法。同时 DataChangedEventDispatcher 也实现了InitializingBean 接口,他会在所有的Bean都准备就绪后触发 afterPropertiesSet 方法。
    image.png

    我们看它主要就是查询里面所有实现了 DataChangedListener 方法的实现类,其主要实现类有如下几个。其实就是我们接下来要说的几种数据同步方式。


    image.png
    我们回过头来看一下 OnApplicationEvent 做了什么,我们可以看到,它主要是轮询所有注册了的Listener,然后根据不同的Group Key 调用对应的方法。listener的方法。
    image.png
    我们看这里的listener是如何注册进ApplicationContext的呢。我们看它被引用的地方 DataSyncConfiguration ,这里根据我们的配置进行对应的加载,我们看一下websocket的流程:但开启soul.sync.websocket.enabled 的时候会进行bean的加载。这里包括三部分,WebsocketDataChangedListener 即我们刚才被调用的listener;WebsocketCollector 这是个WebSocket 的客户端,主要负责收发数据;ServerEndpointExporter 这个是Spring-WebSocket的一个类,他也实现了SmartInitializingSingleton 接口,他会在每个单例bean注册完成后对其进行扫描是否使用了 ServerEndpoint 注解,然后将该类注入到serverContainer中。
    image.png

    1.4.2 websocket Server 端更新流程

    由于我们是更新selector , 所以会调用listener 的 onSelectorChanged 方法,这里最终还是调用了WebsocketCollector 的send 方法。


    源码

    我们可以看出来,在Spring中使用WebSocket非常简单,我们只需要使用ServerEndpoint 这个注解定义这个是websocket处理类,OnOpen 这个注解类定义在websocket链接建立后生产对应的session然后回调这个服务,这里将session作为静态属性持有,OnMessage 接收消息处理方法,onClose 链接关闭处理方法。


    image.png
    我们可以看到Souladmin这里会持有所有的WebSocket链接。
    image.png

    在发送的时候会给所有的客户端进行发送,这里还有对自定义类型的处理。


    发送

    1.4.2 websocket Client 端

    我们再来看看接收端,我们可以看到使用websocket同步数据,BootStrap需要使用以下依赖

            <dependency>
                <groupId>org.dromara</groupId>
                <artifactId>soul-spring-boot-starter-sync-data-websocket</artifactId>
                <version>${project.version}</version>
            </dependency>
    

    我们找到该项目,这是一个标准的springboot-starter,这里定义个一个自动配置类。


    image.png

    我们来看一下配置类 WebsocketSyncDataConfiguration, 它主要是初始化了两个Bean 一个是 websocketSyncDataService 即为数据同步服务, 一个是 websocketConfig 即websocket的一些配置项。这里会将以soul.sync.websocket 开头的配置注入到这个类中,这里只有一个属性 urls。


    image.png
    websocketSyncDataService 这里使用到了 ObjectProvider 这个是SpringFactory到一个扩展点,我们知道在一个方法前Autowrite也可以实现注入,但是假如存在多个参数类是以这里到常见到话Autowrite就懵,
    • 如果注入实例为空时,使用ObjectProvider则避免了强依赖导致的依赖对象不存在异常;主要是 getIfAvailable 这个方法。
    • 如果有多个实例,ObjectProvider的方法会根据Bean实现的Ordered接口或@Order注解指定的先后顺序获取一个Bean。从而了提供了一个更加宽松的依赖注入方式。主要通过 orderedStream 方法返回一个根据order注解的一个bean 的 stream。
        @Bean
        public SyncDataService websocketSyncDataService(final ObjectProvider<WebsocketConfig> websocketConfig, final ObjectProvider<PluginDataSubscriber> pluginSubscriber,
                                               final ObjectProvider<List<MetaDataSubscriber>> metaSubscribers, final ObjectProvider<List<AuthDataSubscriber>> authSubscribers) {
            log.info("you use websocket sync soul data.......");
            return new WebsocketSyncDataService(websocketConfig.getIfAvailable(WebsocketConfig::new), pluginSubscriber.getIfAvailable(),
                    metaSubscribers.getIfAvailable(Collections::emptyList), authSubscribers.getIfAvailable(Collections::emptyList));
        }
    

    我们根据debug数据看一下主要注入的参数,PluginDataSubscriber 这个主要是各个插件的Handler 主要负责插件内的数据更新。metaDataSubscribers 元数据订阅者,authDataSubscribers 认证数据订阅。


    image.png

    这里会根据websocket的配置生成一个SoulWebsocketClient 客户端,然后尝试链接服务端。


    image.png
    这里会触发服务端的OnMessage方法,然后触发SysnAll方法,向客户端发送所有数据。我们看调用栈,这里会包含所有信息同步给客户端。
    synAll

    这里包括了插件数据,selector 数据和 rule 数据。

        @Override
        public boolean syncAll(final DataEventTypeEnum type) {
            appAuthService.syncData();
            List<PluginData> pluginDataList = pluginService.listAll();
            eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.PLUGIN, type, pluginDataList));
            List<SelectorData> selectorDataList = selectorService.listAll();
            eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.SELECTOR, type, selectorDataList));
            List<RuleData> ruleDataList = ruleService.listAll();
            eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.RULE, type, ruleDataList));
            metaDataService.syncData();
            return true;
        }
    

    我们再看看SoulWebsocketClient 做了什么,其主要是在接收到OnMessge的时候调用websocketDataHandler 进行消息处理。


    image.png

    websocketDataHandler 里面再注册了各种数据的处理器,如图 包括plugin selector rule app_auth meta 这几种数据的更新。


    image.png
    我们看handle 是一个default方法,主要是调用各个实现类的refresh方法
    handle

    我们更新selector主要是做以下操作,更新内存数据,然后通知各个插件数据更新。


    image.png

    1.4.4 总结

    在这期的学习过程中,我学到很一些Spring的新特性如 ObjectProvider,还知道了在SpringBoot中如何写一个WebSocket的客户端。

    相关文章

      网友评论

          本文标题:[Soul 源码之旅] 1.4 Soul数据同步

          本文链接:https://www.haomeiwen.com/subject/chwxzktx.html