美文网首页
Dubbo源码分析(三) 服务引用

Dubbo源码分析(三) 服务引用

作者: skyguard | 来源:发表于2018-11-12 11:09 被阅读0次

    下面我们来看一下服务引用的流程。Dubbo 基于 Spring 的 Schema 扩展实现 XML 配置解析,DubboNamespaceHandler 会将 <dubbo:reference> 标签解析为 ReferenceBean,ReferenceBean 实现了 FactoryBean,因此当它在代码中有引用时,会调用 ReferenceBean的getObject 方法进入节点注册和服务发现流程。先来看一下ReferenceBean的getObject方法

     public Object getObject() throws Exception {
        return get();
    }
    

    再到ReferenceConfig的get方法

     public synchronized T get() {
        // 已销毁,不可获得
        if (destroyed) {
            throw new IllegalStateException("Already destroyed!");
        }
        // 初始化
        if (ref == null) {
            init();
        }
        return ref;
    }
    

    再到init方法

    private void init() {
        // 已经初始化,直接返回
        if (initialized) {
            return;
        }
        initialized = true;
        // 校验接口名非空
        if (interfaceName == null || interfaceName.length() == 0) {
            throw new IllegalStateException("<dubbo:reference interface=\"\" /> interface not allow null!");
        }
        // 拼接属性配置(环境变量 + properties 属性)到 ConsumerConfig 对象
        // get consumer's global configuration
        checkDefault();
        // 拼接属性配置(环境变量 + properties 属性)到 ReferenceConfig 对象
        appendProperties(this);
        // 若未设置 `generic` 属性,使用 `ConsumerConfig.generic` 属性。
        if (getGeneric() == null && getConsumer() != null) {
            setGeneric(getConsumer().getGeneric());
        }
        // 泛化接口的实现
        if (ProtocolUtils.isGeneric(getGeneric())) {
            interfaceClass = GenericService.class;
        // 普通接口的实现
        } else {
            try {
                interfaceClass = Class.forName(interfaceName, true, Thread.currentThread().getContextClassLoader());
            } catch (ClassNotFoundException e) {
                throw new IllegalStateException(e.getMessage(), e);
            }
            // 校验接口和方法
            checkInterfaceAndMethods(interfaceClass, methods);
        }
        // 直连提供者
        // 【直连提供者】第一优先级,通过 -D 参数指定 ,例如 java -Dcom.alibaba.xxx.XxxService=dubbo://localhost:20890
        String resolve = System.getProperty(interfaceName);
        String resolveFile = null;
        // 【直连提供者】第二优先级,通过文件映射,例如 com.alibaba.xxx.XxxService=dubbo://localhost:20890
        if (resolve == null || resolve.length() == 0) {
            // 默认先加载,`${user.home}/dubbo-resolve.properties` 文件 ,无需配置
            resolveFile = System.getProperty("dubbo.resolve.file");
            if (resolveFile == null || resolveFile.length() == 0) {
                File userResolveFile = new File(new File(System.getProperty("user.home")), "dubbo-resolve.properties");
                if (userResolveFile.exists()) {
                    resolveFile = userResolveFile.getAbsolutePath();
                }
            }
            // 存在 resolveFile ,则进行文件读取加载。
            if (resolveFile != null && resolveFile.length() > 0) {
                Properties properties = new Properties();
                FileInputStream fis = null;
                try {
                    fis = new FileInputStream(new File(resolveFile));
                    properties.load(fis);
                } catch (IOException e) {
                    throw new IllegalStateException("Unload " + resolveFile + ", cause: " + e.getMessage(), e);
                } finally {
                    try {
                        if (null != fis) fis.close();
                    } catch (IOException e) {
                        logger.warn(e.getMessage(), e);
                    }
                }
                resolve = properties.getProperty(interfaceName);
            }
        }
        // 设置直连提供者的 url
        if (resolve != null && resolve.length() > 0) {
            url = resolve;
            if (logger.isWarnEnabled()) {
                if (resolveFile != null && resolveFile.length() > 0) {
                    logger.warn("Using default dubbo resolve file " + resolveFile + " replace " + interfaceName + "" + resolve + " to p2p invoke remote service.");
                } else {
                    logger.warn("Using -D" + interfaceName + "=" + resolve + " to p2p invoke remote service.");
                }
            }
        }
        // 从 ConsumerConfig 对象中,读取 application、module、registries、monitor 配置对象。
        if (consumer != null) {
            if (application == null) {
                application = consumer.getApplication();
            }
            if (module == null) {
                module = consumer.getModule();
            }
            if (registries == null) {
                registries = consumer.getRegistries();
            }
            if (monitor == null) {
                monitor = consumer.getMonitor();
            }
        }
        // 从 ModuleConfig 对象中,读取 registries、monitor 配置对象。
        if (module != null) {
            if (registries == null) {
                registries = module.getRegistries();
            }
            if (monitor == null) {
                monitor = module.getMonitor();
            }
        }
        // 从 ApplicationConfig 对象中,读取 registries、monitor 配置对象。
        if (application != null) {
            if (registries == null) {
                registries = application.getRegistries();
            }
            if (monitor == null) {
                monitor = application.getMonitor();
            }
        }
        // 校验 ApplicationConfig 配置。
        checkApplication();
        // 校验 Stub 和 Mock 相关的配置
        checkStubAndMock(interfaceClass);
        // 将 `side`,`dubbo`,`timestamp`,`pid` 参数,添加到 `map` 集合中。
        Map<String, String> map = new HashMap<String, String>();
        Map<Object, Object> attributes = new HashMap<Object, Object>();
        map.put(Constants.SIDE_KEY, Constants.CONSUMER_SIDE);
        map.put(Constants.DUBBO_VERSION_KEY, Version.getVersion());
        map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));
        if (ConfigUtils.getPid() > 0) {
            map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));
        }
        // methods、revision、interface
        if (!isGeneric()) {
            String revision = Version.getVersion(interfaceClass, version);
            if (revision != null && revision.length() > 0) {
                map.put("revision", revision);
            }
    
            String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames(); // 获得方法数组
            if (methods.length == 0) {
                logger.warn("NO method found in service interface " + interfaceClass.getName());
                map.put("methods", Constants.ANY_VALUE);
            } else {
                map.put("methods", StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));
            }
        }
        map.put(Constants.INTERFACE_KEY, interfaceName);
        // 将各种配置对象,添加到 `map` 集合中。
        appendParameters(map, application);
        appendParameters(map, module);
        appendParameters(map, consumer, Constants.DEFAULT_KEY);
        appendParameters(map, this);
        // 获得服务键,作为前缀
        String prefix = StringUtils.getServiceKey(map);
        // 将 MethodConfig 对象数组,添加到 `map` 集合中。
        if (methods != null && !methods.isEmpty()) {
            for (MethodConfig method : methods) {
                // 将 MethodConfig 对象,添加到 `map` 集合中。
                appendParameters(map, method, method.getName());
                // 当 配置了 `MethodConfig.retry = false` 时,强制禁用重试
                String retryKey = method.getName() + ".retry";
                if (map.containsKey(retryKey)) {
                    String retryValue = map.remove(retryKey);
                    if ("false".equals(retryValue)) {
                        map.put(method.getName() + ".retries", "0");
                    }
                }
                // 将带有 @Parameter(attribute = true) 配置对象的属性,添加到参数集合。
                appendAttributes(attributes, method, prefix + "." + method.getName());
                // 检查属性集合中的事件通知方法是否正确。若正确,进行转换。
                checkAndConvertImplicitConfig(method, map, attributes);
            }
        }
    
        // 以系统环境变量( DUBBO_IP_TO_REGISTRY ) 作为服务注册地址,参见 https://github.com/dubbo/dubbo-docker-sample 项目。
        String hostToRegistry = ConfigUtils.getSystemProperty(Constants.DUBBO_IP_TO_REGISTRY);
        if (hostToRegistry == null || hostToRegistry.length() == 0) {
            hostToRegistry = NetUtils.getLocalHost();
        } else if (isInvalidLocalHost(hostToRegistry)) {
            throw new IllegalArgumentException("Specified invalid registry ip from property:" + Constants.DUBBO_IP_TO_REGISTRY + ", value:" + hostToRegistry);
        }
        map.put(Constants.REGISTER_IP_KEY, hostToRegistry);
    
        // 添加到 StaticContext 进行缓存
        //attributes are stored by system context.
        StaticContext.getSystemContext().putAll(attributes);
    
        // 创建 Service 代理对象
        ref = createProxy(map);
    
    
        ConsumerModel consumerModel = new ConsumerModel(getUniqueServiceName(), this, ref, interfaceClass.getMethods());
        ApplicationModel.initConsumerModel(getUniqueServiceName(), consumerModel);
    }
    

    因为通过注册中心,因此在ReferenceConfig的createProxy方法中。,进入 RegistryProtocol的refer方法。

    public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
        // 获得真实的注册中心的 URL
        url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY);
        // 获得注册中心
        Registry registry = registryFactory.getRegistry(url);
        if (RegistryService.class.equals(type)) {
            return proxyFactory.getInvoker((T) registry, type, url);
        }
    
        // 获得服务引用配置参数集合
        // group="a,b" or group="*"
        Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY));
        String group = qs.get(Constants.GROUP_KEY);
        // 分组聚合
        if (group != null && group.length() > 0) {
            if ((Constants.COMMA_SPLIT_PATTERN.split(group)).length > 1
                    || "*".equals(group)) {
                // 执行服务引用
                return doRefer(getMergeableCluster(), registry, type, url);
            }
        }
        // 执行服务引用
        return doRefer(cluster, registry, type, url);
    }
    

    再到doRefer方法

    private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
        // 创建 RegistryDirectory 对象,并设置注册中心
        RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
        directory.setRegistry(registry);
        directory.setProtocol(protocol);
        // 创建订阅 URL
        // all attributes of REFER_KEY
        Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters()); // 服务引用配置集合
        URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters);
        // 向注册中心注册自己(服务消费者)
        if (!Constants.ANY_VALUE.equals(url.getServiceInterface())
                && url.getParameter(Constants.REGISTER_KEY, true)) {
            registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY,
                    Constants.CHECK_KEY, String.valueOf(false))); // 不检查的原因是,不需要检查。
        }
        // 向注册中心订阅服务提供者 + 路由规则 + 配置规则
        directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
                        Constants.PROVIDERS_CATEGORY
                        + "," + Constants.CONFIGURATORS_CATEGORY
                        + "," + Constants.ROUTERS_CATEGORY));
    
        // 创建 Invoker 对象
        Invoker invoker = cluster.join(directory);
        // 向本地注册表,注册消费者
        ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
        return invoker;
    }
    

    RegistryDirectory 通过 RegistryDirectory的subscribeUrl方法向 Zookeeper 订阅服务节点信息并 watch 变更,这样就实现了服务自动发现。
    再来看一下Directory接口

    public interface Directory<T> extends Node {
    
    /**
     * get service type.
     *
     * 获得服务类型,例如:com.alibaba.dubbo.demo.DemoService
     *
     * @return service type.
     */
    Class<T> getInterface();
    
    /**
     * list invokers.
     *
     * 获得所有服务 Invoker 集合
     *
     * @return invokers
     */
    List<Invoker<T>> list(Invocation invocation) throws RpcException;
    
    }
    

    看一下AbstractDirectory这个类

    public List<Invoker<T>> list(Invocation invocation) throws RpcException {
        if (destroyed) {
            throw new RpcException("Directory already destroyed .url: " + getUrl());
        }
        // 获得所有 Invoker 集合
        List<Invoker<T>> invokers = doList(invocation);
        // 根据路由规则,筛选 Invoker 集合
        List<Router> localRouters = this.routers; // local reference 本地引用,避免并发问题
        if (localRouters != null && !localRouters.isEmpty()) {
            for (Router router : localRouters) {
                try {
                    if (router.getUrl() == null || router.getUrl().getParameter(Constants.RUNTIME_KEY, false)) {
                        invokers = router.route(invokers, getConsumerUrl(), invocation);
                    }
                } catch (Throwable t) {
                    logger.error("Failed to execute router: " + getUrl() + ", cause: " + t.getMessage(), t);
                }
            }
        }
        return invokers;
    }
    

    再来看一下RegistryDirectory的具体实现

    public List<Invoker<T>> doList(Invocation invocation) {
        if (forbidden) {
            // 1. No service provider 2. Service providers are disabled
            throw new RpcException(RpcException.FORBIDDEN_EXCEPTION,
                "No provider available from registry " + getUrl().getAddress() + " for service " + getConsumerUrl().getServiceKey() + " on consumer " +  NetUtils.getLocalHost()
                        + " use dubbo version " + Version.getVersion() + ", please check status of providers(disabled, not registered or in blacklist).");
        }
        List<Invoker<T>> invokers = null;
        Map<String, List<Invoker<T>>> localMethodInvokerMap = this.methodInvokerMap; // local reference
        // 获得 Invoker 集合
        if (localMethodInvokerMap != null && localMethodInvokerMap.size() > 0) {
            // 获得方法名、方法参数
            String methodName = RpcUtils.getMethodName(invocation);
            Object[] args = RpcUtils.getArguments(invocation);
            // 【第一】可根据第一个参数枚举路由
            if (args != null && args.length > 0 && args[0] != null
                    && (args[0] instanceof String || args[0].getClass().isEnum())) {
            //                invokers = localMethodInvokerMap.get(methodName + "." + args[0]); // The routing can be enumerated according to the first parameter
                invokers = localMethodInvokerMap.get(methodName + args[0]); // The routing can be enumerated according to the first parameter
            }
            // 【第二】根据方法名获得 Invoker 集合
            if (invokers == null) {
                invokers = localMethodInvokerMap.get(methodName);
            }
            // 【第三】使用全量 Invoker 集合。例如,`#$echo(name)` ,回声方法
            if (invokers == null) {
                invokers = localMethodInvokerMap.get(Constants.ANY_VALUE);
            }
            // 【第四】使用 `methodInvokerMap` 第一个 Invoker 集合。防御性编程。
            if (invokers == null) {
                Iterator<List<Invoker<T>>> iterator = localMethodInvokerMap.values().iterator();
                if (iterator.hasNext()) {
                    invokers = iterator.next();
                }
            }
        }
        return invokers == null ? new ArrayList<Invoker<T>>(0) : invokers;
    }
    

    在 dolist方法中,如果通过服务治理禁止 Consumer 访问的话,此处直接抛出响应的异常。
    RegistryDirectory 实现了 NotifyListener,在 ZK 节点变化时能收到通知更新内存缓存,其中 RegistryDirectory的mergeUrl方法中会按照优先级合并参数(动态配置在此处生效)。
    服务引用时从内存缓存中获取并返回invoker列表,并根据路由规则再进行一次过滤。
    再来看一下AbstractClusterInvoker这个类

    public Result invoke(final Invocation invocation) throws RpcException {
        // 校验是否销毁
        checkWhetherDestroyed();
    
        // 获得所有服务提供者 Invoker 集合
        List<Invoker<T>> invokers = list(invocation);
    
        // 获得 LoadBalance 对象
        LoadBalance loadbalance;
        if (invokers != null && !invokers.isEmpty()) {
            loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
                    .getMethodParameter(invocation.getMethodName(), Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE));
        } else {
            loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(Constants.DEFAULT_LOADBALANCE);
        }
    
        // 设置调用编号,若是异步调用
        RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
    
        // 执行调用
        return doInvoke(invocation, invokers, loadbalance);
    }
    

    再看一下FailoverClusterInvoker的具体实现

    public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        List<Invoker<T>> copyinvokers = invokers;
        // 检查copyinvokers即可用Invoker集合是否为空,如果为空,那么抛出异常
        checkInvokers(copyinvokers, invocation);
        // 得到最大可调用次数:最大可重试次数+1,默认最大可重试次数Constants.DEFAULT_RETRIES=2
        int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
        if (len <= 0) {
            len = 1;
        }
        // 保存最后一次调用的异常
        // retry loop.
        RpcException le = null; // last exception.
        // 保存已经调用过的Invoker
        List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size()); // invoked invokers.
        Set<String> providers = new HashSet<String>(len);
        // failover机制核心实现:如果出现调用失败,那么重试其他服务器
        for (int i = 0; i < len; i++) {
            // Reselect before retry to avoid a change of candidate `invokers`.
            // NOTE: if `invokers` changed, then `invoked` also lose accuracy.
            // 重试时,进行重新选择,避免重试时invoker列表已发生变化.
            // 注意:如果列表发生了变化,那么invoked判断会失效,因为invoker示例已经改变
            if (i > 0) {
                checkWhetherDestroyed();
                // 根据Invocation调用信息从Directory中获取所有可用Invoker
                copyinvokers = list(invocation);
                // check again
                // 重新检查一下
                checkInvokers(copyinvokers, invocation);
            }
            // 根据负载均衡机制从copyinvokers中选择一个Invoker
            Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);
            // 保存每次调用的Invoker
            invoked.add(invoker);
            // 设置已经调用的 Invoker 集合,到 Context 中
            RpcContext.getContext().setInvokers((List) invoked);
            try {
                // RPC 调用得到 Result
                Result result = invoker.invoke(invocation);
                // 重试过程中,将最后一次调用的异常信息以 warn 级别日志输出
                if (le != null && logger.isWarnEnabled()) {
                    logger.warn("Although retry the method " + invocation.getMethodName()
                            + " in the service " + getInterface().getName()
                            + " was successful by the provider " + invoker.getUrl().getAddress()
                            + ", but there have been failed providers " + providers
                            + " (" + providers.size() + "/" + copyinvokers.size()
                            + ") from the registry " + directory.getUrl().getAddress()
                            + " on the consumer " + NetUtils.getLocalHost()
                            + " using the dubbo version " + Version.getVersion() + ". Last error is: "
                            + le.getMessage(), le);
                }
                return result;
            } catch (RpcException e) {
                // 如果是业务性质的异常,不再重试,直接抛出
                if (e.isBiz()) { // biz exception.
                    throw e;
                }
                // 其他性质的异常统一封装成RpcException
                le = e;
            } catch (Throwable e) {
                le = new RpcException(e.getMessage(), e);
            } finally {
                providers.add(invoker.getUrl().getAddress());
            }
        }
        // 最大可调用次数用完还得到Result的话,抛出RpcException异常:重试了N次还是失败,并输出最后一次异常信息
        throw new RpcException(le.getCode(), "Failed to invoke the method " + invocation.getMethodName()
                + " in the service " + getInterface().getName()
                + ". Tried " + len + " times of the providers "
                + providers + " (" + providers.size() + "/" + copyinvokers.size()
                + ") from the registry " + directory.getUrl().getAddress()
                + " on the consumer " + NetUtils.getLocalHost()
                + " using the dubbo version " + Version.getVersion()
                + ". Last error is: " + le.getMessage(), le.getCause() != null ? le.getCause() : le);
    }
    

    在调用或重试时,每次都通过 LoadBalance 选出一个 Invoker 进行调用。
    服务引用的流程就完成了。

    相关文章

      网友评论

          本文标题:Dubbo源码分析(三) 服务引用

          本文链接:https://www.haomeiwen.com/subject/ryuqfqtx.html