
Analyzing the ZooKeeper Watch Mechanism from the Core Source Code

Author: DoubleFooker | Published 2019-10-19 22:39

    Based on ZooKeeper 3.4.14.
    ZooKeeper provides the watch mechanism so that a client can listen for events when a node's data changes. Distributed locks and service registries built on zk both rely on this mechanism, which makes change notification very convenient.
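    Before digging into the source, here is a minimal usage sketch of what the mechanism looks like from the client API (the address, session timeout, and node path below are made up for illustration): the watch is registered as part of a read call, and it fires at most once as a WatchedEvent.

    import org.apache.zookeeper.WatchedEvent;
    import org.apache.zookeeper.Watcher;
    import org.apache.zookeeper.ZooKeeper;

    public class WatchDemo {
        public static void main(String[] args) throws Exception {
            // the default watcher also receives connection-state events
            ZooKeeper zk = new ZooKeeper("127.0.0.1:2181", 30000, new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    System.out.println("event: " + event.getType() + " path: " + event.getPath());
                }
            });
            // passing true registers the default watcher on /demo;
            // it fires at most once for the next change of that node
            zk.exists("/demo", true);
            Thread.sleep(Long.MAX_VALUE); // keep the process alive to receive the event
        }
    }

    If a local server is running, creating or changing /demo in another shell produces exactly one event; the read has to be repeated to keep watching, as discussed at the end of this article.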
    Let's start from the client connection by looking at the zkCli.sh script:

    "$JAVA" "-Dzookeeper.log.dir=${ZOO_LOG_DIR}" "-Dzookeeper.root.logger=${ZOO_LOG4J_PROP}" "-Dzookeeper.log.file=${ZOO_LOG_FILE}" \
         -cp "$CLASSPATH" $CLIENT_JVMFLAGS $JVMFLAGS \
         org.apache.zookeeper.ZooKeeperMain "$@"
    

    Running zkCli.sh -server 127.0.0.1:2181 actually launches ZooKeeperMain's main method. This is only the client entry point, and the startup itself is not complicated, so you can read through it yourself; here we cover only the core part.
    What the startup really does is create a ZooKeeper instance:

    public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
                boolean canBeReadOnly, HostProvider aHostProvider,
                ZKClientConfig clientConfig) throws IOException {
            LOG.info("Initiating client connection, connectString=" + connectString
                    + " sessionTimeout=" + sessionTimeout + " watcher=" + watcher);
    
            if (clientConfig == null) {
                clientConfig = new ZKClientConfig();
            }
            this.clientConfig = clientConfig;
            watchManager = defaultWatchManager();
            watchManager.defaultWatcher = watcher;
            ConnectStringParser connectStringParser = new ConnectStringParser(
                    connectString);
            hostProvider = aHostProvider;
    
            cnxn = createConnection(connectStringParser.getChrootPath(),
                    hostProvider, sessionTimeout, this, watchManager,
                    getClientCnxnSocket(), canBeReadOnly);
    // starts the request-sending thread (sendThread.start()) and the watch event thread (eventThread.start())
            cnxn.start();
        }
    

    What ultimately gets created are the SendThread and the EventThread. Let's look at what the SendThread is responsible for.

        class SendThread extends ZooKeeperThread {
            @Override
            public void run() {
               // ...
                while (state.isAlive()) {
                    try {
                     // ...  
                        // send the request packet
                        clientCnxnSocket.doTransport(to, pendingQueue, ClientCnxn.this);
                    } catch (Throwable e) {
                       // ...
                    }
                }
               // ...
            }
    

    The SendThread loops sending packets, using a producer/consumer model. Let's see what doTransport does, taking NIO transport as the example (a simplified sketch of the producer/consumer handoff follows the excerpt):

      @Override
        void doTransport(
            // ...
           for (SelectionKey k : selected) {
                SocketChannel sc = ((SocketChannel) k.channel());
                if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {
                    if (sc.finishConnect()) {
                        updateLastSendAndHeard();
                        updateSocketAddresses();
                        sendThread.primeConnection();
                    }
                } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
                    // this is where the actual packet I/O happens
                    doIO(pendingQueue, cnxn);
                }
            }
          // ...
        }
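
    To make the producer/consumer model concrete, here is a simplified model (not the real implementation; the queue names mirror ClientCnxn's outgoingQueue and pendingQueue, but the types and logic are stripped down): API calls enqueue a Packet, and doIO drains the outgoing queue when the socket is writable, parking each sent packet on pendingQueue until its response comes back.

    import java.util.concurrent.LinkedBlockingQueue;

    // simplified model of ClientCnxn's queues, for illustration only
    class MiniCnxn {
        static class Packet { final String op; Packet(String op) { this.op = op; } }

        private final LinkedBlockingQueue<Packet> outgoingQueue = new LinkedBlockingQueue<>();
        private final LinkedBlockingQueue<Packet> pendingQueue  = new LinkedBlockingQueue<>();

        // producer side: every API call (getData, create, ...) ends up queueing a packet
        void queuePacket(Packet p) {
            outgoingQueue.add(p); // the real client also wakes up the selector here
        }

        // consumer side: what doIO conceptually does when the socket is writable
        void doIO() throws InterruptedException {
            Packet p = outgoingQueue.take(); // next request to send
            // ... write p to the socket ...
            pendingQueue.add(p);             // park it until the matching response arrives
        }
    }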
    

    The client-side flow can be summarized as follows:


    [Figure: summary of the client-side flow]
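
    One detail worth calling out on the client side: it is the EventThread, not the SendThread, that invokes user watchers. Roughly speaking, when the SendThread reads a notification reply it builds a WatchedEvent and queues it, and the EventThread then delivers it to the matching watchers on its own thread. The sketch below is a simplified model of that handoff, not the real EventThread code:

    import java.util.LinkedList;
    import java.util.List;
    import org.apache.zookeeper.WatchedEvent;
    import org.apache.zookeeper.Watcher;

    // simplified model of the client EventThread's delivery loop
    class MiniEventThread extends Thread {
        // an event paired with the watchers registered for its path
        static class Pair {
            final WatchedEvent event;
            final List<Watcher> watchers;
            Pair(WatchedEvent e, List<Watcher> w) { event = e; watchers = w; }
        }

        private final LinkedList<Pair> waitingEvents = new LinkedList<>();

        // called from the SendThread when a notification reply is read
        synchronized void queueEvent(WatchedEvent e, List<Watcher> watchers) {
            waitingEvents.add(new Pair(e, watchers));
            notifyAll();
        }

        @Override
        public void run() {
            while (true) {
                Pair p;
                synchronized (this) {
                    while (waitingEvents.isEmpty()) {
                        try { wait(); } catch (InterruptedException ie) { return; }
                    }
                    p = waitingEvents.removeFirst();
                }
                // user callbacks always run on this single thread, in order
                for (Watcher w : p.watchers) {
                    w.process(p.event);
                }
            }
        }
    }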

    Now let's look at how the server handles watches.
    In the earlier article on ZooKeeper leader election source code we saw that during startup zk creates a NIOServerCnxnFactory (taking NIO as the example), whose main job is communicating with clients.
    NIOServerCnxnFactory#start launches the worker threads; following the calls to the end, the path is NIOServerCnxn#doIO -> NIOServerCnxn#readPayload -> NIOServerCnxn#readRequest -> ZooKeeperServer#processPacket -> ZooKeeperServer#submitRequest.

    // processing with the chain-of-responsibility pattern
     public void submitRequest(Request si) {
            if (firstProcessor == null) {
                synchronized (this) {
                    try {
                        // Since all requests are passed to the request
                        // processor it should wait for setting up the request
                        // processor chain. The state will be updated to RUNNING
                        // after the setup.
                        while (state == State.INITIAL) {
                            wait(1000);
                        }
                    } catch (InterruptedException e) {
                        LOG.warn("Unexpected interruption", e);
                    }
                    if (firstProcessor == null || state != State.RUNNING) {
                        throw new RuntimeException("Not started");
                    }
                }
            }
            try {
                touch(si.cnxn);
                boolean validpacket = Request.isValid(si.type);
                if (validpacket) {
                    // run the request through each processor in the chain
                    firstProcessor.processRequest(si);
                    if (si.cnxn != null) {
                        incInProcess();
                    }
                } else {
                    LOG.warn("Received packet at server of unknown type " + si.type);
                    new UnimplementedRequestProcessor().processRequest(si);
                }
            } catch (MissingSessionException e) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Dropping request: " + e.getMessage());
                }
            } catch (RequestProcessorException e) {
                LOG.error("Unable to process request:" + e.getMessage(), e);
            }
        }
    

    So how is this chain of responsibility built? During server startup, NIOServerCnxnFactory#startup is called and the processor chain is set up there. Let's look at what the whole chain consists of.

     protected void setupRequestProcessors() {
            // last processor: handles the result and sends the response
            RequestProcessor finalProcessor = new FinalRequestProcessor(this);
            // second processor: persists the data change
            RequestProcessor syncProcessor = new SyncRequestProcessor(this,
                    finalProcessor);
            ((SyncRequestProcessor)syncProcessor).start();
            // first processor: parses and pre-processes the command
            firstProcessor = new PrepRequestProcessor(this, syncProcessor);
            ((PrepRequestProcessor)firstProcessor).start();
        }
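
    One piece the chain above does not show is how a watch gets registered in the first place. In the 3.4 code base, read requests such as getData and exists carry the watch flag down to the in-memory DataTree, and the Watcher that gets stored on the server is the client's own connection object (ServerCnxn). The sketch below is a simplified model of that registration table; the two map names mirror the WatchManager fields that triggerWatch consults later, but the class itself is illustrative:

    import java.util.HashMap;
    import java.util.HashSet;
    import org.apache.zookeeper.Watcher;

    // simplified model of the server-side WatchManager's two maps
    class MiniWatchManager {
        // path -> watchers interested in that path
        private final HashMap<String, HashSet<Watcher>> watchTable = new HashMap<>();
        // watcher -> paths it is watching (needed for cleanup when a client goes away)
        private final HashMap<Watcher, HashSet<String>> watch2Paths = new HashMap<>();

        // called while serving getData/exists/getChildren with the watch flag set;
        // the Watcher passed in is the ServerCnxn of the requesting client
        synchronized void addWatch(String path, Watcher watcher) {
            watchTable.computeIfAbsent(path, k -> new HashSet<>()).add(watcher);
            watch2Paths.computeIfAbsent(watcher, k -> new HashSet<>()).add(path);
        }
    }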
    
    

    Taking the create command as an example, tracing the code ends up in FinalRequestProcessor#processRequest -> ZooKeeperServer#processTxn -> ZKDatabase#processTxn -> DataTree#processTxn -> DataTree#createNode -> dataWatches.triggerWatch, so the watch event is ultimately fired and delivered in triggerWatch:

        public Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {
            WatchedEvent e = new WatchedEvent(type,
                    KeeperState.SyncConnected, path);
            HashSet<Watcher> watchers;
            synchronized (this) {
                // remove the watchers registered on this path (a watch fires only once)
                watchers = watchTable.remove(path);
                if (watchers == null || watchers.isEmpty()) {
                    if (LOG.isTraceEnabled()) {
                        ZooTrace.logTraceMessage(LOG,
                                ZooTrace.EVENT_DELIVERY_TRACE_MASK,
                                "No watchers for " + path);
                    }
                    return null;
                }
                // remove the path from each watcher's watched-path set
                for (Watcher w : watchers) {
                    HashSet<String> paths = watch2Paths.get(w);
                    if (paths != null) {
                        paths.remove(path);
                    }
                }
            }
            // iterate and notify each watcher
            for (Watcher w : watchers) {
                if (supress != null && supress.contains(w)) {
                    continue;
                }
                // send the notification response to the client
                w.process(e);
            }
            return watchers;
        }
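
    The w.process(e) call at the end is where the notification actually leaves the server: the Watcher stored in watchTable is the connection object itself, and its process method wraps the WatchedEvent and writes it back to the client as a reply whose xid is -1, which is what the client recognizes as a notification. The snippet below is paraphrased from memory of NIOServerCnxn#process in the 3.4 branch and is only a sketch; check the real source for the exact code.

        // paraphrased from NIOServerCnxn#process in the 3.4 branch; logging and
        // error handling omitted, see the real source for the exact code
        @Override
        public synchronized void process(WatchedEvent event) {
            // an xid of -1 marks this reply as a notification rather than a
            // response to a pending request
            ReplyHeader h = new ReplyHeader(-1, -1L, 0);
            // convert the WatchedEvent into the wire-format WatcherEvent
            WatcherEvent e = event.getWrapper();
            // write the notification back to the client over this connection
            sendResponse(h, e, "notification");
        }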
    

    The server-side flow can be summarized as follows:


    [Figure: summary of the server-side flow]

    Summary

    The full source walk-through involves a lot of code, so we summarized only the core parts of the watch implementation on both the server and the client. To conserve resources, the server is designed so that a watch fires only once; whether to watch again is left to the client, which spares the server the extra burden of keeping long-lived watch bindings.
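
    Because a watch fires only once, a client that wants continuous notifications has to re-register inside its callback. A minimal sketch of that pattern (the node path is illustrative and assumed to exist):

    import org.apache.zookeeper.KeeperException;
    import org.apache.zookeeper.WatchedEvent;
    import org.apache.zookeeper.Watcher;
    import org.apache.zookeeper.ZooKeeper;

    public class ReWatch implements Watcher {
        private final ZooKeeper zk;
        private final String path;

        ReWatch(ZooKeeper zk, String path) throws KeeperException, InterruptedException {
            this.zk = zk;
            this.path = path;
            zk.getData(path, this, null); // initial registration
        }

        @Override
        public void process(WatchedEvent event) {
            if (event.getType() == Event.EventType.NodeDataChanged) {
                try {
                    // reading the data again re-registers this watcher,
                    // so the next change will be delivered as well
                    byte[] data = zk.getData(path, this, null);
                    System.out.println("changed: " + new String(data));
                } catch (KeeperException | InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    Constructing new ReWatch(zk, "/config") keeps the watch alive across changes, at the cost of one extra read per notification.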
