
Analyzing ZooKeeper's Watch Mechanism from the Core Source Code

Author: DoubleFooker | Published 2019-10-19 22:39

Based on ZooKeeper 3.4.14.
ZooKeeper provides the watch mechanism so that a client can listen for change events on a node. Distributed locks and registries implemented on top of ZooKeeper both rely on this mechanism; it makes change notification very convenient.
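To ground the discussion, here is a minimal client-side sketch of registering a watch. It assumes a server at 127.0.0.1:2181 and an existing /demo node, both placeholders for this example:

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

public class WatchDemo {
    public static void main(String[] args) throws Exception {
        // The default watcher receives session events (and events for
        // watches registered with the boolean "true" shorthand).
        ZooKeeper zk = new ZooKeeper("127.0.0.1:2181", 30000, new Watcher() {
            public void process(WatchedEvent event) {
                System.out.println("default watcher got: " + event);
            }
        });

        // getData with a Watcher argument arms a one-shot data watch:
        // the server delivers one NodeDataChanged event the next time
        // /demo is modified.
        byte[] data = zk.getData("/demo", new Watcher() {
            public void process(WatchedEvent event) {
                System.out.println("data watch fired: " + event);
            }
        }, null);
        System.out.println("current data: " + new String(data));

        Thread.sleep(Long.MAX_VALUE); // keep the process alive to receive events
    }
}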
Let's start from the client connection, beginning with the zkCli.sh script:

"$JAVA" "-Dzookeeper.log.dir=${ZOO_LOG_DIR}" "-Dzookeeper.root.logger=${ZOO_LOG4J_PROP}" "-Dzookeeper.log.file=${ZOO_LOG_FILE}" \
     -cp "$CLASSPATH" $CLIENT_JVMFLAGS $JVMFLAGS \
     org.apache.zookeeper.ZooKeeperMain "$@"

Running zkCli.sh -server 127.0.0.1:2181 in effect launches the main method of ZooKeeperMain. That is only the client's entry point, and the startup sequence as a whole is not complicated; you can read through it yourself, so here we cover only the core parts.
After startup, what actually gets created is a ZooKeeper instance:

public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
            boolean canBeReadOnly, HostProvider aHostProvider,
            ZKClientConfig clientConfig) throws IOException {
        LOG.info("Initiating client connection, connectString=" + connectString
                + " sessionTimeout=" + sessionTimeout + " watcher=" + watcher);

        if (clientConfig == null) {
            clientConfig = new ZKClientConfig();
        }
        this.clientConfig = clientConfig;
        watchManager = defaultWatchManager();
        watchManager.defaultWatcher = watcher;
        ConnectStringParser connectStringParser = new ConnectStringParser(
                connectString);
        hostProvider = aHostProvider;

        cnxn = createConnection(connectStringParser.getChrootPath(),
                hostProvider, sessionTimeout, this, watchManager,
                getClientCnxnSocket(), canBeReadOnly);
// start the packet-sending thread (sendThread.start()) and the watch event thread (eventThread.start())
        cnxn.start();
    }

What ultimately gets created are a SendThread and an EventThread. Let's look first at what the SendThread is responsible for.

    class SendThread extends ZooKeeperThread {
        @Override
        public void run() {
           // ...
            while (state.isAlive()) {
                try {
                 // ...  
                    // send the queued command packets
                    clientCnxnSocket.doTransport(to, pendingQueue, ClientCnxn.this);
                } catch (Throwable e) {
                   // ...
                }
            }
           // ...
        }
    }

SendThread loops performing the packet send work, following a producer/consumer model: API calls enqueue packets on an outgoing queue, and the SendThread drains that queue onto the socket.
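The shape of that hand-off can be sketched with a plain BlockingQueue. This is a simplification of ClientCnxn's outgoing queue, not the real class; the packet type and method names here are made up:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class SendLoopSketch {
    // Producer side: application threads enqueue request "packets" here,
    // analogous to the client queuing a packet per API call.
    private final BlockingQueue<String> outgoingQueue = new LinkedBlockingQueue<>();

    public void queuePacket(String packet) {
        outgoingQueue.add(packet);
    }

    // Consumer side: one dedicated thread drains the queue, analogous
    // to SendThread's run() loop driving doTransport.
    public void startSendThread() {
        Thread sendThread = new Thread(() -> {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    String packet = outgoingQueue.take(); // blocks until a packet arrives
                    System.out.println("writing to socket: " + packet);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "send-thread-sketch");
        sendThread.setDaemon(true);
        sendThread.start();
    }
}

With that model in mind, let's see what doTransport does, taking the NIO transport as the example: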

    @Override
    void doTransport(/* elided params */) {
        // ...
        for (SelectionKey k : selected) {
            SocketChannel sc = ((SocketChannel) k.channel());
            if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {
                if (sc.finishConnect()) {
                    updateLastSendAndHeard();
                    updateSocketAddresses();
                    sendThread.primeConnection();
                }
            } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
                // this is where the packets are actually sent and read
                doIO(pendingQueue, cnxn);
            }
        }
      // ...
    }
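doIO is where bytes actually move. For readers less familiar with Java NIO, here is a minimal standalone selector loop in the same shape (a generic sketch, not ZooKeeper's code; the address is a placeholder):

import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

public class SelectorLoopSketch {
    public static void main(String[] args) throws Exception {
        Selector selector = Selector.open();
        SocketChannel sc = SocketChannel.open();
        sc.configureBlocking(false);
        sc.connect(new InetSocketAddress("127.0.0.1", 2181));
        sc.register(selector, SelectionKey.OP_CONNECT);

        while (true) {
            selector.select(1000); // wait up to 1s for ready channels
            Iterator<SelectionKey> it = selector.selectedKeys().iterator();
            while (it.hasNext()) {
                SelectionKey k = it.next();
                it.remove();
                if (k.isConnectable() && ((SocketChannel) k.channel()).finishConnect()) {
                    // connection established; switch interest to read/write,
                    // roughly the state primeConnection() prepares for
                    k.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
                } else if (k.isReadable() || k.isWritable()) {
                    // this is where ZooKeeper's doIO would read/write packets
                }
            }
        }
    }
}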

The client side can be summarized as follows:

[Figure omitted: client-side flow. The main thread creates the ZooKeeper instance, which starts a SendThread handling network I/O and an EventThread delivering watch callbacks.]

Next, let's look at how the server handles the watch work.
From the earlier walkthrough of ZooKeeper's leader election source code, we know that during startup zk creates a NIOServerCnxnFactory (taking NIO as the example), whose main job is communicating with clients.
NIOServerCnxnFactory#start launches the server threads; tracing to the end, the request path is NIOServerCnxn#doIO -> NIOServerCnxn#readPayload -> NIOServerCnxn#readRequest -> ZooKeeperServer#processPacket -> ZooKeeperServer#submitRequest:

// request handling via the chain-of-responsibility pattern
 public void submitRequest(Request si) {
        if (firstProcessor == null) {
            synchronized (this) {
                try {
                    // Since all requests are passed to the request
                    // processor it should wait for setting up the request
                    // processor chain. The state will be updated to RUNNING
                    // after the setup.
                    while (state == State.INITIAL) {
                        wait(1000);
                    }
                } catch (InterruptedException e) {
                    LOG.warn("Unexpected interruption", e);
                }
                if (firstProcessor == null || state != State.RUNNING) {
                    throw new RuntimeException("Not started");
                }
            }
        }
        try {
            touch(si.cnxn);
            boolean validpacket = Request.isValid(si.type);
            if (validpacket) {
                // run the request through each link of the processor chain
                firstProcessor.processRequest(si);
                if (si.cnxn != null) {
                    incInProcess();
                }
            } else {
                LOG.warn("Received packet at server of unknown type " + si.type);
                new UnimplementedRequestProcessor().processRequest(si);
            }
        } catch (MissingSessionException e) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Dropping request: " + e.getMessage());
            }
        } catch (RequestProcessorException e) {
            LOG.error("Unable to process request:" + e.getMessage(), e);
        }
    }

So how is this chain built? During server startup, NIOServerCnxnFactory#startup starts the ZooKeeperServer, which calls setupRequestProcessors to wire up the chain. Let's look at what each link of the chain does.

 protected void setupRequestProcessors() {
        // last processor: handles the result
        RequestProcessor finalProcessor = new FinalRequestProcessor(this);
        // second processor: persists the data update
        RequestProcessor syncProcessor = new SyncRequestProcessor(this,
                finalProcessor);
        ((SyncRequestProcessor) syncProcessor).start();
        // first processor: parses the command
        firstProcessor = new PrepRequestProcessor(this, syncProcessor);
        ((PrepRequestProcessor) firstProcessor).start();
    }
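The pattern itself is easy to see in isolation. Here is a stripped-down chain-of-responsibility sketch mirroring the prep -> sync -> final layout above; the Processor interface and bodies are made up for illustration (the real processors also run as separate threads with their own queues, which this sketch omits):

interface Processor {
    void process(String request);
}

class LoggingProcessor implements Processor {
    private final String name;
    private final Processor next; // null for the last link in the chain

    LoggingProcessor(String name, Processor next) {
        this.name = name;
        this.next = next;
    }

    public void process(String request) {
        System.out.println(name + " handling " + request);
        if (next != null) {
            next.process(request); // hand off to the next processor
        }
    }
}

public class ChainSketch {
    public static void main(String[] args) {
        // Built back to front, just like setupRequestProcessors above.
        Processor finalP = new LoggingProcessor("final", null);
        Processor syncP = new LoggingProcessor("sync", finalP);
        Processor firstP = new LoggingProcessor("prep", syncP);
        firstP.process("create /demo");
    }
}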

Taking the create command as an example, tracing the code leads to FinalRequestProcessor#processRequest -> ZooKeeperServer#processTxn -> ZKDatabase#processTxn -> DataTree#processTxn -> DataTree#createNode -> dataWatches.triggerWatch. So the watch event is ultimately fired and sent from triggerWatch:

    public Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {
        WatchedEvent e = new WatchedEvent(type,
                KeeperState.SyncConnected, path);
        HashSet<Watcher> watchers;
        synchronized (this) {
            // remove the watchers registered on this path (a watch fires only once)
            watchers = watchTable.remove(path);
            if (watchers == null || watchers.isEmpty()) {
                if (LOG.isTraceEnabled()) {
                    ZooTrace.logTraceMessage(LOG,
                            ZooTrace.EVENT_DELIVERY_TRACE_MASK,
                            "No watchers for " + path);
                }
                return null;
            }
            // remove this path from each watcher's path set
            for (Watcher w : watchers) {
                HashSet<String> paths = watch2Paths.get(w);
                if (paths != null) {
                    paths.remove(path);
                }
            }
        }
        // iterate and notify
        for (Watcher w : watchers) {
            if (supress != null && supress.contains(w)) {
                continue;
            }
            // send the notification response
            w.process(e);
        }
        return watchers;
    }

To summarize the server-side processing:

[Figure omitted: server-side flow. A request passes through the PrepRequestProcessor -> SyncRequestProcessor -> FinalRequestProcessor chain, and triggerWatch notifies the registered watchers.]

Summary

There is a lot of code across the whole flow, so to sum up the core of how the server and the client implement the watch mechanism: to conserve resources, the server is designed so that a watch fires only once. Whether to watch again is left to the client, which spares the server the extra burden of maintaining long-lived watch bindings.
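Because watches are one-shot, a client that wants continuous notifications must re-register after every event. A minimal sketch of that pattern (the path and handling are illustrative):

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

public class ReWatchSketch implements Watcher {
    private final ZooKeeper zk;
    private final String path;

    ReWatchSketch(ZooKeeper zk, String path) throws Exception {
        this.zk = zk;
        this.path = path;
        zk.getData(path, this, null); // initial registration
    }

    public void process(WatchedEvent event) {
        if (event.getType() == Event.EventType.NodeDataChanged) {
            try {
                // Passing "this" as the watcher arms a fresh one-shot
                // watch while fetching the latest data.
                byte[] data = zk.getData(path, this, null);
                System.out.println("changed: " + new String(data));
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

Note the unavoidable gap in this pattern: changes that happen between the event firing and the re-registration are not observed, which is why the client re-reads the data at the moment it re-arms the watch.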
