美文网首页
探究 dubbo接口超时时间的赋值逻辑,包含Consumer,P

探究 dubbo接口超时时间的赋值逻辑,包含Consumer,P

作者: xiangR | 来源:发表于2019-10-28 17:35 被阅读0次
    • 首先,我们都知道 dubbo 调用,consumer 可以配置接口超时时间,provider 也可以配置超时时间,但是最终会以 consumer 为准,这里还有一个 MethodConfig 的概念。
    • 具体的优先级体现为 conusmer.MethodConfig.timeout > provider.MethodConfig.timeout > conusmer.ServiceBean.timeout

    源码

    • provider.xml
       
        <bean id="innerStockServiceExport" class="com.alibaba.dubbo.config.spring.ServiceBean">
            <property name="interface" value="com.yit.stock.api.inner.InnerStockService"/>
            <property name="ref" ref="innerStockServiceImpl"/>
            <property name="application" ref="dubboApplicationConfig"/>
            <property name="registry" ref="dubboRegistryConfig"/>
            <property name="protocol" ref="dubboProtocolConfig"/>
            <property name="version" value="${dubbo.reference.version}"/>
            <property name="timeout" value="${dubbo.export.timeout}"/>
            <property name="retries" value="0"/>
            <property name="methods">
                <list>
                    <bean class="com.alibaba.dubbo.config.MethodConfig">
                        <property name="name" value="initVirtualStock"/>
                        <property name="timeout" value="4000"/>
                    </bean>
                    <bean class="com.alibaba.dubbo.config.MethodConfig">
                        <property name="name" value="batchFreezeStock"/>
                        <property name="timeout" value="6000"/>
                    </bean>
                    <bean class="com.alibaba.dubbo.config.MethodConfig">
                        <property name="name" value="batchCancelStock"/>
                        <property name="timeout" value="6000"/>
                    </bean>
                    <bean class="com.alibaba.dubbo.config.MethodConfig">
                        <property name="name" value="batchConsumeStock"/>
                        <property name="timeout" value="6000"/>
                    </bean>
                    <bean class="com.alibaba.dubbo.config.MethodConfig">
                        <property name="name" value="cancelForConsumedOrder"/>
                        <property name="timeout" value="30000"/>
                    </bean>
                </list>
            </property>
        </bean>
    
    • provider.xml
    
        <bean id="innerStockService" class="com.alibaba.dubbo.config.spring.ReferenceBean">
            <property name="interface" value="com.yit.stock.api.inner.InnerStockService"/>
            <property name="application" ref="dubboApplicationConfig"/>
            <property name="registry" ref="dubboRegistryConfig"/>
            <property name="timeout" value="3000"/>
            <property name="check" value="false"/>
            <property name="version" value="${dubbo.reference.version}"/>
            <property name="methods">
                <list>
                    <bean class="com.alibaba.dubbo.config.MethodConfig">
                        <property name="name" value="batchFreezeStock"/>
                        <property name="timeout" value="16000"/>
                    </bean>
                    <bean class="com.alibaba.dubbo.config.MethodConfig">
                        <property name="name" value="splitFreezeStock"/>
                        <property name="timeout" value="6000"/>
                    </bean>
                    <bean class="com.alibaba.dubbo.config.MethodConfig">
                        <property name="name" value="finishStockIn"/>
                        <property name="timeout" value="16000"/>
                    </bean>
                </list>
            </property>
        </bean>
    
    

    加载本地的配置信息构建 URL

        com.alibaba.dubbo.config.ReferenceConfig#init
    
        private void init() {
            if (initialized) {
                return;
            }
            initialized = true;
    
            ..... 省略一部分
    
            appendParameters(map, application);
            appendParameters(map, module);
            appendParameters(map, consumer, Constants.DEFAULT_KEY);
            appendParameters(map, this);
            String prifix = StringUtils.getServiceKey(map);
    
            // 这里拿到本地 consumer.xml 中 MethodConfig 的信息初始化 key: method.timeout, value: 毫秒数到 map
            if (methods != null && methods.size() > 0) {
                for (MethodConfig method : methods) {
                    appendParameters(map, method, method.getName());
                    String retryKey = method.getName() + ".retry";
                    if (map.containsKey(retryKey)) {
                        String retryValue = map.remove(retryKey);
                        if ("false".equals(retryValue)) {
                            map.put(method.getName() + ".retries", "0");
                        }
                    }
                    appendAttributes(attributes, method, prifix + "." + method.getName());
                    checkAndConvertImplicitConfig(method, map, attributes);
                }
            }
            //attributes通过系统context进行存储.
            StaticContext.getSystemContext().putAll(attributes);
            ref = createProxy(map);
        }
    
    • 我们可以看到此时的map只有comsumer.xml 的配置信息


      此时 URL 的信息.png
    • 与远程配置进行 merge
      其实我们关注的就是 RegistryDirectory.mergeUrl 中的 ClusterUtils.mergeUrl(providerUrl, queryMap);
      providerUrl :远程的接口配置
      queryMap : 本地配置URL 中的 map
      ClusterUtils.mergeUrl 中的实现就是依赖了 map.put() 的实现,先put provider,再put local。最终达到了以本地MethodConfig 为准的目的

        com.alibaba.dubbo.registry.integration.RegistryDirectory#mergeUrl
    
        /**
         * 合并url参数 顺序为override > -D >Consumer > Provider
         * @param providerUrl
         * @param overrides
         * @return
         */
        private URL mergeUrl(URL providerUrl){
            // 其实我们关注的就是这一部分
            providerUrl = ClusterUtils.mergeUrl(providerUrl, queryMap); // 合并消费端参数
            
            List<Configurator> localConfigurators = this.configurators; // local reference
            if (localConfigurators != null && localConfigurators.size() > 0) {
                for (Configurator configurator : localConfigurators) {
                    providerUrl = configurator.configure(providerUrl);
                }
            }
            
            providerUrl = providerUrl.addParameter(Constants.CHECK_KEY, String.valueOf(false)); // 不检查连接是否成功,总是创建Invoker!
            
            ... 省略一部分
    
            return providerUrl;
        }
    
      
        public static URL mergeUrl(URL remoteUrl, Map<String, String> localMap) {
            Map<String, String> map = new HashMap<String, String>();
            Map<String, String> remoteMap = remoteUrl.getParameters();
            
            
            if (remoteMap != null && remoteMap.size() > 0) {
                // 先put remote
                map.putAll(remoteMap);
                
                //线程池配置不使用提供者的
                map.remove(Constants.THREAD_NAME_KEY);
                map.remove(Constants.DEFAULT_KEY_PREFIX + Constants.THREAD_NAME_KEY);
                ... 省略一部分
            }
            
            if (localMap != null && localMap.size() > 0) {
                // 再put local          
                map.putAll(localMap);
            }
       
                ... 省略一部分      
            }
    
            return remoteUrl.clearParameters().addParameters(map);
        }
    
    • 我们可以看到 merge 后的map 已经多出了一些provider.xml 的配置信息


      merge后的map.png

    获取 timeout 的地方

    • 从URL 中的 parameters 中根据 method.timeout 或者 timeout 为key 取出对应的数值
    // 重点就是这句话
    int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY,Constants.DEFAULT_TIMEOUT);
    
        com.alibaba.dubbo.rpc.protocol.dubbo.DubboInvoker#doInvoke    
    
        @Override
        protected Result doInvoke(final Invocation invocation) throws Throwable {
            RpcInvocation inv = (RpcInvocation) invocation;
            final String methodName = RpcUtils.getMethodName(invocation);
            inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
            inv.setAttachment(Constants.VERSION_KEY, version);
            
            ExchangeClient currentClient;
            if (clients.length == 1) {
                currentClient = clients[0];
            } else {
                currentClient = clients[index.getAndIncrement() % clients.length];
            }
            try {
                boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
                boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
                int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY,Constants.DEFAULT_TIMEOUT);
                if (isOneway) {
                    boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
                    currentClient.send(inv, isSent);
                    RpcContext.getContext().setFuture(null);
                    return new RpcResult();
                } else if (isAsync) {
                    ResponseFuture future = currentClient.request(inv, timeout) ;
                    RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
                    return new RpcResult();
                } else {
                    RpcContext.getContext().setFuture(null);
                    return (Result) currentClient.request(inv, timeout).get();
                }
            } catch (TimeoutException e) {
                throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
            } catch (RemotingException e) {
                throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
            }
        }
    
        public int getMethodParameter(String method, String key, int defaultValue) {
            String methodKey = method + "." + key;
            Number n = getNumbers().get(methodKey);
            if (n != null) {
                return n.intValue();
            }
            String value = getMethodParameter(method, key);
            if (value == null || value.length() == 0) {
                return defaultValue;
            }
            int i = Integer.parseInt(value);
            getNumbers().put(methodKey, i);
            return i;
        }
    
    

    相关文章

      网友评论

          本文标题:探究 dubbo接口超时时间的赋值逻辑,包含Consumer,P

          本文链接:https://www.haomeiwen.com/subject/zhbbvctx.html