美文网首页
Dubbo源码剖析之服务发布

Dubbo源码剖析之服务发布

作者: jerrik | 来源:发表于2018-07-15 22:32 被阅读0次

说明

由于代码篇幅的问题,我会对一些非核心代码进行省略,只保留真正核心代码.对一些有省略的部分,我会留下...字样,如果读者有兴趣可以自行去查看。

准备工作

  • IDEA
  • 从github下载dubbox源码
    笔者采用的是当当的dubbox分支,直接用源码调试会方便很多,Dubbo中adaptive机制重新编译了动态类,也方便实时输出,对一些核心方法也可以增加自定义输出日志,方便理解。
  • 本地启动zookeeper
    因为我们还是采用的zk作为Dubbo的注册中心,所以需要启动一个zk.
    下载zk,配置默认端口、数据、日志文件目录,然后点击zkServer.cmd即可。
  • 编写Dubbo服务发布代码
        // 服务实现
        IUserService userService = new UserServiceImpl();

        // 当前应用配置
        ApplicationConfig application = new ApplicationConfig();
        application.setName("public-user");

        // 连接注册中心配置
        RegistryConfig registry = new RegistryConfig();
        registry.setAddress("zookeeper://localhost:2181");

        // 服务提供者协议配置
        ProtocolConfig protocol = new ProtocolConfig();
        protocol.setName("dubbo");
        protocol.setPort(20880);
        protocol.setThreads(200);
        ServiceConfig<IUserService> service = new ServiceConfig<IUserService>();
        service.setApplication(application);
        service.setRegistry(registry); // 多个注册中心可以用setRegistries()
        service.setProtocol(protocol); // 多个协议可以用setProtocols()
        service.setInterface(IUserService.class);
        service.setRef(userService);
        service.setVersion("1.0.0");

        // 暴露及注册服务
        service.export();

        System.out.println("--------------------------服务启动--------------------------");

启动服务

右键Debug运行即可启动服务。闲话少说,我们直接进入ServiceConfig.export().

    public synchronized void export() {
        if (provider != null) {
            if (export == null) {
                export = provider.getExport();
            }
            if (delay == null) {
                delay = provider.getDelay();
            }
        }
        if (export != null && ! export.booleanValue()) {
            return;
        }
        if (delay != null && delay > 0) {
            Thread thread = new Thread(new Runnable() {
                public void run() {
                    try {
                        Thread.sleep(delay);
                    } catch (Throwable e) {
                    }
                    doExport();
                }
            });
            thread.setDaemon(true);
            thread.setName("DelayExportServiceThread");
            thread.start();
        } else {
            doExport();
        }
    }

重点关注delay,我们知道jvm里有一个预热的概念,服务启动时,jvm可能还没有达到最佳性能,所以Dubbo为我们提供了一个参数来设置延迟发布,我们可以通过 ServiceConfig.setDelay(int delay)去指定,当delay!=null & delay>0的时候,Dubbo这里单独起一个守护线程去做的延迟发布。继续跟进doExport()方法:

    protected synchronized void doExport() {
        ...
        checkApplication();
        checkRegistry();
        checkProtocol();
        appendProperties(this);
        checkStubAndMock(interfaceClass);
        if (path == null || path.length() == 0) {
            path = interfaceName;
        }
        doExportUrls();
    }

前面省略了很多方法.其大体做了如下几件事情:

  1. 判断服务是否已经发布
  2. 对即将发布的服务进行校验(接口是否存在)
  3. 对Dubbo的核心配置对象ProviderConfig、ModuleConfig、ApplicationConfig、MonitorConfig进行非空判断。如果对象不为空,则获取其依赖的其他配置项.
       if (application != null) {
            if (registries == null) {
                registries = application.getRegistries();
            }
            if (monitor == null) {
                monitor = application.getMonitor();
            }
        }

4.判断服务接口是否实现了GenericService泛化接口

那几个以 check开头的方法又是做什么的呢?看一下checkRegistry

       // 兼容旧版本
        if (registries == null || registries.size() == 0) {
            String address = ConfigUtils.getProperty("dubbo.registry.address");
            if (address != null && address.length() > 0) {
                registries = new ArrayList<RegistryConfig>();
                String[] as = address.split("\\s*[|]+\\s*");
                for (String a : as) {
                    RegistryConfig registryConfig = new RegistryConfig();
                    registryConfig.setAddress(a);
                    registries.add(registryConfig);
                }
            }
        }
        if ((registries == null || registries.size() == 0)) {
            throw new IllegalStateException((getClass().getSimpleName().startsWith("Reference") 
                    ? "No such any registry to refer service in consumer " 
                        : "No such any registry to export service in provider ")
                                                    + NetUtils.getLocalHost()
                                                    + " use dubbo version "
                                                    + Version.getVersion()
                                                    + ", Please add <dubbo:registry address=\"...\" /> to your spring config. If you want unregister, please set <dubbo:service registry=\"N/A\" />");
        }
        for (RegistryConfig registryConfig : registries) {
            appendProperties(registryConfig);
        }

原来是做非空判断,如果数据为空,则尝试着从配置文件中去获取,如果配置文件中依然不存在,则直接抛出异常。那下面的appendProperties()又是在做什么呢?

    protected static void appendProperties(AbstractConfig config) {
        String prefix = "dubbo." + getTagName(config.getClass()) + ".";
        Method[] methods = config.getClass().getMethods();
        for (Method method : methods) {
            try {
                String name = method.getName();
                //如果Config中的方法以set开头
                if (name.length() > 3 && name.startsWith("set") && Modifier.isPublic(method.getModifiers()) 
                        && method.getParameterTypes().length == 1 && isPrimitive(method.getParameterTypes()[0])) {
                    String property = StringUtils.camelToSplitName(name.substring(3, 4).toLowerCase() + name.substring(4), "-");

                    String value = null;
                    if (config.getId() != null && config.getId().length() > 0) {
                        //从系统环境变量里获取值
                        value = System.getProperty(pn);
                    }
                    ...
                    if (value == null || value.length() == 0) {
                        //通过反射拿getXX()方法,如果找不到,就获取isXX()方法
                        //如果能获取到get或者is方法,则尝试着去从AbstractConfig中获取值,如果获取不到,则从配置中获取
                        if (getter != null) {
                            if (getter.invoke(config, new Object[0]) == null) {
                                if (config.getId() != null && config.getId().length() > 0) {
                                    value = ConfigUtils.getProperty(prefix + config.getId() + "." + property);
                                }
                                 
                                //如果还是获取不到value,则修改指定的key,由原来的prefix+configId+property变成prefix+property的方式
                            }
                        }
                    }
                    //如果能够获取到value,则直接将value通过setXXX()方法设置到AbstractConfig中
                    if (value != null && value.length() > 0) {
                        method.invoke(config, new Object[] {convertPrimitive(method.getParameterTypes()[0], value)});
                    }
                }
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
            }
        }
    }

可以总结成一句话:如果传入的AbstractConfig具有set开头的方法,则通过反射查看是否具有getis方法.如果存在该方法,则通过反射获取到返回值,如果返回值为空,则尝试着从配置文件中获取,如果能获取到最新值,则通过 set方法将属性进行更新。
接下来重点分析doExportUrls()方法:

    private void doExportUrls() {
        List<URL> registryURLs = loadRegistries(true);
        logger.info("doExportUrls 【loadRegistries】 " + registryURLs);
        logger.info("doExportUrls 【protocols】 " + protocols);
        for (ProtocolConfig protocolConfig : protocols) {
            doExportUrlsFor1Protocol(protocolConfig, registryURLs);
        }
    }

首先加载所以registryURL,由于现在是服务发布,true代表provider.
loadRegistries()做了如下几件事情:

  1. 轮询设置的注册中心列表(服务启动时已经设置)
    获取地址信息,如果配置文件中存在该配置,则将其替换。
  2. 将path设置成RegisterService,还设置version、pid、protocol等相关属性
  3. 然后将address转换成List<URL>
  4. 然后转换协议成RegistryProtocol,准备开始服务的注册

然后循环protocols,针对所有的协议都进行发布(这里只有dubbo协议),然后执行doExportUrlsFor1Protocol方法。
首先会获取主机名和端口,如果主机名无效,则尝试通过registerURLs中去获取,然后通过socket.connect()去连接该端口来检验主机名和端口是否有效,如果连接异常,则记录一个警告。

      if (NetUtils.isInvalidLocalHost(host)) {
                if (registryURLs != null && registryURLs.size() > 0) {
                    for (URL registryURL : registryURLs) {
                        try {
                            Socket socket = new Socket();
                            try {
                                SocketAddress addr = new InetSocketAddress(registryURL.getHost(), registryURL.getPort());
                                socket.connect(addr, 1000);
                                host = socket.getLocalAddress().getHostAddress();
                                break;
                            } finally {
                                try {
                                    socket.close();
                                } catch (Throwable e) {}
                            }
                        } catch (Exception e) {
                            logger.warn(e.getMessage(), e);
                        }
                    }
       }

如果没有设置端口,那就设置成发布协议的默认端口,如果port还是为空或者小于0,则随机生成一个端口,然后通过new ServerSocket()检验该端口是否可用。

        Integer port = protocolConfig.getPort();
        if (provider != null && (port == null || port == 0)) {
            port = provider.getPort();
        }
        final int defaultPort = ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(name).getDefaultPort();
        if (port == null || port == 0) {
            port = defaultPort;
        }
        if (port == null || port <= 0) {
            port = getRandomPort(name);
            if (port == null || port < 0) {
                port = NetUtils.getAvailablePort(defaultPort);
                putRandomPort(name, port);
            }
            logger.warn("Use random available port(" + port + ") for protocol " + name);
        }

然后又设置协议相关的参数:pid、version、side、token、methods等,并调用appendParameters方法来拼装参数。最后又重新创建了一个 URL对象:

URL url = new URL(name, host, port, (contextPath == null || contextPath.length() == 0 ? "" : contextPath + "/") + path, map)

该url是实现服务暴露的最终数据,从url中取出scope,如果scope不为none且不为remote,则进行本地暴露,如果scope不为local,则进行远程暴露

        String scope = url.getParameter(Constants.SCOPE_KEY);
        //配置为none不暴露
        if (! Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) {

            //配置不是remote的情况下做本地暴露 (配置为remote,则表示只暴露远程服务)
            if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) {
                logger.info("exportLocal begin()");
                exportLocal(url);
            }
            //如果配置不是local则暴露为远程服务.(配置为local,则表示只暴露远程服务)
            if (! Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope) ){
                if (logger.isInfoEnabled()) {
                    logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
                }
                if (registryURLs != null && registryURLs.size() > 0
                        && url.getParameter("register", true)) {
                    logger.info("handle registry protocol");
                    for (URL registryURL : registryURLs) {
                        url = url.addParameterIfAbsent("dynamic", registryURL.getParameter("dynamic"));
                        URL monitorUrl = loadMonitor(registryURL);
                        if (monitorUrl != null) {
                            url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
                        }
                        if (logger.isInfoEnabled()) {
                            logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
                        }
                        Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));

                        Exporter<?> exporter = protocol.export(invoker);
                        logger.info("export registry protocol success,register zk.");
                        exporters.add(exporter);
                    }
                } else {
                    Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);

                    Exporter<?> exporter = protocol.export(invoker);
                    exporters.add(exporter);
                }
            }
        }
        this.urls.add(url);
    }

这段代码也是是服务发布的核心。我们一段一段来剖析,先看 exportLocal

    private void exportLocal(URL url) {
        if (!Constants.LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
            URL local = URL.valueOf(url.toFullString())
                    .setProtocol(Constants.LOCAL_PROTOCOL)
                    .setHost(NetUtils.LOCALHOST)
                    .setPort(0);

            // 只是将ref的class存入ThreadLocal
           ServiceClassHolder.getInstance().pushServiceClass(getServiceClass(ref));

            Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, local);

            Exporter<?> exporter = protocol.export(invoker);
            exporters.add(exporter);
            logger.info("Export dubbo service " + interfaceClass.getName() +" to local registry");
        }
    }

当protocol不为injvm时,进入方法.这里的protocol是registry,显然可以进入if判断,然后将协议转换成injvm,准备开始本地暴露。重点来看Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, local);这里的proxyFactory是SPI的一个adaptive实现,我们先看一下ProxyFactory接口:

    @SPI("javassist")
    public interface ProxyFactory {

        @Adaptive({Constants.PROXY_KEY})
        <T> T getProxy(Invoker<T> invoker) throws RpcException;

        @Adaptive({Constants.PROXY_KEY})
        <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) throws RpcException;
    }

可知ProxyFactory的默认实现是javassist,而且它的方法上都加了Adaptive注解,value为proxy.根据SPI的规则,如果方法上加了Adaptive注解,且getInvoker参数中包含URL的话,则直接通过url.getParameter("proxy", "javassist")来获取extName,我们查看动态编译的代码发现事实也是如此:

    public com.alibaba.dubbo.rpc.Invoker getInvoker(Object ref, Class     interfaceClass, com.alibaba.dubbo.common.URL arg2) throws     com.alibaba.dubbo.rpc.RpcException {
        if (arg2 == null) throw new IllegalArgumentException("url == null");
        com.alibaba.dubbo.common.URL url = arg2;
        String extName = url.getParameter("proxy", "javassist");
        if (extName == null)
            throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.ProxyFactory) name from url(" + url.toString() + ") use keys([proxy])");
        com.alibaba.dubbo.rpc.ProxyFactory extension = (com.alibaba.dubbo.rpc.ProxyFactory) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.ProxyFactory.class).getExtension(extName);
        return extension.getInvoker(ref, interfaceClass, arg2);
    }

熟悉SPI的童鞋都知道,当调用getExtension方法的时候会实际上先获取指定extName的所有包装类,然后通过反射实现一层一层的注入:

     private T createExtension(String name) {
            Class<?> clazz = getExtensionClasses().get(name);
             ...
            //如果存在有参构造函数(参数为type),然后循环创建包装类的实例,然后将实例赋值给instance,
            //最终的instance是new ProtocolListenerWrapper(new ProtocolFilterWrapper(new DubboProtocol()))
            Set<Class<?>> wrapperClasses = cachedWrapperClasses;
            if (wrapperClasses != null && wrapperClasses.size() > 0) {
                for (Class<?> wrapperClass : wrapperClasses) {
                    instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
                }
            }
            return instance;
        } catch (Throwable t) {
            throw new IllegalStateException("Extension instance(name: " + name + ", class: " +
                    type + ")  could not be instantiated: " + t.getMessage(), t);
        }
    }

由于ProxyFactory存在包装类StubProxyFactoryWrapper,所以getExtension('javassist')会先调用到StubProxyFactoryWrapper中。

    public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) throws RpcException {
        LOGGER.info("StubProxyFactoryWrapper getInvoker begin()");
        return proxyFactory.getInvoker(proxy, type, url);
    }

此时StubProxyFactoryWrapper中的 proxyFactory才是真正的JavassistProxyFactory,跟进去:

    public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
        LOGGER.info("JavassistProxyFactory getInvoker begin()");
        // TODO Wrapper类不能正确处理带$的类名
        final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
        return new AbstractProxyInvoker<T>(proxy, type, url) {
            @Override
            protected Object doInvoke(T proxy, String methodName, 
                                      Class<?>[] parameterTypes, 
                                      Object[] arguments) throws Throwable {
                //反射
                return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
            }
        };
    }

即所有方法的调用都会回调到doInvoke()方法,然后通过反射调到目标方法,原来本地暴露是通过反射来实现的。然后回到本地暴露的protocol.export(invoker)方法。这里的protocol也是一个适配类,它的包装类是ProtocolListenerWrapperProtocolFilterWrapper,invoker的协议是injvm,所以这个方法的调用链是ProtocolListenerWrapper->ProtocolFilterWrapper->InjvmProtocol。先看一下ProtocolListenerWrapper:

    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        LOGGER.info("ProtocolListenerWrapper protocol export begin()");
        if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
            return protocol.export(invoker);
        }
        return new ListenerExporterWrapper<T>(protocol.export(invoker), 
                Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(ExporterListener.class)
                        .getActivateExtension(invoker.getUrl(), Constants.EXPORTER_LISTENER_KEY)));
    }

此时的协议是injvm,则继续调用protocol.export(invoker)进行本地暴露,然后返回一个ListenerExporterWrapper,此时持有的protocol引用是ProtocolFilterWrapper,跟进去:

    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        LOGGER.info("ProtocolFilterWrapper protocol export begin()");
        if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
            return protocol.export(invoker);
        }
       //这里的protocol才是最终的'InjvmProtocol'
        return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER));
    }

那这里的buildInvokerChain是做什么的呢?

    private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
        LOGGER.info("ProtocolFilterWrapper buildInvokerChain begin()");
        Invoker<T> last = invoker;
        List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
        if (filters.size() > 0) {
            for (int i = filters.size() - 1; i >= 0; i --) {
                final Filter filter = filters.get(i);
                final Invoker<T> next = last;
                last = new Invoker<T>() {

                    public Class<T> getInterface() {
                        return invoker.getInterface();
                    }

                    public URL getUrl() {
                        return invoker.getUrl();
                    }

                    public boolean isAvailable() {
                        return invoker.isAvailable();
                    }

                    public Result invoke(Invocation invocation) throws RpcException {
                        return filter.invoke(next, invocation);
                    }

                    public void destroy() {
                        invoker.destroy();
                    }

                    @Override
                    public String toString() {
                        return invoker.toString();
                    }
                };
            }
        }
        return last;
    }

原来是获取group为provider和 key为service.filter的所有激活的Filter列表。具体机制是先保存invoker到last变量,然后赋值给next变量,当调用invoke的时候,实际上是先调用filter,然后将上次保存的next变量传入,形成了一个链。即所有的Filter都会在之前执行完最后才执行真正的invoker.
然后调用到InjvmProtocol通过new InjvmExporter<T>(invoker, invoker.getUrl().getServiceKey(), exporterMap)包装成一个Exporter返回:

    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        logger.info("injvm protocol export begin()");
        return new InjvmExporter<T>(invoker, invoker.getUrl().getServiceKey(), exporterMap);
    }

整个本地暴露的流程完毕。本地暴露也是针对provider和consumer在同一台机器上的情况,可以直接通过反射来访问,而不需要网络传输的开销。


下面开始分析远程暴露。

    for (URL registryURL : registryURLs) {
           url = url.addParameterIfAbsent("dynamic", registryURL.getParameter("dynamic"));
           URL monitorUrl = loadMonitor(registryURL);
            if (monitorUrl != null) {
                url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
            }
                       
           Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));

           Exporter<?> exporter = protocol.export(invoker);
           logger.info("export registry protocol success,register zk.");
           exporters.add(exporter);
    }

这里的registryURLs是通过zookeeper://进行转换过来的,具体协议转换的代码在上面分析的loadRegistries()方法中,将其转换成了registry协议。远程暴露和本地暴露的代码类似,只是协议不同,这里将不再赘述.最终的调用链为:ProtocolListenerWrapper->ProtocolFilterWrapper->RegistryProtocol,所以这里直接查看RegistryProtocol

     public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
        logger.info("registryProtocol export begin()");
        //export invoker
        final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
        //registry provider
        final Registry registry = getRegistry(originInvoker);
        final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);
        registry.register(registedProviderUrl);
        // 订阅override数据
        // FIXME 提供者订阅时,会影响同一JVM即暴露服务,又引用同一服务的的场景,因为subscribed以服务名为缓存的key,导致订阅信息覆盖。
        final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl);
        final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl);
        overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
        registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
        //保证每次export都返回一个新的exporter实例
        return new Exporter<T>() {
            public Invoker<T> getInvoker() {
                return exporter.getInvoker();
            }
            public void unexport() {
                try {
                    exporter.unexport();
                } catch (Throwable t) {
                    logger.warn(t.getMessage(), t);
                }
                try {
                    registry.unregister(registedProviderUrl);
                } catch (Throwable t) {
                    logger.warn(t.getMessage(), t);
                }
                try {
                    overrideListeners.remove(overrideSubscribeUrl);
                    registry.unsubscribe(overrideSubscribeUrl, overrideSubscribeListener);
                } catch (Throwable t) {
                    logger.warn(t.getMessage(), t);
                }
            }
        };
    }

这个方法实现了向ZK的注册和Netty服务的启动。先来看doLocalExport方法:

     private <T> ExporterChangeableWrapper<T>  doLocalExport(final Invoker<T> originInvoker){
        String key = getCacheKey(originInvoker);
        ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
        if (exporter == null) {
            synchronized (bounds) {
                exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
                if (exporter == null) {
                    final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));
                    exporter = new ExporterChangeableWrapper<T>((Exporter<T>)protocol.export(invokerDelegete), originInvoker);
                    bounds.put(key, exporter);
                }
            }
        }
        return (ExporterChangeableWrapper<T>) exporter;
    }

先根据originInvoker生成cacheKey,然后双重加锁初始化InvokerDelegete,这里用到了getProviderUrl(originInvoker)方法,里面用到了origininvoker.getUrl().getParameterAndDecoded(Constants.EXPORT_KEY),那这个Constants.EXPORT_KEY又是哪里设置进去的呢?回到远程暴露的入口:

Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()))

原来是在远程暴露之前就已经设置该值了。所以这里的getProviderUrl返回的是Dubbo协议的url.所以protocol.export最终的调用链变成了ProtocolListenerWrapper->ProtocolFilterWrapper->DubboProtocol,一切都清楚了。然后将结果包装成ExporterChangeableWrapper返回。
下面理所当然的跟进去DubboProtocol

     public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        logger.info("dubbo export begin()");
        URL url = invoker.getUrl();
        
        // export service.
        String key = serviceKey(url);
        DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
        exporterMap.put(key, exporter);
        
        //export an stub service for dispaching event
        Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY,Constants.DEFAULT_STUB_EVENT);
        Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
        if (isStubSupportEvent && !isCallbackservice){
            String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
            if (stubServiceMethods == null || stubServiceMethods.length() == 0 ){
                if (logger.isWarnEnabled()){
                    logger.warn(new IllegalStateException("consumer [" +url.getParameter(Constants.INTERFACE_KEY) +
                            "], has set stubproxy support event ,but no stub methods founded."));
                }
            } else {
                stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
            }
        }

        openServer(url);

        // modified by lishen
        optimizeSerialization(url);

        return exporter;
    }

openServer格外显眼,即Netty服务的启动入口位置,真相大白!

总结

今天分析了Dubbo服务发布的大体流程,在以后的章节中将会更加详细的介绍Dubbo网络通信、ZK服务注册等内容!由于文章篇幅的问题,今天就先到这里!感谢阅读!

相关文章

网友评论

      本文标题:Dubbo源码剖析之服务发布

      本文链接:https://www.haomeiwen.com/subject/dwippftx.html