开头
上一节讲到了服务的导出,即服务端如何将自己的接口提供成dubbo服务的过程,这一节就是讲服务的调用了,消费端是如何调用服务端的接口的呢?
主要流程
1.spring启动时,会给@Reference注解的属性赋值,赋值的时候会调用referenceBean.get方法
2.准备初始化invoker对象,MockClusterInvoker,生成这个是最终目的
3.在注册中心初始化服务目录RegistryDirectory
4.将消费端信息注册到zk
5.构造路由链、服务订阅
6.根据服务目录得到最终的invoker对象MockClusterInvoker
8.最终调用MockClusterInvoker.invoke方法执行请求发送数据,里面调用了netty.send方法
9.通过netty channel,执行nettyServerHandler方法处理请求和结果返回
源码流程
流程图地址:https://www.processon.com/view/link/60e02b8d637689510d6c4184
1.程序入口
在spring启动的时候,会对@Reference注解的属性赋值,生成ReferenceBean,在ReferenceAnnotationBeanPostProcessor.doGetInjectedBean方法中
可以看到,最终调用了 referenceBean.get()方法,这个方法最后返回了一个ref对象,这个ref对象看到最后就是一个Invoke代理对象,也就是主要流程的第二步,准备初始化invoker对象,MockClusterInvoker,生成这个是最终目的
@Override
protected Object doGetInjectedBean(AnnotationAttributes attributes, Object bean, String beanName, Class<?> injectedType,
InjectionMetadata.InjectedElement injectedElement) throws Exception {
return getOrCreateProxy(referencedBeanName, referenceBeanName, referenceBean, injectedType);
}
private Object getOrCreateProxy(String referencedBeanName, String referenceBeanName, ReferenceBean referenceBean, Class<?> serviceInterfaceType) {
if (existsServiceBean(referencedBeanName)) { // If the local @Service Bean exists, build a proxy of ReferenceBean
return newProxyInstance(getClassLoader(), new Class[]{serviceInterfaceType},
wrapInvocationHandler(referenceBeanName, referenceBean));
} else { // ReferenceBean should be initialized and get immediately
// 这里
return referenceBean.get();
}
}
public synchronized T get() {
checkAndUpdateSubConfigs();
if (destroyed) {
throw new IllegalStateException("The invoker of ReferenceConfig(" + url + ") has already destroyed!");
}
if (ref == null) {
// 入口
init();
}
return ref; // Invoke代理
}
2.准备初始化invoker对象,MockClusterInvoker
由init()->createProxy(map),这个方法太长了,留了三个主要的方法:
1.加载注册中心url地址
- invoker = REF_PROTOCOL.refer调用registry.refer,这里又是spi机制,最终调用了registryProtocol.refer方法
private T createProxy(Map<String, String> map) {
List<URL> us = loadRegistries(false);
invoker = REF_PROTOCOL.refer(interfaceClass, urls.get(0));
invoker = CLUSTER.join(new StaticDirectory(u, invokers));
}
3.在注册中心初始化服务目录RegistryDirectory
留下了主要代码,可以看到这里初始化了一个注册目录,也就是我们最终在zk上看到的consumers节点文件夹。
registry.register(directory.getRegisteredConsumerUrl());这里最终会调用ZookeeperRegistry.doRegister方法,用zk客户端向zk服务端创建节点,将消费端信息注册到zk,可以看到这里创建的是临时节点
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
directory.setRegistry(registry);
directory.setProtocol(protocol);
registry.register(directory.getRegisteredConsumerUrl());
directory.buildRouterChain(subscribeUrl);
directory.subscribe(subscribeUrl.addParameter(CATEGORY_KEY,
return invoker;
}
@Override
public void doRegister(URL url) {
try {
zkClient.create(toUrlPath(url), url.getParameter(DYNAMIC_KEY, true));
} catch (Throwable e) {
throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
4.构造路由链、服务订阅
directory.buildRouterChain(subscribeUrl);
directory.subscribe(subscribeUrl.addParameter(CATEGORY_KEY
3.生成最终的invoker对象MockClusterInvoker
Invoker invoker = cluster.join(directory);
这里又是SPI机制,由于Cluster有一个包装类,所以会先调用MockClusterWrapper.join方法,原理可参照我之前单独写的一节SPI源码分析
可以看到,这里最终生成MockClusterInvoker
public class MockClusterWrapper implements Cluster {
private Cluster cluster;
public MockClusterWrapper(Cluster cluster) {
this.cluster = cluster;
}
@Override
public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
return new MockClusterInvoker<T>(directory,
this.cluster.join(directory));
}
}
4.服务调用
第3步骤中生成了一个MockClusterInvoker对象,所以最终调用服务的方法实际上就是调用MockClusterInvoker.invoke方法,会依次调用AbstractClusterInvoker.invoke->FailoverClusterInvoker.doInvoke->DubboInvoker.doInvoke
@Override
public Result invoke(Invocation invocation) throws RpcException {
Result result = null;
String value = directory.getUrl().getMethodParameter(invocation.getMethodName(), MOCK_KEY, Boolean.FALSE.toString()).trim();
if (value.length() == 0 || "false".equalsIgnoreCase(value)) {
//no mock
result = this.invoker.invoke(invocation);
} else if (value.startsWith("force")) {
if (logger.isWarnEnabled()) {
logger.warn("force-mock: " + invocation.getMethodName() + " force-mock enabled , url : " + directory.getUrl());
}
//force:direct mock
result = doMockInvoke(invocation, null);
} else {
//fail-mock
try {
result = this.invoker.invoke(invocation);
//fix:#4585
if(result.getException() != null && result.getException() instanceof RpcException){
RpcException rpcException= (RpcException)result.getException();
if(rpcException.isBiz()){
throw rpcException;
}else {
result = doMockInvoke(invocation, rpcException);
}
}
}
return result;
}
我们直接看DubboInvoker.doInvoke方法
1.首先会拿到一个 ExchangeClient客户端
2.异步请求currentClient.request,最终调用HeaderExchangeChannel.request->调用netty的方法channel.send
@Override
protected Result doInvoke(final Invocation invocation) throws Throwable {
RpcInvocation inv = (RpcInvocation) invocation;
final String methodName = RpcUtils.getMethodName(invocation);
inv.setAttachment(PATH_KEY, getUrl().getPath());
inv.setAttachment(VERSION_KEY, version);
ExchangeClient currentClient;
if (clients.length == 1) {
currentClient = clients[0];
} else {
currentClient = clients[index.getAndIncrement() % clients.length];
}
try {
boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
int timeout = getUrl().getMethodPositiveParameter(methodName, TIMEOUT_KEY, DEFAULT_TIMEOUT);
if (isOneway) {
boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
currentClient.send(inv, isSent);
return AsyncRpcResult.newDefaultAsyncResult(invocation);
} else {
AsyncRpcResult asyncRpcResult = new AsyncRpcResult(inv);
CompletableFuture<Object> responseFuture = currentClient.request(inv, timeout);
asyncRpcResult.subscribeTo(responseFuture);
return asyncRpcResult;
}
}
}
5.服务请求处理
由于使用的netty通信,所有客户端发送消息后,netty服务端会在NettyServerHandler.channelRead中接到消息,这里调用了很多handler,就不展开看了。
1.MultiMessageHandler
2.HeartbeatHandler
3.AllChannelHandler
4.DecodeHandler
5.HeaderExchangeHandler
6.ExchangeHandlerAdapter
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
try {
handler.received(channel, msg);
} finally {
NettyChannel.removeChannelIfDisconnected(ctx.channel());
}
}
总结
服务的引入的目的就是在消费端@Reference标注一个服务端接口,这个注解会去将消费端消息注册到zk,最终会生成一个调用服务端的代理对象invoker,消费端调用服务端接口的时候最后调用的就是invoker.invoke方法,而这个方法采用的通信框架是netty,实现了远程调用。
dubbo源码写的很好,比如里面的SPI机制运用的很巧妙,还有一些抽象工厂设计模式等,源码值得品读。
网友评论