注:文章中使用的dubbo源码版本为2.5.4
零、文章目录
- Consumer发送请求
- Provider接收请求并发送响应
- Consumer接收响应
一、Consumer发送请求
1.1 代码入口
- 在 dubbo剖析:二 服务引用 中讲到,服务引用方根据引用接口
DemoService
,使用dubbo的代理工厂类JavassistProxyFactory.getProxy()
创建出该接口的动态代理对象。 - 当用户想调用
DemoService
的相关方法时,实际是调用了代理对象的相关方法,从InvokerInvocationHandler.invoke()
进入Consumer请求发送流程。
1.2 整体流程
Consumer发送请求流程图- 上图从上往下展示了服务引用方发送一个RPC请求的关键步骤,经历了“代理层”、“集群层”、“过滤监听扩展点”、“调用协议层”、“信息交换层”、“网络传输层”。
- 紫色实线条表示各层关键类的方法调用,蓝色虚线表示关键类的初始化过程。
1)代理执行(InvokerInvocationHandler.invoke):
- 服务引用的过程中,由
ReferenceConfig
使用JavassistProxyFactory
为引用接口创建了代理对象; - 服务引用方调用dubbo代理类
DemoService.sayHello
时,实际执行InvokerInvocationHandler.invoke()
方法,即这是Consumer发送请求的起点; -
InvokerInvocationHandler
内包含一个Invoker
,在JavassistProxyFactory.getProxy()
过程中通过其构造器注入,该Invoker
为一个集群路由功能的AbstractClusterInvoker
;
2)集群容错+负载均衡(AbstractClusterInvoker.invoke):
- 服务引用的过程中,由
RegistryProtocol
使用Cluster.join()
创建集群Invoker
,Cluster
由ExtensionLoader.getExtensionLoader(Cluster.class).getExtension("mergeable")
动态生成; - 集群
Invoker
根据负载均衡算法有多种不同实现类(failover、failfast、failsafe、failback),具体使用哪一种由对应的Cluster
实现决定; -
AbstractClusterInvoker
通过Directory.list()
方法获取请求路径对应的Invoker
列表; -
AbstractClusterInvoker
再通过LoadBalance.select()
方法从多个Invoker
中选取一个做本次调用,即负载均衡算法(Random、RoundRobin、LeastActive);
3)Filter链扩展点(ProtocolFilterWrapper + ProtocolListenerWrapper):
- 在
ReferenceConfig
进行服务引用的过程中,通过refProtocol.refer()
创建Invoker
对象; -
refprotocol.refer()
先后经过修饰类ProtocolFilterWrapper
、ProtocolListenerWrapper
,最后执行RegistryProtocol
;ProtocolFilterWrapper
和ProtocolListenerWrapper
就是Dubbo
引入的扩展点; - 扩展点对请求发送和接收的核心功能流程无影响,目的是以插件的方式进行一些辅助功能处理,这里不再进一步展开;
4)调用协议层执行(AbstractInvoker.invoke):
- 经过集群路由和扩展点,现在将直接执行
AbstractInvoker.invoke
方法,开始真正的远程调用了; - 服务引用的过程中,由
RegistryDirectory
使用Protocol.refer()
创建远程执行AbstractInvoker
,Protocol
默认采用default实现,即DubboProtocol
; -
AbstractInvoker
有多种协议的具体实现(dubbo、rmi、hessian、http),具体使用哪一种协议由对应的Protocol
实现决定,默认采用dubbo协议为DubboInvoker
; -
DubboInvoker
中包含了ExchangeClient
的引用,通过DubboInvoker
的构造器注入;
5)交换层执行(ExchangeClient.request):
- 远程执行
Invoker
通过其引用的ExchangeClient.request
完成远程调用请求的发送并得到ResponseFuture
,然后调用ResponseFuture.get()
得到 远程调用结果Result
; - 服务引用的过程中,由
DubboProtocol
使用Exchanger.connect()
创建ExchangeClient
; -
Exchanger
的实现类为HeaderExchanger
,由ExtensionLoader.getExtensionLoader(Exchanger.class).getExtension(type)
动态生成; -
ExchangeClient
在Client
的基础上封装了请求响应模式(其以Request、Response、ResponseFuture为核心,后续单独文章讲解),这也是交换层的核心功能;
6)网络层执行(Client.send):
- 交换层
ExchangeClient.request
封装请求响应模式后,最终依赖网络层Client.send
将请求消息通过网络发送给服务提供方; - 服务引用的过程中,由
HeaderExchanger
使用Transporter.connect()
创建Client
并完成初始连接操作,Client
有多种网络层实现(netty、mina...),具体使用哪一种由对应的Transporter
实现决定; -
Transporter
有多种网络层实现(netty、mina...),由ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension()
动态生成,默认为NettyTransporter
; - 最后,
NettyClient
使用其包含的底层NettyChannel
完成网络消息发送的功能;
二、Provider接收请求并发送响应
2.1 代码入口
- 在 dubbo剖析:一 服务发布 中讲到,服务提供方通过
NettyServer
完成服务端创建及监听工作。 - 在
NettyServer
的doOpen()
阶段创建了网络事件处理器NettyHandler
,当服务端收到客户端消息时,将触发NettyHandler
的messageReceived()
方法。
2.2 整体流程
接收请求流程图- 上图从上往下表示了服务提供方接收到一个网络请求时的处理步骤,经历了一个
Handler
处理器链,链中的每个Handler
负责实现自己的处理功能。
1)Netty网络事件处理器(NettyHandler):
- 继承自Netty的原生网络时间处理器实现类
SimpleChannelHandler
,定义了网络建连(channelConnected)、断连(channelDisconnected)、消息接收(messageReceived)、异常(exceptionCaught)等事件处理方法; - 维护了
<ip:port, channel>
的对应关系Map<String, Channel>channels
,在网络建连/断连时进行相应put/remove操作,并暴露给NettyServer
使用; - 接收到网络消息时,执行
messageReceived()
方法,将Netty的原生Channel
转换为Dubbo封装的NettyChannel
,并将事件传递给其包含的ChannelHandler
处理;
2)复合消息处理器(MultiMessageHandler):
public void received(Channel channel, Object message) throws RemotingException {
if (message instanceof MultiMessage) {
MultiMessage list = (MultiMessage) message;
for (Object obj : list) {
handler.received(channel, obj);
}
} else {
handler.received(channel, message);
}
}
- 处理
MultiMessage
,将其拆分成多个Message
处理;
3)心跳消息处理器(HeartbeatHandler):
- 消息收发时重置当前通道的最新消息收发时间,用于配合
HeaderExchangeServer
和HeaderExchangeClient
中的心跳检测任务HeartBeatTask
; - 拦截并处理心跳请求/响应消息。对心跳请求消息,构建对应的心跳响应消息并通过Channel发送回去;对心跳响应消息,仅记录日志后返回,不做功能上的处理;
4)业务线程转换处理器(AllChannelHandler):
-
Dubbo
通过该处理器完成了 IO线程 与 业务线程 的解耦! - 内部封装了业务线程池,默认使用
FixedThreadPool
;
public class FixedThreadPool implements ThreadPool {
public Executor getExecutor(URL url) {
String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);
int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,
queues == 0 ? new SynchronousQueue<Runnable>() :
(queues < 0 ? new LinkedBlockingQueue<Runnable>()
: new LinkedBlockingQueue<Runnable>(queues)),
new NamedThreadFactory(name, true), new AbortPolicyWithReport(name, url));
}
}
注意点:
a)线程池默认业务线程数为200
b)队列默认采用SynchronousQueue
- 将接收到的网络消息事件封装成可执行任务
ChannelEventRunnable
,交由业务线程池处理;
5)业务解码处理器(DecodeHandler):
- 进行业务请求响应的解码工作;
- 对
Request
和Response
中携带的消息体或结果体,如果其实现了Decodeable
接口,则进行一次解码处理;
6)交换层请求响应处理器(HeaderExchangeHandler):
- 交换层真正完成请求响应收发功能的处理器!
- 将网络层
Channel
转换为交换层ExchangeChannel
,为其增加了请求响应方法request()
; - 判断收到的网络消息类型,根据类型分别执行不同的处理逻辑;
if (message instanceof Request) {
Request request = (Request) message;
if (request.isEvent()) {
handlerEvent(channel, request);
} else {
//case a: 请求响应模型的请求处理
if (request.isTwoWay()) {
Response response = handleRequest(exchangeChannel, request);
channel.send(response);
}
//case b: 单向消息接收的处理
else {
handler.received(exchangeChannel, request.getData());
}
}
} else if (message instanceof Response) {
//case c: 请求响应模型的响应处理
handleResponse(channel, (Response) message);
}
a)请求响应模型的Request消息:调用ExchangeHandlerAdapter.reply()
获取执行结果Result
-->
将本地执行结果Result
封装成RPC响应Response
--> 通过channel.send()
发送RPC响应;
Response handleRequest(ExchangeChannel channel, Request req) throws RemotingException {
Response res = new Response(req.getId(), req.getVersion());
Object msg = req.getData();
try {
// 调用```ExchangeHandlerAdapter.reply()```获取执行结果```Result```
Object result = handler.reply(channel, msg);
res.setStatus(Response.OK);
res.setResult(result);
} catch (Throwable e) {
res.setStatus(Response.SERVICE_ERROR);
res.setErrorMessage(StringUtils.toString(e));
}
//将本地执行结果```Result```封装成RPC响应```Response```
return res;
}
b)单向请求消息的处理:调用ExchangeHandlerAdapter.received()
处理请求消息,如果该消息是Invocation
则执行reply()
逻辑但不主动发送RPC响应Response
;
public void received(Channel channel, Object message) throws RemotingException {
if (message instanceof Invocation) {
reply((ExchangeChannel) channel, message);
} else {
super.received(channel, message);
}
}
c)请求响应模型的Response消息:调用DefaultFuture.received()
处理响应消息。
...注:请求响应模型(Request
,Response
,DufaultFuture
)相关后续专门分析,此处不展开...
7)真正本地实现类方法的执行(ExchangeHandlerAdapter):
-
ExchangeHandlerAdapter
由DubboProtocol
创建,并实现了reply()
方法; -
reply()
方法,实际通过RPC调用参数Invocation
从DubboProtocol.exporterMap
中获取到对应的本地实现DubboExporter
--> 进而获取到对应的本地执行AbstractProxyInvoker
--> 最终通过AbstractProxyInvoker.invoke()
方法,以反射的方式执行真正实现类的对应方法,完成RPC请求。
三、Consumer接收响应
整体流程与 “Provider接收请求” 一样,唯一的区别是在 交换层请求响应处理器(HeaderExchangeHandler
)步骤中会执行 “分支c:请求响应模型的Response消息”,将Response
交由DefaultFuture
处理。
网友评论
使用开发者头条 App 搜索 322723 即可订阅《源代码剖析之夜》