前言
上一篇中Exchange层根据服务提供方的协议类型来初始化一个Client负责和Server端的通信。Client将负责和Server端建立连接,并定时发送心跳来维护连接状态。同时,当Invoker的请求时,将要请求的接口及参数封装成Request通过ExchangeChannel
发送出去,同时注册一个ChannelHandler
来异步的接收Response。
Exchange层通过构建并缓存发送出去的Request来和收到的异步Response做匹配,从而让Invoker能拿到正确的结果。而其它连接和网络传输通过调用底层的Transporter层来实现的。Transporter层的作用就是对网络框架的抽象,这一层只负责数据传输,而不关心传输数据的结构,通过插件的形式来把对象转成二进制数据发送出去,同时对接收的数据做解码和反序列化。
Exchange层通过自己使用的协议来选择传输层,Dubbo协议默认使用的传输层框架是netty4。如果是webService,自然选择http。
建立连接
回顾下上一篇中Dubbo协议的ExchangeClient的初始化:
@Override
public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
}
上面的代码中connect动作实际上是通过Transporters.connect()
方法来连接的Server端。Transporters是一个工具类,最终是使用的具体Transporter实现类的connect()
方法。
public static Client connect(URL url, ChannelHandler... handlers) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
ChannelHandler handler;
if (handlers == null || handlers.length == 0) {
handler = new ChannelHandlerAdapter();
} else if (handlers.length == 1) {
handler = handlers[0];
} else {
handler = new ChannelHandlerDispatcher(handlers);
}
return getTransporter().connect(url, handler);
}
对于Dubbo协议来说,默认的Transporter是NettyTransporter
,使用netty4作为底层框架。来看下它的实现:
public class NettyTransporter implements Transporter {
public static final String NAME = "netty";
//开启Server监听
@Override
public RemotingServer bind(URL url, ChannelHandler listener) throws RemotingException {
return new NettyServer(url, listener);
}
//建立Client连接
@Override
public Client connect(URL url, ChannelHandler listener) throws RemotingException {
return new NettyClient(url, listener);
}
}
最终的connect()
封装在了NettyClient中。
NettyClient初始化
下面是NettyClient的构造函数:
public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException {
// you can customize name and type of client thread pool by THREAD_NAME_KEY and THREADPOOL_KEY in CommonConstants.
// the handler will be warped: MultiMessageHandler->HeartbeatHandler->handler
super(url, wrapChannelHandler(url, handler));
}
封装handler
构造函数中将传入的handler又包装了两层,变成了MultiMessageHandler->HeartbeatHandler->handler
一条链条。MultiMessageHandler
的作用是当一个数据包过来时里面包含多个Message的时候,会遍历message然后使用单条message回调后续的handler,也就是说它之后的handler只支持一次处理单条message。HeartbeatHandler
这个看名字就知道了,是处理心跳消息的,它在收到heartbean请求后回复response,同时在收到response后就直接return,不再把消息继续传给后面的handler。
这样在加上上一篇中Exchange层对handler的封装,总的handler链就变成了MultiMessageHandler->HeartbeatHandler->DecodeHandler->HeaderExchangeHandler->DubboExchangeHandler
。
建立连接
对Handler做了封装之后直接调用的父类的构造函数,即AbstractClient
:
public AbstractClient(URL url, ChannelHandler handler) throws RemotingException {
super(url, handler);
// 这个参数是设置,在发送请求时如果链接已断开,是否重连
needReconnect = url.getParameter(Constants.SEND_RECONNECT_KEY, false);
//初始化一个executor,NettyClient未用到
initExecutor(url);
//初始化,子类实现
try {
doOpen();
} catch (Throwable t) {
//log error
}
try {
// 连接,判断如果没连接,则调用子类的doConnect()方法.
connect();
} catch (RemotingException t) {
if (url.getParameter(Constants.CHECK_KEY, true)) {
close();
throw t;
} else {
//log error
}
} catch (Throwable t) {
//错误处理
}
}
上面的抽象类只实现了公共逻辑,比如参数获取和判断是否已连接,主要的初始化连接和connect都是具体实现类做的,NettyClient中的这两个方法实现如下:
@Override
protected void doOpen() throws Throwable {
//将dubbo handler适配成netty的handler
final NettyClientHandler nettyClientHandler = new NettyClientHandler(getUrl(), this);
//初始化Netty Bootstrap
bootstrap = new Bootstrap();
bootstrap.group(NIO_EVENT_LOOP_GROUP)
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
//.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getTimeout())
.channel(socketChannelClass());
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Math.max(3000, getConnectTimeout()));
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
int heartbeatInterval = UrlUtils.getHeartbeat(getUrl());
if (getUrl().getParameter(SSL_ENABLED_KEY, false)) {
ch.pipeline().addLast("negotiation", SslHandlerInitializer.sslClientHandler(getUrl(), nettyClientHandler));
}
//将Dubbo的Codec实现适配成Netty的codec
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
//构建netty pipeline
ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
.addLast("decoder", adapter.getDecoder())
.addLast("encoder", adapter.getEncoder())
.addLast("client-idle-handler", new IdleStateHandler(heartbeatInterval, 0, 0, MILLISECONDS))
.addLast("handler", nettyClientHandler);
String socksProxyHost = ConfigUtils.getProperty(SOCKS_PROXY_HOST);
if(socksProxyHost != null) {
int socksProxyPort = Integer.parseInt(ConfigUtils.getProperty(SOCKS_PROXY_PORT, DEFAULT_SOCKS_PROXY_PORT));
Socks5ProxyHandler socks5ProxyHandler = new Socks5ProxyHandler(new InetSocketAddress(socksProxyHost, socksProxyPort));
ch.pipeline().addFirst(socks5ProxyHandler);
}
}
});
}
doOpen()方法主要就是netty的初始化操作,这里对netty的使用就不详细说了,网上很多。这里面最主要就是pipeline的构建,netty对于网络数据接收和发送的自定义处理抽象为一个handler的链,即pipeline。这里面的handler可以选择在收到数据时发挥作用(netty称为InboundHandler,需实现read操作),也可以选择在发送数据时发挥作用(netty称为OutboundHandler,需实现write操作),当然也可以在发送和接收时都参与处理。当接收到网络上的数据时,调用handler的方向是从左至右,即先经过第一个加入的handler。反之,程序发送数据到网络上时,调用handler的方向是从右往左,即先经过最后一个加入的handler。
上面Dubbo加入的Netty Handler的作用下面讲,先看看doConnect()方法:
@Override
protected void doConnect() throws Throwable {
long start = System.currentTimeMillis();
//异步连接
ChannelFuture future = bootstrap.connect(getConnectAddress());
try {
//等待连接返回结果
boolean ret = future.awaitUninterruptibly(getConnectTimeout(), MILLISECONDS);
if (ret && future.isSuccess()) {
Channel newChannel = future.channel();
try {
// 连接成功尝试移除之前的连接
Channel oldChannel = NettyClient.this.channel;
if (oldChannel != null) {
try {
oldChannel.close();
} finally {
NettyChannel.removeChannelIfDisconnected(oldChannel);
}
}
} finally {
//并发控制,如果Client被关闭,则移除自己
if (NettyClient.this.isClosed()) {
try {
newChannel.close();
} finally {
NettyClient.this.channel = null;
NettyChannel.removeChannelIfDisconnected(newChannel);
}
} else {
NettyClient.this.channel = newChannel;
}
}
} else if (future.cause() != null) {
//连接失败处理
} else {
//连接失败处理
}
} finally {
}
}
上面的方法中,除了直接调用netty的connect方法建立连接外,就是将建立好的连接缓存,同时做了并发控制。
请求发送
上面分解了从Transporter中获取Client的过程,Dubbo协议默认会采用netty4做为传输层的框架,并基于TCP协议来发送数据。上一篇的Exchanger层在发送请求时,会将所有参数封装成一个Request,这个Request随后会通过Client发送出去,而在这个对象在进入NettyClient后,会经过所有Netty的OutboundHandler
。
Netty Handlers
回顾下上面的Netty初始化过程
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
ch.pipeline()
.addLast("decoder", adapter.getDecoder())
.addLast("encoder", adapter.getEncoder())
.addLast("client-idle-handler", new IdleStateHandler(heartbeatInterval, 0, 0, MILLISECONDS))
.addLast("handler", nettyClientHandler);
这里面4个Handler中,发送请求时按如下顺序调用:
- NettyClientHandler :这个handler的作用是,如果在后续的handler处理逻辑中发生错误,会直接回复调用方一个error response,注意这个并不是网络错误或者Server端返回的错误。
- IdleStateHandler:这个是netty提供的handler,作用是在长时间无数据流入流出时触发一个事件告诉Client。这个事件是由
NettyClientHandler
来处理的,它会发送一个心跳。理论上如果心跳正常,不会发生这个Event。 - MessageToByteEncoder:这个是由adapter.getEncoder()返回的,作用是把Request按照Dubbo协议进行编码。下面着重看下这个Encoder的实现。
组装协议包
上面的Encoder通过NettyCodecAdapter
获取到的,这个类在构造的时候传入了一个Codec2类型的参数,这个Codec2就是协议实现。adapter.getEncoder()返回的结果只是对这个的封装。
private class InternalEncoder extends MessageToByteEncoder {
@Override
protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
org.apache.dubbo.remoting.buffer.ChannelBuffer buffer = new NettyBackedChannelBuffer(out);
Channel ch = ctx.channel();
NettyChannel channel = NettyChannel.getOrAddChannel(ch, url, handler);
codec.encode(channel, buffer, msg);
}
}
上面的方法就是将Codec2的实现类封装成一个Netty的MessageToByteEncoder
的实现,最终调用的是codec2.encode()
方法。对于Dubbo协议,实现类就是 DubboCodec
,它的继承关系如下:
![](https://img.haomeiwen.com/i13282795/1a868e8392a2f337.png)
当Codec2的
encode()
方法被调用的时候,首先调用的是ExchangeCodec
的encode()
方法。
@Override
public void encode(Channel channel, ChannelBuffer buffer, Object msg) throws IOException {
if (msg instanceof Request) {
encodeRequest(channel, buffer, (Request) msg);
} else if (msg instanceof Response) {
encodeResponse(channel, buffer, (Response) msg);
} else {
super.encode(channel, buffer, msg);
}
}
protected void encodeRequest(Channel channel, ChannelBuffer buffer, Request req) throws IOException {
//获取序列化实现
Serialization serialization = getSerialization(channel);
// 协议头.
byte[] header = new byte[HEADER_LENGTH];
// 设置Magic number.
Bytes.short2bytes(MAGIC, header);
//设置序列化标记位
header[2] = (byte) (FLAG_REQUEST | serialization.getContentTypeId());
if (req.isTwoWay()) {
header[2] |= FLAG_TWOWAY;
}
if (req.isEvent()) {
header[2] |= FLAG_EVENT;
}
// set request id.
Bytes.long2bytes(req.getId(), header, 4);
// 序列化请求并设置payload
int savedWriteIndex = buffer.writerIndex();
buffer.writerIndex(savedWriteIndex + HEADER_LENGTH);
ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);
ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
if (req.isEvent()) {
encodeEventData(channel, out, req.getData());
} else {
//序列化request.Data写入buffer
encodeRequestData(channel, out, req.getData(), req.getVersion());
}
out.flushBuffer();
if (out instanceof Cleanable) {
((Cleanable) out).cleanup();
}
bos.flush();
bos.close();
// 设置协议头中的len属性
int len = bos.writtenBytes();
checkPayload(channel, len);
Bytes.int2bytes(len, header, 12);
buffer.writerIndex(savedWriteIndex);
buffer.writeBytes(header); // write header.
buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len);
}
上面的编码过程首先获取对象序列化实现,然后按照协议要求组装header和body,Dubbo默认使用的序列化实现是hessian2。这里需要注意的是Dubbo支持多种传输协议并不是整个数据包的外层协议,而是对Request.data
进行编码的协议,即DubboCodec仅实现了上面代码中调用的encodeRequestData()
方法。
请求响应接收
请求发出后,正常情况Server会返回Response,而Response的数据首先进入的还是netty的pipeline,只是调用Handler的顺序变成的反方向,按照调用顺序分别是:
- ByteToMessageDecoder:由
adapter.getDecoder()
返回,负责把二进制数据转换成java对象 - IdleStateHandler : 数据流入和流出都会经过
- NettyClientHandler : 在接收数据时,这个handler的作用是在收到Response后回调Dubbo的Handler,将数据交给后续handler处理
协议包解析
协议包解码跟编码动作刚好相反,ExchangeCodec
读到数据后分割成header和body,然后使用Serialization
反序列化body中的对象,交给DubboCodec
从中解析出Response。这里代码就不贴了。
Callback实现
异步编程中,callback是经常用到的结果获取方式,做为一个负责任的框架,Dubbo自然也要提供对Callback的支持。
Callback方法的声明
Dubbo中如果有个接口的方法是Callback方法,需要在url中声明第几个参数是Callback回调接口。如下接口:
public interface DemoService {
void sayHello(String to, Callback resp);
}
sayHello
方法的第2参数用来接收返回结果,则url中需要添加参数sayHello.1.callback=true
。
Callback方法的实现
Dubbo对Callback的实现非常巧妙。当调用非Callback方法时,Provider端暴露服务,Consumer端生成一个代理,通过Invoker发送请求,Provider端收到请求回复一个Response。
当调用的是Callback方法时,Consumer端发送请求的同时暴露一个回调参数的服务,这样Provider返回结果的方式就变成了调用Consumer暴露的这个服务,也就是返回结果时Provider变成了Consumer。很好了复用了Dubbo本身的逻辑。
下面看下请求发送的特殊处理,对Callback参数的处理是在DubboCodec中:
@Override
protected void encodeRequestData(Channel channel, ObjectOutput out, Object data, String version) throws IOException {
RpcInvocation inv = (RpcInvocation) data;
out.writeUTF(version);
out.writeUTF(inv.getAttachment(PATH_KEY));
out.writeUTF(inv.getAttachment(VERSION_KEY));
out.writeUTF(inv.getMethodName());
out.writeUTF(inv.getParameterTypesDesc());
Object[] args = inv.getArguments();
if (args != null) {
for (int i = 0; i < args.length; i++) {
//写入调用参数
out.writeObject(encodeInvocationArgument(channel, inv, i));
}
}
out.writeAttachments(inv.getObjectAttachments());
}
public static Object encodeInvocationArgument(Channel channel, RpcInvocation inv, int paraIndex) throws IOException {
// get URL directly
URL url = inv.getInvoker() == null ? null : inv.getInvoker().getUrl();
//判断是否为Callback方法
byte callbackStatus = isCallBack(url, inv.getMethodName(), paraIndex);
Object[] args = inv.getArguments();
Class<?>[] pts = inv.getParameterTypes();
switch (callbackStatus) {
//如果是Callback方法,则暴露服务并且将instanceid放入attachment中带到Server端
case CallbackServiceCodec.CALLBACK_CREATE:
inv.setAttachment(INV_ATT_CALLBACK_KEY + paraIndex, exportOrUnexportCallbackService(channel, url, pts[paraIndex], args[paraIndex], true));
return null;
case CallbackServiceCodec.CALLBACK_DESTROY:
inv.setAttachment(INV_ATT_CALLBACK_KEY + paraIndex, exportOrUnexportCallbackService(channel, url, pts[paraIndex], args[paraIndex], false));
return null;
default:
return args[paraIndex];
}
}
上面的逻辑中,DubboCodec在对调用参数进行编码时,判断出是回调方法,则将调用传入的参数暴露成一个服务。暴露的过程跟暴露一个正常的服务没有太大的区别,后面讲服务提供方的时候再详细解析。
服务提供方在收到请求后,也是在进入DubboCodec进行解码的时候识别出是Callback方法,就会生成参数的代理来调用Provider端的本地方法。本地方法在处理完,调用Callback参数返回结果时,实际是调用Proxy,进而发出远程调用。
首先看下DubboCodec中的decode过程:
@Override
protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {
byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK);
// get request id.
long id = Bytes.bytes2long(header, 4);
if ((flag & FLAG_REQUEST) == 0) {
// 如果是Response
...
} else {
// 如果是Request
Request req = new Request(id);
req.setVersion(Version.getProtocolVersion());
req.setTwoWay((flag & FLAG_TWOWAY) != 0);
if ((flag & FLAG_EVENT) != 0) {
req.setEvent(true);
}
try {
Object data;
if (req.isEvent()) {
//如果是Event
...
} else {
//处理Consumer的Request
DecodeableRpcInvocation inv;
if (channel.getUrl().getParameter(DECODE_IN_IO_THREAD_KEY, DEFAULT_DECODE_IN_IO_THREAD)) {
inv = new DecodeableRpcInvocation(channel, req, is, proto);
inv.decode();
} else {
inv = new DecodeableRpcInvocation(channel, req,
new UnsafeByteArrayInputStream(readMessageData(is)), proto);
}
data = inv;
}
req.setData(data);
} catch (Throwable t) {
//错误处理
}
return req;
}
}
服务提供方的DubboCodec在收到Request后,生成DecodeableRpcInvocation
对象,然后调用它的decode()
方法。最终会进入decodeInvocationArgument()
方法:
public static Object decodeInvocationArgument(Channel channel, RpcInvocation inv, Class<?>[] pts, int paraIndex, Object inObject) throws IOException {
// 获取服务端exporter的url
URL url = null;
try {
url = DubboProtocol.getDubboProtocol().getInvoker(channel, inv).getUrl();
} catch (RemotingException e) {
...
return inObject;
}
//判断是否是回调方法
byte callbackstatus = isCallBack(url, inv.getMethodName(), paraIndex);
switch (callbackstatus) {
//如果是回调,生成回调参数的代理
case CallbackServiceCodec.CALLBACK_CREATE:
try {
return referOrDestroyCallbackService(channel, url, pts[paraIndex], inv, Integer.parseInt(inv.getAttachment(INV_ATT_CALLBACK_KEY + paraIndex)), true);
} catch (Exception e) {
logger.error(e.getMessage(), e);
throw new IOException(StringUtils.toString(e));
}
case CallbackServiceCodec.CALLBACK_DESTROY:
try {
return referOrDestroyCallbackService(channel, url, pts[paraIndex], inv, Integer.parseInt(inv.getAttachment(INV_ATT_CALLBACK_KEY + paraIndex)), false);
} catch (Exception e) {
throw new IOException(StringUtils.toString(e));
}
default:
//不是回调,直接返回原参数
return inObject;
}
}
上面的代码中,服务提供端在decode参数时,发现时回调参数,则会返回一个远程调用的代理,生成的过程和Consumer端生成的逻辑时一样的。
private static Object referOrDestroyCallbackService(Channel channel, URL url, Class<?> clazz, Invocation inv, int instid, boolean isRefer) {
Object proxy;
String invokerCacheKey = getServerSideCallbackInvokerCacheKey(channel, clazz.getName(), instid);
String proxyCacheKey = getServerSideCallbackServiceCacheKey(channel, clazz.getName(), instid);
proxy = channel.getAttribute(proxyCacheKey);
String countkey = getServerSideCountKey(channel, clazz.getName());
if (isRefer) {
if (proxy == null) {
//生成回调的url
URL referurl = URL.valueOf("callback://" + url.getAddress() + "/" + clazz.getName() + "?" + INTERFACE_KEY + "=" + clazz.getName());
referurl = referurl.addParametersIfAbsent(url.getParameters()).removeParameter(METHODS_KEY);
if (!isInstancesOverLimit(channel, referurl, clazz.getName(), instid, true)) {
//生成回调的Invoker
@SuppressWarnings("rawtypes")
Invoker<?> invoker = new ChannelWrappedInvoker(clazz, channel, referurl, String.valueOf(instid));
//生成回调的Proxy
proxy = PROXY_FACTORY.getProxy(new AsyncToSyncInvoker<>(invoker));
channel.setAttribute(proxyCacheKey, proxy);
channel.setAttribute(invokerCacheKey, invoker);
increaseInstanceCount(channel, countkey);
//convert error fail fast .
//ignore concurrent problem.
Set<Invoker<?>> callbackInvokers = (Set<Invoker<?>>) channel.getAttribute(CHANNEL_CALLBACK_KEY);
if (callbackInvokers == null) {
callbackInvokers = new ConcurrentHashSet<>(1);
callbackInvokers.add(invoker);
channel.setAttribute(CHANNEL_CALLBACK_KEY, callbackInvokers);
}
}
}
} else {
//Consumer端回调结束,销毁暴露的回调接口
...
}
return proxy;
}
总结
消费端的发送请求的整个过程到这篇就讲完了,当然这只是最基本的逻辑。在这个逻辑基础上有很多细节的处理为了简单都已经省掉了,后面服务提供端也讲完后会从整体上讲一下。下一篇开始解析Dubbo提供方服务暴露,以及处理一个请求然后响应的过程。
网友评论