美文网首页Dubbo源码解析我爱编程
Dubbo之ZookeeperRegistry源码分析

Dubbo之ZookeeperRegistry源码分析

作者: 土豆肉丝盖浇饭 | 来源:发表于2018-06-21 15:53 被阅读16次

    ZookeeperRegistry的作用

    ZookeeperRegistry是dubbo中常用的注册中心实现,它主要作用通过Zookeeper的目录监听机制,让消费者能够实时得到在线的提供者列表。并且一些服务治理的功能也是通过zookeeper这个监听特性巧妙的完成。

    在具体讲解ZookeeperRegistry的相关源码之前,先来分析下dubbo在zookeeper的目录结构以及dubbo如何利用这个特性

    Zookeeper目录结构

    dubbo在zookeeper建立的目录是基于接口的,大致如下


    image.png

    针对每个接口节点会存在以下4个子节点

    节点名 作用 子节点是否持久节点
    consumers 存储消费者节点url
    configuators 存储override或者absent url,用于服务治理
    routers 用于设置路由url,用于服务治理
    providers 存储在线提供者url

    consumer节点存在的意义并不大,主要还是为了做监控
    其他三个节点,都会设置被相应的监听器,发生改变时,会触发特定事件

    Dubbo对Zookeeper监听机制的利用

    Dubbo中通过ZookeeperClient的实现类来对zookeeper进行操作


    image.png

    ZookeeperClient提供设置两种监听器的方法,对应子节点监听器和状态监听器,这里我们关注子节点监听器ChildListener

    public interface ChildListener {
    
        /**
         *
         * @param path 监听的节点
         * @param children 监听的节点的所有子节点
         */
        void childChanged(String path, List<String> children);
    
    }
    
    

    ZookeeperClient有两种实现,第一种通过官方提供的jar包,第二个通过Apache的Curator框架,默认使用第二种,我们讲解的也是Curator的对应实现
    添加子节点监听器的方法为addChildListener

    public List<String> addChildListener(String path, final ChildListener listener) {
            //对listener做缓存,因为ChildListener是dubbo提供的监听器接口,需要转换为cruator的监听器接口
            ConcurrentMap<ChildListener, TargetChildListener> listeners = childListeners.get(path);
            if (listeners == null) {
                childListeners.putIfAbsent(path, new ConcurrentHashMap<ChildListener, TargetChildListener>());
                listeners = childListeners.get(path);
            }
            TargetChildListener targetListener = listeners.get(listener);
            if (targetListener == null) {
                //createTargetChildListener会对监听器进行转换
                listeners.putIfAbsent(listener, createTargetChildListener(path, listener));
                targetListener = listeners.get(listener);
            }
            return addTargetChildListener(path, targetListener);
        }
    

    Dubbo底层封装了2套Zookeeper API,所以通过ChildListener抽象了监听器,但是在实际调用时会通过createTargetChildListener转为对应框架的监听器实现
    addTargetChildListener方法在添加监听器之后会返回监听path当前的所有的子节点

    public List<String> addTargetChildListener(String path, CuratorWatcher listener) {
            try {
                //添加监听,并且返回这个目录当前所有子节点
                //这种监听方式是一次性的,在listener实现中会再次执行监听逻辑
                return client.getChildren().usingWatcher(listener).forPath(path);
            } catch (NoNodeException e) {
                return null;
            } catch (Exception e) {
                throw new IllegalStateException(e.getMessage(), e);
            }
        }
    

    上述代码需要注意监听是一次性的,其实curator提供了TreeCache用作永久性的监听,这边不用到这个特性,应该是为了和官方API保持一致吧。
    接下去看下Cruator监听器的封装

    public CuratorWatcher createTargetChildListener(String path, ChildListener listener) {
            return new CuratorWatcherImpl(listener);
        }
    
    private class CuratorWatcherImpl implements CuratorWatcher {
    
            private volatile ChildListener listener;
    
            public CuratorWatcherImpl(ChildListener listener) {
                this.listener = listener;
            }
    
            public void unwatch() {
                this.listener = null;
            }
    
            @Override
            public void process(WatchedEvent event) throws Exception {
                if (listener != null) {
                    String path = event.getPath() == null ? "" : event.getPath();
                    listener.childChanged(path,
                            // if path is null, curator using watcher will throw NullPointerException.
                            // if client connect or disconnect to server, zookeeper will queue
                            // watched event(Watcher.Event.EventType.None, .., path = null).
                            StringUtils.isNotEmpty(path)
                                    //再次设置监听,并且把监听path的所有子节点传入childChanged方法
                                    ? client.getChildren().usingWatcher(this).forPath(path)
                                    : Collections.<String>emptyList());
                }
            }
        }
    

    可以看到listener的触发逻辑以及入参来源

    源码分析

    image.png

    通过ZookeeperRegistry的类继承图,逐上而下的分析源码

    Registry接口

    public interface Registry extends Node, RegistryService {
    }
    

    Registry继承Node和RegistryService两个接口,本身不提供接口方法

    public interface Node {
    
        /**
         * get url.
         *
         * @return url.
         */
        URL getUrl();
    
        /**
         * is available.
         *
         * @return available.
         */
        boolean isAvailable();
    
        /**
         * destroy.
         */
        void destroy();
    
    }
    

    Node约束了三个生命周期相关的方法
    getUrl用于获取当前组件的url配置
    isAvailable检测组件是否可用
    destroy用于销毁组件

    public interface RegistryService {
    
       
        void register(URL url);
    
        void unregister(URL url);
    
        void subscribe(URL url, NotifyListener listener);
    
        void unsubscribe(URL url, NotifyListener listener);
    
        List<URL> lookup(URL url);
    
    }
    

    RegistryService规定了和注册中心相关的方法
    register和unregister用于提供者向注册中心注册提供者url
    subscribe和unsubscribe用于消费者向对应接口目录注册监听
    lookup用于查找查找url,通过消费者url查找提供者url以及服务治理有关的url

    AbstractRegistry

    主要提供接口提供者本地缓存功能
    以及基础register,unregister,subscribe,unsubscribe,notify,lookup,recover逻辑

    register,unregister会(接触)注册提供者url,主要操作

    private final Set<URL> registered = new ConcurrentHashSet<URL>();
    

    subscribe,unsubscribe则会针对特定url提供监听,主要操作

    private final ConcurrentMap<URL, Set<NotifyListener>> subscribed = new ConcurrentHashMap<URL, Set<NotifyListener>>();
    

    notify方法会缓存最近通知的url到notified以及触发listener回调

    /**
         * 这个方法不会直接触发,被FailbackRegistry重载
         * FailbackRegistry增加failback逻辑后,还是会调用这个方法
         * @param url
         * @param listener
         * @param urls
         */
        protected void notify(URL url, NotifyListener listener, List<URL> urls) {
            if (url == null) {
                throw new IllegalArgumentException("notify url == null");
            }
            if (listener == null) {
                throw new IllegalArgumentException("notify listener == null");
            }
            if ((urls == null || urls.isEmpty())
                    && !Constants.ANY_VALUE.equals(url.getServiceInterface())) {
                logger.warn("Ignore empty notify urls for subscribe url " + url);
                return;
            }
            if (logger.isInfoEnabled()) {
                logger.info("Notify urls for subscribe url " + url + ", urls: " + urls);
            }
            Map<String, List<URL>> result = new HashMap<String, List<URL>>();
            //根据url的category进行分类
            for (URL u : urls) {
                if (UrlUtils.isMatch(url, u)) {
                    String category = u.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
                    List<URL> categoryList = result.get(category);
                    if (categoryList == null) {
                        categoryList = new ArrayList<URL>();
                        result.put(category, categoryList);
                    }
                    categoryList.add(u);
                }
            }
            if (result.size() == 0) {
                return;
            }
            //下面操作notified缓存
            Map<String, List<URL>> categoryNotified = notified.get(url);
            if (categoryNotified == null) {
                notified.putIfAbsent(url, new ConcurrentHashMap<String, List<URL>>());
                categoryNotified = notified.get(url);
            }
            for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
                String category = entry.getKey();
                List<URL> categoryList = entry.getValue();
                //对notified内容进行覆盖,相当于会保存上一次的通知
                categoryNotified.put(category, categoryList);
                //每次通知后会刷新本地缓存
                saveProperties(url);
                //进行listener回调,每种category的url分别回调一次
                listener.notify(categoryList);
            }
        }
    

    这个类的recover方法不分析,因为FailbackRegistry完全重写了这个方法

    FailbackRegistry

    FailbackRegistry重载了AbstractRegistry中的subscribe,unsubscribe,register,unregister,notify方法,在AbstractRegistry的基础上提供了失败重试机制,并且暴露模板方法doRegister,doUnregister,doSubscribe,doUnsubscribe让不同类型的注册中心实现。doNotify还是默认父类的逻辑。
    同时也重载了recover方法,通过FailbackRegistry的重试机制实现recover

    以registry方法作为样例看下添加的重试机制

    /**
         * register行为,提供者使用
         * 在AbstractRegistry的基础上,增加失败重试机制
         * @param url
         */
        @Override
        public void register(URL url) {
            super.register(url);
            //这里成功,会删除failedRegistered,failedUnregistered中的url
            failedRegistered.remove(url);
            failedUnregistered.remove(url);
            try {
                // Sending a registration request to the server side
                //具体register逻辑交给子类实现
                doRegister(url);
            } catch (Exception e) {
                Throwable t = e;
    
                // If the startup detection is opened, the Exception is thrown directly.
                //如果注册中心或者提供者url的check为false的话,跳过抛出异常
                boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
                        && url.getParameter(Constants.CHECK_KEY, true)
                        && !Constants.CONSUMER_PROTOCOL.equals(url.getProtocol());
                //如果是注册的时候,抛出这个异常,那么也会忽略,只打日志
                boolean skipFailback = t instanceof SkipFailbackWrapperException;
                if (check || skipFailback) {
                    if (skipFailback) {
                        t = t.getCause();
                    }
                    throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
                } else {
                    logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t);
                }
    
                // Record a failed registration request to a failed list, retry regularly
                //加入到失败重试集合
                failedRegistered.add(url);
            }
        }
    

    注册失败后会把需要注册重试的url放入failedRegistered集合
    然后在FailbackRegistry构造函数中起的定时任务会进行重试

    public FailbackRegistry(URL url) {
            super(url);
            this.retryPeriod = url.getParameter(Constants.REGISTRY_RETRY_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RETRY_PERIOD);
            //重试的定时线程,使用future用于取消
            this.retryFuture = retryExecutor.scheduleWithFixedDelay(new Runnable() {
                @Override
                public void run() {
                    // Check and connect to the registry
                    try {
                        retry();
                    } catch (Throwable t) { // Defensive fault tolerance
                        logger.error("Unexpected error occur at failed retry, cause: " + t.getMessage(), t);
                    }
                }
            }, retryPeriod, retryPeriod, TimeUnit.MILLISECONDS);
        }
    

    retry方法的具体逻辑,就是循环遍历这些失败集合,然后调用doXXX方法进行重试

    recover方法会在和Zookeeper重连时触发,在断连状态下,dubbo进程内的注册,订阅行为是会被缓存下来的,然后对所有缓存的url进行重新注册,订阅。
    这边有个细节点,可以看到failedRegistered这些集合使用的都是线程安全的集合

    private final Set<URL> failedRegistered = new ConcurrentHashSet<URL>();
    

    因为recover,retry这两个操作还是存在资源竞争的,但不仅限于这两个操作

    ZookeeperRegistry

    ZookeeperRegistry的工作就是通过Zookeeper API实现doRegister,doUnregister,doSubscribe,doUnsubscribe具体逻辑

    首先来看下ZookeeperRegistry的构造函数,做的主要工作是初始化zk客户端

    public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
            super(url);
            if (url.isAnyHost()) {
                throw new IllegalStateException("registry address == null");
            }
            //如果不进行配置,默认dubbo根目录就是/dubbo
            String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT);
            if (!group.startsWith(Constants.PATH_SEPARATOR)) {
                group = Constants.PATH_SEPARATOR + group;
            }
            this.root = group;
            zkClient = zookeeperTransporter.connect(url);
            //zookeeper添加重连回调,会触发recover方法,进行失败任务重试
            //为什么FailbackRegistry都是用线程安全的集合,因为在这里存在线程竞争资源
            zkClient.addStateListener(new StateListener() {
                @Override
                public void stateChanged(int state) {
                    if (state == RECONNECTED) {
                        try {
                            recover();
                        } catch (Exception e) {
                            logger.error(e.getMessage(), e);
                        }
                    }
                }
            });
        }
    

    使用zookeeperTransporter扩展点加载zk客户端实现,默认为Curator框架

    @SPI("curator")
    public interface ZookeeperTransporter {
    
        @Adaptive({Constants.CLIENT_KEY, Constants.TRANSPORTER_KEY})
        ZookeeperClient connect(URL url);
    
    }
    

    注册/取消注册实现

    然后再来看doRegister和doUnregister方法,对于zk来说,就是创建目录呗

    /**
         * 注册的逻辑,就是在zookeeper创建节点,节点路径为toUrlPath(url)
         * 具体格式为 /{group}/{interfaceName}/{category}/{url.toFullString}
         * DYNAMIC_KEY表示是否创建永久节点,true表示不是,断开连接后会消失,所以需要进行recover
         * @param url
         */
        @Override
        protected void doRegister(URL url) {
            try {
                zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
            } catch (Throwable e) {
                throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
            }
        }
    
    /**
         * 取消注册,就是删除那个节点
         * @param url
         */
        @Override
        protected void doUnregister(URL url) {
            try {
                zkClient.delete(toUrlPath(url));
            } catch (Throwable e) {
                throw new RpcException("Failed to unregister " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
            }
        }
    

    需要注意下节点的路径生成格式,也就是toUrlPath(url)方法,格式为 /{group}/{interfaceName}/{category}/{url.toFullString},
    group一般不配置的话为dubbo,
    interfaceName对应具体接口,
    category开始就讲过,分为consumers,configuators,routers,providers
    url.toFullString就是我们的url配置

    对于registry来讲category=providers

    取消注册就是对应删除那个节点

    订阅/取消订阅实现

    订阅的行为对于消费者来讲,用于获取providers和routers,用于得到路由后的提供者
    对于提供者来讲,订阅configuators,通过新的配置重新暴露
    在ZookeeperRegistry,我们只关注如何进行订阅,具体监听器的作用,在用到的模块再讲
    doSubscribe方法支持订阅全局和订阅特定接口
    如果interface=*,即订阅全局,对于新增和已存在的所有接口的改动都会触发回调
    如果interface=特定接口,那么只有这个接口的子节点改变时,才触发回调

    @Override
        protected void doSubscribe(final URL url, final NotifyListener listener) {
            try {
                if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {
                    String root = toRootPath();
                    ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
                    if (listeners == null) {
                        zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
                        listeners = zkListeners.get(url);
                    }
                    ChildListener zkListener = listeners.get(listener);
                    if (zkListener == null) {
                        listeners.putIfAbsent(listener, new ChildListener() {
                            @Override
                            public void childChanged(String parentPath, List<String> currentChilds) {
                                for (String child : currentChilds) {
                                    child = URL.decode(child);
                                    if (!anyServices.contains(child)) {
                                        anyServices.add(child);
                                        //如果consumer的interface为*,会订阅每一个url,会触发另一个分支的逻辑
                                        //这里是用来对/dubbo下面提供者新增时的回调,相当于增量
                                        subscribe(url.setPath(child).addParameters(Constants.INTERFACE_KEY, child,
                                                Constants.CHECK_KEY, String.valueOf(false)), listener);
                                    }
                                }
                            }
                        });
                        zkListener = listeners.get(listener);
                    }
                    zkClient.create(root, false);
                    //添加监听器会返回子节点集合
                    List<String> services = zkClient.addChildListener(root, zkListener);
                    if (services != null && !services.isEmpty()) {
                        for (String service : services) {
                            service = URL.decode(service);
                            anyServices.add(service);
                            //如果consumer的interface为*,会订阅每一个url,会触发另一个分支的逻辑
                            //这里的逻辑只执行一次,一次全量
                            subscribe(url.setPath(service).addParameters(Constants.INTERFACE_KEY, service,
                                    Constants.CHECK_KEY, String.valueOf(false)), listener);
                        }
                    }
                } else {
                    //这边是针对明确interface的订阅逻辑
                    List<URL> urls = new ArrayList<URL>();
                    //针对每种category路径进行监听
                    for (String path : toCategoriesPath(url)) {
                        ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
                        if (listeners == null) {
                            zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
                            listeners = zkListeners.get(url);
                        }
                        ChildListener zkListener = listeners.get(listener);
                        if (zkListener == null) {
                            //封装回调逻辑
                            listeners.putIfAbsent(listener, new ChildListener() {
                                @Override
                                public void childChanged(String parentPath, List<String> currentChilds) {
                                    ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds));
                                }
                            });
                            zkListener = listeners.get(listener);
                        }
                        //创建节点
                        zkClient.create(path, false);
                        //增加回调
                        List<String> children = zkClient.addChildListener(path, zkListener);
                        if (children != null) {
                            urls.addAll(toUrlsWithEmpty(url, path, children));
                        }
                    }
                    //如果有子节点,直接进行触发一次,对应AbstractRegsitry的lookup方法
                    //意思就是第一次订阅,如果订阅目录存在子节点,直接会触发一次
                    notify(url, listener, urls);
                }
            } catch (Throwable e) {
                throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
            }
        }
    

    这边需要注意一点的是,每次进行订阅,最重要的第一次,会使用当前订阅节点的子节点数据触发一次notify,执行对应监听器逻辑,这个在后面RegistryDirectory中会用到这个特性

    取消订阅没什么好讲的,删除订阅数据即可

    讲了这么多,对于lookup方法,使用消费者查找提供者的逻辑其实也很简单。使用消费者url构造出zk中provider的目录,然后返回所有子节点即可

    /**
         * 查找消费者url 对应 提供者url实现
         * 这边的url为消费者url
         * @param url
         * @return
         */
        @Override
        public List<URL> lookup(URL url) {
            if (url == null) {
                throw new IllegalArgumentException("lookup url == null");
            }
            try {
                List<String> providers = new ArrayList<String>();
                //返回inteface下面所有category的url
                for (String path : toCategoriesPath(url)) {
                    List<String> children = zkClient.getChildren(path);
                    if (children != null) {
                        providers.addAll(children);
                    }
                }
                //返回匹配的url
                return toUrlsWithoutEmpty(url, providers);
            } catch (Throwable e) {
                throw new RpcException("Failed to lookup " + url + " from zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
            }
        }
    

    总结

    1.Zookeeper监听器的妙用,在Elasticjob也是使用到了这个特性,进行任务触发
    2.通过zookeeperTransporter以及ZookeeperClient对Zookeeper操作进行抽象,进而支持两种zookeeper客户端框架。包括在remoting模块也是采用这种设计模式,和底层框架解耦。
    3.Zookeeper默认的监听是一次性的,Curator框架实现了永久监听,但是dubbo没用到Curator这个特性。
    4.写完这部分,Dirctory模块就比较容易写下去了,东西太多,有些地方的理解肯定存在偏差,希望读者能多多交流

    最后

    希望大家关注下我的公众号


    image

    相关文章

      网友评论

        本文标题:Dubbo之ZookeeperRegistry源码分析

        本文链接:https://www.haomeiwen.com/subject/bscasftx.html