美文网首页Dubbo
Dubbo源码分析----暴露服务

Dubbo源码分析----暴露服务

作者: _六道木 | 来源:发表于2018-02-19 16:11 被阅读70次

    暴露服务的过程中,会涉及到两个Protocol

    1. DubboProtocol主要是做网络通信相关初始化
    2. RegistryProtocol主要是做zk的注册和订阅相关

    在提供一个服务的时候,需要在配置文件里声明如下xml

    <dubbo:service....
    

        然后Spring会根据对应关系执行对应的BeanDefinitionParser,然后实例化对应的类,提供一个服务的时候会实例化ServiceBean(具体对应关系看DubboNamespaceHandler类;spring解析自定义标签可以看下spring源码关于标签的处理,这里就不说了)

        ServiceBean实现了InitializingBean和ApplicationContextAware接口,所以会执行afterPropertiesSet和onApplicationEvent方法,这里就是入口,然后就会执行export方法暴露服务

        一路跟下去,都是设置一下属性值,然后到了doExportUrls方法便开始主要的逻辑

        private void doExportUrls() {
            List<URL> registryURLs = loadRegistries(true);//获取注册中心的url
            for (ProtocolConfig protocolConfig : protocols) {
                doExportUrlsFor1Protocol(protocolConfig, registryURLs);
            }
        }
    

        在Zookeeper为注册中心的情况下,registryURLs值如下
    [registry://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=test&dubbo=2.0.0&owner=william&pid=4444&registry=zookeeper&timestamp=1488886235790]

        进入doExportUrlsFor1Protocol方法,看下暴露服务的主要逻辑

            if (! Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) {
    
                //配置不是remote的情况下做本地暴露 (配置为remote,则表示只暴露远程服务)
                if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) {
                    exportLocal(url);
                }
                //如果配置不是local则暴露为远程服务.(配置为local,则表示只暴露远程服务)
                if (! Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope) ){
                    if (logger.isInfoEnabled()) {
                        logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
                    }
                    if (registryURLs != null && registryURLs.size() > 0
                            && url.getParameter("register", true)) {
                        for (URL registryURL : registryURLs) {
                            url = url.addParameterIfAbsent("dynamic", registryURL.getParameter("dynamic"));
                            URL monitorUrl = loadMonitor(registryURL);
                            if (monitorUrl != null) {
                                url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
                            }
                            if (logger.isInfoEnabled()) {
                                logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
                            }
                            Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
    
                            Exporter<?> exporter = protocol.export(invoker);
                            exporters.add(exporter);
                        }
                    } else {
                        Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
    
                        Exporter<?> exporter = protocol.export(invoker);
                        exporters.add(exporter);
                    }
                }
            }
    

    1.本地暴露

        会先使用exportLocal暴露本地服务

        private void exportLocal(URL url) {
            if (!Constants.LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
                URL local = URL.valueOf(url.toFullString())
                        .setProtocol(Constants.LOCAL_PROTOCOL)
                        .setHost(NetUtils.LOCALHOST)
                        .setPort(0);
                Exporter<?> exporter = protocol.export(
                        proxyFactory.getInvoker(ref, (Class) interfaceClass, local));
                exporters.add(exporter);
                logger.info("Export dubbo service " + interfaceClass.getName() +" to local registry");
            }
        }
    

        看了Dubbo的扩展机制会知道,ProxyFactory默认会使用接口上@Spi注解声明的服务,为了容易理解,我把注解上的@SPI设置成jdk,那么就会使用jdk对应的实现类,即JdkProxyFactory
        JdkProxyFactory的getInvoker方法如下:

        public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
            return new AbstractProxyInvoker<T>(proxy, type, url) {
                @Override
                protected Object doInvoke(T proxy, String methodName, 
                                          Class<?>[] parameterTypes, 
                                          Object[] arguments) throws Throwable {
                    Method method = proxy.getClass().getMethod(methodName, parameterTypes);
                    return method.invoke(proxy, arguments);
                }
            };
        }
    

        返回一个AbstractProxyInvoker类的对象,这个AbstractProxyInvoker主要是接收消费方的请求后,执行本地方法的一个Invoker,其中是使用了反射机制来调用了本地方法
        获取到Invoker之后,需要使用Protocol的export来暴露这个服务,在讲Dubbo扩展机制的时候,Protocol外面有两个装饰类,那么export会先调用ProtocolListenerWrapper

        public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
            if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
                return protocol.export(invoker);
            }
            return new ListenerExporterWrapper<T>(protocol.export(invoker), 
                    Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(ExporterListener.class)
                            .getActivateExtension(invoker.getUrl(), Constants.EXPORTER_LISTENER_KEY)));
        }
    

        这时的url为
    injvm://127.0.0.1/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=test&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&loadbalance=roundrobin&methods=sayHello&owner=william&pid=4444&side=provider&timestamp=1488887324869
        对象为

    image.png
        那么if条件不满足,将调用ProtocolFilterWrapper的export方法
        public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
            if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                return protocol.refer(type, url);
            }
            return buildInvokerChain(protocol.refer(type, url), Constants.REFERENCE_FILTER_KEY, Constants.CONSUMER);
        }
    

        If条件还是不满足,执行下面的代码
        注意:这里的protocol就是InjvmProtocol了

        然后看下buildInvokerChain方法,这个方法建立了一个个的filter,使用了责任链模式,一个普通的Invoker调用也会经历这些filter,每个filter都有自己特殊的功能

        private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
            Invoker<T> last = invoker;
            List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
            if (filters.size() > 0) {
                for (int i = filters.size() - 1; i >= 0; i --) {
                    final Filter filter = filters.get(i);
                    final Invoker<T> next = last;
                    last = new Invoker<T>() {
                        public Class<T> getInterface() {
                            return invoker.getInterface();
                        }
                        public URL getUrl() {
                            return invoker.getUrl();
                        }
                        public boolean isAvailable() {
                            return invoker.isAvailable();
                        }
                        public Result invoke(Invocation invocation) throws RpcException {
                            return filter.invoke(next, invocation);
                        }
                        public void destroy() {
                            invoker.destroy();
                        }
                        @Override
                        public String toString() {
                            return invoker.toString();
                        }
                    };
                }
            }
            return last;
        }
    

        这里的group是provider,key是service.filter
        看下getActivateExtension方法实现

        public List<T> getActivateExtension(URL url, String key, String group) {
            String value = url.getParameter(key);
            return getActivateExtension(url, value == null || value.length() == 0 ? null : Constants.COMMA_SPLIT_PATTERN.split(value), group);
        }
    
        public List<T> getActivateExtension(URL url, String[] values, String group) {
            List<T> exts = new ArrayList<T>();
            List<String> names = values == null ? new ArrayList<String>(0) : Arrays.asList(values);
            if (! names.contains(Constants.REMOVE_VALUE_PREFIX + Constants.DEFAULT_KEY)) {
                getExtensionClasses();
                for (Map.Entry<String, Activate> entry : cachedActivates.entrySet()) {
                    String name = entry.getKey();
                    Activate activate = entry.getValue();
                    if (isMatchGroup(group, activate.group())) {
                        T ext = getExtension(name);
                        if (! names.contains(name)
                                && ! names.contains(Constants.REMOVE_VALUE_PREFIX + name) 
                                && isActive(activate, url)) {
                            exts.add(ext);
                        }
                    }
                }
                Collections.sort(exts, ActivateComparator.COMPARATOR);
            }
            List<T> usrs = new ArrayList<T>();
            for (int i = 0; i < names.size(); i ++) {
                String name = names.get(i);
                if (! name.startsWith(Constants.REMOVE_VALUE_PREFIX)
                        && ! names.contains(Constants.REMOVE_VALUE_PREFIX + name)) {
                    if (Constants.DEFAULT_KEY.equals(name)) {
                        if (usrs.size() > 0) {
                            exts.addAll(0, usrs);
                            usrs.clear();
                        }
                    } else {
                        T ext = getExtension(name);
                        usrs.add(ext);
                    }
                }
            }
            if (usrs.size() > 0) {
                exts.addAll(usrs);
            }
            return exts;
        }
    

        由于url中没有service.filter的key,所以values为[]
        cachedActivates记录了接口实现中带有Activate注解的类,需要筛选出group为provider的实现类,最后进行排序


    filter.png

        过滤器调用的顺序和上图的顺序一样

        最后会调用InjvmProtocol的export方法,将invoker封装成InjvmExporter返回,得到Exporter之后,放到List里

        public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
            return new InjvmExporter<T>(invoker, invoker.getUrl().getServiceKey(), exporterMap);
        }
    

    2.远程暴露

    回到doExportUrlsFor1Protocol方法,暴露本地服务之后,根据注册中心的地址暴露远程服务

        for (URL registryURL : registryURLs) {
            url = url.addParameterIfAbsent("dynamic", registryURL.getParameter("dynamic"));
            URL monitorUrl = loadMonitor(registryURL);
            if (monitorUrl != null) {
                url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
            }
            if (logger.isInfoEnabled()) {
                logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
            }
            Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
    
            Exporter<?> exporter = protocol.export(invoker);
            exporters.add(exporter);
        }
    

    获取Invoker的方式和之前一样
    而实际调用的是哪个Protocol对象呢?根据Invoker中protocol属性的值(值为registry)和Dubbo扩展机制可以知道调用的是RegistryProtocol

        public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
            //export invoker
            final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
            //registry provider
            final Registry registry = getRegistry(originInvoker);
            final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);
            registry.register(registedProviderUrl);
            // 订阅override数据
            // FIXME 提供者订阅时,会影响同一JVM即暴露服务,又引用同一服务的的场景,因为subscribed以服务名为缓存的key,导致订阅信息覆盖。
            final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl);
            final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl);
            overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
            registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
            //保证每次export都返回一个新的exporter实例
            return new Exporter<T>() {
                public Invoker<T> getInvoker() {
                    return exporter.getInvoker();
                }
                public void unexport() {
                    try {
                        exporter.unexport();
                    } catch (Throwable t) {
                        logger.warn(t.getMessage(), t);
                    }
                    try {
                        registry.unregister(registedProviderUrl);
                    } catch (Throwable t) {
                        logger.warn(t.getMessage(), t);
                    }
                    try {
                        overrideListeners.remove(overrideSubscribeUrl);
                        registry.unsubscribe(overrideSubscribeUrl, overrideSubscribeListener);
                    } catch (Throwable t) {
                        logger.warn(t.getMessage(), t);
                    }
                }
            };
        }
    

    2.1 doLocalExport

    先看下doLocalExport方法

        private <T> ExporterChangeableWrapper<T>  doLocalExport(final Invoker<T> originInvoker){
            String key = getCacheKey(originInvoker);
            ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
            if (exporter == null) {
                synchronized (bounds) {
                    exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
                    if (exporter == null) {
                        final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));
                        exporter = new ExporterChangeableWrapper<T>((Exporter<T>)protocol.export(invokerDelegete), originInvoker);
                        bounds.put(key, exporter);
                    }
                }
            }
            return (ExporterChangeableWrapper<T>) exporter;
        }
    

    originInvoker的url值为:
    registry://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=test&dubbo=2.0.0&export=dubbo%3A%2F%2F10.1.87.36%3A20888%2Fcom.alibaba.dubbo.demo.DemoService%3Fanyhost%3Dtrue%26application%3Dtest%26dubbo%3D2.0.0%26generic%3Dfalse%26interface%3Dcom.alibaba.dubbo.demo.DemoService%26loadbalance%3Droundrobin%26methods%3DsayHello%26owner%3Dwilliam%26pid%3D14204%26side%3Dprovider%26threads%3D1%26timestamp%3D1518580683573&owner=william&pid=14204&registry=zookeeper&timestamp=1518580683551
    url是代表注册中心相关信息,getCacheKey是获取url中的provider属性,即provider的url:
    dubbo://10.1.87.36:20888/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=test&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&loadbalance=roundrobin&methods=sayHello&owner=william&pid=14204&side=provider&threads=1&timestamp=1518580683573
    第一次进来的时候,bounds中该key对应的值为空,所以根据provider的url和originInvoker封装成新的Invoker,此时Invoker的url就是provider的url,其中protocol的值为dubbo,那么将调用DubboProtocol的export方法,一开始已经介绍,DubboProtocol主要是做网络通信相关初始化
    DubboProtocol的export方法如下:

        public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
            URL url = invoker.getUrl();
            
            String key = serviceKey(url);
            DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
            exporterMap.put(key, exporter);
            
            Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY,Constants.DEFAULT_STUB_EVENT);
            Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
            if (isStubSupportEvent && !isCallbackservice){
                String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
                if (stubServiceMethods == null || stubServiceMethods.length() == 0 ){
                    if (logger.isWarnEnabled()){
                        logger.warn(new IllegalStateException("consumer [" +url.getParameter(Constants.INTERFACE_KEY) +
                                "], has set stubproxy support event ,but no stub methods founded."));
                    }
                } else {
                    stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
                }
            }
    
            openServer(url);
            
            return exporter;
        }
    

    key=全类名+dubbo监听端口号
    然后将Invoker封装成Export,然后放到map中
    所以看下来,Export和invoker是一对一的关系
    exporterMap主要是将服务名和export进行关联,看到这里其实已经差不多结束了,Dubbo接收到consumer的请求时,会在exporterMap中找到对应的exporter,然后找到对应的Invoker,有了invoker就可以调用本地的服务

    2.2 getRegistry

    再回到RegistryProtocol的exprot方法中,接下来就是执行getRegistry(originInvoker)这句代码了,主要是做注册中心的初始化

        /**
         * 根据invoker的地址获取registry实例
         * @param originInvoker
         * @return
         */
        private Registry getRegistry(final Invoker<?> originInvoker){
            URL registryUrl = originInvoker.getUrl();
            if (Constants.REGISTRY_PROTOCOL.equals(registryUrl.getProtocol())) {
                String protocol = registryUrl.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_DIRECTORY);
                registryUrl = registryUrl.setProtocol(protocol).removeParameter(Constants.REGISTRY_KEY);
            }
            return registryFactory.getRegistry(registryUrl);
        }
    

    在上面我们看到invoker的url中,registry属性是zookeeper,根据spi机制,该RegistryFactory为ZookeeperRegistryFactory,getRegistry会调用到父类AbstractRegistryFactory的方法

        public Registry getRegistry(URL url) {
            url = url.setPath(RegistryService.class.getName())
                    .addParameter(Constants.INTERFACE_KEY, RegistryService.class.getName())
                    .removeParameters(Constants.EXPORT_KEY, Constants.REFER_KEY);
            String key = url.toServiceString();
            // 锁定注册中心获取过程,保证注册中心单一实例
            LOCK.lock();
            try {
                Registry registry = REGISTRIES.get(key);
                if (registry != null) {
                    return registry;
                }
                registry = createRegistry(url);
                if (registry == null) {
                    throw new IllegalStateException("Can not create registry " + url);
                }
                REGISTRIES.put(key, registry);
                return registry;
            } finally {
                // 释放锁
                LOCK.unlock();
            }
        }
    

    createRegistry在父类中没有实现,调用到ZookeeperRegistryFactory中

        public Registry createRegistry(URL url) {
            return new ZookeeperRegistry(url, zookeeperTransporter);
        }
    

    先看下Registry的继承结构


    image.png

    可以看到ZookeeperRegistry有两个父类,所以ZookeeperRegistry会先调用父类的构造方法,这个过程主要做了几件事:

    1. 加载服务缓存文件(AbstractRegistry)
    2. 异步(默认)更新缓存文件(AbstractRegistry)
    3. 定时重试失败的动作:注册失败,取消注册失败,订阅失败,取消订阅失败,通知失败(FailbackRegistry)
    4. 初始化zk通信相关(ZookeeperRegistry)
    加载服务缓存文件:
        public AbstractRegistry(URL url) {
            setUrl(url);
            // 启动文件保存定时器
            syncSaveFile = url.getParameter(Constants.REGISTRY_FILESAVE_SYNC_KEY, false);
            String filename = url.getParameter(Constants.FILE_KEY, System.getProperty("user.home") + "/.dubbo/dubbo-registry-" + url.getHost() + ".cache");
            File file = null;
            if (ConfigUtils.isNotEmpty(filename)) {
                file = new File(filename);
                if(! file.exists() && file.getParentFile() != null && ! file.getParentFile().exists()){
                    if(! file.getParentFile().mkdirs()){
                        throw new IllegalArgumentException("Invalid registry store file " + file + ", cause: Failed to create directory " + file.getParentFile() + "!");
                    }
                }
            }
            this.file = file;
            loadProperties();
            notify(url.getBackupUrls());
        }
    

    loadProperties主要是将dubbo的服务缓存文件加载进来,转换成Properties对象
    Key=服务名 Value=url

    更新缓存文件:

    AbstractRegistry类中还有一个doSaveProperties方法,主要用来更新缓存文件,在更新前会先把本地文件的内容先更新到properties对象中,然后再进行更新操作

        public void doSaveProperties(long version) {
            if(version < lastCacheChanged.get()){
                return;
            }
            if (file == null) {
                return;
            }
            Properties newProperties = new Properties();
            // 保存之前先读取一遍,防止多个注册中心之间冲突
            InputStream in = null;
            try {
                if (file.exists()) {
                    in = new FileInputStream(file);
                    newProperties.load(in);
                }
            } catch (Throwable e) {
            } finally {
                ....
            }     
         // 保存
            try {
                newProperties.putAll(properties);
                File lockfile = new File(file.getAbsolutePath() + ".lock");
                if (!lockfile.exists()) {
                    lockfile.createNewFile();
                }
                RandomAccessFile raf = new RandomAccessFile(lockfile, "rw");
                try {
                    FileChannel channel = raf.getChannel();
                    try {
                        FileLock lock = channel.tryLock();
                        if (lock == null) {
                            throw new IOException("Can not lock the registry cache file " + file.getAbsolutePath() + ", ignore and retry later, maybe multi java process use the file, please config: dubbo.registry.file=xxx.properties");
                        }
                        // 保存
                        try {
                            if (! file.exists()) {
                                file.createNewFile();
                            }
                            FileOutputStream outputFile = new FileOutputStream(file);  
                            try {
                                newProperties.store(outputFile, "Dubbo Registry Cache");
                            } finally {
                                outputFile.close();
                            }
                        } finally {
                            lock.release();
                        }
                    } finally {
                        channel.close();
                    }
                } finally {,
                }
            } catch (Throwable e) {
                if (version < lastCacheChanged.get()) {
                    return;
                } else {
                    registryCacheExecutor.execute(new SaveProperties(lastCacheChanged.incrementAndGet()));
                }
            }
        }
    

    更新的时候会对.lock文件进行加锁,这样其他线程/进程就无法修改缓存文件,如果tryLock返回空,证明该文件正在被修改,那么增加版本号进行重试
    doSaveProperties方法开头的时候有句代码:

    if(version < lastCacheChanged.get()){return;}
    

    可以看到保存的时候有版本号的判断,lastCacheChanged这个是在saveProperties方法中增加的,如果当条件满足,那么证明在这个线程执行saveProperties方法之后还没执行doSaveProperties操作的时候,又有线程执行了saveProperties操作,把version+1了,那么当前线程直接return,让后续线程继续

    定时重试失败的动作

    构造方法里起了一个定时器,定时进行重试操作,代码简化如下:

        // 重试失败的动作
        protected void retry() {
            if (! failedRegistered.isEmpty()) {
                Set<URL> failed = new HashSet<URL>(failedRegistered);
                for (URL url : failed) {
                        doRegister(url);
                        failedRegistered.remove(url);
                }
            }
            if (! failedUnregistered.isEmpty()) {
                Set<URL> failed = new HashSet<URL>(failedUnregistered);
                if (logger.isInfoEnabled()) {
                    logger.info("Retry unregister " + failed);
                }
                for (URL url : failed) {
                        doUnregister(url);
                        failedUnregistered.remove(url);
                }
            }
            if (! failedSubscribed.isEmpty()) {
                Map<URL, Set<NotifyListener>> failed = new HashMap<URL, Set<NotifyListener>>(failedSubscribed);
                for (Map.Entry<URL, Set<NotifyListener>> entry : failed.entrySet()) {
                    URL url = entry.getKey();
                    Set<NotifyListener> listeners = entry.getValue();
                    for (NotifyListener listener : listeners) {
                        doSubscribe(url, listener);
                        listeners.remove(listener);
                    }
                }
            }
            if (! failedUnsubscribed.isEmpty()) {
                Map<URL, Set<NotifyListener>> failed = new HashMap<URL, Set<NotifyListener>>(failedUnsubscribed);
                for (Map.Entry<URL, Set<NotifyListener>> entry : failed.entrySet()) {
                    URL url = entry.getKey();
                    Set<NotifyListener> listeners = entry.getValue();
                    for (NotifyListener listener : listeners) {
                        try {
                            doUnsubscribe(url, listener);
                            listeners.remove(listener);
                        } catch (Throwable t) { // 忽略所有异常,等待下次重试
                            logger.warn("Failed to retry unsubscribe " + failed + ", waiting for again, cause: " + t.getMessage(), t);
                        }
                    }
                }
            }
            if (! failedNotified.isEmpty()) {
                Map<URL, Map<NotifyListener, List<URL>>> failed = new HashMap<URL, Map<NotifyListener, List<URL>>>(failedNotified);
                for (Map<NotifyListener, List<URL>> values : failed.values()) {
                    for (Map.Entry<NotifyListener, List<URL>> entry : values.entrySet()) {
                        NotifyListener listener = entry.getKey();
                        List<URL> urls = entry.getValue();
                        listener.notify(urls);
                        values.remove(listener);
                    }
                }
            }
        }
    

    从对面失败的集合从取数据执行相应的重试

    初始化zk通信相关

    这里会连接zk,然后增加监听器

    2.3 register

    再回到RegistryProtocol的export方法,registry初始化完成之后,接下来会调用register方法,仍然是先调用父类的方法
    AbstractRegistry.register:将url加入到registered集合中
    FailbackRegistry.register:

        @Override
        public void register(URL url) {
            super.register(url);
            failedRegistered.remove(url);
            failedUnregistered.remove(url);
            try {
                // 向服务器端发送注册请求
                doRegister(url);
            } catch (Exception e) {
                Throwable t = e;
    
                // 如果开启了启动时检测,则直接抛出异常
                boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
                        && url.getParameter(Constants.CHECK_KEY, true)
                        && ! Constants.CONSUMER_PROTOCOL.equals(url.getProtocol());
                boolean skipFailback = t instanceof SkipFailbackWrapperException;
                if (check || skipFailback) {
                    if(skipFailback) {
                        t = t.getCause();
                    }
                    throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
                } else {
                    logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t);
                }
                // 将失败的注册请求记录到失败列表,定时重试
                failedRegistered.add(url);
            }
        }
    

    从代码上看到,可以分为几步

    1. failedRegistered和failedUnregistered中移除该url
    2. 发送注册请求

    可以看到注册会失败,如果设置了check为false,那么放到重试队列中重试

    doRegister方法是在ZookeeperRegistry中实现的,主要是在zk上创建节点,创建逻辑如下:

        public void create(String path, boolean ephemeral) {
            int i = path.lastIndexOf('/');
            if (i > 0) {
                create(path.substring(0, i), false);
            }
            if (ephemeral) {
                createEphemeral(path);
            } else {
                createPersistent(path);
            }
        }
    

    Path格式为:
    /dubbo/服务名/providers(可能是configurators或者router节点)/url
    该方法会递归把每个父节点都创建完毕,可以看到除了dubbo这个节点是持久化节点,其他都是临时节点,那么当服务与zk断开连接一段时间后,zk会删除该节点,服务消费方就会得到通知,知道该提供者下线,做相应操作

    2.4 subscribe

    接下来看订阅方法subscribe,同理会先调用父类的方法
    AbstractRegistry.subscribe:将listener保存到subscribed中
    FailbackRegistry.subscribe:

        @Override
        public void subscribe(URL url, NotifyListener listener) {
            super.subscribe(url, listener);
            removeFailedSubscribed(url, listener);
            try {
                // 向服务器端发送订阅请求
                doSubscribe(url, listener);
            } catch (Exception e) {
                Throwable t = e;
    
                List<URL> urls = getCacheUrls(url);
                if (urls != null && urls.size() > 0) {
                    notify(url, listener, urls);
                    logger.error("Failed to subscribe " + url + ", Using cached list: " + urls + " from cache file: " + getUrl().getParameter(Constants.FILE_KEY, System.getProperty("user.home") + "/dubbo-registry-" + url.getHost() + ".cache") + ", cause: " + t.getMessage(), t);
                } else {
                    // 如果开启了启动时检测,则直接抛出异常
                    boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
                            && url.getParameter(Constants.CHECK_KEY, true);
                    boolean skipFailback = t instanceof SkipFailbackWrapperException;
                    if (check || skipFailback) {
                        if(skipFailback) {
                            t = t.getCause();
                        }
                        throw new IllegalStateException("Failed to subscribe " + url + ", cause: " + t.getMessage(), t);
                    } else {
                        logger.error("Failed to subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t);
                    }
                }
    
                // 将失败的订阅请求记录到失败列表,定时重试
                addFailedSubscribed(url, listener);
            }
        }
    

    和注册类似,区别是订阅失败的时候,首先会调用getCacheUrls获取url,这个方法就是从缓存的properties对象里获取服务的url,如果没有数据才执行和注册一样的操作,有则调用notify方法,这个后面会说到

    看下核心的ZookeeperRegistry的doSubscribe方法

        List<URL> urls = new ArrayList<URL>();
        for (String path : toCategoriesPath(url)) {
            ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
            if (listeners == null) {
                zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
                listeners = zkListeners.get(url);
            }
            ChildListener zkListener = listeners.get(listener);
            if (zkListener == null) {
                listeners.putIfAbsent(listener, new ChildListener() {
                    public void childChanged(String parentPath, List<String> currentChilds) {
                        ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds));
                    }
                });
                zkListener = listeners.get(listener);
            }
            zkClient.create(path, false);
            List<String> children = zkClient.addChildListener(path, zkListener);
            if (children != null) {
                urls.addAll(toUrlsWithEmpty(url, path, children));
            }
        }
        notify(url, listener, urls);
    

    toCategoriesPath方法是从url中获取category属性的值,转换成zk上路径的形式,由于provider只有一个configurators,所以path如下
    /dubbo/服务名/configurators,接下来就是为该节点添加监听回调,然后返回子节点
    如果子节点为空,那么toUrlsWithEmpty方法会返回empty://....格式的url,即protocol为empty,这个协议后面会用到
    接下来notify一路会调用到AbstractRegistry的notify方法

        protected void notify(URL url, NotifyListener listener, List<URL> urls) {
           ....
            Map<String, List<URL>> result = new HashMap<String, List<URL>>();
            for (URL u : urls) {
                if (UrlUtils.isMatch(url, u)) {
                    String category = u.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
                    List<URL> categoryList = result.get(category);
                    if (categoryList == null) {
                        categoryList = new ArrayList<URL>();
                        result.put(category, categoryList);
                    }
                    categoryList.add(u);
                }
            }
            if (result.size() == 0) {
                return;
            }
            Map<String, List<URL>> categoryNotified = notified.get(url);
            if (categoryNotified == null) {
                notified.putIfAbsent(url, new ConcurrentHashMap<String, List<URL>>());
                categoryNotified = notified.get(url);
            }
            for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
                String category = entry.getKey();
                List<URL> categoryList = entry.getValue();
                categoryNotified.put(category, categoryList);
                saveProperties(url);
                listener.notify(categoryList);
            }
        }
    

    主要看下最下面的循环,即遍历每个类目下的url,执行一遍saveProperties方法(把服务信息保存到文件),调用监听器的notify方法,listener最初初始化为OverrideListener

           public void notify(List<URL> urls) {
                List<URL> result = null;
                for (URL url : urls) {
                    URL overrideUrl = url;
                    if (url.getParameter(Constants.CATEGORY_KEY) == null
                            && Constants.OVERRIDE_PROTOCOL.equals(url.getProtocol())) {
                        // 兼容旧版本
                        overrideUrl = url.addParameter(Constants.CATEGORY_KEY, Constants.CONFIGURATORS_CATEGORY);
                    }
                    if (! UrlUtils.isMatch(subscribeUrl, overrideUrl)) {
                        if (result == null) {
                            result = new ArrayList<URL>(urls);
                        }
                        result.remove(url);
                    }
                }
                if (result != null) {
                    urls = result;
                }
                this.configurators = RegistryDirectory.toConfigurators(urls);
                List<ExporterChangeableWrapper<?>> exporters = new ArrayList<ExporterChangeableWrapper<?>>(bounds.values());
                for (ExporterChangeableWrapper<?> exporter : exporters){
                    Invoker<?> invoker = exporter.getOriginInvoker();
                    final Invoker<?> originInvoker ;
                    if (invoker instanceof InvokerDelegete){
                        originInvoker = ((InvokerDelegete<?>)invoker).getInvoker();
                    }else {
                        originInvoker = invoker;
                    }
                    
                    URL originUrl = RegistryProtocol.this.getProviderUrl(originInvoker);
                    URL newUrl = getNewInvokerUrl(originUrl, urls);
                    
                    if (! originUrl.equals(newUrl)){
                        RegistryProtocol.this.doChangeLocalExport(originInvoker, newUrl);
                    }
                }
            }
    

    主要看下最下面的循环,会比对invoker的url是新的url是否是一样,如果不一样,那么更新invoker的url

    相关文章

      网友评论

        本文标题:Dubbo源码分析----暴露服务

        本文链接:https://www.haomeiwen.com/subject/wgwitftx.html