Dubbo ZookeeperRegistry Analysis

Author: 晴天哥_王志 | Published: 2019-10-20 18:07

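    Introduction

     This article walks through the registration flow of Dubbo's ZookeeperRegistry. The same flow carries over to the other registry implementations.

     The two key abstractions are Registry and RegistryFactory: RegistryFactory creates Registry objects dynamically, and a Registry performs the actual registration against the registry center.

     ServiceConfig holds an adaptive (dynamically generated) Protocol proxy that returns the concrete Protocol for a given type. For a registry URL, that Protocol is RegistryProtocol.

    Source Code Walkthrough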

    public class ServiceConfig<T> extends AbstractServiceConfig {
        // protocol is a Protocol$Adaptive instance
        private static final Protocol protocol = 
          ExtensionLoader.getExtensionLoader(Protocol.class)
         .getAdaptiveExtension();
    
    
        private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
            
            // url (the provider URL) is built earlier in this method; that part is omitted here
            String scope = url.getParameter(SCOPE_KEY);
    
            if (!SCOPE_NONE.equalsIgnoreCase(scope)) {
                if (!SCOPE_LOCAL.equalsIgnoreCase(scope)) {
                    if (CollectionUtils.isNotEmpty(registryURLs)) {
                        for (URL registryURL : registryURLs) {
                            Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));
                            DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
                            // invoke export() on Protocol$Adaptive
                            Exporter<?> exporter = protocol.export(wrapperInvoker);
                            exporters.add(exporter);
                        }
                    } else {
    
                        Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, url);
                        DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
                        // invoke export() on Protocol$Adaptive
                        Exporter<?> exporter = protocol.export(wrapperInvoker);
                        exporters.add(exporter);
                    }
                }
            }
        }
    }
    
    • ServiceConfig's doExportUrlsFor1Protocol() method calls export().
    • In protocol.export(), protocol is the Protocol$Adaptive instance.
    • Next, look at the export() method of Protocol$Adaptive.
    public class Protocol$Adaptive implements org.apache.dubbo.rpc.Protocol {
    
        public org.apache.dubbo.rpc.Exporter export(org.apache.dubbo.rpc.Invoker arg0) throws org.apache.dubbo.rpc.RpcException {
            if (arg0 == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument == null");
            
            if (arg0.getUrl() == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument getUrl() == null");
            // the url here looks like registry://... (see the example URL after this class)
            org.apache.dubbo.common.URL url = arg0.getUrl();
            // extName is the protocol name of the url, which is "registry" here
            String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );
            if(extName == null) throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.Protocol) name from url (" + url.toString() + ") use keys([protocol])");
            // returns the RegistryProtocol instance
            org.apache.dubbo.rpc.Protocol extension = (org.apache.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension(extName);
            
            return extension.export(arg0);
        }   
    }
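
The generated class above boils down to "look up an extension by the URL's protocol, then delegate". The following is a minimal sketch of that idea only. SketchProtocol, the EXTENSIONS map, and the returned strings are hypothetical stand-ins for illustration and are not Dubbo APIs; java.net.URI is used in place of Dubbo's URL class.

```java
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

class AdaptiveDispatchSketch {
    // Hypothetical stand-in for org.apache.dubbo.rpc.Protocol; not the Dubbo API.
    interface SketchProtocol {
        String export(URI url);
    }

    // Plays the role of the extension lookup: extension name -> implementation.
    static final Map<String, SketchProtocol> EXTENSIONS = new HashMap<>();

    static {
        EXTENSIONS.put("registry", url -> "RegistryProtocol exports " + url.getHost());
        EXTENSIONS.put("dubbo", url -> "DubboProtocol exports " + url.getHost());
    }

    // Mirrors the shape of the generated export(): pick by URL protocol, then delegate.
    static String export(String rawUrl) {
        URI url = URI.create(rawUrl);
        String extName = url.getScheme() == null ? "dubbo" : url.getScheme();
        SketchProtocol extension = EXTENSIONS.get(extName);
        if (extension == null) {
            throw new IllegalStateException("No extension named " + extName);
        }
        return extension.export(url);
    }

    public static void main(String[] args) {
        System.out.println(export("registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService"));
        System.out.println(export("dubbo://192.168.1.5:20880/org.apache.dubbo.demo.DemoService"));
    }
}
```

Because the choice is made per call from the URL, the same adaptive object can serve both the registry:// URL and a plain dubbo:// URL.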
    
    
    
    
    registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?
    application=dubbo-demo-api-provider&dubbo=2.0.2
    &export=dubbo://192.168.1.5:20880/org.apache.dubbo.demo.DemoService?
    anyhost=true&application=dubbo-demo-api-provider&bind.ip=192.168.1.5
    &bind.port=20880&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false
    &interface=org.apache.dubbo.demo.DemoService&methods=sayHello&pid=24212
    &release=&side=provider&timestamp=1571482218716&pid=24212
    &registry=zookeeper&timestamp=1571482215899
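
The URL above is printed in decoded form for readability. In ServiceConfig the provider URL is attached with addParameterAndEncoded(EXPORT_KEY, ...), so the nested URL travels as a single encoded parameter value. A rough sketch of that wrapping and unwrapping on plain strings follows; the wrap() and unwrap() helpers are hypothetical and only approximate what Dubbo's URL class does.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.net.URLEncoder;

class ExportParamSketch {
    // Appends the provider URL as one encoded "export" parameter.
    static String wrap(String registryUrl, String providerUrl) {
        String separator = registryUrl.contains("?") ? "&" : "?";
        return registryUrl + separator + "export=" + encode(providerUrl);
    }

    // Recovers the provider URL from the wrapped registry URL, or null if absent.
    static String unwrap(String wrappedUrl) {
        int q = wrappedUrl.indexOf('?');
        String query = q < 0 ? "" : wrappedUrl.substring(q + 1);
        for (String pair : query.split("&")) {
            if (pair.startsWith("export=")) {
                return decode(pair.substring("export=".length()));
            }
        }
        return null;
    }

    static String encode(String value) {
        try {
            return URLEncoder.encode(value, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException(e); // UTF-8 is always available
        }
    }

    static String decode(String value) {
        try {
            return URLDecoder.decode(value, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException(e); // UTF-8 is always available
        }
    }

    public static void main(String[] args) {
        String provider = "dubbo://192.168.1.5:20880/org.apache.dubbo.demo.DemoService?side=provider";
        String wrapped = wrap("registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?registry=zookeeper", provider);
        System.out.println(wrapped);
        System.out.println(unwrap(wrapped));
    }
}
```

Encoding keeps the inner URL's own ? and & characters from being mistaken for parameters of the outer registry URL.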
    
    • Protocol$Adaptive's export() derives the extension name from the URL. Here the URL protocol is registry, so the extension returned is RegistryProtocol.
    • Next, look at the export() method of RegistryProtocol.
    public class RegistryProtocol implements Protocol {
    
        private Cluster cluster;
        private Protocol protocol;
        // registryFactory is a RegistryFactory$Adaptive instance
        private RegistryFactory registryFactory;
        private ProxyFactory proxyFactory;
    
        public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
            URL registryUrl = getRegistryUrl(originInvoker);
            URL providerUrl = getProviderUrl(originInvoker);
    
            final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);
            final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
            overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
    
            providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
            //export invoker
            final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);
    
            // url to registry
            final Registry registry = getRegistry(originInvoker);
            final URL registeredProviderUrl = getRegisteredProviderUrl(providerUrl, registryUrl);
            ProviderInvokerWrapper<T> providerInvokerWrapper = ProviderConsumerRegTable.registerProvider(originInvoker,
                    registryUrl, registeredProviderUrl);
            //to judge if we need to delay publish
            boolean register = providerUrl.getParameter(REGISTER_KEY, true);
            if (register) {
                register(registryUrl, registeredProviderUrl);
                providerInvokerWrapper.setReg(true);
            }
    
            registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
    
            exporter.setRegisterUrl(registeredProviderUrl);
            exporter.setSubscribeUrl(overrideSubscribeUrl);
    
            return new DestroyableExporter<>(exporter);
        }
    
    
    
        private Registry getRegistry(final Invoker<?> originInvoker) {
            URL registryUrl = getRegistryUrl(originInvoker);
    
            return registryFactory.getRegistry(registryUrl);
        }
    
    
        public void register(URL registryUrl, URL registeredProviderUrl) {
            Registry registry = registryFactory.getRegistry(registryUrl);
            registry.register(registeredProviderUrl);
        }
    }
    
    
    
    public class RegistryFactory$Adaptive implements RegistryFactory {
    
        public Registry getRegistry(URL uRL) {
            String string;
            if (uRL == null) {
                throw new IllegalArgumentException("url == null");
            }
    
            URL uRL2 = uRL;
            String string2 = string = uRL2.getProtocol() == null ? "dubbo" : uRL2.getProtocol();
            if (string == null) {
                throw new IllegalStateException(new StringBuffer().append("Failed to get extension (org.apache.dubbo.registry.RegistryFactory) name from url (").append(uRL2.toString()).append(") use keys([protocol])").toString());
            }
    
            RegistryFactory registryFactory = (RegistryFactory)ExtensionLoader.getExtensionLoader(RegistryFactory.class).getExtension(string);
            
            return registryFactory.getRegistry(uRL);
        }
    }
    
    Example providerUrl:
    dubbo://192.168.1.5:20880/org.apache.dubbo.demo.DemoService?
    anyhost=true&application=dubbo-demo-api-provider&deprecated=false&dubbo=2.0.2
    &dynamic=true&generic=false&interface=org.apache.dubbo.demo.DemoService
    &methods=sayHello&pid=25662&release=&side=provider&timestamp=1571485618358
    
    Example registryUrl:
    zookeeper://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?
    application=dubbo-demo-api-provider&dubbo=2.0.2
    &export=dubbo://192.168.1.5:20880/org.apache.dubbo.demo.DemoService?
    anyhost=true&application=dubbo-demo-api-provider&bind.ip=192.168.1.5
    &bind.port=20880&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false
    &interface=org.apache.dubbo.demo.DemoService&methods=sayHello&pid=31349
    &release=&side=provider&timestamp=1571498589827&pid=31349&timestamp=1571498589815
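
Compared with the registry:// URL that entered Protocol$Adaptive, this registryUrl has the protocol zookeeper and no longer carries a registry=zookeeper parameter. getRegistryUrl() is not shown in the excerpt, so the sketch below is only an assumption about that rewrite, done on plain strings. The toRegistryUrl() helper and the "dubbo" fallback are illustrative choices, not code taken from the article.

```java
import java.util.ArrayList;
import java.util.List;

class RegistryUrlSketch {
    // Turns registry://host/path?registry=zookeeper&... into zookeeper://host/path?...
    static String toRegistryUrl(String url) {
        if (!url.startsWith("registry://")) {
            return url; // already carries a concrete registry protocol
        }
        int q = url.indexOf('?');
        String base = q < 0 ? url : url.substring(0, q);
        String query = q < 0 ? "" : url.substring(q + 1);

        String protocol = "dubbo"; // assumed fallback when no registry parameter is present
        List<String> kept = new ArrayList<>();
        for (String pair : query.split("&")) {
            if (pair.startsWith("registry=")) {
                protocol = pair.substring("registry=".length());
            } else if (!pair.isEmpty()) {
                kept.add(pair);
            }
        }
        // Swap only the leading protocol; the path may also contain the word "registry".
        String rewritten = protocol + base.substring("registry".length());
        return kept.isEmpty() ? rewritten : rewritten + "?" + String.join("&", kept);
    }

    public static void main(String[] args) {
        System.out.println(toRegistryUrl(
                "registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService"
                        + "?application=dubbo-demo-api-provider&registry=zookeeper&timestamp=1571482215899"));
    }
}
```

This rewrite is what lets RegistryFactory$Adaptive see the protocol zookeeper and pick the ZooKeeper-specific factory.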
    
    • RegistryProtocol's export() performs three core steps: obtain the Registry object, call register(), and call subscribe(). Through subscribe(), the provider also subscribes to the configurators node.
    • Example providerUrl and registryUrl values are shown above.
    • RegistryProtocol's getRegistry() calls registryFactory.getRegistry(), which returns a ZookeeperRegistry.
    • The registryFactory field is a RegistryFactory$Adaptive instance.
    • The URL passed to RegistryFactory$Adaptive's getRegistry() has the protocol zookeeper, so the extension lookup returns a ZookeeperRegistryFactory.
    • registryFactory.getRegistry(uRL) therefore runs ZookeeperRegistryFactory.getRegistry(), which returns a ZookeeperRegistry.
    • The Registry that RegistryProtocol obtains is thus a ZookeeperRegistry, and its register() and subscribe() methods are the ones invoked.
    • In ExtensionLoader.getExtensionLoader(RegistryFactory.class).getExtension(string), string is "zookeeper", and the result is a ZookeeperRegistryFactory.
    public class ZookeeperRegistryFactory extends AbstractRegistryFactory {
    
        private ZookeeperTransporter zookeeperTransporter;
    
        public void setZookeeperTransporter(ZookeeperTransporter zookeeperTransporter) {
            this.zookeeperTransporter = zookeeperTransporter;
        }
    
        @Override
        public Registry createRegistry(URL url) {
            return new ZookeeperRegistry(url, zookeeperTransporter);
        }
    }
    
    
    public abstract class AbstractRegistryFactory implements RegistryFactory {
    
        public Registry getRegistry(URL url) {
            url = URLBuilder.from(url)
                    .setPath(RegistryService.class.getName())
                    .addParameter(INTERFACE_KEY, RegistryService.class.getName())
                    .removeParameters(EXPORT_KEY, REFER_KEY)
                    .build();
            String key = url.toServiceStringWithoutResolving();
            // Lock the registry access process to ensure a single instance of the registry
            LOCK.lock();
            try {
                Registry registry = REGISTRIES.get(key);
                if (registry != null) {
                    return registry;
                }
                //create registry by spi/ioc
                registry = createRegistry(url);
                if (registry == null) {
                    throw new IllegalStateException("Can not create registry " + url);
                }
                REGISTRIES.put(key, registry);
                return registry;
            } finally {
                // Release the lock
                LOCK.unlock();
            }
        }
    }
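
The two classes above combine a template method (getRegistry() in the parent, createRegistry() in the subclass) with a lock-protected cache. Here is a stripped-down sketch of the same pattern. SketchRegistry, AbstractFactory, and ZkFactory are hypothetical names, and the raw URL string is used as the cache key, whereas the real code derives the key with toServiceStringWithoutResolving().

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;

class CachedFactorySketch {
    // Hypothetical stand-in for a Registry; it only remembers the URL it was built for.
    static class SketchRegistry {
        final String url;

        SketchRegistry(String url) {
            this.url = url;
        }
    }

    abstract static class AbstractFactory {
        private final ReentrantLock lock = new ReentrantLock();
        private final Map<String, SketchRegistry> registries = new HashMap<>();

        // Template method: cache lookup and locking live here, creation is delegated.
        SketchRegistry getRegistry(String url) {
            lock.lock();
            try {
                SketchRegistry registry = registries.get(url);
                if (registry != null) {
                    return registry;
                }
                registry = createRegistry(url);
                if (registry == null) {
                    throw new IllegalStateException("Can not create registry " + url);
                }
                registries.put(url, registry);
                return registry;
            } finally {
                lock.unlock();
            }
        }

        protected abstract SketchRegistry createRegistry(String url);
    }

    static class ZkFactory extends AbstractFactory {
        int created = 0; // counts how often the subclass hook actually runs

        @Override
        protected SketchRegistry createRegistry(String url) {
            created++;
            return new SketchRegistry(url);
        }
    }

    public static void main(String[] args) {
        ZkFactory factory = new ZkFactory();
        SketchRegistry first = factory.getRegistry("zookeeper://127.0.0.1:2181");
        SketchRegistry second = factory.getRegistry("zookeeper://127.0.0.1:2181");
        System.out.println(first == second);
        System.out.println(factory.created);
    }
}
```

With this shape, repeated exports against the same registry address share one Registry instance, and therefore one underlying connection.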
    
    • ZookeeperRegistryFactory extends AbstractRegistryFactory. The parent's getRegistry() calls the subclass's createRegistry(), which returns a ZookeeperRegistry.
    • The register() and subscribe() methods of ZookeeperRegistry are then invoked.
    public class ZookeeperRegistry extends FailbackRegistry {
    
        private final static String DEFAULT_ROOT = "dubbo";
        private final String root;
        private final Set<String> anyServices = new ConcurrentHashSet<>();
        private final ConcurrentMap<URL, ConcurrentMap<NotifyListener, ChildListener>> zkListeners = new ConcurrentHashMap<>();
        private final ZookeeperClient zkClient;
    
        public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
            super(url);
            if (url.isAnyHost()) {
                throw new IllegalStateException("registry address == null");
            }
            String group = url.getParameter(GROUP_KEY, DEFAULT_ROOT);
            if (!group.startsWith(PATH_SEPARATOR)) {
                group = PATH_SEPARATOR + group;
            }
            this.root = group;
            zkClient = zookeeperTransporter.connect(url);
            zkClient.addStateListener(state -> {
                if (state == StateListener.RECONNECTED) {
                    try {
                        recover();
                    } catch (Exception e) {
                        logger.error(e.getMessage(), e);
                    }
                }
            });
        }
    
        @Override
        public void doRegister(URL url) {
            try {
                zkClient.create(toUrlPath(url), url.getParameter(DYNAMIC_KEY, true));
            } catch (Throwable e) {
                throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
            }
        }
    
    
        public void doSubscribe(final URL url, final NotifyListener listener) {
            try {
                if (ANY_VALUE.equals(url.getServiceInterface())) {
                    String root = toRootPath();
                    ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
                    if (listeners == null) {
                        zkListeners.putIfAbsent(url, new ConcurrentHashMap<>());
                        listeners = zkListeners.get(url);
                    }
                    ChildListener zkListener = listeners.get(listener);
                    if (zkListener == null) {
                        listeners.putIfAbsent(listener, (parentPath, currentChilds) -> {
                            for (String child : currentChilds) {
                                child = URL.decode(child);
                                if (!anyServices.contains(child)) {
                                    anyServices.add(child);
                                    subscribe(url.setPath(child).addParameters(INTERFACE_KEY, child,
                                            Constants.CHECK_KEY, String.valueOf(false)), listener);
                                }
                            }
                        });
                        zkListener = listeners.get(listener);
                    }
                    zkClient.create(root, false);
                    List<String> services = zkClient.addChildListener(root, zkListener);
                    if (CollectionUtils.isNotEmpty(services)) {
                        for (String service : services) {
                            service = URL.decode(service);
                            anyServices.add(service);
                            subscribe(url.setPath(service).addParameters(INTERFACE_KEY, service,
                                    Constants.CHECK_KEY, String.valueOf(false)), listener);
                        }
                    }
                } else {
                    List<URL> urls = new ArrayList<>();
                    for (String path : toCategoriesPath(url)) {
                        ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
                        if (listeners == null) {
                            zkListeners.putIfAbsent(url, new ConcurrentHashMap<>());
                            listeners = zkListeners.get(url);
                        }
                        ChildListener zkListener = listeners.get(listener);
                        if (zkListener == null) {
                            listeners.putIfAbsent(listener, (parentPath, currentChilds) -> ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds)));
                            zkListener = listeners.get(listener);
                        }
                        zkClient.create(path, false);
                        List<String> children = zkClient.addChildListener(path, zkListener);
                        if (children != null) {
                            urls.addAll(toUrlsWithEmpty(url, path, children));
                        }
                    }
                    notify(url, listener, urls);
                }
            } catch (Throwable e) {
                throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
            }
        }
    
    }
    
    
    
    public abstract class FailbackRegistry extends AbstractRegistry {
    
        public void register(URL url) {
            super.register(url);
            removeFailedRegistered(url);
            removeFailedUnregistered(url);
            try {
                doRegister(url);
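            } catch (Exception e) {
                // failure handling omitted in this excerpt; the full method records the failed URL for a later retry
            }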
        }
    
        public void subscribe(URL url, NotifyListener listener) {
            super.subscribe(url, listener);
            removeFailedSubscribed(url, listener);
            try {
                // Sending a subscription request to the server side
                doSubscribe(url, listener);
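            } catch (Exception e) {
                // failure handling omitted in this excerpt; the full method records the failed subscription for a later retry
            }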
        }
    }
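
The catch blocks above carry no logic in this excerpt. As the class name suggests, the idea of a "failback" registry is to remember failed operations and retry them later. The sketch below shows one way that could look for register() only. FailbackBase, FlakyRegistry, the failedRegistered set, and the retry() method are all hypothetical, and URLs are plain strings.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

class FailbackSketch {
    abstract static class FailbackBase {
        final Set<String> registered = new LinkedHashSet<>();
        final Set<String> failedRegistered = new LinkedHashSet<>();

        // Common entry point: bookkeeping here, the real work in doRegister().
        final void register(String url) {
            registered.add(url);          // comparable to super.register(url)
            failedRegistered.remove(url); // comparable to removeFailedRegistered(url)
            try {
                doRegister(url);
            } catch (Exception e) {
                failedRegistered.add(url); // assumption: keep the URL for a later retry
            }
        }

        // Retries everything that failed earlier; in practice this would run on a timer.
        void retry() {
            for (String url : new ArrayList<>(failedRegistered)) {
                try {
                    doRegister(url);
                    failedRegistered.remove(url);
                } catch (Exception e) {
                    // still failing; keep it for the next round
                }
            }
        }

        protected abstract void doRegister(String url) throws Exception;
    }

    // Test double that fails until the "registry" is marked as up.
    static class FlakyRegistry extends FailbackBase {
        boolean up = false;
        final List<String> nodes = new ArrayList<>();

        @Override
        protected void doRegister(String url) throws Exception {
            if (!up) {
                throw new IllegalStateException("registry unreachable");
            }
            nodes.add(url);
        }
    }

    public static void main(String[] args) {
        FlakyRegistry registry = new FlakyRegistry();
        registry.register("dubbo://192.168.1.5:20880/org.apache.dubbo.demo.DemoService");
        System.out.println(registry.failedRegistered.size());
        registry.up = true;
        registry.retry();
        System.out.println(registry.nodes.size());
    }
}
```

The split keeps retry bookkeeping independent of the backing store, so a ZooKeeper-specific subclass only needs to implement the do* methods.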
    
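    • ZookeeperRegistry extends FailbackRegistry. The parent class provides the common register() and subscribe() entry points, and the concrete work is implemented in the subclass ZookeeperRegistry.
    • The subclass's doRegister() and doSubscribe() carry out the actual registration and subscription.
    • At its core, doRegister() creates a node in ZooKeeper for the provider URL. The node is ephemeral when the URL's dynamic parameter is true, which is the default.
    • doSubscribe() is considerably more involved and is analyzed separately from the consumer side.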
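
doRegister() passes toUrlPath(url) to zkClient.create(), but toUrlPath() itself is not shown in the excerpt. The sketch below illustrates the kind of path such a method might produce, assuming a layout of root, service interface, category, and encoded provider URL, with "providers" as the category. Both the layout and the helper signatures are assumptions for illustration. The toRoot() helper mirrors the group handling visible in the ZookeeperRegistry constructor.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

class ZkPathSketch {
    // Same normalization as the constructor above: make sure the root starts with "/".
    static String toRoot(String group) {
        return group.startsWith("/") ? group : "/" + group;
    }

    // Assumed layout: <root>/<service interface>/<category>/<encoded provider URL>.
    static String toUrlPath(String root, String serviceInterface, String category, String fullUrl) {
        return root + "/" + encode(serviceInterface) + "/" + category + "/" + encode(fullUrl);
    }

    static String encode(String value) {
        try {
            return URLEncoder.encode(value, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException(e); // UTF-8 is always available
        }
    }

    public static void main(String[] args) {
        String root = toRoot("dubbo");
        System.out.println(toUrlPath(root, "org.apache.dubbo.demo.DemoService", "providers",
                "dubbo://192.168.1.5:20880/org.apache.dubbo.demo.DemoService?side=provider"));
    }
}
```

Encoding the whole provider URL into a single path segment means each provider instance becomes one child node under the service's category node.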

    Registry Class Diagrams

    [Figure: Registry class diagram]
    [Figure: RegistryFactory class diagram]


        Article link: https://www.haomeiwen.com/subject/rmmnmctx.html