美文网首页
dubbo client启动

dubbo client启动

作者: binecy | 来源:发表于2018-02-08 11:28 被阅读59次

    源码分析基于dubbo 2.6.0

    ReferenceBean继承了ReferenceConfig, 还实现了FactoryBean。
    spring启动时,会通过FactoryBean.getObject创建bean。这里会调用到ReferenceConfig.createProxy

    private T createProxy(Map<String, String> map) {
        ...
    
        if (isJvmRefer) {
            URL url = new URL(Constants.LOCAL_PROTOCOL, NetUtils.LOCALHOST, 0, interfaceClass.getName()).addParameters(map);
            invoker = refprotocol.refer(interfaceClass, url);
        } else {
            // 构建url,放入到urls中
            ...
    
            if (urls.size() == 1) {
                invoker = refprotocol.refer(interfaceClass, urls.get(0));
            } else {
                List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
                URL registryURL = null;
                for (URL url : urls) {
                    invokers.add(refprotocol.refer(interfaceClass, url));
                    if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                        registryURL = url; // use last registry url
                    }
                }
                if (registryURL != null) { // registry url is available
                    // use AvailableCluster only when register's cluster is available
                    URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME);
                    invoker = cluster.join(new StaticDirectory(u, invokers));
                } else { // not a registry url
                    invoker = cluster.join(new StaticDirectory(invokers));
                }
            }
        }
    
        return (T) proxyFactory.getProxy(invoker);
    }
    

    refprotocol.refer会调用到RegistryProtocol.refer。它会调用doRefer方法。

    private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
        ...
        // 订阅
        directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
                Constants.PROVIDERS_CATEGORY
                        + "," + Constants.CONFIGURATORS_CATEGORY
                        + "," + Constants.ROUTERS_CATEGORY));
    
        Invoker invoker = cluster.join(directory);
        ProviderConsumerRegTable.registerConsuemr(invoker, url, subscribeUrl, directory);
        return invoker;
    }
    

    这里只关注cluster.join,因为它创建了invoker。
    cluster默认为FailoverCluster

    public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
        return new FailoverClusterInvoker<T>(directory);
    }
    

    但FailoverCluster处理前,要经过装饰者MockClusterWrapper

    public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
        return new MockClusterInvoker<T>(directory,
                this.cluster.join(directory));
    }
    

    前面dubbo 注册说过了,订阅会调用到RegistryDirectory.notify,进而调用refreshInvoker方法。

    private void refreshInvoker(List<URL> invokerUrls) {
        ...
        Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map
        Map<String, List<Invoker<T>>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap); // Change method name to map Invoker Map
    
        this.methodInvokerMap = multiGroup ? toMergeMethodInvokerMap(newMethodInvokerMap) : newMethodInvokerMap;
        this.urlInvokerMap = newUrlInvokerMap;
    }
    

    看看toInvokers

    private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
        for (URL providerUrl : urls) {
            ...
            Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key);
            if (invoker == null) { // Not in the cache, refer again
                
                invoker = new InvokerDelegate<T>(protocol.refer(serviceType, url), url, providerUrl);
                if (invoker != null) { // Put new invoker in cache
                        newUrlInvokerMap.put(key, invoker);
                }
            } else {
                newUrlInvokerMap.put(key, invoker);
            }
        }
        ;
        return newUrlInvokerMap;
    }
    

    protocol.refer会调用DubboProtocol.refer,进而启动client端的网络通讯服务(如netty)。

    proxyFactory.getProxy默认通过JavassistProxyFactory实现。

    public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
        return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
    }
    

    跟server端处理请求一样,这里也有invoker责任链,节点为InvokerInvocationHandler > MockClusterInvoker > FailoverClusterInvoker

    动态类Proxy由com.alibaba.dubbo.common.bytecode.getProxy(ClassLoader cl, Class<?>... ics)方法生成。跟com.alibaba.dubbo.common.bytecode.Wrapper一样,拼凑代码字符串,通过Javassist生成动态类。
    这里直接看生成动态类,生成动态类的方法体为

    Object[] args = new Object[1]; 
    args[0] = ($w)$1; 
    Object ret = handler.invoke(this, methods[0], args); 
    return (java.lang.String)ret;
    

    就是通过handler来调用业务方法。
    先看看InvokerInvocationHandler

    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        String methodName = method.getName();
        ...
        return invoker.invoke(new RpcInvocation(method, args)).recreate();
    }
    

    Invocation是rpc调用上下文信息,包括methodName,arguments等信息。

    invoker的请求调用最后会调用FailoverClusterInvoker,它会调用父类AbstractClusterInvoker

    public Result invoke(final Invocation invocation) throws RpcException {
        ...
        
        LoadBalance loadbalance;
        // 路由
        List<Invoker<T>> invokers = list(invocation);
        // 负载均衡
        if (invokers != null && invokers.size() > 0) {
            loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
                    .getMethodParameter(invocation.getMethodName(), Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE));
        } else {
            loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(Constants.DEFAULT_LOADBALANCE);
        }
        // 调用请求
        return doInvoke(invocation, invokers, loadbalance);
    }
    

    list方法会调用AbstractDirectory.list

    public List<Invoker<T>> list(Invocation invocation) throws RpcException {
        List<Invoker<T>> invokers = doList(invocation);
        List<Router> localRouters = this.routers; // local reference
        if (localRouters != null && localRouters.size() > 0) {
            for (Router router : localRouters) {
                try {
                    if (router.getUrl() == null || router.getUrl().getParameter(Constants.RUNTIME_KEY, false)) {
                        invokers = router.route(invokers, getConsumerUrl(), invocation);
                    }
                } catch (Throwable t) {
                    logger.error("Failed to execute router: " + getUrl() + ", cause: " + t.getMessage(), t);
                }
            }
        }
        return invokers;
    }
    

    RegistryDirectory.doList方法通过Invocation中的methodName,Arguments等信息从methodInvokerMap中获取对应的invokers。(这里的invokers是用于发送网络请求到server进行逻辑处理,而上面说的invoker责任链是用于实现MockCluster/FailoverCluster等扩展功能,注意两者使用场合不同)

    Router接口负责实现路由选择操作,Router共有三个实现MockInvokersSelector/ConditionRouter/ScriptRouter。
    MockInvokersSelector是专用于处理MOCK请求的。

    LoadBalance接口负责实现负载均衡,有RandomLoadBalance/LeastActiveLoadBalance/LeastActiveLoadBalance/LeastActiveLoadBalance。

    AbstractClusterInvoker有FailoverClusterInvoker/FailfastClusterInvoker/FailbackClusterInvoker/AvailableClusterInvoker/...
    不同的实现类支持不同的集群容错。

    看看默认的FailoverClusterInvoker.doInvoke

    public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        
        // 重试次数
        int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
        if (len <= 0) {
            len = 1;
        }
        // retry loop.
        RpcException le = null; // last exception.
        // 已调用的invoked
        List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size()); // invoked invokers.
        Set<String> providers = new HashSet<String>(len);
        for (int i = 0; i < len; i++) {
            ...
            // 选择invoker
            Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);
            invoked.add(invoker);
            RpcContext.getContext().setInvokers((List) invoked);
            try {
                // 调用invoker,发送网络请求
                Result result = invoker.invoke(invocation);
                
                return result;
            } catch (RpcException e) {
                if (e.isBiz()) { // biz exception.
                    throw e;
                }
                le = e;
            } catch (Throwable e) {
                le = new RpcException(e.getMessage(), e);
            } finally {
                providers.add(invoker.getUrl().getAddress());
            }
        }
        throw new RpcException(...);
    }
    

    注意,如果是server端抛出的业务异常,不会重发请求,只有rpc异常(如连接超时),才会重发请求。

    相关文章

      网友评论

          本文标题:dubbo client启动

          本文链接:https://www.haomeiwen.com/subject/xqoqzxtx.html