Zookeeper NIO Handling

Author: huiwq1990 | Published 2017-06-28 19:32

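    Questions

    An NIO-based request usually carries a unique ID in its payload. When the server's response comes back, the client uses that ID to look up the matching Future object and completes it asynchronously.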
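The ID-to-Future lookup described above can be sketched roughly as follows. This is a minimal illustration, not code from ZooKeeper or any RPC framework: the class `PendingRequests` and its methods are hypothetical names, and the response payload is assumed to be a plain `String`.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper for illustration; not part of ZooKeeper or any RPC library.
final class PendingRequests {
    private final AtomicInteger nextId = new AtomicInteger(1);
    private final Map<Integer, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    // Allocate a unique request ID and remember the future waiting on it.
    int register(CompletableFuture<String> future) {
        int id = nextId.getAndIncrement();
        pending.put(id, future);
        return id;
    }

    // Called by the I/O thread when a response arrives.
    // Returns false if no request with that ID is waiting.
    boolean complete(int requestId, String payload) {
        CompletableFuture<String> future = pending.remove(requestId);
        if (future == null) {
            return false;
        }
        return future.complete(payload);
    }

    int size() {
        return pending.size();
    }

    public static void main(String[] args) {
        PendingRequests requests = new PendingRequests();
        CompletableFuture<String> first = new CompletableFuture<>();
        CompletableFuture<String> second = new CompletableFuture<>();
        int firstId = requests.register(first);
        int secondId = requests.register(second);

        // Responses may arrive in any order; the ID selects the right future.
        requests.complete(secondId, "reply-2");
        requests.complete(firstId, "reply-1");
        System.out.println(first.join() + " / " + second.join());
    }
}
```

With this design the server is free to answer out of order, at the cost of a shared map that both the caller and the I/O thread touch.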

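    See "RPC Principles You Should Know" (你应该知道的RPC原理).

    How does Zookeeper's NIO layer handle this?

    Sequential consistency: updates submitted by a client are applied in the order in which they were sent. For example, if a client sets znode z to a and then changes z to b, no client will read a from z once its value has become b.

    When a request is sent, its packet is placed on a queue, and then the reply is matched against the packet at the head of that queue, as the client code that follows shows.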
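Because replies come back in request order, a FIFO queue is enough to pair each reply with its request. The sketch below shows the idea under simplifying assumptions: `InOrderMatcher` and `Request` are hypothetical stand-ins for the client's packet queue, and the error is reported with an unchecked `IllegalStateException` where the real client throws an `IOException`.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Simplified, hypothetical stand-in for the client's pending-packet queue.
final class InOrderMatcher {
    static final class Request {
        final int xid;
        String reply;

        Request(int xid) {
            this.xid = xid;
        }
    }

    private final Queue<Request> pending = new ArrayDeque<>();
    private int nextXid = 1;

    // Sending appends to the tail, so queue order equals send order.
    synchronized Request send() {
        Request request = new Request(nextXid++);
        pending.add(request);
        return request;
    }

    // A reply must belong to the head of the queue; anything else is an error.
    synchronized Request onReply(int xid, String body) {
        Request head = pending.poll();
        if (head == null) {
            throw new IllegalStateException("Nothing in the queue, but got " + xid);
        }
        if (head.xid != xid) {
            throw new IllegalStateException(
                    "Xid out of order. Got " + xid + " expected " + head.xid);
        }
        head.reply = body;
        return head;
    }

    public static void main(String[] args) {
        InOrderMatcher matcher = new InOrderMatcher();
        Request first = matcher.send();
        Request second = matcher.send();
        matcher.onReply(first.xid, "a");
        matcher.onReply(second.xid, "b");
        System.out.println(first.reply + " then " + second.reply);
    }
}
```

Here the xid is only a sanity check: the queue position, not a lookup by ID, decides which request a reply belongs to.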

    How the client handles responses

    org.apache.zookeeper.ClientCnxn.SendThread#run

    clientCnxnSocket.doTransport(to, pendingQueue, outgoingQueue, ClientCnxn.this);
    

    org.apache.zookeeper.ClientCnxnSocketNIO#doTransport

           for (SelectionKey k : selected) {
                SocketChannel sc = ((SocketChannel) k.channel());
                if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {
                    if (sc.finishConnect()) {
                        updateLastSendAndHeard();
                        sendThread.primeConnection();
                    }
                } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
                    doIO(pendingQueue, outgoingQueue, cnxn);
                }
            }
    

    org.apache.zookeeper.ClientCnxnSocketNIO#doIO

    // Handle read events: the socket has data from the server
    
            if (sockKey.isReadable()) {
                int rc = sock.read(incomingBuffer);
                if (rc < 0) {
                    throw new EndOfStreamException(
                            "Unable to read additional data from server sessionid 0x"
                                    + Long.toHexString(sessionId)
                                    + ", likely server has closed socket");
                }
                if (!incomingBuffer.hasRemaining()) {
                    incomingBuffer.flip();
                    if (incomingBuffer == lenBuffer) {
                        recvCount++;
                        readLength();
                    } else if (!initialized) {
                        readConnectResult();
                        enableRead();
                        if (findSendablePacket(outgoingQueue,
                                cnxn.sendThread.clientTunneledAuthenticationInProgress()) != null) {
                            // Since SASL authentication has completed (if client is configured to do so),
                            // outgoing packets waiting in the outgoingQueue can now be sent.
                            enableWrite();
                        }
                        lenBuffer.clear();
                        incomingBuffer = lenBuffer;
                        updateLastHeard();
                        initialized = true;
                    } else {
                        // Hand the complete response over to sendThread
                        sendThread.readResponse(incomingBuffer);
                        lenBuffer.clear();
                        incomingBuffer = lenBuffer;
                        updateLastHeard();
                    }
                }
            }
    
            void readResponse(ByteBuffer incomingBuffer) throws IOException {
                ByteBufferInputStream bbis = new ByteBufferInputStream(
                        incomingBuffer);
                BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
                ReplyHeader replyHdr = new ReplyHeader();
                replyHdr.deserialize(bbia, "header");
    
                // (code omitted)
        Packet packet;
                synchronized (pendingQueue) {
                    if (pendingQueue.size() == 0) {
                        throw new IOException("Nothing in the queue, but got "
                                + replyHdr.getXid());
                    }
                    // Dequeue the packet at the head of the pending queue
                    packet = pendingQueue.remove();
                }
                /*
                 * (Author's note: requests are processed in order.)
                 * Since requests are processed in order, we better get a response
                 * to the first request!
                 */
                try {
                    if (packet.requestHeader.getXid() != replyHdr.getXid()) {
                        packet.replyHeader.setErr(
                                KeeperException.Code.CONNECTIONLOSS.intValue());
                        throw new IOException("Xid out of order. Got Xid "
                                + replyHdr.getXid() + " with err " +
                                + replyHdr.getErr() +
                                " expected Xid "
                                + packet.requestHeader.getXid()
                                + " for a packet with details: "
                                + packet );
                    }
    
                    packet.replyHeader.setXid(replyHdr.getXid());
                    packet.replyHeader.setErr(replyHdr.getErr());
                    packet.replyHeader.setZxid(replyHdr.getZxid());
                    if (replyHdr.getZxid() > 0) {
                        lastZxid = replyHdr.getZxid();
                    }
                    if (packet.response != null && replyHdr.getErr() == 0) {
                        packet.response.deserialize(bbia, "response");
                    }
    
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Reading reply sessionid:0x"
                                + Long.toHexString(sessionId) + ", packet:: " + packet);
                    }
                } finally {
                    // Follow-up processing of the packet
                    finishPacket(packet);
                }
    }
    
        private void finishPacket(Packet p) {
            if (p.watchRegistration != null) {
                p.watchRegistration.register(p.replyHeader.getErr());
            }
    
            if (p.cb == null) {
                synchronized (p) {
                    p.finished = true;
                    p.notifyAll();
                }
            } else {
                p.finished = true;
                eventThread.queuePacket(p);
            }
        }
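finishPacket above has two completion paths: a packet without a callback wakes up whoever is blocked on it, while a packet with a callback is queued for the event thread. A rough, self-contained sketch of both paths follows. `CompletionSketch`, `waitFor` and `runNextCallback` are hypothetical names chosen for illustration, and the callback is modelled as a plain `Runnable`, which is an assumption rather than the client's real callback type.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Illustrative only; names and types are assumptions, not ZooKeeper's API.
final class CompletionSketch {
    static final class Packet {
        boolean finished;
        Runnable cb; // null means a synchronous caller is blocked in waitFor()
    }

    private final BlockingQueue<Packet> eventQueue = new LinkedBlockingQueue<>();

    // Same branch structure as finishPacket: wake the waiter, or queue the callback.
    void finish(Packet p) {
        if (p.cb == null) {
            synchronized (p) {
                p.finished = true;
                p.notifyAll();
            }
        } else {
            p.finished = true;
            eventQueue.add(p);
        }
    }

    // What a synchronous caller could do after queueing its packet.
    // Returns false if the thread was interrupted while waiting.
    static boolean waitFor(Packet p) {
        synchronized (p) {
            while (!p.finished) {
                try {
                    p.wait();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
            return true;
        }
    }

    // One step of a hypothetical event thread: run the next queued callback.
    boolean runNextCallback() {
        Packet p = eventQueue.poll();
        if (p == null) {
            return false;
        }
        p.cb.run();
        return true;
    }

    public static void main(String[] args) {
        CompletionSketch sketch = new CompletionSketch();

        // Synchronous path: another thread finishes the packet while we wait.
        Packet sync = new Packet();
        Thread io = new Thread(() -> sketch.finish(sync));
        io.start();
        System.out.println("sync finished: " + waitFor(sync));

        // Asynchronous path: the callback runs only when the event queue is drained.
        Packet async = new Packet();
        async.cb = () -> System.out.println("callback ran");
        sketch.finish(async);
        sketch.runNextCallback();
    }
}
```

Running callbacks on a separate event thread keeps user code off the I/O thread, so a slow callback does not stall reads and writes on the socket.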
    
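Going back to the read path in doIO: it alternates between a small length buffer (lenBuffer) and a payload buffer sized from that length, and only processes a buffer once it is full. The sketch below imitates that two-phase, length-prefixed read on plain ByteBuffers with no socket involved. `FrameDecoder`, `feed` and the `MAX_FRAME` limit are assumptions made for this example and are not taken from the ZooKeeper source.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Illustrative decoder for 4-byte length-prefixed frames; names are hypothetical.
final class FrameDecoder {
    private static final int MAX_FRAME = 1 << 20; // assumed limit for this sketch

    private final ByteBuffer lenBuffer = ByteBuffer.allocate(4);
    private ByteBuffer incoming = lenBuffer;
    private final List<byte[]> frames = new ArrayList<>();

    // Feed whatever bytes arrived; complete frames are collected in order.
    void feed(ByteBuffer chunk) {
        while (true) {
            // Copy as much as the current target buffer can still take.
            while (incoming.hasRemaining() && chunk.hasRemaining()) {
                incoming.put(chunk.get());
            }
            if (incoming.hasRemaining()) {
                return; // buffer not full yet, wait for more bytes
            }
            incoming.flip();
            if (incoming == lenBuffer) {
                // Phase 1: the length prefix is complete, size the payload buffer.
                int len = lenBuffer.getInt();
                if (len < 0 || len > MAX_FRAME) {
                    throw new IllegalStateException("Bad frame length " + len);
                }
                incoming = ByteBuffer.allocate(len);
            } else {
                // Phase 2: the payload is complete, emit it and go back to phase 1.
                byte[] frame = new byte[incoming.remaining()];
                incoming.get(frame);
                frames.add(frame);
                lenBuffer.clear();
                incoming = lenBuffer;
            }
        }
    }

    List<byte[]> frames() {
        return frames;
    }

    public static void main(String[] args) {
        ByteBuffer wire = ByteBuffer.allocate(16);
        wire.putInt(2).put((byte) 'o').put((byte) 'k');
        wire.flip();

        FrameDecoder decoder = new FrameDecoder();
        decoder.feed(wire);
        System.out.println("frames decoded: " + decoder.frames().size());
    }
}
```

Because state lives in the decoder rather than in the call stack, a frame may be split across any number of reads without special handling.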

    References

    http://www.cnblogs.com/LBSer/p/4853234.html
    zk communication mechanism
    http://blog.csdn.net/yangbutao/article/details/9719389
