总体流程
ReferenceBean 实现了 FactoryBean 和 InitializingBean 接口, 所以在初始化Bean的时候会执行 ReferenceBean#afterPropertiesSet 方法
ReferenceBean#afterPropertiesSet =>
ReferenceBean#getObject =>
ReferenceConfig#get =>
ReferenceConfig#init =>
ReferenceConfig#createProxy =>
Protocol$Adaptive#refer =>
ProtocolFilterWrapper#refer =>
ProtocolListenerWrapper#refer =>
RegistryProtocol#refer =>
RegistryProtocol#doRefer =>
ZookeeperRegistry#register(注册consumer节点) =>
RegistryDirectory#subscribe(订阅providers routers configurators节点,生成Invoker)
Cluster$Adaptive#join =>
MockClusterWrapper#join =>
FailoverCluster#join(return new FailoverClusterInvoker) =>
return new MockClusterInvoker =>
ProviderConsumerRegTable#registerConsumer =>
return Invoker
----------------------------- 其中 RegistryDirectory#subscribe 中涉及到 Invoer 的生成过程 ----------------------
RegistryDirectory#subscribe =>
FailbackRegistry#subscribe =>
AbstractRegistry#subscribe =>
ZookeeperRegistry#doSubscribe =>
AbstractZookeeperClient#addChildListener =>
ZookeeperRegistry#notify =>
AbstractRegistry#doNotify =>
RegistryDirectory#notify =>
RegistryDirectory#refreshInvoker =>
RegistryDirectory#toInvokers=>
new InvokerDelegate<>(protocol.refer(serviceType, url), url, providerUrl) =>
newUrlInvokerMap.put(key, invoker)
Consumer注册
这一部分工作在 ReferenceConfig#createProxy 方法中完成, 核心代码如下
// 这里的 REF_PROTOCOL 即指 Protocol$Adaptive
invoker = REF_PROTOCOL.refer(interfaceClass, urls.get(0));
-
在 Protocol$Adaptive#refer 方法中,根据 SPI 的 AOP 特性,会返回一个 ProtocolFilterWrapper ,它包装了 ProtocolListenerWrapper ,
而 ProtocolListenerWrapper 包装了 RegistryProtocol ,所以调用顺序如下
ProtocolFilterWrapper#refer => ProtocolListenerWrapper#refer => RegistryProtocol#refer
针对 RegistryProtocol , ProtocolFilterWrapper 和 ProtocolListenerWrapper 内部不做任何特殊处理
-
在 RegistryProtocol#refer 方法中,完成Consumer节点注册、节点监听、Invoker 生成工作
RegistryProtocol#refer => RegistryProtocol#doRefer => ZookeeperRegistry#register(注册consumer节点) =>
节点监听
consumer://10.10.129.164/org.apache.dubbo.demo.DemoService?application=dubbo-demo-api-consumer&category=providers,configurators,routers&check=false&dubbo=2.0.2&interface=org.apache.dubbo.demo.DemoService&lazy=false&methods=sayHello,asyncSay&pid=32780&side=consumer&sticky=false×tamp=1570954083348
节点监听大概流程如下:
- 每个 consumer 对应一个唯一的 url 和 RegistryDirectory 对象, RegistryDirectory 实现了 NotifyListener 接口
- 每个 RegistryDirectory 对应一个 ChildListener , ChildListener 是一个匿名内部类,主要功能就是执行 ZookeeperRegistry#notify 方法
- 每个 ChildListener 对应一个 TargetChildListener ,即 CuratorZookeeperClient.CuratorWatcherImpl ,它实现了 CuratorWatcher 和 TreeCacheListener 接口
- 将上面创建的 TargetChildListener 对象作为 path的监听器(path=/dubbo/service/provisers,/dubbo/service/routers,/dubbo/service/configurations)
- 执行 AbstractRegistry#notify 方法,然后执行 RegistryDirectory#notify 方法,即 每当provider发生变化的时候,需要刷新 Invoker 对象
dubbo 的节点监听机制太恶心了,不如这样子讲, 当一个节点发生变化得时候,会执行哪些操作
创建Invoker
RegistryDirectory#notify 方法主要就是根据providerUrl生成 Invoker 对象
RegistryDirectory#notify
- 处理 configurations 类型的url, 如果有这种url ,根据这种url生成 Configurator 对象;
- 处理 routers 类型的 url, 如果有这种url ,根据这种url生成 Router 对象并添加到 RouterChain;
- 处理 providers 类型的 url,即执行 RegistryDirectory#refreshOverrideAndInvoker
RegistryDirectory#refreshOverrideAndInvoker
- 先根据 Configurator 对象 merge overrideDirectoryUrl, 相当于时这些配置信息合并
- 执行 RegistryDirectory#refreshInvoker 方法
RegistryDirectory#refreshInvoker
- RegistryDirectory#toInvokers
- RegistryDirectory#mergeUrl
- new InvokerDelegate()
new InvokerDelegate()
invoker = new InvokerDelegate<>(protocol.refer(serviceType, url), url, providerUrl);
- 先通过 protocol.refer 获取到 包装好的(filter,listener) DubboProtocol
- 创建 InvokerDelegate 对象
- 执行 InvokerWrapper 的构造方法
- Invoker 放入缓存
加入Cluster
在上一步返回 Invoker 对象后, 执行 Cluster$Adaptive#join 方法,先根据 SPI 获取到 FailoverCluster , 然后通过 MockClusterWrapper 包装
Cluster$Adaptive#join =>
MockClusterWrapper#join =>
FailoverCluster#join =>
new FailoverClusterInvoker =>
return MockClusterInvoker
创建代理
在 ReferenceConfig#createProxy 方法中,创建完 Invoker 对象之后,以 Invoker 为入参创建代理对象
(T) PROXY_FACTORY.getProxy(invoker)
ProxyFactory$Adaptive#getProxy
=> StubProxyFactoryWrapper#getProxy
=> AbstractProxyFactory#getProxy
=> JavassistProxyFactory#getProxy
=> return proxy
JavassistProxyFactory#getProxy
public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
}
最新红生成的代理类如下
public class proxy0 implements ClassGenerator.DC, EchoService, DemoService{
public static Method[] methods;
private InvocationHandler handler;
public String sayHello(String paramString){
Object[] arrayOfObject = new Object[1];
arrayOfObject[0] = paramString;
Object localObject = this.handler.invoke(this, methods[0], arrayOfObject);
return ((String)localObject);
}
public Object $echo(Object paramObject)
{
Object[] arrayOfObject = new Object[1];
arrayOfObject[0] = paramObject;
Object localObject = this.handler.invoke(this, methods[1], arrayOfObject);
return ((Object)localObject);
}
public proxy0(){
}
public proxy0(InvocationHandler paramInvocationHandler){
this.handler = paramInvocationHandler;
}
}
InvokerInvocationHandler 实现了Java的 InvocationHandler 接口,所以它同样适用于JDK动态代理 它的 invoker 方法如下
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
String methodName = method.getName();
Class<?>[] parameterTypes = method.getParameterTypes();
if (method.getDeclaringClass() == Object.class) {
return method.invoke(invoker, args);
}
if ("toString".equals(methodName) && parameterTypes.length == 0) {
return invoker.toString();
}
if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
return invoker.hashCode();
}
if ("equals".equals(methodName) && parameterTypes.length == 1) {
return invoker.equals(args[0]);
}
return invoker.invoke(new RpcInvocation(method, args)).recreate();
}
即:最终用户通过代理对象调用服务的时候, 实际是执行 Invoker#invoker 方法
网友评论