美文网首页
dubbo的网络通信

dubbo的网络通信

作者: 剑道_7ffc | 来源:发表于2020-06-09 08:13 被阅读0次

    RegistryProtocol#doRefer

    //构建RegistryDirectory
    RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
    directory.setRegistry(registry);
    directory.setProtocol(protocol);
    // all attributes of REFER_KEY
    Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
    //注册consumer://协议的url
    URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
    if (!ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(REGISTER_KEY, true)) {
        //consumeUrl
        directory.setRegisteredConsumerUrl(getRegisteredConsumerUrl(subscribeUrl, url));
        registry.register(directory.getRegisteredConsumerUrl());
    }
    directory.buildRouterChain(subscribeUrl);
    //订阅节点的变化
    directory.subscribe(subscribeUrl.addParameter(CATEGORY_KEY,
            PROVIDERS_CATEGORY + "," + CONFIGURATORS_CATEGORY + "," + ROUTERS_CATEGORY));
    

    RegistryDirectory#subscribe

    public void subscribe(URL url) {
        setConsumerUrl(url);
        CONSUMER_CONFIGURATION_LISTENER.addNotifyListener(this);
        serviceConfigurationListener = new ReferenceConfigurationListener(this, url);
            //这里的registry是zookeeperRegsitry
        registry.subscribe(url, this);
    }
    

    FailbackRegistry#subscribe

    @Override
    public void subscribe(URL url, NotifyListener listener) {
         super.subscribe(url, listener);
         removeFailedSubscribed(url, listener);
         // Sending a subscription request to the server side
         doSubscribe(url, listener);
    }
    

    ZookeeperRegistry#doSubscribe

    @Override
    public void doSubscribe(final URL url, final NotifyListener listener) {
        List<URL> urls = new ArrayList<>();
        //path:/dubbo/com.my.dubbo.IQueryService/providers
        for (String path : toCategoriesPath(url)) {
            ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
            if (listeners == null) {
                zkListeners.putIfAbsent(url, new ConcurrentHashMap<>());
                listeners = zkListeners.get(url);
            }
            ChildListener zkListener = listeners.get(listener);
            if (zkListener == null) {
                listeners.putIfAbsent(listener, (parentPath, currentChilds) ->
                        ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds)));
                zkListener = listeners.get(listener);
            }
            zkClient.create(path, false);
            //dubbo//ip:port
            List<String> children = zkClient.addChildListener(path, zkListener);
            if (children != null) {
                urls.addAll(toUrlsWithEmpty(url, path, children));
            }
        }
        notify(url, listener, urls);
    }
    

    FailbackRegistry#notify

    protected void notify(URL url, NotifyListener listener, List<URL> urls) {
        //key:consumerURL value:providerUrls
        Map<String, List<URL>> result = new HashMap<>();
        //url:consumerURL urls:provider latest urls
        for (URL u : urls) {
            if (UrlUtils.isMatch(url, u)) {
                String category = u.getParameter(CATEGORY_KEY, DEFAULT_CATEGORY);
                List<URL> categoryList = result.computeIfAbsent(category, k -> new ArrayList<>());
                categoryList.add(u);
            }
        }
        if (result.size() == 0) {
            return;
        }
        Map<String, List<URL>> categoryNotified = notified.computeIfAbsent(url, u -> new ConcurrentHashMap<>());
        for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
            String category = entry.getKey();
            List<URL> categoryList = entry.getValue();
            categoryNotified.put(category, categoryList);
            //listener:RegistryDirectory
            listener.notify(categoryList);
            // We will update our cache file after each notification.
            // When our Registry has a subscribe failure due to network jitter, we can return at least the existing cache URL.
            saveProperties(url);
        }
    }
    

    RegistryDirectory.notify

    public synchronized void notify(List<URL> urls) {
        //providers --> providersUrl
        Map<String, List<URL>> categoryUrls = urls.stream()
                .filter(Objects::nonNull)
                .filter(this::isValidCategory)
                .filter(this::isNotCompatibleFor26x)
                .collect(Collectors.groupingBy(url -> {
                    if (UrlUtils.isConfigurator(url)) {
                        return CONFIGURATORS_CATEGORY;
                    } else if (UrlUtils.isRoute(url)) {
                        return ROUTERS_CATEGORY;
                    } else if (UrlUtils.isProvider(url)) {
                        return PROVIDERS_CATEGORY;
                    }
                    return "";
                }));
    
        // providers
        List<URL> providerURLs = categoryUrls.getOrDefault(PROVIDERS_CATEGORY, Collections.emptyList());
        refreshOverrideAndInvoker(providerURLs);
    }
    

    RegistryDirectory#refreshOverrideAndInvoker

    private void refreshOverrideAndInvoker(List<URL> urls) {
        // ProviderUrl
        overrideDirectoryUrl();
        refreshInvoker(urls);
    }
    

    RegistryDirectory#refreshInvoker

    private void refreshInvoker(List<URL> invokerUrls) {
        Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map
    
        List<Invoker<T>> newInvokers = Collections.unmodifiableList(new ArrayList<>(newUrlInvokerMap.values()));
        // pre-route and build cache, notice that route cache should build on original Invoker list.
        // toMergeMethodInvokerMap() will wrap some invokers having different groups, those wrapped invokers not should be routed.
        routerChain.setInvokers(newInvokers);
        this.invokers = multiGroup ? toMergeInvokerList(newInvokers) : newInvokers;
        this.urlInvokerMap = newUrlInvokerMap;
    
        try {
            destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker
        } catch (Exception e) {
            logger.warn("destroyUnusedInvokers error. ", e);
        }
    }
    

    RegistryDirectory#toInvokers

    private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
        for (URL providerUrl : urls) {
            URL url = mergeUrl(providerUrl);
            String key = url.toFullString(); // The parameter urls are sorted
            Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key);
            if (invoker == null) { // Not in the cache, refer again
                invoker = new InvokerDelegate<>(protocol.refer(serviceType, url), url, providerUrl);
                if (invoker != null) { // Put new invoker in cache
                    newUrlInvokerMap.put(key, invoker);
                }
            }
        }
        return newUrlInvokerMap;
    }
    

    protocol.refer的解析

    protocol是自适用扩展点,而url是dubbo开头的,所以选择dubbo协议,即对象是
    ProtocolListenerWrapper(ProtocolFilterWrapper(QosProtocolWrapper(DubboProtocol.refer)

    protocol.refer(serviceType, url)
    

    AbstractProtocol#refer

    public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
        return new AsyncToSyncInvoker<>(protocolBindingRefer(type, url));
    }
    

    DubboProtocol#protocolBindingRefer

    public <T> Invoker<T> protocolBindingRefer(Class<T> serviceType, URL url) throws RpcException {
        optimizeSerialization(url);
    
        // create rpc invoker.
        DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
        invokers.add(invoker);
    
        return invoker;
    }
    

    DubboProtocol#getClients

    private ExchangeClient[] getClients(URL url) {
        // whether to share connection
    
        boolean useShareConnect = false;
    
        int connections = url.getParameter(CONNECTIONS_KEY, 0);
        List<ReferenceCountExchangeClient> shareClients = null;
        // if not configured, connection is shared, otherwise, one connection for one service
        if (connections == 0) {
            useShareConnect = true;
    
            /**
             * The xml configuration should have a higher priority than properties.
             */
            String shareConnectionsStr = url.getParameter(SHARE_CONNECTIONS_KEY, (String) null);
            //connections为1
            connections = Integer.parseInt(StringUtils.isBlank(shareConnectionsStr) ? ConfigUtils.getProperty(SHARE_CONNECTIONS_KEY,
                    DEFAULT_SHARE_CONNECTIONS) : shareConnectionsStr);
            shareClients = getSharedClient(url, connections);
        }
    
        ExchangeClient[] clients = new ExchangeClient[connections];
        for (int i = 0; i < clients.length; i++) {
            if (useShareConnect) {
                clients[i] = shareClients.get(i);
    
            } else {
                clients[i] = initClient(url);
            }
        }
    
        return clients;
    }
    

    DubboProtocol#getSharedClient

    private List<ReferenceCountExchangeClient> getSharedClient(URL url, int connectNum) {
        String key = url.getAddress();
        List<ReferenceCountExchangeClient> clients = referenceClientMap.get(key);
    
        if (checkClientCanUse(clients)) {
            batchClientRefIncr(clients);
            return clients;
        }
    
        locks.putIfAbsent(key, new Object());
        synchronized (locks.get(key)) {
            clients = referenceClientMap.get(key);
            // dubbo check
            if (checkClientCanUse(clients)) {
                batchClientRefIncr(clients);
                return clients;
            }
    
            // connectNum must be greater than or equal to 1
            connectNum = Math.max(connectNum, 1);
    
            // If the clients is empty, then the first initialization is
            if (CollectionUtils.isEmpty(clients)) {
                clients = buildReferenceCountExchangeClientList(url, connectNum);
                referenceClientMap.put(key, clients);
    
            } else {
                for (int i = 0; i < clients.size(); i++) {
                    ReferenceCountExchangeClient referenceCountExchangeClient = clients.get(i);
                    // If there is a client in the list that is no longer available, create a new one to replace him.
                    if (referenceCountExchangeClient == null || referenceCountExchangeClient.isClosed()) {
                        clients.set(i, buildReferenceCountExchangeClient(url));
                        continue;
                    }
    
                    referenceCountExchangeClient.incrementAndGetCount();
                }
            }
    
            /**
             * I understand that the purpose of the remove operation here is to avoid the expired url key
             * always occupying this memory space.
             */
            locks.remove(key);
    
            return clients;
        }
    }
    

    DubboProtocol#buildReferenceCountExchangeClientList

    private List<ReferenceCountExchangeClient> buildReferenceCountExchangeClientList(URL url, int connectNum) {
        List<ReferenceCountExchangeClient> clients = new ArrayList<>();
    
        for (int i = 0; i < connectNum; i++) {
            clients.add(buildReferenceCountExchangeClient(url));
        }
    
        return clients;
    }
    

    DubboProtocol#buildReferenceCountExchangeClient

    private ReferenceCountExchangeClient buildReferenceCountExchangeClient(URL url) {
        ExchangeClient exchangeClient = initClient(url);
    
        return new ReferenceCountExchangeClient(exchangeClient);
    }
    

    DubboProtocol#initClient

    requestHandler是ExchangeHandlerAdapter的局部内部类。

    private ExchangeClient initClient(URL url) {
    
        client = Exchangers.connect(url, requestHandler);
    
        return client;
    }
    

    Exchangers#connect(URL, ExchangeHandler)

    public static ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
        return getExchanger(url).connect(url, handler);
    }
    

    Exchangers#getExchanger(org.apache.dubbo.common.URL)

    public static Exchanger getExchanger(URL url) {
        String type = url.getParameter(Constants.EXCHANGER_KEY, Constants.DEFAULT_EXCHANGER);
        return getExchanger(type);
    }
    

    Exchangers#getExchanger(java.lang.String)

    public static Exchanger getExchanger(String type) {
        return ExtensionLoader.getExtensionLoader(Exchanger.class).getExtension(type);
    }
    

    HeaderExchanger#connect

    public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
        return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
    }
    

    Transporters#connect(URL, ChannelHandler...)

    public static Client connect(URL url, ChannelHandler... handlers) throws RemotingException {
        ChannelHandler handler;
        if (handlers == null || handlers.length == 0) {
            handler = new ChannelHandlerAdapter();
        } else if (handlers.length == 1) {
            handler = handlers[0];
        } else {
            handler = new ChannelHandlerDispatcher(handlers);
        }
        return getTransporter().connect(url, handler);
    }
    

    Transporters#getTransporter

    public static Transporter getTransporter() {
        return ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension();
    }
    

    NettyTransporter#connect

    public Client connect(URL url, ChannelHandler listener) throws RemotingException {
        return new NettyClient(url, listener);
    }
    

    相关文章

      网友评论

          本文标题:dubbo的网络通信

          本文链接:https://www.haomeiwen.com/subject/fiwktktx.html