美文网首页
dubbo的网络通信

dubbo的网络通信

作者: 剑道_7ffc | 来源:发表于2020-06-09 08:13 被阅读0次

RegistryProtocol#doRefer

//构建RegistryDirectory
RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
directory.setRegistry(registry);
directory.setProtocol(protocol);
// all attributes of REFER_KEY
Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
//注册consumer://协议的url
URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
if (!ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(REGISTER_KEY, true)) {
    //consumeUrl
    directory.setRegisteredConsumerUrl(getRegisteredConsumerUrl(subscribeUrl, url));
    registry.register(directory.getRegisteredConsumerUrl());
}
directory.buildRouterChain(subscribeUrl);
//订阅节点的变化
directory.subscribe(subscribeUrl.addParameter(CATEGORY_KEY,
        PROVIDERS_CATEGORY + "," + CONFIGURATORS_CATEGORY + "," + ROUTERS_CATEGORY));

RegistryDirectory#subscribe

public void subscribe(URL url) {
    setConsumerUrl(url);
    CONSUMER_CONFIGURATION_LISTENER.addNotifyListener(this);
    serviceConfigurationListener = new ReferenceConfigurationListener(this, url);
        //这里的registry是zookeeperRegsitry
    registry.subscribe(url, this);
}

FailbackRegistry#subscribe

@Override
public void subscribe(URL url, NotifyListener listener) {
     super.subscribe(url, listener);
     removeFailedSubscribed(url, listener);
     // Sending a subscription request to the server side
     doSubscribe(url, listener);
}

ZookeeperRegistry#doSubscribe

@Override
public void doSubscribe(final URL url, final NotifyListener listener) {
    List<URL> urls = new ArrayList<>();
    //path:/dubbo/com.my.dubbo.IQueryService/providers
    for (String path : toCategoriesPath(url)) {
        ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
        if (listeners == null) {
            zkListeners.putIfAbsent(url, new ConcurrentHashMap<>());
            listeners = zkListeners.get(url);
        }
        ChildListener zkListener = listeners.get(listener);
        if (zkListener == null) {
            listeners.putIfAbsent(listener, (parentPath, currentChilds) ->
                    ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds)));
            zkListener = listeners.get(listener);
        }
        zkClient.create(path, false);
        //dubbo//ip:port
        List<String> children = zkClient.addChildListener(path, zkListener);
        if (children != null) {
            urls.addAll(toUrlsWithEmpty(url, path, children));
        }
    }
    notify(url, listener, urls);
}

FailbackRegistry#notify

protected void notify(URL url, NotifyListener listener, List<URL> urls) {
    //key:consumerURL value:providerUrls
    Map<String, List<URL>> result = new HashMap<>();
    //url:consumerURL urls:provider latest urls
    for (URL u : urls) {
        if (UrlUtils.isMatch(url, u)) {
            String category = u.getParameter(CATEGORY_KEY, DEFAULT_CATEGORY);
            List<URL> categoryList = result.computeIfAbsent(category, k -> new ArrayList<>());
            categoryList.add(u);
        }
    }
    if (result.size() == 0) {
        return;
    }
    Map<String, List<URL>> categoryNotified = notified.computeIfAbsent(url, u -> new ConcurrentHashMap<>());
    for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
        String category = entry.getKey();
        List<URL> categoryList = entry.getValue();
        categoryNotified.put(category, categoryList);
        //listener:RegistryDirectory
        listener.notify(categoryList);
        // We will update our cache file after each notification.
        // When our Registry has a subscribe failure due to network jitter, we can return at least the existing cache URL.
        saveProperties(url);
    }
}

RegistryDirectory.notify

public synchronized void notify(List<URL> urls) {
    //providers --> providersUrl
    Map<String, List<URL>> categoryUrls = urls.stream()
            .filter(Objects::nonNull)
            .filter(this::isValidCategory)
            .filter(this::isNotCompatibleFor26x)
            .collect(Collectors.groupingBy(url -> {
                if (UrlUtils.isConfigurator(url)) {
                    return CONFIGURATORS_CATEGORY;
                } else if (UrlUtils.isRoute(url)) {
                    return ROUTERS_CATEGORY;
                } else if (UrlUtils.isProvider(url)) {
                    return PROVIDERS_CATEGORY;
                }
                return "";
            }));

    // providers
    List<URL> providerURLs = categoryUrls.getOrDefault(PROVIDERS_CATEGORY, Collections.emptyList());
    refreshOverrideAndInvoker(providerURLs);
}

RegistryDirectory#refreshOverrideAndInvoker

private void refreshOverrideAndInvoker(List<URL> urls) {
    // ProviderUrl
    overrideDirectoryUrl();
    refreshInvoker(urls);
}

RegistryDirectory#refreshInvoker

private void refreshInvoker(List<URL> invokerUrls) {
    Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map

    List<Invoker<T>> newInvokers = Collections.unmodifiableList(new ArrayList<>(newUrlInvokerMap.values()));
    // pre-route and build cache, notice that route cache should build on original Invoker list.
    // toMergeMethodInvokerMap() will wrap some invokers having different groups, those wrapped invokers not should be routed.
    routerChain.setInvokers(newInvokers);
    this.invokers = multiGroup ? toMergeInvokerList(newInvokers) : newInvokers;
    this.urlInvokerMap = newUrlInvokerMap;

    try {
        destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker
    } catch (Exception e) {
        logger.warn("destroyUnusedInvokers error. ", e);
    }
}

RegistryDirectory#toInvokers

private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
    for (URL providerUrl : urls) {
        URL url = mergeUrl(providerUrl);
        String key = url.toFullString(); // The parameter urls are sorted
        Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key);
        if (invoker == null) { // Not in the cache, refer again
            invoker = new InvokerDelegate<>(protocol.refer(serviceType, url), url, providerUrl);
            if (invoker != null) { // Put new invoker in cache
                newUrlInvokerMap.put(key, invoker);
            }
        }
    }
    return newUrlInvokerMap;
}

protocol.refer的解析

protocol是自适用扩展点,而url是dubbo开头的,所以选择dubbo协议,即对象是
ProtocolListenerWrapper(ProtocolFilterWrapper(QosProtocolWrapper(DubboProtocol.refer)

protocol.refer(serviceType, url)

AbstractProtocol#refer

public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
    return new AsyncToSyncInvoker<>(protocolBindingRefer(type, url));
}

DubboProtocol#protocolBindingRefer

public <T> Invoker<T> protocolBindingRefer(Class<T> serviceType, URL url) throws RpcException {
    optimizeSerialization(url);

    // create rpc invoker.
    DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
    invokers.add(invoker);

    return invoker;
}

DubboProtocol#getClients

private ExchangeClient[] getClients(URL url) {
    // whether to share connection

    boolean useShareConnect = false;

    int connections = url.getParameter(CONNECTIONS_KEY, 0);
    List<ReferenceCountExchangeClient> shareClients = null;
    // if not configured, connection is shared, otherwise, one connection for one service
    if (connections == 0) {
        useShareConnect = true;

        /**
         * The xml configuration should have a higher priority than properties.
         */
        String shareConnectionsStr = url.getParameter(SHARE_CONNECTIONS_KEY, (String) null);
        //connections为1
        connections = Integer.parseInt(StringUtils.isBlank(shareConnectionsStr) ? ConfigUtils.getProperty(SHARE_CONNECTIONS_KEY,
                DEFAULT_SHARE_CONNECTIONS) : shareConnectionsStr);
        shareClients = getSharedClient(url, connections);
    }

    ExchangeClient[] clients = new ExchangeClient[connections];
    for (int i = 0; i < clients.length; i++) {
        if (useShareConnect) {
            clients[i] = shareClients.get(i);

        } else {
            clients[i] = initClient(url);
        }
    }

    return clients;
}

DubboProtocol#getSharedClient

private List<ReferenceCountExchangeClient> getSharedClient(URL url, int connectNum) {
    String key = url.getAddress();
    List<ReferenceCountExchangeClient> clients = referenceClientMap.get(key);

    if (checkClientCanUse(clients)) {
        batchClientRefIncr(clients);
        return clients;
    }

    locks.putIfAbsent(key, new Object());
    synchronized (locks.get(key)) {
        clients = referenceClientMap.get(key);
        // dubbo check
        if (checkClientCanUse(clients)) {
            batchClientRefIncr(clients);
            return clients;
        }

        // connectNum must be greater than or equal to 1
        connectNum = Math.max(connectNum, 1);

        // If the clients is empty, then the first initialization is
        if (CollectionUtils.isEmpty(clients)) {
            clients = buildReferenceCountExchangeClientList(url, connectNum);
            referenceClientMap.put(key, clients);

        } else {
            for (int i = 0; i < clients.size(); i++) {
                ReferenceCountExchangeClient referenceCountExchangeClient = clients.get(i);
                // If there is a client in the list that is no longer available, create a new one to replace him.
                if (referenceCountExchangeClient == null || referenceCountExchangeClient.isClosed()) {
                    clients.set(i, buildReferenceCountExchangeClient(url));
                    continue;
                }

                referenceCountExchangeClient.incrementAndGetCount();
            }
        }

        /**
         * I understand that the purpose of the remove operation here is to avoid the expired url key
         * always occupying this memory space.
         */
        locks.remove(key);

        return clients;
    }
}

DubboProtocol#buildReferenceCountExchangeClientList

private List<ReferenceCountExchangeClient> buildReferenceCountExchangeClientList(URL url, int connectNum) {
    List<ReferenceCountExchangeClient> clients = new ArrayList<>();

    for (int i = 0; i < connectNum; i++) {
        clients.add(buildReferenceCountExchangeClient(url));
    }

    return clients;
}

DubboProtocol#buildReferenceCountExchangeClient

private ReferenceCountExchangeClient buildReferenceCountExchangeClient(URL url) {
    ExchangeClient exchangeClient = initClient(url);

    return new ReferenceCountExchangeClient(exchangeClient);
}

DubboProtocol#initClient

requestHandler是ExchangeHandlerAdapter的局部内部类。

private ExchangeClient initClient(URL url) {

    client = Exchangers.connect(url, requestHandler);

    return client;
}

Exchangers#connect(URL, ExchangeHandler)

public static ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
    return getExchanger(url).connect(url, handler);
}

Exchangers#getExchanger(org.apache.dubbo.common.URL)

public static Exchanger getExchanger(URL url) {
    String type = url.getParameter(Constants.EXCHANGER_KEY, Constants.DEFAULT_EXCHANGER);
    return getExchanger(type);
}

Exchangers#getExchanger(java.lang.String)

public static Exchanger getExchanger(String type) {
    return ExtensionLoader.getExtensionLoader(Exchanger.class).getExtension(type);
}

HeaderExchanger#connect

public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
    return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
}

Transporters#connect(URL, ChannelHandler...)

public static Client connect(URL url, ChannelHandler... handlers) throws RemotingException {
    ChannelHandler handler;
    if (handlers == null || handlers.length == 0) {
        handler = new ChannelHandlerAdapter();
    } else if (handlers.length == 1) {
        handler = handlers[0];
    } else {
        handler = new ChannelHandlerDispatcher(handlers);
    }
    return getTransporter().connect(url, handler);
}

Transporters#getTransporter

public static Transporter getTransporter() {
    return ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension();
}

NettyTransporter#connect

public Client connect(URL url, ChannelHandler listener) throws RemotingException {
    return new NettyClient(url, listener);
}

相关文章

网友评论

      本文标题:dubbo的网络通信

      本文链接:https://www.haomeiwen.com/subject/fiwktktx.html