本系列参考官网文档、芋道源码的源码解读和《深入理解Apache Dubbo与实战》一书。
Dubbo版本为2.6.1。
文章内容顺序:
本地调用链路以及XML配置
远程调用
- 2.1 远程调用的整体链路
- 2.2 代理类执行
- 2.3 InvokerInvocationHandler#invoke 过滤toString等方法
- 2.4 MockClusterInvoker#invoke Mock逻辑判断
- 2.5 AbstractClusterInvoker#invoke 服务路由
- 2.6 FailoverClusterInvoker#doInvoke 负载均衡
- 2.7 ListenerInvokerWrapper#invoke Invoker和监听器的包装类
- 2.8 Filter链路的invoke 包含多个 Filter 调用
- 2.9 AbstractInvoker#invoke 设置隐式传参、async值
- 2.10 DubboInvoker#doInvoke 开始远程调用逻辑
- 2.11 ReferenceCountExchangeClient 这个是装饰类
- 2.12 HeaderExchangeClient 被装饰类,信息交互客户端
- 2.13 HeaderExchangeChannel 定义Request 对象
- 2.14 AbstractPeer 调用子类的重载方法
- 2.15 AbstractClient 调用指定子类的实现拿到Channel
- 2.16 NettyClient#doConnect 连接服务器
- 2.17 NettyChannel#getOrAddChannel 静态方法拿到真正的Netty的Channel
- 2.18 NettyChannel#send 调用Netty发送消息
1. 本地调用链路以及XML配置
image.png引用芋艿的本地调用的链路图,先忽略橘色框框的注释,这个就是本地调用的链路,
image.png
相比远程调用,实现上会简单很多:因为调用的服务,就在本地进程内,且不存在多个,所以不需要集群容错和网络通信相关的功能。
这里要提一下,我在实际调用debug的时候发现这张图的ProtocolFilterWrapper
和ListerInvokerWrapper
的顺序应该调换一下,不过并不影响理解,后面会介绍到。
注意:这个本地调用指的是Provider
和Consumer
都在一个项目里,只需要启动一个Consumer
就可以了。xml的配置如下
2. 远程调用
2.1远程调用的整体链路
然后我们再来看远程调用,那也就是橘红色的框框锁添加的内容了,下面列出来的也是远程调用的链路,我们就按着这个链路来一步步往下分析。
proxy0#sayHello(String)//代理类执行
—> InvokerInvocationHandler#invoke(Object, Method, Object[])//过滤toString等方法
—> MockClusterInvoker#invoke(Invocation) //Mock逻辑判断
—> AbstractClusterInvoker#invoke(Invocation)//服务路由
—> FailoverClusterInvoker#doInvoke(Invocation, List<Invoker<T>>, LoadBalance)//负载均衡
—> ListenerInvokerWrapper#invoke(Invocation) //Invoker和监听器的包装类
// —> ProtocolFilterWrapper//这个比较特殊,会在匿名类里调用了Filter,下面会分析
—>Filter#invoke(Invoker, Invocation) // 包含多个 Filter 调用
—> AbstractInvoker#invoke(Invocation) //设置隐式传参、async值
—> DubboInvoker#doInvoke(Invocation)//开始远程调用逻辑
—> ReferenceCountExchangeClient#request(Object, int)//这个是装饰类
—> HeaderExchangeClient#request(Object, int)//被装饰类,信息交互客户端
—> HeaderExchangeChannel#request(Object, int)//定义Request 对象
—> AbstractPeer#send(Object)//调用子类的重载方法
—> AbstractClient#send(Object, boolean)//调用指定子类的实现拿到Channel
—> NettyChannel#send(Object, boolean)//调用Netty发送消息
—> NioClientSocketChannel#write(Object)//Netty的逻辑
2.2代理类执行
Dubbo 默认使用 Javassist 框架为服务接口生成动态代理类,样例如下
public class proxy0 implements ClassGenerator.DC, EchoService, DemoService {
// 方法数组
public static Method[] methods;
private InvocationHandler handler;
public proxy0(InvocationHandler invocationHandler) {
this.handler = invocationHandler;
}
public proxy0() {
}
public String sayHello(String string) {
// 将参数存储到 Object 数组中
Object[] arrobject = new Object[]{string};
// 调用 InvocationHandler 实现类的 invoke 方法得到调用结果
Object object = this.handler.invoke(this, methods[0], arrobject);
// 返回调用结果
return (String)object;
}
/** 回声测试方法 */
public Object $echo(Object object) {
Object[] arrobject = new Object[]{object};
Object object2 = this.handler.invoke(this, methods[1], arrobject);
return object2;
}
}
首先将运行时参数存储到数组中,然后调用 InvocationHandler 接口实现类的 invoke 方法,得到调用结果,最后将结果转型并返回给调用方。
2.3 InvokerInvocationHandler#invoke 过滤toString等方法
public class InvokerInvocationHandler implements InvocationHandler {
private final Invoker<?> invoker;
public InvokerInvocationHandler(Invoker<?> handler) {
this.invoker = handler;
}
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
String methodName = method.getName();
Class<?>[] parameterTypes = method.getParameterTypes();
// 拦截定义在 Object 类中的方法(未被子类重写),比如 wait/notify
if (method.getDeclaringClass() == Object.class) {
return method.invoke(invoker, args);
}
// 如果 toString、hashCode 和 equals 等方法被子类重写了,这里也直接调用
if ("toString".equals(methodName) && parameterTypes.length == 0) {
return invoker.toString();
}
if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
return invoker.hashCode();
}
if ("equals".equals(methodName) && parameterTypes.length == 1) {
return invoker.equals(args[0]);
}
// 将 method 和 args 封装到 RpcInvocation 中,并执行后续的调用
return invoker.invoke(new RpcInvocation(method, args)).recreate();
}
}
InvokerInvocationHandler
中的 invoker
成员变量类型为 MockClusterInvoker
,MockClusterInvoker
内部封装了服务降级逻辑。
2.4MockClusterInvoker#invokeMock 逻辑判断
public class MockClusterInvoker<T> implements Invoker<T> {
private final Invoker<T> invoker;
public Result invoke(Invocation invocation) throws RpcException {
Result result = null;
// 获取 mock 配置值
String value = directory.getUrl().getMethodParameter(invocation.getMethodName(), Constants.MOCK_KEY, Boolean.FALSE.toString()).trim();
if (value.length() == 0 || value.equalsIgnoreCase("false")) {
// 无 mock 逻辑,直接调用其他 Invoker 对象的 invoke 方法,
// 比如 FailoverClusterInvoker
result = this.invoker.invoke(invocation);
} else if (value.startsWith("force")) {
// force:xxx 直接执行 mock 逻辑,不发起远程调用
result = doMockInvoke(invocation, null);
} else {
// fail:xxx 表示消费方对调用服务失败后,再执行 mock 逻辑,不抛出异常
try {
// 调用其他 Invoker 对象的 invoke 方法
result = this.invoker.invoke(invocation);
} catch (RpcException e) {
if (e.isBiz()) {
throw e;
} else {
// 调用失败,执行 mock 逻辑
result = doMockInvoke(invocation, e);
}
}
}
return result;
}
// 省略其他方法
}
只有mock
配置是force
或者fail
且调用失败的时候才会进入doMockInvoke
的逻辑,这里我们在前文分析过了doMockInvoke
的逻辑,就认为调用没有出错来看,进入下一个invoke
方法。
!!!注意这里的
invker
是FailoverClusterInvoker
,他继承了AbstractClusterInvoker
并且没有重写invoke
方法,实际上我们调用的是FailoverClusterInvoker#invoke
,但是这个方法的代码是在AbstractClusterInvoker
的类文件中。下面出现的一些带AbstractXXX
也是一样的道理,就不再说多遍了。
2.5AbstractClusterInvoker#invoke 服务路由
public Result invoke(final Invocation invocation) throws RpcException {
// 校验是否销毁
checkWhetherDestroyed();
// 获得所有服务提供者 Invoker 集合
List<Invoker<T>> invokers = list(invocation);
// 获得 LoadBalance 对象
LoadBalance loadbalance;
if (invokers != null && !invokers.isEmpty()) {
loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
.getMethodParameter(invocation.getMethodName(), Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE));
} else {
loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(Constants.DEFAULT_LOADBALANCE);
}
// 设置调用编号,若是异步调用
RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
// 执行调用
return doInvoke(invocation, invokers, loadbalance);
}
protected abstract Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException;
在AbstractClusterInvoker#invoke
调用了list
方法获得路由后的可用Invoker
集合(在list中会进行服务路由,就不进去分析了。)接着调用子类自己的实现FailoverClusterInvoker#doInvoke
2.6FailoverClusterInvoker#doInvoke
public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
List<Invoker<T>> copyinvokers = invokers;
// 检查copyinvokers即可用Invoker集合是否为空,如果为空,那么抛出异常
checkInvokers(copyinvokers, invocation);
// 得到最大可调用次数:最大可重试次数+1,默认最大可重试次数Constants.DEFAULT_RETRIES=2
int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
if (len <= 0) {
len = 1;
}
// 保存最后一次调用的异常
// retry loop.
RpcException le = null; // last exception.
// 保存已经调用过的Invoker
List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size()); // invoked invokers.
Set<String> providers = new HashSet<String>(len);
// failover机制核心实现:如果出现调用失败,那么重试其他服务器
for (int i = 0; i < len; i++) {
// 在进行重试前重新列举 Invoker,这样做的好处是,如果某个服务挂了,
// 通过调用 list 可得到最新可用的 Invoker 列表
if (i > 0) {
checkWhetherDestroyed();
// 根据Invocation调用信息从Directory中获取所有可用Invoker
copyinvokers = list(invocation);
// 对 copyinvokers 进行判空检查
checkInvokers(copyinvokers, invocation);
}
// 根据负载均衡机制从copyinvokers中选择一个Invoker
Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);
// 保存每次调用的Invoker
invoked.add(invoker);
// 设置已经调用的 Invoker 集合到RPC 上下文中
RpcContext.getContext().setInvokers((List) invoked);
try {
// RPC 调用得到 Result
Result result = invoker.invoke(invocation);
// 重试过程中,将最后一次调用的异常信息以 warn 级别日志输出
if (le != null && logger.isWarnEnabled()) {
logger.warn("Although retry the method " + invocation.getMethodName()
+ " in the service " + getInterface().getName()
+ " was successful by the provider " + invoker.getUrl().getAddress()
+ ", but there have been failed providers " + providers
+ " (" + providers.size() + "/" + copyinvokers.size()
+ ") from the registry " + directory.getUrl().getAddress()
+ " on the consumer " + NetUtils.getLocalHost()
+ " using the dubbo version " + Version.getVersion() + ". Last error is: "
+ le.getMessage(), le);
}
return result;
} catch (RpcException e) {
// 如果是业务性质的异常,不再重试,直接抛出
if (e.isBiz()) { // biz exception.
throw e;
}
// 其他性质的异常统一封装成RpcException
le = e;
} catch (Throwable e) {
le = new RpcException(e.getMessage(), e);
} finally {
providers.add(invoker.getUrl().getAddress());
}
}
// 最大可调用次数用完还得到Result的话,抛出RpcException异常:重试了N次还是失败,并输出最后一次异常信息
throw new RpcException(le.getCode(), "Failed to invoke the method " + invocation.getMethodName()
+ " in the service " + getInterface().getName()
+ ". Tried " + len + " times of the providers "
+ providers + " (" + providers.size() + "/" + copyinvokers.size()
+ ") from the registry " + directory.getUrl().getAddress()
+ " on the consumer " + NetUtils.getLocalHost()
+ " using the dubbo version " + Version.getVersion()
+ ". Last error is: " + le.getMessage(), le.getCause() != null ? le.getCause() : le);
}
}
FailoverClusterInvoker
的 doInvoke
方法首先是获取重试次数,然后根据重试次数进行循环调用,失败后进行重试。
在 for
循环内,首先是通过负载均衡组件选择一个 Invoker
,然后再通过这个 Invoker 的 invoke 方法进行远程调用。
接下来调用的是ListenerInvokerWrapper#invoke
2.7ListenerInvokerWrapper#invoke Invoker和监听器的包装类
这个是在远程引用,调用refer方法的时候,通过
image.pngProtocol\$Adaptive => ProtocolFilterWrapper => ProtocolListenerWrapper =>RegistrProtocol =>Protocol\$Adaptive => ProtocolFilterWrapper => ProtocolListenerWrapper => DubboProtocol
执行链中的第二个ProtocolListenerWrapper
方法中的refer加入进来的监听器和Invoker的包装类
ListenerInvokerWrapper
装饰invoker, 在构造器中遍历listeners构建referer的监听链
而这个包装类里的invoke,则没什么好讲的,直接就调用了Filter链路的invoke
2.8Filter链路的invoke 包含多个 Filter 调用
仍然是调用refer方法的时候,通过
image.pngProtocol\$Adaptive => ProtocolFilterWrapper => ProtocolListenerWrapper =>RegistrProtocol =>Protocol\$Adaptive => ProtocolFilterWrapper => ProtocolListenerWrapper => DubboProtocol
的第二个ProtocolFilterWrapper#buildInvokerChain
方法返回了一个带有filter链的匿名invoker,所以调用的时候会debug到这一行,通过invoke(next, invocation)
的方式一个个调用排在后面的filter
直到调用真正的invoker
。下面会有一个Filter
的invoke
方法的例子。
public class ConsumerContextFilter implements Filter {
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
// 设置 RpcContext 对象
RpcContext.getContext()
.setInvoker(invoker)
.setInvocation(invocation)
.setLocalAddress(NetUtils.getLocalHost(), 0) // 本地地址
.setRemoteAddress(invoker.getUrl().getHost(), invoker.getUrl().getPort()); // 远程地址
// 设置 RpcInvocation 对象的 `invoker` 属性
if (invocation instanceof RpcInvocation) {
((RpcInvocation) invocation).setInvoker(invoker);
}
// 服务调用
try {
return invoker.invoke(invocation);
} finally {
// 清理隐式参数集合
RpcContext.getContext().clearAttachments();
}
}
}
可以看到,在这个Filter
里接着调用了传入的下一个Invoker
的invoke
方法。
到最后会调用到DubboInvoker
,执行他的父类AbstractInvoker
的invoke
,在invoke
中再调用DubboInvoker
本类doInvoke
的实现
2.9 AbstractInvoker#invoke 设置隐式传参、async值
public abstract class AbstractInvoker<T> implements Invoker<T> {
public Result invoke(Invocation inv) throws RpcException {
if (destroyed.get()) {
throw new RpcException("Rpc invoker for service ...");
}
RpcInvocation invocation = (RpcInvocation) inv;
// 设置 Invoker
invocation.setInvoker(this);
if (attachment != null && attachment.size() > 0) {
// 设置 attachment
invocation.addAttachmentsIfAbsent(attachment);
}
Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();
if (contextAttachments != null && contextAttachments.size() != 0) {
// 添加 contextAttachments 到 RpcInvocation#attachment 变量中
invocation.addAttachments(contextAttachments);
}
if (getUrl().getMethodParameter(invocation.getMethodName(), Constants.ASYNC_KEY, false)) {
// 设置异步信息到 RpcInvocation#attachment 中
invocation.setAttachment(Constants.ASYNC_KEY, Boolean.TRUE.toString());
}
RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
try {
// 抽象方法,由子类实现
return doInvoke(invocation);
} catch (InvocationTargetException e) {
// ...
} catch (RpcException e) {
// ...
} catch (Throwable e) {
return new RpcResult(e);
}
}
protected abstract Result doInvoke(Invocation invocation) throws Throwable;
// 省略其他方法
}
上面大部分代码用于添加信息到 RpcInvocation#attachment
变量中,添加完毕后,调用 doInvoke
执行后续的调用
2.10 DubboInvoker#doInvoke 开始远程调用逻辑
public class DubboInvoker<T> extends AbstractInvoker<T> {
private final ExchangeClient[] clients;
protected Result doInvoke(final Invocation invocation) throws Throwable {
RpcInvocation inv = (RpcInvocation) invocation;
final String methodName = RpcUtils.getMethodName(invocation);
// 设置 path 和 version 到 attachment 中
inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
inv.setAttachment(Constants.VERSION_KEY, version);
ExchangeClient currentClient;
if (clients.length == 1) {
// 从 clients 数组中获取 ExchangeClient
currentClient = clients[0];
} else {
currentClient = clients[index.getAndIncrement() % clients.length];
}
try {
// 获取异步配置
boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
// isOneway 为 true,表示“单向”通信
boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
// 异步无返回值
if (isOneway) {
boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
// 发送请求
currentClient.send(inv, isSent);
// 设置上下文中的 future 字段为 null
RpcContext.getContext().setFuture(null);
// 返回一个空的 RpcResult
return new RpcResult();
}
// 异步有返回值
else if (isAsync) {
// 发送请求,并得到一个 ResponseFuture 实例
ResponseFuture future = currentClient.request(inv, timeout);
// 设置 future 到上下文中
RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
// 暂时返回一个空结果
return new RpcResult();
}
// 同步调用
else {
RpcContext.getContext().setFuture(null);
// 发送请求,得到一个 ResponseFuture 实例,并调用该实例的 get 方法进行等待
return (Result) currentClient.request(inv, timeout).get();
}
} catch (TimeoutException e) {
throw new RpcException(..., "Invoke remote method timeout....");
} catch (RemotingException e) {
throw new RpcException(..., "Failed to invoke remote method: ...");
}
}
// 省略其他方法
}
在服务引用中分析过,当调用
refer
的时候,会把clients
传进来,这里的clients
正是ReferenceCountExchangeClient
,他仅仅是一个包装类,在这个getClients(url)
方法中又调用了initClient(url)创建了HeaderExchangeClient
,并将其放入到ReferenceCountExchangeClient
中。详细可看我写的另一篇Dubbo中的服务引用
注意这里的几个调用,异步无返回值调用的是
send
方法,而同步和异步返回则是request
方法。
当服务消费者还未接收到调用结果时,用户线程调用get
方法会被阻塞住。同步调用模式下,框架获得DefaultFuture
对象后,会立即调用get
方法进行等待。而异步模式下则是将该对象封装到FutureAdapter
实例中,并将 FutureAdapter 实例设置到RpcContext
中,供用户使用。FutureAdapter
是一个适配器,用于将Dubbo
中的ResponseFuture
与JDK
中的Future
进行适配。这样当用户线程调用Future
的get
方法时,经过FutureAdapter
适配,最终会调用ResponseFuture
实现类对象的get
方法,也就是DefaultFuture
的get
方法。
具体的异步接收结果的分析会放到另一篇来讲。
这里的currentClient
正是ReferenceCountExchangeClient
。那就来看看这个类。
2.11ReferenceCountExchangeClient 这个是装饰类
final class ReferenceCountExchangeClient implements ExchangeClient {
private final URL url;
private final AtomicInteger referenceCount = new AtomicInteger(0);
private ExchangeClient client;
public ReferenceCountExchangeClient(ExchangeClient client, ConcurrentMap<String, LazyConnectExchangeClient> ghostClientMap) {
this.client = client;
// 引用计数自增
referenceCount.incrementAndGet();
this.url = client.getUrl();
// ...
}
@Override
public ResponseFuture request(Object request) throws RemotingException {
// 直接调用被装饰对象的同签名方法
return client.request(request);
}
@Override
public ResponseFuture request(Object request, int timeout) throws RemotingException {
// 直接调用被装饰对象的同签名方法
return client.request(request, timeout);
}
public void send(Object message) throws RemotingException {
// 直接调用被装饰对象的同签名方法
client.send(message);
}
/** 引用计数自增,该方法由外部调用 */
public void incrementAndGetCount() {
// referenceCount 自增
referenceCount.incrementAndGet();
}
@Override
public void close(int timeout) {
// referenceCount 自减
if (referenceCount.decrementAndGet() <= 0) {
if (timeout == 0) {
client.close();
} else {
client.close(timeout);
}
client = replaceWithLazyClient();
}
}
// 省略部分方法
}
再复习下
LazyConnectExchangeClient
的作用,当服务引用时,我们并不想此时就是开始通信,而是在调用的时候再与服务端通信,LazyConnectExchangeClient
就像是一个缓存,在服务调用的时候才会创建真正的Client
去连接,节省了资源。此时正是服务调用的时候。
可以看到这里调用的是client
的方法,也就是他包装的HeaderExchangeClient
类。
2.12 HeaderExchangeClient 被装饰类,信息交互客户端
public class HeaderExchangeClient implements ExchangeClient {
private static final ScheduledThreadPoolExecutor scheduled = new ScheduledThreadPoolExecutor(2, new NamedThreadFactory("dubbo-remoting-client-heartbeat", true));
private final Client client;
private final ExchangeChannel channel;
private ScheduledFuture<?> heartbeatTimer;
private int heartbeat;
private int heartbeatTimeout;
public HeaderExchangeClient(Client client, boolean needHeartbeat) {
if (client == null) {
throw new IllegalArgumentException("client == null");
}
this.client = client;
// 创建 HeaderExchangeChannel 对象
this.channel = new HeaderExchangeChannel(client);
// 以下代码均与心跳检测逻辑有关
String dubbo = client.getUrl().getParameter(Constants.DUBBO_VERSION_KEY);
this.heartbeat = client.getUrl().getParameter(Constants.HEARTBEAT_KEY, dubbo != null && dubbo.startsWith("1.0.") ? Constants.DEFAULT_HEARTBEAT : 0);
this.heartbeatTimeout = client.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3);
if (heartbeatTimeout < heartbeat * 2) {
throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2");
}
if (needHeartbeat) {
// 开启心跳检测定时器
startHeartbeatTimer();
}
}
@Override
public ResponseFuture request(Object request) throws RemotingException {
// 直接 HeaderExchangeChannel 对象的同签名方法
return channel.request(request);
}
@Override
public ResponseFuture request(Object request, int timeout) throws RemotingException {
// 直接 HeaderExchangeChannel 对象的同签名方法
return channel.request(request, timeout);
}
public void send(Object message) throws RemotingException {
// 直接 HeaderExchangeChannel 对象的同签名方法
channel.send(message);
}
@Override
public void close() {
doClose();
channel.close();
}
private void doClose() {
// 停止心跳检测定时器
stopHeartbeatTimer();
}
private void startHeartbeatTimer() {
stopHeartbeatTimer();
if (heartbeat > 0) {
heartbeatTimer = scheduled.scheduleWithFixedDelay(
new HeartBeatTask(new HeartBeatTask.ChannelProvider() {
@Override
public Collection<Channel> getChannels() {
return Collections.<Channel>singletonList(HeaderExchangeClient.this);
}
}, heartbeat, heartbeatTimeout),
heartbeat, heartbeat, TimeUnit.MILLISECONDS);
}
}
private void stopHeartbeatTimer() {
if (heartbeatTimer != null && !heartbeatTimer.isCancelled()) {
try {
heartbeatTimer.cancel(true);
scheduled.purge();
} catch (Throwable e) {
if (logger.isWarnEnabled()) {
logger.warn(e.getMessage(), e);
}
}
}
heartbeatTimer = null;
}
// 省略部分方法
}
HeaderExchangeClient
中很多方法只有一行代码,即调用HeaderExchangeChannel
对象的同签名方法。那HeaderExchangeClient
作用则是封装了一些关于心跳检测的逻辑。这个我们会在另一篇中详细讲解
这里的channel
属性是由new HeaderExchangeChannel(client)
传入的,这个client
是url
自适应的,默认为nettyClient
,当然传入这个client
的时机,也正是在服务引用的refer
方法时。
2.13 HeaderExchangeChannel 定义Request 对象
final class HeaderExchangeChannel implements ExchangeChannel {
private final Channel channel;
HeaderExchangeChannel(Channel channel) {
if (channel == null) {
throw new IllegalArgumentException("channel == null");
}
// 这里的 channel 指向的是 NettyClient
this.channel = channel;
}
@Override
public ResponseFuture request(Object request) throws RemotingException {
return request(request, channel.getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT));
}
@Override
public ResponseFuture request(Object request, int timeout) throws RemotingException {
if (closed) {
throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
}
// 创建 Request 对象
Request req = new Request();
req.setVersion("2.0.0");
// 设置双向通信标志为 true,即需要响应
req.setTwoWay(true);
// 这里的 request 变量类型为 RpcInvocation
req.setData(request);
// 创建 DefaultFuture 对象
DefaultFuture future = new DefaultFuture(channel, req, timeout);
try {
// 注意这里channel 指向的是 NettyClient,用 NettyClient 的 send 方法发送请求
channel.send(req);
} catch (RemotingException e) { // 发生异常,取消 DefaultFuture
future.cancel();
throw e;
}
// 返回 DefaultFuture 对象
return future;
}
public void send(Object message, boolean sent) throws RemotingException {
if (closed) {
throw new RemotingException(this.getLocalAddress(), null, "Failed to send message " + message + ", cause: The channel " + this + " is closed!");
}
if (message instanceof Request
|| message instanceof Response
|| message instanceof String) {
channel.send(message, sent);
} else {
Request request = new Request();
request.setVersion("2.0.0");
// 设置双向通信标志为 false,不需要响应
request.setTwoWay(false);
request.setData(message);
channel.send(request, sent);
}
}
}
上面的方法首先定义了一个
Request
对象,然后再将该对象传给 NettyClient (注意这个Client是url自适应的,默认为NettyClient
,并非固定)的send
方法,进行后续的调用。
request
与send
只有request.setTwoWay(true/false)
的区别,通过这个方法我们就可以来设置有无返回值啦。
需要说明的是,NettyClient
中并未实现send
方法,该方法继承自父类AbstractPeer
2.14 AbstractPeer 调用子类的重载方法
public abstract class AbstractPeer implements Endpoint, ChannelHandler {
@Override
public void send(Object message) throws RemotingException {
// 该方法由 AbstractClient 类实现
send(message, url.getParameter(Constants.SENT_KEY, false));
}
// 省略其他方法
}
继续看AbstractClient的实现吧
2.15 AbstractClient 调用指定子类的实现拿到Channel
public abstract class AbstractClient extends AbstractEndpoint implements Client {
@Override
public void send(Object message, boolean sent) throws RemotingException {
if (send_reconnect && !isConnected()) {
connect();
}
// 获取 Channel,getChannel 是一个抽象方法,具体由子类实现
Channel channel = getChannel();
if (channel == null || !channel.isConnected()) {
throw new RemotingException(this, "message can not send ...");
}
// 继续向下调用
channel.send(message, sent);
}
protected void connect() throws RemotingException {
// 获得锁
connectLock.lock();
try {
// 已连接,
if (isConnected()) {
return;
}
// 初始化重连线程
initConnectStatusCheckCommand();
// 执行连接
doConnect();
// 连接失败,抛出异常
if (!isConnected()) {
throw new RemotingException(this, "Failed connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " "
+ NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion()
+ ", cause: Connect wait timeout: " + getTimeout() + "ms.");
// 连接成功,打印日志
} else {
if (logger.isInfoEnabled()) {
logger.info("Successed connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " "
+ NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion()
+ ", channel is " + this.getChannel());
}
}
// 设置重连次数归零
reconnect_count.set(0);
// 设置未打印过错误日志
reconnect_error_log_flag.set(false);
} catch (RemotingException e) {
throw e;
} catch (Throwable e) {
throw new RemotingException(this, "Failed connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " "
+ NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion()
+ ", cause: " + e.getMessage(), e);
} finally {
// 释放锁
connectLock.unlock();
}
}
protected abstract Channel getChannel();
// 省略其他方法
}
这边的
send
方法就直接到最后调用到了NettyChannel#send
,在此之前我们先来看看他的重连逻辑。
AbstractClient
实现了重连逻辑,而真正通信仍然是交由子类实现。
注意:在
AbstractClient
的构造方法中(有点长就不贴了),会调用这个connect
方法,(同样的,在这个send
方法里,如果断开了连接,也会调用这个connect
方法。)而这个方法里面的doConnect()
方法则是子类的实现。
那么就来简单看看默认实现子类NettyClient#doConnect
的实现
2.16 NettyClient#doConnect 连接服务器
public class NettyClient extends AbstractClient {
// 这里的 Channel 全限定名称为 org.jboss.netty.channel.Channel
private volatile Channel channel;
//调用父类的构造方法
public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException {
super(url, wrapChannelHandler(url, handler));
}
protected void doConnect() throws Throwable {
long start = System.currentTimeMillis();
// 连接服务器
ChannelFuture future = bootstrap.connect(getConnectAddress());
try {
// 等待连接成功或者超时
boolean ret = future.awaitUninterruptibly(3000, TimeUnit.MILLISECONDS);
// 连接成功
if (ret && future.isSuccess()) {
Channel newChannel = future.channel();
try {
// 关闭老的连接
// Close old channel
Channel oldChannel = NettyClient.this.channel; // copy reference
if (oldChannel != null) {
try {
if (logger.isInfoEnabled()) {
logger.info("Close old netty channel " + oldChannel + " on create new netty channel " + newChannel);
}
oldChannel.close();
} finally {
NettyChannel.removeChannelIfDisconnected(oldChannel);
}
}
} finally {
// 若 NettyClient 被关闭,关闭连接
if (NettyClient.this.isClosed()) {
try {
if (logger.isInfoEnabled()) {
logger.info("Close new netty channel " + newChannel + ", because the client closed.");
}
newChannel.close();
} finally {
NettyClient.this.channel = null;
NettyChannel.removeChannelIfDisconnected(newChannel);
}
// 设置新连接
} else {
NettyClient.this.channel = newChannel;
}
}
// 发生异常,抛出 RemotingException 异常
} else if (future.cause() != null) {
throw new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server "
+ getRemoteAddress() + ", error message is:" + future.cause().getMessage(), future.cause());
// 无结果(连接超时),抛出 RemotingException 异常
} else {
throw new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server "
+ getRemoteAddress() + " client-side timeout "
+ getConnectTimeout() + "ms (elapsed: " + (System.currentTimeMillis() - start) + "ms) from netty client "
+ NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion());
}
} finally {
if (!isConnected()) { // 【TODO 8028】为什么不取消 future
//future.cancel(true);
}
}
}
@Override
protected com.alibaba.dubbo.remoting.Channel getChannel() {
Channel c = channel;
if (c == null || !c.isConnected())
return null;
// 获取一个 NettyChannel 类型对象
return NettyChannel.getOrAddChannel(c, getUrl(), this);
}
}
在
doConnect
中有一段代码NettyClient.this.channel = newChannel
,这个newChannel
就是真正Netty对象——org.jboss.netty.channel.Channel
。在NettyClient
初始化的过程中会调用父类的构造方法,再调用这个doConnect
方法从而对其赋值。再来看
getChannel
方法,好家伙,还得套娃,这边的c
就是我们的org.jboss.netty.channel.Channel
获取到
NettyChannel
实例后,即可进行后续的调用。下面看一下NettyChannel#getOrAddChannel
方法。
2.17 NettyChannel#getOrAddChannel 静态方法拿到真正的Netty的Channel
final class NettyChannel extends AbstractChannel {
private static final ConcurrentMap<org.jboss.netty.channel.Channel, NettyChannel> channelMap =
new ConcurrentHashMap<org.jboss.netty.channel.Channel, NettyChannel>();
private final org.jboss.netty.channel.Channel channel;
/** 私有构造方法 */
private NettyChannel(org.jboss.netty.channel.Channel channel, URL url, ChannelHandler handler) {
super(url, handler);
if (channel == null) {
throw new IllegalArgumentException("netty channel == null;");
}
this.channel = channel;
}
static NettyChannel getOrAddChannel(org.jboss.netty.channel.Channel ch, URL url, ChannelHandler handler) {
if (ch == null) {
return null;
}
// 尝试从集合中获取 NettyChannel 实例
NettyChannel ret = channelMap.get(ch);
if (ret == null) {
// 如果 ret = null,则创建一个新的 NettyChannel 实例
NettyChannel nc = new NettyChannel(ch, url, handler);
if (ch.isConnected()) {
// 将 <Channel, NettyChannel> 键值对存入 channelMap 集合中
ret = channelMap.putIfAbsent(ch, nc);
}
if (ret == null) {
ret = nc;
}
}
return ret;
}
}
注意这个
getOrAddChannel
是静态方法,里面才创建了一个NettyChannel
得到实例。
获取到NettyChannel
实例后,即可进行后续的调用。下面看一下NettyChannel
方法。
2.18 NettyChannel#send
public void send(Object message, boolean sent) throws RemotingException {
super.send(message, sent);
boolean success = true;
int timeout = 0;
try {
// 发送消息(包含请求和响应消息)
ChannelFuture future = channel.write(message);
// sent 的值源于 <dubbo:method sent="true/false" /> 中 sent 的配置值,有两种配置值:
// 1. true: 等待消息发出,消息发送失败将抛出异常
// 2. false: 不等待消息发出,将消息放入 IO 队列,即刻返回
// 默认情况下 sent = false;
if (sent) {
timeout = getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
// 等待消息发出,若在规定时间没能发出,success 会被置为 false
success = future.await(timeout);
}
Throwable cause = future.getCause();
if (cause != null) {
throw cause;
}
} catch (Throwable e) {
throw new RemotingException(this, "Failed to send message ...");
}
// 若 success 为 false,这里抛出异常
if (!success) {
throw new RemotingException(this, "Failed to send message ...");
}
}
上述方法里的
ChannelFuture future = channel.write(message)
;
这个channel就是org.jboss.netty.channel.Channel
,往下就是Netty的工作了。
网友评论