美文网首页
详解Dubbo(三):消费端构造Invoker

详解Dubbo(三):消费端构造Invoker

作者: 空挡 | 来源:发表于2020-04-18 23:07 被阅读0次

    前言

    上一篇讲Proxy的文章中看到,构建Proxy需要传入Invoker参数。除基本方法外,其它接口方法的调用最终都是调用的invoker.invoke()方法。从rpc调用的整个流程来说,Invoker正好处在中间的位置,它的左边是用户的应用,调用的都是对象和方法。而它的右边是传输层,操作的是Request/Response,所以Invoker就是中间的桥梁。

    Invoker结构

    下面Invoker相关类的关系图,这只是其中最重要的部分:

    Invoker
    从上面图中可以看到,Invoker大体上分成两个部分,针对集群的ClusterInvoker和针对特定协议的Invoker。下面先从针对特定协议的Invoker开始。

    Protocol和Invoker

    从上一篇的ReferenceBean初始化中可以知道,消费端针对某个服务接口创建Invoker的时候,首先需要获取到URL。最简单的例子就是在@Reference注解上配置了url地址,而且这个地址不是注册中心的地址。

    指定协议的URL

    最简单的url比如dubbo://10.0.75.1:20880/org.apache.dubbo.demo.DemoService?&interface=org.apache.dubbo.demo.DemoService&methods=sayHello&side=provider&timestamp=1585553085050
    ReferenceBean拿到这个url后就会去找它对应的Protocol类,根据url的schema, Dubbo可以找到DubboProtocol,然后调用Protocol的refer方法获取到Invoker,这个方法在AbstractProtocol类里面。

       @Override
        public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
            return new AsyncToSyncInvoker<>(protocolBindingRefer(type, url));
        }
    

    实际上调用的是子类的protocolBindingRefer()方法,这里外层封装的AsyncToSyncInvoker是一个装饰类,因为新版本的dubbo把所有Invoker调用都改成了异步返回,如果Consumer仍然希望同步调用,则用这个装饰类转换一下。下面看下DubboProtocol的protocolBindingRefer()方法实现:

        @Override
        public <T> Invoker<T> protocolBindingRefer(Class<T> serviceType, URL url) throws RpcException {
            optimizeSerialization(url);
    
            // 创建Dubbo Invoker
            DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
            invokers.add(invoker);
    
            return invoker;
        }
    

    该方法直接创建了一个DubboInvoker,总共传入四个参数,除了接口和url外,第三个参数是构建传输层Client,前面讲过Invoker连接了Proxy和传输层,当Invoker发起调用时,就需要这个ExchangeClient来发送请求和接收Response,Exchange层的解析会包含在后续的文章中。第四个参数是Invoker的缓存集合,不是Protocol用的,所以不去管它。
    前一篇文章讲过,当Proxy最终接收到方法调用后,会调用Invoker.invoke()来发起远程调用,下面来看下DubboInvoker.invoke()是怎么实现的。
    DubboInvoker
    对invoke()方法的调用首先会进到DubboInvoker的父类AbstractInvoker中:

        @Override
        public Result invoke(Invocation inv) throws RpcException {
            // 判断invoker是否已经destroy了,是则打印警告,调用继续
            if (destroyed.get()) {
                logger.warn("Invoker for service " + this + " on consumer " + NetUtils.getLocalHost() + " is destroyed, "
                        + ", dubbo version is " + Version.getVersion() + ", this invoker should not be used any longer");
            }
            //追加RpcContext中的附加信息到Invocation中,比如链路追踪的Id等
            RpcInvocation invocation = (RpcInvocation) inv;
            invocation.setInvoker(this);
            if (CollectionUtils.isNotEmptyMap(attachment)) {
                invocation.addObjectAttachmentsIfAbsent(attachment);
            }
            Map<String, Object> contextAttachments = RpcContext.getContext().getObjectAttachments();
            if (CollectionUtils.isNotEmptyMap(contextAttachments)) {
                invocation.addObjectAttachments(contextAttachments);
            }
            //设置是同步还是异步调用
            invocation.setInvokeMode(RpcUtils.getInvokeMode(url, invocation));
            //如果是异步调用,给这次请求加一个唯一id
            RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
    
            AsyncRpcResult asyncResult;
            try {
                //调用子类的doInvoke()方法
                asyncResult = (AsyncRpcResult) doInvoke(invocation);
            } catch (InvocationTargetException e) { // biz exception
                //异常处理
                ...
            } catch (RpcException e) {
                //异常处理
                ...
            } catch (Throwable e) {
                //异常处理
                asyncResult = AsyncRpcResult.newDefaultAsyncResult(null, e, invocation);
            }
            RpcContext.getContext().setFuture(new FutureAdapter(asyncResult.getResponseFuture()));
            return asyncResult;
        }
    

    AbstractInvoker最终调用了DubboInvokerdoInvoke()方法。

    @Override
        protected Result doInvoke(final Invocation invocation) throws Throwable {
            RpcInvocation inv = (RpcInvocation) invocation;
            final String methodName = RpcUtils.getMethodName(invocation);
            inv.setAttachment(PATH_KEY, getUrl().getPath());
            inv.setAttachment(VERSION_KEY, version);
            //获取Dubbo协议的exchangeClient
            ExchangeClient currentClient;
            if (clients.length == 1) {
                currentClient = clients[0];
            } else {
                currentClient = clients[index.getAndIncrement() % clients.length];
            }
            try {
                boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
                int timeout = getUrl().getMethodPositiveParameter(methodName, TIMEOUT_KEY, DEFAULT_TIMEOUT);
                //如果Oneway调用,即Consumer端不关心调用是否成功,则发送请求后直接返回结果。多用在日志发送这种可以容忍数据丢失的场景
                if (isOneway) {
                    boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
                    currentClient.send(inv, isSent);
                    return AsyncRpcResult.newDefaultAsyncResult(invocation);
                } else {
                    //2.7之后所有调用都改成异步,讲Future放入result中,如果Consumer调用是同步的,上面的Protocol的refer()会阻塞等待异步结果返回
                    ExecutorService executor = getCallbackExecutor(getUrl(), inv);
                    CompletableFuture<AppResponse> appResponseFuture =
                            currentClient.request(inv, timeout, executor).thenApply(obj -> (AppResponse) obj);
                    // save for 2.6.x compatibility, for example, TraceFilter in Zipkin uses com.alibaba.xxx.FutureAdapter
                    FutureContext.getContext().setCompatibleFuture(appResponseFuture);
                    AsyncRpcResult result = new AsyncRpcResult(appResponseFuture, inv);
                    result.setExecutor(executor);
                    return result;
                }
            } catch (TimeoutException e) {
                throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
            } catch (RemotingException e) {
                throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
            }
        }
    

    上面就是当@Reference上配置了单个url,并且这个url指定了具体协议的情况,下面看下当url是注册中心的情况。

    注册中心的URL

    之前的文章讲过,@Reference关联的注册中心的url格式类似于registry://localhost:2181?refer=version%3f1.0.0,所以dubbo可以基于url找到对应的Protocol类为RegistryProtocol,现在看下这个类的refer()方法如何处理的:

        @Override
        @SuppressWarnings("unchecked")
        public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
            //1. 转换成具体注册中心实现的url
            url = getRegistryUrl(url);
            //2. 获取注册中心实现
            Registry registry = registryFactory.getRegistry(url);
            //3. 如果是获取RegistryService的代理,则直接获取本地暴露的invoker
            if (RegistryService.class.equals(type)) {
                return proxyFactory.getInvoker((T) registry, type, url);
            }
    
            //4. 判断url是否指定了分组信息
            Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(REFER_KEY));
            String group = qs.get(GROUP_KEY);
            if (group != null && group.length() > 0) {
                if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) {
                    //指定了分组,则使用MergeableCluster
                    return doRefer(getMergeableCluster(), registry, type, url);
                }
            }
            //5. 获取Cluster Invoker
            return doRefer(cluster, registry, type, url);
        }
    

    第1步,首先需要将url转换成真实注册中心的地址。dubbo是支持多注册中心的,而配置中获取的是一个通用的注册中心url,以registry://开头,这一步转成真正的注册中心url,比如从registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?refer=interface%3Dorg.apache.dubbo.demo.DemoService&registry=zookeeper 转成 zookeeper://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?refer=interface%3Dorg.apache.dubbo.demo.DemoService
    第2步,根据真实的url获取到注册中心的实现类,比如上面的url获取到的就是使用zookeeper注册中心,获取的就是ZookeeperRegistry
    第3步,这里是对获取注册中心实例代理的特殊处理,暂时不看
    第4步,dubbo支持将多个远程服务调用结果做合并来做为最终结果,通过配置一个merger类来实现
    第5步,没有指定group的话,则使用默认的Cluster构造Invoker
    上面方法的主要就是获取Registry的实现,然后调用doRefer()方法:

    private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
            //1. 构建directory实例
            RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
            directory.setRegistry(registry);
            directory.setProtocol(protocol);
            // 2. 生成consumer URL
            Map<String, String> parameters = new HashMap<String, String>(directory.getConsumerUrl().getParameters());
            URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
            //3. 将consumer信息写入注册中心
            if (directory.isShouldRegister()) {
                directory.setRegisteredConsumerUrl(subscribeUrl);
                registry.register(directory.getRegisteredConsumerUrl());
            }
            //4. 构建RouteChain
            directory.buildRouterChain(subscribeUrl);
            //5. 订阅服务变化通知
            directory.subscribe(toSubscribeUrl(subscribeUrl));
            //6. 生成ClusterInvoker
            Invoker<T> invoker = cluster.join(directory);
            List<RegistryProtocolListener> listeners = findRegistryProtocolListeners(url);
            if (CollectionUtils.isEmpty(listeners)) {
                return invoker;
            }
            //7. 回调Listener
            RegistryInvokerWrapper<T> registryInvokerWrapper = new RegistryInvokerWrapper<>(directory, cluster, invoker, subscribeUrl);
            for (RegistryProtocolListener listener : listeners) {
                listener.onRefer(this, registryInvokerWrapper);
            }
            return registryInvokerWrapper;
        }
    

    在上面的doRefer()方法中,首先为服务生成RegistryDirectory实例,该类的作用是关联DirectoryRegistry接口,前面的白话Dubbo系列中已经讲过,不清楚的话可以回查一下。随后,Consumer会将自己也注册到注册中心,所以可以通过注册中心的数据看到某个Provider都被谁消费,也可以看到某个Consumer都调用了哪些服务。
    第5步中,订阅注册中心的数据变化,在provider变化时可以实时收到通知
    第6步中,生成最终的ClusterInvoker,Dubbo默认配置中,这里的Cluster是FailoverCluster,join()方法返回FailoverClusterInvoker

    ClusterInoker实现

    ClusterInvoker是Dubbo支持集群调用的核心实现,包括负载均衡、特殊路由、容错处理等。默认实现类FailoverClusterInvoker支持用户配置重试次数,可以在一个节点失败重试其它节点。
    AbstractClusterInvoker:

    @Override
        public Result invoke(final Invocation invocation) throws RpcException {
            //判断Invoker是否已经destroy,是则抛出异常
            checkWhetherDestroyed();
            // 将attachments加到Invocation中
            Map<String, Object> contextAttachments = RpcContext.getContext().getObjectAttachments();
            if (contextAttachments != null && contextAttachments.size() != 0) {
                ((RpcInvocation) invocation).addObjectAttachments(contextAttachments);
            }
           // 获取可用invoker列表
            List<Invoker<T>> invokers = list(invocation);
           //根据配置获取指定的负载均衡实现
            LoadBalance loadbalance = initLoadBalance(invokers, invocation);
            RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
            return doInvoke(invocation, invokers, loadbalance);
        }
    

    ClusterInvoker的invoke()方法首先调用list()方法获取所有可用invoker列表,这里的是直接调用的Directory的list方法,Directory缓存了从注册中心获取的provider url列表,会将每个url生成invoker。
    在获取到一组invoker后需要从其中选择一个发起调用,这时候就需要用到负载均衡,最终根据获取的invoker列表和负载均衡器调用子类的具体实现。
    FailoverClusterInvoker:

    @Override
        @SuppressWarnings({"unchecked", "rawtypes"})
        public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
            List<Invoker<T>> copyInvokers = invokers;
            checkInvokers(copyInvokers, invocation);
            String methodName = RpcUtils.getMethodName(invocation);
            // 获取重试次数,最低可配置在方法粒度
            int len = getUrl().getMethodParameter(methodName, RETRIES_KEY, DEFAULT_RETRIES) + 1;
            if (len <= 0) {
                len = 1;
            }
            // retry loop.
            RpcException le = null; // last exception.
            List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyInvokers.size()); // invoked invokers.
            Set<String> providers = new HashSet<String>(len);
            for (int i = 0; i < len; i++) {
                //Reselect before retry to avoid a change of candidate `invokers`.
                //NOTE: if `invokers` changed, then `invoked` also lose accuracy.
                if (i > 0) {
                    checkWhetherDestroyed();
                    copyInvokers = list(invocation);
                    // check again
                    checkInvokers(copyInvokers, invocation);
                }
                // 使用负载均衡最终选择一个invoker
                Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked);
                invoked.add(invoker);
                RpcContext.getContext().setInvokers((List) invoked);
                try {
                    Result result = invoker.invoke(invocation);
                    if (le != null && logger.isWarnEnabled()) {
                        logger.warn(...);
                    }
                    return result;
                } catch (RpcException e) {
                    if (e.isBiz()) { // biz exception.
                        throw e;
                    }
                    le = e;
                } catch (Throwable e) {
                    le = new RpcException(e.getMessage(), e);
                } finally {
                    providers.add(invoker.getUrl().getAddress());
                }
            }
            throw new RpcException(...);
        }
    

    上面的逻辑主要是两点,根据配置的重试次数来决定是否重试,根据负载均衡实现从注册中心返回的可用服务中选择其中一个,然后发起调用,当重试结束还未成功,则抛出异常。

    构造带Filter的Invoker

    上面讲了两种Invoker的获取和invoke的工作原理,其实Dubbo中上面得到的Invoker不会直接返回给Proxy,而是需要和Filter集成最终返回Invoker链。这部分的代码前面白话部分讲Filter的时候已分解,传送门

    总结

    消费端的Proxy通过Invoker发起调用,Invoker对Proxy屏蔽了集群和服务治理等一系列逻辑,同时从Invoker层开始,提供了对多协议的支持。从Invoker再往后走,将不存在接口和方法的概念,下一篇将分解传输层的实现。

    相关文章

      网友评论

          本文标题:详解Dubbo(三):消费端构造Invoker

          本文链接:https://www.haomeiwen.com/subject/gomvuhtx.html