ZooKeeper watcher mechanism


Author: 海蟾子_null | Published: 2018-05-17 17:21

    Preface

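    A distributed system is a set of processes cooperating to get one job done. To coordinate work between processes, we usually reach for ZooKeeper or one of its alternatives, and one of its most important mechanisms is the watcher. To understand how ZooKeeper watchers work, I spent some time digging through the ZooKeeper source code.
    Below is a record of that walkthrough.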

    Code walkthrough

    The overall watcher flow

    To use a ZooKeeper watcher, the client must register it with the server. The flow is:


    [Figure: watcher.png, the watcher registration and notification flow]

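    The ZooKeeper client registers the watcher with the server and also stores it in its own ZKWatchManager. When something changes on the server, the server checks whether a watcher is registered on the affected node; if so, it sends a notification event to the client. The client uses the path carried in the event to find the matching watchers and runs them.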
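The three steps above can be reduced to a small in-memory model. This is a minimal sketch, not ZooKeeper code: SimpleClient and SimpleServer are hypothetical classes, and a String path plus a Consumer<String> callback stand in for the event and the Watcher. It shows the division of labour described above: the client keeps the callback object locally, the server only records which client watches which path, and a change sends back just the path, which the client uses to look up its own callbacks. Both sides drop the entry once it fires, since ZooKeeper's standard watches are one-time.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;

// Hypothetical client: keeps its watcher callbacks locally, keyed by path
// (the role ZKWatchManager plays in the real client).
class SimpleClient {
    final Map<String, Set<Consumer<String>>> localWatches = new HashMap<>();

    void register(SimpleServer server, String path, Consumer<String> watcher) {
        localWatches.computeIfAbsent(path, p -> new HashSet<>()).add(watcher);
        server.addWatch(path, this); // only "this client watches path" reaches the server
    }

    // The notification carries only the path, so the watchers are looked up by it.
    void onNotification(String path) {
        Set<Consumer<String>> watchers = localWatches.remove(path); // one-time watch
        if (watchers != null) {
            watchers.forEach(w -> w.accept(path));
        }
    }
}

// Hypothetical server: remembers which clients watch which path.
class SimpleServer {
    final Map<String, Set<SimpleClient>> watchTable = new HashMap<>();

    void addWatch(String path, SimpleClient client) {
        watchTable.computeIfAbsent(path, p -> new HashSet<>()).add(client);
    }

    // A change on path: notify every client that registered a watch on it.
    void nodeChanged(String path) {
        Set<SimpleClient> clients = watchTable.remove(path); // one-time watch
        if (clients != null) {
            clients.forEach(c -> c.onNotification(path));
        }
    }
}
```

A change on an unwatched path reaches nobody; a change on a watched path runs the client's callback exactly once.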

    Client-side watcher registration: source walkthrough

    Reading ZooKeeper.java shows that only three operations can register a watcher: exists, getChildren and getData.


    [Figure: the methods of the ZooKeeper class that register a watcher]

    Let's use getData to walk through the registration process.
    The code first:

     public void getData(final String path, Watcher watcher,
                DataCallback cb, Object ctx)
        {
            final String clientPath = path;
            PathUtils.validatePath(clientPath);
    
            // the watch contains the un-chroot path
            WatchRegistration wcb = null;
            if (watcher != null) {
                // wrap the watcher in a registration object
                wcb = new DataWatchRegistration(watcher, clientPath);
            }
    
            final String serverPath = prependChroot(clientPath);
    
            RequestHeader h = new RequestHeader();
            h.setType(ZooDefs.OpCode.getData);
            GetDataRequest request = new GetDataRequest();
            request.setPath(serverPath);
            request.setWatch(watcher != null);
            GetDataResponse response = new GetDataResponse();
            // send the request to the server
            cnxn.queuePacket(h, new ReplyHeader(), request, response, cb,
                    clientPath, serverPath, ctx, wcb);
        }
    
    Packet queuePacket(RequestHeader h, ReplyHeader r, Record request,
                Record response, AsyncCallback cb, String clientPath,
                String serverPath, Object ctx, WatchRegistration watchRegistration)
        {
            Packet packet = null;
            synchronized (outgoingQueue) {
                packet = new Packet(h, r, request, response, watchRegistration);
                packet.cb = cb;
                packet.ctx = ctx;
                packet.clientPath = clientPath;
                packet.serverPath = serverPath;
                if (!state.isAlive() || closing) {
                    conLossPacket(packet);
                } else {
                    if (h.getType() == OpCode.closeSession) {
                        closing = true;
                    }
                    // put the request into the outgoing queue
                    outgoingQueue.add(packet);
                }
            }
            // wake up the send thread so it sends the request to the server
            sendThread.getClientCnxnSocket().wakeupCnxn();
            return packet;
        }
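Two facts in the code above are worth isolating. Only a boolean watch flag (request.setWatch(watcher != null)) is serialized into the request, while the Watcher object itself stays on the client inside the packet's WatchRegistration; and queuePacket merely appends the packet to outgoingQueue under a lock (or fails it with conLossPacket if the connection is gone) and wakes the send thread. The sketch below models just that; GetDataRequestSketch, PacketSketch and ClientQueueSketch are hypothetical names, a Runnable stands in for the Watcher, and a counter stands in for wakeupCnxn().

```java
import java.util.LinkedList;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for GetDataRequest: only the path and a boolean cross the wire.
final class GetDataRequestSketch {
    final String path;
    final boolean watch;

    GetDataRequestSketch(String path, boolean watch) {
        this.path = path;
        this.watch = watch;
    }
}

// Hypothetical stand-in for Packet: the watcher rides along but is never serialized.
final class PacketSketch {
    final GetDataRequestSketch request;
    final Runnable watcher; // plays the role of the WatchRegistration; may be null
    boolean connectionLost;

    PacketSketch(GetDataRequestSketch request, Runnable watcher) {
        this.request = request;
        this.watcher = watcher;
    }
}

final class ClientQueueSketch {
    final LinkedList<PacketSketch> outgoingQueue = new LinkedList<>();
    volatile boolean alive = true;
    final AtomicInteger wakeups = new AtomicInteger(); // counts simulated send-thread wakeups

    PacketSketch getData(String path, Runnable watcher) {
        GetDataRequestSketch request = new GetDataRequestSketch(path, watcher != null);
        return queuePacket(new PacketSketch(request, watcher));
    }

    PacketSketch queuePacket(PacketSketch packet) {
        synchronized (outgoingQueue) {
            if (!alive) {
                packet.connectionLost = true; // the real code calls conLossPacket(packet)
            } else {
                outgoingQueue.add(packet);
            }
        }
        wakeups.incrementAndGet(); // the real code calls wakeupCnxn() outside the lock
        return packet;
    }
}
```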
    

    Now let's see what the send thread (sendThread) does.
    In SendThread's run method, the actual sending is done by the statement clientCnxnSocket.doTransport(to, pendingQueue, outgoingQueue, ClientCnxn.this);.

     void doTransport(int waitTimeOut, List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue,
                         ClientCnxn cnxn)
                throws IOException, InterruptedException {
            selector.select(waitTimeOut);
            Set<SelectionKey> selected;
            synchronized (this) {
                selected = selector.selectedKeys();
            }
            // Everything below and until we get back to the select is
            // non blocking, so time is effectively a constant. That is
            // Why we just have to do this once, here
            updateNow();
            for (SelectionKey k : selected) {
                SocketChannel sc = ((SocketChannel) k.channel());
                if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {
                    if (sc.finishConnect()) {
                        updateLastSendAndHeard();
                        sendThread.primeConnection();
                    }
                } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
                    doIO(pendingQueue, outgoingQueue, cnxn);
                }
            }
            if (sendThread.getZkState().isConnected()) {
                synchronized(outgoingQueue) {
                    if (findSendablePacket(outgoingQueue,
                            cnxn.sendThread.clientTunneledAuthenticationInProgress()) != null) {
                        enableWrite();
                    }
                }
            }
            selected.clear();
        }
    

    The code above sends requests to the server using I/O multiplexing (a NIO Selector).
    The part of doIO that handles writes is:

    if (sockKey.isWritable()) {
                synchronized(outgoingQueue) {
                    Packet p = findSendablePacket(outgoingQueue,
                            cnxn.sendThread.clientTunneledAuthenticationInProgress());
    
                    if (p != null) {
                        updateLastSend();
                        // If we already started writing p, p.bb will already exist
                        if (p.bb == null) {
                            if ((p.requestHeader != null) &&
                                    (p.requestHeader.getType() != OpCode.ping) &&
                                    (p.requestHeader.getType() != OpCode.auth)) {
                                p.requestHeader.setXid(cnxn.getXid());
                            }
                            p.createBB();
                        }
                        sock.write(p.bb);
                        if (!p.bb.hasRemaining()) {
                            sentCount++;
                            outgoingQueue.removeFirstOccurrence(p);
                            if (p.requestHeader != null
                                    && p.requestHeader.getType() != OpCode.ping
                                    && p.requestHeader.getType() != OpCode.auth) {
                                synchronized (pendingQueue) {
                                    pendingQueue.add(p);
                                }
                            }
                        }
                    }
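The rule encoded in this fragment: a packet leaves outgoingQueue only after its whole buffer has been written (a non-blocking write may be partial), and only then, unless it is a ping or auth packet, is it moved to pendingQueue to wait for its reply; the xid is assigned when the buffer is first created. The sketch below replays that rule against a fake channel that accepts at most a few bytes per write. OutPacket, WritePathSketch and the chunk size are assumptions for illustration, not ZooKeeper code.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

// Hypothetical outgoing packet: a body, a ping/auth flag and a lazily assigned xid.
final class OutPacket {
    final String body;
    final boolean pingOrAuth;
    int xid = -1;  // set on first write, like p.requestHeader.setXid(cnxn.getXid())
    ByteBuffer bb; // created on first write, like p.createBB()

    OutPacket(String body, boolean pingOrAuth) {
        this.body = body;
        this.pingOrAuth = pingOrAuth;
    }
}

final class WritePathSketch {
    final LinkedList<OutPacket> outgoingQueue = new LinkedList<>();
    final List<OutPacket> pendingQueue = new ArrayList<>();
    final ByteArrayOutputStream wire = new ByteArrayOutputStream(); // what the fake socket accepted
    private final int maxBytesPerWrite; // simulates partial non-blocking writes
    private int nextXid = 1;

    WritePathSketch(int maxBytesPerWrite) {
        this.maxBytesPerWrite = maxBytesPerWrite;
    }

    // One "socket is writable" event. Simplification: always takes the queue head
    // (the real findSendablePacket also accounts for SASL authentication in progress).
    void onWritable() {
        OutPacket p = outgoingQueue.peekFirst();
        if (p == null) {
            return;
        }
        if (p.bb == null) { // first attempt: assign an xid (except ping/auth) and serialize
            if (!p.pingOrAuth) {
                p.xid = nextXid++;
            }
            p.bb = ByteBuffer.wrap(p.body.getBytes(StandardCharsets.UTF_8));
        }
        int n = Math.min(maxBytesPerWrite, p.bb.remaining());
        wire.write(p.bb.array(), p.bb.position(), n); // like sock.write(p.bb), possibly partial
        p.bb.position(p.bb.position() + n);
        if (!p.bb.hasRemaining()) { // fully written: leave outgoingQueue
            outgoingQueue.removeFirstOccurrence(p);
            if (!p.pingOrAuth) {
                pendingQueue.add(p); // wait here for the server's reply
            }
        }
    }
}
```

With a 3-byte limit, a 4-byte ping needs two writable events and never enters pendingQueue, while a regular request gets xid 1 and is parked in pendingQueue once its last byte is written.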
    

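    At this point the first step of registration is done: the request has gone out to the server. The server's reply comes back along the reverse path: the client reads the bytes from the channel and deserializes them in readResponse, and once the reply to the request has been processed, finishPacket adds the watcher to the client's ZKWatchManager. The same readResponse method also handles later change notifications from the server, which are deserialized into a WatcherEvent and handed to the client's event thread.
    First, reading the stream: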

    // a read event is ready: read data from the channel
    if (sockKey.isReadable()) {
                int rc = sock.read(incomingBuffer);
                if (rc < 0) {
                    throw new EndOfStreamException(
                            "Unable to read additional data from server sessionid 0x"
                                    + Long.toHexString(sessionId)
                                    + ", likely server has closed socket");
                }
            // code that does not affect this analysis is omitted here
                    } else {
                        // process the response here
                        sendThread.readResponse(incomingBuffer);
                        lenBuffer.clear();
                        incomingBuffer = lenBuffer;
                        updateLastHeard();
                    }
                }
            }
    

    A fragment of readResponse:

    void readResponse(ByteBuffer incomingBuffer) throws IOException {
                ByteBufferInputStream bbis = new ByteBufferInputStream(
                        incomingBuffer);
                BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
                ReplyHeader replyHdr = new ReplyHeader();
    
                replyHdr.deserialize(bbia, "header");
                // response to a ping
                if (replyHdr.getXid() == -2) {
                    // -2 is the xid for pings
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Got ping response for sessionid: 0x"
                                + Long.toHexString(sessionId)
                                + " after "
                                + ((System.nanoTime() - lastPingSentNs) / 1000000)
                                + "ms");
                    }
                    return;
                }
                // response to an authentication packet
                if (replyHdr.getXid() == -4) {
                    // -4 is the xid for AuthPacket               
                    if(replyHdr.getErr() == KeeperException.Code.AUTHFAILED.intValue()) {
                        state = States.AUTH_FAILED;                    
                        eventThread.queueEvent( new WatchedEvent(Watcher.Event.EventType.None, 
                                Watcher.Event.KeeperState.AuthFailed, null) );                                      
                    }
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Got auth sessionid:0x"
                                + Long.toHexString(sessionId));
                    }
                    return;
                }
                // notification that something changed on the server
                if (replyHdr.getXid() == -1) {
                    // -1 means notification
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Got notification sessionid:0x"
                            + Long.toHexString(sessionId));
                    }
                    WatcherEvent event = new WatcherEvent();
                    // deserialize the event; deserialize() shows that the server sends back only
                    // the event type, the keeper state and the path of the changed node
                    event.deserialize(bbia, "response");
    
                    // convert from a server path to a client path
                    if (chrootPath != null) {
                        String serverPath = event.getPath();
                        if(serverPath.compareTo(chrootPath)==0)
                            event.setPath("/");
                        else if (serverPath.length() > chrootPath.length())
                            event.setPath(serverPath.substring(chrootPath.length()));
                        else {
                            LOG.warn("Got server path " + event.getPath()
                                    + " which is too short for chroot path "
                                    + chrootPath);
                        }
                    }
    
                    WatchedEvent we = new WatchedEvent(event);
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Got " + we + " for sessionid 0x"
                                + Long.toHexString(sessionId));
                    }
                    // queue it for the event thread, which finds the matching watchers by path and calls them
                    eventThread.queueEvent( we );
                    return;
                }
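                // ... omitted: an ordinary reply is matched with the packet taken from
                // pendingQueue, and its response is deserialized inside a try block
                } finally {
                    // register the watcher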
                    finishPacket(packet);
                }
            }
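Two pieces of readResponse are easy to isolate: the dispatch on special xid values (-2 ping, -4 auth, -1 watch notification; anything else is the reply to a queued request), and the conversion of a notification's server path back into a client path when the client was opened with a chroot. A minimal sketch of both; ReplyKind and ReadResponseSketch are hypothetical names, and the fallback branch returns the path unchanged where the real code only logs a warning.

```java
enum ReplyKind { PING, AUTH, NOTIFICATION, REQUEST_REPLY }

final class ReadResponseSketch {
    // Same xid conventions as the fragment above.
    static ReplyKind classify(int xid) {
        switch (xid) {
            case -2: return ReplyKind.PING;
            case -4: return ReplyKind.AUTH;
            case -1: return ReplyKind.NOTIFICATION;
            default: return ReplyKind.REQUEST_REPLY;
        }
    }

    // Mirrors the chroot handling: strip the chroot prefix from the server path.
    static String toClientPath(String chrootPath, String serverPath) {
        if (chrootPath == null) {
            return serverPath;
        }
        if (serverPath.equals(chrootPath)) {
            return "/";
        }
        if (serverPath.length() > chrootPath.length()) {
            return serverPath.substring(chrootPath.length());
        }
        // the real code logs "too short for chroot path" and leaves the path as-is
        return serverPath;
    }
}
```

For a client rooted at /app, a change on the server path /app/config is reported to the client as /config.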
    

    The registration code:

      private void finishPacket(Packet p) {
            if (p.watchRegistration != null) {
            // register
                p.watchRegistration.register(p.replyHeader.getErr());
            }
    //...
    
      public void register(int rc) {
                if (shouldAddWatch(rc)) {
                // get the matching watcher map from ZKWatchManager
                    Map<String, Set<Watcher>> watches = getWatches(rc);
                    synchronized(watches) {
                        Set<Watcher> watchers = watches.get(clientPath);
                        if (watchers == null) {
                            watchers = new HashSet<Watcher>();
                            watches.put(clientPath, watchers);
                        }
                        watchers.add(watcher);
                    }
                }
            }
    

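    For getData, the code above shows that once the client defines a watcher, it binds it to the path and adds it to ZKWatchManager.dataWatches when the server's reply arrives; this completes client-side registration.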
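The three registering operations differ only in which map register() picks and when it agrees to register. In the ZooKeeper client source, getData's DataWatchRegistration uses dataWatches, getChildren's ChildWatchRegistration uses childWatches, and exists' ExistsWatchRegistration uses dataWatches when the node exists but existWatches when it does not; exists also registers on a NONODE result, since watching a node that is not there yet is its purpose. The sketch below models that selection. The class names are hypothetical and a plain Object stands in for the Watcher; the result codes 0 (OK) and -101 (NONODE) match KeeperException.Code.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical mirror of the client's ZKWatchManager maps.
final class ClientWatchMaps {
    final Map<String, Set<Object>> dataWatches = new HashMap<>();
    final Map<String, Set<Object>> existWatches = new HashMap<>();
    final Map<String, Set<Object>> childWatches = new HashMap<>();
}

abstract class RegistrationSketch {
    static final int OK = 0;
    static final int NONODE = -101; // KeeperException.Code.NONODE

    final Object watcher;
    final String clientPath;

    RegistrationSketch(Object watcher, String clientPath) {
        this.watcher = watcher;
        this.clientPath = clientPath;
    }

    abstract Map<String, Set<Object>> getWatches(ClientWatchMaps m, int rc);

    boolean shouldAddWatch(int rc) {
        return rc == OK; // by default, register only if the request succeeded
    }

    // Same shape as WatchRegistration.register(rc) in the walkthrough.
    void register(ClientWatchMaps m, int rc) {
        if (shouldAddWatch(rc)) {
            Map<String, Set<Object>> watches = getWatches(m, rc);
            synchronized (watches) {
                watches.computeIfAbsent(clientPath, p -> new HashSet<>()).add(watcher);
            }
        }
    }
}

final class DataRegistration extends RegistrationSketch {
    DataRegistration(Object w, String p) { super(w, p); }
    @Override
    Map<String, Set<Object>> getWatches(ClientWatchMaps m, int rc) { return m.dataWatches; }
}

final class ChildRegistration extends RegistrationSketch {
    ChildRegistration(Object w, String p) { super(w, p); }
    @Override
    Map<String, Set<Object>> getWatches(ClientWatchMaps m, int rc) { return m.childWatches; }
}

final class ExistsRegistration extends RegistrationSketch {
    ExistsRegistration(Object w, String p) { super(w, p); }
    @Override
    Map<String, Set<Object>> getWatches(ClientWatchMaps m, int rc) {
        return rc == OK ? m.dataWatches : m.existWatches;
    }
    @Override
    boolean shouldAddWatch(int rc) { return rc == OK || rc == NONODE; }
}
```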

    Server-side watcher registration: source walkthrough

    The client sends the registration request to the server through doIO. On the server side, NIOServerCnxnFactory.java is mainly responsible for accepting client requests. Its code:

     public void run() {
    
                    for (SelectionKey k : selectedList) {
                    // .... if the channel is ready for reading or writing
                        } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
                            NIOServerCnxn c = (NIOServerCnxn) k.attachment();
                            // start handling the network read/write
                            c.doIO(k);
                        } else {
                        // ....
        }
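The run() loop above follows the standard java.nio selector pattern: one thread blocks in select() and only touches channels whose keys report readiness. The sketch below reproduces that pattern with a java.nio.channels.Pipe instead of client sockets so it runs without a network; SelectorSketch and readWhenReady are illustrative names, not ZooKeeper code.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.charset.StandardCharsets;

final class SelectorSketch {
    // Writes msg into a pipe, then reads it back only when the selector reports it readable.
    static String readWhenReady(String msg) {
        byte[] bytes = msg.getBytes(StandardCharsets.UTF_8);
        try {
            Pipe pipe = Pipe.open();
            try (Selector selector = Selector.open();
                 Pipe.SinkChannel sink = pipe.sink();
                 Pipe.SourceChannel source = pipe.source()) {
                source.configureBlocking(false); // required before registering with a selector
                source.register(selector, SelectionKey.OP_READ);

                sink.write(ByteBuffer.wrap(bytes)); // the "client" sends its request

                ByteBuffer received = ByteBuffer.allocate(bytes.length);
                while (received.hasRemaining()) {
                    selector.select(1000); // wait until some registered channel is ready
                    for (SelectionKey k : selector.selectedKeys()) {
                        if (k.isReadable()) { // same readiness test the server applies
                            ((Pipe.SourceChannel) k.channel()).read(received);
                        }
                    }
                    selector.selectedKeys().clear(); // like selected.clear() in the loop
                }
                return new String(received.array(), StandardCharsets.UTF_8);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```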
    

    The server can only take in the client's registration request once its channel is ready for reading.
    Next, the doIO code of NIOServerCnxn:

    void doIO(SelectionKey k) throws InterruptedException {
            try {
                if (isSocketOpen() == false) {
                    LOG.warn("trying to do i/o on a null socket for session:0x"
                             + Long.toHexString(sessionId));
    
                    return;
                }
                if (k.isReadable()) {
                    int rc = sock.read(incomingBuffer);
                    if (rc < 0) {
                        throw new EndOfStreamException(
                                "Unable to read additional data from client sessionid 0x"
                                + Long.toHexString(sessionId)
                                + ", likely client has closed socket");
                    }
                    if (incomingBuffer.remaining() == 0) {
                        boolean isPayload;
                        if (incomingBuffer == lenBuffer) { // start of next request
                            incomingBuffer.flip();
                            isPayload = readLength(k);
                            incomingBuffer.clear();
                        } else {
                            // continuation
                            isPayload = true;
                        }
                        if (isPayload) { // not the case for 4letterword
                            // a complete request is ready: process it
                            readPayload();
                        }
                        else {
                            // four letter words take care
                            // need not do anything else
                            return;
                        }
                    }
                }
        // ... omitted
    
     private void readPayload() throws IOException, InterruptedException {
            // ... omitted: once the whole payload has been read, parse the request
                    readRequest();
                }
                lenBuffer.clear();
                incomingBuffer = lenBuffer;
            }
        }
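The lenBuffer/incomingBuffer juggling in doIO and readPayload is length-prefixed framing: each request starts with a 4-byte big-endian length, so the server first fills a 4-byte buffer, then allocates a buffer of exactly that size and keeps reading until it is full, possibly across several readable events. A minimal sketch of that state machine fed with arbitrary chunks; FrameReaderSketch is a hypothetical name, the four-letter-word handling and maximum-size check of the real readLength are left out, and the length guard is a sketch-only simplification.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

final class FrameReaderSketch {
    private final ByteBuffer lenBuffer = ByteBuffer.allocate(4);
    private ByteBuffer incomingBuffer = lenBuffer;
    final List<String> requests = new ArrayList<>();

    // Feed whatever bytes one "readable" event delivered.
    void onBytes(ByteBuffer chunk) {
        while (chunk.hasRemaining()) {
            // copy as much as fits into the current buffer (like sock.read(incomingBuffer))
            int n = Math.min(chunk.remaining(), incomingBuffer.remaining());
            ByteBuffer slice = chunk.slice();
            slice.limit(n);
            incomingBuffer.put(slice);
            chunk.position(chunk.position() + n);

            if (incomingBuffer.remaining() == 0) {
                if (incomingBuffer == lenBuffer) { // length prefix complete, like readLength()
                    lenBuffer.flip();
                    int len = lenBuffer.getInt();
                    lenBuffer.clear();
                    if (len < 1) { // sketch-only guard; the real code also caps the size
                        throw new IllegalStateException("bad frame length " + len);
                    }
                    incomingBuffer = ByteBuffer.allocate(len);
                } else { // payload complete, like readPayload() -> readRequest()
                    requests.add(new String(incomingBuffer.array(), StandardCharsets.UTF_8));
                    lenBuffer.clear();
                    incomingBuffer = lenBuffer;
                }
            }
        }
    }

    // Helper for examples: encode one length-prefixed frame.
    static byte[] frame(String payload) {
        byte[] body = payload.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(4 + body.length).putInt(body.length).put(body).array();
    }
}
```

Frames split at arbitrary byte boundaries are reassembled correctly, which is why the server keeps incomingBuffer between readable events.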
    

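    Eventually this reaches the submitRequest method of the ZooKeeperServer class, which passes the request down the request-processor chain until FinalRequestProcessor.processRequest handles it.
    Since this article uses getData as its example, here is the getData branch of processRequest: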

       case OpCode.getData: {
                    lastOp = "GETD";
                    GetDataRequest getDataRequest = new GetDataRequest();
                    ByteBufferInputStream.byteBuffer2Record(request.request,
                            getDataRequest);
                    DataNode n = zks.getZKDatabase().getNode(getDataRequest.getPath());
                    if (n == null) {
                        throw new KeeperException.NoNodeException();
                    }
                    Long aclL;
                    synchronized(n) {
                        aclL = n.acl;
                    }
                    PrepRequestProcessor.checkACL(zks, zks.getZKDatabase().convertLong(aclL),
                            ZooDefs.Perms.READ,
                            request.authInfo);
                    Stat stat = new Stat();
                    byte b[] = zks.getZKDatabase().getData(getDataRequest.getPath(), stat,
                            getDataRequest.getWatch() ? cnxn : null);
                    rsp = new GetDataResponse(b, stat);
                    break;
    

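    The statement byte b[] = zks.getZKDatabase().getData(getDataRequest.getPath(), stat, getDataRequest.getWatch() ? cnxn : null); is where the watcher is registered: if the client set the watch flag, the connection object cnxn itself is passed in as the watcher. Let's follow it further.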

    public byte[] getData(String path, Stat stat, Watcher watcher)
                throws KeeperException.NoNodeException {
            DataNode n = nodes.get(path);
            if (n == null) {
                throw new KeeperException.NoNodeException();
            }
            synchronized (n) {
                n.copyStat(stat);
                if (watcher != null) {
                    // if a watcher was supplied, register it
                    dataWatches.addWatch(path, watcher);
                }
                return n.data;
            }
        }
    
    public synchronized void addWatch(String path, Watcher watcher) {
            HashSet<Watcher> list = watchTable.get(path);
            if (list == null) {
                list = new HashSet<Watcher>(4);
                watchTable.put(path, list);
            }
            list.add(watcher);
    
            HashSet<String> paths = watch2Paths.get(watcher);
            if (paths == null) {
                // cnxns typically have many watches, so use default cap here
                paths = new HashSet<String>();
                watch2Paths.put(watcher, paths);
            }
            paths.add(path);
        }
    

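    The code above shows that registering a watcher puts it into two maps of the WatchManager class: watchTable indexes watchers by path, and watch2Paths records the paths each watcher is registered on.
    This completes server-side watcher registration. A sequence diagram summarizes it: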


    [Figure: sequence diagram of server-side watcher registration]
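The two indexes in addWatch can be modeled directly. watchTable answers "who must be told when this path changes?", while watch2Paths answers "which paths does this watcher (in practice, this client connection) hold?", which lets the server drop all of a connection's watches in one pass when it closes; the real WatchManager has a removeWatcher(Watcher) method for this. The sketch keeps the same two maps; ServerWatchIndexSketch is a hypothetical name, and a String session id stands in for the ServerCnxn that getData passes as the watcher.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

final class ServerWatchIndexSketch {
    final Map<String, Set<String>> watchTable = new HashMap<>();  // path -> watchers
    final Map<String, Set<String>> watch2Paths = new HashMap<>(); // watcher -> paths

    synchronized void addWatch(String path, String watcher) {
        watchTable.computeIfAbsent(path, p -> new HashSet<>()).add(watcher);
        watch2Paths.computeIfAbsent(watcher, w -> new HashSet<>()).add(path);
    }

    // Connection closed: the reverse index finds every watched path without scanning watchTable.
    synchronized void removeWatcher(String watcher) {
        Set<String> paths = watch2Paths.remove(watcher);
        if (paths == null) {
            return;
        }
        for (String path : paths) {
            Set<String> watchers = watchTable.get(path);
            if (watchers != null) {
                watchers.remove(watcher);
                if (watchers.isEmpty()) {
                    watchTable.remove(path); // no one left watching this path
                }
            }
        }
    }
}
```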

    This completes the walkthrough of the watcher registration code.

    Triggering a watcher

    The flow is easiest to follow in a sequence diagram; most of the relevant code has already been covered above.


    [Figure: the watcher triggering process]
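The trigger path can be condensed into a sketch. In the ZooKeeper source, a write such as setData calls WatchManager.triggerWatch(path, type), which removes the path's entry from watchTable (standard watches fire once) and calls process(event) on each watcher; for a ServerCnxn this sends a notification whose reply header carries xid -1 and whose body holds only the event type, keeper state and path. On the client, the event thread then removes the matching watchers from ZKWatchManager and runs them. The names below (Notification, TriggerSketch, Consumer-based watchers, String session ids) are hypothetical stand-ins.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;

// What a notification carries: xid -1 plus event type, keeper state and path.
final class Notification {
    final int xid = -1;
    final String type;
    final String state;
    final String path;

    Notification(String type, String state, String path) {
        this.type = type;
        this.state = state;
        this.path = path;
    }
}

final class TriggerSketch {
    // Server side: path -> session ids (the real table holds ServerCnxn objects).
    final Map<String, Set<String>> serverWatchTable = new HashMap<>();
    // Client side: path -> local callbacks (like ZKWatchManager.dataWatches).
    final Map<String, Set<Consumer<Notification>>> clientDataWatches = new HashMap<>();

    // Like WatchManager.triggerWatch: remove first (one-shot), then notify each session.
    Map<String, Notification> triggerWatch(String path, String type) {
        Map<String, Notification> sent = new HashMap<>();
        Set<String> sessions = serverWatchTable.remove(path);
        if (sessions != null) {
            for (String session : sessions) {
                sent.put(session, new Notification(type, "SyncConnected", path));
            }
        }
        return sent;
    }

    // Like the client's event thread: find watchers by path, remove them, run them.
    void deliver(Notification n) {
        Set<Consumer<Notification>> watchers = clientDataWatches.remove(n.path);
        if (watchers != null) {
            watchers.forEach(w -> w.accept(n));
        }
    }
}
```

A second change on the same path produces no notification until the client registers a new watch.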

    That concludes the walkthrough of the watcher-related code.


    Article link: https://www.haomeiwen.com/subject/lcljdftx.html