美文网首页dubbo
Dubbo Rest 服务引用流程

Dubbo Rest 服务引用流程

作者: 晴天哥_王志 | 来源:发表于2020-02-22 21:29 被阅读0次

    开篇

    • 最近在公众号看到一篇记一次dubbo服务发现导致的OOM的文章,这篇文章的核心是Rest协议在服务引用过程中由于异常导致内存OOM,借助这篇文章顺带梳理下Rest协议的服务引用过程。

    • 整个Rest协议的引用过程按照 RegistryDirectory#notify => RegistryDirectory#refreshInvoker => RegistryDirectory#toInvokers => RestProtocol#refer => RestProtocol#doRefer的流程执行。

    RegistryDirectory#notify

    public class RegistryDirectory<T> extends AbstractDirectory<T> implements NotifyListener {
    
        public synchronized void notify(List<URL> urls) {
            List<URL> invokerUrls = new ArrayList<URL>();
            List<URL> routerUrls = new ArrayList<URL>();
            List<URL> configuratorUrls = new ArrayList<URL>();
            // 分解URLS,这里只关注invokerUrls即provider对应的Url地址
            for (URL url : urls) {
                String protocol = url.getProtocol();
                String category = url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
                if (Constants.ROUTERS_CATEGORY.equals(category)
                        || Constants.ROUTE_PROTOCOL.equals(protocol)) {
                    routerUrls.add(url);
                } else if (Constants.CONFIGURATORS_CATEGORY.equals(category)
                        || Constants.OVERRIDE_PROTOCOL.equals(protocol)) {
                    configuratorUrls.add(url);
                } else if (Constants.PROVIDERS_CATEGORY.equals(category)) {
                    invokerUrls.add(url);
                } else {
                    logger.warn("Unsupported category " + category + " in notified url: " + url + " from registry " + getUrl().getAddress() + " to consumer " + NetUtils.getLocalHost());
                }
            }
            // configurators
            if (configuratorUrls != null && !configuratorUrls.isEmpty()) {
                this.configurators = toConfigurators(configuratorUrls);
            }
            // routers
            if (routerUrls != null && !routerUrls.isEmpty()) {
                List<Router> routers = toRouters(routerUrls);
                if (routers != null) { // null - do nothing
                    setRouters(routers);
                }
            }
            List<Configurator> localConfigurators = this.configurators; // local reference
            // merge override parameters
            this.overrideDirectoryUrl = directoryUrl;
            if (localConfigurators != null && !localConfigurators.isEmpty()) {
                for (Configurator configurator : localConfigurators) {
                    this.overrideDirectoryUrl = configurator.configure(overrideDirectoryUrl);
                }
            }
            // 根据invokerUrls转为Invoker对象
            refreshInvoker(invokerUrls);
        }
    }
    
    • RegistryDirectory#notify核心操作包括两步:分类Url和转换Url。
    • 分类Url是指根据Url的类别属性分为不同类别,这里关心invokerUrls。
    • 转换Url是指对invokerUrls进行转换操作,通过refreshInvoker(invokerUrls)实现。

    RegistryDirectory#refreshInvoker

    public class RegistryDirectory<T> extends AbstractDirectory<T> implements NotifyListener {
    
        private void refreshInvoker(List<URL> invokerUrls) {
            if (invokerUrls != null && invokerUrls.size() == 1 && invokerUrls.get(0) != null
                    && Constants.EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {
                this.forbidden = true; // Forbid to access
                this.methodInvokerMap = null; // Set the method invoker map to null
                destroyAllInvokers(); // Close all invokers
            } else {
                this.forbidden = false; // Allow to access
                Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference
                if (invokerUrls.isEmpty() && this.cachedInvokerUrls != null) {
                    invokerUrls.addAll(this.cachedInvokerUrls);
                } else {
                    this.cachedInvokerUrls = new HashSet<URL>();
                    this.cachedInvokerUrls.addAll(invokerUrls);//Cached invoker urls, convenient for comparison
                }
                if (invokerUrls.isEmpty()) {
                    return;
                }
                // toInvokers(invokerUrls)执行将url转为invoker对象
                Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map
                Map<String, List<Invoker<T>>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap); // Change method name to map Invoker Map
                // state change
                // If the calculation is wrong, it is not processed.
                if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0) {
                    logger.error(new IllegalStateException("urls to invokers error .invokerUrls.size :" + invokerUrls.size() + ", invoker.size :0. urls :" + invokerUrls.toString()));
                    return;
                }
                this.methodInvokerMap = multiGroup ? toMergeMethodInvokerMap(newMethodInvokerMap) : newMethodInvokerMap;
                this.urlInvokerMap = newUrlInvokerMap;
                try {
                    destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker
                } catch (Exception e) {
                    logger.warn("destroyUnusedInvokers error. ", e);
                }
            }
        }
    }
    
    • RegistryDirectory#refreshInvoker核心通过toInvokers()转换invoker对象。

    RegistryDirectory#toInvokers

    public class RegistryDirectory<T> extends AbstractDirectory<T> implements NotifyListener {
    
        private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
            // newUrlInvokerMap的key为provider的URL,value为对应的invoker对象
            Map<String, Invoker<T>> newUrlInvokerMap = new HashMap<String, Invoker<T>>();
            if (urls == null || urls.isEmpty()) {
                return newUrlInvokerMap;
            }
            Set<String> keys = new HashSet<String>();
            String queryProtocols = this.queryMap.get(Constants.PROTOCOL_KEY);
            for (URL providerUrl : urls) {
                // If protocol is configured at the reference side, only the matching protocol is selected
                if (queryProtocols != null && queryProtocols.length() > 0) {
                    boolean accept = false;
                    String[] acceptProtocols = queryProtocols.split(",");
                    for (String acceptProtocol : acceptProtocols) {
                        if (providerUrl.getProtocol().equals(acceptProtocol)) {
                            accept = true;
                            break;
                        }
                    }
                    if (!accept) {
                        continue;
                    }
                }
               
                URL url = mergeUrl(providerUrl);
    
                String key = url.toFullString(); // The parameter urls are sorted
                if (keys.contains(key)) { // Repeated url
                    continue;
                }
                keys.add(key);
                // Cache key is url that does not merge with consumer side parameters, regardless of how the consumer combines parameters, if the server url changes, then refer again
                Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference
                Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key);
                if (invoker == null) { // Not in the cache, refer again
                    try {
                        boolean enabled = true;
                        if (url.hasParameter(Constants.DISABLED_KEY)) {
                            enabled = !url.getParameter(Constants.DISABLED_KEY, false);
                        } else {
                            enabled = url.getParameter(Constants.ENABLED_KEY, true);
                        }
                        if (enabled) {
                            // protocol.refer()创建invoker对象
                            invoker = new InvokerDelegate<T>(protocol.refer(serviceType, url), url, providerUrl);
                        }
                    } catch (Throwable t) {
                    }
                    // invoker不为null保存到newUrlInvokerMap当中
                    if (invoker != null) { // Put new invoker in cache
                        newUrlInvokerMap.put(key, invoker);
                    }
                } else {
                    newUrlInvokerMap.put(key, invoker);
                }
            }
            keys.clear();
            return newUrlInvokerMap;
        }
    }
    
    • RegistryDirectory#toInvokers内部针对providerUrl执行refer()生成invoker对象。
    • RegistryDirectory#toInvokers内部针对成功生成invoker对象按照Url和invoker的kv形式进行存储。
    • 如果protocol.refer()生成invoker异常,导致newUrlInvokerMap永远不存在Url对应的invoker,那么该Url对应的invoker会被重复创建,但是因为失败永远不会建成功。
    • RegistryDirectory#toInvokers内部针对newUrlInvokerMap中不存在的URL才会重新生成invoker。

    RestProtocol#refer

    public abstract class AbstractProxyProtocol extends AbstractProtocol {
    
        public <T> Invoker<T> refer(final Class<T> type, final URL url) throws RpcException {
            // 执行 doRefer(type, url)
            final Invoker<T> target = proxyFactory.getInvoker(doRefer(type, url), type, url);
            // 生成AbstractInvoker对应的invoker对象
            Invoker<T> invoker = new AbstractInvoker<T>(type, url) {
                @Override
                protected Result doInvoke(Invocation invocation) throws Throwable {
                    try {
                        Result result = target.invoke(invocation);
                        Throwable e = result.getException();
                        if (e != null) {
                            for (Class<?> rpcException : rpcExceptions) {
                                if (rpcException.isAssignableFrom(e.getClass())) {
                                    throw getRpcException(type, url, invocation, e);
                                }
                            }
                        }
                        return result;
                    } catch (RpcException e) {
                        if (e.getCode() == RpcException.UNKNOWN_EXCEPTION) {
                            e.setCode(getErrorCode(e.getCause()));
                        }
                        throw e;
                    } catch (Throwable e) {
                        throw getRpcException(type, url, invocation, e);
                    }
                }
            };
            invokers.add(invoker);
            return invoker;
        }
    }
    
    • RestProtocol#refer()中会调用proxyFactory.getInvoker(doRefer())方法生成target对象,调用RestProtocol#doRefer的方法。
    • 通过AbstractInvoker类二次封装target返回invoker对象。

    RestProtocol#doRefer

    public class RestProtocol extends AbstractProxyProtocol {
    
        private final List<ResteasyClient> clients = Collections.synchronizedList(new LinkedList<ResteasyClient>());
    
        protected <T> T doRefer(Class<T> serviceType, URL url) throws RpcException {
            if (connectionMonitor == null) {
                connectionMonitor = new ConnectionMonitor();
            }
    
            // TODO more configs to add
            PoolingHttpClientConnectionManager connectionManager = new PoolingHttpClientConnectionManager();
            // 20 is the default maxTotal of current PoolingClientConnectionManager
            connectionManager.setMaxTotal(url.getParameter(Constants.CONNECTIONS_KEY, 20));
            connectionManager.setDefaultMaxPerRoute(url.getParameter(Constants.CONNECTIONS_KEY, 20));
    
            connectionMonitor.addConnectionManager(connectionManager);
            RequestConfig requestConfig = RequestConfig.custom()
                    .setConnectTimeout(url.getParameter(Constants.CONNECT_TIMEOUT_KEY, Constants.DEFAULT_CONNECT_TIMEOUT))
                    .setSocketTimeout(url.getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT))
                    .build();
    
            SocketConfig socketConfig = SocketConfig.custom()
                    .setSoKeepAlive(true)
                    .setTcpNoDelay(true)
                    .build();
    
            CloseableHttpClient httpClient = HttpClientBuilder.create()
                    .setKeepAliveStrategy(new ConnectionKeepAliveStrategy() {
                        @Override
                        public long getKeepAliveDuration(HttpResponse response, HttpContext context) {
                            HeaderElementIterator it = new BasicHeaderElementIterator(response.headerIterator(HTTP.CONN_KEEP_ALIVE));
                            while (it.hasNext()) {
                                HeaderElement he = it.nextElement();
                                String param = he.getName();
                                String value = he.getValue();
                                if (value != null && param.equalsIgnoreCase("timeout")) {
                                    return Long.parseLong(value) * 1000;
                                }
                            }
                            // TODO constant
                            return 30 * 1000;
                        }
                    })
                    .setDefaultRequestConfig(requestConfig)
                    .setDefaultSocketConfig(socketConfig)
                    .build();
    
            ApacheHttpClient4Engine engine = new ApacheHttpClient4Engine(httpClient/*, localContext*/);
    
            // 创建client并添加到clients
            ResteasyClient client = new ResteasyClientBuilder().httpEngine(engine).build();
            clients.add(client);
    
            client.register(RpcContextFilter.class);
            for (String clazz : Constants.COMMA_SPLIT_PATTERN.split(url.getParameter(Constants.EXTENSION_KEY, ""))) {
                if (!StringUtils.isEmpty(clazz)) {
                    try {
                        client.register(Thread.currentThread().getContextClassLoader().loadClass(clazz.trim()));
                    } catch (ClassNotFoundException e) {
                        throw new RpcException("Error loading JAX-RS extension class: " + clazz.trim(), e);
                    }
                }
            }
    
            // TODO protocol
            ResteasyWebTarget target = client.target("http://" + url.getHost() + ":" + url.getPort() + "/" + getContextPath(url));
            return target.proxy(serviceType);
        }
    }
    
    • RestProtocol对象在Dubbo的上下文中只存在一个实例,所以类中的clients保存了所有的Rest的client对象。
    • RestProtocol#doRefer方法内部会创建ResteasyClient的client对象。
    • RestProtocol#doRefer方法内部通过client.target()方法返回target对象。
    • 如果创建ResteasyClient对象成功但是创建ResteasyWebTarget失败,那么client依然会增加,但是外层的invoker却因为异常导致无法创建成功。
    • 上述的异常导致了provider的Url在每次创建invoker对象都会失败进而造成每次该Url重新发布就回走一次创建invoker的过程,最终结果client不停增加而OOM。

    参考文章

    相关文章

      网友评论

        本文标题:Dubbo Rest 服务引用流程

        本文链接:https://www.haomeiwen.com/subject/fzxrqhtx.html