美文网首页
Dubbo的调用栈详细介绍

Dubbo的调用栈详细介绍

作者: liuliuzo | 来源:发表于2020-11-14 16:30 被阅读0次

    Dubbo framework示意图

    image.png

    整个调用过程

    (1)protocol 远程调用层:封装 RPC 调用,以 Invocation, Result 为中心,扩展接口为 Protocol, Invoker, Exporter
    (2)exchange 信息交换层:封装请求响应模式,同步转异步,以 Request, Response 为中心,扩展接口为 Exchanger, ExchangeChannel, ExchangeClient, ExchangeServer
    (3)transport 网络传输层:抽象 mina 和 netty 为统一接口,以 Message 为中心,扩展接口为 Channel, Transporter, Client, Server, Codec

    请求过程

    首先服务消费者通过代理对象 Proxy 发起远程调用,接着通过网络客户端 Client 将编码后的请求发送给服务提供方的网络层上,也就是 Server。Server 在收到请求后,首先要做的事情是对数据包进行解码。然后将解码后的请求发送至分发器 Dispatcher,再由分发器将请求派发到指定的线程池上,最后由线程池调用具体的服务。这就是一个远程调用请求的发送与接收过程。


    image.png

    Javassist 框架为服务接口生成动态代理类

    image.png

    服务消费端

    就是上图中服务消费端的 proxy,用户代码通过这个 proxy 调用其对应的 Invoker 。DubboInvoker、 HessianRpcInvoker、 InjvmInvoker、 RmiInvoker、 WebServiceInvoker 中的任何一个

    public class DemoClientAction {
    
        private DemoService demoService;
    
        public void setDemoService(DemoService demoService) {
            this.demoService = demoService;
        }
    
        public void start() {
            String hello = demoService.sayHello("world" + i);
        }
    }
    

    Javassist 框架为服务接口生成动态代理类

    public class proxy0 implements ClassGenerator.DC, EchoService, DemoService {
        // 方法数组
        public static Method[] methods;
        private InvocationHandler handler;  
    
        public proxy0(InvocationHandler invocationHandler) {
            this.handler = invocationHandler;
        }
    
        public proxy0() {
        }
    
        public String sayHello(String string) {
            // 将参数存储到 Object 数组中
            Object[] arrobject = new Object[]{string};
            // 调用 InvocationHandler 实现类的 invoke 方法得到调用结果
            Object object = this.handler.invoke(this, methods[0], arrobject);
            // 返回调用结果
            return (String)object;
        }
    
        /** 回声测试方法 */
        public Object $echo(Object object) {
            Object[] arrobject = new Object[]{object};
            Object object2 = this.handler.invoke(this, methods[1], arrobject);
            return object2;
        }
    }
    

    InvokerInvocationHandler

    public class InvokerInvocationHandler implements InvocationHandler {
    
        private final Invoker<?> invoker;    //MockClusterInvoker 内部封装了服务降级逻辑
    
        public InvokerInvocationHandler(Invoker<?> handler) {
            this.invoker = handler;
        }
    
        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            String methodName = method.getName();
            Class<?>[] parameterTypes = method.getParameterTypes();
            
            // 拦截定义在 Object 类中的方法(未被子类重写),比如 wait/notify
            if (method.getDeclaringClass() == Object.class) {
                return method.invoke(invoker, args);
            }
            
            // 如果 toString、hashCode 和 equals 等方法被子类重写了,这里也直接调用
            if ("toString".equals(methodName) && parameterTypes.length == 0) {
                return invoker.toString();
            }
            if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
                return invoker.hashCode();
            }
            if ("equals".equals(methodName) && parameterTypes.length == 1) {
                return invoker.equals(args[0]);
            }
            
            // 将 method 和 args 封装到 RpcInvocation 中,并执行后续的调用
            return invoker.invoke(new RpcInvocation(method, args)).recreate();
        }
    }
    

    客户端调用栈

    proxy0#sayHello(String)
      —> InvokerInvocationHandler#invoke(Object, Method, Object[])
        —> MockClusterInvoker#invoke(Invocation)               //失败后的降级处理
          —> AbstractClusterInvoker#invoke(Invocation)         //负载均衡等操作均会在此阶段被执行
            —> FailoverClusterInvoker#doInvoke(Invocation, List<Invoker<T>>, LoadBalance)       //集群的容错处理
              —> Filter#invoke(Invoker, Invocation) 
                —> ListenerInvokerWrapper#invoke(Invocation)  
                  —> AbstractInvoker#invoke(Invocation) 
                    —> DubboInvoker#doInvoke(Invocation)
                      —> ReferenceCountExchangeClient#request(Object, int)
                        —> HeaderExchangeClient#request(Object, int)
                          —> HeaderExchangeChannel#request(Object, int)
                            —> AbstractPeer#send(Object)
                              —> AbstractClient#send(Object, boolean)
                                —> NettyChannel#send(Object, boolean)
                                  —> NioClientSocketChannel#write(Object)
    

    AbstractInvoker 具体实现

    public abstract class AbstractInvoker<T> implements Invoker<T> {
        
        public Result invoke(Invocation inv) throws RpcException {
            if (destroyed.get()) {
                throw new RpcException("Rpc invoker for service ...");
            }
            RpcInvocation invocation = (RpcInvocation) inv;
            // 设置 Invoker
            invocation.setInvoker(this);
            if (attachment != null && attachment.size() > 0) {
                // 设置 attachment
                invocation.addAttachmentsIfAbsent(attachment);
            }
            Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();
            if (contextAttachments != null && contextAttachments.size() != 0) {
                // 添加 contextAttachments 到 RpcInvocation#attachment 变量中
                invocation.addAttachments(contextAttachments);
            }
            if (getUrl().getMethodParameter(invocation.getMethodName(), Constants.ASYNC_KEY, false)) {
                // 设置异步信息到 RpcInvocation#attachment 中
                invocation.setAttachment(Constants.ASYNC_KEY, Boolean.TRUE.toString());
            }
            RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
    
            try {
                // 抽象方法,由子类实现
                return doInvoke(invocation);
            } catch (InvocationTargetException e) {
                // ...
            } catch (RpcException e) {
                // ...
            } catch (Throwable e) {
                return new RpcResult(e);
            }
        }
    
        protected abstract Result doInvoke(Invocation invocation) throws Throwable;
        
        // 省略其他方法
    }
    

    DubboInvoker 具体实现

    public class DubboInvoker<T> extends AbstractInvoker<T> {
        
        private final ExchangeClient[] clients;
        
        protected Result doInvoke(final Invocation invocation) throws Throwable {
            RpcInvocation inv = (RpcInvocation) invocation;
            final String methodName = RpcUtils.getMethodName(invocation);
            // 设置 path 和 version 到 attachment 中
            inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
            inv.setAttachment(Constants.VERSION_KEY, version);
    
            ExchangeClient currentClient;
            if (clients.length == 1) {
                // 从 clients 数组中获取 ExchangeClient
                currentClient = clients[0];
            } else {
                currentClient = clients[index.getAndIncrement() % clients.length];
            }
            try {
                // 获取异步配置
                boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
                // isOneway 为 true,表示“单向”通信
                boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
                int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
    
                // 异步无返回值
                if (isOneway) {
                    boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
                    // 发送请求
                    currentClient.send(inv, isSent);
                    // 设置上下文中的 future 字段为 null
                    RpcContext.getContext().setFuture(null);
                    // 返回一个空的 RpcResult
                    return new RpcResult();
                } 
    
                // 异步有返回值
                else if (isAsync) {
                    // 发送请求,并得到一个 ResponseFuture 实例
                    ResponseFuture future = currentClient.request(inv, timeout);
                    // 设置 future 到上下文中
                    RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
                    // 暂时返回一个空结果
                    return new RpcResult();
                } 
    
                // 同步调用
                else {
                    RpcContext.getContext().setFuture(null);
                    // 发送请求,得到一个 ResponseFuture 实例,并调用该实例的 get 方法进行等待
                    return (Result) currentClient.request(inv, timeout).get();
                }
            } catch (TimeoutException e) {
                throw new RpcException(..., "Invoke remote method timeout....");
            } catch (RemotingException e) {
                throw new RpcException(..., "Failed to invoke remote method: ...");
            }
        }
        
        // 省略其他方法
    }
    

    ReferenceCountExchangeClient 内部定义了一个引用计数变量 referenceCount,每当该对象被引用一次 referenceCount 都会进行自增。每当 close 方法被调用时,referenceCount 进行自减。ReferenceCountExchangeClient 内部仅实现了一个引用计数的功能,其他方法并无复杂逻辑,均是直接调用被装饰对象的相关方法。HeaderExchangeClient 封装了一些关于心跳检测的逻辑

    HeaderExchangeChannel定义request 并且send

    final class HeaderExchangeChannel implements ExchangeChannel {
        
        private final Channel channel;
        
        HeaderExchangeChannel(Channel channel) {
            if (channel == null) {
                throw new IllegalArgumentException("channel == null");
            }
            
            // 这里的 channel 指向的是 NettyClient
            this.channel = channel;
        }
        
        @Override
        public ResponseFuture request(Object request) throws RemotingException {
            return request(request, channel.getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT));
        }
    
        @Override
        public ResponseFuture request(Object request, int timeout) throws RemotingException {
            if (closed) {
                throw new RemotingException(..., "Failed to send request ...);
            }
            // 创建 Request 对象
            Request req = new Request();
            req.setVersion(Version.getProtocolVersion());
            // 设置双向通信标志为 true
            req.setTwoWay(true);
            // 这里的 request 变量类型为 RpcInvocation
            req.setData(request);
                                            
            // 创建 DefaultFuture 对象
            DefaultFuture future = new DefaultFuture(channel, req, timeout);
            try {
                // 调用 NettyClient 的 send 方法发送请求
                channel.send(req);
            } catch (RemotingException e) {
                future.cancel();
                throw e;
            }
            // 返回 DefaultFuture 对象
            return future;
        }
    }
    
    image.png

    客户端encodeRequest

    public class ExchangeCodec extends TelnetCodec {
    
        // 消息头长度
        protected static final int HEADER_LENGTH = 16;
        // 魔数内容
        protected static final short MAGIC = (short) 0xdabb;
        protected static final byte MAGIC_HIGH = Bytes.short2bytes(MAGIC)[0];
        protected static final byte MAGIC_LOW = Bytes.short2bytes(MAGIC)[1];
        protected static final byte FLAG_REQUEST = (byte) 0x80;
        protected static final byte FLAG_TWOWAY = (byte) 0x40;
        protected static final byte FLAG_EVENT = (byte) 0x20;
        protected static final int SERIALIZATION_MASK = 0x1f;
        private static final Logger logger = LoggerFactory.getLogger(ExchangeCodec.class);
    
        public Short getMagicCode() {
            return MAGIC;
        }
    
        @Override
        public void encode(Channel channel, ChannelBuffer buffer, Object msg) throws IOException {
            if (msg instanceof Request) {
                // 对 Request 对象进行编码
                encodeRequest(channel, buffer, (Request) msg);
            } else if (msg instanceof Response) {
                // 对 Response 对象进行编码,后面分析
                encodeResponse(channel, buffer, (Response) msg); 
            } else {
                super.encode(channel, buffer, msg);
            }
        }
    
        protected void encodeRequest(Channel channel, ChannelBuffer buffer, Request req) throws IOException {
            Serialization serialization = getSerialization(channel);
    
            // 创建消息头字节数组,长度为 16
            byte[] header = new byte[HEADER_LENGTH];
    
            // 设置魔数
            Bytes.short2bytes(MAGIC, header);
    
            // 设置数据包类型(Request/Response)和序列化器编号
            header[2] = (byte) (FLAG_REQUEST | serialization.getContentTypeId());
    
            // 设置通信方式(单向/双向)
            if (req.isTwoWay()) {
                header[2] |= FLAG_TWOWAY;
            }
            
            // 设置事件标识
            if (req.isEvent()) {
                header[2] |= FLAG_EVENT;
            }
    
            // 设置请求编号,8个字节,从第4个字节开始设置
            Bytes.long2bytes(req.getId(), header, 4);
    
            // 获取 buffer 当前的写位置
            int savedWriteIndex = buffer.writerIndex();
            // 更新 writerIndex,为消息头预留 16 个字节的空间
            buffer.writerIndex(savedWriteIndex + HEADER_LENGTH);
            ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);
            // 创建序列化器,比如 Hessian2ObjectOutput
            ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
            if (req.isEvent()) {
                // 对事件数据进行序列化操作
                encodeEventData(channel, out, req.getData());
            } else {
                // 对请求数据进行序列化操作
                encodeRequestData(channel, out, req.getData(), req.getVersion());
            }
            out.flushBuffer();
            if (out instanceof Cleanable) {
                ((Cleanable) out).cleanup();
            }
            bos.flush();
            bos.close();
            
            // 获取写入的字节数,也就是消息体长度
            int len = bos.writtenBytes();
            checkPayload(channel, len);
    
            // 将消息体长度写入到消息头中
            Bytes.int2bytes(len, header, 12);
    
            // 将 buffer 指针移动到 savedWriteIndex,为写消息头做准备
            buffer.writerIndex(savedWriteIndex);
            // 从 savedWriteIndex 下标处写入消息头
            buffer.writeBytes(header);
            // 设置新的 writerIndex,writerIndex = 原写下标 + 消息头长度 + 消息体长度
            buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len);
        }
        
        // 省略其他方法
    }
    

    服务端调用栈

    public class DemoServiceImpl implements DemoService {
     
        public String sayHello(String name) throws RemoteException {
            return "Hello " + name;
        }
    }
    

    上面这个类会被封装成为一个 AbstractProxyInvoker 实例,并新生成一个 Exporter 实例。
    这样当网络通讯层收到一个请求后,会找到对应的 Exporter 实例,并调用它所对应的 AbstractProxyInvoker 实例,从而真正调用了服务提供者的代码。
    Dubbo 里还有一些其他的 Invoker 类,但上面两种是最重要的。

    NettyHandler#messageReceived(ChannelHandlerContext, MessageEvent)
      —> AbstractPeer#received(Channel, Object)
        —> MultiMessageHandler#received(Channel, Object)
          —> HeartbeatHandler#received(Channel, Object)
            —> AllChannelHandler#received(Channel, Object)
              —> ExecutorService#execute(Runnable)    // 由线程池执行后续的调用逻辑
    

    服务端 decode

    public class ExchangeCodec extends TelnetCodec {
        
        @Override
        public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {
            int readable = buffer.readableBytes();
            // 创建消息头字节数组
            byte[] header = new byte[Math.min(readable, HEADER_LENGTH)];
            // 读取消息头数据
            buffer.readBytes(header);
            // 调用重载方法进行后续解码工作
            return decode(channel, buffer, readable, header);
        }
    
        @Override
        protected Object decode(Channel channel, ChannelBuffer buffer, int readable, byte[] header) throws IOException {
            // 检查魔数是否相等
            if (readable > 0 && header[0] != MAGIC_HIGH
                    || readable > 1 && header[1] != MAGIC_LOW) {
                int length = header.length;
                if (header.length < readable) {
                    header = Bytes.copyOf(header, readable);
                    buffer.readBytes(header, length, readable - length);
                }
                for (int i = 1; i < header.length - 1; i++) {
                    if (header[i] == MAGIC_HIGH && header[i + 1] == MAGIC_LOW) {
                        buffer.readerIndex(buffer.readerIndex() - header.length + i);
                        header = Bytes.copyOf(header, i);
                        break;
                    }
                }
                // 通过 telnet 命令行发送的数据包不包含消息头,所以这里
                // 调用 TelnetCodec 的 decode 方法对数据包进行解码
                return super.decode(channel, buffer, readable, header);
            }
            
            // 检测可读数据量是否少于消息头长度,若小于则立即返回 DecodeResult.NEED_MORE_INPUT
            if (readable < HEADER_LENGTH) {
                return DecodeResult.NEED_MORE_INPUT;
            }
    
            // 从消息头中获取消息体长度
            int len = Bytes.bytes2int(header, 12);
            // 检测消息体长度是否超出限制,超出则抛出异常
            checkPayload(channel, len);
    
            int tt = len + HEADER_LENGTH;
            // 检测可读的字节数是否小于实际的字节数
            if (readable < tt) {
                return DecodeResult.NEED_MORE_INPUT;
            }
            
            ChannelBufferInputStream is = new ChannelBufferInputStream(buffer, len);
    
            try {
                // 继续进行解码工作
                return decodeBody(channel, is, header);
            } finally {
                if (is.available() > 0) {
                    try {
                        StreamUtils.skipUnusedStream(is);
                    } catch (IOException e) {
                        logger.warn(e.getMessage(), e);
                    }
                }
            }
        }
    }
    

    Dispatcher 真实的职责创建具有线程派发能力的 ChannelHandler

    image.png
            ExecutorService cexecutor = getExecutorService();
            try {
                // 将连接事件派发到线程池中处理
                cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));
            } catch (Throwable t) {
                throw new ExecutionException(..., " error when process connected event .", t);
            }
    
    

    ChannelEventRunnable

    ChannelEventRunnable#run()
      —> DecodeHandler#received(Channel, Object)
        —> HeaderExchangeHandler#received(Channel, Object)
          —> HeaderExchangeHandler#handleRequest(ExchangeChannel, Request)
            —> DubboProtocol.requestHandler#reply(ExchangeChannel, Object)
              —> Filter#invoke(Invoker, Invocation)
                —> AbstractProxyInvoker#invoke(Invocation)
                  —> Wrapper0#invokeMethod(Object, String, Class[], Object[])
                    —> DemoServiceImpl#sayHello(String)
    

    相关文章

      网友评论

          本文标题:Dubbo的调用栈详细介绍

          本文链接:https://www.haomeiwen.com/subject/lckvbktx.html