美文网首页
Dubbo源码分析(六) 服务调用

Dubbo源码分析(六) 服务调用

作者: skyguard | 来源:发表于2018-11-12 15:31 被阅读0次

下面我们来说一下Dubbo的服务调用。我们之前说过,通过ProxyFactory获取到Invoker对象,然后执行方法调用,那么具体是怎么执行服务的调用呢,下面我们就来分析一下。
先看一下AbstractInvoker的invoke方法

public Result invoke(Invocation inv) throws RpcException {
    if (destroyed.get()) {
        throw new RpcException("Rpc invoker for service " + this + " on consumer " + NetUtils.getLocalHost()
                + " use dubbo version " + Version.getVersion()
                + " is DESTROYED, can not be invoked any more!");
    }
    RpcInvocation invocation = (RpcInvocation) inv;
    // 设置 `invoker` 属性
    invocation.setInvoker(this);
    // 添加公用的隐式传参,例如,`path` `interface` 等等,详见 RpcInvocation 类。
    if (attachment != null && attachment.size() > 0) {
        invocation.addAttachmentsIfAbsent(attachment);
    }
    // 添加自定义的隐式参数
    Map<String, String> context = RpcContext.getContext().getAttachments();
    if (context != null) {
        invocation.addAttachmentsIfAbsent(context);
    }
    // 设置 `async=true` ,若为异步方法
    if (getUrl().getMethodParameter(invocation.getMethodName(), Constants.ASYNC_KEY, false)) {
        invocation.setAttachment(Constants.ASYNC_KEY, Boolean.TRUE.toString());
    }
    RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);

    // 执行调用
    try {
        return doInvoke(invocation);
    } catch (InvocationTargetException e) { 
        Throwable te = e.getTargetException();
        if (te == null) {
            return new RpcResult(e);
        } else {
            if (te instanceof RpcException) {
                ((RpcException) te).setCode(RpcException.BIZ_EXCEPTION);
            }
            return new RpcResult(te);
        }
    } catch (RpcException e) {
        if (e.isBiz()) {
            return new RpcResult(e);
        } else {
            throw e;
        }
    } catch (Throwable e) {
        return new RpcResult(e);
    }
}

再来看一下DubboInvoker的doInvoke方法

protected Result doInvoke(final Invocation invocation) {
    RpcInvocation inv = (RpcInvocation) invocation;
    // 获得方法名
    final String methodName = RpcUtils.getMethodName(invocation);
    // 获得 `path`( 服务名 ),`version`
    inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
    inv.setAttachment(Constants.VERSION_KEY, version);

    // 获得 ExchangeClient 对象
    ExchangeClient currentClient;
    if (clients.length == 1) {
        currentClient = clients[0];
    } else {
        currentClient = clients[index.getAndIncrement() % clients.length];
    }
    // 远程调用
    try {
        // 获得是否异步调用
        boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
        // 获得是否单向调用
        boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
        // 获得超时时间
        int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
        // 单向调用
        if (isOneway) {
            boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
            currentClient.send(inv, isSent);
            RpcContext.getContext().setFuture(null);
            return new RpcResult();
        // 异步调用
        } else if (isAsync) {
            ResponseFuture future = currentClient.request(inv, timeout);
            RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
            return new RpcResult();
        // 同步调用
        } else {
            RpcContext.getContext().setFuture(null);
            return (Result) currentClient.request(inv, timeout).get();
        }
    } catch (TimeoutException e) {
        throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
    } catch (RemotingException e) {
        throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}

然后就是调用ExchangeClient去发送请求数据了。
服务端接受到请求,通过NettyHandler处理,调用ExchangeChannel处理,根据channel和Invocation 获取之前暴露的DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey);这个key是接口全限定名:端口号,然后exporter的getInvoker方法返回具体的方法代理Invoker。给上下文RpcContext设置远程地址,开始invoke调用,然后调用方法,获取返回结果。往HeaderExchangeHandler的channel通道里写入回应 channel.send(response);在NettyChannel父类AbstractChannel父类AbstractPeer设置发送标志sent也就是超时,ChannelFuture future = channel.write(message);消费端接收到服务端的返回结果后,在HeaderExchangeHandler的received方法中处理结果handleResponse方法,调用DefaultFuture的received方法接收结果。然后调用returnFromResponse方法返回结果。
Dubbo的服务调用就分析到这里了。

相关文章

网友评论

      本文标题:Dubbo源码分析(六) 服务调用

      本文链接:https://www.haomeiwen.com/subject/ebxcfqtx.html