美文网首页
Dubbo的服务调用(消费端)

Dubbo的服务调用(消费端)

作者: 就这些吗 | 来源:发表于2020-07-26 02:30 被阅读0次

    本系列参考官网文档、芋道源码的源码解读和《深入理解Apache Dubbo与实战》一书。
    Dubbo版本为2.6.1。

    文章内容顺序:

    1. 本地调用链路以及XML配置

    2. 远程调用

    • 2.1 远程调用的整体链路
    • 2.2 代理类执行
    • 2.3 InvokerInvocationHandler#invoke 过滤toString等方法
    • 2.4 MockClusterInvoker#invoke Mock逻辑判断
    • 2.5 AbstractClusterInvoker#invoke 服务路由
    • 2.6 FailoverClusterInvoker#doInvoke 负载均衡
    • 2.7 ListenerInvokerWrapper#invoke Invoker和监听器的包装类
    • 2.8 Filter链路的invoke 包含多个 Filter 调用
    • 2.9 AbstractInvoker#invoke 设置隐式传参、async值
    • 2.10 DubboInvoker#doInvoke 开始远程调用逻辑
    • 2.11 ReferenceCountExchangeClient 这个是装饰类
    • 2.12 HeaderExchangeClient 被装饰类,信息交互客户端
    • 2.13 HeaderExchangeChannel 定义Request 对象
    • 2.14 AbstractPeer 调用子类的重载方法
    • 2.15 AbstractClient 调用指定子类的实现拿到Channel
    • 2.16 NettyClient#doConnect 连接服务器
    • 2.17 NettyChannel#getOrAddChannel 静态方法拿到真正的Netty的Channel
    • 2.18 NettyChannel#send 调用Netty发送消息

    1. 本地调用链路以及XML配置

    image.png

    引用芋艿的本地调用的链路图,先忽略橘色框框的注释,这个就是本地调用的链路,
    相比远程调用,实现上会简单很多:因为调用的服务,就在本地进程内,且不存在多个,所以不需要集群容错和网络通信相关的功能。
    这里要提一下,我在实际调用debug的时候发现这张图的ProtocolFilterWrapperListerInvokerWrapper的顺序应该调换一下,不过并不影响理解,后面会介绍到。
    注意:这个本地调用指的是ProviderConsumer都在一个项目里,只需要启动一个Consumer就可以了。xml的配置如下

    image.png

    2. 远程调用

    2.1远程调用的整体链路

    然后我们再来看远程调用,那也就是橘红色的框框锁添加的内容了,下面列出来的也是远程调用的链路,我们就按着这个链路来一步步往下分析。

    proxy0#sayHello(String)//代理类执行
      —> InvokerInvocationHandler#invoke(Object, Method, Object[])//过滤toString等方法
        —> MockClusterInvoker#invoke(Invocation) //Mock逻辑判断
          —> AbstractClusterInvoker#invoke(Invocation)//服务路由
            —> FailoverClusterInvoker#doInvoke(Invocation, List<Invoker<T>>, LoadBalance)//负载均衡
              —> ListenerInvokerWrapper#invoke(Invocation) //Invoker和监听器的包装类
         //      —> ProtocolFilterWrapper//这个比较特殊,会在匿名类里调用了Filter,下面会分析
                —>Filter#invoke(Invoker, Invocation)  // 包含多个 Filter 调用
                  —> AbstractInvoker#invoke(Invocation) //设置隐式传参、async值
                    —> DubboInvoker#doInvoke(Invocation)//开始远程调用逻辑
                      —> ReferenceCountExchangeClient#request(Object, int)//这个是装饰类
                        —> HeaderExchangeClient#request(Object, int)//被装饰类,信息交互客户端
                          —> HeaderExchangeChannel#request(Object, int)//定义Request 对象
                            —> AbstractPeer#send(Object)//调用子类的重载方法
                              —> AbstractClient#send(Object, boolean)//调用指定子类的实现拿到Channel
                                —> NettyChannel#send(Object, boolean)//调用Netty发送消息
                                  —> NioClientSocketChannel#write(Object)//Netty的逻辑
    

    2.2代理类执行

    Dubbo 默认使用 Javassist 框架为服务接口生成动态代理类,样例如下

    public class proxy0 implements ClassGenerator.DC, EchoService, DemoService {
        // 方法数组
        public static Method[] methods;
        private InvocationHandler handler;
    
        public proxy0(InvocationHandler invocationHandler) {
            this.handler = invocationHandler;
        }
    
        public proxy0() {
        }
    
        public String sayHello(String string) {
            // 将参数存储到 Object 数组中
            Object[] arrobject = new Object[]{string};
            // 调用 InvocationHandler 实现类的 invoke 方法得到调用结果
            Object object = this.handler.invoke(this, methods[0], arrobject);
            // 返回调用结果
            return (String)object;
        }
    
        /** 回声测试方法 */
        public Object $echo(Object object) {
            Object[] arrobject = new Object[]{object};
            Object object2 = this.handler.invoke(this, methods[1], arrobject);
            return object2;
        }
    }
    

    首先将运行时参数存储到数组中,然后调用 InvocationHandler 接口实现类的 invoke 方法,得到调用结果,最后将结果转型并返回给调用方。

    2.3 InvokerInvocationHandler#invoke 过滤toString等方法

    public class InvokerInvocationHandler implements InvocationHandler {
    
        private final Invoker<?> invoker;
    
        public InvokerInvocationHandler(Invoker<?> handler) {
            this.invoker = handler;
        }
    
        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            String methodName = method.getName();
            Class<?>[] parameterTypes = method.getParameterTypes();
            
            // 拦截定义在 Object 类中的方法(未被子类重写),比如 wait/notify
            if (method.getDeclaringClass() == Object.class) {
                return method.invoke(invoker, args);
            }
            
            // 如果 toString、hashCode 和 equals 等方法被子类重写了,这里也直接调用
            if ("toString".equals(methodName) && parameterTypes.length == 0) {
                return invoker.toString();
            }
            if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
                return invoker.hashCode();
            }
            if ("equals".equals(methodName) && parameterTypes.length == 1) {
                return invoker.equals(args[0]);
            }
            
            // 将 method 和 args 封装到 RpcInvocation 中,并执行后续的调用
            return invoker.invoke(new RpcInvocation(method, args)).recreate();
        }
    }
    

    InvokerInvocationHandler 中的 invoker成员变量类型为 MockClusterInvokerMockClusterInvoker 内部封装了服务降级逻辑。

    2.4MockClusterInvoker#invokeMock 逻辑判断

    public class MockClusterInvoker<T> implements Invoker<T> {
        
        private final Invoker<T> invoker;
        
        public Result invoke(Invocation invocation) throws RpcException {
            Result result = null;
    
            // 获取 mock 配置值
            String value = directory.getUrl().getMethodParameter(invocation.getMethodName(), Constants.MOCK_KEY, Boolean.FALSE.toString()).trim();
            if (value.length() == 0 || value.equalsIgnoreCase("false")) {
                // 无 mock 逻辑,直接调用其他 Invoker 对象的 invoke 方法,
                // 比如 FailoverClusterInvoker
                result = this.invoker.invoke(invocation);
            } else if (value.startsWith("force")) {
                // force:xxx 直接执行 mock 逻辑,不发起远程调用
                result = doMockInvoke(invocation, null);
            } else {
                // fail:xxx 表示消费方对调用服务失败后,再执行 mock 逻辑,不抛出异常
                try {
                    // 调用其他 Invoker 对象的 invoke 方法
                    result = this.invoker.invoke(invocation);
                } catch (RpcException e) {
                    if (e.isBiz()) {
                        throw e;
                    } else {
                        // 调用失败,执行 mock 逻辑
                        result = doMockInvoke(invocation, e);
                    }
                }
            }
            return result;
        }
        
        // 省略其他方法
    }
    

    只有mock配置是force或者fail且调用失败的时候才会进入doMockInvoke的逻辑,这里我们在前文分析过了doMockInvoke的逻辑,就认为调用没有出错来看,进入下一个invoke方法。

    !!!注意这里的invkerFailoverClusterInvoker,他继承了AbstractClusterInvoker并且没有重写invoke方法,实际上我们调用的是FailoverClusterInvoker#invoke,但是这个方法的代码是在AbstractClusterInvoker的类文件中。下面出现的一些带AbstractXXX也是一样的道理,就不再说多遍了。

    2.5AbstractClusterInvoker#invoke 服务路由

        public Result invoke(final Invocation invocation) throws RpcException {
            // 校验是否销毁
            checkWhetherDestroyed();
    
            // 获得所有服务提供者 Invoker 集合
            List<Invoker<T>> invokers = list(invocation);
    
            // 获得 LoadBalance 对象
            LoadBalance loadbalance;
            if (invokers != null && !invokers.isEmpty()) {
                loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
                        .getMethodParameter(invocation.getMethodName(), Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE));
            } else {
                loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(Constants.DEFAULT_LOADBALANCE);
            }
    
            // 设置调用编号,若是异步调用
            RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
    
            // 执行调用
            return doInvoke(invocation, invokers, loadbalance);
        }
    
        protected abstract Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException;
    

    AbstractClusterInvoker#invoke调用了list方法获得路由后的可用Invoker集合(在list中会进行服务路由,就不进去分析了。)接着调用子类自己的实现FailoverClusterInvoker#doInvoke

    2.6FailoverClusterInvoker#doInvoke

    public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
            List<Invoker<T>> copyinvokers = invokers;
            // 检查copyinvokers即可用Invoker集合是否为空,如果为空,那么抛出异常
            checkInvokers(copyinvokers, invocation);
            // 得到最大可调用次数:最大可重试次数+1,默认最大可重试次数Constants.DEFAULT_RETRIES=2
            int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
            if (len <= 0) {
                len = 1;
            }
            // 保存最后一次调用的异常
            // retry loop.
            RpcException le = null; // last exception.
            // 保存已经调用过的Invoker
            List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size()); // invoked invokers.
            Set<String> providers = new HashSet<String>(len);
            // failover机制核心实现:如果出现调用失败,那么重试其他服务器
            for (int i = 0; i < len; i++) {
                // 在进行重试前重新列举 Invoker,这样做的好处是,如果某个服务挂了,
                    // 通过调用 list 可得到最新可用的 Invoker 列表
                if (i > 0) {
                    checkWhetherDestroyed();
                    // 根据Invocation调用信息从Directory中获取所有可用Invoker
                    copyinvokers = list(invocation);
                    // 对 copyinvokers 进行判空检查
                    checkInvokers(copyinvokers, invocation);
                }
                // 根据负载均衡机制从copyinvokers中选择一个Invoker
                Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);
                // 保存每次调用的Invoker
                invoked.add(invoker);
                // 设置已经调用的 Invoker 集合到RPC 上下文中
                RpcContext.getContext().setInvokers((List) invoked);
                try {
                    // RPC 调用得到 Result
                    Result result = invoker.invoke(invocation);
                    // 重试过程中,将最后一次调用的异常信息以 warn 级别日志输出
                    if (le != null && logger.isWarnEnabled()) {
                        logger.warn("Although retry the method " + invocation.getMethodName()
                                + " in the service " + getInterface().getName()
                                + " was successful by the provider " + invoker.getUrl().getAddress()
                                + ", but there have been failed providers " + providers
                                + " (" + providers.size() + "/" + copyinvokers.size()
                                + ") from the registry " + directory.getUrl().getAddress()
                                + " on the consumer " + NetUtils.getLocalHost()
                                + " using the dubbo version " + Version.getVersion() + ". Last error is: "
                                + le.getMessage(), le);
                    }
                    return result;
                } catch (RpcException e) {
                    // 如果是业务性质的异常,不再重试,直接抛出
                    if (e.isBiz()) { // biz exception.
                        throw e;
                    }
                    // 其他性质的异常统一封装成RpcException
                    le = e;
                } catch (Throwable e) {
                    le = new RpcException(e.getMessage(), e);
                } finally {
                    providers.add(invoker.getUrl().getAddress());
                }
            }
            // 最大可调用次数用完还得到Result的话,抛出RpcException异常:重试了N次还是失败,并输出最后一次异常信息
            throw new RpcException(le.getCode(), "Failed to invoke the method " + invocation.getMethodName()
                    + " in the service " + getInterface().getName()
                    + ". Tried " + len + " times of the providers "
                    + providers + " (" + providers.size() + "/" + copyinvokers.size()
                    + ") from the registry " + directory.getUrl().getAddress()
                    + " on the consumer " + NetUtils.getLocalHost()
                    + " using the dubbo version " + Version.getVersion()
                    + ". Last error is: " + le.getMessage(), le.getCause() != null ? le.getCause() : le);
        }
    
    }
    

    FailoverClusterInvokerdoInvoke方法首先是获取重试次数,然后根据重试次数进行循环调用,失败后进行重试。
    for 循环内,首先是通过负载均衡组件选择一个 Invoker,然后再通过这个 Invoker 的 invoke 方法进行远程调用。
    接下来调用的是ListenerInvokerWrapper#invoke

    2.7ListenerInvokerWrapper#invoke Invoker和监听器的包装类

    这个是在远程引用,调用refer方法的时候,通过Protocol\$Adaptive => ProtocolFilterWrapper => ProtocolListenerWrapper =>RegistrProtocol =>Protocol\$Adaptive => ProtocolFilterWrapper => ProtocolListenerWrapper => DubboProtocol执行链中的第二个ProtocolListenerWrapper方法中的refer加入进来的监听器和Invoker的包装类
    ListenerInvokerWrapper装饰invoker, 在构造器中遍历listeners构建referer的监听链

    image.png

    而这个包装类里的invoke,则没什么好讲的,直接就调用了Filter链路的invoke

    2.8Filter链路的invoke 包含多个 Filter 调用

    仍然是调用refer方法的时候,通过Protocol\$Adaptive => ProtocolFilterWrapper => ProtocolListenerWrapper =>RegistrProtocol =>Protocol\$Adaptive => ProtocolFilterWrapper => ProtocolListenerWrapper => DubboProtocol的第二个ProtocolFilterWrapper#buildInvokerChain方法返回了一个带有filter链的匿名invoker,所以调用的时候会debug到这一行,通过invoke(next, invocation)的方式一个个调用排在后面的filter直到调用真正的invoker。下面会有一个Filterinvoke方法的例子。

    image.png
    public class ConsumerContextFilter implements Filter {
    
        @Override
        public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
            // 设置 RpcContext 对象
            RpcContext.getContext()
                    .setInvoker(invoker)
                    .setInvocation(invocation)
                    .setLocalAddress(NetUtils.getLocalHost(), 0) // 本地地址
                    .setRemoteAddress(invoker.getUrl().getHost(), invoker.getUrl().getPort()); // 远程地址
            // 设置 RpcInvocation 对象的 `invoker` 属性
            if (invocation instanceof RpcInvocation) {
                ((RpcInvocation) invocation).setInvoker(invoker);
            }
            // 服务调用
            try {
                return invoker.invoke(invocation);
            } finally {
                // 清理隐式参数集合
                RpcContext.getContext().clearAttachments();
            }
        }
    
    }
    

    可以看到,在这个Filter里接着调用了传入的下一个Invokerinvoke方法。
    到最后会调用到DubboInvoker,执行他的父类AbstractInvokerinvoke,在invoke中再调用DubboInvoker本类doInvoke的实现

    2.9 AbstractInvoker#invoke 设置隐式传参、async值

    public abstract class AbstractInvoker<T> implements Invoker<T> {
        
        public Result invoke(Invocation inv) throws RpcException {
            if (destroyed.get()) {
                throw new RpcException("Rpc invoker for service ...");
            }
            RpcInvocation invocation = (RpcInvocation) inv;
            // 设置 Invoker
            invocation.setInvoker(this);
            if (attachment != null && attachment.size() > 0) {
                // 设置 attachment
                invocation.addAttachmentsIfAbsent(attachment);
            }
            Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();
            if (contextAttachments != null && contextAttachments.size() != 0) {
                // 添加 contextAttachments 到 RpcInvocation#attachment 变量中
                invocation.addAttachments(contextAttachments);
            }
            if (getUrl().getMethodParameter(invocation.getMethodName(), Constants.ASYNC_KEY, false)) {
                // 设置异步信息到 RpcInvocation#attachment 中
                invocation.setAttachment(Constants.ASYNC_KEY, Boolean.TRUE.toString());
            }
            RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
    
            try {
                // 抽象方法,由子类实现
                return doInvoke(invocation);
            } catch (InvocationTargetException e) {
                // ...
            } catch (RpcException e) {
                // ...
            } catch (Throwable e) {
                return new RpcResult(e);
            }
        }
    
        protected abstract Result doInvoke(Invocation invocation) throws Throwable;
        
        // 省略其他方法
    }
    

    上面大部分代码用于添加信息到 RpcInvocation#attachment变量中,添加完毕后,调用 doInvoke 执行后续的调用

    2.10 DubboInvoker#doInvoke 开始远程调用逻辑

    public class DubboInvoker<T> extends AbstractInvoker<T> {
        
        private final ExchangeClient[] clients;
        
        protected Result doInvoke(final Invocation invocation) throws Throwable {
            RpcInvocation inv = (RpcInvocation) invocation;
            final String methodName = RpcUtils.getMethodName(invocation);
            // 设置 path 和 version 到 attachment 中
            inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
            inv.setAttachment(Constants.VERSION_KEY, version);
    
            ExchangeClient currentClient;
            if (clients.length == 1) {
                // 从 clients 数组中获取 ExchangeClient
                currentClient = clients[0];
            } else {
                currentClient = clients[index.getAndIncrement() % clients.length];
            }
            try {
                // 获取异步配置
                boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
                // isOneway 为 true,表示“单向”通信
                boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
                int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
    
                // 异步无返回值
                if (isOneway) {
                    boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
                    // 发送请求
                    currentClient.send(inv, isSent);
                    // 设置上下文中的 future 字段为 null
                    RpcContext.getContext().setFuture(null);
                    // 返回一个空的 RpcResult
                    return new RpcResult();
                } 
    
                // 异步有返回值
                else if (isAsync) {
                    // 发送请求,并得到一个 ResponseFuture 实例
                    ResponseFuture future = currentClient.request(inv, timeout);
                    // 设置 future 到上下文中
                    RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
                    // 暂时返回一个空结果
                    return new RpcResult();
                } 
    
                // 同步调用
                else {
                    RpcContext.getContext().setFuture(null);
                    // 发送请求,得到一个 ResponseFuture 实例,并调用该实例的 get 方法进行等待
                    return (Result) currentClient.request(inv, timeout).get();
                }
            } catch (TimeoutException e) {
                throw new RpcException(..., "Invoke remote method timeout....");
            } catch (RemotingException e) {
                throw new RpcException(..., "Failed to invoke remote method: ...");
            }
        }
        
        // 省略其他方法
    }
    

    在服务引用中分析过,当调用refer的时候,会把clients传进来,这里的clients正是ReferenceCountExchangeClient ,他仅仅是一个包装类,在这个getClients(url)方法中又调用了initClient(url)创建了HeaderExchangeClient,并将其放入到ReferenceCountExchangeClient中。详细可看我写的另一篇Dubbo中的服务引用

    注意这里的几个调用,异步无返回值调用的是send方法,而同步和异步返回则是request方法。
    当服务消费者还未接收到调用结果时,用户线程调用get方法会被阻塞住。同步调用模式下,框架获得 DefaultFuture对象后,会立即调用 get方法进行等待。而异步模式下则是将该对象封装到FutureAdapter实例中,并将 FutureAdapter 实例设置到 RpcContext中,供用户使用。FutureAdapter 是一个适配器,用于将 Dubbo 中的 ResponseFutureJDK中的 Future 进行适配。这样当用户线程调用Futureget 方法时,经过 FutureAdapter 适配,最终会调用 ResponseFuture实现类对象的 get 方法,也就是 DefaultFutureget方法。
    具体的异步接收结果的分析会放到另一篇来讲。

    这里的currentClient正是ReferenceCountExchangeClient。那就来看看这个类。

    2.11ReferenceCountExchangeClient 这个是装饰类

    final class ReferenceCountExchangeClient implements ExchangeClient {
    
        private final URL url;
        private final AtomicInteger referenceCount = new AtomicInteger(0);
        private ExchangeClient client;
    
        public ReferenceCountExchangeClient(ExchangeClient client, ConcurrentMap<String, LazyConnectExchangeClient> ghostClientMap) {
            this.client = client;
            // 引用计数自增
            referenceCount.incrementAndGet();
            this.url = client.getUrl();
            
            // ...
        }
    
        @Override
        public ResponseFuture request(Object request) throws RemotingException {
            // 直接调用被装饰对象的同签名方法
            return client.request(request);
        }
    
        @Override
        public ResponseFuture request(Object request, int timeout) throws RemotingException {
            // 直接调用被装饰对象的同签名方法
            return client.request(request, timeout);
        }
    
       public void send(Object message) throws RemotingException {
          // 直接调用被装饰对象的同签名方法
            client.send(message);
        }
    
        /** 引用计数自增,该方法由外部调用 */
        public void incrementAndGetCount() {
            // referenceCount 自增
            referenceCount.incrementAndGet();
        }
        
            @Override
        public void close(int timeout) {
            // referenceCount 自减
            if (referenceCount.decrementAndGet() <= 0) {
                if (timeout == 0) {
                    client.close();
                } else {
                    client.close(timeout);
                }
                client = replaceWithLazyClient();
            }
        }
        
        // 省略部分方法
    }
    

    再复习下LazyConnectExchangeClient的作用,当服务引用时,我们并不想此时就是开始通信,而是在调用的时候再与服务端通信,LazyConnectExchangeClient就像是一个缓存,在服务调用的时候才会创建真正的Client去连接,节省了资源。此时正是服务调用的时候。
    可以看到这里调用的是client的方法,也就是他包装的HeaderExchangeClient类。

    2.12 HeaderExchangeClient 被装饰类,信息交互客户端

    public class HeaderExchangeClient implements ExchangeClient {
    
        private static final ScheduledThreadPoolExecutor scheduled = new ScheduledThreadPoolExecutor(2, new NamedThreadFactory("dubbo-remoting-client-heartbeat", true));
        private final Client client;
        private final ExchangeChannel channel;
        private ScheduledFuture<?> heartbeatTimer;
        private int heartbeat;
        private int heartbeatTimeout;
    
        public HeaderExchangeClient(Client client, boolean needHeartbeat) {
            if (client == null) {
                throw new IllegalArgumentException("client == null");
            }
            this.client = client;
            
            // 创建 HeaderExchangeChannel 对象
            this.channel = new HeaderExchangeChannel(client);
            
            // 以下代码均与心跳检测逻辑有关
            String dubbo = client.getUrl().getParameter(Constants.DUBBO_VERSION_KEY);
            this.heartbeat = client.getUrl().getParameter(Constants.HEARTBEAT_KEY, dubbo != null && dubbo.startsWith("1.0.") ? Constants.DEFAULT_HEARTBEAT : 0);
            this.heartbeatTimeout = client.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3);
            if (heartbeatTimeout < heartbeat * 2) {
                throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2");
            }
            if (needHeartbeat) {
                // 开启心跳检测定时器
                startHeartbeatTimer();
            }
        }
    
        @Override
        public ResponseFuture request(Object request) throws RemotingException {
            // 直接 HeaderExchangeChannel 对象的同签名方法
            return channel.request(request);
        }
    
        @Override
        public ResponseFuture request(Object request, int timeout) throws RemotingException {
            // 直接 HeaderExchangeChannel 对象的同签名方法
            return channel.request(request, timeout);
        }
    
    public void send(Object message) throws RemotingException {
            // 直接 HeaderExchangeChannel 对象的同签名方法
            channel.send(message);
        }
    
        @Override
        public void close() {
            doClose();
            channel.close();
        }
        
        private void doClose() {
            // 停止心跳检测定时器
            stopHeartbeatTimer();
        }
    
        private void startHeartbeatTimer() {
            stopHeartbeatTimer();
            if (heartbeat > 0) {
                heartbeatTimer = scheduled.scheduleWithFixedDelay(
                        new HeartBeatTask(new HeartBeatTask.ChannelProvider() {
                            @Override
                            public Collection<Channel> getChannels() {
                                return Collections.<Channel>singletonList(HeaderExchangeClient.this);
                            }
                        }, heartbeat, heartbeatTimeout),
                        heartbeat, heartbeat, TimeUnit.MILLISECONDS);
            }
        }
    
        private void stopHeartbeatTimer() {
            if (heartbeatTimer != null && !heartbeatTimer.isCancelled()) {
                try {
                    heartbeatTimer.cancel(true);
                    scheduled.purge();
                } catch (Throwable e) {
                    if (logger.isWarnEnabled()) {
                        logger.warn(e.getMessage(), e);
                    }
                }
            }
            heartbeatTimer = null;
        }
        
        // 省略部分方法
    }
    

    HeaderExchangeClient中很多方法只有一行代码,即调用 HeaderExchangeChannel对象的同签名方法。那HeaderExchangeClient作用则是封装了一些关于心跳检测的逻辑。这个我们会在另一篇中详细讲解
    这里的channel属性是由new HeaderExchangeChannel(client)传入的,这个clienturl自适应的,默认为nettyClient,当然传入这个client的时机,也正是在服务引用的refer方法时。

    2.13 HeaderExchangeChannel 定义Request 对象

    final class HeaderExchangeChannel implements ExchangeChannel {
        
        private final Channel channel;
        
        HeaderExchangeChannel(Channel channel) {
            if (channel == null) {
                throw new IllegalArgumentException("channel == null");
            }
            
            // 这里的 channel 指向的是 NettyClient
            this.channel = channel;
        }
        
        @Override
        public ResponseFuture request(Object request) throws RemotingException {
            return request(request, channel.getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT));
        }
    
        @Override
        public ResponseFuture request(Object request, int timeout) throws RemotingException {
            if (closed) {
                throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
            }
           // 创建 Request 对象
            Request req = new Request();
            req.setVersion("2.0.0");
          // 设置双向通信标志为 true,即需要响应
            req.setTwoWay(true); 
        // 这里的 request 变量类型为 RpcInvocation
            req.setData(request);
            // 创建 DefaultFuture 对象
            DefaultFuture future = new DefaultFuture(channel, req, timeout);
            try {
                // 注意这里channel 指向的是 NettyClient,用 NettyClient 的 send 方法发送请求
                channel.send(req);
            } catch (RemotingException e) { // 发生异常,取消 DefaultFuture
                future.cancel();
                throw e;
            }
            // 返回 DefaultFuture 对象
            return future;
        }
    
        public void send(Object message, boolean sent) throws RemotingException {
            if (closed) {
                throw new RemotingException(this.getLocalAddress(), null, "Failed to send message " + message + ", cause: The channel " + this + " is closed!");
            }
            if (message instanceof Request
                    || message instanceof Response
                    || message instanceof String) {
                channel.send(message, sent);
            } else {
                Request request = new Request();
                request.setVersion("2.0.0");
     // 设置双向通信标志为 false,不需要响应
                request.setTwoWay(false);
                request.setData(message);
                channel.send(request, sent);
            }
        }
    }
    

    上面的方法首先定义了一个Request 对象,然后再将该对象传给 NettyClient (注意这个Client是url自适应的,默认为NettyClient,并非固定)的send方法,进行后续的调用。
    requestsend只有 request.setTwoWay(true/false)的区别,通过这个方法我们就可以来设置有无返回值啦。
    需要说明的是,NettyClient中并未实现send方法,该方法继承自父类 AbstractPeer

    2.14 AbstractPeer 调用子类的重载方法

    public abstract class AbstractPeer implements Endpoint, ChannelHandler {
        
        @Override
        public void send(Object message) throws RemotingException {
            // 该方法由 AbstractClient 类实现
            send(message, url.getParameter(Constants.SENT_KEY, false));
        }
        
        // 省略其他方法
    }
    

    继续看AbstractClient的实现吧

    2.15 AbstractClient 调用指定子类的实现拿到Channel

    public abstract class AbstractClient extends AbstractEndpoint implements Client {
    
        @Override
        public void send(Object message, boolean sent) throws RemotingException {
            if (send_reconnect && !isConnected()) {
                connect();
            }
            
            // 获取 Channel,getChannel 是一个抽象方法,具体由子类实现
            Channel channel = getChannel();
            if (channel == null || !channel.isConnected()) {
                throw new RemotingException(this, "message can not send ...");
            }
            
            // 继续向下调用
            channel.send(message, sent);
        }
    
        protected void connect() throws RemotingException {
            // 获得锁
            connectLock.lock();
            try {
                // 已连接,
                if (isConnected()) {
                    return;
                }
                // 初始化重连线程
                initConnectStatusCheckCommand();
                // 执行连接
                doConnect();
                // 连接失败,抛出异常
                if (!isConnected()) {
                    throw new RemotingException(this, "Failed connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " "
                            + NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion()
                            + ", cause: Connect wait timeout: " + getTimeout() + "ms.");
                // 连接成功,打印日志
                } else {
                    if (logger.isInfoEnabled()) {
                        logger.info("Successed connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " "
                                + NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion()
                                + ", channel is " + this.getChannel());
                    }
                }
                // 设置重连次数归零
                reconnect_count.set(0);
                // 设置未打印过错误日志
                reconnect_error_log_flag.set(false);
            } catch (RemotingException e) {
                throw e;
            } catch (Throwable e) {
                throw new RemotingException(this, "Failed connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " "
                        + NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion()
                        + ", cause: " + e.getMessage(), e);
            } finally {
                // 释放锁
                connectLock.unlock();
            }
        }
        
        protected abstract Channel getChannel();
        
        // 省略其他方法
    }
    

    这边的send方法就直接到最后调用到了NettyChannel#send ,在此之前我们先来看看他的重连逻辑。
    AbstractClient实现了重连逻辑,而真正通信仍然是交由子类实现

    注意:在AbstractClient的构造方法中(有点长就不贴了),会调用这个connect方法,(同样的,在这个send方法里,如果断开了连接,也会调用这个connect方法。)而这个方法里面的doConnect()方法则是子类的实现。

    那么就来简单看看默认实现子类NettyClient#doConnect的实现

    2.16 NettyClient#doConnect 连接服务器

    public class NettyClient extends AbstractClient {
        
        // 这里的 Channel 全限定名称为 org.jboss.netty.channel.Channel
        private volatile Channel channel;
    //调用父类的构造方法
     public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException {
            super(url, wrapChannelHandler(url, handler));
        }
    
    protected void doConnect() throws Throwable {
            long start = System.currentTimeMillis();
            // 连接服务器
            ChannelFuture future = bootstrap.connect(getConnectAddress());
            try {
                // 等待连接成功或者超时
                boolean ret = future.awaitUninterruptibly(3000, TimeUnit.MILLISECONDS);
                // 连接成功
                if (ret && future.isSuccess()) {
                    Channel newChannel = future.channel();
                    try {
                        // 关闭老的连接
                        // Close old channel
                        Channel oldChannel = NettyClient.this.channel; // copy reference
                        if (oldChannel != null) {
                            try {
                                if (logger.isInfoEnabled()) {
                                    logger.info("Close old netty channel " + oldChannel + " on create new netty channel " + newChannel);
                                }
                                oldChannel.close();
                            } finally {
                                NettyChannel.removeChannelIfDisconnected(oldChannel);
                            }
                        }
                    } finally {
                        // 若 NettyClient 被关闭,关闭连接
                        if (NettyClient.this.isClosed()) {
                            try {
                                if (logger.isInfoEnabled()) {
                                    logger.info("Close new netty channel " + newChannel + ", because the client closed.");
                                }
                                newChannel.close();
                            } finally {
                                NettyClient.this.channel = null;
                                NettyChannel.removeChannelIfDisconnected(newChannel);
                            }
                        // 设置新连接
                        } else {
                            NettyClient.this.channel = newChannel;
                        }
                    }
                // 发生异常,抛出 RemotingException 异常
                } else if (future.cause() != null) {
                    throw new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server "
                            + getRemoteAddress() + ", error message is:" + future.cause().getMessage(), future.cause());
                // 无结果(连接超时),抛出 RemotingException 异常
                } else {
                    throw new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server "
                            + getRemoteAddress() + " client-side timeout "
                            + getConnectTimeout() + "ms (elapsed: " + (System.currentTimeMillis() - start) + "ms) from netty client "
                            + NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion());
                }
            } finally {
                if (!isConnected()) { // 【TODO 8028】为什么不取消 future
                    //future.cancel(true);
                }
            }
        }
    
        @Override
        protected com.alibaba.dubbo.remoting.Channel getChannel() {
            Channel c = channel;
            if (c == null || !c.isConnected())
                return null;
            // 获取一个 NettyChannel 类型对象
            return NettyChannel.getOrAddChannel(c, getUrl(), this);
        }
    }
    
    

    doConnect中有一段代码NettyClient.this.channel = newChannel,这个newChannel就是真正Netty对象——org.jboss.netty.channel.Channel。在NettyClient初始化的过程中会调用父类的构造方法,再调用这个doConnect方法从而对其赋值。

    再来看getChannel方法,好家伙,还得套娃,这边的c就是我们的org.jboss.netty.channel.Channel

    获取到 NettyChannel实例后,即可进行后续的调用。下面看一下 NettyChannel#getOrAddChannel方法。

    2.17 NettyChannel#getOrAddChannel 静态方法拿到真正的Netty的Channel

    final class NettyChannel extends AbstractChannel {
    
        private static final ConcurrentMap<org.jboss.netty.channel.Channel, NettyChannel> channelMap = 
            new ConcurrentHashMap<org.jboss.netty.channel.Channel, NettyChannel>();
    
        private final org.jboss.netty.channel.Channel channel;
        
        /** 私有构造方法 */
        private NettyChannel(org.jboss.netty.channel.Channel channel, URL url, ChannelHandler handler) {
            super(url, handler);
            if (channel == null) {
                throw new IllegalArgumentException("netty channel == null;");
            }
            this.channel = channel;
        }
    
        static NettyChannel getOrAddChannel(org.jboss.netty.channel.Channel ch, URL url, ChannelHandler handler) {
            if (ch == null) {
                return null;
            }
            
            // 尝试从集合中获取 NettyChannel 实例
            NettyChannel ret = channelMap.get(ch);
            if (ret == null) {
                // 如果 ret = null,则创建一个新的 NettyChannel 实例
                NettyChannel nc = new NettyChannel(ch, url, handler);
                if (ch.isConnected()) {
                    // 将 <Channel, NettyChannel> 键值对存入 channelMap 集合中
                    ret = channelMap.putIfAbsent(ch, nc);
                }
                if (ret == null) {
                    ret = nc;
                }
            }
            return ret;
        }
    }
    

    注意这个getOrAddChannel是静态方法,里面才创建了一个NettyChannel得到实例。
    获取到 NettyChannel 实例后,即可进行后续的调用。下面看一下NettyChannel方法。

    2.18 NettyChannel#send

    public void send(Object message, boolean sent) throws RemotingException {
        super.send(message, sent);
    
        boolean success = true;
        int timeout = 0;
        try {
            // 发送消息(包含请求和响应消息)
            ChannelFuture future = channel.write(message);
            
            // sent 的值源于 <dubbo:method sent="true/false" /> 中 sent 的配置值,有两种配置值:
            //   1. true: 等待消息发出,消息发送失败将抛出异常
            //   2. false: 不等待消息发出,将消息放入 IO 队列,即刻返回
            // 默认情况下 sent = false;
            if (sent) {
                timeout = getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
                // 等待消息发出,若在规定时间没能发出,success 会被置为 false
                success = future.await(timeout);
            }
            Throwable cause = future.getCause();
            if (cause != null) {
                throw cause;
            }
        } catch (Throwable e) {
            throw new RemotingException(this, "Failed to send message ...");
        }
    
        // 若 success 为 false,这里抛出异常
        if (!success) {
            throw new RemotingException(this, "Failed to send message ...");
        }
    }
    

    上述方法里的 ChannelFuture future = channel.write(message);
    这个channel就是org.jboss.netty.channel.Channel,往下就是Netty的工作了。

    相关文章

      网友评论

          本文标题:Dubbo的服务调用(消费端)

          本文链接:https://www.haomeiwen.com/subject/zckwkktx.html