Dubbo framework示意图

整个调用过程
(1)protocol 远程调用层:封装 RPC 调用,以 Invocation, Result 为中心,扩展接口为 Protocol, Invoker, Exporter
(2)exchange 信息交换层:封装请求响应模式,同步转异步,以 Request, Response 为中心,扩展接口为 Exchanger, ExchangeChannel, ExchangeClient, ExchangeServer
(3)transport 网络传输层:抽象 mina 和 netty 为统一接口,以 Message 为中心,扩展接口为 Channel, Transporter, Client, Server, Codec
请求过程
首先服务消费者通过代理对象 Proxy 发起远程调用,接着通过网络客户端 Client 将编码后的请求发送给服务提供方的网络层上,也就是 Server。Server 在收到请求后,首先要做的事情是对数据包进行解码。然后将解码后的请求发送至分发器 Dispatcher,再由分发器将请求派发到指定的线程池上,最后由线程池调用具体的服务。这就是一个远程调用请求的发送与接收过程。

Javassist 框架为服务接口生成动态代理类

服务消费端
就是上图中服务消费端的 proxy,用户代码通过这个 proxy 调用其对应的 Invoker 。DubboInvoker、 HessianRpcInvoker、 InjvmInvoker、 RmiInvoker、 WebServiceInvoker 中的任何一个
public class DemoClientAction {
private DemoService demoService;
public void setDemoService(DemoService demoService) {
this.demoService = demoService;
}
public void start() {
String hello = demoService.sayHello("world" + i);
}
}
Javassist 框架为服务接口生成动态代理类
public class proxy0 implements ClassGenerator.DC, EchoService, DemoService {
// 方法数组
public static Method[] methods;
private InvocationHandler handler;
public proxy0(InvocationHandler invocationHandler) {
this.handler = invocationHandler;
}
public proxy0() {
}
public String sayHello(String string) {
// 将参数存储到 Object 数组中
Object[] arrobject = new Object[]{string};
// 调用 InvocationHandler 实现类的 invoke 方法得到调用结果
Object object = this.handler.invoke(this, methods[0], arrobject);
// 返回调用结果
return (String)object;
}
/** 回声测试方法 */
public Object $echo(Object object) {
Object[] arrobject = new Object[]{object};
Object object2 = this.handler.invoke(this, methods[1], arrobject);
return object2;
}
}
InvokerInvocationHandler
public class InvokerInvocationHandler implements InvocationHandler {
private final Invoker<?> invoker; //MockClusterInvoker 内部封装了服务降级逻辑
public InvokerInvocationHandler(Invoker<?> handler) {
this.invoker = handler;
}
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
String methodName = method.getName();
Class<?>[] parameterTypes = method.getParameterTypes();
// 拦截定义在 Object 类中的方法(未被子类重写),比如 wait/notify
if (method.getDeclaringClass() == Object.class) {
return method.invoke(invoker, args);
}
// 如果 toString、hashCode 和 equals 等方法被子类重写了,这里也直接调用
if ("toString".equals(methodName) && parameterTypes.length == 0) {
return invoker.toString();
}
if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
return invoker.hashCode();
}
if ("equals".equals(methodName) && parameterTypes.length == 1) {
return invoker.equals(args[0]);
}
// 将 method 和 args 封装到 RpcInvocation 中,并执行后续的调用
return invoker.invoke(new RpcInvocation(method, args)).recreate();
}
}
客户端调用栈
proxy0#sayHello(String)
—> InvokerInvocationHandler#invoke(Object, Method, Object[])
—> MockClusterInvoker#invoke(Invocation) //失败后的降级处理
—> AbstractClusterInvoker#invoke(Invocation) //负载均衡等操作均会在此阶段被执行
—> FailoverClusterInvoker#doInvoke(Invocation, List<Invoker<T>>, LoadBalance) //集群的容错处理
—> Filter#invoke(Invoker, Invocation)
—> ListenerInvokerWrapper#invoke(Invocation)
—> AbstractInvoker#invoke(Invocation)
—> DubboInvoker#doInvoke(Invocation)
—> ReferenceCountExchangeClient#request(Object, int)
—> HeaderExchangeClient#request(Object, int)
—> HeaderExchangeChannel#request(Object, int)
—> AbstractPeer#send(Object)
—> AbstractClient#send(Object, boolean)
—> NettyChannel#send(Object, boolean)
—> NioClientSocketChannel#write(Object)
AbstractInvoker 具体实现
public abstract class AbstractInvoker<T> implements Invoker<T> {
public Result invoke(Invocation inv) throws RpcException {
if (destroyed.get()) {
throw new RpcException("Rpc invoker for service ...");
}
RpcInvocation invocation = (RpcInvocation) inv;
// 设置 Invoker
invocation.setInvoker(this);
if (attachment != null && attachment.size() > 0) {
// 设置 attachment
invocation.addAttachmentsIfAbsent(attachment);
}
Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();
if (contextAttachments != null && contextAttachments.size() != 0) {
// 添加 contextAttachments 到 RpcInvocation#attachment 变量中
invocation.addAttachments(contextAttachments);
}
if (getUrl().getMethodParameter(invocation.getMethodName(), Constants.ASYNC_KEY, false)) {
// 设置异步信息到 RpcInvocation#attachment 中
invocation.setAttachment(Constants.ASYNC_KEY, Boolean.TRUE.toString());
}
RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
try {
// 抽象方法,由子类实现
return doInvoke(invocation);
} catch (InvocationTargetException e) {
// ...
} catch (RpcException e) {
// ...
} catch (Throwable e) {
return new RpcResult(e);
}
}
protected abstract Result doInvoke(Invocation invocation) throws Throwable;
// 省略其他方法
}
DubboInvoker 具体实现
public class DubboInvoker<T> extends AbstractInvoker<T> {
private final ExchangeClient[] clients;
protected Result doInvoke(final Invocation invocation) throws Throwable {
RpcInvocation inv = (RpcInvocation) invocation;
final String methodName = RpcUtils.getMethodName(invocation);
// 设置 path 和 version 到 attachment 中
inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
inv.setAttachment(Constants.VERSION_KEY, version);
ExchangeClient currentClient;
if (clients.length == 1) {
// 从 clients 数组中获取 ExchangeClient
currentClient = clients[0];
} else {
currentClient = clients[index.getAndIncrement() % clients.length];
}
try {
// 获取异步配置
boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
// isOneway 为 true,表示“单向”通信
boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
// 异步无返回值
if (isOneway) {
boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
// 发送请求
currentClient.send(inv, isSent);
// 设置上下文中的 future 字段为 null
RpcContext.getContext().setFuture(null);
// 返回一个空的 RpcResult
return new RpcResult();
}
// 异步有返回值
else if (isAsync) {
// 发送请求,并得到一个 ResponseFuture 实例
ResponseFuture future = currentClient.request(inv, timeout);
// 设置 future 到上下文中
RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
// 暂时返回一个空结果
return new RpcResult();
}
// 同步调用
else {
RpcContext.getContext().setFuture(null);
// 发送请求,得到一个 ResponseFuture 实例,并调用该实例的 get 方法进行等待
return (Result) currentClient.request(inv, timeout).get();
}
} catch (TimeoutException e) {
throw new RpcException(..., "Invoke remote method timeout....");
} catch (RemotingException e) {
throw new RpcException(..., "Failed to invoke remote method: ...");
}
}
// 省略其他方法
}
ReferenceCountExchangeClient 内部定义了一个引用计数变量 referenceCount,每当该对象被引用一次 referenceCount 都会进行自增。每当 close 方法被调用时,referenceCount 进行自减。ReferenceCountExchangeClient 内部仅实现了一个引用计数的功能,其他方法并无复杂逻辑,均是直接调用被装饰对象的相关方法。HeaderExchangeClient 封装了一些关于心跳检测的逻辑
HeaderExchangeChannel定义request 并且send
final class HeaderExchangeChannel implements ExchangeChannel {
private final Channel channel;
HeaderExchangeChannel(Channel channel) {
if (channel == null) {
throw new IllegalArgumentException("channel == null");
}
// 这里的 channel 指向的是 NettyClient
this.channel = channel;
}
@Override
public ResponseFuture request(Object request) throws RemotingException {
return request(request, channel.getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT));
}
@Override
public ResponseFuture request(Object request, int timeout) throws RemotingException {
if (closed) {
throw new RemotingException(..., "Failed to send request ...);
}
// 创建 Request 对象
Request req = new Request();
req.setVersion(Version.getProtocolVersion());
// 设置双向通信标志为 true
req.setTwoWay(true);
// 这里的 request 变量类型为 RpcInvocation
req.setData(request);
// 创建 DefaultFuture 对象
DefaultFuture future = new DefaultFuture(channel, req, timeout);
try {
// 调用 NettyClient 的 send 方法发送请求
channel.send(req);
} catch (RemotingException e) {
future.cancel();
throw e;
}
// 返回 DefaultFuture 对象
return future;
}
}

客户端encodeRequest
public class ExchangeCodec extends TelnetCodec {
// 消息头长度
protected static final int HEADER_LENGTH = 16;
// 魔数内容
protected static final short MAGIC = (short) 0xdabb;
protected static final byte MAGIC_HIGH = Bytes.short2bytes(MAGIC)[0];
protected static final byte MAGIC_LOW = Bytes.short2bytes(MAGIC)[1];
protected static final byte FLAG_REQUEST = (byte) 0x80;
protected static final byte FLAG_TWOWAY = (byte) 0x40;
protected static final byte FLAG_EVENT = (byte) 0x20;
protected static final int SERIALIZATION_MASK = 0x1f;
private static final Logger logger = LoggerFactory.getLogger(ExchangeCodec.class);
public Short getMagicCode() {
return MAGIC;
}
@Override
public void encode(Channel channel, ChannelBuffer buffer, Object msg) throws IOException {
if (msg instanceof Request) {
// 对 Request 对象进行编码
encodeRequest(channel, buffer, (Request) msg);
} else if (msg instanceof Response) {
// 对 Response 对象进行编码,后面分析
encodeResponse(channel, buffer, (Response) msg);
} else {
super.encode(channel, buffer, msg);
}
}
protected void encodeRequest(Channel channel, ChannelBuffer buffer, Request req) throws IOException {
Serialization serialization = getSerialization(channel);
// 创建消息头字节数组,长度为 16
byte[] header = new byte[HEADER_LENGTH];
// 设置魔数
Bytes.short2bytes(MAGIC, header);
// 设置数据包类型(Request/Response)和序列化器编号
header[2] = (byte) (FLAG_REQUEST | serialization.getContentTypeId());
// 设置通信方式(单向/双向)
if (req.isTwoWay()) {
header[2] |= FLAG_TWOWAY;
}
// 设置事件标识
if (req.isEvent()) {
header[2] |= FLAG_EVENT;
}
// 设置请求编号,8个字节,从第4个字节开始设置
Bytes.long2bytes(req.getId(), header, 4);
// 获取 buffer 当前的写位置
int savedWriteIndex = buffer.writerIndex();
// 更新 writerIndex,为消息头预留 16 个字节的空间
buffer.writerIndex(savedWriteIndex + HEADER_LENGTH);
ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);
// 创建序列化器,比如 Hessian2ObjectOutput
ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
if (req.isEvent()) {
// 对事件数据进行序列化操作
encodeEventData(channel, out, req.getData());
} else {
// 对请求数据进行序列化操作
encodeRequestData(channel, out, req.getData(), req.getVersion());
}
out.flushBuffer();
if (out instanceof Cleanable) {
((Cleanable) out).cleanup();
}
bos.flush();
bos.close();
// 获取写入的字节数,也就是消息体长度
int len = bos.writtenBytes();
checkPayload(channel, len);
// 将消息体长度写入到消息头中
Bytes.int2bytes(len, header, 12);
// 将 buffer 指针移动到 savedWriteIndex,为写消息头做准备
buffer.writerIndex(savedWriteIndex);
// 从 savedWriteIndex 下标处写入消息头
buffer.writeBytes(header);
// 设置新的 writerIndex,writerIndex = 原写下标 + 消息头长度 + 消息体长度
buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len);
}
// 省略其他方法
}
服务端调用栈
public class DemoServiceImpl implements DemoService {
public String sayHello(String name) throws RemoteException {
return "Hello " + name;
}
}
上面这个类会被封装成为一个 AbstractProxyInvoker 实例,并新生成一个 Exporter 实例。
这样当网络通讯层收到一个请求后,会找到对应的 Exporter 实例,并调用它所对应的 AbstractProxyInvoker 实例,从而真正调用了服务提供者的代码。
Dubbo 里还有一些其他的 Invoker 类,但上面两种是最重要的。
NettyHandler#messageReceived(ChannelHandlerContext, MessageEvent)
—> AbstractPeer#received(Channel, Object)
—> MultiMessageHandler#received(Channel, Object)
—> HeartbeatHandler#received(Channel, Object)
—> AllChannelHandler#received(Channel, Object)
—> ExecutorService#execute(Runnable) // 由线程池执行后续的调用逻辑
服务端 decode
public class ExchangeCodec extends TelnetCodec {
@Override
public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {
int readable = buffer.readableBytes();
// 创建消息头字节数组
byte[] header = new byte[Math.min(readable, HEADER_LENGTH)];
// 读取消息头数据
buffer.readBytes(header);
// 调用重载方法进行后续解码工作
return decode(channel, buffer, readable, header);
}
@Override
protected Object decode(Channel channel, ChannelBuffer buffer, int readable, byte[] header) throws IOException {
// 检查魔数是否相等
if (readable > 0 && header[0] != MAGIC_HIGH
|| readable > 1 && header[1] != MAGIC_LOW) {
int length = header.length;
if (header.length < readable) {
header = Bytes.copyOf(header, readable);
buffer.readBytes(header, length, readable - length);
}
for (int i = 1; i < header.length - 1; i++) {
if (header[i] == MAGIC_HIGH && header[i + 1] == MAGIC_LOW) {
buffer.readerIndex(buffer.readerIndex() - header.length + i);
header = Bytes.copyOf(header, i);
break;
}
}
// 通过 telnet 命令行发送的数据包不包含消息头,所以这里
// 调用 TelnetCodec 的 decode 方法对数据包进行解码
return super.decode(channel, buffer, readable, header);
}
// 检测可读数据量是否少于消息头长度,若小于则立即返回 DecodeResult.NEED_MORE_INPUT
if (readable < HEADER_LENGTH) {
return DecodeResult.NEED_MORE_INPUT;
}
// 从消息头中获取消息体长度
int len = Bytes.bytes2int(header, 12);
// 检测消息体长度是否超出限制,超出则抛出异常
checkPayload(channel, len);
int tt = len + HEADER_LENGTH;
// 检测可读的字节数是否小于实际的字节数
if (readable < tt) {
return DecodeResult.NEED_MORE_INPUT;
}
ChannelBufferInputStream is = new ChannelBufferInputStream(buffer, len);
try {
// 继续进行解码工作
return decodeBody(channel, is, header);
} finally {
if (is.available() > 0) {
try {
StreamUtils.skipUnusedStream(is);
} catch (IOException e) {
logger.warn(e.getMessage(), e);
}
}
}
}
}
Dispatcher 真实的职责创建具有线程派发能力的 ChannelHandler

ExecutorService cexecutor = getExecutorService();
try {
// 将连接事件派发到线程池中处理
cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));
} catch (Throwable t) {
throw new ExecutionException(..., " error when process connected event .", t);
}
ChannelEventRunnable
ChannelEventRunnable#run()
—> DecodeHandler#received(Channel, Object)
—> HeaderExchangeHandler#received(Channel, Object)
—> HeaderExchangeHandler#handleRequest(ExchangeChannel, Request)
—> DubboProtocol.requestHandler#reply(ExchangeChannel, Object)
—> Filter#invoke(Invoker, Invocation)
—> AbstractProxyInvoker#invoke(Invocation)
—> Wrapper0#invokeMethod(Object, String, Class[], Object[])
—> DemoServiceImpl#sayHello(String)
网友评论