示例代码
Dubbo中的服务调用实例代码
ReferenceConfig<HelloService> reference = new ReferenceConfig<>();
reference.setInterface(HelloService.class);
//reference.setGeneric("true");
DubboBootstrap bootstrap = DubboBootstrap.getInstance();
bootstrap.application(new ApplicationConfig("dubbo-demo-api-consumer"))
.registry(new RegistryConfig("zookeeper://1.2.3.4:32181"))
.protocol(new ProtocolConfig("dubbo", -1))
.reference(reference)
.start();
HelloService demoService = bootstrap.getCache().get(reference);
String message = demoService.sayHello("dubbo");
System.out.println(message);
服务调用示例比较简单
-
通过
ReferenceConfig
配置接口信息 -
配置注册中心地址后,启动服务调用者
-
通过
bootstrap.getCache().get(reference)
获取代理实现类,然后通过代理实现类调用接口方法
代理对象
上面主要的一步是获取代理实现类,我们看下代理实现类是怎么创建的呢?
从上面的代码中可以看到是从缓存中获取的代理实现类,可见其在服务调用者启动的过程中就已经创建好了
我们再看一眼启动流程
private synchronized Future startSync() throws IllegalStateException {
...
exportServices();
// refer services
referServices();
可以看到在导出服务之后就有一步引用服务
private void referServices() {
configManager.getReferences().forEach(rc -> {
try {
ReferenceConfig<?> referenceConfig = (ReferenceConfig<?>) rc;
referenceCache.get(rc, false);
}
}
可以看到引用服务中处理的就是ReferenceConfig
配置
public <T> T get(ReferenceConfigBase<T> rc, boolean check) {
String key = generator.generateKey(rc);
Class<?> type = rc.getInterfaceClass();
boolean singleton = rc.getSingleton() == null || rc.getSingleton();
T proxy = null;
// Check existing proxy of the same 'key' and 'type' first.
if (singleton) {
proxy = get(key, (Class<T>) type);
} ...
if (proxy == null) {
List<ReferenceConfigBase<?>> referencesOfType = ConcurrentHashMapUtils.computeIfAbsent(referenceTypeMap, type, _t -> Collections.synchronizedList(new ArrayList<>()));
referencesOfType.add(rc);
List<ReferenceConfigBase<?>> referenceConfigList = ConcurrentHashMapUtils.computeIfAbsent(referenceKeyMap, key, _k -> Collections.synchronizedList(new ArrayList<>()));
referenceConfigList.add(rc);
proxy = rc.get(check);
}
return proxy;
}
这里的get
方法和示例代码中从缓存获取代理类是一致的,都是先从缓存中获取代理对象,不存在就初始化缓存,然后执行代理对象的创建流程
protected synchronized void init(boolean check) {
...
ref = createProxy(referenceParameters);
...
}
private T createProxy(Map<String, String> referenceParameters) {
...
aggregateUrlFromRegistry(referenceParameters);
invoker = protocolSPI.refer(interfaceClass, curUrl);
...
URL consumerUrl = new ServiceConfigURL(CONSUMER_PROTOCOL, referenceParameters.get(REGISTER_IP_KEY), 0,
referenceParameters.get(INTERFACE_KEY), referenceParameters);
...
MetadataUtils.publishServiceDefinition(consumerUrl, consumerModel.getServiceModel(), getApplicationModel());
// create service proxy
return (T) proxyFactory.getProxy(invoker, ProtocolUtils.isGeneric(generic));
}
创建Invoker
创建Invoker流程里也是需要先构建URL,这里也是先从注册中心构建URL
然后通过Protocol
类创建Invoker,按照Dubbo系列--服务导出及服务注册《四》中的描述
registry
协议对应的实现类是InterfaceCompatibleRegistryProtocol
,但是由于Wrapper类的存在,还是会形成如下的包装关系
我们还是依次看下每个Wrapper类的refer
方法
- InvokerCountWrapper:由于是
registry
协议,满足条件判断,因此没有加任何东西
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
if (UrlUtils.isRegistry(url)) {
return protocol.refer(type, url);
}
...
}
- ProtocolSerializationWrapper:没有加任何东西
@Override
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
return protocol.refer(type, url);
}
- ProtocolFilterWrapper:由于是
registry
协议,满足条件判断,因此没有加任何东西
@Override
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
if (UrlUtils.isRegistry(url)) {
return protocol.refer(type, url);
}
...
}
-
QosProtocolWrapper:先启动一个Qos服务器,通过Netty框架监听在0.0.0.0:22222,和导出时一样
-
ProtocolListenerWrapper:由于是
registry
协议,满足条件判断,因此没有加任何东西
@Override
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
if (UrlUtils.isRegistry(url)) {
return protocol.refer(type, url);
}
...
}
-
ProtocolSecurityWrapper:对代理的接口类型、接口方法中设计的参数类型、返回值类型就行放通
-
InterfaceCompatibleRegistryProtocol:最终的实现类,但是由于它继承自
RegistryProtocol
,且没有重写refer
方法,因此会直接使用父类的、
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
url = getRegistryUrl(url);
Registry registry = getRegistry(url);
...
Cluster cluster = Cluster.getCluster(url.getScopeModel(), qs.get(CLUSTER_KEY));
URL consumerUrl = new ServiceConfigURL(
p,
null,
null,
parameters.get(REGISTER_IP_KEY),
0, getPath(parameters, type),
parameters,
consumerAttribute
);
url = url.putAttribute(CONSUMER_URL_KEY, consumerUrl);
ClusterInvoker<T> migrationInvoker = new MigrationInvoker<T>(registryProtocol, cluster, registry, type, url, consumerUrl);
List<RegistryProtocolListener> listeners = findRegistryProtocolListeners(url);
if (CollectionUtils.isEmpty(listeners)) {
return invoker;
}
for (RegistryProtocolListener listener : listeners) {
listener.onRefer(this, invoker, consumerUrl, url);
}
return migrationInvoker;
}
这里会先构建注册URL,其实就是替换协议,真正的协议保存在参数registry=zookeeper
中
然后创建协议对应的Registry,创建流程和服务导出过程是一样的,最终创建的是ZookeeperRegistry
对象,进而创建Zookeeper客户端,并缓存
然后通过SPI方式加载Cluster
的实现类,可以看到默认加载的是FailoverCluster
@SPI("failover")
public interface Cluster {
}
同样由于包装类的存在,最终会形成如下的包装关系
image.png
然后构建消费者URL
然后创建了一个MigrationInvoker
,并通过SPI方式加载监听器,默认情况下只有一个MigrationRuleListener
监听器
最终会通过这个监听器来开始实际的创建Invoker
流程,这里的MigrationInvoker
可以认为是dubbo2
到dubbo3
的兼容性改造
@Override
public void migrateToApplicationFirstInvoker(MigrationRule newRule) {
CountDownLatch latch = new CountDownLatch(0);
refreshInterfaceInvoker(latch);
refreshServiceDiscoveryInvoker(latch);
// directly calculate preferred invoker, will not wait until address notify
// calculation will re-occurred when address notify later
calcPreferredInvoker(newRule);
}
创建接口的Invoker
接口级别的Invoker对应着InterfaceCompatibleRegistryProtocol
中的getInvoker
方法
@Override
public <T> ClusterInvoker<T> getInvoker(Cluster cluster, Registry registry, Class<T> type, URL url) {
DynamicDirectory<T> directory = new RegistryDirectory<>(type, url);
...
URL urlToRegistry = new ServiceConfigURL(
parameters.get(PROTOCOL_KEY) == null ? CONSUMER : parameters.get(PROTOCOL_KEY),
parameters.remove(REGISTER_IP_KEY),
0,
getPath(parameters, type),
parameters
);
...
if (directory.isShouldRegister()) {
directory.setRegisteredConsumerUrl(urlToRegistry);
registry.register(directory.getRegisteredConsumerUrl());
}
directory.buildRouterChain(urlToRegistry);
directory.subscribe(toSubscribeUrl(urlToRegistry));
return (ClusterInvoker<T>) cluster.join(directory, true);
}
-
这里先创建了一个
RegistryDirectory
对象 -
然后进行消费者URL构建
- 并进行消费者注册,注册完成后,注册中心里可以看到消费者信息
[zk: localhost:2181(CONNECTED) 253] ls -R /
/
/dubbo
/dubbo/services.HelloService/consumers/consumer%3A%2F%2F172.168.111.11%2Fservices.HelloService%3Fapplication%3Ddubbo-demo-api-consumer%26background%3Dfalse%26category%3Dconsumers%26check%3Dfalse%26dubbo%3D2.0.2%26executor-management-mode%3Disolation%26file-cache%3Dtrue%26interface%3Dservices.HelloService%26methods%3DsayHello%26pid%3D19540%26release%3D3.2.4%26side%3Dconsumer%26sticky%3Dfalse%26timestamp%3D1701174658287%26unloadClusterRelated%3Dfalse
-
然后构造路由链,会有一个主链、一个被用链,当前链指向的是主链
image.png
路由规则也是通过SPI方式加载,主要在RouterFactory
和StateRouterFactory
这两个接口的实现类
- 最后是进行订阅,需要注意的是,这里会指定
category=providers,configurators,routers
,即这三种类型的配置都进行订阅
在服务导出的时候只订阅了configurators
这一类别
再上面我们已经指出了,Registry的最终实现类是ZookeeperRegistry
,因此最终会进入到它的订阅方法
@Override
public void doSubscribe(final URL url, final NotifyListener listener) {
try {
...
} else {
try {
List<URL> urls = new ArrayList<>();
for (String path : toCategoriesPath(url)) {
...
// create "directories".
zkClient.create(path, false, true);
...
notify(url, listener, urls);
}...
}
由于指定了category=providers,configurators,routers
,因此这里的toCategoriesPath(url)
会得到三个路径
-
/dubbo/services.HelloService/providers
:服务的提供者信息,由于在服务导出的时候,已经将服务提供者记录在了这个节点下
[zk: localhost:2181(CONNECTED) 255] ls -R /
/dubbo/services.HelloService/providers/dubbo%3A%2F%2F172.168.111.11%3A20880%2Fservices.HelloService%3Fapplication%3Ddubbo-demo-api-provider%26deprecated%3Dfalse%26dubbo%3D2.0.2%26dynamic%3Dtrue%26generic%3Dfalse%26interface%3Dservices.HelloService%26methods%3DsayHello%26prefer.serialization%3Dfastjson2%2Chessian2%26release%3D3.2.4%26service-name-mapping%3Dtrue%26side%3Dprovider%26timestamp%3D1701058538278
因此这里就可以将服务提供者信息加载到服务消费者中来
-
/dubbo/services.HelloService/configurators
:服务的配置信息 -
/dubbo/services.HelloService/routers
:服务的路由信息
我们这里重点关注下服务的提供者信息,因为下面会对服务提供者信息进行通知
由于RegistryDirectory
实现了通知接口,因此最终会进入它的通知接口
@Override
public synchronized void notify(List<URL> urls) {
...
refreshOverrideAndInvoker(providerURLs);
}
在它的通知接口里会创建Invoker
private void refreshInvoker(List<URL> invokerUrls) {
invoker = protocol.refer(serviceType, url);
}
首先是这里URL对应的是提供者的:
由于这里对应的协议是dubbo
,因此最终的实现者啊DubboProtocol
,我们看下它的refer
方法
@Override
public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
...
DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
invokers.add(invoker);
return invoker;
}
可以看到这里创建的是DubboInvoker
,最主要的方法是获取客户端
private ExchangeClient initClient(URL url) {
Exchangers.connect(url, requestHandler);
}
@Override
public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
}
public static Client connect(URL url, ChannelHandler... handlers) throws RemotingException {
return url.getOrDefaultFrameworkModel().getExtensionLoader(Transporter.class).getAdaptiveExtension().connect(url, handler);
}
可以看到客户端是通过SPI方式加载Transporter
的实现类
@SPI(value = "netty", scope = ExtensionScope.FRAMEWORK)
public interface Transporter {
}
默认的实现类是Netty
@Override
public Client connect(URL url, ChannelHandler handler) throws RemotingException {
return new NettyClient(url, handler);
}
因此最终创建的是Netty客户端,使用Netty客户端去连接服务提供者的Netty服务器
然后这个Netty客户端会保存在DubboInvoker
中,然后返回的就是DubboInvoker
由于Protocol
存在Wrapper类,最终会形成如下的包装类
包装的Invoker
会缓存在RegistryDirectory
对象中
能够正常连接的Invoker
会放到validInvoker
中
至此完成了服务提供者的订阅
- 最后是将
RegistryDirectory
通过Cluster包装返回
至此完成了接口的Invoker
创建过程,并缓存在MigrationInvoker
中
创建应用的Invoker
创建应用的Invoker过程和接口的基本一致,但是存在些许差异
应用级别的Invoker对应着InterfaceCompatibleRegistryProtocol
中的getServiceDiscoveryInvoker
方法
@Override
public <T> ClusterInvoker<T> getServiceDiscoveryInvoker(Cluster cluster, Registry registry, Class<T> type, URL url) {
registry = getRegistry(super.getRegistryUrl(url));
DynamicDirectory<T> directory = new ServiceDiscoveryRegistryDirectory<>(type, url);
return doCreateInvoker(directory, cluster, registry, type);
}
- 首先进行URL协议重写,协议替换为
service-discovery-registry
,这点和服务导出里是一致的
-
由于此时协议变成了
service-discovery-registry
,因此Registry的实现类会变成ServiceDiscoveryRegistry
,这个类在Dubbo系列--服务导出及服务注册《四》中已经详细说过了,它会创建一个ServiceDiscovery
,对应的就是ZookeeperServiceDiscovery
,订阅的是/services
目录 -
然后创建的是
ServiceDiscoveryRegistryDirectory
,它和接口级别的Invoker创建的RegistryDirectory
不一样,但是继承于DynamicDirectory
- 最后的创建Invoker方法
doCreateInvoker
和接口级别的是一样的,只是因为这里使用的是ServiceDiscoveryRegistry
,订阅过程会稍有不同,其它的是一样的
我们详细看下它的订阅过程
@Override
public void doSubscribe(URL url, NotifyListener listener) {
...
Set<String> mappingByUrl = ServiceNameMapping.getMappingByUrl(url);
String key = ServiceNameMapping.buildMappingKey(url);
if (mappingByUrl == null) {
mappingByUrl = serviceNameMapping.getMapping(url);
try {
MappingListener mappingListener = new DefaultMappingListener(url, mappingByUrl, listener);
mappingByUrl = serviceNameMapping.getAndListen(this.getUrl(), url, mappingListener);
mappingListeners.put(url.getProtocolServiceKey(), mappingListener);
} ...
serviceNames = toTreeSet(serviceNames);
String serviceNamesKey = toStringKeys(serviceNames);
String serviceKey = url.getServiceKey();
logger.info(String.format("Trying to subscribe from apps %s for service key %s, ", serviceNamesKey, serviceKey));
// register ServiceInstancesChangedListener
...
serviceInstancesChangedListener = serviceDiscovery.createListener(serviceNames);
for (String serviceName : serviceNames) {
List<ServiceInstance> serviceInstances = serviceDiscovery.getInstances(serviceName);
if (CollectionUtils.isNotEmpty(serviceInstances)) {
serviceInstancesChangedListener.onEvent(new ServiceInstancesChangedEvent(serviceName, serviceInstances));
}
}
...
}
-
这里会先从缓存中获取
key=services.HelloService
的名字服务映射关系 -
缓存中不存在,则会从注册中心获取,在服务导出的时候已经注册了名字服务映射关系,因此这里可以获取到提供者应用名称
dubbo-demo-api-provider
[zk: localhost:2181(CONNECTED) 223] ls -R /
/dubbo/mapping/services.HelloService
[zk: localhost:2181(CONNECTED) 225] get /dubbo/mapping/services.HelloService
dubbo-demo-api-provider
- 然后再从注册中心获取这个应用名称对应的服务,在服务导出的时候已经注册过这个服务
[zk: localhost:2181(CONNECTED) 255] ls -R /
/services/dubbo-demo-api-provider/172.168.111.11:20880
[zk: localhost:2181(CONNECTED) 256] get /services/dubbo-demo-api-provider/172.168.111.11:20880
{"name":"dubbo-demo-api-provider","id":"172.168.111.11:20880","address":"172.168.111.11","port":20880,"sslPort":null,"payload":{"@class":"org.apache.dubbo.registry.zookeeper.ZookeeperInstance","id":"172.168.111.11:20880","name":"dubbo-demo-api-provider","metadata":{"dubbo.endpoints":"[{\"port\":20880,\"protocol\":\"dubbo\"}]","dubbo.metadata-service.url-params":"{\"prefer.serialization\":\"fastjson2,hessian2\",\"version\":\"1.0.0\",\"dubbo\":\"2.0.2\",\"release\":\"3.2.4\",\"side\":\"provider\",\"port\":\"20880\",\"protocol\":\"dubbo\"}","dubbo.metadata.revision":"810c0d1c672d1f93d39e7b23f39abf27","dubbo.metadata.storage-type":"local","timestamp":"1701058538278"}},"registrationTimeUTC":1701058540261,"serviceType":"DYNAMIC","uriSpec":null}
这里消费者会先订阅/services/dubbo-demo-api-provider
这个服务,用于监听这个应用下的所有实例
然后再把这个路径下的所有实例读出来,构建URL
最后通过notify通知机制,回调ServiceDiscoveryRegistryDirectory
中的notify
方法
@Override
public synchronized void notify(List<URL> instanceUrls) {
...
refreshOverrideAndInvoker(instanceUrls);
}
它的notify
方法就和接口级别的一样了,也是创建Invoker
,由于这里获取到的提供者信息中的协议也是dubbo,所以最终的Invoker也是有DubboProtocol
来创建,和接口级别的是完全一致
至此完成了应用级别的Invoker创建,并且会优先使用应用级别的Invoker
private synchronized void calcPreferredInvoker(MigrationRule migrationRule) {
Set<MigrationAddressComparator> detectors = ScopeModelUtil.getApplicationModel(consumerUrl == null ? null : consumerUrl.getScopeModel())
.getExtensionLoader(MigrationAddressComparator.class).getSupportedExtensionInstances();
if (CollectionUtils.isNotEmpty(detectors)) {
// 这里由于接口级别和应用级别都存在Invoker,而且没有指定阈值,默认就是true,会优先使用应用级别的Invoker
if (detectors.stream().allMatch(comparator -> comparator.shouldMigrate(serviceDiscoveryInvoker, invoker, migrationRule))) {
this.currentAvailableInvoker = serviceDiscoveryInvoker;
} else {
this.currentAvailableInvoker = invoker;
}
}
}
创建对象
在回过头来看下创建对象的流程
private T createProxy(Map<String, String> referenceParameters) {
createInvoker();
// create service proxy
return (T) proxyFactory.getProxy(invoker, ProtocolUtils.isGeneric(generic));
}
可以看到创建完成Invoker后,就通过ProxyFactory
来创建代理对象,在服务导出文中已经详解过这个接口了,ProxyFactory
的接口定义中指定了默认的实现类是javassist对应的JavassistProxyFactory
public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
try {
return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
} catch (Throwable fromJavassist) {
// 异常后使用JDK方式重试
}
}
这里主要依赖Proxy
这个工具类来创建接口的代理实现类,这个工具类会把动态生成的类缓存起来,它动态生成的实现类如下
package services;
public class HelloServiceDubboProxy0 implements services.HelloService, org.apache.dubbo.rpc.service.EchoService, org.apache.dubbo.rpc.service.Destroyable, org.apache.dubbo.common.bytecode.ClassGenerator.DC {
public static java.lang.reflect.Method[] methods;
private java.lang.reflect.InvocationHandler handler;
public HelloServiceDubboProxy0(java.lang.reflect.InvocationHandler handler) {
handler=$1;
}
public HelloServiceDubboProxy0() {
}
public java.lang.String sayHello(java.lang.String arg0) {
Object[] args = new Object[1];
// $w是javassist中的类型转换,基础类型转为包装类型,其它类型会忽略这个
args[0] = ($w)$1;
Object ret = handler.invoke(this, methods[0], args);
return (java.lang.String)ret;
}
public java.lang.Object $echo(java.lang.Object arg0) {
Object[] args = new Object[1];
args[0] = ($w)$1;
Object ret = handler.invoke(this, methods[1], args);
return (java.lang.Object) ret;
}
public void $destroy() {
Object[] args = new Object[0];
Object ret = handler.invoke(this, methods[2], args);
}
}
可以看到动态生成的代理类不仅会实现目标接口,还会实现三个工具接口,并且会生成一个接收InvocationHandler
的构造函数
所以生成代理对象的时候Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker))
使用的是这个构造函数,直接创建了一个InvokerInvocationHandler
对象作为入参,并且包装了我们之前创建的Invoker对象
至此,就完成了代理对象的创建
image.png
网友评论