美文网首页
dubbo源码2-注册中心

dubbo源码2-注册中心

作者: modou1618 | 来源:发表于2019-01-01 20:35 被阅读0次

    一 注册中心

    dubbo注册中心主要是保存和管理服务的provider信息和consumer信息。
    dubbo提供的注册中心有zookeeper,multicast,redis等。
    

    下面主要介绍zookeeper注册中心,类继承关系如下。


    image.png

    1.1 AbstractRegistry

    1.1.1 注册,订阅数据缓存

    • 在register(),unregister()方法调用时,缓存相应数据到ConcurrentHashSet<URL>中
    • 在subscribe(),unsubscribe()方法调用时,缓存相应数据到ConcurrentMap<URL, Set<NotifyListener>>中
    • 缓存数据用于连接抖动断开重连时恢复zk上的相关配置。

    1.1.2 订阅结果处理

    • 通知notify()
      变更的url和订阅的url只有isMatch()方法匹配才会调用回调listener函数。主要是group和version。其中consumer端的配置支持*为匹配任意的provider的配置。
    public static boolean isMatch(URL consumerUrl, URL providerUrl) {
            String consumerInterface = consumerUrl.getServiceInterface();
            String providerInterface = providerUrl.getServiceInterface();
            if (!(Constants.ANY_VALUE.equals(consumerInterface) || StringUtils.isEquals(consumerInterface, providerInterface)))
                return false;
    
            if (!isMatchCategory(providerUrl.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY),
                    consumerUrl.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY))) {
                return false;
            }
            if (!providerUrl.getParameter(Constants.ENABLED_KEY, true)
                    && !Constants.ANY_VALUE.equals(consumerUrl.getParameter(Constants.ENABLED_KEY))) {
                return false;
            }
    
            String consumerGroup = consumerUrl.getParameter(Constants.GROUP_KEY);
            String consumerVersion = consumerUrl.getParameter(Constants.VERSION_KEY);
            String consumerClassifier = consumerUrl.getParameter(Constants.CLASSIFIER_KEY, Constants.ANY_VALUE);
    
            String providerGroup = providerUrl.getParameter(Constants.GROUP_KEY);
            String providerVersion = providerUrl.getParameter(Constants.VERSION_KEY);
            String providerClassifier = providerUrl.getParameter(Constants.CLASSIFIER_KEY, Constants.ANY_VALUE);
            return (Constants.ANY_VALUE.equals(consumerGroup) || StringUtils.isEquals(consumerGroup, providerGroup) || StringUtils.isContains(consumerGroup, providerGroup))
                    && (Constants.ANY_VALUE.equals(consumerVersion) || StringUtils.isEquals(consumerVersion, providerVersion))
                    && (consumerClassifier == null || Constants.ANY_VALUE.equals(consumerClassifier) || StringUtils.isEquals(consumerClassifier, providerClassifier));
        }
    
    • 内存缓存
      变更的url信息,按照category属性分类。
      category分为:providers(服务提供者),consumers(服务使用者),routers(服务路由),configurators(动态配置)。
      缓存到ConcurrentMap<URL, Map<String/分类/, List<URL>>>中
    • 文件缓存,
      key为save.file的值表示是否同步执行本地文件缓存
      key为file的值表示本地文件路径,为空则不做本地文件缓存
      通过FileLock锁避免对缓存文件的并发修改
    File lockfile = new File(file.getAbsolutePath() + ".lock");
                if (!lockfile.exists()) {
                    lockfile.createNewFile();
                }
                RandomAccessFile raf = new RandomAccessFile(lockfile, "rw");
                try {
                    FileChannel channel = raf.getChannel();
                    try {
                        FileLock lock = channel.tryLock();
                        if (lock == null) {
                            throw new IOException("Can not lock the registry cache file " + file.getAbsolutePath() + ", ignore and retry later, maybe multi java process use the file, please config: dubbo.registry.file=xxx.properties");
                        }
                        // Save
                        try {
                            if (!file.exists()) {
                                file.createNewFile();
                            }
                            FileOutputStream outputFile = new FileOutputStream(file);
                            try {
                                properties.store(outputFile, "Dubbo Registry Cache");
                            } finally {
                                outputFile.close();
                            }
                        } finally {
                            lock.release();
                        }
                    } finally {
                        channel.close();
                    }
                } finally {
                    raf.close();
                }
    
    • 查询
      可通过lookup()方法查询目标节点的变更数据记录

    1.2 FailbackRegistry

    在注册,注销,订阅,解订阅的调用接口中,缓存失败的相关信息。
    异步线程5s间隔周期重试缓存的失败操作
    

    1.3 ZookeeperRegistry

    • 屏蔽不同zkclient的库,提供统一的接口
    • 连接zookeeper,并监听连接状态,调用AbstractRegistry.recover()在重连成功后恢复zookeeper上的注册和订阅等信息。
    • 订阅 doSubscribe()


      image.png

    二 注册流程

    在<dubbo:reference/>和<dubbo:service/>的配置解析时,根据URL协议信息调用Protocol接口的实现类方法。默认使用dubbo协议。
    Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension()

    调用流程如下,接口信息的注册是在RegistryProtocol类中完成。


    Protocol调用流程.png

    2.1 URL组装

    • AbstractInterfaceConfig.loadRegistries()根据registry和application配置信息,对每个注册中心地址创建一个URL。
    • 在registry URL的refer参数存储<dubbo:reference/>接口配置
    • 在registry URL的export参数存储<dubbo:service/>接口配置

    2.2 <dubbo:reference/>之refer()

    2.2.1 refer

    • 工厂类获取zookeeperRegistry
    • 接口是注册中心接口,则返回接口代理
    • group配置为匹配多个的,则使用MergeableCluster,支持合并多个provider返回结果。
    • cluster通过依赖注入,默认为failoverCluster,支持失败重试。详情后续在cluster层源码解析时介绍。
    public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
            url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY);
            Registry registry = registryFactory.getRegistry(url);
            if (RegistryService.class.equals(type)) {
                return proxyFactory.getInvoker((T) registry, type, url);
            }
    
            // group="a,b" or group="*"
            Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY));
            String group = qs.get(Constants.GROUP_KEY);
            if (group != null && group.length() > 0) {
                if ((Constants.COMMA_SPLIT_PATTERN.split(group)).length > 1
                        || "*".equals(group)) {
                    return doRefer(getMergeableCluster(), registry, type, url);
                }
            }
            return doRefer(cluster, registry, type, url);
        }
    

    2.2.2 doRefer

    • 构建RegistryDirectory,用于存储接口的路由,动态配置,provider信息。
    • 注册url为接口consumers的子节点
    • 订阅接口providers,routers,overriders节点的子节点变更,在RegistryDirectory中做相应处理。
    • 返回clusterInvoker,配置解析层在invoker基础上创建代理层,交给spring管理
    private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
            RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
            directory.setRegistry(registry);
            directory.setProtocol(protocol);
            // all attributes of REFER_KEY
            Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
            URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters);
            if (!Constants.ANY_VALUE.equals(url.getServiceInterface())
                    && url.getParameter(Constants.REGISTER_KEY, true)) {
                registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY,
                        Constants.CHECK_KEY, String.valueOf(false)));
            }
            directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
                    Constants.PROVIDERS_CATEGORY
                            + "," + Constants.CONFIGURATORS_CATEGORY
                            + "," + Constants.ROUTERS_CATEGORY));
    
            Invoker invoker = cluster.join(directory);
            ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
            return invoker;
        }
    

    2.2.3 RegistryDirectory

    接口的几个category子节点信息变更时,触发notify回调,按照url的category配置分别处理。

    • URL overrideDirectoryUrl存储动态配置信息。

      1. 合并实际的接口url配置即为实际使用的配置。
    • List<Router> routers 存储接口路由信息
      1.添加默认路由routers.add(new MockInvokersSelector()); 用于筛选mock协议的provider。
      2.更新methodInvokerMap时,根据路由配置,过滤不匹配的provider
      3.若router配置runtime为true,则每次consumer调用接口时,获取provider列表后,都需要调用该router过滤掉不匹配的provider。

    • refreshInvoker()更新缓存的provider信息
      1.Map<String/url/, Invoker<T>> urlInvokerMap 以url为key存储的provider信息
      2.Map<String, List<Invoker<T>>> methodInvokerMap urlInvokerMap 以方法名为key存储的provider信息。实际接口调用时,使用此处缓存的数据。
      3.删除下线的provider,对新增的provider url,调用协议层refer接口处理,主要是设置创建netty client,和provider建立连接。

    public synchronized void notify(List<URL> urls) {
            List<URL> invokerUrls = new ArrayList<URL>();
            List<URL> routerUrls = new ArrayList<URL>();
            List<URL> configuratorUrls = new ArrayList<URL>();
            for (URL url : urls) {
                String protocol = url.getProtocol();
                String category = url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
                if (Constants.ROUTERS_CATEGORY.equals(category)
                        || Constants.ROUTE_PROTOCOL.equals(protocol)) {
                    routerUrls.add(url);
                } else if (Constants.CONFIGURATORS_CATEGORY.equals(category)
                        || Constants.OVERRIDE_PROTOCOL.equals(protocol)) {
                    configuratorUrls.add(url);
                } else if (Constants.PROVIDERS_CATEGORY.equals(category)) {
                    invokerUrls.add(url);
                } else {
                    logger.warn("Unsupported category " + category + " in notified url: " + url + " from registry " + getUrl().getAddress() + " to consumer " + NetUtils.getLocalHost());
                }
            }
            // configurators
            if (configuratorUrls != null && !configuratorUrls.isEmpty()) {
                this.configurators = toConfigurators(configuratorUrls);
            }
            // routers
            if (routerUrls != null && !routerUrls.isEmpty()) {
                List<Router> routers = toRouters(routerUrls);
                if (routers != null) { // null - do nothing
                    setRouters(routers);
                }
            }
            List<Configurator> localConfigurators = this.configurators; // local reference
            // merge override parameters
            this.overrideDirectoryUrl = directoryUrl;
            if (localConfigurators != null && !localConfigurators.isEmpty()) {
                for (Configurator configurator : localConfigurators) {
                    this.overrideDirectoryUrl = configurator.configure(overrideDirectoryUrl);
                }
            }
            // providers
            refreshInvoker(invokerUrls);
        }
    

    2.3 <dubbo:service/>之export()

    • doLocalExport()
      1 ConcurrentHashMap bounds缓存service接口信息,通过同步锁避免并发问题。
      2 调用dubbo协议接口,主要是创建本地的netty server。后续协议层解析时介绍详细内容。
    private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker) {
            String key = getCacheKey(originInvoker);
            ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
            if (exporter == null) {
                synchronized (bounds) {
                    exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
                    if (exporter == null) {
                        final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));
                        exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker);
                        bounds.put(key, exporter);
                    }
                }
            }
            return exporter;
        }
    
    • 注册接口


      接口注册.png

    动态配置变更时,原接口url合并动态配置,调用doChangeLocalExport()更新对应的协议层的配置信息。

    相关文章

      网友评论

          本文标题:dubbo源码2-注册中心

          本文链接:https://www.haomeiwen.com/subject/bxcolqtx.html