dubbo服务调用,就是服务消费者发起调用服务提供者接口的过程。是对前面服务导出,服务引用,服务集群、路由和负载均衡的综合使用。这个过程包含了:
- 消费者端使用生成的动态代理类发起接口调用
- 消费端进行服务降级
- 服务集群、路由和负载均衡
- 过滤器链进行接口调用请求处理
- 编解码和序列化
- 线程派发
- 请求响应
- 消费端接受请求响应
接下来跟着几个问题来进行dubbo服务调用的学习。
1、同步和异步调用如何实现?
在dubbo中服务调用方式分为同步调用和异步调用,异步调用又分为异步有结果调用和异步无结果调用。异步无结果调用就类似消息通知功能。下面图片展示了dubbo在服务端调用过程中的流程图示:
服务调用端调用流程
下面在直接看下DubboInvoker:
@Override
protected Result doInvoke(final Invocation invocation) throws Throwable {
RpcInvocation inv = (RpcInvocation) invocation;
final String methodName = RpcUtils.getMethodName(invocation);
inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
inv.setAttachment(Constants.VERSION_KEY, version);
ExchangeClient currentClient;
if (clients.length == 1) {
currentClient = clients[0];
} else {
currentClient = clients[index.getAndIncrement() % clients.length];
}
try {
//TODO 是否异步调用
boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
//TODO 是否异步无结果调用
boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
//TODO 接口调用超时时间
int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
if (isOneway) {
//TODO 表示是否需要等待请求数据从TCP缓冲区发送出去
boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
currentClient.send(inv, isSent);
RpcContext.getContext().setFuture(null);
return new RpcResult();
} else if (isAsync) {
//TODO 异步调用将返回的ResponseFuture封装给用户线程,由用户线程进行get阻塞
ResponseFuture future = currentClient.request(inv, timeout);
RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
return new RpcResult();
} else {
RpcContext.getContext().setFuture(null);
//TODO 直接在返回的ResponseFuture中调用get,这个时候是阻塞的dubbo线程以及用户线程
return (Result) currentClient.request(inv, timeout).get();
}
} catch (TimeoutException e) {
throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
} catch (RemotingException e) {
throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
可以看到在dubbo中,服务调用细分有3种方式,同步、异步有结果和异步无结果这3种。
- 同步:currentClient.request返回DefaultFuture,直接在dubbo线程中发起get调用,这个时候会阻塞dubbo线程,同时用户线程也会被阻塞住。
- 异步有结果:同样currentClient.request返回DefaultFuture,这个时候会把这个返回的future封装成FutureAdapter设置到用户调用线程中,然后返回一个空的RpcResult,用户线程可以拿到Future,在用户自己线程中调用get方法获取接口返回结果。
- 异步无结果:异步无结果调用返回用户线程的future是null,并且结果也是空的。异步无结果调用又被分为是否等待调用请求数据从缓冲区中发送出去。
可以看到同步和异步调用方式的区别,就是同步会在dubbo框架中等待结果的返回,异步是将Future返回给调用的用户线程,由用户线程自己去等待获取结果。
请求的发送和编码?
在dubbo中,接口调用端发起调用,最终还是需要网络通信层进行请求数据的发送,在dubbo中默认使用的是Netty。通过以下图示可以看出请求发送数据流转的过程:
接口调用端请求数据流转图示
DubboInvoker -> ReferenceCountExchangeClient -> HeaderExchangeClient -> HeaderExchangeChannel -> NettyChannel -> NettyHandler -> InternalEncoder -> DubboCountCodec -> ExchangeCodec,在网络通信层采用Netty进行数据交换。
@Override
public void send(Object message, boolean sent) throws RemotingException {
super.send(message, sent);
boolean success = true;
int timeout = 0;
try {
//TODO 通过channel将数据写出去,这个时候用到了netty
ChannelFuture future = channel.write(message);
if (sent) {
timeout = getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
success = future.await(timeout);
}
Throwable cause = future.getCause();
if (cause != null) {
throw cause;
}
} catch (Throwable e) {
throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress() + ", cause: " + e.getMessage(), e);
}
if (!success) {
throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress()
+ "in timeout(" + timeout + "ms) limit");
}
}
在NettyChannel中,调用channel进行write
@Override
protected void doOpen() throws Throwable {
NettyHelper.setNettyLoggerFactory();
bootstrap = new ClientBootstrap(channelFactory);
bootstrap.setOption("keepAlive", true);
bootstrap.setOption("tcpNoDelay", true);
bootstrap.setOption("connectTimeoutMillis", getTimeout());
final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
@Override
public ChannelPipeline getPipeline() {
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
ChannelPipeline pipeline = Channels.pipeline();
//TODO 这个是解码器
pipeline.addLast("decoder", adapter.getDecoder());
//TODO 这个是编码器
pipeline.addLast("encoder", adapter.getEncoder());
//TODO 这个是自定义处理器
pipeline.addLast("handler", nettyHandler);
return pipeline;
}
});
}
在Netty中调用channel.write方法的时候,在数据写出之前,会在ChannelPipleline中进行流转,这个时候会经过pipeline中所有的handler进行处理。写出数据先是经过NettyHandler进行处理,然后再就是进过Encoder进行编码处理。
public class ExchangeCodec extends TelnetCodec {
@Override
public void encode(Channel channel, ChannelBuffer buffer, Object msg) throws IOException {
//TODO 对请求数据进行编码
if (msg instanceof Request) {
encodeRequest(channel, buffer, (Request) msg);
} else if (msg instanceof Response) {
//TODO 对结果数据进行编码
encodeResponse(channel, buffer, (Response) msg);
} else {
super.encode(channel, buffer, msg);
}
}
}
这里可以看到ExchangeCodec可以对请求和结果数据编码,采用的是长度编码器,编码后的数据如下:
dubbo请求数据格式
可以看到dubbo中的请求数据经过序列化和编码后,通过netty发送出去,发送给服务提供者。序列化默认采用的是Hessian,然后编码器采用的是长度编码器,整个请求数据长度会写入到请求数据的特定位置。
请求如何接收?
在dubbo中通信层默认采用的是Netty,所以请求过来的数据是Netty层先捕捉到对应数据,然后是先经过解码器然后是交给NettyHandler进行处理。
InternalDecoder -> DubboCountCodec -> ExchangeCodec -> NettyHandler -> MultiMessageHandler -> HeartbeatHandler -> AllChannelHandler
调用请求数据过来之后,会被解码,然后经过Dispatcher交给具体的线程池执行对应接口的调用。在dubbo(2.6.4)中有4中线程派发模型,默认是AllChannelHandler:
策略 | 用途 |
---|---|
all | 所有消息都派发到线程池,包括请求,响应,连接事件,断开事件等 |
message | 只有请求和响应消息派发到线程池,其它消息均在 IO 线程上执行 |
execution | 只有请求消息派发到线程池,不含响应。其它消息均在 IO 线程上执行 |
connection | 在 IO 线程上,将连接断开事件放入队列,有序逐个执行,其它消息派发到线程池 |
这里说的IO线程是指通信框架层中接收请求的线程,然后线程池是指dubbo中的线程池,dubbo中的线程池有4种,默认是FixedThreadPool:
public class FixedThreadPool implements ThreadPool {
@Override
public Executor getExecutor(URL url) {
String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);
int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,
queues == 0 ? new SynchronousQueue<Runnable>() :
(queues < 0 ? new LinkedBlockingQueue<Runnable>()
: new LinkedBlockingQueue<Runnable>(queues)),
new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
}
}
dubbo中的线程池大小默认是200个线程,可以自己配置线程池大小,然后阻塞队列采用的是LinkedBlockingQueue,大小也可以配置。
网友评论