dubbo剖析:一 服务发布

作者: 益文的圈 | 来源:发表于2018-03-25 12:42 被阅读928次

    注:文章中使用的dubbo源码版本为2.5.4

    零、服务发布的目的

    服务提供者向注册中心注册服务,将服务实现类以服务接口的形式提供出去,以便服务消费者从注册中心查阅并调用服务。

    • 服务发布入口
    • 几个关键概念
    • 服务发布流程详解
    • 整体流程图总结

    一、服务发布入口

    1.1 Spring配置及ServiceBean映射

    服务发布方在工程中会有如下Spring配置


    服务发布的spring配置

    其中demoService为Spirng中配置服务的具体实现,即Spring中的一个Bean

    <bean id="demoService"class="com.alibaba.dubbo.demo.provider.DemoServiceImpl" />
    

    而对于下方配置,spring容器在启动的过程中会解析自定义的schema元素dubbo:service将其转换为实际的配置实现ServiceBean ,并把服务暴露出去

    <dubbo:service interface="com.alibaba.dubbo.demo.DemoService" ref=”demoService”/>
    

    1.2 ServiceBean

    ServiceBean

    类结构与onApplicationEvent回调:
    ServiceBean除了继承dubbo自己的配置类ServiceConfig以外,还实现了一系列的spring接口用来参与到spring容器的启动以及bean创建过程中。其中包括ApplicationListener。

        public void onApplicationEvent(ApplicationEvent event) {
            if (ContextRefreshedEvent.class.getName().equals(event.getClass().getName())) {
                if (isDelay() && !isExported() && !isUnexported()) {
                    if (logger.isInfoEnabled()) {
                        logger.info("The service ready on spring started. service: " + getInterface());
                    }
                    export();
                }
            }
        }
    

    Spring容器ApplicationContext的启动最后一步会触发ContextRefreshedEvent事件, 而ServiceBean实现了ApplicationListener接口监听此事件,触发onApplicationEvent(ApplicationEvent event)方法,在这个方法中触发export方法来暴露服务。

    包含的属性:
    ServiceBean的父类ServiceConfig中包含了很多配置属性,这些属性通过Spring配置注入赋值。

    private transient String beanName;  //bean名称,对应xml中的id
    private String interfaceName;  //接口名称,对应xml中的interface
    private Class<?> interfaceClass;  //通过Class.forName(interfaceName)生成
    private T ref;  //  接口实现类引用,对应xml中的ref
    protected List<ProtocolConfig> protocols;  //协议列表
    protected List<RegistryConfig> registries;  //注册中心列表
    private final List<Exporter<?>> exporters;  //已发布服务列表
    private final List<URL> urls;  //已发布服务地址列表
    private static final Protocol protocol;
    private static final ProxyFactory proxyFactory;
    

    二、几个关键概念:

    2.1 Invoker

    public interface Invoker<T> extends Node {
    
        /**
         * get service interface.
         *
         * @return service interface.
         */
        Class<T> getInterface();
    
        /**
         * invoke.
         *
         * @param invocation
         * @return result
         * @throws RpcException
         */
        Result invoke(Invocation invocation) throws RpcException;
    
    }
    

    可执行对象的抽象,能够根据方法的名称、参数得到相应的执行结果。
    Invoker可分为三类:

    • AbstractProxyInvoker:本地执行类的Invoker,实际通过Java反射的方式执行原始对象的方法
    • AbstractInvoker:远程通信类的Invoker,实际通过通信协议发起远程调用请求并接收响应
    • AbstractClusterInvoker:多个远程通信类的Invoker聚合成的集群版Invoker,加入了集群容错和负载均衡策略
      Invoker继承图

    Invocation:包含了需要执行的方法和参数等重要信息,他有两个实现类RpcInvocation和MockInvocation。

    2.2 ProxyFactory

    public interface ProxyFactory {
    
        /**
         * create proxy.
         *
         * @param invoker
         * @return proxy
         */
        @Adaptive({Constants.PROXY_KEY})
        <T> T getProxy(Invoker<T> invoker) throws RpcException;
    
        /**
         * create invoker.
         *
         * @param <T>
         * @param proxy
         * @param type
         * @param url
         * @return invoker
         */
        @Adaptive({Constants.PROXY_KEY})
        <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) throws RpcException;
    
    }
    

    服务接口代理抽象,用于生成一个接口的代理类。
    getInvoker方法:针对Server端,将服务对象(如DemoServiceImpl)包装成一个Invoker对象。
    getProxy方法:针对Client端,创建接口(如DemoService)的代理对象。

    2.3 Exporter

    public interface Exporter<T> {
    
        /**
         * get invoker.
         *
         * @return invoker
         */
        Invoker<T> getInvoker();
    
        /**
         * unexport.
         * <p>
         * <code>
         * getInvoker().destroy();
         * </code>
         */
        void unexport();
    
    }
    

    维护Invoker的生命周期,内部包含Invoker或者ExporterMap。

    2.4 Protocol

    @SPI("dubbo")
    public interface Protocol {
    
        /**
         * 获取缺省端口,当用户没有配置端口时使用。
         * @return 缺省端口
         */
        int getDefaultPort();
    
        /**
         * 暴露远程服务:
         * @param <T>     服务的类型
         * @param invoker 服务的执行体
         * @return exporter 暴露服务的引用,用于取消暴露
         * @throws RpcException 当暴露服务出错时抛出,比如端口已占用
         */
        @Adaptive
        <T> Exporter<T> export(Invoker<T> invoker) throws RpcException;
    
        /**
         * 引用远程服务:
         * @param <T>  服务的类型
         * @param type 服务的类型
         * @param url  远程服务的URL地址
         * @return invoker 服务的本地代理
         * @throws RpcException 当连接服务提供方失败时抛出
         */
        @Adaptive
        <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException;
    
        /**
         * 释放协议
         */
        void destroy();
    
    }
    

    协议抽象接口。封装RPC调用。
    exporter方法:暴露远程服务(用于服务端),就是将Invoker对象通过协议暴露给外部。
    refer方法:引用远程服务(用于客户端),通过Clazz、url等信息创建远程的动态代理Invoker。

    2.5 关系图

    服务发布相关接口关系图

    1)ServiceConfig包含ProxyFactoryProtocol,通过SPI的方式注入生成;
    2)ProxyFactory负责创建Invoker
    3)Protocol负责通过Invoker生成Exporter,将服务启动并暴露;

    三、服务发布流程详解:

    3.1 简洁流程图

    服务发布简洁流程图

    3.2 发布入口

    ServiceBean监听入口:

        public void onApplicationEvent(ApplicationEvent event) {
            if (ContextRefreshedEvent.class.getName().equals(event.getClass().getName())) {
                if (isDelay() && !isExported() && !isUnexported()) {
                    if (logger.isInfoEnabled()) {
                        logger.info("The service ready on spring started. service: " + getInterface());
                    }
                    export();
                }
            }
        }
    

    ServiceBean的export()方法内部最终会执行到ServiceConfig的doExportUrls()方法-->

    ServiceConfig执行发布:
    1)加载所有注册中心URL
    2)遍历所有Protocol,进行发布

        private void doExportUrls() {
            List<URL> registryURLs = loadRegistries(true);
            for (ProtocolConfig protocolConfig : protocols) {
                doExportUrlsFor1Protocol(protocolConfig, registryURLs);
            }
        }
    

    doExportUrls()方法内部最终执行下面两个重要步骤,即 “本地发布”“远程发布” -->

    3.3 本地发布:

    即将服务发布成本地可调用的服务。

                //配置不是remote的情况下做本地暴露 (配置为remote,则表示只暴露远程服务)
                if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) {
                    exportLocal(url);
                }
    
        private void exportLocal(URL url) {
            if (!Constants.LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
                URL local = URL.valueOf(url.toFullString())
                        .setProtocol(Constants.LOCAL_PROTOCOL)
                        .setHost(NetUtils.LOCALHOST)
                        .setPort(0);
                Exporter<?> exporter = protocol.export(
                        proxyFactory.getInvoker(ref, (Class) interfaceClass, local));
                exporters.add(exporter);
                logger.info("Export dubbo service " + interfaceClass.getName() + " to local registry");
            }
        }
    

    重点:Exporter<?> exporter = protocol.export(
    proxyFactory.getInvoker(ref, (Class) interfaceClass, local));
    代码中的proxyFactory为通过ExtensionLoader动态生成的JavassistProxyFactory
    代码中的protocol为通过ExtensionLoader动态生成的InjvmProtocol

    ...ExtensionLoader相关原理会在后续文章专门讲解...

    JavassistProxyFactory创建Invoker:
    通过JavassistProxyFactory创建(new)了一个AbstractProxyInvoker的实现,其内部通过Java反射的方式执行原始对象proxy的方法。

        public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
            // TODO Wrapper类不能正确处理带$的类名
            final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
            return new AbstractProxyInvoker<T>(proxy, type, url) {
                @Override
                protected Object doInvoke(T proxy, String methodName,
                                          Class<?>[] parameterTypes,
                                          Object[] arguments) throws Throwable {
                    return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
                }
            };
        }
    

    InjvmProtocol.export:
    new了一个InjvmExporter。就是单纯的将url、Exporter放入exporterMap中。

        public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
            return new InjvmExporter<T>(invoker, invoker.getUrl().getServiceKey(), exporterMap);
        }
    

    3.4 远程发布:

    遍历所有注册中心URL,进行远程发布:

                        for (URL registryURL : registryURLs) {
                            url = url.addParameterIfAbsent("dynamic", registryURL.getParameter("dynamic"));
                            URL monitorUrl = loadMonitor(registryURL);
                            if (monitorUrl != null) {
                                url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
                            }
                            if (logger.isInfoEnabled()) {
                                logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
                            }
                            Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
    
                            Exporter<?> exporter = protocol.export(invoker);
                            exporters.add(exporter);
                        }
    

    代码中的proxyFactory为通过ExtensionLoader动态生成的JavassistProxyFactory
    代码中的protocol为通过ExtensionLoader动态生成的RegistryProtocol

    JavassistProxyFactory创建Invoker:
    同本地发布,不赘述。

                            Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
    

    RegistryProtocol.export:
    通过RegistryProtocol将Invoker发布成Dubbo服务。
    1)doLocalExport 所做的事情,就是调用DubboProtocol生成DubboExporter,并发布Dubbo服务;
    2)后续代码所做的事情,就是创建注册中心,将发布的服务注册到注册中心(zk),并监听注册中心(zk)的变动;

        public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
            //export invoker
            final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
            //registry provider
            final Registry registry = getRegistry(originInvoker);
            final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);
            registry.register(registedProviderUrl);
            // 订阅override数据
            // FIXME 提供者订阅时,会影响同一JVM即暴露服务,又引用同一服务的的场景,因为subscribed以服务名为缓存的key,导致订阅信息覆盖。
            final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl);
            final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl);
            overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
            registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
            //保证每次export都返回一个新的exporter实例
            return new Exporter<T>() {
                public Invoker<T> getInvoker() {
                    return exporter.getInvoker();
                }
    
                public void unexport() {
                    try {
                        exporter.unexport();
                    } catch (Throwable t) {
                        logger.warn(t.getMessage(), t);
                    }
                    try {
                        registry.unregister(registedProviderUrl);
                    } catch (Throwable t) {
                        logger.warn(t.getMessage(), t);
                    }
                    try {
                        overrideListeners.remove(overrideSubscribeUrl);
                        registry.unsubscribe(overrideSubscribeUrl, overrideSubscribeListener);
                    } catch (Throwable t) {
                        logger.warn(t.getMessage(), t);
                    }
                }
            };
        }
    

    doLocalExport:
    bounds为providerurl <--> exporter的映射,如果exporter未被创建,则调用DubboProtocol创建exporter。

        private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker) {
            String key = getCacheKey(originInvoker);
            ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
            if (exporter == null) {
                synchronized (bounds) {
                    exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
                    if (exporter == null) {
                        final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));
                        exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker);
                        bounds.put(key, exporter);
                    }
                }
            }
            return (ExporterChangeableWrapper<T>) exporter;
        }
    

    DubboProtocol.export:

    DubboProtocol.export

    总体做的事情就是:

    • 根据传入的Invoker创建一个DubboExporter并返回;
    • 使用HeaderExchanger创建交换层服务端HeaderExchangeServer。其底层依赖NettyTransporter创建网络层服务端NettyServer
    • NettyServer会完成启动网络服务并监听服务端口的工作;

    ...对于服务端Server的实现分析请参考文章dubbo剖析:三 网络通信之 -- Server实现 ...

    向注册中心注册/监听:

            //registry provider
            final Registry registry = getRegistry(originInvoker);
            final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);
            registry.register(registedProviderUrl);
            // 订阅override数据
            // FIXME 提供者订阅时,会影响同一JVM即暴露服务,又引用同一服务的的场景,因为subscribed以服务名为缓存的key,导致订阅信息覆盖。
            final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl);
            final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl);
            overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
            registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
    

    ...注册中心相关内容不展开...

    四、整体流程图总结

    整体流程图总结
    总结起来,Dubbo的服务发布过程:
    1)通过Spring配置初始化ServiceBean并注入属性;
    2)通过监听Spring事件触发服务发布过程;
    3)ServiceConfig中的ProxyFactoryProtocol由Dubbo的spi动态生成;
    4)对所有的协议,所有的注册中心进行遍历,通过JavassistProxyFactory生成可执行对象Invoker;
    5)通过RegistryProtocol将Invoker对象转换为Exporter,同时完成服务的启动监听和注册;
    6)最终由ServiceConfig维护所有发布的Exporter与服务URL到本地内存;

    相关文章

      网友评论

      • 2abc50207617:兄弟,你这个服务暴露,是我看的最详细的一篇了!之前网上看了一大堆,感觉80%的人都是胡扯呢!
        益文的圈:@KmmShmily 感谢认可
      • 青芒v5:精辟,去肉留骨,很到位
      • 晴天哥_王志:感谢楼主分享。我也准备写个dubbo的源码系列,到时候参考楼主的。
      • 1d656d70b8e2:怎么打包放到服务器上?
      • tukangzheng:写的很不错
      • IT人故事会:写得太好了。老铁,动动手指收藏了
        益文的圈:@IT人故事会 感谢关注

      本文标题:dubbo剖析:一 服务发布

      本文链接:https://www.haomeiwen.com/subject/zthxcftx.html