美文网首页
Dubbo远程调用

Dubbo远程调用

作者: DH大黄 | 来源:发表于2021-12-24 10:25 被阅读0次

    远程调用

    服务消费者

    Consumer远程调用流程主要可以用以下这张流程图表示

    流程图

    Dubbo服务消费者服务调用流程图.png

    主要流程

    主要为以下的几个步骤

    • 调用对应的代理类
    • 被InvokerInvocationHandler拦截
    • ClusterInvoker经过路由过滤,负载均衡,选择其中一个Invoker,发起远程调用(带请求ID)

    相关源码

    public class JavassistProxyFactory extends AbstractProxyFactory {
    
      /**
       * Creates a proxy object for the given interfaces.
       * The proxy is built with an {@code InvokerInvocationHandler} wrapping {@code invoker}.
       *
       * @param invoker    the invoker handed to the invocation handler
       * @param interfaces the interfaces passed to {@code Proxy.getProxy}
       * @return the new proxy instance, cast to {@code T}
       */
      @Override
      @SuppressWarnings("unchecked")
      public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
          // Key point (author's note): during a remote call, the invoked method
          // is intercepted by this InvokerInvocationHandler.
          InvokerInvocationHandler interceptor = new InvokerInvocationHandler(invoker);
          return (T) Proxy.getProxy(interfaces).newInstance(interceptor);
      }
    
      /**
       * Builds an invoker that dispatches calls to {@code proxy} through a {@code Wrapper}.
       * The wrapper is created for the proxy's runtime class, unless that class name
       * contains '$', in which case it is created for {@code type} instead.
       *
       * @param proxy the target object that receives the calls
       * @param type  the service type, used as the wrapper target when the runtime class name contains '$'
       * @param url   the URL passed to the created invoker
       * @return an {@code AbstractProxyInvoker} delegating to {@code wrapper.invokeMethod}
       */
      @Override
      public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
          // TODO Wrapper cannot handle this scenario correctly: the classname contains '$'
          Class<?> runtimeClass = proxy.getClass();
          boolean nameContainsDollar = runtimeClass.getName().indexOf('$') >= 0;
          final Wrapper wrapper = Wrapper.getWrapper(nameContainsDollar ? type : runtimeClass);
          return new AbstractProxyInvoker<T>(proxy, type, url) {
              @Override
              protected Object doInvoke(T proxy, String methodName,
                                        Class<?>[] parameterTypes,
                                        Object[] arguments) throws Throwable {
                  // Dispatch through the wrapper.
                  return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
              }
          };
      }
    

    org.apache.dubbo.rpc.proxy.InvokerInvocationHandler#invoke

    /**
     * Intercepts a call made on the proxy and turns it into an RPC invocation.
     * Methods declared on {@code Object} are invoked reflectively on the invoker.
     * {@code toString}, {@code hashCode}, {@code $destroy} (no arguments) and
     * {@code equals} (one argument) are answered by the invoker itself.
     * Everything else is wrapped in an {@code RpcInvocation} and sent through
     * {@code invoker.invoke}.
     *
     * @param proxy  the proxy instance the method was called on (unused here)
     * @param method the method being called
     * @param args   the call arguments
     * @return the locally computed value, or the recreated result of the remote call
     * @throws Throwable whatever the reflective call, the invoker or {@code recreate()} throws
     */
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        if (method.getDeclaringClass() == Object.class) {
            return method.invoke(invoker, args);
        }

        // Name of the method being called remotely, and how many parameters it declares.
        String methodName = method.getName();
        int parameterCount = method.getParameterTypes().length;

        if (parameterCount == 0) {
            switch (methodName) {
                case "toString":
                    return invoker.toString();
                case "$destroy":
                    invoker.destroy();
                    return null;
                case "hashCode":
                    return invoker.hashCode();
                default:
                    break;
            }
        } else if (parameterCount == 1 && "equals".equals(methodName)) {
            return invoker.equals(args[0]);
        }

        // Build the Dubbo RPC invocation and tag it with the target service key.
        RpcInvocation rpcInvocation =
                new RpcInvocation(method, invoker.getInterface().getName(), protocolServiceKey, args);
        rpcInvocation.setTargetServiceUniqueName(invoker.getUrl().getServiceKey());

        // invoker.getUrl() returns consumer url.
        RpcContext.setRpcContext(invoker.getUrl());

        if (consumerModel != null) {
            rpcInvocation.put(Constants.CONSUMER_MODEL, consumerModel);
            rpcInvocation.put(Constants.METHOD_MODEL, consumerModel.getMethodModel(method));
        }

        // Perform the remote call and recreate its result.
        return invoker.invoke(rpcInvocation).recreate();
    }
    
    /**
     * Entry point of the cluster invoker.
     * Copies context attachments onto the invocation, lists the candidate invokers,
     * initialises the load balancer and delegates to {@code doInvoke}.
     *
     * @param invocation the invocation to dispatch
     * @return the result produced by {@code doInvoke}
     * @throws RpcException propagated from the steps above
     */
    @Override
    public Result invoke(final Invocation invocation) throws RpcException {
        checkWhetherDestroyed();

        // binding attachments into invocation.
        Map<String, Object> attachments = RpcContext.getContext().getObjectAttachments();
        boolean hasAttachments = attachments != null && !attachments.isEmpty();
        if (hasAttachments) {
            ((RpcInvocation) invocation).addObjectAttachments(attachments);
        }

        // Author's note: this is effectively directory.list, which looks up invokers
        // by method name and applies filtering, yielding the filtered invoker list.
        List<Invoker<T>> candidates = list(invocation);
        // Author's note: the load-balance strategy is selected via @SPI.
        LoadBalance balancer = initLoadBalance(candidates, invocation);
        RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
        // Hand over to the subclass implementation.
        return doInvoke(invocation, candidates, balancer);
    }
    
    /**
     * Invokes the call on an invoker chosen by the load balancer, retrying on failure.
     * <p>
     * The number of attempts is the method-level {@code RETRIES_KEY} parameter
     * (default {@code DEFAULT_RETRIES}) plus one, and never less than one. On every
     * attempt after the first, the candidate list is re-read via {@code list(invocation)}
     * and re-checked. An {@code RpcException} whose {@code isBiz()} is true is rethrown
     * immediately; any other failure is remembered as the last error and the loop continues.
     *
     * @param invocation  the invocation to send
     * @param invokers    the initial candidate invokers
     * @param loadbalance the load balancer passed to {@code select}
     * @return the result of the first successful attempt
     * @throws RpcException the business exception as-is, or a summary exception carrying
     *                      the last error's code once all attempts have failed
     */
    @Override
    @SuppressWarnings({"unchecked", "rawtypes"})
    public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        List<Invoker<T>> copyInvokers = invokers;
        checkInvokers(copyInvokers, invocation);
        String methodName = RpcUtils.getMethodName(invocation);
        // Total attempts = configured retries + the initial call; clamp to at least one.
        int len = getUrl().getMethodParameter(methodName, RETRIES_KEY, DEFAULT_RETRIES) + 1;
        if (len <= 0) {
            len = 1;
        }
        // retry loop.
        RpcException le = null; // last exception.
        List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyInvokers.size()); // invoked invokers.
        // Addresses of every provider that has been tried so far (added in the finally block below).
        Set<String> providers = new HashSet<String>(len);
        for (int i = 0; i < len; i++) {
            //Reselect before retry to avoid a change of candidate `invokers`.
            //NOTE: if `invokers` changed, then `invoked` also lose accuracy.
            if (i > 0) {
                checkWhetherDestroyed();
                copyInvokers = list(invocation);
                // check again
                checkInvokers(copyInvokers, invocation);
            }
            // Pick one invoker according to the load-balance rule; `invoked` is passed along
            // (presumably so already-tried invokers can be avoided - confirm in select()).
            Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked);
            invoked.add(invoker);
            RpcContext.getContext().setInvokers((List) invoked);
            try {
                // Remote call (author's note: every request carries a unique ID; not visible in this method).
                Result result = invoker.invoke(invocation);
                // Succeeded after at least one earlier failure: log which providers failed.
                if (le != null && logger.isWarnEnabled()) {
                    logger.warn("Although retry the method " + methodName
                            + " in the service " + getInterface().getName()
                            + " was successful by the provider " + invoker.getUrl().getAddress()
                            + ", but there have been failed providers " + providers
                            + " (" + providers.size() + "/" + copyInvokers.size()
                            + ") from the registry " + directory.getUrl().getAddress()
                            + " on the consumer " + NetUtils.getLocalHost()
                            + " using the dubbo version " + Version.getVersion() + ". Last error is: "
                            + le.getMessage(), le);
                }
                return result;
            } catch (RpcException e) {
                if (e.isBiz()) { // biz exception.
                    throw e;
                }
                le = e;
            } catch (Throwable e) {
                // Wrap any other failure so it can be reported as the last error.
                le = new RpcException(e.getMessage(), e);
            } finally {
                // Runs on success and on failure alike: record the provider that was just tried.
                providers.add(invoker.getUrl().getAddress());
            }
        }
        // Every attempt failed: report the last error, preferring its cause when present.
        throw new RpcException(le.getCode(), "Failed to invoke the method "
                + methodName + " in the service " + getInterface().getName()
                + ". Tried " + len + " times of the providers " + providers
                + " (" + providers.size() + "/" + copyInvokers.size()
                + ") from the registry " + directory.getUrl().getAddress()
                + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version "
                + Version.getVersion() + ". Last error is: "
                + le.getMessage(), le.getCause() != null ? le.getCause() : le);
    }
    

    服务提供者

    在Consumer发起调用后,Provider就需要去处理客户端的请求了。

    消费端发起TCP连接并完成后,服务提供方的NettyServer的connected方法会被激活。(摘自《深度剖析Apache Dubbo核心技术内幕》)

    流程图

    Dubbo服务器提供者处理流程图.png

    主要流程

    • 服务端的NettyServer处理请求,最终会调用到DubboProtocol#reply
    • 根据客户端的请求,从exporterMap中选择对应的Invoker
    • 调用Invoker具体业务类的方法
    • 返回处理结果

    相关源码

    NettyServer的Handler处理请求

    /**
     * Handles a newly connected Netty channel.
     * When a {@code NettyChannel} is obtained, it is stored in {@code channels} under
     * the remote address string. The wrapped handler is then notified, and the channel
     * is removed again if it has already disconnected.
     *
     * @param ctx the Netty handler context whose channel has connected
     * @param e   the channel state event (unused here)
     * @throws Exception propagated from {@code handler.connected}
     */
    @Override
    public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
        NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler);
        try {
            if (channel != null) {
                channels.put(NetUtils.toAddressString((InetSocketAddress) ctx.getChannel().getRemoteAddress()), channel);
            }
            handler.connected(channel);
        } finally {
            NettyChannel.removeChannelIfDisconnected(ctx.getChannel());
        }

        // The channel is treated as possibly null above, so guard the log statement
        // as well; dereferencing it unconditionally could throw a NullPointerException.
        if (channel != null && logger.isInfoEnabled()) {
            logger.info("The connection between " + channel.getRemoteAddress() + " and " + channel.getLocalAddress() + " is established");
        }
    }
    

    AllChannelHandler

    /**
     * Dispatches the CONNECTED event to the executor instead of handling it inline.
     *
     * @param channel the channel that has connected
     * @throws RemotingException an {@code ExecutionException} wrapping any failure
     *                           raised while building or submitting the task
     */
    @Override
    public void connected(Channel channel) throws RemotingException {
        final ExecutorService pool = getExecutorService();
        try {
            // Wrap the event in a ChannelEventRunnable and hand it to the executor.
            final ChannelEventRunnable connectTask =
                    new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED);
            pool.execute(connectTask);
        } catch (Throwable cause) {
            throw new ExecutionException("connect event", channel, getClass() + " error when process connected event .", cause);
        }
    }
    

    HeaderExchangeHandler

    /**
     * Handles one request and sends the response back on the channel.
     * A broken request is answered at once with {@code BAD_REQUEST}. Otherwise the
     * payload is passed to {@code handler.reply}, and the response is sent when the
     * returned stage completes: {@code OK} with the result, or {@code SERVICE_ERROR}
     * with the failure text.
     *
     * @param channel the channel the request arrived on and the response is sent to
     * @param req     the incoming request
     * @throws RemotingException if sending the response fails outside the completion callback
     */
    void handleRequest(final ExchangeChannel channel, Request req) throws RemotingException {
        // Build the response from the request id (and version).
        Response response = new Response(req.getId(), req.getVersion());

        if (req.isBroken()) {
            // For a broken request, the data is rendered into the error message.
            Object brokenData = req.getData();

            String reason;
            if (brokenData instanceof Throwable) {
                reason = StringUtils.toString((Throwable) brokenData);
            } else if (brokenData != null) {
                reason = brokenData.toString();
            } else {
                reason = null;
            }
            response.setErrorMessage("Fail to decode request due to: " + reason);
            response.setStatus(Response.BAD_REQUEST);

            channel.send(response);
            return;
        }

        // find handler by message class.
        Object payload = req.getData();
        try {
            // Author's note: this ultimately reaches DubboProtocol's reply.
            CompletionStage<Object> pending = handler.reply(channel, payload);
            pending.whenComplete((appResult, failure) -> {
                try {
                    if (failure != null) {
                        response.setStatus(Response.SERVICE_ERROR);
                        response.setErrorMessage(StringUtils.toString(failure));
                    } else {
                        response.setStatus(Response.OK);
                        response.setResult(appResult);
                    }
                    channel.send(response);
                } catch (RemotingException sendFailure) {
                    logger.warn("Send result to consumer failed, channel is " + channel + ", msg is " + sendFailure);
                }
            });
        } catch (Throwable replyFailure) {
            // reply() itself threw: report a service error to the caller.
            response.setStatus(Response.SERVICE_ERROR);
            response.setErrorMessage(StringUtils.toString(replyFailure));
            channel.send(response);
        }
    }
    

    DubboProtocol

    private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {
    
    /**
     * Replies to a request message by invoking the matching invoker.
     * The message must be an {@code Invocation}. For callback invocations, the method
     * name is first checked against the invoker URL's "methods" parameter; when it is
     * not listed, a warning is logged and {@code null} is returned.
     *
     * @param channel the channel the message arrived on
     * @param message the request payload, expected to be an {@code Invocation}
     * @return a future of the invocation result, or {@code null} for an unknown callback method
     * @throws RemotingException if {@code message} is not an {@code Invocation}
     */
    @Override
    public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException {

        if (!(message instanceof Invocation)) {
            throw new RemotingException(channel, "Unsupported request: "
                    + (message == null ? null : (message.getClass().getName() + ": " + message))
                    + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
        }

        Invocation inv = (Invocation) message;
        // Resolve the invoker for this invocation (author's note: looked up in exporterMap).
        Invoker<?> invoker = getInvoker(channel, inv);
        // need to consider backward-compatibility if it's a callback
        if (Boolean.TRUE.toString().equals(inv.getObjectAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
            String methodsStr = invoker.getUrl().getParameters().get("methods");
            String invokedMethod = inv.getMethodName();
            boolean hasMethod = false;
            if (methodsStr != null && methodsStr.contains(",")) {
                // Comma-separated list: look for an exact match among the entries.
                for (String candidate : methodsStr.split(",")) {
                    if (invokedMethod.equals(candidate)) {
                        hasMethod = true;
                        break;
                    }
                }
            } else {
                // Missing or single-valued parameter: compare directly.
                hasMethod = invokedMethod.equals(methodsStr);
            }
            if (!hasMethod) {
                logger.warn(new IllegalStateException("The methodName " + invokedMethod
                        + " not found in callback service interface ,invoke will be ignored."
                        + " please update the api interface. url is:"
                        + invoker.getUrl()) + " ,invocation is :" + inv);
                return null;
            }
        }
        RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
        // Call the invoker (author's note: ends up in wrapper.invokeMethod; see the service-export article).
        return invoker.invoke(inv).thenApply(Function.identity());
    }
    

    总结

    总结为一句话就是:客户端在发起远程调用时,具体的代理类会被InvokerInvocationHandler拦截,在这里面根据一些条件和负载均衡策略,选择出其中一个符合条件的Invoker,进行远程调用。提供者收到请求后,会从exporterMap中选择对应的Invoker(Wrapper包装),最终调用到具体的实现类。处理完请求后将结果返回。返回后客户端根据之前传过去的请求ID,找到之前的请求,然后再进行自己的业务处理

    相关文章

      网友评论

          本文标题:Dubbo远程调用

          本文链接:https://www.haomeiwen.com/subject/exkkqrtx.html