String hello = demoService.sayHello("world");
一、服务通信简图
image.png
- consumer
- proxy0(demoService)调用其属性 InvocationHandler
- InvocationHandler 调用其属性 MockClusterInvoker
- MockClusterInvoker 调用其属性 FailoverClusterInvoker
- FailoverClusterInvoker 调用其属性 RegistryDirectory,从中根据 method 获取 List<DubboInvoker(filtered)>
- RegistryDirectory 调用其 List<Router> 对 List<DubboInvoker(filtered) 进行过滤
- FailoverClusterInvoker 获取 LoadBalancer,从 5 的结果获取一个 DubboInvoker(filtered)
- 执行 DubboInvoker(filtered) 的过滤链,最后执行 DubboInvoker,发起 Netty 调用
- provider
- 根据请求参数 RpcInvocation(获取 interface、group、version) 和通信通道 Channel(获取 port) 组装 serviceKey,根据 serviceKey 获取 DubboExporter
- 从 DubboExporter 的取出其属性 AbstractProxyInvoker(filtered)
- 执行 AbstractProxyInvoker(filtered) 的过滤链,最后执行 AbstractProxyInvoker
- AbstractProxyInvoker 调用 Wrapper,Wrapper 调用 DemoServiceImpl
注意
image.png
通过 第7章 Dubbo 服务暴露流程的设计与实现 第8章 Dubbo 服务引用流程的设计与实现 这两章的分析,我们发现在 Dubbo 中存在四类 Invoker,而 Invoker 也是 Dubbo 中对调用逻辑进行封装的一个模型体:
AbstractProxyInvoker
:服务端,提供了对具体实现(eg. DemoServiceImpl)的调用封装;DubboInvoker
:客户端,封装了 NettyClient,进行远程调用的发起;- 具体的
XxxClusterInvoker
:客户端,将多个Invoker伪装成一个集群版的Invoker;InvokerWrapper 包装类
:eg.MockClusterInvoker
,应用于客户端,是具体的XxxClusterInvoker
的包装类,提供 mock操作。
二、服务通信源码梯形图
服务通信分为:客户端发出请求;服务端接收请求并返回响应;客户端接收响应。
2.1 客户端发出请求
//代理发出请求
proxy0.sayHello(String paramString)
-->InvokerInvocationHandler.invoke(Object proxy, Method method, Object[] args)
-->new RpcInvocation(method, args)
-->MockClusterInvoker.invoke(Invocation invocation)//服务降级的地方
//ClusterInvoker将多个Invoker伪装成一个集群版的Invoker
-->AbstractClusterInvoker.invoke(Invocation invocation)
//获取Invokers
-->list(Invocation invocation)
-->AbstractDirectory.list(Invocation invocation)
-->RegistryDirectory.doList(Invocation invocation)//从Map<String, List<Invoker<T>>> methodInvokerMap中获取key为sayHello的List<Invoker<T>>
-->MockInvokersSelector.getNormalInvokers(List<Invoker<T>> invokers)//对上述的List<Invoker<T>>再进行一次过滤(这里比如说过滤出所有协议为mock的Invoker,如果一个也没有就全部返回),这就是router的作用
//获取负载均衡器
-->loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
.getMethodParameter(invocation.getMethodName(), Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE))//默认为random
-->RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation)//异步操作添加invocationID
-->FailoverClusterInvoker.doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance)
//使用负载均衡器选择一个Invoker出来:RegistryDirectory$InvokerDelegete实例
-->AbstractClusterInvoker.select(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected)
-->doselect(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected)
-->AbstractLoadBalance.select(List<Invoker<T>> invokers, URL url, Invocation invocation)
-->RandomLoadBalance.doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation)
//执行listener和filter链
-->ListenerInvokerWrapper.invoke
-->ConsumerContextFilter.invoke(Invoker<?> invoker, Invocation invocation)//设置一些RpcContext属性,并且设置invocation中的invoker属性
-->FutureFilter.invoke(Invocation invocation)
-->MonitorFilter.invoke(Invocation invocation)//monitor在这里收集数据
-->AbstractInvoker.invoke(Invocation inv)//重新设置了invocation中的invoker属性和attachment属性
-->DubboInvoker.doInvoke(Invocation invocation)
//获取ExchangeClient进行消息的发送
-->ReferenceCountExchangeClient.request(Object request, int timeout)
-->HeaderExchangeClient.request(Object request, int timeout)
-->HeaderExchangeChannel.request(Object request, int timeout)
-->AbstractClient.send(Object message, boolean sent)//NettyClient的父类
-->getChannel()//NettyChannel实例,其内部channel实例=NioClientSocketChannel实例
-->NettyChannel.send(Object message, boolean sent)
-->NioClientSocketChannel.write(Object message)//已经是netty的东西了,这里的message=Request实例:最重要的是RpcInvocation [methodName=sayHello, parameterTypes=[class java.lang.String], arguments=[world], attachments={path=com.alibaba.dubbo.demo.DemoService, interface=com.alibaba.dubbo.demo.DemoService, version=0.0.0}]
总体流程(默认情况下):
- 代理
proxy0
(demoService)调用InvokerInvocationHandler
执行 sayHelloInvokerInvocationHandler
先将请求信息(methodName,parameterTypes,arguments)封装成 RpcInvocation 对象,之后调用MockClusterInvoker
#invokeMockClusterInvoker
根据是否配置了mock信息(服务降级信息)决定走mock逻辑还是正常逻辑(关于服务降级的详细源码解析,见 http://www.cnblogs.com/java-zhao/p/8320519.html,这里只走正常逻辑)MockClusterInvoker
调用FailoverClusterInvoker
#invokeFailoverClusterInvoker
先从RegistryDirectory
中的newMethodInvokerMap
中根据 methodName 获取 InvokerDelegate 实例列表(即可用的 provider 列表)- 使用
Router
对获取到的 InvokerDelegate 实例列表再进行一次选择(这里就可以实现服务的读写分离)- 根据 Dubbo SPI 机制创建负载均衡器(默认是 RandomLoadBalance)
- 使用
RandomLoadBalance
从被 Router 过滤过的 InvokerDelegate 实例列表选择一个实例出来(即从 provider 列表选出一台 provider 机器来)- 之后执行
InvokerDelegate
#invoke 方法:这里首先执行 filter 链,最后执行到DubboInvoker
#doInvoke 方法,在这里首先为 RpcInvocation 添加了新的参数,然后选取了一个ReferenceCountExchangeClient
,向服务端发出了请求。
注意最终的 RpcInvocation
实例包含的属性:
methodName=sayHello
parameterTypes=[class java.lang.String]
arguments=[world]
attachments={path=com.alibaba.dubbo.demo.DemoService, interface=com.alibaba.dubbo.demo.DemoService, version=0.0.0}
2.2 服务端接收请求并返回响应
服务端接收请求消息
NettyHandler.messageReceived(ChannelHandlerContext ctx, MessageEvent e)
-->MultiMessageHandler.received(Channel channel, Object message)
-->HeartbeatHandler.received(Channel channel, Object message)
-->AllChannelHandler.received(Channel channel, Object message)
-->ExecutorService cexecutor = getExecutorService()
-->cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message))
-->ChannelEventRunnable.run()
-->DecodeHandler.received(Channel channel, Object message)
-->decode(Object message)
-->HeaderExchangeHandler.received(Channel channel, Object message)
-->Response response = handleRequest(exchangeChannel, request)
-->DubboProtocol.requestHandler.reply(ExchangeChannel channel, Object message)//这里的message就是上边的RpcInvocation
//首先获取exporter,之后再获取invoker
-->getInvoker(Channel channel, Invocation inv)//组装serviceKey=com.alibaba.dubbo.demo.DemoService:20880
-->(DubboExporter<?>) exporterMap.get(serviceKey)//从Map<String, Exporter<?>> exporterMap中根据serviceKey获取DubboExport实例,
-->exporter.getInvoker()//获取RegistryProtocol$InvokerDelegete实例
//执行filter链
-->EchoFilter.invoke(Invoker<?> invoker, Invocation inv)
-->ClassLoaderFilter.nvoke(Invoker<?> invoker, Invocation invocation)
-->GenericFilter.invoke(Invoker<?> invoker, Invocation inv)
-->ContextFilter.invoke(Invoker<?> invoker, Invocation invocation)
-->TraceFilter.invoke(Invoker<?> invoker, Invocation invocation)
-->TimeoutFilter.invoke(Invoker<?> invoker, Invocation invocation)
-->MonitorFilter.invoke(Invoker<?> invoker, Invocation invocation)
-->ExceptionFilter.invoke(Invoker<?> invoker, Invocation invocation)
//执行真正的invoker调用
-->AbstractProxyInvoker.invoke(Invocation invocation)
-->JavassistProxyFactory$AbstractProxyInvoker.doInvoke
-->Wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments)
-->DemoServiceImpl.sayHello(String name)
-->new RpcResult(Object result)//将返回值result包装成RpcResult(最后该参数会被包装为Response)
服务端发送响应消息
-->channel.send(response)//NettyChannel
-->NioAcceptedSocketChannel.write(Object message)//已经是netty的东西了,这里的message=Response实例:最重要的是RpcResult [result=Hello world, response form provider: 10.211.55.2:20880, exception=null]
总体流程(默认情况下):
- NettyServer 接收到请求消息后,进行解码,之后交给 provider 端业务线程池进行处理;
- 业务线程调用
DubboProtocol$requestHandler#reply
方法,该方法首先根据请求信息 RpcInvocation 组装 serviceKey,之后根据 serviceKey 从 DubboProtocol 的 exporterMap 中获取指定服务的DubboExporter
(serviceKey 是由 RpcInvocation 中的group/path:version:port
组成,其中 port 来自于 channel)- 之后从
DubboExporter
中获取存储在其内的 InvokerDelegete 实例,之后执行 filter 链,最后执行到AbstractProxyInvoker#invoke
方法,
public abstract class AbstractProxyInvoker<T> implements Invoker<T> {
/**
* 真实对象 ref, eg. DemoServiceImpl
*/
private final T proxy;
...
public Result invoke(Invocation invocation) throws RpcException {
try {
return new RpcResult(doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments()));
} catch (InvocationTargetException e) {
return new RpcResult(e.getTargetException());
} catch (Throwable e) {
throw new RpcException("Failed to invoke remote proxy method " + invocation.getMethodName() + " to " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
// 子类覆写的真正调用的方法
protected abstract Object doInvoke(T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Throwable;
...
}
AbstractProxyInvoker 的实现是在 ProxyFactory 中创建的匿名内部类。
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
// TODO Wrapper cannot handle this scenario correctly: the classname contains '$'
final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
return new AbstractProxyInvoker<T>(proxy, type, url) {
@Override
protected Object doInvoke(T proxy, String methodName,
Class<?>[] parameterTypes,
Object[] arguments) throws Throwable {
return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
}
};
}
这是在讲 provider 服务暴露时的一段代码,这里实现了 AbstractProxyInvoker#doInvoke 方法,在该方法中又调用了 wrapper#invokeMethod
/**
* @param o 实现类
* @param n 方法名称
* @param p 参数类型
* @param v 参数值
* @return
* @throws java.lang.reflect.InvocationTargetException
*/
public Object invokeMethod(Object o, String n, Class[] p, Object[] v) throws java.lang.reflect.InvocationTargetException {
com.alibaba.dubbo.demo.provider.DemoServiceImpl w;
try {
w = ((com.alibaba.dubbo.demo.provider.DemoServiceImpl) o);
} catch (Throwable e) {
throw new IllegalArgumentException(e);
}
try {
if ("sayHello".equals(n) && p.length == 1) {
return ($w) w.sayHello((java.lang.String) v[0]);
}
} catch (Throwable e) {
throw new java.lang.reflect.InvocationTargetException(e);
}
throw new com.alibaba.dubbo.common.bytecode.NoSuchMethodException("Not found method \"" + n + "\" in class com.alibaba.dubbo.demo.provider.DemoServiceImpl.");
}
这里最终调到了 DemoServiceImpl#sayHello 方法。最后将响应结果封装成 RpcResult,返回给客户端。
2.3 客户端接收响应
客户端接收响应消息
NettyHandler.messageReceived(ChannelHandlerContext ctx, MessageEvent e)
-->MultiMessageHandler.received(Channel channel, Object message)
-->HeartbeatHandler.received(Channel channel, Object message)
-->AllChannelHandler.received(Channel channel, Object message)
-->ExecutorService cexecutor = getExecutorService()
-->cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message))
-->ChannelEventRunnable.run()
-->DecodeHandler.received(Channel channel, Object message)
-->decode(Object message)
-->HeaderExchangeHandler.received(Channel channel, Object message)
-->handleResponse(Channel channel, Response response)
-->DefaultFuture.received(channel, response)
-->doReceived(Response res)//异步转同步
客户端接收到服务端的响应之后,先解码,之后将响应信息交给业务线程池去处理,然后就涉及到 Dubbo 的异步转同步的实现,直接看 Dubbo 线程模型。
三、Dubbo 线程模型
以同步调用为例。
Alt pic
线程池
在provider端,存在三个线程池:
- boss 线程池:Netty 所有,默认只包含一个 NioEventLoop。用于接收客户端的连接 channel,并且之后将 channel 注册到 worker 线程池中的一个 NioEventLoop 上(实际上是注册在 NioEventLoop 所拥有的那个 Selector 上);
- worker 线程池:Netty 所有,在 Dubbo 中默认包含“核数+1”个 NioEventLoop(在 Netty 中默认是2*核数)。worker 线程池中的每一个 NioEventLoop 去阻塞(Selector.select())获取注册在其上的 channel 准备就绪的事件,然后做出相应处理;
- server 线程池:Dubbo 服务端的业务线程池,默认 worker 线程会将解码后的请求消息交由该线程池进行处理。
在consumer端,存在两个线程池:
- worker 线程池:同 provider 的 worker 线程池
- client 线程池:Dubbo 服务端的业务线程池,默认 worker 线程会将解码后的响应消息交由该线程池进行处理。
通信流程
在上一小节中总结了通信流程的源码调用链,这一节从线程模型的角度来看通信流程。(下面以同步调用为例)
- consumer 端用户线程在发出请求之前会先创建一个
DefaultFuture
对象;并将requestID
作为 DefaultFuture 对象的 key 存储在Map<Long, DefaultFuture> FUTURES
中(注意:每一个requestID
是一个请求的唯一标识,最后相应的响应 Response 的responseID
就等于这个requestID
)- 之后调用 Netty 编码并发出请求,然后马上调用 DefaultFuture#get 进行阻塞等待(阻塞等待 response 不为空);
- provider 端 NettyServer 接收到请求后,解码,然后交由 server 线程池进行处理;
- server 线程池处理完成之后,调用 Netty 编码并发送响应消息给 consumer 端;
- consumer 端接收到响应后,解码,然后交给 client 线程池处理,client 线程池从
Map<Long, DefaultFuture> FUTURES
中获取key=responseID
的DefaultFuture
对象,然后将响应消息填充到其 response 属性后,唤醒 consumer 端阻塞的用户线程;- 最后 consumer 得到了响应
网友评论