美文网首页
Soul源码阅读 数据同步之Zookeeper同步【第八天】

Soul源码阅读 数据同步之Zookeeper同步【第八天】

作者: cutieagain | 来源:发表于2021-01-23 01:06 被阅读0次

    ZookeeperSyncDataConfiguration

        @Bean
        // Zookeeper同步的bean初始化
        public SyncDataService syncDataService(final ObjectProvider<ZkClient> zkClient, final ObjectProvider<PluginDataSubscriber> pluginSubscriber,
                                               final ObjectProvider<List<MetaDataSubscriber>> metaSubscribers, final ObjectProvider<List<AuthDataSubscriber>> authSubscribers) {
            log.info("you use zookeeper sync soul data.......");
            return new ZookeeperSyncDataService(zkClient.getIfAvailable(), pluginSubscriber.getIfAvailable(),
                    metaSubscribers.getIfAvailable(Collections::emptyList), authSubscribers.getIfAvailable(Collections::emptyList));
        }
    

    ZookeeperSyncDataService,订阅变更数据

        // 监听所有的数据:插件,选择器,规则
        private void watcherData() {
            // 插件顶层目录
            final String pluginParent = ZkPathConstants.PLUGIN_PARENT;
            // 获取当前目录下的子列表【都是插件】
            List<String> pluginZKs = zkClientGetChildren(pluginParent);
            for (String pluginName : pluginZKs) {
                // 遍历监听所有的插件变动
                watcherAll(pluginName);
            }
            zkClient.subscribeChildChanges(pluginParent, (parentPath, currentChildren) -> {
                if (CollectionUtils.isNotEmpty(currentChildren)) {
                    for (String pluginName : currentChildren) {
                        // 监听子插件变动
                        watcherAll(pluginName);
                    }
                }
            });
        }
    

    中途还发现一个监听错误

        // 监听元数据
        private void watchMetaData() {
            final String metaDataPath = ZkPathConstants.META_DATA;
            List<String> childrenList = zkClientGetChildren(metaDataPath);
            if (CollectionUtils.isNotEmpty(childrenList)) {
                childrenList.forEach(children -> {
                    String realPath = buildRealPath(metaDataPath, children);
                    cacheMetaData(zkClient.readData(realPath));
                    subscribeMetaDataChanges(realPath);
                });
            }
            // 监听类型错误 cutie 20200123
            subscribeChildChanges(ConfigGroupEnum.APP_AUTH, metaDataPath, childrenList);
        }
    

    admin模块中的需要发布变动的地方都使用了ApplicationEventPublisher发布消息
    DataChangedEventDispatcher实现了ApplicationListener来接收变动信息

    @Component
    public class DataChangedEventDispatcher implements ApplicationListener<DataChangedEvent>, InitializingBean {
    
        private ApplicationContext applicationContext;
    
        private List<DataChangedListener> listeners;
    
        public DataChangedEventDispatcher(final ApplicationContext applicationContext) {
            this.applicationContext = applicationContext;
        }
    
        @Override
        @SuppressWarnings("unchecked")
        public void onApplicationEvent(final DataChangedEvent event) {
            for (DataChangedListener listener : listeners) {
                switch (event.getGroupKey()) {
                    case APP_AUTH:
                        listener.onAppAuthChanged((List<AppAuthData>) event.getSource(), event.getEventType());
                        break;
                    case PLUGIN:
                        listener.onPluginChanged((List<PluginData>) event.getSource(), event.getEventType());
                        break;
                    case RULE:
                        listener.onRuleChanged((List<RuleData>) event.getSource(), event.getEventType());
                        break;
                    case SELECTOR:
                        listener.onSelectorChanged((List<SelectorData>) event.getSource(), event.getEventType());
                        break;
                    case META_DATA:
                        listener.onMetaDataChanged((List<MetaData>) event.getSource(), event.getEventType());
                        break;
                    default:
                        throw new IllegalStateException("Unexpected value: " + event.getGroupKey());
                }
            }
        }
    
        @Override
        public void afterPropertiesSet() {
            Collection<DataChangedListener> listenerBeans = applicationContext.getBeansOfType(DataChangedListener.class).values();
            this.listeners = Collections.unmodifiableList(new ArrayList<>(listenerBeans));
        }
    
    }
    

    里面的监听器针对不同的groupKey进行对应的监听操作。所有的数据监听都实现了DataChangedListener接口,包括ZooKeeper使用的ZookeeperDataChangedListener
    Zookeeper节点的变动都在这里进行操作,其他zk客户端监听zk的变动之后就会进行数据同步,以插件变动为例:

    @Override
        public void onPluginChanged(final List<PluginData> changed, final DataEventTypeEnum eventType) {
            for (PluginData data : changed) {
                final String pluginPath = ZkPathConstants.buildPluginPath(data.getName());
                // delete
                if (eventType == DataEventTypeEnum.DELETE) {
                    deleteZkPathRecursive(pluginPath);
                    final String selectorParentPath = ZkPathConstants.buildSelectorParentPath(data.getName());
                    deleteZkPathRecursive(selectorParentPath);
                    final String ruleParentPath = ZkPathConstants.buildRuleParentPath(data.getName());
                    deleteZkPathRecursive(ruleParentPath);
                    continue;
                }
                //create or update
                upsertZkNode(pluginPath, data);
            }
        }
    

    相关文章

      网友评论

          本文标题:Soul源码阅读 数据同步之Zookeeper同步【第八天】

          本文链接:https://www.haomeiwen.com/subject/yjhyzktx.html