美文网首页
dubbo阅读(三) -- consumer消费流程

dubbo阅读(三) -- consumer消费流程

作者: msrpp | 来源:发表于2019-01-11 11:07 被阅读1次

本文基于dubbo的默认配置来说明:注册中心:zk rpc协议:dubbo协议

基于前两篇文章的分析。comsumer在启动的时候需要根据dubbo的配置文件启动spring容器,并用自定义命名空间和自定义<dubbo::?>类型节点解析器,来完成对消费接口的注入。当读取到配置文件的reference节点时,会创建一个ReferenceBean的bean对象。ReferenceBean继承自spirng的FactoryBean接口。因此当我们调用以下程序时,返回的对象demoServiceReferenceBean#getObject来获得:

DemoService demoService = (DemoService) context.getBean("demoService");

核心代码在ReferenceConfig#createProxy


    private T createProxy(Map<String, String> map) {
        URL tmpUrl = new URL("temp", "localhost", 0, map);
        final boolean isJvmRefer;
        if (isInjvm() == null) {
            if (url != null && url.length() > 0) { // if a url is specified, don't do local reference
                isJvmRefer = false;
            } else if (InjvmProtocol.getInjvmProtocol().isInjvmRefer(tmpUrl)) {
                // by default, reference local service if there is
                isJvmRefer = true;
            } else {
                isJvmRefer = false;
            }
        } else {
            isJvmRefer = isInjvm().booleanValue();
        }

        if (isJvmRefer) {
            URL url = new URL(Constants.LOCAL_PROTOCOL, NetUtils.LOCALHOST, 0, interfaceClass.getName()).addParameters(map);
            invoker = refprotocol.refer(interfaceClass, url);
            if (logger.isInfoEnabled()) {
                logger.info("Using injvm service " + interfaceClass.getName());
            }
        } else {
            if (url != null && url.length() > 0) { // user specified URL, could be peer-to-peer address, or register center's address.
                String[] us = Constants.SEMICOLON_SPLIT_PATTERN.split(url);
                if (us != null && us.length > 0) {
                    for (String u : us) {
                        URL url = URL.valueOf(u);
                        if (url.getPath() == null || url.getPath().length() == 0) {
                            url = url.setPath(interfaceName);
                        }
                        if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                            urls.add(url.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
                        } else {
                            urls.add(ClusterUtils.mergeUrl(url, map));
                        }
                    }
                }
            } else { // assemble URL from register center's configuration
                List<URL> us = loadRegistries(false);
                if (us != null && us.size() > 0) {
                    for (URL u : us) {
                        URL monitorUrl = loadMonitor(u);
                        if (monitorUrl != null) {
                            map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
                        }
                        urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
                    }
                }
                if (urls == null || urls.size() == 0) {
                    throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address=\"...\" /> to your spring config.");
                }
            }

            if (urls.size() == 1) {
                invoker = refprotocol.refer(interfaceClass, urls.get(0));
            } else {
                List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
                URL registryURL = null;
                for (URL url : urls) {
                    invokers.add(refprotocol.refer(interfaceClass, url));
                    if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                        registryURL = url; // use last registry url
                    }
                }
                if (registryURL != null) { // registry url is available
                    // use AvailableCluster only when register's cluster is available
                    URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME);
                    invoker = cluster.join(new StaticDirectory(u, invokers));
                } else { // not a registry url
                    invoker = cluster.join(new StaticDirectory(invokers));
                }
            }
        }

        Boolean c = check;
        if (c == null && consumer != null) {
            c = consumer.isCheck();
        }
        if (c == null) {
            c = true; // default true
        }
        if (c && !invoker.isAvailable()) {
            throw new IllegalStateException("Failed to check the status of the service " + interfaceName + ". No provider available for the service " + (group == null ? "" : group + "/") + interfaceName + (version == null ? "" : ":" + version) + " from the url " + invoker.getUrl() + " to the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion());
        }
        if (logger.isInfoEnabled()) {
            logger.info("Refer dubbo service " + interfaceClass.getName() + " from url " + invoker.getUrl());
        }
        // create service proxy
        return (T) proxyFactory.getProxy(invoker);
    }

关键步骤就一处:invoker = refprotocol.refer(interfaceClass, url);
获取实际的消费者调用接口时的实际处理对象invoker
我们看下这个流程:ExtensionLoader根据配置的url(registry://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?...),选取到了RegistryProtocol#refer来生成调用链对象。定义了一个RegistryDirectory对象directory,将消费者注册到zk上(创建一个临时节点),同时订阅该接口对应的zk节点变化通知。生成一个携带directoryFailoverClusterInvoker类型对象返回。(注意:FailoverClusterInvoker对象只需要一个)。

返回的refer对象最终调用堆栈

provider注册到zk时,携带了自身地址,接口信息,启动时间戳,pid,weight权重等信息。FailoverClusterInvoker中的变量directory在监听到provider的节点信息变化时,会将相应的url转化成invoker保存下来,url是如何转化成invoker的?核心实现位于RegistryDirectory#Map<String, Invoker<T>> toInvokers(List<URL> urls),provider采用dubbo协议提供服务的情况下,则根据url定位到协议处理时invoker是DubboProtocol#refer生成的。根据拿到的url中的provider地址,用netty创建了一个client链接,放入DubboInvoker。同时将新生成的DubboInvoker放入FailoverClusterInvoker#directory存起来。

新的provider连接上zk时comsumer生成一个DubboInvoker的堆栈

当用户调用dubbo方法的时候,会从FailoverClusterInvoker#directory中取到所有该reference接口的DubboInvoker链接,用配置的负载均衡算法选取一条DubboInvoker来执行invoke方法,根据上述流程可知,每条DubboInvoker携带着一个netty客户端,调用invoke时用客户端发起请求报文并同步等待结果,最终将结果返回给调用用户。

用户调用dubbo方法的最终堆栈

Protocol装饰器

,之前有说过,用ExtensionLoader的,获取代理实例的时候,还会将对应接口的装饰器加到返回的实例上,refprotocol对象实际在返回时还包含了3层装饰器:QosProtocolWrapper,ProtocolFilterWrapper,ProtocolListenerWrapper。

1.QosProtocolWrapper

QosProtocolWrapper用作provider的服务控制。端口取的配置dubbo.application.qos.port,默认22222。
QosProtocolWrapper实现了一个netty的telnet服务(用LineBasedFrameDecoder解码)。
2.5版本的buddo提供的功能有两个:对服务接口或者下线(offline/online +interfaceName);当控制下线时,会删去zk上对应接口的临时节点。这样consumer就不会发现到这个接口了。

负载均衡策略

RandomLoadBalance

RandomLoadBalance是默认的负载策略,随机从invokers中选择一条,同时支持invoker带有weight。

ConsistentHashLoadBalance

一致性hash算法,用"要消费的服务组+接口"做key,invoker选择器做value。对于每个invoker的地址做hash运算(ip+port+数字),生成160个虚拟节点,放入整个hash环中。这样当以相同的参数多次调用时,就可以打到相同的服务了(作用:相同的参数用同一个服务消费,如果provider做了redis缓存,LRU之类会更加节省调用时间)。对一个任意参数的hash值,都会落在这些分割好的节点中间,向后选一个最靠近的虚拟节点对应的provider来消费。为什么每个地址生成了这么多虚拟节点呢?因为一个节点的随机性太大了,在provider数量比较少的时候,会导致大部分数据都打到相同的provider上。

RoundRobinLoadBalance

轮询,内部维护一个自增id,取(id%总权重)的值来判断当次执行应该选取哪个provider。

LeastActiveLoadBalance

每次选取活跃次数最少的链路。效果和轮训差不多,略去。

接口超时

我们可以在consumer配置中配置timeout(单位ms)参数来控制接口超时时间,具体逻辑:在consumer给服务发送报文时,会记录报文Id。启动一个线程定时轮训每个会话报文是否超时,超时的话直接调用DefaultFuture#received给出一个超时Response。

失败策略

调用接口可能失败,可能由于链接还没有成功,也可能接口超时未返回,这种情况下会产生一个RpcException,dubbo中有如下的策略可供选择来处理这些异常情况。

FailoverClusterInvoker

FailoverClusterInvoker是默认的失败策略。如果调用失败了,会重新调用负载均衡算法尝试其他provider。可以在reference节点中配置retries来控制重试次数。

FailfastClusterInvoker

失败一次直接返回

FailbackClusterInvoker

5s后再次尝试,尝试retries次。

FailsafeClusterInvoker

调用一次,忽略错误

BroadcastClusterInvoker

广播调用,只有所有provider都成功了才算成功。

异步调用

异步调用的示例代码如下:


Future<String> f = RpcContext.getContext().asyncCall(new Callable<String>() {
    @Override
    public String call() {
        return demoService.sayHello("123");
    }
});
String result = f.get();

对象f是一个FutureAdapter类型的Future对象,内部存储了一个DefaultFuture对象,当调用provider接口返回数据时,用Hessian2库根据接口的返回类型反序列化二进制数据,并触发条件变量。这样调用DefaultFuture#get就可以返回结果了。序列化部分后续分析。

相关文章

网友评论

      本文标题:dubbo阅读(三) -- consumer消费流程

      本文链接:https://www.haomeiwen.com/subject/maworqtx.html