美文网首页码农的世界程序员
Dubbo 服务发布流程解析

Dubbo 服务发布流程解析

作者: 匠丶 | 来源:发表于2018-09-30 17:29 被阅读1776次

    本文将详细分析Dubbo的服务发布流程,建议结合文章Dubbo SPI 机制解析一起阅读。

    在开始分析之前,有必须熟悉一下Dubbo源码的目录结构,以及各模块的功能。


    模块说明:
    dubbo-common 公共逻辑模块:包括 Util 类和通用模型。
    dubbo-remoting 远程通讯模块:相当于 Dubbo 协议的实现,如果 RPC 用 RMI协议则不需要使用此包。
    dubbo-rpc 远程调用模块:抽象各种协议,以及动态代理,只包含一对一的调用,不关心集群的管理。
    dubbo-cluster 集群模块:将多个服务提供方伪装为一个提供方,包括:负载均衡, 容错,路由等,集群的地址列表可以是静态配置的,也可以是由注册中心下发。
    dubbo-registry 注册中心模块:基于注册中心下发地址的集群方式,以及对各种注册中心的抽象。
    dubbo-monitor 监控模块:统计服务调用次数,调用时间的,调用链跟踪的服务。
    dubbo-config 配置模块:是 Dubbo 对外的 API,用户通过 Config 使用Dubbo,隐藏 Dubbo 所有细节。
    dubbo-container 容器模块:是一个 Standlone 的容器,以简单的 Main 加载 Spring 启动,因为服务通常不需要 Tomcat/JBoss 等 Web 容器的特性,没必要用 Web 容器去加载服务。

    Spring 对外留出的扩展

    dubbo是基于spring 配置来实现服务的发布的,那么一定是基于spring的扩展来写了一套自己的标签。在dubbo配置文件中看到的<dubbo:service> ,就是属于自定义扩展标签。

    要实现自定义扩展,有三个步骤(在spring中定义了两个接口,用来实现扩展)
    1.NamespaceHandler: 注册一堆BeanDefinitionParser,利用他们来进行解析
    2.BeanDefinitionParser:用于解析每个element的内容
    3.Spring默认会加载jar包下的META-INF/spring.handlers文件寻找对应的NamespaceHandler。

    以下是Dubbo-config模块下dubbo-config-spring的配置:



    也就是说会通过DubboNamespaceHandler去解析dubbo自定义的标签。DubboBeanDefinitionParser用于把不同的配置分别转化成spring容器中的bean对象。

    public class DubboNamespaceHandler extends NamespaceHandlerSupport {
    
        static {
            Version.checkDuplicate(DubboNamespaceHandler.class);
        }
    
        public void init() {
            registerBeanDefinitionParser("application", new DubboBeanDefinitionParser(ApplicationConfig.class, true));
            registerBeanDefinitionParser("module", new DubboBeanDefinitionParser(ModuleConfig.class, true));
            registerBeanDefinitionParser("registry", new DubboBeanDefinitionParser(RegistryConfig.class, true));
            registerBeanDefinitionParser("monitor", new DubboBeanDefinitionParser(MonitorConfig.class, true));
            registerBeanDefinitionParser("provider", new DubboBeanDefinitionParser(ProviderConfig.class, true));
            registerBeanDefinitionParser("consumer", new DubboBeanDefinitionParser(ConsumerConfig.class, true));
            registerBeanDefinitionParser("protocol", new DubboBeanDefinitionParser(ProtocolConfig.class, true));
            registerBeanDefinitionParser("service", new DubboBeanDefinitionParser(ServiceBean.class, true));
            registerBeanDefinitionParser("reference", new DubboBeanDefinitionParser(ReferenceBean.class, false));
            registerBeanDefinitionParser("annotation", new DubboBeanDefinitionParser(AnnotationBean.class, true));
        }
    
    }
    

    为了在spring启动的时候,也相应的启动了发布服务和注册服务的过程,而同时为了让客户端在启动的时候自动订阅发现服务,加入了两个bean ServiceBean、ReferenceBean。分别继承了ServiceConfig和ReferenceConfig。并分别实现了InitializingBean、DisposableBean, ApplicationContextAware, ApplicationListener, BeanNameAware。

    InitializingBean为bean提供了初始化方法的方式,它只包括afterPropertiesSet方法,凡是继承该接口的类,在初始化bean的时候会执行该方法。
    DisposableBean bean被销毁的时候,spring容器会自动执行destory方法,比如释放资源。
    ApplicationContextAware 实现了这个接口的bean,当spring容器初始化的时候,会自动的将ApplicationContext注入进来。
    ApplicationListener ApplicationEvent事件监听,spring容器启动后会发一个事件通知。
    BeanNameAware 获得自身初始化时,本身的bean的id属性。

    由此可以看出,Dubbo 的服务发布流程的实现思路是:
    1.利用spring的解析收集xml中的配置信息,然后把这些配置信息存储到serviceConfig中。
    2.调用ServiceConfig的export方法来进行服务的发布和注册。

    ServiceConfig的export

    delay的使用

    export()中有一个delay参数,用于判断服务是否需要延迟加载。而延迟的方式也很直截了当,Thread.sleep(delay)。

     public synchronized void export() {
            if (provider != null) {
                if (export == null) {
                    export = provider.getExport();
                }
                if (delay == null) {
                    delay = provider.getDelay();
                }
            }
     if (delay != null && delay > 0) {
                Thread thread = new Thread(new Runnable() {
                    public void run() {
                        try {
                            Thread.sleep(delay);
                        } catch (Throwable e) {
                        }
                        doExport();
                    }
                });
                thread.setDaemon(true);
                thread.setName("DelayExportServiceThread");
                thread.start();
            } else {
                doExport();
            }
    

    doExportUrls

    export方法先判断是否需要延迟暴露,然后执行doExport方法。doExport方法先执行一系列的检查方法,然后调用doExportUrls方法。检查方法会检测dubbo的配置是否在Spring配置文件中声明,没有的话读取properties文件初始化。

        @SuppressWarnings({ "unchecked", "rawtypes" })
        private void doExportUrls() {
            //是不是获得注册中心的配置
            List<URL> registryURLs = loadRegistries(true);
            //是不是支持多协议发布
            for (ProtocolConfig protocolConfig : protocols) { 
                doExportUrlsFor1Protocol(protocolConfig, registryURLs);
            }
        }
    

    doExportUrls方法先调用loadRegistries获取所有的注册中心url,然后遍历调用doExportUrlsFor1Protocol方法。对于在标签中指定了registry属性的Bean,会在加载BeanDefinition的时候就加载了注册中心。
    获取注册中心url,会把注册的信息都放在一个URL对象中,URL内容如下:

    registry://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=dubbo-provider&application.version=1.0&dubbo=2.5.3&environment=product&organization=&owner=&pid=2939&registry=zookeeper&timestamp=1488898049284
    

    如果scope没有配置或者配置local、remote,dubbo会将服务export到本地,URL内容如下:

    dubbo://192.168.1.100:20880/dubbo.common.hello.service.HelloService?anyhost=true&application=dubbo-provider&application.version=1.0&delay=5000&dubbo=2.5.3&environment=product&interface=dubbo.common.hello.service.HelloService&methods=sayHello&organization=&owner=&pid=2939&side=provider&timestamp=1488898464953
    

    发布服务

         //通过proxyFactory来获取Invoker对象
         Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, 
         registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
         //注册服务
         Exporter<?> exporter = protocol.export(invoker);
    

    这个地方可以做一个小结:
    1.Invoker – 执行具体的远程调用
    2.Protocol – 服务地址的发布和订阅
    3.Exporter – 暴露服务或取消暴露

    Protocol 的变化过程

    第一步,在ServiceConfig中有一个静态变量protocol,通过Dubbo SPI 机制解析一文可知,此时protocol是Protocol$Adaptive。

     private static final Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension(); 
    

    第二步,调用Protocol$Adaptive中的export方法

      public Exporter export(Invoker arg0) throws Invoker {
        if (arg0 == null) throw new IllegalArgumentException("Invoker argument == null");
    
        if (arg0.getUrl() == null) throw new IllegalArgumentException("Invoker argument getUrl() == null");URL url = arg0.getUrl();
    
        String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );
    
        if(extName == null) throw new IllegalStateException("Fail to get extension(Protocol) name from url(" + url.toString() + ") use keys([protocol])");
    
        Protocol extension = (Protocol)ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(extName);
    
        return extension.export(arg0);
      }
    

    上面这段代码做两个事情
    1.从url中获得protocol的协议地址,如果protocol为空,表示已dubbo协议发布服务,否则根据配置的协议类型来发布服务。
    2.调用ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(extName)。
    getExtension()这个方法的主要作用是用来获取ExtensionLoader实例代表的扩展的指定实现,首先从cachedInstances缓存中查找是否已经创建了实例,如果没有调用createExtension()

     private T createExtension(String name) {
            Class<?> clazz = getExtensionClasses().get(name);
            if (clazz == null) {
                throw findException(name);
            }
            try {
                T instance = (T) EXTENSION_INSTANCES.get(clazz);
                if (instance == null) {
                    EXTENSION_INSTANCES.putIfAbsent(clazz, (T) clazz.newInstance());
                    instance = (T) EXTENSION_INSTANCES.get(clazz);
                }
                //对获取的的和实例进行依赖注入
                injectExtension(instance);
                //cachedWrapperClasses是在loadFile中进行赋值的
                Set<Class<?>> wrapperClasses = cachedWrapperClasses;
                // 对实例进行包装,分别调用带Protocol参数的构造函数创建实例,然后进行依赖注入。
                if (wrapperClasses != null && wrapperClasses.size() > 0) {
                    for (Class<?> wrapperClass : wrapperClasses) {
                        instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
                    }
                }
                return instance;
    

    由此可以看出,当我们传入的值是registry时,得到的是RegistryProtocol;
    当我们传入的值是dubbo时,会通过Wrapper对Protocol进行装饰,装饰器为ProtocolFilterWrapper和ProtocolListenerWrapper。ProtocolFilterWrapper(ProtocolListenerWrapper(DubboProtocol))。

    ProtocolFilterWrapper

    这个类非常重要,dubbo机制里面日志记录、超时等等功能都是在这一部分实现的。这个类有3个特点,
    1.它有一个参数为Protocol protocol的构造函数;
    2.它实现了Protocol接口;
    3.它使用责任链模式,对export和refer函数进行了封装。
    ProtocolFilterWrapper会根据条件获取当前扩展可自动激活的实现,主要包括以下实现类:

    echo=com.alibaba.dubbo.rpc.filter.EchoFilter
    generic=com.alibaba.dubbo.rpc.filter.GenericFilter
    genericimpl=com.alibaba.dubbo.rpc.filter.GenericImplFilter
    token=com.alibaba.dubbo.rpc.filter.TokenFilter
    accesslog=com.alibaba.dubbo.rpc.filter.AccessLogFilter
    activelimit=com.alibaba.dubbo.rpc.filter.ActiveLimitFilter
    classloader=com.alibaba.dubbo.rpc.filter.ClassLoaderFilter
    context=com.alibaba.dubbo.rpc.filter.ContextFilter
    consumercontext=com.alibaba.dubbo.rpc.filter.ConsumerContextFilter
    exception=com.alibaba.dubbo.rpc.filter.ExceptionFilter
    executelimit=com.alibaba.dubbo.rpc.filter.ExecuteLimitFilter
    deprecated=com.alibaba.dubbo.rpc.filter.DeprecatedFilter
    compatible=com.alibaba.dubbo.rpc.filter.CompatibleFilter
    timeout=com.alibaba.dubbo.rpc.filter.TimeoutFilter
    

    这其中涉及到很多功能,包括权限验证、异常、超时、计算调用时间等都在这些类实现。

    DubboProtocol.export
     public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
            URL url = invoker.getUrl();
            // export service.
            String key = serviceKey(url);
            DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
            exporterMap.put(key, exporter);
            openServer(url);
            return exporter;
    }
    

    openServer开启服务:

       private void openServer(URL url) {
            // find server.
            String key = url.getAddress();
            //client 也可以暴露一个只有server可以调用的服务。
            boolean isServer = url.getParameter(Constants.IS_SERVER_KEY,true);
            if (isServer) {
                ExchangeServer server = serverMap.get(key);
                if (server == null) {
                    serverMap.put(key, createServer(url));
                } else {
                    //server支持reset,配合override功能使用
                    server.reset(url);
                }
            }
        }
    

    createServer创建服务,开启心跳检测,默认使用netty

     private ExchangeServer createServer(URL url) {
            //默认开启server关闭时发送readonly事件
            url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());
            //默认开启heartbeat
            url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
            String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);
    
            if (str != null && str.length() > 0 && ! ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))
                throw new RpcException("Unsupported server type: " + str + ", url: " + url);
    
            url = url.addParameter(Constants.CODEC_KEY, Version.isCompatibleVersion() ? COMPATIBLE_CODEC_NAME : DubboCodec.NAME);
            ExchangeServer server;
            try {
                server = Exchangers.bind(url, requestHandler);
            } catch (RemotingException e) {
                throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
            }
            str = url.getParameter(Constants.CLIENT_KEY);
            if (str != null && str.length() > 0) {
                Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
                if (!supportedTypes.contains(str)) {
                    throw new RpcException("Unsupported client type: " + str);
                }
            }
            return server;
        }
    

    调用Exchangers.bind()方法,Exchanger是扩展点,此时会自适应加载默认扩展点HeaderExchanger。

      public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
            return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
        }
    

    调用Transporters.bind()方法,Transporter也是扩展点,此时会自适应加载默认扩展点NettyTransporter。通过NettyTranport创建基于Netty的server服务。

     public Server bind(URL url, ChannelHandler listener) throws RemotingException {
            return new NettyServer(url, listener);
        }
    
    RegistryProtocol.export
     public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
            //export invoker
            final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
            //registry provider
            final Registry registry = getRegistry(originInvoker);
            //得到需要注册到zk上的协议地址,也就是dubbo://
            final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);
            registry.register(registedProviderUrl);
    

    getRegistry()方法是invoker的地址获取registry实例

      private Registry getRegistry(final Invoker<?> originInvoker){
            URL registryUrl = originInvoker.getUrl(); //registry://
            if (Constants.REGISTRY_PROTOCOL.equals(registryUrl.getProtocol())) {
                //得到zookeeper的协议地址
                String protocol = registryUrl.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_DIRECTORY);
                registryUrl = registryUrl.setProtocol(protocol).removeParameter(Constants.REGISTRY_KEY);
            }///registryUrl就会变成了zookeeper://
            return registryFactory.getRegistry(registryUrl);
        }
    

    通过前面这段代码的分析,其实就是把registry的协议头改成服务提供者配置的协议地址,也就是我们配置的
    <dubbo:registry address=”zookeeper://ip:port”/>,然后registryFactory.getRegistry的目的,就是通过协议地址匹配到对应的注册中心。

    RegistryFactory是扩展点,此时会自适应加载默认扩展点ZookeeperRegistryFactory,ZookeeperRegistryFactory中并没有getRegistry方法,而是在父类AbstractRegistryFactory。AbstractRegistryFactory首先从缓存REGISTRIES中,根据key获得对应的Registry。如果没有则调用子类,创建一个注册中心。

        public Registry createRegistry(URL url) {
            return new ZookeeperRegistry(url, zookeeperTransporter);
        }
    

    代码分析到这里,我们对于getRegistry得出了一个结论,根据当前注册中心的配置信息,获得一个匹配的注册中心,也就是ZookeeperRegistry。接下来会调用registry.register(registedProviderUrl),将dubbo://的协议地址注册到zookeeper上。
    因为ZookeeperRegistry这个类中并没有register这个方法,但是他的父类FailbackRegistry中存在register方法,而这个类又重写了AbstractRegistry类中的register方法。
    FailbackRegistry.register
    FailbackRegistry,从名字上来看,是一个失败重试机制。FailbackRegistry.register会先调用父类的register方法,将当前url添加到缓存集合中。然后调用调用doRegister方法,是一个抽象方法,会由ZookeeperRegistry子类实现。
    ZookeeperRegistry.doRegister创建节点

        protected void doRegister(URL url) {
            try {
                zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
            } catch (Throwable e) {
                throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
            }
        }
    

    Invoker

    前面花了很大的篇幅讲解protocol.export()方法,下面将讲解Invoker的实现过程。

        /**
         * 暴露远程服务:
         * @param <T> 服务的类型
         * @param invoker 服务的执行体
         * @return exporter 暴露服务的引用,用于取消暴露
         * @throws RpcException 当暴露服务出错时抛出,比如端口已占用
         */
        @Adaptive
        <T> Exporter<T> export(Invoker<T> invoker) throws RpcException;
    

    服务实现类转换成Invoker,大概的步骤是:

    根据proxyFactory方法调用具体的ProxyFactory实现类的getInvoker方法获取Invoker。
    getInvoker的过程是,首先对实现类做一个包装,生成一个包装后的类。
    然后新创建一个Invoker实例,这个Invoker中包含着生成的Wrapper类,Wrapper类中有具体的实现类。

    第一步,在ServiceConfig中有一个静态变量proxyFactory,是一个自适应扩展点。

    private static final ProxyFactory proxyFactory = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();
    
    使用JavassistProxyFactory获取Invoker
    public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
        // TODO Wrapper类不能正确处理带$的类名
        //第一步封装一个Wrapper类
        //该类是手动生成的
        //如果类是以$开头,就使用接口类型获取,其他的使用实现类获取
        final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
        //返回一个Invoker实例,doInvoke方法中直接返回上面wrapper的invokeMethod
        //关于生成的wrapper,请看下面列出的生成的代码,其中invokeMethod方法中就有实现类对实际方法的调用
        return new AbstractProxyInvoker<T>(proxy, type, url) {
            @Override
            protected Object doInvoke(T proxy, String methodName, 
                                      Class<?>[] parameterTypes, 
                                      Object[] arguments) throws Throwable {
                return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
            }
        };
    }
    
    使用JdkProxyFactory获取invoker

    JdkProxyFactory的getInvoker方法,直接返回一个AbstractProxyInvoker实例,没有做处理,只是使用反射调用具体的方法。

    public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
        return new AbstractProxyInvoker<T>(proxy, type, url) {
            @Override
            protected Object doInvoke(T proxy, String methodName, 
                                      Class<?>[] parameterTypes, 
                                      Object[] arguments) throws Throwable {
                Method method = proxy.getClass().getMethod(methodName, parameterTypes);
                return method.invoke(proxy, arguments);
            }
        };
    }
    

    总结

    加载dubbo配置文件,解析生成ServiceConfig,进行服务发布。重点关注ServiceConfig里的两个成员变量protocol和proxyFactory。
    1、首先将服务的实现封装成一个Invoker,Invoker中封装了服务的实现类。
    2、将Invoker封装成Exporter,并缓存起来,缓存里使用Invoker的url作为key。
    3、服务端Server启动,监听端口。(请求来到时,根据请求信息生成key,到缓存查找Exporter,就找到了Invoker,就可以完成调用。)

    相关文章

      网友评论

        本文标题:Dubbo 服务发布流程解析

        本文链接:https://www.haomeiwen.com/subject/znswoftx.html