Analysis of the ZooKeeper Watcher Implementation

Author: huiwq1990 | Published: 2017-06-28 09:09

    Submitting a Watcher to the server

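    The overall flow: the client sends the node path and a watch flag to the server, and keeps the Watcher object itself in a local registration that travels with the request packet.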

    org.apache.zookeeper.ZooKeeper#getChildren

        public void getChildren(final String path, Watcher watcher,
                ChildrenCallback cb, Object ctx)
        {
            final String clientPath = path;
            PathUtils.validatePath(clientPath);
    
            // Wrap the watcher in a ChildWatchRegistration keyed by the client-side path
            WatchRegistration wcb = null;
            if (watcher != null) {
                wcb = new ChildWatchRegistration(watcher, clientPath);
            }
    
            final String serverPath = prependChroot(clientPath);
    
            RequestHeader h = new RequestHeader();
            h.setType(ZooDefs.OpCode.getChildren);
            GetChildrenRequest request = new GetChildrenRequest();
            request.setPath(serverPath);
            request.setWatch(watcher != null);
            GetChildrenResponse response = new GetChildrenResponse();
            // Queue a Packet that carries the request and the watch registration
            cnxn.queuePacket(h, new ReplyHeader(), request, response, cb,
                    clientPath, serverPath, ctx, wcb);
        }
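
The split between what is sent and what stays local can be sketched without the ZooKeeper library. This is a minimal model, not the client's code: WireRequest, LocalRegistration, WatchedRead and toServerPath are hypothetical names introduced here, and toServerPath only approximates the role that prependChroot plays in the excerpt above.

```java
import java.util.function.Consumer;

// Simplified stand-ins for illustration; none of these are ZooKeeper classes.
final class GetChildrenSketch {
    // Hypothetical helper playing the role of prependChroot.
    static String toServerPath(String chroot, String clientPath) {
        if (chroot == null) {
            return clientPath;
        }
        // The client-side root "/" maps to the chroot itself.
        return clientPath.equals("/") ? chroot : chroot + clientPath;
    }

    // Builds both halves of a watched read: what is sent, and what stays local.
    static WatchedRead prepare(String chroot, String clientPath, Consumer<String> watcher) {
        WireRequest request =
            new WireRequest(toServerPath(chroot, clientPath), watcher != null);
        LocalRegistration local =
            (watcher == null) ? null : new LocalRegistration(watcher, clientPath);
        return new WatchedRead(request, local);
    }

    public static void main(String[] args) {
        WatchedRead read = prepare("/app", "/workers",
            path -> System.out.println("changed: " + path));
        System.out.println(read.request.serverPath + " watch=" + read.request.watch);
        System.out.println("local key: " + read.local.clientPath);
    }
}

// What goes over the wire: a server-side path and a boolean flag.
final class WireRequest {
    final String serverPath;
    final boolean watch;

    WireRequest(String serverPath, boolean watch) {
        this.serverPath = serverPath;
        this.watch = watch;
    }
}

// What stays on the client: the callback, keyed by the path the caller used.
final class LocalRegistration {
    final Consumer<String> watcher;
    final String clientPath;

    LocalRegistration(Consumer<String> watcher, String clientPath) {
        this.watcher = watcher;
        this.clientPath = clientPath;
    }
}

final class WatchedRead {
    final WireRequest request;
    final LocalRegistration local;

    WatchedRead(WireRequest request, LocalRegistration local) {
        this.request = request;
        this.local = local;
    }
}
```

Keying the local registration by the client path rather than the server path means the caller later sees events for the path it originally asked about, regardless of any chroot prefix.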
    

    org.apache.zookeeper.ClientCnxn#queuePacket

        Packet queuePacket(RequestHeader h, ReplyHeader r, Record request,
                Record response, AsyncCallback cb, String clientPath,
                String serverPath, Object ctx, WatchRegistration watchRegistration)
        {
            Packet packet = null;
            synchronized (outgoingQueue) {
                // Attach the watchRegistration to the Packet
                packet = new Packet(h, r, request, response, watchRegistration);
                packet.cb = cb;
                packet.ctx = ctx;
                packet.clientPath = clientPath;
                packet.serverPath = serverPath;
                if (!state.isAlive() || closing) {
                    conLossPacket(packet);
                } else {
                    if (h.getType() == OpCode.closeSession) {
                        closing = true;
                    }
                    outgoingQueue.add(packet);
                }
            }
            sendThread.getClientCnxnSocket().wakeupCnxn();
            return packet;
        }
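
The branch structure of queuePacket can be modelled in a few lines. This is a rough sketch under stated assumptions: the packet is reduced to a string label, the Outcome enum is hypothetical, and the real client completes a rejected packet through conLossPacket rather than returning a value.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Simplified model of the queueing decision; not the ClientCnxn implementation.
final class OutgoingQueueSketch {
    enum Outcome { QUEUED, CONNECTION_LOST }

    private final Deque<String> outgoingQueue = new ArrayDeque<>();
    private boolean alive = true;
    private boolean closing = false;

    Outcome queue(String packetLabel, boolean isCloseSession) {
        synchronized (outgoingQueue) {
            if (!alive || closing) {
                // The packet is never sent; the caller is told the connection is gone.
                return Outcome.CONNECTION_LOST;
            }
            if (isCloseSession) {
                // Once a close request is queued, later packets are rejected.
                closing = true;
            }
            outgoingQueue.add(packetLabel);
            return Outcome.QUEUED;
        }
    }

    int pending() {
        synchronized (outgoingQueue) {
            return outgoingQueue.size();
        }
    }

    public static void main(String[] args) {
        OutgoingQueueSketch cnxn = new OutgoingQueueSketch();
        System.out.println(cnxn.queue("getChildren /workers", false));
        System.out.println(cnxn.queue("closeSession", true));
        System.out.println(cnxn.queue("getChildren /late", false));
        System.out.println("pending=" + cnxn.pending());
    }
}
```

The point of the model is that a watch registration attached to a rejected packet never reaches the server, so it should not end up in the client's watch tables either.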
    

    Watcher manager

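    ZKWatchManager is the client-side watcher manager. It tracks the registered watchers together with the default Watcher. Registered watchers fall into three groups: dataWatches, existWatches, and childWatches.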

    org.apache.zookeeper.ZooKeeper.ZKWatchManager

        private static class ZKWatchManager implements ClientWatchManager {
            private final Map<String, Set<Watcher>> dataWatches =
                new HashMap<String, Set<Watcher>>();
            private final Map<String, Set<Watcher>> existWatches =
                new HashMap<String, Set<Watcher>>();
            private final Map<String, Set<Watcher>> childWatches =
                new HashMap<String, Set<Watcher>>();
            // Set when the ZooKeeper client is constructed, for example:
            // new ZooKeeper("localhost:2181", 2000, new DefaultWatcher());
            private volatile Watcher defaultWatcher;
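
To make the shape of these tables concrete, here is a small dependency-free sketch. The nested Watcher interface is a stand-in for org.apache.zookeeper.Watcher, and the add and count helpers are hypothetical; only the three map fields mirror the excerpt above.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Illustrative model of the three watch tables; not the ZKWatchManager class.
final class WatchTablesSketch {
    // Stand-in for org.apache.zookeeper.Watcher so the sketch has no dependencies.
    interface Watcher {
        void process(String path);
    }

    final Map<String, Set<Watcher>> dataWatches = new HashMap<>();
    final Map<String, Set<Watcher>> existWatches = new HashMap<>();
    final Map<String, Set<Watcher>> childWatches = new HashMap<>();

    // Adds a watcher under a path; the Set drops a repeated registration.
    static void add(Map<String, Set<Watcher>> table, String path, Watcher watcher) {
        synchronized (table) {
            table.computeIfAbsent(path, k -> new HashSet<>()).add(watcher);
        }
    }

    static int count(Map<String, Set<Watcher>> table, String path) {
        synchronized (table) {
            Set<Watcher> watchers = table.get(path);
            return watchers == null ? 0 : watchers.size();
        }
    }

    public static void main(String[] args) {
        WatchTablesSketch tables = new WatchTablesSketch();
        Watcher logger = path -> System.out.println("event on " + path);
        add(tables.childWatches, "/workers", logger);
        add(tables.childWatches, "/workers", logger); // same object, stored once
        System.out.println("child watchers: " + count(tables.childWatches, "/workers"));
        System.out.println("data watchers: " + count(tables.dataWatches, "/workers"));
    }
}
```

Because each table maps a path to a Set, registering the same Watcher object twice on one path keeps a single entry, while the same path can still hold independent watchers in the other tables.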
    

    Client-side Watcher registration

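    When the reply for a request packet comes back, org.apache.zookeeper.ZooKeeper.WatchRegistration#register is called with the reply's return code, and the watcher is recorded only if shouldAddWatch accepts that code.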

            // Decide from the reply's return code whether to register the Watcher (0 means success)
            protected boolean shouldAddWatch(int rc) {
                return rc == 0;
            }
            // Override in org.apache.zookeeper.ZooKeeper.ChildWatchRegistration: child watches go into childWatches
            protected Map<String, Set<Watcher>> getWatches(int rc) {
                return watchManager.childWatches;
            }
    
            public void register(int rc) {
                if (shouldAddWatch(rc)) {
                    Map<String, Set<Watcher>> watches = getWatches(rc);
                    // HashMap is not thread-safe, so guard the update with a lock
                    synchronized(watches) {
                        // Look up the watchers already registered for this path
                        Set<Watcher> watchers = watches.get(clientPath);
                        if (watchers == null) {
                            watchers = new HashSet<Watcher>();
                            watches.put(clientPath, watchers);
                        }
                        watchers.add(watcher);
                    }
                }
            }
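
The registration logic above is a template method: register is fixed, while shouldAddWatch and getWatches vary per registration type. A minimal sketch of that pattern follows. The class names and the child and exists factory methods are hypothetical. The exists variant reflects how the 3.4.x client is understood to behave (a NONODE reply still registers, into existWatches); treat that part as an assumption to verify against your version.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Illustrative model of return-code-gated registration; not the ZooKeeper classes.
final class RegisterSketch {
    // Stand-in for org.apache.zookeeper.Watcher.
    interface Watcher {
        void process(String path);
    }

    static final int OK = 0;
    static final int NONODE = -101; // numeric value of KeeperException.Code.NONODE

    final Map<String, Set<Watcher>> dataWatches = new HashMap<>();
    final Map<String, Set<Watcher>> existWatches = new HashMap<>();
    final Map<String, Set<Watcher>> childWatches = new HashMap<>();

    abstract class Registration {
        final Watcher watcher;
        final String clientPath;

        Registration(Watcher watcher, String clientPath) {
            this.watcher = watcher;
            this.clientPath = clientPath;
        }

        abstract Map<String, Set<Watcher>> getWatches(int rc);

        boolean shouldAddWatch(int rc) {
            return rc == OK;
        }

        // Fixed algorithm: check the code, pick the table, add under a lock.
        void register(int rc) {
            if (!shouldAddWatch(rc)) {
                return;
            }
            Map<String, Set<Watcher>> watches = getWatches(rc);
            synchronized (watches) {
                watches.computeIfAbsent(clientPath, k -> new HashSet<>()).add(watcher);
            }
        }
    }

    // Hypothetical factory for a child-watch registration.
    Registration child(Watcher watcher, String path) {
        return new Registration(watcher, path) {
            @Override
            Map<String, Set<Watcher>> getWatches(int rc) {
                return childWatches;
            }
        };
    }

    // Hypothetical factory for an exists-watch registration (assumed 3.4.x behavior).
    Registration exists(Watcher watcher, String path) {
        return new Registration(watcher, path) {
            @Override
            Map<String, Set<Watcher>> getWatches(int rc) {
                return rc == OK ? dataWatches : existWatches;
            }

            @Override
            boolean shouldAddWatch(int rc) {
                return rc == OK || rc == NONODE;
            }
        };
    }

    public static void main(String[] args) {
        RegisterSketch manager = new RegisterSketch();
        Watcher watcher = path -> System.out.println("event on " + path);
        manager.child(watcher, "/workers").register(NONODE); // failed read: ignored
        manager.child(watcher, "/workers").register(OK);     // successful read: stored
        System.out.println("child paths: " + manager.childWatches.keySet());
    }
}
```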
    

    How a Watcher is triggered

    Why a Watcher must be re-registered

    Server-side Watcher implementation

    http://xkorey.iteye.com/blog/2204495
    http://blog.jobbole.com/104833/
    https://github.com/llohellohe/zookeeper
    https://www.ibm.com/developerworks/cn/opensource/os-cn-apache-zookeeper-watcher/index.html
