美文网首页DubboDubbo 源码学习
Dubbo 服务暴露 源码学习(下)(四)

Dubbo 服务暴露 源码学习(下)(四)

作者: jwfy | 来源:发表于2018-05-08 18:43 被阅读112次

    笔记简述
    上文Dubbo 服务暴露 源码学习(上)(三)已经大致介绍了服务暴露的大致流程,不过还差了最后的invoke和export操作,本学习笔记就来继续介绍服务暴露的实现。
    更多内容可看[目录]Dubbo 源码学习

    目录

    Dubbo 服务暴露 源码学习(下)(四)
    1、protocol & proxyFactory
    2、获取Invoke
    2.1、JavassistProxyFactory获取Invoke
    2.2、JdkProxyFactory获取Invoke
    2.3、Invoke是什么
    3、Invoke暴露为export
    3.1、获取真实的Protocol类
    3.2、注册协议 暴露
    3.2.1、获取远程控制中心地址 getRegistry
    3.2.2、注册到注册中心
    3.2.3、暴露服务之注册
    3.2.4、网络端口开启
    3.2.5、开启心跳检测

    首先服务暴露的函数是如下的一行代码,接下来也主要是介绍这行代码的真正的操作操作细节。

    Exporter<?> exporter = protocol.export(proxyFactory.getInvoker(ref, (Class) interfaceClass, local));
    

    1、protocol & proxyFactory

    根据我们之前对dubbo spi的学习和了解,到这里已经知道了protocol和proxyFactory指的具体是什么了。

    protocol 类

    package com.alibaba.dubbo.rpc;
    import com.alibaba.dubbo.common.extension.ExtensionLoader;
    
    public class Protocol$Adpative implements com.alibaba.dubbo.rpc.Protocol {
        public com.alibaba.dubbo.rpc.Invoker refer(java.lang.Class arg0, com.alibaba.dubbo.common.URL arg1) throws java.lang.Class {
            if (arg1 == null) 
                throw new IllegalArgumentException("url == null");
    
            com.alibaba.dubbo.common.URL url = arg1;
            String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );
            if(extName == null) 
                throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
    
            com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
    
            return extension.refer(arg0, arg1);
        }
    
        public com.alibaba.dubbo.rpc.Exporter export(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.Invoker {
            if (arg0 == null) 
                throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
    
            if (arg0.getUrl() == null) 
                throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");com.alibaba.dubbo.common.URL url = arg0.getUrl();
            //根据URL配置信息获取Protocol协议,默认是dubbo
            String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );
            if(extName == null) 
                throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
                //根据协议名,获取Protocol实体类
            com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
    
            return extension.export(arg0);
        }
    
        public void destroy() {
            throw new UnsupportedOperationException("method public abstract void com.alibaba.dubbo.rpc.Protocol.destroy() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
        }
    
        public int getDefaultPort() {
            throw new UnsupportedOperationException("method public abstract int com.alibaba.dubbo.rpc.Protocol.getDefaultPort() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
        }
    }
    

    proxyFactory 类

    package com.alibaba.dubbo.rpc;
    import com.alibaba.dubbo.common.extension.ExtensionLoader;
    public class ProxyFactory$Adpative implements com.alibaba.dubbo.rpc.ProxyFactory {
        public com.alibaba.dubbo.rpc.Invoker getInvoker(java.lang.Object arg0, java.lang.Class arg1, com.alibaba.dubbo.common.URL arg2) throws java.lang.Object {
            if (arg2 == null) 
                throw new IllegalArgumentException("url == null");
    
            com.alibaba.dubbo.common.URL url = arg2;
            String extName = url.getParameter("proxy", "javassist");
            if(extName == null) 
                throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.ProxyFactory) name from url(" + url.toString() + ") use keys([proxy])");
    
            com.alibaba.dubbo.rpc.ProxyFactory extension = (com.alibaba.dubbo.rpc.ProxyFactory)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.ProxyFactory.class).getExtension(extName);
            // 获取ProxyFactory类的名称为extName的实现类
    
            return extension.getInvoker(arg0, arg1, arg2);
            // arg0 是具体被调用的实现类
            // arg1是实现类的接口
            // arg2 是对外暴露的URL信息
            // 只是调用链路已经转交给具体的extension了
        }
    
        public java.lang.Object getProxy(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.Invoker {
            if (arg0 == null) 
                throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
    
           if (arg0.getUrl() == null) 
            throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");com.alibaba.dubbo.common.URL url = arg0.getUrl();
    
            String extName = url.getParameter("proxy", "javassist");
            // 获取URL中的proxy参数信息,没有则设置为javassist
            if(extName == null) 
                throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.ProxyFactory) name from url(" + url.toString() + ") use keys([proxy])");
    
            com.alibaba.dubbo.rpc.ProxyFactory extension = (com.alibaba.dubbo.rpc.ProxyFactory)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.ProxyFactory.class).getExtension(extName);
    
            return extension.getProxy(arg0);
        }
    }
    

    可以很明显的看出来默认的protocol是使用的dubbo协议,对应的实例是包装DubboProtocol后的实例,ProxyFactory使用的是javassist,对应的实例是JavassistProxyFactory

    暴露操作包含了两个部分,一个Invoke,另一个是export

    2、获取Invoke

    2.1、JavassistProxyFactory获取Invoke

    proxyFactory.getInvoker(ref, (Class) interfaceClass, local)方法获得具体的Invoke。
    此时的ref是暴露的具体实现类,interfaceClass是对应的接口信息,local就是URL信息,具体内容是

    registry://127.0.0.1:2182/com.alibaba.dubbo.registry.RegistryService?application=dubbo-demo&client=zkclient&dubbo=2.5.3&export=dubbo%3A%2F%2F172.16.109.110%3A20880%2Fcom.jwfy.dubbo.product.ProductService%3Fanyhost%3Dtrue%26application%3Ddubbo-demo%26default.loadbalance%3Drandom%26dubbo%3D2.5.3%26interface%3Dcom.jwfy.dubbo.product.ProductService%26methods%3Dprint%2CgetStr%26owner%3Djwfy%26pid%3D13859%26side%3Dprovider%26timestamp%3D1525772505371%26token%3Dfdfdf&group=dubbo-demo&owner=jwfy&pid=13859&registry=zookeeper&timestamp=1525772500246

    需要对外暴露的服务就是包含在URL信息中的ProductService信息

    在本demo中,getInvoke操作获取到JavassistProxyFactory对象后执行他的getInvoke操作

    JavassistProxyFactory 类

    public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
        // TODO Wrapper类不能正确处理带$的类名
        final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
        // 这里生产的wrapper也是动态生成的
        return new AbstractProxyInvoker<T>(proxy, type, url) {
            @Override
            protected Object doInvoke(T proxy, String methodName, 
                                      Class<?>[] parameterTypes, 
                                      Object[] arguments) throws Throwable {
                                      // 这个proxy也就是具体的实现类,对应上面的ref
                                      // methodName是方法名
                                      // 剩下的是参数类型以及名称
                return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
            }
        };
    }
    

    生产的invoke对象其实是个AbstractProxyInvoker,只不过在调用他的doInvoke操作时,最后会执行拼接生成的wrapper对象的invokeMethod方法上。

    getWrapper

    在获取wrapper操作,也同样是动态拼接字符串生成的,重点看其中的invokeMethod方法

    public Object invokeMethod(Object o, String n, Class[] p, Object[] v) 
                         throws java.lang.reflect.InvocationTargetException{ 
    com.jwfy.dubbo.product.ProductServiceImpl w; 
    // 看这个就已经非常明显的说明实现类的具体对象
    try{ 
       w = ((com.jwfy.dubbo.product.ProductServiceImpl)$1); 
       // 格式化,强转类型
    } catch (Throwable e) { 
      throw new IllegalArgumentException(e); 
    } 
    
    try{ 
      // 对函数名称和参数进行匹配校验操作
      if( "print".equals( $2 )  &&  $3.length == 0 ) {  
         w.print(); 
         // 函数本身是返回void类型,则直接返回
         return null; 
       } 
       if( "getStr".equals( $2 )  &&  $3.length == 0 ) {  
          // 调用执行结果后强转格式返回
          return ($w)w.getStr(); 
        } 
     } catch(Throwable e) {      
        throw new java.lang.reflect.InvocationTargetException(e);  
     } 
    
    // 每个接口的方法都会遍历一遍,如果啥都没匹配到,就提示没有方法异常
    throw new com.alibaba.dubbo.common.bytecode.NoSuchMethodException("Not found method \""+$2+"\" in class com.jwfy.dubbo.product.ProductServiceImpl."); 
    }
    

    现在应该就很清楚了,在执行invokeMethod的时候,背后其实就是调用了实现类的对应方法,只是这个wrapper本身是动态生成的

    2.2、JdkProxyFactory获取Invoke

    上面说了在动态生成的代理工厂中默认实现的是JavassistProxyFactory,但是也可以使用java本身的协议,也就是JdkProxyFactory

    public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
        return new AbstractProxyInvoker<T>(proxy, type, url) {
            @Override
            protected Object doInvoke(T proxy, String methodName, 
                                      Class<?>[] parameterTypes, 
                                      Object[] arguments) throws Throwable {
                Method method = proxy.getClass().getMethod(methodName, parameterTypes);
                return method.invoke(proxy, arguments);
            }
        };
    }
    

    完全就是通过java的反射去调用执行

    2.3、Invoke是什么

    其实刚刚开始看源码的时候并不是非常的理解Invoke到底是个什么,现在可以说Invoke是实现类的包装类,并包含了URL等信息,后续可以通过invoke方法去调用具体服务方。

    image

    3、Invoke暴露为export

    3.1、获取真实的Protocol类

    protocol.export(invoke),其中的invoke就是上面生成的抽象invoke类,可是在单步调试的时候却发现并没有直接进入到我们设想的RegistryProtocol类中

    这个需要追踪到Dubbo SPI中的cachedWrapperClasses数据处理中

    image

    上述代码已经很清楚了,获取wrapper,首先不应该被Adaptive注解(未贴出),其次一定得存在包含了参数为type的构造函数,而如下文件则是protocol的spi文件,可以知道只有filter和listener符合操作

    image image

    这样就非常清楚了,在获取getExtension中,我们应该是获取到了RegistryProtocol对象,但是后续的cachedWrapperClasses操作又加上了包装操作,先后加入了ProtocolFilterWrapper、ProtocolListenerWrapper对象,使得在后续protocol.export操作不是进入到RegistryProtocol中,而是首先进入到ProtocolFilterWrapper

    ProtocolFilterWrapper 类

    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
            // 如果invoke的协议是registry类型
           return protocol.export(invoker);
        }
        return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER));
    }
    

    然后来到了ProtocolListenerWrapper类

    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
            // 如果invoke的协议是registry类型
            return protocol.export(invoker);
        }
        return new ListenerExporterWrapper<T>(protocol.export(invoker), 
                Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(ExporterListener.class)
                        .getActivateExtension(invoker.getUrl(), Constants.EXPORTER_LISTENER_KEY)));
    }
    

    3.2、注册协议 暴露

    不经过任何处理,通过两个wrapper的转发,直接来到RegistryProtocol的export操作

    public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
        final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
        // 本地暴露服务
        final Registry registry = getRegistry(originInvoker);
        // 获取远程注册中心
        final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);
        // 注册到注册中心的URL信息,其中包含了一个接口的信息以及协议等信息
        registry.register(registedProviderUrl);
        // 注册操作,如果查看zk记录,可以发现注册成功的操作日志
        
        // 订阅override数据
        // FIXME 提供者订阅时,会影响同一JVM即暴露服务,又引用同一服务的的场景,因为subscribed以服务名为缓存的key,导致订阅信息覆盖。
        final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl);
        final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl);
        overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
        registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
        //保证每次export都返回一个新的exporter实例
        return new Exporter<T>() {
            public Invoker<T> getInvoker() {
                return exporter.getInvoker();
            }
            public void unexport() {
                try {
                    exporter.unexport();
                } catch (Throwable t) {
                    logger.warn(t.getMessage(), t);
                }
                try {
                    registry.unregister(registedProviderUrl);
                } catch (Throwable t) {
                    logger.warn(t.getMessage(), t);
                }
                try {
                    overrideListeners.remove(overrideSubscribeUrl);
                    registry.unsubscribe(overrideSubscribeUrl, overrideSubscribeListener);
                } catch (Throwable t) {
                    logger.warn(t.getMessage(), t);
                }
            }
        };
    }
    

    3.2.1、获取远程控制中心地址 getRegistry

    private Registry getRegistry(final Invoker<?> originInvoker){
        URL registryUrl = originInvoker.getUrl();
        // 获取的invoke的URL信息
        // registry://127.0.0.1:2182/com.alibaba.dubbo.registry.RegistryService?
        //application=dubbo-demo&client=zkclient&dubbo=2.5.3&export=dubbo
        //%3A%2F%2F192.168.10.123%3A20880%2Fcom.jwfy.dubbo.product.ProductService
        //%3Fanyhost%3Dtrue%26application%3Ddubbo-demo%26default.loadbalance%3Drandom
        //%26dubbo%3D2.5.3%26interface%3Dcom.jwfy.dubbo.product.ProductService%26methods
        // %3Dprint%2CgetStr%26owner%3Djwfy%26pid%3D12663%26side%3Dprovider
        // %26timestamp%3D1525733684167%26token%3Dfdfdf&
        // group=dubbo-demo&owner=jwfy&pid=12663&registry=zookeeper&timestamp=1525733671009
        if (Constants.REGISTRY_PROTOCOL.equals(registryUrl.getProtocol())) {
           // 如果协议是register
            String protocol = registryUrl.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_DIRECTORY);
            // 默认协议是dubbo
            registryUrl = registryUrl.setProtocol(protocol).removeParameter(Constants.REGISTRY_KEY);
            // 替换协议为dubbo,并移除参数中的注册数据
            // 现在的URL信息是
            // zookeeper://127.0.0.1:2182/com.alibaba.dubbo.registry.RegistryService?
            // application=dubbo-demo&client=zkclient&dubbo=2.5.3&export=dubbo%3A%2F%2F192.168.10.123%3A20880
            //%2Fcom.jwfy.dubbo.product.ProductService%3Fanyhost%3Dtrue%26application%3Ddubbo-demo%26default.loadbalance%3Drandom%26dubbo%3D2.5.3
            //%26interface%3Dcom.jwfy.dubbo.product.ProductService%26
            //methods%3Dprint%2CgetStr%26owner%3Djwfy%26pid%3D12663%26
            // side%3Dprovider%26timestamp%3D1525733684167%26
            //token%3Dfdfdf&group=dubbo-demo&owner=jwfy&pid=12663&timestamp=1525733671009
        }
        return registryFactory.getRegistry(registryUrl);
    }
    

    registryFactory 同样是在RegistryProtocol实例完后注入的动态对象

    import com.alibaba.dubbo.common.extension.ExtensionLoader;
    public class RegistryFactory$Adpative implements com.alibaba.dubbo.registry.RegistryFactory {
        public com.alibaba.dubbo.registry.Registry getRegistry(com.alibaba.dubbo.common.URL arg0) {
        
            if (arg0 == null) throw new IllegalArgumentException("url == null");
    
            com.alibaba.dubbo.common.URL url = arg0;
            String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );
    
            if(extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.registry.RegistryFactory) name from url(" + url.toString() + ") use keys([protocol])");
    
            com.alibaba.dubbo.registry.RegistryFactory extension = (com.alibaba.dubbo.registry.RegistryFactory)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.registry.RegistryFactory.class).getExtension(extName);
    
            return extension.getRegistry(arg0);
        }
    }
    

    对应实现的对象是ZookeeperRegistryFactory,调用其getRegistry方法,来到了AbstractRegistryFactory类

    public Registry getRegistry(URL url) {
        url = url.setPath(RegistryService.class.getName())
                .addParameter(Constants.INTERFACE_KEY, RegistryService.class.getName())
                .removeParameters(Constants.EXPORT_KEY, Constants.REFER_KEY);
        String key = url.toServiceString();
        // 获取的key是zookeeper://127.0.0.1:2182/dubbo-demo/com.alibaba.dubbo.registry.RegistryService
        // 锁定注册中心获取过程,保证注册中心单一实例
        LOCK.lock();
        try {
            Registry registry = REGISTRIES.get(key);
            // 从缓存获取注册中心,REGISTRIES是一个线程安全的map
            if (registry != null) {
                return registry;
            }
            registry = createRegistry(url);
            // 创建一个注册中心ZookeeperRegistry
            if (registry == null) {
                throw new IllegalStateException("Can not create registry " + url);
            }
            REGISTRIES.put(key, registry);
            return registry;
        } finally {
            // 释放锁
            LOCK.unlock();
        }
    }
    

    AbstractRegistry 类

    public AbstractRegistry(URL url) {
        setUrl(url);
        // 启动文件保存定时器
        syncSaveFile = url.getParameter(Constants.REGISTRY_FILESAVE_SYNC_KEY, false);
        String filename = url.getParameter(Constants.FILE_KEY, System.getProperty("user.home") + "/.dubbo/dubbo-registry-" + url.getHost() + ".cache");
        // 文件名为/Users/XXX/.dubbo/dubbo-registry-127.0.0.1.cache
        File file = null;
        if (ConfigUtils.isNotEmpty(filename)) {
            file = new File(filename);
            if(! file.exists() && file.getParentFile() != null && ! file.getParentFile().exists()){
                if(! file.getParentFile().mkdirs()){
                    throw new IllegalArgumentException("Invalid registry store file " + file + ", cause: Failed to create directory " + file.getParentFile() + "!");
                }
            }
        }
        this.file = file;
        loadProperties();
        // 从.cache 文件中获取已经存储的zk信息
        notify(url.getBackupUrls());
        // 通知订阅
    }
    
    image image

    来到订阅notify方法

    protected void notify(List<URL> urls) {
        if(urls == null || urls.isEmpty()) return;
        
        for (Map.Entry<URL, Set<NotifyListener>> entry : getSubscribed().entrySet()) {
            // 获取订阅者
            URL url = entry.getKey();
            
            if(! UrlUtils.isMatch(url, urls.get(0))) {
                continue;
            }
            
            Set<NotifyListener> listeners = entry.getValue();
            if (listeners != null) {
                for (NotifyListener listener : listeners) {
                    try {
                        notify(url, listener, filterEmpty(url, urls));
                        // 真正的通知触发
                    } catch (Throwable t) {
                        logger.error("Failed to notify registry event, urls: " +  urls + ", cause: " + t.getMessage(), t);
                    }
                }
            }
        }
    }
    
    protected void notify(URL url, NotifyListener listener, List<URL> urls) {
        if (url == null) {
            throw new IllegalArgumentException("notify url == null");
        }
        if (listener == null) {
            throw new IllegalArgumentException("notify listener == null");
        }
        if ((urls == null || urls.size() == 0) 
                && ! Constants.ANY_VALUE.equals(url.getServiceInterface())) {
            logger.warn("Ignore empty notify urls for subscribe url " + url);
            return;
        }
        if (logger.isInfoEnabled()) {
            logger.info("Notify urls for subscribe url " + url + ", urls: " + urls);
        }
        Map<String, List<URL>> result = new HashMap<String, List<URL>>();
        for (URL u : urls) {
            if (UrlUtils.isMatch(url, u)) {
                // 处理的URL信息u和url是匹配的
                String category = u.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
                List<URL> categoryList = result.get(category);
                if (categoryList == null) {
                    categoryList = new ArrayList<URL>();
                    result.put(category, categoryList);
                }
                categoryList.add(u);
            }
        }
        // 一个类目下存储的多个URL信息
        if (result.size() == 0) {
            return;
        }
        Map<String, List<URL>> categoryNotified = notified.get(url);
        if (categoryNotified == null) {
            notified.putIfAbsent(url, new ConcurrentHashMap<String, List<URL>>());
            categoryNotified = notified.get(url);
        }
        for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
            String category = entry.getKey();
            List<URL> categoryList = entry.getValue();
            categoryNotified.put(category, categoryList);
            saveProperties(url);
            // 存储到文件中,对于上面的读取文件
            listener.notify(categoryList);
            // 由监听器掌握处理
        }
    }
    

    FailbackRegistry 类

    public FailbackRegistry(URL url) {
        super(url);
        int retryPeriod = url.getParameter(Constants.REGISTRY_RETRY_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RETRY_PERIOD);
        // 设置重试的时间,默认为5s,当链接失败就会触发这个重连操作
        this.retryFuture = retryExecutor.scheduleWithFixedDelay(new Runnable() {
            public void run() {
                // 检测并连接注册中心
                try {
                    retry();
                } catch (Throwable t) { // 防御性容错
                    logger.error("Unexpected error occur at failed retry, cause: " + t.getMessage(), t);
                }
            }
        }, retryPeriod, retryPeriod, TimeUnit.MILLISECONDS);
    }
    

    链接失败时,进行重试的操作就是在这里进行的,retry就是获取当前registry中的failedRegistered等信息,如果failedRegistered中有URL信息存在,意味着之前存在链接失败的情况,现在执行retry进行重连操作

    ZookeeperRegistry 类

    public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
        super(url);
        if (url.isAnyHost()) {
            throw new IllegalStateException("registry address == null");
        }
        String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT);
        // dubbo 分组概念 group
        if (! group.startsWith(Constants.PATH_SEPARATOR)) {
            // 如果组名称不是/开头,则添加该数据
            group = Constants.PATH_SEPARATOR + group;
        }
        this.root = group;
        zkClient = zookeeperTransporter.connect(url);
        // 调用zk的操作方式连接注册中心,并持有该client
        zkClient.addStateListener(new StateListener() {
            public void stateChanged(int state) {
                if (state == RECONNECTED) {
                    try {
                        recover();
                    } catch (Exception e) {
                        logger.error(e.getMessage(), e);
                    }
                }
            }
        });
    }
    

    通过上述操作得到的注册中心对象实例,并且其URL为zookeeper://127.0.0.1:2182/com.alibaba.dubbo.registry.RegistryService?application=dubbo-demo&client=zkclient&dubbo=2.5.3&group=dubbo-demo&interface=com.alibaba.dubbo.registry.RegistryService&owner=jwfy&pid=12663&timestamp=1525733671009

    3.2.2、注册到注册中心

    final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);
    // 先获取invoke对象的注册URL
    registry.register(registedProviderUrl);
    // 注册到注册中心
    // 此时观察zk的日志会发现注册操作
    

    从invoke对错获取的注册URL是dubbo://192.168.10.123:20880/com.jwfy.dubbo.product.ProductService?anyhost=true&application=dubbo-demo&default.loadbalance=random&dubbo=2.5.3&interface=com.jwfy.dubbo.product.ProductService&methods=print,getStr&owner=jwfy&pid=12663&side=provider&timestamp=1525733684167&token=fdfdf

    其包含了当前bean的基本信息,把这些信息提交给注册中心,服务使用方就可以获取到这些数据,然后反转生成invoke去调用执行

    FailbackRegistry 类

    public void register(URL url) {
        super.register(url);
        failedRegistered.remove(url);
        failedUnregistered.remove(url);
        try {
            // 向服务器端发送注册请求
            // 真正调用ZK包的接口注册到zk注册中心
            doRegister(url);
        } catch (Exception e) {
            Throwable t = e;
    
            // 如果开启了启动时检测,则直接抛出异常
            boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
                    && url.getParameter(Constants.CHECK_KEY, true)
                    && ! Constants.CONSUMER_PROTOCOL.equals(url.getProtocol());
            boolean skipFailback = t instanceof SkipFailbackWrapperException;
            if (check || skipFailback) {
                if(skipFailback) {
                    t = t.getCause();
                }
                throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
            } else {
                logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t);
            }
    
            // 将失败的注册请求记录到失败列表,定时重试
            // 上面的rety方法就有使用这个failedRegistered容器内的数据
            failedRegistered.add(url);
        }
    }
    

    以下就是zk的输出日志,可以很清晰的看到确实创建了节点信息

    2018-05-08 22:52:52,808 - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2182:NIOServerCnxn$Factory@251] - Accepted socket connection from /127.0.0.1:55708
    2018-05-08 22:52:52,926 - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2182:NIOServerCnxn@770] - Client attempting to renew session 0x163267acb7f000b at /127.0.0.1:55708
    2018-05-08 22:52:52,941 - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2182:NIOServerCnxn@1573] - Invalid session 0x163267acb7f000b for client /127.0.0.1:55708, probably expired
    2018-05-08 22:52:52,949 - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2182:NIOServerCnxn@1435] - Closed socket connection for client /127.0.0.1:55708 which had sessionid 0x163267acb7f000b
    2018-05-08 22:52:53,437 - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2182:NIOServerCnxn$Factory@251] - Accepted socket connection from /127.0.0.1:55709
    2018-05-08 22:52:53,444 - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2182:NIOServerCnxn@777] - Client attempting to establish new session at /127.0.0.1:55709
    2018-05-08 22:52:53,454 - INFO  [SyncThread:0:NIOServerCnxn@1580] - Established session 0x163267acb7f000c with negotiated timeout 30000 for client /127.0.0.1:55709
    2018-05-08 22:52:56,957 - INFO  [ProcessThread:-1:PrepRequestProcessor@419] - Got user-level KeeperException when processing sessionid:0x163267acb7f000c type:create cxid:0x1 zxid:0xfffffffffffffffe txntype:unknown reqpath:n/a Error Path:/dubbo-demo Error:KeeperErrorCode = NodeExists for /dubbo-demo
    2018-05-08 22:52:57,018 - INFO  [ProcessThread:-1:PrepRequestProcessor@419] - Got user-level KeeperException when processing sessionid:0x163267acb7f000c type:create cxid:0x2 zxid:0xfffffffffffffffe txntype:unknown reqpath:n/a Error Path:/dubbo-demo/com.jwfy.dubbo.product.ProductService Error:KeeperErrorCode = NodeExists for /dubbo-demo/com.jwfy.dubbo.product.ProductService
    2018-05-08 22:52:57,029 - INFO  [ProcessThread:-1:PrepRequestProcessor@419] - Got user-level KeeperException when processing sessionid:0x163267acb7f000c type:create cxid:0x3 zxid:0xfffffffffffffffe txntype:unknown reqpath:n/a Error Path:/dubbo-demo/com.jwfy.dubbo.product.ProductService/providers Error:KeeperErrorCode = NodeExists for /dubbo-demo/com.jwfy.dubbo.product.ProductService/providers
    

    到这里服务注册到注册中心就已经完成了,同时还伴随着从文件加载注册信息和保存注册信息,可自行通过zKcli命令去

    3.2.3、暴露服务之注册

    private <T> ExporterChangeableWrapper<T>  doLocalExport(final Invoker<T> originInvoker){
        // invoke的URL信息是registry://127.0.0.1:2182/com.alibaba.dubbo.registry.RegistryService? 
        // application=dubbo-demo&client=zkclient&dubbo=2.5.3&export=dubbo%3A%2F%2F172.16.109.110%3A20880%2F
        // com.jwfy.dubbo.product.ProductService%3Fanyhost%3Dtrue%26application%3D
        // dubbo-demo%26default.loadbalance%3Drandom%26dubbo%3D2.5.3%26
        // interface%3Dcom.jwfy.dubbo.product.ProductService%26methods%3Dprint%2CgetStr%26
        // owner%3Djwfy%26pid%3D13375%26side%3Dprovider%26timestamp%3D1525749656129%26
        // token%3Dfdfdf&group=dubbo-demo&owner=jwfy&pid=13375&registry=zookeeper&
        // timestamp=1525749652060
        String key = getCacheKey(originInvoker);
        // key就是dubbo://172.16.109.110:20880/com.jwfy.dubbo.product.ProductService?  
        // anyhost=true&application=dubbo-demo&default.loadbalance=random&dubbo=2.5.3&interface=com.jwfy.dubbo.product.ProductService
        // &methods=print,getStr&owner=jwfy&pid=13375&side=provider&timestamp=1525749656129&token=fdfdf
        ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
        if (exporter == null) {
            synchronized (bounds) {
                exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
                if (exporter == null) {
                    // 又出现了double-check操作
                    final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));
                    // 包装类
                    exporter = new ExporterChangeableWrapper<T>((Exporter<T>)protocol.export(invokerDelegete), originInvoker);
                    
                    bounds.put(key, exporter);
                }
            }
        }
        return (ExporterChangeableWrapper<T>) exporter;
    }
    

    在上面的protocol.export操作中,protocol也不是DubboProtocol本身,而是包装了ProtocolFilterWrapper、ProtocolListenerWrapper,协议不是register,各种处理之后进入到DubboProtocol的export进行暴露操作。

    3.2.4、网络端口开启

    DubboProtocol 类

    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        URL url = invoker.getUrl();
        // URL是dubbo://172.16.109.110:20880/com.jwfy.dubbo.product.ProductService?anyhost=true&
        //application=dubbo-demo&default.loadbalance=random&dubbo=2.5.3&
        //interface=com.jwfy.dubbo.product.ProductService&methods=print,getStr&owner=jwfy
       // &pid=13375&side=provider&timestamp=1525749656129&token=fdfdf
        
        // export service.
        String key = serviceKey(url);
        // key是com.jwfy.dubbo.product.ProductService:20880
        DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
        // 组成成为一个新的包装类DubboExporter
        exporterMap.put(key, exporter);
        
        //export an stub service for dispaching event
        Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY,Constants.DEFAULT_STUB_EVENT);
        Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
        if (isStubSupportEvent && !isCallbackservice){
            String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
            if (stubServiceMethods == null || stubServiceMethods.length() == 0 ){
                if (logger.isWarnEnabled()){
                    logger.warn(new IllegalStateException("consumer [" +url.getParameter(Constants.INTERFACE_KEY) +
                            "], has set stubproxy support event ,but no stub methods founded."));
                }
            } else {
                stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
            }
        }
    
        openServer(url);
        // 来了,最关键的时候
        
        return exporter;
    }
    
    private ExchangeServer createServer(URL url) {
        //默认开启server关闭时发送readonly事件
        url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());
        //默认开启heartbeat
        url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
        String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);
        // RPC服务的名称,默认是netty
    
        if (str != null && str.length() > 0 && ! ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))
            throw new RpcException("Unsupported server type: " + str + ", url: " + url);
    
        url = url.addParameter(Constants.CODEC_KEY, Version.isCompatibleVersion() ? COMPATIBLE_CODEC_NAME : DubboCodec.NAME);
        ExchangeServer server;
        try {
            server = Exchangers.bind(url, requestHandler);
            // 绑定操作
        } catch (RemotingException e) {
            throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
        }
        str = url.getParameter(Constants.CLIENT_KEY);
        if (str != null && str.length() > 0) {
            Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
            if (!supportedTypes.contains(str)) {
                throw new RpcException("Unsupported client type: " + str);
            }
        }
        return server;
    }
    

    Exchangers 类

    public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        if (handler == null) {
            throw new IllegalArgumentException("handler == null");
        }
        url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
        return getExchanger(url).bind(url, handler);
        // getExchanger 是通过SPI返回一个HeaderExchanger对象
    }
    

    HeaderExchanger 类

    public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        // 包装一个HeaderExchangeHandler对象
        // 再包装一个DecodeHandler对象
        // Transporters.bind绑定操做
        // 最后包装成HeaderExchangeServer返回
        return new HeaderExchangeServer(
            Transporters.bind(url, 
                new DecodeHandler(
                    new HeaderExchangeHandler(handler)
                )
            )
        );
    }
    

    其中Transporters.bind会先获取当前可用的其中Transporters,默认也是NettyTransporter对象,调用其bind方法
    new NettyServer(url, listener),来到了AbstractServer类

    public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
        super(url, handler);
        localAddress = getUrl().toInetSocketAddress();
        // 获取本地套接字地址,也就是IP端口
        String host = url.getParameter(Constants.ANYHOST_KEY, false) 
                        || NetUtils.isInvalidLocalHost(getUrl().getHost()) 
                        ? NetUtils.ANYHOST : getUrl().getHost();
        bindAddress = new InetSocketAddress(host, getUrl().getPort());
        // 绑定地址,如果是本地项目,host为"0.0.0.0"
        this.accepts = url.getParameter(Constants.ACCEPTS_KEY, Constants.DEFAULT_ACCEPTS);
        this.idleTimeout = url.getParameter(Constants.IDLE_TIMEOUT_KEY, Constants.DEFAULT_IDLE_TIMEOUT);
        try {
            doOpen();
            // 连接打开,当前默认是使用的NettyServer里的方法
            // 关于Netty的操作细节目前也不是很理解,后续补充
            if (logger.isInfoEnabled()) {
                logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress());
            }
        } catch (Throwable t) {
            throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName() 
                                        + " on " + getLocalAddress() + ", cause: " + t.getMessage(), t);
        }
        if (handler instanceof WrappedChannelHandler ){
            executor = ((WrappedChannelHandler)handler).getExecutor();
        }
    }
    

    其最终返回一个NettyService,服务端已经开启了,客户端可以连接了,继续跳入到HeaderExchangeServer中

    3.2.5、开启心跳检测

    HeaderExchangeServer 类

    public HeaderExchangeServer(Server server) {
        // 此处传递的service就是上面生成的NettyService
        if (server == null) {
            throw new IllegalArgumentException("server == null");
        }
        this.server = server;
        // 开始心跳检测
        this.heartbeat = server.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0);
        // 默认是0(可是没找到在哪设置为了6000,也就是6s)
        this.heartbeatTimeout = server.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3);
        // 定时任务间隔期 6*3 = 18s
        // 如果没有设置心跳检测的间隔期,则设置为心跳延迟时间的3倍
        if (heartbeatTimeout < heartbeat * 2) {
            // 设置的时间不符合要求
            throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2");
        }
        startHeatbeatTimer();
        // 开启心跳检测
    }
    

    心跳检测最后真正执行的任务是如下代码

    HeartBeatTask 类

    public void run() {
        try {
            long now = System.currentTimeMillis();
            // 具体所有IO的channel是通过HeaderExchangeServer.this.getChannels()获取到的
            for ( Channel channel : channelProvider.getChannels() ) {
                if (channel.isClosed()) {
                    // 确保可用的channel
                    continue;
                }
                try {
                    Long lastRead = ( Long ) channel.getAttribute(
                            HeaderExchangeHandler.KEY_READ_TIMESTAMP );
                    Long lastWrite = ( Long ) channel.getAttribute(
                            HeaderExchangeHandler.KEY_WRITE_TIMESTAMP );
                    if ( ( lastRead != null && now - lastRead > heartbeat )
                            || ( lastWrite != null && now - lastWrite > heartbeat ) ) {
                        Request req = new Request();
                        req.setVersion( "2.0.0" );
                        req.setTwoWay( true );
                        req.setEvent( Request.HEARTBEAT_EVENT );
                        channel.send( req );
                        // 发生心跳检测包 req是内容
                        if ( logger.isDebugEnabled() ) {
                            logger.debug( "Send heartbeat to remote channel " + channel.getRemoteAddress()
                                                  + ", cause: The channel has no data-transmission exceeds a heartbeat period: " + heartbeat + "ms" );
                        }
                    }
                    if ( lastRead != null && now - lastRead > heartbeatTimeout ) {
                        // channel因为超时可能存在关闭的情况
                        logger.warn( "Close channel " + channel
                                             + ", because heartbeat read idle time out: " + heartbeatTimeout + "ms" );
                        if (channel instanceof Client) {
                            try {
                                ((Client)channel).reconnect();
                            }catch (Exception e) {
                                //do nothing
                            }
                        } else {
                            channel.close();
                        }
                    }
                } catch ( Throwable t ) {
                    logger.warn( "Exception when heartbeat to remote channel " + channel.getRemoteAddress(), t );
                }
            }
        } catch ( Throwable t ) {
            logger.warn( "Unhandled exception when heartbeat, cause: " + t.getMessage(), t );
        }
    }
    

    到此整个的服务暴露就全部结束了

    相关文章

      网友评论

      • IT人故事会:老铁,经常看别人的分享.感谢别人的分享,感谢!关注了

      本文标题:Dubbo 服务暴露 源码学习(下)(四)

      本文链接:https://www.haomeiwen.com/subject/kzvwrftx.html