美文网首页
六 捋代码--dubbo源码调用之服务端

六 捋代码--dubbo源码调用之服务端

作者: 爱编程的凯哥 | 来源:发表于2018-10-29 08:28 被阅读30次

说完调用的消费端,下面该说说请求到了服务端,是怎么处理的了。首先让我们回顾下服务端服务发布模型图


dubbo服务端模型

根据netty通信的知识和模型图,我们知道此时服务端接收到消息会调用NettyServerHandler的channelRead方法

  public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
        try {
            handler.received(channel, msg);
        } finally {
            NettyChannel.removeChannelIfDisconnected(ctx.channel());
        }
    }

貌似和消费端上章我们分析端类似,依次类推,我们一样会找到AllChannelHandler类

   public void received(Channel channel, Object message) throws RemotingException {
        ExecutorService cexecutor = getExecutorService();
        try {
            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
        } catch (Throwable t) {
            //TODO 临时解决线程池满后异常信息无法发送到对端的问题。待重构
            //fix 线程池满了拒绝调用不返回,导致消费者一直等待超时
            if(message instanceof Request && t instanceof RejectedExecutionException){
                Request request = (Request)message;
                if(request.isTwoWay()){
                    String msg = "Server side(" + url.getIp() + "," + url.getPort() + ") threadpool is exhausted ,detail msg:" + t.getMessage();
                    Response response = new Response(request.getId(), request.getVersion());
                    response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
                    response.setErrorMessage(msg);
                    channel.send(response);
                    return;
                }
            }
            throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
        }
    }

一样的逻辑,通过ChannelEventRunnable异步线程进行消息处理,按模型图走,一样会走到HeaderExchangeHandler类中

  public void received(Channel channel, Object message) throws RemotingException {
        channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
        ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
        try {
            if (message instanceof Request) {
                // handle request.
                Request request = (Request) message;
                if (request.isEvent()) {
                    handlerEvent(channel, request);
                } else {
                    if (request.isTwoWay()) {
                        Response response = handleRequest(exchangeChannel, request);
                        channel.send(response);
                    } else {
                        handler.received(exchangeChannel, request.getData());
                    }
                }
            } else if (message instanceof Response) {
                handleResponse(channel, (Response) message);
            } else if (message instanceof String) {
                if (isClientSide(channel)) {
                    Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());
                    logger.error(e.getMessage(), e);
                } else {
                    String echo = handler.telnet(channel, (String) message);
                    if (echo != null && echo.length() > 0) {
                        channel.send(echo);
                    }
                }
            } else {
                handler.received(exchangeChannel, message);
            }
        } finally {
            HeaderExchangeChannel.removeChannelIfDisconnected(channel);
        }
    }

处理逻辑

                   if (request.isTwoWay()) {
                        Response response = handleRequest(exchangeChannel, request);
                        channel.send(response);
                    } else {
                        handler.received(exchangeChannel, request.getData());
                    }

当是双向时会调用有返回结果到方法handleRequest

   Response handleRequest(ExchangeChannel channel, Request req) throws RemotingException {
        Response res = new Response(req.getId(), req.getVersion());
        if (req.isBroken()) {
            Object data = req.getData();

            String msg;
            if (data == null) msg = null;
            else if (data instanceof Throwable) msg = StringUtils.toString((Throwable) data);
            else msg = data.toString();
            res.setErrorMessage("Fail to decode request due to: " + msg);
            res.setStatus(Response.BAD_REQUEST);

            return res;
        }
        // find handler by message class.
        Object msg = req.getData();
        try {
            // handle data.
            Object result = handler.reply(channel, msg);
            res.setStatus(Response.OK);
            res.setResult(result);
        } catch (Throwable e) {
            res.setStatus(Response.SERVICE_ERROR);
            res.setErrorMessage(StringUtils.toString(e));
        }
        return res;
    }

这时候我们看到new了一个Response类,他到id就是request到id,对应上我们消费端返回时到双线程通信问题了。最后调用落到 handler.reply(channel, msg)方法,模型图显示到了DubboProtocol$requestHandler类中

      public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
          if (message instanceof Invocation) {
              Invocation inv = (Invocation) message;
              //获取通道缓存对应的invoker
              Invoker<?> invoker = getInvoker(channel, inv);
              //如果是callback 需要处理高版本调用低版本的问题
              if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
                  String methodsStr = invoker.getUrl().getParameters().get("methods");
                  boolean hasMethod = false;
                  if (methodsStr == null || methodsStr.indexOf(",") == -1) {
                      hasMethod = inv.getMethodName().equals(methodsStr);
                  } else {
                      String[] methods = methodsStr.split(",");
                      for (String method : methods) {
                          if (inv.getMethodName().equals(method)) {
                              hasMethod = true;
                              break;
                          }
                      }
                  }
                  if (!hasMethod) {
                      logger.warn(new IllegalStateException("The methodName " + inv.getMethodName() + " not found in callback service interface ,invoke will be ignored. please update the api interface. url is:" + invoker.getUrl()) + " ,invocation is :" + inv);
                      return null;
                  }
              }
              RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
              return invoker.invoke(inv);
          }
          throw new RemotingException(channel, "Unsupported request: " + message == null ? null : (message.getClass().getName() + ": " + message) + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
      }

我们可以看到, Invoker<?> invoker = getInvoker(channel, inv)方法去获取了我们发布时生成的invoker,这个invoker是谁?对,就是缓存在exporterMap的DubboExporter重点invoker对象


    Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException {
      .......
        String serviceKey = serviceKey(port, path, inv.getAttachments().get(Constants.VERSION_KEY), inv.getAttachments().get(Constants.GROUP_KEY));

        DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey);

        if (exporter == null)
            throw new RemotingException(channel, "Not found exported service: " + serviceKey + " in " + exporterMap.keySet() + ", may be version or group mismatch " + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress() + ", message:" + inv);

        return exporter.getInvoker();
    }

找到它就好办了,结合模型图和我们分析的发布知识,我们可以看到


模型图invoker部分

最后调用了当时proxyFactory生成的抽象类中

return new AbstractProxyInvoker<T>(proxy, type, url) {
            @Override
            protected Object doInvoke(T proxy, String methodName,
                                      Class<?>[] parameterTypes,
                                      Object[] arguments) throws Throwable {
                return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
            }
        };

而此类的实际调用者是我们生成的一个泛式的wrapper

public class Wrapper1 extends Wrapper {

    public static String[] pns;
    public static java.util.Map pts;
    public static String[] mns = new String[]{"sayHello"};
    public static String[] dmns = new String[]{"sayHello"};
    public static Class[] mts0;

    public String[] getPropertyNames() {
        return pns;
    }

    public boolean hasProperty(String n) {
        return pts.containsKey(n);
    }

    public Class getPropertyType(String n) {
        return (Class) pts.get(n);
    }

    public String[] getMethodNames() {
        return mns;
    }

    public String[] getDeclaredMethodNames() {
        return dmns;
    }

    public void setPropertyValue(Object o, String n, Object v) {
        com.alibaba.dubbo.kai.api.imp.HelloApiImpl w;
        try {
            w = ((com.alibaba.dubbo.kai.api.imp.HelloApiImpl) o);
        } catch (Throwable e) {
            throw new IllegalArgumentException(e);
        }
        throw new com.alibaba.dubbo.common.bytecode.NoSuchPropertyException("Not found property \"" + n + "\" filed or setter method in class com.alibaba.dubbo.kai.api.imp.HelloApiImpl.");
    }


    public Object getPropertyValue(Object o, String n) {
        com.alibaba.dubbo.kai.api.imp.HelloApiImpl w;
        try {
            w = ((com.alibaba.dubbo.kai.api.imp.HelloApiImpl) o);
        } catch (Throwable e) {
            throw new IllegalArgumentException(e);
        }
        throw new com.alibaba.dubbo.common.bytecode.NoSuchPropertyException("Not found property \"" + n + "\" filed or setter method in class com.alibaba.dubbo.kai.api.imp.HelloApiImpl.");
    }

    public Object invokeMethod(Object o, String n, Class[] p, Object[] v) throws java.lang.reflect.InvocationTargetException {
        com.alibaba.dubbo.kai.api.imp.HelloApiImpl w;
        try {
            w = ((com.alibaba.dubbo.kai.api.imp.HelloApiImpl) o);
        } catch (Throwable e) {
            throw new IllegalArgumentException(e);
        }
        try {
            //和cglib写法类似,不通过反射调用,效率更高
            if ("sayHello".equals(n) && p.length == 0) {
                return  w.sayHello();
            }
        } catch (Throwable e) {
            throw new java.lang.reflect.InvocationTargetException(e);
        }
        throw new com.alibaba.dubbo.common.bytecode.NoSuchMethodException("Not found method \"" + n + "\" in class com.alibaba.dubbo.kai.api.imp.HelloApiImpl.");
    }
}

调用他的invokeMethod方法,显示最后直接调用了ref(我们实际业务对象)的相应方法,最后返回业务结果。
获取到结果后,我们返回到HeaderExchangeHandler到received方法

  Response response = handleRequest(exchangeChannel, request);
  channel.send(response);

下面就是发送结果了,那么根据我们的分析channel应该是对应的NettyChannel

  public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        //包装后的NettyChannel
        NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
        try {
            handler.received(channel, msg);
        } finally {
            NettyChannel.removeChannelIfDisconnected(ctx.channel());
        }
    }

调用NettyChannel的send方法

 public void send(Object message, boolean sent) throws RemotingException {
        super.send(message, sent);

        boolean success = true;
        int timeout = 0;
        try {
            ChannelFuture future = channel.writeAndFlush(message);
            if (sent) {
                timeout = getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
                success = future.await(timeout);
            }
            Throwable cause = future.cause();
            if (cause != null) {
                throw cause;
            }
        } catch (Throwable e) {
            throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress() + ", cause: " + e.getMessage(), e);
        }

        if (!success) {
            throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress()
                    + "in timeout(" + timeout + "ms) limit");
        }
    }

这就和上一章一样,且对应上了上章的消费端接收。

到此,调用的消费端和服务端我们就都分析完了。最后再回到我们开章的调用时序图,就基本已经分析完了。


dubbo调用时许图

好了,dubbo基本调用完成,但其中我们还有些遗留

  • 负载均衡算法
  • 各个filter实现作用
  • 其他协议但实现概览
  • 监控但实现
  • 其他注册中心的实现
    以后有时间大家再一起学习下其他内容。

下一章     手写dubbo核心调用
首页     dubbo源码欣赏简介

相关文章

网友评论

      本文标题:六 捋代码--dubbo源码调用之服务端

      本文链接:https://www.haomeiwen.com/subject/kbbmtqtx.html