美文网首页
第9章 Dubbo 服务通信流程的设计与实现

第9章 Dubbo 服务通信流程的设计与实现

作者: 原水寒 | 来源:发表于2019-07-24 08:14 被阅读0次
    String hello = demoService.sayHello("world");
    

    一、服务通信简图

    image.png
    • consumer
    1. proxy0(demoService)调用其属性 InvocationHandler
    2. InvocationHandler 调用其属性 MockClusterInvoker
    3. MockClusterInvoker 调用其属性 FailoverClusterInvoker
    4. FailoverClusterInvoker 调用其属性 RegistryDirectory,从中根据 method 获取 List<DubboInvoker(filtered)>
    5. RegistryDirectory 调用其 List<Router> 对 List<DubboInvoker(filtered) 进行过滤
    6. FailoverClusterInvoker 获取 LoadBalancer,从 5 的结果获取一个 DubboInvoker(filtered)
    7. 执行 DubboInvoker(filtered) 的过滤链,最后执行 DubboInvoker,发起 Netty 调用
    • provider
    1. 根据请求参数 RpcInvocation(获取 interface、group、version) 和通信通道 Channel(获取 port) 组装 serviceKey,根据 serviceKey 获取 DubboExporter
    2. 从 DubboExporter 的取出其属性 AbstractProxyInvoker(filtered)
    3. 执行 AbstractProxyInvoker(filtered) 的过滤链,最后执行 AbstractProxyInvoker
    4. AbstractProxyInvoker 调用 Wrapper,Wrapper 调用 DemoServiceImpl

    注意

    image.png
    通过 第7章 Dubbo 服务暴露流程的设计与实现 第8章 Dubbo 服务引用流程的设计与实现 这两章的分析,我们发现在 Dubbo 中存在四类 Invoker,而 Invoker 也是 Dubbo 中对调用逻辑进行封装的一个模型体:
    • AbstractProxyInvoker:服务端,提供了对具体实现(eg. DemoServiceImpl)的调用封装;
    • DubboInvoker:客户端,封装了 NettyClient,进行远程调用的发起;
    • 具体的 XxxClusterInvoker:客户端,将多个Invoker伪装成一个集群版的Invoker;
    • InvokerWrapper 包装类:eg. MockClusterInvoker,应用于客户端,是具体的 XxxClusterInvoker 的包装类,提供 mock操作。

    二、服务通信源码梯形图

    服务通信分为:客户端发出请求;服务端接收请求并返回响应;客户端接收响应。

    2.1 客户端发出请求

    //代理发出请求
    proxy0.sayHello(String paramString)
    -->InvokerInvocationHandler.invoke(Object proxy, Method method, Object[] args)
      -->new RpcInvocation(method, args)
      -->MockClusterInvoker.invoke(Invocation invocation)//服务降级的地方
        //ClusterInvoker将多个Invoker伪装成一个集群版的Invoker
        -->AbstractClusterInvoker.invoke(Invocation invocation)
          //获取Invokers
          -->list(Invocation invocation)
            -->AbstractDirectory.list(Invocation invocation)
              -->RegistryDirectory.doList(Invocation invocation)//从Map<String, List<Invoker<T>>> methodInvokerMap中获取key为sayHello的List<Invoker<T>>
              -->MockInvokersSelector.getNormalInvokers(List<Invoker<T>> invokers)//对上述的List<Invoker<T>>再进行一次过滤(这里比如说过滤出所有协议为mock的Invoker,如果一个也没有就全部返回),这就是router的作用
          //获取负载均衡器
          -->loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
                        .getMethodParameter(invocation.getMethodName(), Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE))//默认为random
          -->RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation)//异步操作添加invocationID
          -->FailoverClusterInvoker.doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance)
            //使用负载均衡器选择一个Invoker出来:RegistryDirectory$InvokerDelegete实例
            -->AbstractClusterInvoker.select(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected)
              -->doselect(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected)
                -->AbstractLoadBalance.select(List<Invoker<T>> invokers, URL url, Invocation invocation)
                  -->RandomLoadBalance.doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation)
            //执行listener和filter链
            -->ListenerInvokerWrapper.invoke
              -->ConsumerContextFilter.invoke(Invoker<?> invoker, Invocation invocation)//设置一些RpcContext属性,并且设置invocation中的invoker属性
                -->FutureFilter.invoke(Invocation invocation)
                  -->MonitorFilter.invoke(Invocation invocation)//monitor在这里收集数据
                    -->AbstractInvoker.invoke(Invocation inv)//重新设置了invocation中的invoker属性和attachment属性
                      -->DubboInvoker.doInvoke(Invocation invocation)
                        //获取ExchangeClient进行消息的发送
                        -->ReferenceCountExchangeClient.request(Object request, int timeout)
                          -->HeaderExchangeClient.request(Object request, int timeout)
                            -->HeaderExchangeChannel.request(Object request, int timeout)
                              -->AbstractClient.send(Object message, boolean sent)//NettyClient的父类
                                -->getChannel()//NettyChannel实例,其内部channel实例=NioClientSocketChannel实例
                                -->NettyChannel.send(Object message, boolean sent)
                                  -->NioClientSocketChannel.write(Object message)//已经是netty的东西了,这里的message=Request实例:最重要的是RpcInvocation [methodName=sayHello, parameterTypes=[class java.lang.String], arguments=[world], attachments={path=com.alibaba.dubbo.demo.DemoService, interface=com.alibaba.dubbo.demo.DemoService, version=0.0.0}]
    
    总体流程(默认情况下):
    1. 代理 proxy0(demoService)调用 InvokerInvocationHandler 执行 sayHello
    2. InvokerInvocationHandler 先将请求信息(methodName,parameterTypes,arguments)封装成 RpcInvocation 对象,之后调用 MockClusterInvoker#invoke
    3. MockClusterInvoker 根据是否配置了mock信息(服务降级信息)决定走mock逻辑还是正常逻辑(关于服务降级的详细源码解析,见 http://www.cnblogs.com/java-zhao/p/8320519.html,这里只走正常逻辑)
    4. MockClusterInvoker 调用 FailoverClusterInvoker#invoke
    5. FailoverClusterInvoker 先从 RegistryDirectory 中的newMethodInvokerMap 中根据 methodName 获取 InvokerDelegate 实例列表(即可用的 provider 列表)
    6. 使用 Router 对获取到的 InvokerDelegate 实例列表再进行一次选择(这里就可以实现服务的读写分离)
    7. 根据 Dubbo SPI 机制创建负载均衡器(默认是 RandomLoadBalance)
    8. 使用 RandomLoadBalance 从被 Router 过滤过的 InvokerDelegate 实例列表选择一个实例出来(即从 provider 列表选出一台 provider 机器来)
    9. 之后执行 InvokerDelegate#invoke 方法:这里首先执行 filter 链,最后执行到 DubboInvoker#doInvoke 方法,在这里首先为 RpcInvocation 添加了新的参数,然后选取了一个 ReferenceCountExchangeClient,向服务端发出了请求。

    注意最终的 RpcInvocation 实例包含的属性:

    methodName=sayHello
    parameterTypes=[class java.lang.String]
    arguments=[world]
    attachments={path=com.alibaba.dubbo.demo.DemoService, interface=com.alibaba.dubbo.demo.DemoService, version=0.0.0}
    

    2.2 服务端接收请求并返回响应

    服务端接收请求消息
    NettyHandler.messageReceived(ChannelHandlerContext ctx, MessageEvent e)
    -->MultiMessageHandler.received(Channel channel, Object message)
      -->HeartbeatHandler.received(Channel channel, Object message)
        -->AllChannelHandler.received(Channel channel, Object message)
          -->ExecutorService cexecutor = getExecutorService()
          -->cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message))
            -->ChannelEventRunnable.run()
              -->DecodeHandler.received(Channel channel, Object message)
                -->decode(Object message)
                -->HeaderExchangeHandler.received(Channel channel, Object message)
                  -->Response response = handleRequest(exchangeChannel, request)
                    -->DubboProtocol.requestHandler.reply(ExchangeChannel channel, Object message)//这里的message就是上边的RpcInvocation
                      //首先获取exporter,之后再获取invoker
                      -->getInvoker(Channel channel, Invocation inv)//组装serviceKey=com.alibaba.dubbo.demo.DemoService:20880
                        -->(DubboExporter<?>) exporterMap.get(serviceKey)//从Map<String, Exporter<?>> exporterMap中根据serviceKey获取DubboExport实例,
                        -->exporter.getInvoker()//获取RegistryProtocol$InvokerDelegete实例
                      //执行filter链
                      -->EchoFilter.invoke(Invoker<?> invoker, Invocation inv)
                        -->ClassLoaderFilter.nvoke(Invoker<?> invoker, Invocation invocation)
                          -->GenericFilter.invoke(Invoker<?> invoker, Invocation inv)
                            -->ContextFilter.invoke(Invoker<?> invoker, Invocation invocation)
                              -->TraceFilter.invoke(Invoker<?> invoker, Invocation invocation)
                                -->TimeoutFilter.invoke(Invoker<?> invoker, Invocation invocation)
                                  -->MonitorFilter.invoke(Invoker<?> invoker, Invocation invocation)
                                    -->ExceptionFilter.invoke(Invoker<?> invoker, Invocation invocation)
                                      //执行真正的invoker调用
                                      -->AbstractProxyInvoker.invoke(Invocation invocation)
                                        -->JavassistProxyFactory$AbstractProxyInvoker.doInvoke
                                          -->Wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments)
                                            -->DemoServiceImpl.sayHello(String name)
                                            -->new RpcResult(Object result)//将返回值result包装成RpcResult(最后该参数会被包装为Response)
              服务端发送响应消息
                  -->channel.send(response)//NettyChannel
                    -->NioAcceptedSocketChannel.write(Object message)//已经是netty的东西了,这里的message=Response实例:最重要的是RpcResult [result=Hello world, response form provider: 10.211.55.2:20880, exception=null]
    
    总体流程(默认情况下):
    1. NettyServer 接收到请求消息后,进行解码,之后交给 provider 端业务线程池进行处理;
    2. 业务线程调用 DubboProtocol$requestHandler#reply 方法,该方法首先根据请求信息 RpcInvocation 组装 serviceKey,之后根据 serviceKey 从 DubboProtocol 的 exporterMap 中获取指定服务的 DubboExporter(serviceKey 是由 RpcInvocation 中的 group/path:version:port 组成,其中 port 来自于 channel)
    3. 之后从 DubboExporter 中获取存储在其内的 InvokerDelegete 实例,之后执行 filter 链,最后执行到 AbstractProxyInvoker#invoke 方法,
    public abstract class AbstractProxyInvoker<T> implements Invoker<T> {
        /**
         * 真实对象 ref, eg. DemoServiceImpl
         */
        private final T proxy;
        ...
        public Result invoke(Invocation invocation) throws RpcException {
            try {
                return new RpcResult(doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments()));
            } catch (InvocationTargetException e) {
                return new RpcResult(e.getTargetException());
            } catch (Throwable e) {
                throw new RpcException("Failed to invoke remote proxy method " + invocation.getMethodName() + " to " + getUrl() + ", cause: " + e.getMessage(), e);
            }
        }
    
        // 子类覆写的真正调用的方法
        protected abstract Object doInvoke(T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Throwable;
        ...
    }
    

    AbstractProxyInvoker 的实现是在 ProxyFactory 中创建的匿名内部类。

        public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
            // TODO Wrapper cannot handle this scenario correctly: the classname contains '$'
            final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
            return new AbstractProxyInvoker<T>(proxy, type, url) {
                @Override
                protected Object doInvoke(T proxy, String methodName,
                                          Class<?>[] parameterTypes,
                                          Object[] arguments) throws Throwable {
                    return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
                }
            };
        }
    

    这是在讲 provider 服务暴露时的一段代码,这里实现了 AbstractProxyInvoker#doInvoke 方法,在该方法中又调用了 wrapper#invokeMethod

        /**
         * @param o  实现类
         * @param n  方法名称
         * @param p  参数类型
         * @param v  参数值
         * @return
         * @throws java.lang.reflect.InvocationTargetException
         */
        public Object invokeMethod(Object o, String n, Class[] p, Object[] v) throws java.lang.reflect.InvocationTargetException {
            com.alibaba.dubbo.demo.provider.DemoServiceImpl w;
            try {
                w = ((com.alibaba.dubbo.demo.provider.DemoServiceImpl) o);
            } catch (Throwable e) {
                throw new IllegalArgumentException(e);
            }
            try {
                if ("sayHello".equals(n) && p.length == 1) {
                    return ($w) w.sayHello((java.lang.String) v[0]);
                }
            } catch (Throwable e) {
                throw new java.lang.reflect.InvocationTargetException(e);
            }
            throw new com.alibaba.dubbo.common.bytecode.NoSuchMethodException("Not found method \"" + n + "\" in class com.alibaba.dubbo.demo.provider.DemoServiceImpl.");
        }
    

    这里最终调到了 DemoServiceImpl#sayHello 方法。最后将响应结果封装成 RpcResult,返回给客户端。

    2.3 客户端接收响应

    客户端接收响应消息
    NettyHandler.messageReceived(ChannelHandlerContext ctx, MessageEvent e)
    -->MultiMessageHandler.received(Channel channel, Object message)
      -->HeartbeatHandler.received(Channel channel, Object message)
        -->AllChannelHandler.received(Channel channel, Object message)
          -->ExecutorService cexecutor = getExecutorService()
          -->cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message))
            -->ChannelEventRunnable.run()
              -->DecodeHandler.received(Channel channel, Object message)
                -->decode(Object message)
                -->HeaderExchangeHandler.received(Channel channel, Object message)
                  -->handleResponse(Channel channel, Response response)
                    -->DefaultFuture.received(channel, response)
                      -->doReceived(Response res)//异步转同步
    

    客户端接收到服务端的响应之后,先解码,之后将响应信息交给业务线程池去处理,然后就涉及到 Dubbo 的异步转同步的实现,直接看 Dubbo 线程模型。

    三、Dubbo 线程模型

    以同步调用为例。

    Alt pic

    线程池

    在provider端,存在三个线程池:

    • boss 线程池:Netty 所有,默认只包含一个 NioEventLoop。用于接收客户端的连接 channel,并且之后将 channel 注册到 worker 线程池中的一个 NioEventLoop 上(实际上是注册在 NioEventLoop 所拥有的那个 Selector 上);
    • worker 线程池:Netty 所有,在 Dubbo 中默认包含“核数+1”个 NioEventLoop(在 Netty 中默认是2*核数)。worker 线程池中的每一个 NioEventLoop 去阻塞(Selector.select())获取注册在其上的 channel 准备就绪的事件,然后做出相应处理;
    • server 线程池:Dubbo 服务端的业务线程池,默认 worker 线程会将解码后的请求消息交由该线程池进行处理。

    在consumer端,存在两个线程池:

    • worker 线程池:同 provider 的 worker 线程池
    • client 线程池:Dubbo 服务端的业务线程池,默认 worker 线程会将解码后的响应消息交由该线程池进行处理。

    通信流程

    在上一小节中总结了通信流程的源码调用链,这一节从线程模型的角度来看通信流程。(下面以同步调用为例)

    1. consumer 端用户线程在发出请求之前会先创建一个 DefaultFuture 对象;并将 requestID 作为 DefaultFuture 对象的 key 存储在 Map<Long, DefaultFuture> FUTURES 中(注意:每一个 requestID 是一个请求的唯一标识,最后相应的响应 Response 的 responseID 就等于这个 requestID
    2. 之后调用 Netty 编码并发出请求,然后马上调用 DefaultFuture#get 进行阻塞等待(阻塞等待 response 不为空);
    3. provider 端 NettyServer 接收到请求后,解码,然后交由 server 线程池进行处理;
    4. server 线程池处理完成之后,调用 Netty 编码并发送响应消息给 consumer 端;
    5. consumer 端接收到响应后,解码,然后交给 client 线程池处理,client 线程池从 Map<Long, DefaultFuture> FUTURES 中获取 key=responseIDDefaultFuture 对象,然后将响应消息填充到其 response 属性后,唤醒 consumer 端阻塞的用户线程;
    6. 最后 consumer 得到了响应

    相关文章

      网友评论

          本文标题:第9章 Dubbo 服务通信流程的设计与实现

          本文链接:https://www.haomeiwen.com/subject/kuuvlctx.html