美文网首页
Dubbo系列--消费者服务调用《六》

Dubbo系列--消费者服务调用《六》

作者: Teddy_b | 来源:发表于2023-11-29 12:06 被阅读0次

接口调用

在示例代码中

HelloService demoService = bootstrap.getCache().get(reference);
        String message = demoService.sayHello("dubbo");

这里第一步就是从缓存中获取创建的代理对象,就是上一篇Dubbo系列--消费者服务引用《五》提及的代理对象

然后调用代理对象的接口方法

InvocationHandler

回顾下代理对象的接口方法实现

public java.lang.String sayHello(java.lang.String arg0) {
                     Object[] args = new Object[1];
                    // $w是javassist中的类型转换,基础类型转为包装类型,其它类型会忽略这个
                     args[0] = ($w)$1; 
                     Object ret = handler.invoke(this, methods[0], args); 
                     return (java.lang.String)ret;
            }

可以看到它调用的是InvocationHandlerinvoke方法

@Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        ...
        return InvocationUtil.invoke(invoker, rpcInvocation);
    }

public static Object invoke(Invoker<?> invoker, RpcInvocation rpcInvocation) throws Throwable {
        ...
                return invoker.invoke(rpcInvocation).recreate();
       ...
    }

而这个invoke方法实际调用的是上一篇Dubbo系列--消费者服务引用《五》中创建的Invoke对象的invoke方法

在回顾下我们创建的代理对象


image.png

可以看到它引用的是MigrationInvoker对象,而MigrationInvoker对象又同时持有两个invoker

  • invoker: 对应接口级别的Invoker对象

  • serviceDiscoveryInvoker:对应应用级别的Invoker对象

上一篇中我们也提及了默认的currentAvailableInvoker指向的是应用级别的Invoker对象

Invoker

我们进一步分析下MigrationInvokerinvoker方法

  • MigrationInvoker:同时持有两个invoker,分别对应接口级别的Invoker对象和应用级别的Invoker对象
 @Override
    public Result invoke(Invocation invocation) throws RpcException {
        if (currentAvailableInvoker != null) {
            ...
                return decideInvoker().invoke(invocation);
          ...
        }
}

这里需要先决定使用哪一个Invoker对象,因为默认的currentAvailableInvoker指向的是应用级别的Invoker对象,所以只要应用级别的Invoker对象可用,就会优先使用它(只要Netty客户端连接还存活就是可用)

  • ScopeClusterInvoker:增加了Injvm协议的判断,远程调用则什么都没干
@Override
    public Result invoke(Invocation invocation) throws RpcException {
        ...
        return invoker.invoke(invocation);
    }
  • MockClusterInvoker:判断是否是测试接口,非测试接口调用什么都不干
@Override
    public Result invoke(Invocation invocation) throws RpcException {
        Result result;

        String value = getUrl().getMethodParameter(RpcUtils.getMethodName(invocation), MOCK_KEY, Boolean.FALSE.toString()).trim();
        if (ConfigUtils.isEmpty(value)) {
            //no mock
            result = this.invoker.invoke(invocation);
}
  • ClusterFilterInvoker:由于之前创建ClusterInvoker的时候(ClusterInvoker<T>) cluster.join(directory, true);第二个参数指定为了true, 因此会创建Invoker的过滤器链,所以这里会走到ClusterFilterInvoker,它就是开始Invoker的过滤器链
@Override
        public Result invoke(Invocation invocation) throws RpcException {
            return filterInvoker.invoke(invocation);
        }
  • FilterChainBuilder$CallbackRegistrationInvoker:异步接收返回结果
@Override
        public Result invoke(Invocation invocation) throws RpcException {
            Result asyncResult;
            try {
                asyncResult = filter.invoke(nextNode, invocation);
}
  • FilterChainBuilder$CopyOfFilterChainNode:是一个接口内部类,它把所有的Invoker过滤器都串起来
@Override
        public Result invoke(Invocation invocation) throws RpcException {
            Result asyncResult;
            try {
                asyncResult = filter.invoke(nextNode, invocation);
}

它会依次经过

  • ConsumerContextFilter:设置RpcContext
  • ConsumerClassLoaderFilter:设置当前线程的类加载器
  • ContextHolderParametersSelectedTransferFilter:
  • ObservationSenderFilter:
  • FutureFilter:
  • MetricsClusterFilter:
  • MonitorFilter:
  • RouterSnapshotFilter:

这些过滤器都是针对特性定制的,一般情况下是啥也没干,经过这些过滤器后,才会到达FailoverClusterInvoker,由于它没有重写invoke方法,因此会直接使用父类AbstractClusterInvoker中的

@Override
    public Result invoke(final Invocation invocation) throws RpcException {
        ...
        List<Invoker<T>> invokers = list(invocation);
        ...

        LoadBalance loadbalance = initLoadBalance(invokers, invocation);
        ...
            return doInvoke(invocation, invokers, loadbalance);
        ...
    }

这里主要包括三步

  • 首先会从RegistryDirectory中获取可用的Invoker,对应上一篇文章里记录的
    image.png
@Override
    public List<Invoker<T>> doList(SingleRouterChain<T> singleRouterChain,
                                   BitList<Invoker<T>> invokers, Invocation invocation) {
        ...
            List<Invoker<T>> result = singleRouterChain.route(getConsumerUrl(), invokers, invocation);
            return result == null ? BitList.emptyList() : result;
        ...
    }

可用看到RegistryDirectory是从RouterChain中获取的Invoker,这里的RouterChain对应的也是上一篇文章里保存在RegistryDirectory中的

image.png

默认使用的是当前的RouterChain,对应的是mainChain

public List<Invoker<T>> simpleRoute(URL url, BitList<Invoker<T>> availableInvokers, Invocation invocation) {
        BitList<Invoker<T>> resultInvokers = availableInvokers.clone();

        // 1. route state router
        resultInvokers = headStateRouter.route(resultInvokers, url, invocation, false, null);
        ...

        if (routers.isEmpty()) {
            return resultInvokers;
        }
        ...
    }

最终会进入路由链里找Invoker

首先进入的是StateRouter链,这个链通过抽象类AbstractStateRouter中的方法串起来

@Override
    public final BitList<Invoker<T>> route(BitList<Invoker<T>> invokers, URL url, Invocation invocation, boolean needToPrintMessage, Holder<RouterSnapshotNode<T>> nodeHolder) throws RpcException {
       ...
        BitList<Invoker<T>> routeResult;

        routeResult = doRoute(invokers, url, invocation, needToPrintMessage, nodeHolder, messageHolder);
        if (routeResult != invokers) {
            routeResult = invokers.and(routeResult);
        }
        // check if router support call continue route by itself
        if (!supportContinueRoute()) {
            // use current node's result as next node's parameter
            if (!shouldFailFast || !routeResult.isEmpty()) {
                routeResult = continueRoute(routeResult, url, invocation, needToPrintMessage, nodeHolder);
            }
        }

       ...
        return routeResult;
    }

它会调用子类的doRoute方法寻找Invoker,如果子类返回的Invoker和当前的Invoker不同,则把子类的Invoker添加到当前的Invoker中

这里当前的Invoker对应的就是RegistryDirectory中记录的Invoker

然后通过continueRoute继续寻找下一个子类,它会依次经过

  • MockInvokersSelector

  • MeshRuleRouter

  • TagStateRouter

  • XdsRouter

  • ListenableStateRouter

  • AppScriptStateRouter

  • TailStateRouter

经过所有StateRouter的子类链处理后,得到所有的Invoker,一般情况下得到的Invoker仍然是只有RegistryDirectory中记录的Invoker

然后会进入Router链,由于默认情况下Router链为空,所以会直接返回

最终返回的仍然是RegistryDirectory中记录的Invoker,但是经过了StateRouter链和Router链的处理

  • 第二步是初始化负载均衡
    也是通过SPI方式加载LoadBalance的实现类,接口定义上指定了默认的实现类是random对应的RandomLoadBalance
@SPI("random")
public interface LoadBalance {
}
  • 第三步是进入FailoverClusterInvokerdoInvoker
@Override
    @SuppressWarnings({"unchecked", "rawtypes"})
    public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        // 重试次数,默认是3次
        int len = calculateInvokeTimes(methodName);
        ...
        for (int i = 0; i < len; i++) {
            ...
             // 从众多Invoker中挑选一个,通过负载均衡策略来挑选
            Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked);
            invoked.add(invoker);
            RpcContext.getServiceContext().setInvokers((List) invoked);
            boolean success = false;
            try {
                Result result = invokeWithContext(invoker, invocation);
                ...
                success = true;
                return result;
            }...

这里通过重试的方式来执行Invoker,默认情况下重试3次

然后通过负载均衡策略挑选一个Invoker执行,由于我们这里只有一个Invoker,所有会直接返回,还没到负载均衡去挑选的那一步

最后就是通过这个Invoker去执行了,按照上一篇文章中,我们的Invoker是通过Wrapper包装的DubboInvoker

image.png
  • ReferenceCountInvokerWrapper:这个Invoker没干啥
@Override
    public Result invoke(Invocation invocation) throws RpcException {
        try {
            ...
            return invoker.invoke(invocation);
        ...
    }

然后又要经过过滤器链

  • MetricsFilter

  • ListenerInvokerWrapper

最终达到DubboInvoker

@Override
    protected Result doInvoke(final Invocation invocation) throws Throwable {
        ...
        List<? extends ExchangeClient> exchangeClients = clientsProvider.getClients();
        if (exchangeClients.size() == 1) {
            currentClient = exchangeClients.get(0);
        } else {
            currentClient = exchangeClients.get(index.getAndIncrement() % exchangeClients.size());
        }
        try {
            boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);

            Request request = new Request();
            if (payload != null) {
                request.setPayload(payload);
            }
            request.setData(inv);
            request.setVersion(Version.getProtocolVersion());

            if (isOneway) {
                ...
            } else {
                request.setTwoWay(true);
                ExecutorService executor = getCallbackExecutor(getUrl(), inv);
                CompletableFuture<AppResponse> appResponseFuture =
                    currentClient.request(request, timeout, executor).thenApply(AppResponse.class::cast);
                ...
                return result;
            }
        }...
    }

可用看到这里就是取出之前创建的Netty客户端,然后判断是否oneWay(即不需要返回结果的)

然后通过Netty客户端把请求发送出去,请求包装了Invocation对象

拿到返回结果后在进行解析

至此才完成了一次远程调用

相关文章

  • dubbo源码分析-服务调用

    dubbo服务调用,就是服务消费者发起调用服务提供者接口的过程。是对前面服务导出,服务引用,服务集群、路由和负载均...

  • Dubbo 服务调用 总结(八)

    笔记简述结合之前学习的两篇笔记 Dubbo 服务调用 源码学习(上)(六)和 Dubbo 服务调用 源码学习(下)...

  • 2.Dubbo源码阅读-配置篇

    Dubbo的分支: 3.0Dubbo的服务提供者会将RPC服务的调用说明,导出到配置中心。然后服务的消费者向配置中...

  • Java进阶-Dubbo-进阶

    一、服务调用过程 1.1 服务调用方式   Dubbo 服务调用过程:   Dubbo 支持同步和异步两种调用方式...

  • Maven打包插件

    Dubbo做一个服务提供者,以Dubbo方法自提供的Main方法启动,供消费者调用,希望做到类似SpringBoo...

  • Dubbo剖析-监控平台的搭建与使用

    一、前言 dubbo-monitor主要用来统计服务的调用次调和调用时间的监控中心,服务消费者和提供者,在内存中累...

  • dubbo整体架构

    一、dubbo 核心调用链路 消费者、生产者、注册中心、监控中心 二、dubbo详细流程调用图 三、dubbo 分...

  • Dubbo后台管理和监控中心部署

    通过dubbo监控中心和后台管理可以很好的监控dubbo服务,监控服务端服务和客户端调用情况,调用次数,调用日志,...

  • Dubbo剖析-服务消费方Invoker到客户端接口的转换

    一、前言 前面dubbo整体架构分析里面我们讲解了服务消费者消费一个服务的详细过程是,首先 调用 Protocol...

  • Dubbo剖析-服务消费方远程服务到Invoker的转换

    一、前言 前面dubbo整体架构分析里面我们讲解了服务消费者消费一个服务的详细过程是,首先 调用 Protocol...

网友评论

      本文标题:Dubbo系列--消费者服务调用《六》

      本文链接:https://www.haomeiwen.com/subject/wrlxgdtx.html