美文网首页
Dubbo服务暴露

Dubbo服务暴露

作者: 爱健身的兔子 | 来源:发表于2021-02-08 19:15 被阅读0次

    1 Dubbo服务暴露介绍

    Dubbo服务暴露的整体流程如下:

    2 Dubbo服务暴露源码

    2.1 延时暴露

    ServiceConfig#export

        public synchronized void export() {
            //检查和更新附属的配置文件
            checkAndUpdateSubConfigs();
            // shouldExport 默认值为 true    如果export==null?true:export
            //如果只想本地启动服务,不把服务暴露出去给别人使用,可以配置<dubbo:provider export="false" />禁止服务导出
            if (!shouldExport()) {
                return;
            }
            //如果应该延迟导出,就去延迟导出 ,这里不研究
            if (shouldDelay()) {
                DELAY_EXPORT_EXECUTOR.schedule(this::doExport, getDelay(), TimeUnit.MILLISECONDS);
            } else {
                // 立即导出服务
                doExport();
            }
    }
    

    如果设置了 delay 参数,Dubbo 的处理方式是启动一个守护线程在 sleep 指定时间后再 doExport。

    2.2 暴露检查

    ServiceConfig#doExport

     protected synchronized void doExport() {
            // 这个服务不需要暴露  
            if (unexported) {
                throw new IllegalStateException("The service " + interfaceClass.getName() + " has already unexported!");
            }
            //这个服务已经暴露了
            if (exported) {
                return;
            }
            exported = true;
            //如果path为空,给它赋值
            if (StringUtils.isEmpty(path)) {
                path = interfaceName;
            }
            //多协议多注册中心暴露服务
            doExportUrls();
        }
    

    判断服务是否已经暴露或者不需要暴露。

    2.3 多协议多注册中心暴露服务

    ServiceConfig#doExportUrls

    //Dubbo允许我们使用不同的协议暴露服务,也允许我们向多个    注册中心注册服务
        //多协议多注册中心暴露服务
        private void doExportUrls() {
            //通过loadRegistries加载注册中心链接
            List<URL> registryURLs = loadRegistries(true);
            //遍历协议、在每个协议下暴露服务
            for (ProtocolConfig protocolConfig : protocols) {
                // 通过(路径+guoup+version)构建一个string类型的 路径key  从协议配置类中,获取上下文路径.
                String pathKey = URL.buildKey(getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), group, version);
                //提供者模型
                ProviderModel providerModel = new ProviderModel(pathKey, ref, interfaceClass);
                //ApplicationModel持有所有的提供者模型 .init初始化提供者模型 
                ApplicationModel.initProviderModel(pathKey, providerModel);
                //组装URL传入协议配置类 注册集合
                doExportUrlsFor1Protocol(protocolConfig, registryURLs);
            }
        }
    

    loadRegistries方法的主要是构造注册配置类的URL。

    2.4 组装URL

    ServiceConfig#doExportUrlsFor1Protocol

        private void   doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
    //省略URL 的构造 (协议名、IP、端口、上下文路径、map ),map包含了url的参数(如generic,token)。
    //------------------------ 暴露Dubbo服务-----------------------------
            if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
                    .hasExtension(url.getProtocol())) {
                // 加载 ConfiguratorFactory,并生成 Configurator 实例,然后通过实例配置 url
                url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
                        .getExtension(url.getProtocol()).getConfigurator(url).configure(url);
            }
            //  作用范围三个取值(none:什么都不做、scope != remote,暴露到本地scope != local,暴露到远程)
            String scope = url.getParameter(SCOPE_KEY);
            //如果scope !=none 则进入方法
            if (!SCOPE_NONE.equalsIgnoreCase(scope)) {
                // scope != remote 暴露到本地
                if (!SCOPE_REMOTE.equalsIgnoreCase(scope)) {
                    //导出到本地
                    exportLocal(url);
                }
                //  scope != local  暴露到远程
                if (!SCOPE_LOCAL.equalsIgnoreCase(scope)) {
    
                    if (CollectionUtils.isNotEmpty(registryURLs)) {
                        for (URL registryURL : registryURLs) {
                            //if protocol is only injvm ,not register
                            // 如果协议只是 injvm , 而不是redister
                            if (LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
                                continue;
                            }
                            url = url.addParameterIfAbsent(DYNAMIC_KEY, registryURL.getParameter(DYNAMIC_KEY));
                            // 加载监视器链接
                            URL monitorUrl = loadMonitor(registryURL);
                            // 如果 monitor不等于null     将监视器链接作为参数添加到URL中
                            if (monitorUrl != null) {
                                url = url.addParameterAndEncoded(MONITOR_KEY, monitorUrl.toFullString());
                            }
                            // 对于提供程序,这用于启用自定义代理来生成调用程序
                            String proxy = url.getParameter(PROXY_KEY);
    
                            if (StringUtils.isNotEmpty(proxy)) {
                                registryURL = registryURL.addParameter(PROXY_KEY, proxy);
                            }
                            // 为服务提供类(ref),生成Imvoker
                            Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));
                            // DelegateProviderMetaDataInvoker 用于持有 Invoker 和 ServiceConfig
                            DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
                            //暴露 服务  并生成exporter
                            Exporter<?> exporter = protocol.export(wrapperInvoker);
                            exporters.add(exporter);
                        }
                        //不存在注册中心 ,仅仅暴露服务
                    } else {
                        Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, url);
                        DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
    
                        Exporter<?> exporter = protocol.export(wrapperInvoker);
                        exporters.add(exporter);
                    }
                    //发布元数据
                    WritableMetadataService metadataService = WritableMetadataService.getExtension(url.getParameter(METADATA_KEY, DEFAULT_METADATA_STORAGE_TYPE));
                    if (metadataService != null) {
                        metadataService.publishServiceDefinition(url);
                    }
                }
            }
        }
    
    

    注意

    如果配置 scope=none, 则不会进行服务暴露;如果没有配置 scope 或者 scope=local,则会进行本地暴露。本地暴露时使用 injvm 协议,injvm 协议是一个伪协议,它不开启端口,不能被远程调用,只在 JVM 内直接关联,但执行 Dubbo 的 Filter 链。

    上面远程暴露服务会根据是有有注册中心分两种情况:

    1. 直接暴露服务

    2. 通过RegistryProtocol暴露远程服务后,暴露服务到注册中心。

    2.5 创建Invoker

    在服务暴露之前需要创建InvokerInvoker是实体域,它是Dubbo的核心模型,其他模型都向它靠拢,或者转换成它。它代表一个可执行体,可向它发起invoke调用,这个调用有可能是一个本地的实现,也有可能是一个远程的实现,也有可能是一个集群的实现。 InvokerProxyFactory创建的,Dubbo的默认代理工厂实现类是JavassistProxyFactory

    JavassistProxyFactory#getInvoker

    public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
            // 为目标类创建 Wrapper(方法就是,从缓存中获取Wrepper ,如果缓存中没有,创建,写入缓存)
            final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
            // 创建匿名 Invoker 类对象,并实现 doInvoke 方法。
            return new AbstractProxyInvoker<T>(proxy, type, url) {
                @Override
                protected Object doInvoke(T proxy, String methodName,
                                          Class<?>[] parameterTypes,
                                          Object[] arguments) throws Throwable {
                    // 调用 Wrapper 的 invokeMethod 方法,invokeMethod 最终会调用目标方法
                    return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
                }
            };
        }
    
    2.6 本地暴露

    ServiceConfig#exportLocal

     private void exportLocal(URL url) {
            URL local = URLBuilder.from(url)
                    .setProtocol(LOCAL_PROTOCOL)   //设置协议头为 injvm
                    .setHost(LOCALHOST_VALUE)      //设置Host为localhost
                    .setPort(0)
                    .build();
            // 创建 Invoker,并暴露服务,这里的 protocol 会在运行时调用 InjvmProtocol 的 export 方法
            Exporter<?> exporter = protocol.export(
                    PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, local));
            exporters.add(exporter);
            logger.info("Export dubbo service " + interfaceClass.getName() + " to local registry url : " + local);
        }
    

    本地暴露是通过InjvmProtocol暴露。

    2.7 远程暴露

    远程暴露的协议通过ExtensionLoader加载,同时会加载ProtocolFilterWrapperProtocolListenerWrapper作为Protocol的包装类。

    • ProtocolFilterWrapper:包装Context,token,echo等的FilterInvoker

    • ProtocolListenerWrapper:包装ExporterListener';

    2.8 注册服务

    如果使用了注册中心,则在通过具体协议(如 Dubbo 协议)暴露服务之后进入服务注册流程,将服务节点注册到注册中心。

    RegistryProtocol#export

    public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
        //export
        final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
        //registry provider
        final Registry registry = getRegistry(originInvoker);
        final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);
        //向Zookeeper注册节点
        registry.register(registedProviderUrl);
        // 订阅override数据
        // 提供者订阅时,会影响同一JVM即暴露服务,又引用同一服务的的场景,因为subscribed以服务名为缓存的key,导致订阅信息覆盖。
        final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl);
        final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl);
        overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
        registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
        //保证每次export都返回一个新的exporter实例
        return new Exporter<T>() {
            public Invoker<T> getInvoker() {
                return exporter.getInvoker();
            }
            public void unexport() {
                try {
                    exporter.unexport();
                } catch (Throwable t) {
                    logger.warn(t.getMessage(), t);
                }
                try {
                    registry.unregister(registedProviderUrl);
                } catch (Throwable t) {
                    logger.warn(t.getMessage(), t);
                }
                try {
                    overrideListeners.remove(overrideSubscribeUrl);
                    registry.unsubscribe(overrideSubscribeUrl, overrideSubscribeListener);
                } catch (Throwable t) {
                    logger.warn(t.getMessage(), t);
                }
            }
        };
    }
    
    private Registry getRegistry(final Invoker<?> originInvoker){
        URL registryUrl = originInvoker.getUrl();
        if (Constants.REGISTRY_PROTOCOL.equals(registryUrl.getProtocol())) {
            String protocol = registryUrl.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_DIRECTORY);
            registryUrl = registryUrl.setProtocol(protocol).removeParameter(Constants.REGISTRY_KEY);
        }
        return registryFactory.getRegistry(registryUrl);
    }
    

    3 DubboProtocol

    Dubbo协议的暴露的服务是通过Netty远程通信实现服务调用。

    DubboProtocol#export

    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        URL url = invoker.getUrl();
    
        // export service.
        String key = serviceKey(url);
        DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
        exporterMap.put(key, exporter);
    
        //export an stub service for dispaching event
        Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY,Constants.DEFAULT_STUB_EVENT);
        Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
        if (isStubSupportEvent && !isCallbackservice){
            String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
            if (stubServiceMethods == null || stubServiceMethods.length() == 0 ){
                if (logger.isWarnEnabled()){
                    logger.warn(new IllegalStateException("consumer [" +url.getParameter(Constants.INTERFACE_KEY) +
                            "], has set stubproxy support event ,but no stub methods founded."));
                }
            } else {
                stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
            }
        }
    
        openServer(url);
    
        return exporter;
    }
    

    DubboProtocol#openServer

    private void openServer(URL url) {
        // find server.
        String key = url.getAddress();
        //client 也可以暴露一个只有server可以调用的服务。
        boolean isServer = url.getParameter(Constants.IS_SERVER_KEY,true);
        if (isServer) {
            ExchangeServer server = serverMap.get(key);
            if (server == null) {
                serverMap.put(key, createServer(url));
            } else {
                //server支持reset,配合override功能使用
                server.reset(url);
            }
        }
    }
    
    private ExchangeServer createServer(URL url) {
        //默认开启server关闭时发送readonly事件
        url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());
        //默认开启heartbeat
        url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
        String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);
    
        if (str != null && str.length() > 0 && ! ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))
            throw new RpcException("Unsupported server type: " + str + ", url: " + url);
    
        url = url.addParameter(Constants.CODEC_KEY, Version.isCompatibleVersion() ? COMPATIBLE_CODEC_NAME : DubboCodec.NAME);
        ExchangeServer server;
        try {
            server = Exchangers.bind(url, requestHandler);
        } catch (RemotingException e) {
            throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
        }
        str = url.getParameter(Constants.CLIENT_KEY);
        if (str != null && str.length() > 0) {
            Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
            if (!supportedTypes.contains(str)) {
                throw new RpcException("Unsupported client type: " + str);
            }
        }
        return server;
    }
    

    Exchanger (默认 HeaderExchanger)封装请求响应模式,同步转异步,以 Request、Response 为中心。

    HeaderExchager#bind

    public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
    }
    

    Transporters#bind

    public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        if (handlers == null || handlers.length == 0) {
            throw new IllegalArgumentException("handlers == null");
        }
        ChannelHandler handler;
        if (handlers.length == 1) {
            handler = handlers[0];
        } else {
            handler = new ChannelHandlerDispatcher(handlers);
        }
        return getTransporter().bind(url, handler);
    }
    

    底层传输默认使用 NettyTransporter,最终是创建 NettyServer。

    NettyTransporter#bind

    public Server bind(URL url, ChannelHandler listener) throws RemotingException {
        return new NettyServer(url, listener);
    }
    

    至此Netty连接创建。

    相关文章

      网友评论

          本文标题:Dubbo服务暴露

          本文链接:https://www.haomeiwen.com/subject/bhucxltx.html