下面我们来说一下Dubbo的服务暴露。当我们启动一个服务后,都需要将服务暴露出去,那么这个服务暴露的过程是怎么完成的呢?现在我们就来看一下Dubbo是怎么完成服务暴露的。
先来介绍一个接口Exporter,这个接口有一个方法getInvoker,获取服务调用者。AbstractExporter实现了Exporter接口。InjvmExporter是本地调用的暴露实现类,DubboExporter是远程调用实现类。我们先来看一下InjvmExporter
InjvmInvoker(Class<T> type, URL url, String key, Map<String, Exporter<?>> exporterMap) {
super(type, url);
this.key = key;
this.exporterMap = exporterMap;
}
在InjvmProtocol类中初始化InjvmInvoker,这个类是本地调用协议的实现类。Protocol接口是协议接口,定义了所有通信的协议。dubbo默认支持的协议有dubbo协议,webservice协议,http协议,thrift协议等。下面我们再来说一下dubbo都有哪几层。
配置层(Config):该层是将业务方的service信息,配置文件的信息收集起来,主要是以ServiceConfig和ReferenceConfig为中心,ServiceConfig是服务提供方的配置,当Spring启动的时候会相应的启动provider服务发布和注册的过程,主要是加入一个ServiceBean继承ServiceConfig在Spring注册。同理ReferenceConfig是consumer方的配置,当消费方启动时,会启动consumer的发现服务订阅服务的过程,当然也是使用一个ReferenceBean继承ReferenceConfig注册在spring上。
服务代理层(Proxy):对服务接口进行透明代理,生成服务的客户端和服务器端,使服务的远程调用就像在本地调用一样。默认使用JavassistProxyFactory,返回一个Invoker,Invoker则是个可执行核心实体,Invoker的invoke方法通过反射执行service方法。
服务注册层(Registry):封装服务地址的注册和发现,以服务URL为中心,基于zk。
集群层(Cluster):提供多个节点并桥接注册中心,主要负责loadBanlance、容错。
监控层(Monitor):RPC调用次数和调用时间监控,以Statistics为中心,扩展接口为MonitorFactory、Monitor和MonitorService。
远程调用层(Protocol):封装RPC调用,provider通过export方法进行暴露服务/consumer通过refer方法调用服务。而Protocol依赖的是Invoker。通过上面说的Proxy获得的Invoker,包装成Exporter。
信息交换层(Exchange):该层封装了请求响应模型,将同步转为异步,信息交换层依赖Exporter,最终将通过网络传输层接收调用请求RequestFuture和ResponseFuture。
网络传输层(Transport):抽象mina和netty为统一接口,以Message为中心,扩展接口为Channel、Transporter、Client、Server和Codec。
数据序列化层:该层无需多言,将数据序列化反序列化。
通过上面的框架了解我们大致知道了dubbo是怎么工作的,接下来我们来通过代码来具体看看dubbo的服务发布过程,进一步理解dubbo的工作原理。
我们先来看一下ServiceConfig这个类。这个类是服务的配置类。我们来看一下具体的实现
private void doExportUrls() {
// 加载注册中心 URL 数组
List<URL> registryURLs = loadRegistries(true);
// 循环 `protocols` ,向逐个注册中心分组暴露服务。
for (ProtocolConfig protocolConfig : protocols) {
doExportUrlsFor1Protocol(protocolConfig, registryURLs);
}
}
到doExportUrlsFor1Protocol方法
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
// 协议名
String name = protocolConfig.getName();
if (name == null || name.length() == 0) {
name = "dubbo";
}
// 将 `side`,`dubbo`,`timestamp`,`pid` 参数,添加到 `map` 集合中。
Map<String, String> map = new HashMap<String, String>();
map.put(Constants.SIDE_KEY, Constants.PROVIDER_SIDE);
map.put(Constants.DUBBO_VERSION_KEY, Version.getVersion());
map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));
if (ConfigUtils.getPid() > 0) {
map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));
}
// 将各种配置对象,添加到 `map` 集合中。
appendParameters(map, application);
appendParameters(map, module);
appendParameters(map, provider, Constants.DEFAULT_KEY); // ProviderConfig ,为 ServiceConfig 的默认属性,因此添加 `default` 属性前缀。
appendParameters(map, protocolConfig);
appendParameters(map, this);
// 将 MethodConfig 对象数组,添加到 `map` 集合中。
if (methods != null && !methods.isEmpty()) {
for (MethodConfig method : methods) {
// 将 MethodConfig 对象,添加到 `map` 集合中。
appendParameters(map, method, method.getName());
// 当 配置了 `MethodConfig.retry = false` 时,强制禁用重试
String retryKey = method.getName() + ".retry";
if (map.containsKey(retryKey)) {
String retryValue = map.remove(retryKey);
if ("false".equals(retryValue)) {
map.put(method.getName() + ".retries", "0");
}
}
// 将 ArgumentConfig 对象数组,添加到 `map` 集合中。
List<ArgumentConfig> arguments = method.getArguments();
if (arguments != null && !arguments.isEmpty()) {
for (ArgumentConfig argument : arguments) {
// convert argument type
if (argument.getType() != null && argument.getType().length() > 0) { // 指定了类型
Method[] methods = interfaceClass.getMethods();
// visit all methods
if (methods != null && methods.length > 0) {
for (int i = 0; i < methods.length; i++) {
String methodName = methods[i].getName();
// target the method, and get its signature
if (methodName.equals(method.getName())) { // 找到指定方法
Class<?>[] argTypes = methods[i].getParameterTypes();
// one callback in the method
if (argument.getIndex() != -1) { // 指定单个参数的位置 + 类型
if (argTypes[argument.getIndex()].getName().equals(argument.getType())) {
// 将 ArgumentConfig 对象,添加到 `map` 集合中。
appendParameters(map, argument, method.getName() + "." + argument.getIndex()); // `${methodName}.${index}`
} else {
throw new IllegalArgumentException("argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType());
}
} else {
// multiple callbacks in the method
for (int j = 0; j < argTypes.length; j++) {
Class<?> argClazz = argTypes[j];
if (argClazz.getName().equals(argument.getType())) {
// 将 ArgumentConfig 对象,添加到 `map` 集合中。
appendParameters(map, argument, method.getName() + "." + j); // `${methodName}.${index}`
if (argument.getIndex() != -1 && argument.getIndex() != j) { // 多余的判断,因为 `argument.getIndex() == -1` 。
throw new IllegalArgumentException("argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType());
}
}
}
}
}
}
}
} else if (argument.getIndex() != -1) { // 指定单个参数的位置
// 将 ArgumentConfig 对象,添加到 `map` 集合中。
appendParameters(map, argument, method.getName() + "." + argument.getIndex()); // `${methodName}.${index}`
} else {
throw new IllegalArgumentException("argument config must set index or type attribute.eg: <dubbo:argument index='0' .../> or <dubbo:argument type=xxx .../>");
}
}
}
} // end of methods for
}
// generic、methods、revision
if (ProtocolUtils.isGeneric(generic)) {
map.put("generic", generic);
map.put("methods", Constants.ANY_VALUE);
} else {
String revision = Version.getVersion(interfaceClass, version);
if (revision != null && revision.length() > 0) {
map.put("revision", revision); // 修订本
}
String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames(); // 获得方法数组
if (methods.length == 0) {
logger.warn("NO method found in service interface " + interfaceClass.getName());
map.put("methods", Constants.ANY_VALUE);
} else {
map.put("methods", StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));
}
}
if (!ConfigUtils.isEmpty(token)) {
if (ConfigUtils.isDefault(token)) { // true || default 时,UUID 随机生成
map.put("token", UUID.randomUUID().toString());
} else {
map.put("token", token);
}
}
// 协议为 injvm 时,不注册,不通知。
if ("injvm".equals(protocolConfig.getName())) {
protocolConfig.setRegister(false);
map.put("notify", "false");
}
// export service
String contextPath = protocolConfig.getContextpath();
if ((contextPath == null || contextPath.length() == 0) && provider != null) {
contextPath = provider.getContextpath();
}
// host、port
String host = this.findConfigedHosts(protocolConfig, registryURLs, map);
Integer port = this.findConfigedPorts(protocolConfig, name, map);
// 创建 Dubbo URL 对象
URL url = new URL(name, host, port, (contextPath == null || contextPath.length() == 0 ? "" : contextPath + "/") + path, map);
if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class).hasExtension(url.getProtocol())) {
url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class).getExtension(url.getProtocol()).getConfigurator(url).configure(url);
}
String scope = url.getParameter(Constants.SCOPE_KEY);
// don't export when none is configured
if (!Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) {
// 服务本地暴露
// export to local if the config is not remote (export to remote only when config is remote)
if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) {
exportLocal(url);
}
// 服务远程暴露
// export to remote if the config is not local (export to local only when config is local)
if (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) {
if (logger.isInfoEnabled()) {
logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
}
if (registryURLs != null && !registryURLs.isEmpty()) {
for (URL registryURL : registryURLs) {
// "dynamic" :服务是否动态注册,如果设为false,注册后将显示后disable状态,需人工启用,并且服务提供者停止时,也不会自动取消册,需人工禁用。
url = url.addParameterIfAbsent("dynamic", registryURL.getParameter("dynamic"));
// 获得监控中心 URL
URL monitorUrl = loadMonitor(registryURL);
if (monitorUrl != null) {
url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
}
if (logger.isInfoEnabled()) {
logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
}
// 使用 ProxyFactory 创建 Invoker 对象
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
// 创建 DelegateProviderMetaDataInvoker 对象
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
// 使用 Protocol 暴露 Invoker 对象
Exporter<?> exporter = protocol.export(wrapperInvoker);
// 添加到 `exporters`
exporters.add(exporter);
}
} else { // 用于被服务消费者直连服务提供者
// 使用 ProxyFactory 创建 Invoker 对象
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
// 创建 DelegateProviderMetaDataInvoker 对象
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
// 使用 Protocol 暴露 Invoker 对象
Exporter<?> exporter = protocol.export(wrapperInvoker);
// 添加到 `exporters`
exporters.add(exporter);
}
}
}
this.urls.add(url);
}
把要暴露的url添加到集合中,然后Invorker执行调用就行了。然后我们来看一下InjvmProtocol的export方法
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
return new InjvmExporter<T>(invoker, invoker.getUrl().getServiceKey(), exporterMap);
}
再来看一下DubboProtocol的export方法
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
URL url = invoker.getUrl();
// 创建 DubboExporter 对象,并添加到 `exporterMap` 。
// export service.
String key = serviceKey(url);
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
exporterMap.put(key, exporter);
// TODO 【8005 sub】
//export an stub service for dispatching event
Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT);
Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
if (isStubSupportEvent && !isCallbackservice) {
String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
if (logger.isWarnEnabled()) {
logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) +
"], has set stubproxy support event ,but no stub methods founded."));
}
} else {
stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
}
}
// 启动服务器
openServer(url);
// 初始化序列化优化器
optimizeSerialization(url);
return exporter;
}
再到openServer方法
private void openServer(URL url) {
// find server.
String key = url.getAddress();
//client 也可以暴露一个只有server可以调用的服务。
//client can export a service which's only for server to invoke
boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true); // isserver
if (isServer) {
ExchangeServer server = serverMap.get(key);
if (server == null) {
serverMap.put(key, createServer(url));
} else {
// server supports reset, use together with override
server.reset(url);
}
}
}
再到createServer方法
private ExchangeServer createServer(URL url) {
// 默认开启 server 关闭时发送 READ_ONLY 事件
// send readonly event when server closes, it's enabled by default
url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());
// 默认开启 heartbeat
// enable heartbeat by default
url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
// 校验 Server 的 Dubbo SPI 拓展是否存在
String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);
if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
throw new RpcException("Unsupported server type: " + str + ", url: " + url);
}
// 设置编解码器为 Dubbo ,即 DubboCountCodec
url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
// 启动服务器
ExchangeServer server;
try {
server = Exchangers.bind(url, requestHandler);
} catch (RemotingException e) {
throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
}
// 校验 Client 的 Dubbo SPI 拓展是否存在
str = url.getParameter(Constants.CLIENT_KEY);
if (str != null && str.length() > 0) {
Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
if (!supportedTypes.contains(str)) {
throw new RpcException("Unsupported client type: " + str);
}
}
return server;
}
看完源码,我们已经知道了dubbo的主要发布过程,现在我们回过头来结合dubbo的总体架构和源码的分析,总结一下dubbo服务发布。服务发布过程总共五个步骤:
业务方将服务接口和实现编写定义好,添加dubbo相关配置文件。
Config层加载配置文件形成上下文,Config层包括:ServiceConfig、ProviderConfig、RegistryConfig等。
ServiceConfig根据Protocol类型,根据ProtocolConfig、ProviderConfig加载registry,根据加载的registry创建dubbo的URL。
然后是ProxyFactory生成代理对象,dubbo中有两种代理方式,JDK代理和Javassist代理,默认使用Javassist代理,Proxy代理类根据dubbo配置信息获取到接口信息、通过动态代理方式将接口的所有方法交给Proxy代理类进行代理,并封装进Invorker里面。
将所有需要暴露的service封装的Invoker组成一个list传给信息交换层提供给服务方进行调用。
服务暴露的分析就到这里了。
网友评论