美文网首页分布式程序员技术干货
zk源码阅读11:watch之模型以及client端存储

zk源码阅读11:watch之模型以及client端存储

作者: 赤子心_d709 | 来源:发表于2017-06-22 16:32 被阅读0次

    1.摘要

    前面讲完了ACL,QUOTAS,DataNode等等,最后讲一下WatchManager,讲完了就可以讲DataTree了
    watchManager涉及watch机制,内容较多,又要针对watch进行展开了

    本节讲解

    Watcher相关类简介,类图说明
    Watcher的意义,通知状态(keeperState)与事件类型(EventType)
    WatchedEvent 和 WatcherEvent 描述zk检测到变化的事件,以及对应用于网络传输的封装类
    ClientWatchManager接口以及实现类ZKWatchManager :client端完成根据Event找到需要触发的watches
    WatcherSetEventPair 将Event以及对应需要触发的watches集合进行组合绑定
    

    2.简介

    UML图如下,红色线代表内部类


    watcher相关类图

    主要的类简介如下:

    Watcher,接口类型,其定义了process方法,另外定义内部类Event,再包含内部类KeeperState和EventType来描述Event发生时zk的状态以及对应event类型
    WatchedEvent,代表zk上一个Watcher能够回应的变化,包含了变化事件的类型,zk状态以及变化影响的znode的path
    WatcherEvent : 是WatchedEvent用于网络传输的封装类
    ClientWatchManager:接口,根据Event得到需要通知的watcher
    ZKWatchManager为ClientWatchManager的实现
    

    下面进行源码讲解

    3.Watcher

    Watcher是什么

    ZK中引入Watcher机制来实现分布式的通知功能
    ZK允许客户端向服务端注册一个Watcher监听,当服务点的的指定事件触发监听时,那么服务端就会向客户端发送事件通知,以便客户端完成逻辑操作(即客户端向服务端注册监听,并将watcher对象存在客户端的Watchermanager中
    服务端触发事件后,向客户端发送通知,客户端收到通知后从wacherManager中取出对象来执行回调逻辑)
    

    特性

    一次性:一旦一个watcher被触发,ZK都会将其从相应的的存储中移除,所以watcher是需要每注册一次,才可触发一次。
    客户端串行执行:客户端watcher回调过程是一个串行同步的过程
    轻量:watcher数据结构中只包含:通知状态、事件类型和节点路径
    

    在ZooKeeper中,接口类Watcher定义了事件通知相关的逻辑,包含了KeeperState和EventType两个枚举类,分别代表通知状态和事件类型。

    类图如下


    Watcher类图

    简单介绍上面类图就是

    Watcher接口拥有process函数,用于处理回调
    内部类Event又包含内部类KeeperState以及EventType
    KeeperState用于记录Event发生时的zk状态(通知状态)
    EventType用于记录Event的类型
    

    3.1方法process

    //回调函数实现该函数,表示根据event执行的行为
    abstract public void process(WatchedEvent event);
    

    3.2内部类Event

    包含KeeperState和EventType两个内部类,通过枚举类实现
    方法很简单,就是int值与对应枚举类型的转换
    两者的枚举类型以及两者之间的关系,触发条件可以参考《paxos到zk》中的图

    KeeperState与EventType一览表

    4.WatchedEvent 和 WatcherEvent

    WatchedEvent :代表zk上一个Watcher能够回应的变化,包含了变化事件的类型,zk状态以及变化影响的znode的path
    WatcherEvent : 是WatchedEvent用于网络传输的封装类
    

    WatchedEvent 类图如下


    WatchedEvent类图

    三个成员变量很好的解释了WatchedEvent的意义,即事件的类型,zk状态以及变化影响的znode的path
    方法基本都好理解,涉及WatcherEvent 有一个构造方法和一个getWrapper方法
    这里稍微强调一下 getWrapper方法

       /**
         *  Convert WatchedEvent to type that can be sent over network
         */
        //转化成可供网络传输,序列化的WatcherEvent
        public WatcherEvent getWrapper() {
            return new WatcherEvent(eventType.getIntValue(), 
                                    keeperState.getIntValue(), 
                                    path);
        }
    }
    

    WatcherEvent实现了Record接口,可以理解为WatchedEvent用于网络传输的封装类

    5.ClientWatchManager接口和实现类ZKWatchManager

    ClientWatchManager接口用户根据Event得到需要通知的watcher
    ZKWatchManager为ClientWatchManager的实现
    

    ClientWatchManager接口只有一个函数,源码分析如下

        //ClientWatchManager负责根据Event得到需要通知的watcher,该manager本身并不进行通知
        public Set<Watcher> materialize(Watcher.Event.KeeperState state,
            Watcher.Event.EventType type, String path);
    

    默认实现类ZKWatchManager,在Zookeeper类中,源码分析如下

    private static class ZKWatchManager implements ClientWatchManager {
            private final Map<String, Set<Watcher>> dataWatches =
                new HashMap<String, Set<Watcher>>();//针对内容的watch
            private final Map<String, Set<Watcher>> existWatches =
                new HashMap<String, Set<Watcher>>();//针对exist API相关的watch
            private final Map<String, Set<Watcher>> childWatches =
                new HashMap<String, Set<Watcher>>();//针对getChildren API相关的watch
    
            private volatile Watcher defaultWatcher;//client传递的,默认的watcher实现
    
            final private void addTo(Set<Watcher> from, Set<Watcher> to) {
                if (from != null) {
                    to.addAll(from);
                }
            }
    
            /* (non-Javadoc)
             * @see org.apache.zookeeper.ClientWatchManager#materialize(Event.KeeperState, 
             *                                                        Event.EventType, java.lang.String)
             */
            @Override
            public Set<Watcher> materialize(Watcher.Event.KeeperState state,
                                            Watcher.Event.EventType type,
                                            String clientPath)
            {
                Set<Watcher> result = new HashSet<Watcher>();
    
                switch (type) {
                case None://eventType是null
                    // 则所有dataWatches,existWatches,childWatches都需要被通知,???为什么要这样干
                    result.add(defaultWatcher);//添加默认watcher
                    boolean clear = ClientCnxn.getDisableAutoResetWatch() &&
                            state != Watcher.Event.KeeperState.SyncConnected;//获取clear标记
    
                    synchronized(dataWatches) {
                        for(Set<Watcher> ws: dataWatches.values()) {
                            result.addAll(ws);
                        }
                        if (clear) {
                            dataWatches.clear();
                        }
                    }
    
                    synchronized(existWatches) {
                        for(Set<Watcher> ws: existWatches.values()) {
                            result.addAll(ws);
                        }
                        if (clear) {
                            existWatches.clear();
                        }
                    }
    
                    synchronized(childWatches) {
                        for(Set<Watcher> ws: childWatches.values()) {
                            result.addAll(ws);
                        }
                        if (clear) {
                            childWatches.clear();
                        }
                    }
    
                    return result;
                case NodeDataChanged:
                case NodeCreated:
                    //如果节点内容变化或者创建
                    synchronized (dataWatches) {
                        addTo(dataWatches.remove(clientPath), result);//从dataWatches中移除,并且添加到result中
                    }
                    synchronized (existWatches) {
                        addTo(existWatches.remove(clientPath), result);//从existWatches中移除,并且添加到result中
                    }
                    break;
                case NodeChildrenChanged:
                    synchronized (childWatches) {
                        addTo(childWatches.remove(clientPath), result);
                    }
                    break;
                case NodeDeleted:
                    synchronized (dataWatches) {
                        addTo(dataWatches.remove(clientPath), result);
                    }
                    // XXX This shouldn't be needed, but just in case
                    synchronized (existWatches) {
                        Set<Watcher> list = existWatches.remove(clientPath);
                        if (list != null) {
                            addTo(existWatches.remove(clientPath), result);
                            LOG.warn("We are triggering an exists watch for delete! Shouldn't happen!");
                        }
                    }
                    synchronized (childWatches) {
                        addTo(childWatches.remove(clientPath), result);
                    }
                    break;
                default://默认处理
                    String msg = "Unhandled watch event type " + type
                        + " with state " + state + " on path " + clientPath;
                    LOG.error(msg);
                    throw new RuntimeException(msg);
                }
                //返回结果
                return result;
            }
        }
    

    该方法在事件发生后,返回需要被通知的Watcher集合。
    是根据已经注册的watches(分为三类,data,children,exist),根据path找到对应的watches,得到一个result集合进行返回
    这里留下个疑问

    watches的注册是在哪里完成,这个后面再讲
    为什么碰到case None,所有watches都要被触发,这个目前不是很理解
    

    6.WatcherSetEventPair

    WatcherSetEventPair 将Event以及对应需要触发的watches集合进行组合绑定
    

    这个类在ClientCnxn中,代码很简单

        private static class WatcherSetEventPair {
            private final Set<Watcher> watchers;//事件触发需要被通知的watches集合
            private final WatchedEvent event;//事件
    
            public WatcherSetEventPair(Set<Watcher> watchers, WatchedEvent event) {
                this.watchers = watchers;
                this.event = event;
            }
        }
    

    7.思考

    Watcher.Event.KeeperState

    这个可以叫成通知状态,也可以理解为事件发生时的zk状态

    watcher特性中,"一次性"在client端的体现

    ZooKeeper.ZKWatchManager#materialize 中可以看到
    被触发的watches从相应的类别(data,children,exist)中删除了,所以在client端是一次性的

    为什么需要WatcherSetEventPair 这个类

    因为watcher接口process函数需要event参数
    那么在ClientWatchManager完成了根据event找到对应的watchers之后
    就可以直接调用watcher.process(event)了

    但是!!!由于ClientCnxn.EventThread是异步处理的,通过生产消费完成
    在processEvent的函数中,要取出一个数据结构Object,既包含watchers集合,又要包含event,所以就把两者组合在一起出现了WatcherSetEventPair 这个类

    watcher特性中,"一次性"在server端的体现

    在下面几讲WatchManager会讲

    ZooKeeper.ZKWatchManager#materialize 里面三个watches的注册是如何完成的

    这一块的代码只有三个watches的remove操作
    这个在watch机制中会讲

    8.问题

    ZooKeeper.ZKWatchManager#materialize 为什么碰到case None,所有watches都要被触发

    这个目前不是很理解,不知道为什么要这样设计

    9.refer

    概念
    http://www.cnblogs.com/leesf456/p/6286827.html
    http://blog.csdn.net/qianshangding0708/article/details/50084155
    http://blog.csdn.net/u012291108/article/details/59698624
    《paxos到zk》第7章

    相关文章

      网友评论

        本文标题:zk源码阅读11:watch之模型以及client端存储

        本文链接:https://www.haomeiwen.com/subject/icstcxtx.html