今天我们要介绍的是Dubbo消息派发的时候,使用的线程模型,Dubbo版本2.8.4。那么什么是Dubbo的消息派发呢?比如,Dubbo提供者收到消费者的请求之后,要将这个请求派发给哪个线程来处理。这个派发给某个线程的规则,就是我们今天要讨论的消息派发的线程模型。
请求流程
大家请看下图,下图来自Dubbo官方文档,线程模型:

我们解释一下:
-
消费者的代理类发起请求
-
请求经过编码和序列化,然后经由Netty(或者Mina等)发送给提供者
-
提供者收到了请求,然后解码和反序列化等
-
根据Dubbo配置,选择相应的线程,处理业务逻辑
这里的第四步,就是我们今天要讨论的内容,Dubbo有哪些处理业务的线程模型以及如何选择。
消息分类
首先,我们讨论下Dubbo有哪些消息,大体上可以分为两类:非业务消息、业务消息。
-
非业务消息包括:建立连接,断开连接,心跳
-
业务消息包括:请求,响应,异常
这些消息我们可以在com.alibaba.dubbo.remoting.ChannelHandler中找到,不包括心跳。
public interface ChannelHandler {
void connected(Channel channel) throws RemotingException;
void disconnected(Channel channel) throws RemotingException;
void sent(Channel channel, Object message) throws RemotingException;
void received(Channel channel, Object message) throws RemotingException;
void caught(Channel channel, Throwable exception) throws RemotingException;
}
线程池
针对两种分类的消息,我们的线程也可以分为两类:Netty的线程池、业务线程池
-
Netty(Mina等)的线程池:就是底层网络框架用于处理网络传输的线程池,网络消息编码解码,发送网络数据等
-
业务线程池:这个线程池是Dubbo框架创建的,用于处理除了网络传输之外,Dubbo相关的业务。
消息我们都理清楚了,线程池也都知道了,那么,哪些消息应该派发到哪个线程池,这个规则,就是今天的主角。
消息处理线程模型
Dubbo有五种处理的线程模型,
-
all
:所有的消息都派发到业务线程池,包括请求,响应,连接事件,断开事件,心跳等。 -
direct
:所有消息都不派发到业务线程池,全部在Netty(或者Mina等)线程上直接执行。 -
message
:只有请求响应等业务消息派发到业务线程池,其它连接断开事件,心跳等消息,直接在Netty(或者Mina等)线程上执行。 -
execution
:只有请求消息派发到Netty线程池,不含响应,响应和其它连接断开事件,心跳等消息,直接在业务线程上执行。 -
connection
: 将连接断开事件放入队列,由一个只有1个线程的其他线程池处理,有序逐个执行,其它消息派发到业务线程池。
源码分析
派发接口是一个SPI拓展接口,com.alibaba.dubbo.remoting.Dispatcher,这个拓展接口中默认的线程模型是all
。
@SPI(AllDispatcher.NAME)
public interface Dispatcher {
/**
* dispatch the message to threadpool.
*
* @param handler
* @param url
* @return channel handler
*/
@Adaptive({Constants.DISPATCHER_KEY, "dispather", "channel.handler"}) // 后两个参数为兼容旧配置
ChannelHandler dispatch(ChannelHandler handler, URL url);
}
如果某个Dubbo服务接口单独配置了线程模型,那么会从URL中,按照dispatcher
,dispather
,channel.handler
的key值,找对应的线程模型来处理。后面的两个参数是为了兼容老版本Dubbo,dispather
就完全是在老版Dubbo中,这个单词拼错了,后来在新版本中改过来了,就必须要兼容这个错误的配置。
OK,这个接口有5种实现,就是对应上面的5中线程模型:

这5种Dispatcher分别有对应的5种ChannelHandler,逻辑都在handler中,如下图

以AllChannelHandler为例,里面就是对应的connected,disconnected,received,caught的实现,非常的简单,就是把任务提交到线程池里。

每个任务都是ChannelEventRunnable类,实现了Runnable接口,线程会执行它的run方法。在run方法中,是一个switch,根据消息类型,调用handler对应的方法。我们以received为例:
@Override
public void received(Channel channel, Object message) throws RemotingException {
if (message instanceof Invocation) {
reply((ExchangeChannel) channel, message);
} else {
super.received(channel, message);
}
}
received调用了reply方法,我们继续看看reply方法:
public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
if (message instanceof Invocation) {
Invocation inv = (Invocation) message;
Invoker<?> invoker = getInvoker(channel, inv);
//如果是callback 需要处理高版本调用低版本的问题
if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))){
String methodsStr = invoker.getUrl().getParameters().get("methods");
boolean hasMethod = false;
if (methodsStr == null || methodsStr.indexOf(",") == -1){
hasMethod = inv.getMethodName().equals(methodsStr);
} else {
String[] methods = methodsStr.split(",");
for (String method : methods){
if (inv.getMethodName().equals(method)){
hasMethod = true;
break;
}
}
}
if (!hasMethod){
logger.warn(new IllegalStateException("The methodName "+inv.getMethodName()+" not found in callback service interface ,invoke will be ignored. please update the api interface. url is:" + invoker.getUrl()) +" ,invocation is :"+inv );
return null;
}
}
RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
return invoker.invoke(inv);
}
throw new RemotingException(channel, "Unsupported request: " + message == null ? null : (message.getClass().getName() + ": " + message) + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
}
reply中就是查找invoker,调用业务逻辑了,后面就不是本篇的重点了。
网友评论