美文网首页
Dubbo源码分析(二) 服务暴露

Dubbo源码分析(二) 服务暴露

作者: skyguard | 来源:发表于2018-11-12 10:31 被阅读0次

下面我们来说一下Dubbo的服务暴露。当我们启动一个服务后,都需要将服务暴露出去,那么这个服务暴露的过程是怎么完成的呢?现在我们就来看一下Dubbo是怎么完成服务暴露的。
先来介绍一个接口Exporter,这个接口有一个方法getInvoker,获取服务调用者。AbstractExporter实现了Exporter接口。InjvmExporter是本地调用的暴露实现类,DubboExporter是远程调用实现类。我们先来看一下InjvmExporter

InjvmInvoker(Class<T> type, URL url, String key, Map<String, Exporter<?>> exporterMap) {
    super(type, url);
    this.key = key;
    this.exporterMap = exporterMap;
}

在InjvmProtocol类中初始化InjvmInvoker,这个类是本地调用协议的实现类。Protocol接口是协议接口,定义了所有通信的协议。dubbo默认支持的协议有dubbo协议,webservice协议,http协议,thrift协议等。下面我们再来说一下dubbo都有哪几层。
配置层(Config):该层是将业务方的service信息,配置文件的信息收集起来,主要是以ServiceConfig和ReferenceConfig为中心,ServiceConfig是服务提供方的配置,当Spring启动的时候会相应的启动provider服务发布和注册的过程,主要是加入一个ServiceBean继承ServiceConfig在Spring注册。同理ReferenceConfig是consumer方的配置,当消费方启动时,会启动consumer的发现服务订阅服务的过程,当然也是使用一个ReferenceBean继承ReferenceConfig注册在spring上。
服务代理层(Proxy):对服务接口进行透明代理,生成服务的客户端和服务器端,使服务的远程调用就像在本地调用一样。默认使用JavassistProxyFactory,返回一个Invoker,Invoker则是个可执行核心实体,Invoker的invoke方法通过反射执行service方法。
服务注册层(Registry):封装服务地址的注册和发现,以服务URL为中心,基于zk。
集群层(Cluster):提供多个节点并桥接注册中心,主要负责loadBanlance、容错。
监控层(Monitor):RPC调用次数和调用时间监控,以Statistics为中心,扩展接口为MonitorFactory、Monitor和MonitorService。
远程调用层(Protocol):封装RPC调用,provider通过export方法进行暴露服务/consumer通过refer方法调用服务。而Protocol依赖的是Invoker。通过上面说的Proxy获得的Invoker,包装成Exporter。
信息交换层(Exchange):该层封装了请求响应模型,将同步转为异步,信息交换层依赖Exporter,最终将通过网络传输层接收调用请求RequestFuture和ResponseFuture。
网络传输层(Transport):抽象mina和netty为统一接口,以Message为中心,扩展接口为Channel、Transporter、Client、Server和Codec。
数据序列化层:该层无需多言,将数据序列化反序列化。
通过上面的框架了解我们大致知道了dubbo是怎么工作的,接下来我们来通过代码来具体看看dubbo的服务发布过程,进一步理解dubbo的工作原理。
我们先来看一下ServiceConfig这个类。这个类是服务的配置类。我们来看一下具体的实现

private void doExportUrls() {
    // 加载注册中心 URL 数组
    List<URL> registryURLs = loadRegistries(true);
    // 循环 `protocols` ,向逐个注册中心分组暴露服务。
    for (ProtocolConfig protocolConfig : protocols) {
        doExportUrlsFor1Protocol(protocolConfig, registryURLs);
    }
}

到doExportUrlsFor1Protocol方法

private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
    // 协议名
    String name = protocolConfig.getName();
    if (name == null || name.length() == 0) {
        name = "dubbo";
    }

    // 将 `side`,`dubbo`,`timestamp`,`pid` 参数,添加到 `map` 集合中。
    Map<String, String> map = new HashMap<String, String>();
    map.put(Constants.SIDE_KEY, Constants.PROVIDER_SIDE);
    map.put(Constants.DUBBO_VERSION_KEY, Version.getVersion());
    map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));
    if (ConfigUtils.getPid() > 0) {
        map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));
    }
    // 将各种配置对象,添加到 `map` 集合中。
    appendParameters(map, application);
    appendParameters(map, module);
    appendParameters(map, provider, Constants.DEFAULT_KEY); // ProviderConfig ,为 ServiceConfig 的默认属性,因此添加 `default` 属性前缀。
    appendParameters(map, protocolConfig);
    appendParameters(map, this);
    // 将 MethodConfig 对象数组,添加到 `map` 集合中。
    if (methods != null && !methods.isEmpty()) {
        for (MethodConfig method : methods) {
            // 将 MethodConfig 对象,添加到 `map` 集合中。
            appendParameters(map, method, method.getName());
            // 当 配置了 `MethodConfig.retry = false` 时,强制禁用重试
            String retryKey = method.getName() + ".retry";
            if (map.containsKey(retryKey)) {
                String retryValue = map.remove(retryKey);
                if ("false".equals(retryValue)) {
                    map.put(method.getName() + ".retries", "0");
                }
            }
            // 将 ArgumentConfig 对象数组,添加到 `map` 集合中。
            List<ArgumentConfig> arguments = method.getArguments();
            if (arguments != null && !arguments.isEmpty()) {
                for (ArgumentConfig argument : arguments) {
                    // convert argument type
                    if (argument.getType() != null && argument.getType().length() > 0) { // 指定了类型
                        Method[] methods = interfaceClass.getMethods();
                        // visit all methods
                        if (methods != null && methods.length > 0) {
                            for (int i = 0; i < methods.length; i++) {
                                String methodName = methods[i].getName();
                                // target the method, and get its signature
                                if (methodName.equals(method.getName())) { // 找到指定方法
                                    Class<?>[] argTypes = methods[i].getParameterTypes();
                                    // one callback in the method
                                    if (argument.getIndex() != -1) { // 指定单个参数的位置 + 类型
                                        if (argTypes[argument.getIndex()].getName().equals(argument.getType())) {
                                            // 将 ArgumentConfig 对象,添加到 `map` 集合中。
                                            appendParameters(map, argument, method.getName() + "." + argument.getIndex()); // `${methodName}.${index}`
                                        } else {
                                            throw new IllegalArgumentException("argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType());
                                        }
                                    } else {
                                        // multiple callbacks in the method
                                        for (int j = 0; j < argTypes.length; j++) {
                                            Class<?> argClazz = argTypes[j];
                                            if (argClazz.getName().equals(argument.getType())) {
                                                // 将 ArgumentConfig 对象,添加到 `map` 集合中。
                                                appendParameters(map, argument, method.getName() + "." + j); // `${methodName}.${index}`
                                                if (argument.getIndex() != -1 && argument.getIndex() != j) { // 多余的判断,因为 `argument.getIndex() == -1` 。
                                                    throw new IllegalArgumentException("argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType());
                                                }
                                            }
                                        }
                                    }
                                }
                            }
                        }
                    } else if (argument.getIndex() != -1) { // 指定单个参数的位置
                        // 将 ArgumentConfig 对象,添加到 `map` 集合中。
                        appendParameters(map, argument, method.getName() + "." + argument.getIndex()); // `${methodName}.${index}`
                    } else {
                        throw new IllegalArgumentException("argument config must set index or type attribute.eg: <dubbo:argument index='0' .../> or <dubbo:argument type=xxx .../>");
                    }

                }
            }
        } // end of methods for
    }

    // generic、methods、revision
    if (ProtocolUtils.isGeneric(generic)) {
        map.put("generic", generic);
        map.put("methods", Constants.ANY_VALUE);
    } else {
        String revision = Version.getVersion(interfaceClass, version);
        if (revision != null && revision.length() > 0) {
            map.put("revision", revision); // 修订本
        }

        String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames(); // 获得方法数组
        if (methods.length == 0) {
            logger.warn("NO method found in service interface " + interfaceClass.getName());
            map.put("methods", Constants.ANY_VALUE);
        } else {
            map.put("methods", StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));
        }
    }

    if (!ConfigUtils.isEmpty(token)) {
        if (ConfigUtils.isDefault(token)) { // true || default 时,UUID 随机生成
            map.put("token", UUID.randomUUID().toString());
        } else {
            map.put("token", token);
        }
    }
    // 协议为 injvm 时,不注册,不通知。
    if ("injvm".equals(protocolConfig.getName())) {
        protocolConfig.setRegister(false);
        map.put("notify", "false");
    }
    // export service
    String contextPath = protocolConfig.getContextpath();
    if ((contextPath == null || contextPath.length() == 0) && provider != null) {
        contextPath = provider.getContextpath();
    }

    // host、port
    String host = this.findConfigedHosts(protocolConfig, registryURLs, map);
    Integer port = this.findConfigedPorts(protocolConfig, name, map);

    // 创建 Dubbo URL 对象
    URL url = new URL(name, host, port, (contextPath == null || contextPath.length() == 0 ? "" : contextPath + "/") + path, map);


    if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class).hasExtension(url.getProtocol())) {
        url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class).getExtension(url.getProtocol()).getConfigurator(url).configure(url);
    }

    String scope = url.getParameter(Constants.SCOPE_KEY);
    // don't export when none is configured
    if (!Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) {

        // 服务本地暴露
        // export to local if the config is not remote (export to remote only when config is remote)
        if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) {
            exportLocal(url);
        }

        // 服务远程暴露
        // export to remote if the config is not local (export to local only when config is local)
        if (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) {
            if (logger.isInfoEnabled()) {
                logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
            }
            if (registryURLs != null && !registryURLs.isEmpty()) {
                for (URL registryURL : registryURLs) {
                    // "dynamic" :服务是否动态注册,如果设为false,注册后将显示后disable状态,需人工启用,并且服务提供者停止时,也不会自动取消册,需人工禁用。
                    url = url.addParameterIfAbsent("dynamic", registryURL.getParameter("dynamic"));
                    // 获得监控中心 URL
                    URL monitorUrl = loadMonitor(registryURL); 
                    if (monitorUrl != null) {
                        url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
                    }
                    if (logger.isInfoEnabled()) {
                        logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
                    }
                    // 使用 ProxyFactory 创建 Invoker 对象
                    Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));

                    // 创建 DelegateProviderMetaDataInvoker 对象
                    DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);

                    // 使用 Protocol 暴露 Invoker 对象
                    Exporter<?> exporter = protocol.export(wrapperInvoker);
                    // 添加到 `exporters`
                    exporters.add(exporter);
                }
            } else { // 用于被服务消费者直连服务提供者
                // 使用 ProxyFactory 创建 Invoker 对象
                Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);

                // 创建 DelegateProviderMetaDataInvoker 对象
                DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);

                // 使用 Protocol 暴露 Invoker 对象
                Exporter<?> exporter = protocol.export(wrapperInvoker);
                // 添加到 `exporters`
                exporters.add(exporter);
            }
        }
    }
    this.urls.add(url);
}

把要暴露的url添加到集合中,然后Invorker执行调用就行了。然后我们来看一下InjvmProtocol的export方法

public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    return new InjvmExporter<T>(invoker, invoker.getUrl().getServiceKey(), exporterMap);
}

再来看一下DubboProtocol的export方法

public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    URL url = invoker.getUrl();

    // 创建 DubboExporter 对象,并添加到 `exporterMap` 。
    // export service.
    String key = serviceKey(url);
    DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
    exporterMap.put(key, exporter);

    // TODO 【8005 sub】
    //export an stub service for dispatching event
    Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT);
    Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
    if (isStubSupportEvent && !isCallbackservice) {
        String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
        if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
            if (logger.isWarnEnabled()) {
                logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) +
                        "], has set stubproxy support event ,but no stub methods founded."));
            }
        } else {
            stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
        }
    }

    // 启动服务器
    openServer(url);

    // 初始化序列化优化器
    optimizeSerialization(url);
    return exporter;
}

再到openServer方法

private void openServer(URL url) {
    // find server.
    String key = url.getAddress();
    //client 也可以暴露一个只有server可以调用的服务。
    //client can export a service which's only for server to invoke
    boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true); // isserver
    if (isServer) {
        ExchangeServer server = serverMap.get(key);
        if (server == null) {
            serverMap.put(key, createServer(url));
        } else {
            // server supports reset, use together with override
            server.reset(url);
        }
    }
}

再到createServer方法

private ExchangeServer createServer(URL url) {
    // 默认开启 server 关闭时发送 READ_ONLY 事件
    // send readonly event when server closes, it's enabled by default
    url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());
    // 默认开启 heartbeat
    // enable heartbeat by default
    url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));

    // 校验 Server 的 Dubbo SPI 拓展是否存在
    String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);
    if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
        throw new RpcException("Unsupported server type: " + str + ", url: " + url);
    }

    // 设置编解码器为 Dubbo ,即 DubboCountCodec
    url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);

    // 启动服务器
    ExchangeServer server;
    try {
        server = Exchangers.bind(url, requestHandler);
    } catch (RemotingException e) {
        throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
    }

    // 校验 Client 的 Dubbo SPI 拓展是否存在
    str = url.getParameter(Constants.CLIENT_KEY);
    if (str != null && str.length() > 0) {
        Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
        if (!supportedTypes.contains(str)) {
            throw new RpcException("Unsupported client type: " + str);
        }
    }
    return server;
}

看完源码,我们已经知道了dubbo的主要发布过程,现在我们回过头来结合dubbo的总体架构和源码的分析,总结一下dubbo服务发布。服务发布过程总共五个步骤:

业务方将服务接口和实现编写定义好,添加dubbo相关配置文件。
Config层加载配置文件形成上下文,Config层包括:ServiceConfig、ProviderConfig、RegistryConfig等。
ServiceConfig根据Protocol类型,根据ProtocolConfig、ProviderConfig加载registry,根据加载的registry创建dubbo的URL。
然后是ProxyFactory生成代理对象,dubbo中有两种代理方式,JDK代理和Javassist代理,默认使用Javassist代理,Proxy代理类根据dubbo配置信息获取到接口信息、通过动态代理方式将接口的所有方法交给Proxy代理类进行代理,并封装进Invorker里面。
将所有需要暴露的service封装的Invoker组成一个list传给信息交换层提供给服务方进行调用。
服务暴露的分析就到这里了。

相关文章

网友评论

      本文标题:Dubbo源码分析(二) 服务暴露

      本文链接:https://www.haomeiwen.com/subject/cszoxqtx.html