ZooKeeper Source Code in Depth (4): Dissecting the Watch Implementation

Author: 泊浮目 | Published 2020-04-08 20:19

    This article was first published on 泊浮目's Jianshu page: https://www.jianshu.com/u/204b8aaab8ba

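    Version  Date       Notes
    1.0      2020.4.8   First published
    1.1      2020.4.18  Reworked the summary section to make it more detailed and easier to follow
    1.2      2020.8.10  Removed some non-essential code to shorten the article; improved the descriptions
    1.3      2021.6.23  Title changed from "ZooKeeper in Depth (4): Dissecting the Watch Implementation" to "ZooKeeper Source Code in Depth (4): Dissecting the Watch Implementation"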

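    1. Preface

    Intended audience: readers with a Java background who are interested in how the ZooKeeper (zk) source code works.

    Anyone who has used ZooKeeper knows that watch is a very handy mechanism. Today we look at how it is implemented.

    Before we start, a quick refresher: what is a watch?

    zk provides publish/subscribe for distributed data, i.e. the classic pub/sub model. It defines a one-to-many subscription relationship in which multiple subscribers listen to the same subject, and all of them are notified when the state of that subject changes. Concretely, zk lets a client register a watch with the server; when a specified event on the server triggers that watch, the server sends an event notification to that client.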

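    2. Implementation analysis

    Before dissecting the implementation, let's think about how we would build a watch mechanism ourselves.

    The simplest approach is for the client to remember the current version of the node and poll the node's state; when the version changes, the client fires the watch. But surely there are better options than polling?

    Polling puts considerable load on the server. We could instead borrow the webhook idea: the server stores an address agreed on with the client and notifies the client when the watched data node changes.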
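
    To make the trade-off concrete, here is a minimal sketch that contrasts the two ideas. Everything in it (PollVsPush, Node, countChangesByPolling) is a hypothetical model made up for illustration, not ZooKeeper code: the poller has to read the version on every round, while the subscriber is simply called when a write happens.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntConsumer;

// Hypothetical model, not ZooKeeper code: contrasts polling with push notification.
class PollVsPush {
    /** A node whose version grows by one on every write. */
    static class Node {
        private int version = 0;
        private int reads = 0; // number of version reads served to pollers
        private final List<IntConsumer> subscribers = new ArrayList<>();

        int readVersion() { reads++; return version; }
        int reads() { return reads; }
        void subscribe(IntConsumer callback) { subscribers.add(callback); }

        void write() {
            version++;
            for (IntConsumer s : subscribers) {
                s.accept(version); // push: the node tells its subscribers about the change
            }
        }
    }

    /** Polls `polls` times; the node is written once, just before poll number `writeAt`. */
    static int countChangesByPolling(Node node, int polls, int writeAt) {
        int last = node.readVersion();
        int changes = 0;
        for (int i = 0; i < polls; i++) {
            if (i == writeAt) {
                node.write();
            }
            int now = node.readVersion();
            if (now != last) {
                changes++;
                last = now;
            }
        }
        return changes;
    }

    public static void main(String[] args) {
        Node polled = new Node();
        int changes = countChangesByPolling(polled, 10, 3);
        System.out.println("polling: changes=" + changes + ", reads=" + polled.reads());

        Node pushed = new Node();
        List<Integer> seen = new ArrayList<>();
        pushed.subscribe(v -> seen.add(v));
        pushed.write();
        System.out.println("push: notifications=" + seen + ", reads=" + pushed.reads());
    }
}
```

    In this toy model the poller pays one read per round whether or not anything changed, whereas the push variant costs nothing until a write occurs.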

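    At this point we are already fairly close to zk's own watch implementation. Following this line of thought, let's start the analysis:

    When using the client, getData, getChildren, and exists can all register a watcher with zk. They all work the same way, so we use the exists method as the example: the client is notified when the node of interest is created or deleted (an exists watch also fires when the node's data is set).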

    2.1 Client-side implementation

    We start with the client side, using the native API we call as the entry point.

    2.1.1 ZooKeeper.exists

        /**
         * The asynchronous version of exists.
         *
         * @see #exists(String, Watcher)
         */
        public void exists(final String path, Watcher watcher,
                StatCallback cb, Object ctx)
        {
            final String clientPath = path;
            PathUtils.validatePath(clientPath);
    
            // the watch contains the un-chroot path
            WatchRegistration wcb = null;  //1.
            if (watcher != null) {
                wcb = new ExistsWatchRegistration(watcher, clientPath);
            }
    
            final String serverPath = prependChroot(clientPath);
    
            RequestHeader h = new RequestHeader();
            h.setType(ZooDefs.OpCode.exists);
            ExistsRequest request = new ExistsRequest();
            request.setPath(serverPath);
            request.setWatch(watcher != null);//3.
            SetDataResponse response = new SetDataResponse();
            cnxn.queuePacket(h, new ReplyHeader(), request, response, cb,
                    clientPath, serverPath, ctx, wcb); //2.
        }
    

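    There are two watch-related spots here. First, the watcher becomes a field of a WatchRegistration (see marker 1), which finally ends up inside a Packet, the smallest unit of communication between client and server (see marker 2).

    Second, request.setWatch(watcher != null) stores nothing more than a boolean (see marker 3).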
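
    The split between "what goes over the wire" and "what stays on the client" can be sketched as follows. This is only an illustration under stated assumptions: ZooKeeper serializes records with jute, while this sketch uses DataOutputStream, and the names WatchFlagSketch, Callback, and LocalPacket are hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical sketch: the real client serializes with jute, not DataOutputStream.
class WatchFlagSketch {
    /** Stand-in for a user callback; it is never serialized. */
    interface Callback { void onEvent(String path); }

    /** What goes over the wire: the path plus a single boolean. */
    static byte[] serializeExistsRequest(String serverPath, boolean watch) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(serverPath); // 2-byte length prefix + modified UTF-8 bytes
            out.writeBoolean(watch);  // the entire "watch" part of the request: one byte
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** What stays on the client: the request bytes plus the un-serialized callback. */
    static class LocalPacket {
        final byte[] wireBytes;
        final Callback localWatcher; // lives in client memory only

        LocalPacket(String serverPath, Callback watcher) {
            this.wireBytes = serializeExistsRequest(serverPath, watcher != null);
            this.localWatcher = watcher;
        }
    }

    public static void main(String[] args) {
        LocalPacket p = new LocalPacket("/app/config",
                path -> System.out.println("changed: " + path));
        System.out.println("bytes on the wire: " + p.wireBytes.length);
    }
}
```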

    2.1.2 ClientCnxn.queuePacket

    The previous listing ends with cnxn.queuePacket(h, new ReplyHeader(), request, response, cb, clientPath, serverPath, ctx, wcb);, so let's follow that call.

       public Packet queuePacket(RequestHeader h, ReplyHeader r, Record request,
                Record response, AsyncCallback cb, String clientPath,
                String serverPath, Object ctx, WatchRegistration watchRegistration) {
            return queuePacket(h, r, request, response, cb, clientPath, serverPath,
                    ctx, watchRegistration, null);
        }
    
        public Packet queuePacket(RequestHeader h, ReplyHeader r, Record request,
                Record response, AsyncCallback cb, String clientPath,
                String serverPath, Object ctx, WatchRegistration watchRegistration,
                WatchDeregistration watchDeregistration) {
            Packet packet = null;
    
            // Note that we do not generate the Xid for the packet yet. It is
            // generated later at send-time, by an implementation of ClientCnxnSocket::doIO(),
            // where the packet is actually sent.
            packet = new Packet(h, r, request, response, watchRegistration);
            packet.cb = cb;
            packet.ctx = ctx;
            packet.clientPath = clientPath;
            packet.serverPath = serverPath;
            packet.watchDeregistration = watchDeregistration;
            // The synchronized block here is for two purpose:
            // 1. synchronize with the final cleanup() in SendThread.run() to avoid race
            // 2. synchronized against each packet. So if a closeSession packet is added,
            // later packet will be notified.
            synchronized (state) {
                if (!state.isAlive() || closing) {
                    conLossPacket(packet);
                } else {
                    // If the client is asking to close the session then
                    // mark as closing
                    if (h.getType() == OpCode.closeSession) {
                        closing = true;
                    }
                    outgoingQueue.add(packet);
                }
            }
            sendThread.getClientCnxnSocket().packetAdded();
            return packet;
        }
    

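    This looks like a lot of code, but it does only one thing: it assembles a Packet and adds it to the outgoing queue. That queue is consumed by the SendThread inside ClientCnxn (see SendThread.run). That method has many conditional branches and is not very clean code, so it is not reproduced here to avoid distraction.

    The code below shows that the zk client maintains an outgoing queue and a queue of packets awaiting replies, both of which hold Packet objects.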

        /**
         * These are the packets that have been sent and are waiting for a response.
         */
        private final LinkedList<Packet> pendingQueue = new LinkedList<Packet>();
    
        /**
         * These are the packets that need to be sent.
         */
        private final LinkedBlockingDeque<Packet> outgoingQueue = new LinkedBlockingDeque<Packet>();
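
    The hand-off between the two queues can be sketched roughly like this. It is a single-threaded simplification with hypothetical names (TwoQueuesSketch, sendOne, onReply); the real client does this across threads and treats a mismatch as a connection loss rather than a plain exception.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sketch of the outgoing -> pending hand-off; not ZooKeeper's actual classes.
class TwoQueuesSketch {
    static class Packet {
        final String op;
        int xid;          // stamped at send time, not when the packet is queued
        boolean finished;
        Packet(String op) { this.op = op; }
    }

    private final Deque<Packet> outgoingQueue = new ArrayDeque<>();
    private final Deque<Packet> pendingQueue = new ArrayDeque<>();
    private int nextXid = 1;

    void queuePacket(Packet p) { outgoingQueue.addLast(p); }

    /** "Sends" the oldest outgoing packet: stamp an xid and move it to the pending queue. */
    Packet sendOne() {
        Packet p = outgoingQueue.pollFirst();
        if (p == null) {
            return null;
        }
        p.xid = nextXid++;
        pendingQueue.addLast(p);
        return p;
    }

    /** Replies arrive in request order, so a reply must match the head of the pending queue. */
    Packet onReply(int replyXid) {
        Packet head = pendingQueue.pollFirst();
        if (head == null) {
            throw new IllegalStateException("Nothing in the queue, but got " + replyXid);
        }
        if (head.xid != replyXid) {
            throw new IllegalStateException(
                    "Xid out of order: got " + replyXid + ", expected " + head.xid);
        }
        head.finished = true;
        return head;
    }

    int pendingSize() { return pendingQueue.size(); }

    public static void main(String[] args) {
        TwoQueuesSketch cnxn = new TwoQueuesSketch();
        cnxn.queuePacket(new Packet("exists /a"));
        cnxn.queuePacket(new Packet("getData /b"));
        cnxn.sendOne();
        cnxn.sendOne();
        Packet done = cnxn.onReply(1);
        System.out.println(done.op + " finished=" + done.finished
                + ", still pending=" + cnxn.pendingSize());
    }
}
```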
    

    Next, we look at SendThread.readResponse, the logic that handles server replies and consumes Packets from the client's pending queue:

            void readResponse(ByteBuffer incomingBuffer) throws IOException {
                ByteBufferInputStream bbis = new ByteBufferInputStream(
                        incomingBuffer);
                BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
                ReplyHeader replyHdr = new ReplyHeader();
    
                replyHdr.deserialize(bbia, "header");
                if (replyHdr.getXid() == -2) {
                    // -2 is the xid for pings
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Got ping response for sessionid: 0x"
                                + Long.toHexString(sessionId)
                                + " after "
                                + ((System.nanoTime() - lastPingSentNs) / 1000000)
                                + "ms");
                    }
                    return;
                }
                if (replyHdr.getXid() == -4) {
                    // -4 is the xid for AuthPacket               
                    if(replyHdr.getErr() == KeeperException.Code.AUTHFAILED.intValue()) {
                        state = States.AUTH_FAILED;                    
                        eventThread.queueEvent( new WatchedEvent(Watcher.Event.EventType.None, 
                                Watcher.Event.KeeperState.AuthFailed, null) );
                        eventThread.queueEventOfDeath();
                    }
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Got auth sessionid:0x"
                                + Long.toHexString(sessionId));
                    }
                    return;
                }
                if (replyHdr.getXid() == -1) {
                    // -1 means notification
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Got notification sessionid:0x"
                            + Long.toHexString(sessionId));
                    }
                    WatcherEvent event = new WatcherEvent();
                    event.deserialize(bbia, "response");
    
                    // convert from a server path to a client path
                    if (chrootPath != null) {
                        String serverPath = event.getPath();
                        if(serverPath.compareTo(chrootPath)==0)
                            event.setPath("/");
                        else if (serverPath.length() > chrootPath.length())
                            event.setPath(serverPath.substring(chrootPath.length()));
                        else {
                            LOG.warn("Got server path " + event.getPath()
                                    + " which is too short for chroot path "
                                    + chrootPath);
                        }
                    }
    
                    WatchedEvent we = new WatchedEvent(event);
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Got " + we + " for sessionid 0x"
                                + Long.toHexString(sessionId));
                    }
    
                    eventThread.queueEvent( we );
                    return;
                }
    
                // If SASL authentication is currently in progress, construct and
                // send a response packet immediately, rather than queuing a
                // response as with other packets.
                if (tunnelAuthInProgress()) {
                    GetSASLRequest request = new GetSASLRequest();
                    request.deserialize(bbia,"token");
                    zooKeeperSaslClient.respondToServer(request.getToken(),
                      ClientCnxn.this);
                    return;
                }
    
                Packet packet;
                synchronized (pendingQueue) {
                    if (pendingQueue.size() == 0) {
                        throw new IOException("Nothing in the queue, but got "
                                + replyHdr.getXid());
                    }
                    packet = pendingQueue.remove();
                }
                /*
                 * Since requests are processed in order, we better get a response
                 * to the first request!
                 */
                try {
                    if (packet.requestHeader.getXid() != replyHdr.getXid()) {
                        packet.replyHeader.setErr(
                                KeeperException.Code.CONNECTIONLOSS.intValue());
                        throw new IOException("Xid out of order. Got Xid "
                                + replyHdr.getXid() + " with err " +
                                + replyHdr.getErr() +
                                " expected Xid "
                                + packet.requestHeader.getXid()
                                + " for a packet with details: "
                                + packet );
                    }
    
                    packet.replyHeader.setXid(replyHdr.getXid());
                    packet.replyHeader.setErr(replyHdr.getErr());
                    packet.replyHeader.setZxid(replyHdr.getZxid());
                    if (replyHdr.getZxid() > 0) {
                        lastZxid = replyHdr.getZxid();
                    }
                    if (packet.response != null && replyHdr.getErr() == 0) {
                        packet.response.deserialize(bbia, "response");
                    }
    
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Reading reply sessionid:0x"
                                + Long.toHexString(sessionId) + ", packet:: " + packet);
                    }
                } finally {
                    finishPacket(packet);
                }
            }
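
    The notification branch above (xid == -1) converts the server path back into the path the client originally used. A minimal sketch of that conversion is shown here; ChrootPathSketch and toClientPath are hypothetical helper names, and toServerPath is only my assumption of what prependChroot does, since its body is not shown in this article.

```java
// Hypothetical helpers mirroring the chroot handling; not part of ZooKeeper's API.
class ChrootPathSketch {
    /** Mirrors the chroot branch of the notification handling in readResponse. */
    static String toClientPath(String chrootPath, String serverPath) {
        if (chrootPath == null) {
            return serverPath;
        }
        if (serverPath.equals(chrootPath)) {
            return "/";
        }
        if (serverPath.length() > chrootPath.length()) {
            return serverPath.substring(chrootPath.length());
        }
        // Too short for the chroot: the original only logs a warning and keeps the path.
        return serverPath;
    }

    /** Assumption: the inverse direction, i.e. what prependChroot presumably does. */
    static String toServerPath(String chrootPath, String clientPath) {
        if (chrootPath == null) {
            return clientPath;
        }
        return clientPath.length() == 1 ? chrootPath : chrootPath + clientPath;
    }

    public static void main(String[] args) {
        String server = toServerPath("/app", "/config");
        System.out.println(server + " -> " + toClientPath("/app", server));
    }
}
```

    This is why a watcher registered under the un-chrooted client path can still be found when the event comes back carrying the full server path.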
    

    Inside synchronized (pendingQueue), the Packet is taken off the queue, and at the end it is handed to finishPacket:

        protected void finishPacket(Packet p) {
            int err = p.replyHeader.getErr();
            if (p.watchRegistration != null) {
                p.watchRegistration.register(err);
            }
            // Add all the removed watch events to the event queue, so that the
            // clients will be notified with 'Data/Child WatchRemoved' event type.
            if (p.watchDeregistration != null) {
                Map<EventType, Set<Watcher>> materializedWatchers = null;
                try {
                    materializedWatchers = p.watchDeregistration.unregister(err);
                    for (Entry<EventType, Set<Watcher>> entry : materializedWatchers
                            .entrySet()) {
                        Set<Watcher> watchers = entry.getValue();
                        if (watchers.size() > 0) {
                            queueEvent(p.watchDeregistration.getClientPath(), err,
                                    watchers, entry.getKey());
                            // ignore connectionloss when removing from local
                            // session
                            p.replyHeader.setErr(Code.OK.intValue());
                        }
                    }
                } catch (KeeperException.NoWatcherException nwe) {
                    p.replyHeader.setErr(nwe.code().intValue());
                } catch (KeeperException ke) {
                    p.replyHeader.setErr(ke.code().intValue());
                }
            }
    
            if (p.cb == null) {
                synchronized (p) {
                    p.finished = true;
                    p.notifyAll();
                }
            } else {
                p.finished = true;
                eventThread.queuePacket(p);
            }
        }
    

    In this method we will analyze two pieces of logic:

    • p.watchRegistration.register
    • queueEvent

    2.1.3 watchRegistration

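    Note that the WatchRegistration is not serialized and sent along with the Packet. This avoids sending unnecessary data, since the request is already flagged as a watch. So what is the WatchRegistration for? Fortunately the register logic is simple, so let's take a look: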

            /**
             * Register the watcher with the set of watches on path.
             * @param rc the result code of the operation that attempted to
             * add the watch on the path.
             */
            public void register(int rc) {
                if (shouldAddWatch(rc)) {
                    Map<String, Set<Watcher>> watches = getWatches(rc);
                    synchronized(watches) {
                        Set<Watcher> watchers = watches.get(clientPath);
                        if (watchers == null) {
                            watchers = new HashSet<Watcher>();
                            watches.put(clientPath, watchers);
                        }
                        watchers.add(watcher);
                    }
                }
            }
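
    The registration step above can be boiled down to a few lines. The sketch below is a simplification with hypothetical names (RegisterSketch, watcherCount), and it assumes the simplest possible rule, "register only when the result code is 0"; the concrete registration classes in the real client may accept other result codes.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of client-side registration; not ZooKeeper's actual classes.
class RegisterSketch {
    interface Watcher { void process(String event); }

    static final int OK = 0;

    private final Map<String, Set<Watcher>> watches = new HashMap<>();

    /** Assumption: the watch is kept only if the operation that carried it succeeded. */
    boolean register(int rc, String clientPath, Watcher watcher) {
        if (rc != OK) {
            return false;
        }
        synchronized (watches) {
            // Create the per-path set on first use, then add the watcher to it.
            watches.computeIfAbsent(clientPath, k -> new HashSet<>()).add(watcher);
        }
        return true;
    }

    int watcherCount(String clientPath) {
        synchronized (watches) {
            Set<Watcher> set = watches.get(clientPath);
            return set == null ? 0 : set.size();
        }
    }

    public static void main(String[] args) {
        RegisterSketch sketch = new RegisterSketch();
        Watcher w = event -> System.out.println("got " + event);
        sketch.register(OK, "/a", w);
        sketch.register(OK, "/a", w); // same watcher again: the Set keeps a single entry
        System.out.println("watchers on /a: " + sketch.watcherCount("/a"));
    }
}
```

    Because the values are sets, registering the same watcher object on the same path twice leaves a single entry, so it is called once per event.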
    

    2.1.4 queueEvent

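    The register code above is short: the client maintains a path-to-watchers map. By now most readers can guess the implementation: when a reply arrives, the client looks up the matching watchers by path. Next, let's look at queueEvent: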

        void queueEvent(String clientPath, int err,
                Set<Watcher> materializedWatchers, EventType eventType) {
            KeeperState sessionState = KeeperState.SyncConnected;
            if (KeeperException.Code.SESSIONEXPIRED.intValue() == err
                    || KeeperException.Code.CONNECTIONLOSS.intValue() == err) {
                sessionState = Event.KeeperState.Disconnected;
            }
            WatchedEvent event = new WatchedEvent(eventType, sessionState,
                    clientPath);
            eventThread.queueEvent(event, materializedWatchers);
        }
    

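    The logic is simple: determine the session state, assemble the event, and hand it to the eventThread, which takes care of the notification.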

            private void queueEvent(WatchedEvent event,
                    Set<Watcher> materializedWatchers) {
                if (event.getType() == EventType.None
                        && sessionState == event.getState()) {
                    return;
                }
                sessionState = event.getState();
                final Set<Watcher> watchers;
                if (materializedWatchers == null) {
                    // materialize the watchers based on the event
                    watchers = watcher.materialize(event.getState(),
                            event.getType(), event.getPath());
                } else {
                    watchers = new HashSet<Watcher>();
                    watchers.addAll(materializedWatchers);
                }
                WatcherSetEventPair pair = new WatcherSetEventPair(watchers, event);
                // queue the pair (watch set & event) for later processing
                waitingEvents.add(pair);
            }
    

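    The source of ClientWatchManager.materialize is not shown here. All we need to know is that in this logic a watch is removed as soon as it has been triggered, and that the state is kept in ZKWatchManager: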

       static class ZKWatchManager implements ClientWatchManager {
            private final Map<String, Set<Watcher>> dataWatches =
                new HashMap<String, Set<Watcher>>();
            private final Map<String, Set<Watcher>> existWatches =
                new HashMap<String, Set<Watcher>>();
            private final Map<String, Set<Watcher>> childWatches =
                new HashMap<String, Set<Watcher>>();
    //......
    }
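
    Since materialize is not listed, here is a rough sketch of the behaviour described above: watchers are removed from the maps at the moment they are collected for an event. The names (MaterializeSketch, drain) are hypothetical, and which maps each event type consults is my assumption for illustration, not a copy of the real method.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of one-shot watcher lookup on the client; not the real materialize.
class MaterializeSketch {
    enum EventType { NodeCreated, NodeDeleted, NodeDataChanged, NodeChildrenChanged }

    interface Watcher { void process(EventType type, String path); }

    final Map<String, Set<Watcher>> dataWatches = new HashMap<>();
    final Map<String, Set<Watcher>> existWatches = new HashMap<>();
    final Map<String, Set<Watcher>> childWatches = new HashMap<>();

    static void add(Map<String, Set<Watcher>> map, String path, Watcher w) {
        map.computeIfAbsent(path, k -> new HashSet<>()).add(w);
    }

    /** Removes the watchers for `path` from one map and collects them into `into`. */
    private static void drain(Map<String, Set<Watcher>> from, String path, Set<Watcher> into) {
        Set<Watcher> removed = from.remove(path);
        if (removed != null) {
            into.addAll(removed);
        }
    }

    /** Assumption: the event type decides which maps are consulted. */
    Set<Watcher> materialize(EventType type, String path) {
        Set<Watcher> result = new HashSet<>();
        switch (type) {
            case NodeCreated:
            case NodeDataChanged:
                drain(dataWatches, path, result);
                drain(existWatches, path, result);
                break;
            case NodeChildrenChanged:
                drain(childWatches, path, result);
                break;
            case NodeDeleted:
                drain(dataWatches, path, result);
                drain(existWatches, path, result);
                drain(childWatches, path, result);
                break;
        }
        return result;
    }

    public static void main(String[] args) {
        MaterializeSketch manager = new MaterializeSketch();
        add(manager.existWatches, "/a", (type, path) -> System.out.println(type + " " + path));
        System.out.println("first:  " + manager.materialize(EventType.NodeCreated, "/a").size());
        System.out.println("second: " + manager.materialize(EventType.NodeCreated, "/a").size());
    }
}
```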
    

    Back to what eventThread.queueEvent does at the end: it puts the pair on a queue. So let's look at the core method of this thread:

            @Override
            @SuppressFBWarnings("JLM_JSR166_UTILCONCURRENT_MONITORENTER")
            public void run() {
               try {
                  isRunning = true;
                  while (true) {
                     Object event = waitingEvents.take();
                     if (event == eventOfDeath) {
                        wasKilled = true;
                     } else {
                        processEvent(event);
                     }
                     if (wasKilled)
                        synchronized (waitingEvents) {
                           if (waitingEvents.isEmpty()) {
                              isRunning = false;
                              break;
                           }
                        }
                  }
               } catch (InterruptedException e) {
                  LOG.error("Event thread exiting due to interruption", e);
               }
    
                LOG.info("EventThread shut down for session: 0x{}",
                         Long.toHexString(getSessionId()));
            }
    

    The same familiar recipe: an endless loop that consumes elements from a queue. Now let's look at processEvent:

           private void processEvent(Object event) {
              try {
                  if (event instanceof WatcherSetEventPair) {
                      // each watcher will process the event
                      WatcherSetEventPair pair = (WatcherSetEventPair) event;
                      for (Watcher watcher : pair.watchers) {
                          try {
                              watcher.process(pair.event);
                          } catch (Throwable t) {
                              LOG.error("Error while calling watcher ", t);
                          }
                      }
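                  } else if (event instanceof LocalCallback) {
                      // This logic is not relevant to this article; skipped
                  }
                  // ... the remaining callback branches are omitted
              } catch (Throwable t) {
                  LOG.error("Caught unexpected throwable", t);
              }
           }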
    

    Once process is called, the logic we wrote ourselves is triggered.
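
    The shape of this thread (a blocking queue, a special "event of death" marker, and a try/catch around every callback) can be reproduced in a few lines. The sketch below uses hypothetical names (EventLoopSketch, Pair, demo) and simplifies the shutdown path: it stops as soon as it sees the marker, which is enough here because everything queued earlier has already been consumed in FIFO order.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch of a single-threaded event loop with a poison pill; not ZooKeeper code.
class EventLoopSketch {
    interface Watcher { void process(String event); }

    private static final class Pair {
        final Watcher watcher;
        final String event;
        Pair(Watcher watcher, String event) { this.watcher = watcher; this.event = event; }
    }

    private static final Object EVENT_OF_DEATH = new Object();
    private final LinkedBlockingQueue<Object> waitingEvents = new LinkedBlockingQueue<>();
    private final Thread thread = new Thread(this::run, "event-loop-sketch");

    void start() { thread.start(); }

    void queueEvent(Watcher watcher, String event) { waitingEvents.add(new Pair(watcher, event)); }

    /** Enqueues the poison pill and waits until everything queued before it has run. */
    void shutdown() {
        waitingEvents.add(EVENT_OF_DEATH);
        try {
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private void run() {
        try {
            while (true) {
                Object item = waitingEvents.take(); // blocks until something is queued
                if (item == EVENT_OF_DEATH) {
                    return;
                }
                Pair pair = (Pair) item;
                try {
                    pair.watcher.process(pair.event); // callbacks run one at a time, in order
                } catch (Throwable t) {
                    // A failing watcher must not kill the loop.
                    System.err.println("Error while calling watcher: " + t);
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Queues three events, the second of which throws, and returns what was processed. */
    static List<String> demo() {
        List<String> seen = Collections.synchronizedList(new ArrayList<>());
        EventLoopSketch loop = new EventLoopSketch();
        loop.start();
        loop.queueEvent(e -> seen.add(e), "first");
        loop.queueEvent(e -> { throw new IllegalStateException("bad watcher"); }, "second");
        loop.queueEvent(e -> seen.add(e), "third");
        loop.shutdown();
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

    A throwing watcher is logged and skipped, but a watcher that blocks would still stall every callback queued behind it, because there is only one consumer thread.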

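    Having read the client-side code, it is worth asking: is it really a good idea to explicitly mix a high-level mechanism (watch) with low-level communication code (such as finishPacket)? If you were writing it, how would you do it?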

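    2.2 Server-side implementation

    Above, we covered the client's watch-related implementation. Next, let's walk through the watch implementation on the server.

    We go straight to the place where the zk server handles requests, namely the relevant part of FinalRequestProcessor.processRequest: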

                case OpCode.exists: {
                    lastOp = "EXIS";
                    // TODO we need to figure out the security requirement for this!
                    ExistsRequest existsRequest = new ExistsRequest();
                    ByteBufferInputStream.byteBuffer2Record(request.request,
                            existsRequest);
                    String path = existsRequest.getPath();
                    if (path.indexOf('\0') != -1) {
                        throw new KeeperException.BadArgumentsException();
                    }
                    Stat stat = zks.getZKDatabase().statNode(path, existsRequest
                            .getWatch() ? cnxn : null);
                    rsp = new ExistsResponse(stat);
                    break;
                }
    

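    As we can see, if the request asks for a watch, the ServerCnxn is passed down. ServerCnxn represents the connection between the client and the server, so when a data event occurs, the client's watch can be triggered through that connection.

    Jump to DataTree.statNode: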

        public Stat statNode(String path, Watcher watcher)
                throws KeeperException.NoNodeException {
            Stat stat = new Stat();
            DataNode n = nodes.get(path);
            if (watcher != null) {
                dataWatches.addWatch(path, watcher);
            }
            if (n == null) {
                throw new KeeperException.NoNodeException();
            }
            synchronized (n) {
                n.copyStat(stat);
                return stat;
            }
        }
    

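    When watcher != null, the watcher is added to the server's dataWatches. Note that this happens before the existence check, so the watch is registered even if the node does not exist yet. Next, let's look at the core watch class on the server, WatchManager: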

    /**
     * This class manages watches. It allows watches to be associated with a string
     * and removes watchers and their watches in addition to managing triggers.
     */
    class WatchManager {
        private static final Logger LOG = LoggerFactory.getLogger(WatchManager.class);
    
        private final HashMap<String, HashSet<Watcher>> watchTable =
            new HashMap<String, HashSet<Watcher>>();
    
        private final HashMap<Watcher, HashSet<String>> watch2Paths =
            new HashMap<Watcher, HashSet<String>>();
    
        synchronized int size(){
            int result = 0;
            for(Set<Watcher> watches : watchTable.values()) {
                result += watches.size();
            }
            return result;
        }
    
        synchronized void addWatch(String path, Watcher watcher) {
            HashSet<Watcher> list = watchTable.get(path);
            if (list == null) {
                // don't waste memory if there are few watches on a node
                // rehash when the 4th entry is added, doubling size thereafter
                // seems like a good compromise
                list = new HashSet<Watcher>(4);
                watchTable.put(path, list);
            }
            list.add(watcher);
    
            HashSet<String> paths = watch2Paths.get(watcher);
            if (paths == null) {
                // cnxns typically have many watches, so use default cap here
                paths = new HashSet<String>();
                watch2Paths.put(watcher, paths);
            }
            paths.add(path);
        }
    
        synchronized void removeWatcher(Watcher watcher) {
            HashSet<String> paths = watch2Paths.remove(watcher);
            if (paths == null) {
                return;
            }
            for (String p : paths) {
                HashSet<Watcher> list = watchTable.get(p);
                if (list != null) {
                    list.remove(watcher);
                    if (list.size() == 0) {
                        watchTable.remove(p);
                    }
                }
            }
        }
    
        Set<Watcher> triggerWatch(String path, EventType type) {
            return triggerWatch(path, type, null);
        }
    
        Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {
            WatchedEvent e = new WatchedEvent(type,
                    KeeperState.SyncConnected, path);
            HashSet<Watcher> watchers;
            synchronized (this) {
                watchers = watchTable.remove(path);
                if (watchers == null || watchers.isEmpty()) {
                    if (LOG.isTraceEnabled()) {
                        ZooTrace.logTraceMessage(LOG,
                                ZooTrace.EVENT_DELIVERY_TRACE_MASK,
                                "No watchers for " + path);
                    }
                    return null;
                }
                for (Watcher w : watchers) {
                    HashSet<String> paths = watch2Paths.get(w);
                    if (paths != null) {
                        paths.remove(path);
                    }
                }
            }
            for (Watcher w : watchers) {
                if (supress != null && supress.contains(w)) {
                    continue;
                }
                w.process(e);
            }
            return watchers;
        }
    
        /**
         * Brief description of this object.
         */
        @Override
        public synchronized String toString() {
            StringBuilder sb = new StringBuilder();
    
            sb.append(watch2Paths.size()).append(" connections watching ")
                .append(watchTable.size()).append(" paths\n");
    
            int total = 0;
            for (HashSet<String> paths : watch2Paths.values()) {
                total += paths.size();
            }
            sb.append("Total watches:").append(total);
    
            return sb.toString();
        }
    
        /**
         * String representation of watches. Warning, may be large!
         * @param byPath iff true output watches by paths, otw output
         * watches by connection
         * @return string representation of watches
         */
        synchronized void dumpWatches(PrintWriter pwriter, boolean byPath) {
            if (byPath) {
                for (Entry<String, HashSet<Watcher>> e : watchTable.entrySet()) {
                    pwriter.println(e.getKey());
                    for (Watcher w : e.getValue()) {
                        pwriter.print("\t0x");
                        pwriter.print(Long.toHexString(((ServerCnxn)w).getSessionId()));
                        pwriter.print("\n");
                    }
                }
            } else {
                for (Entry<Watcher, HashSet<String>> e : watch2Paths.entrySet()) {
                    pwriter.print("0x");
                    pwriter.println(Long.toHexString(((ServerCnxn)e.getKey()).getSessionId()));
                    for (String path : e.getValue()) {
                        pwriter.print("\t");
                        pwriter.println(path);
                    }
                }
            }
        }
    
        /**
         * Checks the specified watcher exists for the given path
         *
         * @param path
         *            znode path
         * @param watcher
         *            watcher object reference
         * @return true if the watcher exists, false otherwise
         */
        synchronized boolean containsWatcher(String path, Watcher watcher) {
            HashSet<String> paths = watch2Paths.get(watcher);
            if (paths == null || !paths.contains(path)) {
                return false;
            }
            return true;
        }
    
        /**
         * Removes the specified watcher for the given path
         *
         * @param path
         *            znode path
         * @param watcher
         *            watcher object reference
         * @return true if the watcher successfully removed, false otherwise
         */
        synchronized boolean removeWatcher(String path, Watcher watcher) {
            HashSet<String> paths = watch2Paths.get(watcher);
            if (paths == null || !paths.remove(path)) {
                return false;
            }
    
            HashSet<Watcher> list = watchTable.get(path);
            if (list == null || !list.remove(watcher)) {
                return false;
            }
    
            if (list.size() == 0) {
                watchTable.remove(path);
            }
    
            return true;
        }
    
        /**
         * Returns a watch report.
         *
         * @return watch report
         * @see WatchesReport
         */
        synchronized WatchesReport getWatches() {
            Map<Long, Set<String>> id2paths = new HashMap<Long, Set<String>>();
            for (Entry<Watcher, HashSet<String>> e: watch2Paths.entrySet()) {
                Long id = ((ServerCnxn) e.getKey()).getSessionId();
                HashSet<String> paths = new HashSet<String>(e.getValue());
                id2paths.put(id, paths);
            }
            return new WatchesReport(id2paths);
        }
    
        /**
         * Returns a watch report by path.
         *
         * @return watch report
         * @see WatchesPathReport
         */
        synchronized WatchesPathReport getWatchesByPath() {
            Map<String, Set<Long>> path2ids = new HashMap<String, Set<Long>>();
            for (Entry<String, HashSet<Watcher>> e : watchTable.entrySet()) {
                Set<Long> ids = new HashSet<Long>(e.getValue().size());
                path2ids.put(e.getKey(), ids);
                for (Watcher watcher : e.getValue()) {
                    ids.add(((ServerCnxn) watcher).getSessionId());
                }
            }
            return new WatchesPathReport(path2ids);
        }
    
        /**
         * Returns a watch summary.
         *
         * @return watch summary
         * @see WatchesSummary
         */
        synchronized WatchesSummary getWatchesSummary() {
            int totalWatches = 0;
            for (HashSet<String> paths : watch2Paths.values()) {
                totalWatches += paths.size();
            }
            return new WatchesSummary (watch2Paths.size(), watchTable.size(),
                                       totalWatches);
        }
    }
    
    

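    The whole class is easy to understand. Start with its two core fields:

    • watchTable: path -> watchers
    • watch2Paths: watcher -> paths

    addWatch simply adds entries to both maps. Triggering looks up the watchers for a path, removes them from memory, and then calls their process method, at which point the ServerCnxn sends a Packet to the client.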
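
    Stripped of logging and reporting, the two-map bookkeeping comes down to the sketch below. ServerWatchSketch and its string-typed events are hypothetical simplifications; the point is that the forward map answers "who watches this path?" and the reverse map answers "what does this connection watch?", which makes cleanup on disconnect cheap.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, trimmed-down model of the server-side bookkeeping; not the real WatchManager.
class ServerWatchSketch {
    interface Watcher { void process(String type, String path); }

    private final Map<String, Set<Watcher>> watchTable = new HashMap<>();
    private final Map<Watcher, Set<String>> watch2Paths = new HashMap<>();

    synchronized void addWatch(String path, Watcher watcher) {
        watchTable.computeIfAbsent(path, k -> new HashSet<>()).add(watcher);
        watch2Paths.computeIfAbsent(watcher, k -> new HashSet<>()).add(path);
    }

    /** A connection went away: drop every watch it holds, using the reverse map. */
    synchronized void removeWatcher(Watcher watcher) {
        Set<String> paths = watch2Paths.remove(watcher);
        if (paths == null) {
            return;
        }
        for (String p : paths) {
            Set<Watcher> watchers = watchTable.get(p);
            if (watchers != null) {
                watchers.remove(watcher);
                if (watchers.isEmpty()) {
                    watchTable.remove(p);
                }
            }
        }
    }

    /** One-shot trigger: remove the path's watchers first, then notify them. */
    int triggerWatch(String path, String type) {
        Set<Watcher> watchers;
        synchronized (this) {
            watchers = watchTable.remove(path);
            if (watchers == null || watchers.isEmpty()) {
                return 0;
            }
            for (Watcher w : watchers) {
                Set<String> paths = watch2Paths.get(w);
                if (paths != null) {
                    paths.remove(path);
                }
            }
        }
        for (Watcher w : watchers) {
            w.process(type, path); // called outside the lock, as in the listing above
        }
        return watchers.size();
    }

    synchronized int watchedPathCount() { return watchTable.size(); }

    public static void main(String[] args) {
        ServerWatchSketch manager = new ServerWatchSketch();
        manager.addWatch("/a", (type, path) -> System.out.println(type + " on " + path));
        System.out.println("notified: " + manager.triggerWatch("/a", "NodeDataChanged"));
        System.out.println("notified: " + manager.triggerWatch("/a", "NodeDataChanged"));
    }
}
```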

    So when is a watch triggered? That is simple too: in the DataTree code, an operation on the data triggers the corresponding watchers. Take DataTree.setData as an example:

        public Stat setData(String path, byte data[], int version, long zxid,
                long time) throws KeeperException.NoNodeException {
            Stat s = new Stat();
            DataNode n = nodes.get(path);
            if (n == null) {
                throw new KeeperException.NoNodeException();
            }
            byte lastdata[] = null;
            synchronized (n) {
                lastdata = n.data;
                n.data = data;
                n.stat.setMtime(time);
                n.stat.setMzxid(zxid);
                n.stat.setVersion(version);
                n.copyStat(s);
            }
            // now update if the path is in a quota subtree.
            String lastPrefix = getMaxPrefixWithQuota(path);
            if(lastPrefix != null) {
              this.updateBytes(lastPrefix, (data == null ? 0 : data.length)
                  - (lastdata == null ? 0 : lastdata.length));
            }
            // the watch is triggered here
            dataWatches.triggerWatch(path, EventType.NodeDataChanged);
            return s;
        }
    

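    At this point we have worked out how watch really operates in zk. Along the way we have also learned several properties of watchers:

    1. One-shot: on both the client and the server, once a watcher is triggered, zk removes it. This means developers have to re-register repeatedly, but the benefit is clear: it reduces server load and prevents frequently updated nodes from triggering watchers over and over.
    2. Serial execution on the client: client callbacks run serially and synchronously, which keeps them ordered. For the same reason, developers must take care that the logic of one watcher does not hold up all callbacks on the client.
    3. Lightweight: whether a client request asks for a watch is decided by a single boolean. Likewise, the watch callback in the server's response, WatchedEvent, has only three fields:
      • Notification state
      • Event type
      • Node path

    This lightweight design keeps both the network overhead and the server's memory overhead low.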

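    3. Summary

    In this article we went through how watch is implemented. In brief:

    1. When the client sends a request, it keeps the watcher itself on the client side: the WatchRegistration rides along with the Packet in the awaiting-reply queue and is registered locally when the reply arrives
    2. When a request flagged with watch reaches the server, the server stores the watcher (which carries the client's connection) in in-memory maps
    3. When the watched data changes accordingly, the server looks up the registered watchers in those maps and obtains the corresponding client connections
    4. Through each connection, it sends a notification to the client
    5. The client receives the notification, finds the watchers registered for that path, and the event thread invokes their callbacks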
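
    The five steps above can be replayed in a small in-process model. This is a deliberately naive sketch with hypothetical names (WatchFlowSketch, Server, Client, write): there is no network, no threads, and write doubles as "create or set data", so it only illustrates the order of events and the one-shot behaviour, not ZooKeeper's actual classes.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical in-process model of the watch round trip; not ZooKeeper code.
class WatchFlowSketch {
    interface Watcher { void process(String type, String path); }

    static class Server {
        private final Map<String, String> nodes = new HashMap<>();
        private final Map<String, Set<Client>> watchTable = new HashMap<>();

        boolean exists(String path, boolean watch, Client cnxn) {
            if (watch) {
                // Step 2: remember which connection wants to hear about this path.
                watchTable.computeIfAbsent(path, k -> new HashSet<>()).add(cnxn);
            }
            return nodes.containsKey(path);
        }

        /** Simplification: creates the node if it is missing, otherwise updates its data. */
        void write(String path, String data) {
            String type = nodes.containsKey(path) ? "NodeDataChanged" : "NodeCreated";
            nodes.put(path, data);
            Set<Client> cnxns = watchTable.remove(path); // step 3: look up, one-shot removal
            if (cnxns != null) {
                for (Client c : cnxns) {
                    c.onNotification(type, path);        // step 4: notify via the connection
                }
            }
        }
    }

    static class Client {
        private final Server server;
        private final Map<String, Set<Watcher>> localWatches = new HashMap<>();

        Client(Server server) { this.server = server; }

        boolean exists(String path, Watcher watcher) {
            // Only a boolean reaches the server; the watcher object stays here (step 1).
            boolean found = server.exists(path, watcher != null, this);
            if (watcher != null) {
                localWatches.computeIfAbsent(path, k -> new HashSet<>()).add(watcher);
            }
            return found;
        }

        void onNotification(String type, String path) {
            Set<Watcher> watchers = localWatches.remove(path); // step 5: one-shot here too
            if (watchers != null) {
                for (Watcher w : watchers) {
                    w.process(type, path);
                }
            }
        }
    }

    public static void main(String[] args) {
        Server server = new Server();
        Client client = new Client(server);
        List<String> events = new ArrayList<>();

        // A watcher that re-registers itself, since each registration fires only once.
        Watcher[] self = new Watcher[1];
        self[0] = (type, path) -> {
            events.add(type + " " + path);
            client.exists(path, self[0]);
        };

        client.exists("/a", self[0]);
        server.write("/a", "v1");
        server.write("/a", "v2");
        System.out.println(events);
    }
}
```

    Without the re-registration inside the callback, the second write would go unnoticed, which is exactly the one-shot property described in the previous section.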
