ZooKeeper Source Reading 33: Server-Client Network I/O (Part 2)

Author: 赤子心_d709 | Published 2017-08-13 14:17

    Abstract

    This post covers the NIO implementation of ServerCnxn, namely NIOServerCnxn (NettyServerCnxn is not covered here).
    NIOServerCnxn extends the abstract class ServerCnxn and uses NIO to handle communication with clients on a single thread.

    Outline

    Inner classes
      SendBufferWriter: a Writer implementation used to produce output when handling commands
      CommandThread: handles the individual commands
    Fields
    Methods
      Constructor
      I/O
        doIO, the core method
        Reading
          readLength: reads four bytes and decides whether this is a payload request
          checkFourLetterWord: checks whether the four bytes match a command; if so, writes the reply through a PrintWriter
          readPayload: reads and processes a buffer of the size given by len
          readConnectRequest: reads a connection request
          readRequest: reads a non-connection request
        Writing
          sendCloseSession: sends the "close" buffer
          sendBufferSync: synchronous send
          sendBuffer: normal send
      Methods inherited from the parent class ServerCnxn
      Methods implementing the Watcher interface
    

    Background

    Types of client requests

    The source code distinguishes two kinds:

    payload: literally "payload"; the first four bytes form an int giving len, followed by a buffer of exactly len bytes
    non-payload: the first four bytes form an int matching one of the commands defined in ServerCnxn, e.g. "conf" (int 1668247142) or "cons" (int 1668247155); depending on the command, a further fixed-length buffer may follow (see the sketch just below)
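
    To make that mapping concrete, here is a minimal stand-alone sketch (not ZooKeeper code; the class name is made up) showing that the ASCII bytes of "conf", read as a big-endian int, are exactly 1668247142:

        import java.nio.ByteBuffer;

        public class FourLetterWordDemo {
            public static void main(String[] args) {
                // the four command bytes, read the same way the server reads a length prefix
                ByteBuffer buf = ByteBuffer.wrap("conf".getBytes());
                int asInt = buf.getInt();                             // ByteBuffer is big-endian by default
                System.out.println(asInt);                            // 1668247142
                // the same value computed by hand from the ASCII codes
                int manual = ('c' << 24) | ('o' << 16) | ('n' << 8) | 'f';
                System.out.println(manual == asInt);                  // true
            }
        }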
    

    Inner classes

    The inner classes of NIOServerCnxn

    SendBufferWriter

    This class chunks up the response sent back to the client:
    rather than building a potentially large response fully in memory and holding that space until it is written out, the response is sent in chunks.

        /**
         * This class wraps the sendBuffer method of NIOServerCnxn. It is
         * responsible for chunking up the response to a client. Rather
         * than cons'ing up a response fully in memory, which may be large
         * for some commands, this class chunks up the result.
         */
        private class SendBufferWriter extends Writer {
            private StringBuffer sb = new StringBuffer();
            
            /**
             * Check if we are ready to send another chunk.
             * @param force force sending, even if not a full chunk
             */
            private void checkFlush(boolean force) {
                if ((force && sb.length() > 0) || sb.length() > 2048) {// flush when forced (and sb is non-empty), or once sb exceeds 2048 chars
                    sendBufferSync(ByteBuffer.wrap(sb.toString().getBytes()));// write synchronously through the socket channel
                    // clear our internal buffer
                    sb.setLength(0);
                }
            }
    
            @Override
            public void close() throws IOException {
                if (sb == null) return;
                checkFlush(true);// force-flush the buffer before closing
                sb = null; // clear out the ref to ensure no reuse
            }
    
            @Override
            public void flush() throws IOException {
                checkFlush(true);// forced flush
            }
    
            @Override
            public void write(char[] cbuf, int off, int len) throws IOException {
                sb.append(cbuf, off, len);// append to the internal buffer
                checkFlush(false);// non-forced: only sends once the chunk is large enough
            }
        }
    

    CommandThread

    Handles the commands defined in ServerCnxn, such as "ruok" and "stmk". The main logic lives in commandRun, which each subclass implements; every command is handled on its own thread.

        /**
         * Set of threads for commmand ports. All the 4
         * letter commands are run via a thread. Each class
         * maps to a corresponding 4 letter command. CommandThread
         * is the abstract class from which all the others inherit.
         */
        private abstract class CommandThread extends Thread {
            PrintWriter pw;
            
            CommandThread(PrintWriter pw) {
                this.pw = pw;
            }
            
            public void run() {
                try {
                    commandRun();
                } catch (IOException ie) {
                    LOG.error("Error in running command ", ie);
                } finally {
                    cleanupWriterSocket(pw);
                }
            }
            
            public abstract void commandRun() throws IOException;
        }
    

    There are many subclasses; DumpCommand is shown here just to give a feel for them. Each subclass implements commandRun, does its work, and writes the result to the PrintWriter.

        private class DumpCommand extends CommandThread {
            public DumpCommand(PrintWriter pw) {
                super(pw);
            }
            
            @Override
            public void commandRun() {
                if (zkServer == null) {
                    pw.println(ZK_NOT_SERVING);
                }
                else {
                    pw.println("SessionTracker dump:");
                    zkServer.sessionTracker.dumpSessions(pw);
                    pw.println("ephemeral nodes dump:");
                    zkServer.dumpEphemerals(pw);
                }
            }
        }
    

    Fields

        static final Logger LOG = LoggerFactory.getLogger(NIOServerCnxn.class);
    
        NIOServerCnxnFactory factory;// the factory that created this connection
    
        final SocketChannel sock;// selectable channel for the stream-oriented connection socket
    
        private final SelectionKey sk;// this channel's selection key, used to adjust the READ/WRITE interest set
    
        boolean initialized;
    
        ByteBuffer lenBuffer = ByteBuffer.allocate(4);// fixed 4-byte buffer used to read the length prefix; its size never changes
    
        ByteBuffer incomingBuffer = lenBuffer;// buffer for incoming data; re-allocated to the size given by len once the prefix is read
    
        LinkedBlockingQueue<ByteBuffer> outgoingBuffers = new LinkedBlockingQueue<ByteBuffer>();// queue of outgoing (response) buffers
    
        int sessionTimeout;// session timeout
    
        private final ZooKeeperServer zkServer;// the ZooKeeper server
    
        /**
         * The number of requests that have been submitted but not yet responded to.
         */
        int outstandingRequests;// requests submitted but not yet responded to
    
        /**
         * This is the id that uniquely identifies the session of a client. Once
         * this session is no longer active, the ephemeral nodes will go away.
         */
        long sessionId;// session ID
    
        static long nextSessionId = 1;// next session ID
        int outstandingLimit = 1;// default limit on submitted-but-unanswered requests; reassigned in the constructor
    
        private static final String ZK_NOT_SERVING =
            "This ZooKeeper instance is not currently serving requests";
        
        private final static byte fourBytes[] = new byte[4];
    

    Methods

    Constructor

        public NIOServerCnxn(ZooKeeperServer zk, SocketChannel sock,
                SelectionKey sk, NIOServerCnxnFactory factory) throws IOException {
            this.zkServer = zk;// the ZooKeeper server
            this.sock = sock;
            this.sk = sk;// the selection key for this channel
            this.factory = factory;// the factory that created this connection
            if (this.factory.login != null) {
                this.zooKeeperSaslServer = new ZooKeeperSaslServer(factory.login);
            }
            if (zk != null) { 
                outstandingLimit = zk.getGlobalOutstandingLimit();// how many received-but-unanswered requests we tolerate
            }
            sock.socket().setTcpNoDelay(true);
            /* set socket linger to false, so that socket close does not
             * block */
            sock.socket().setSoLinger(false, -1);
            InetAddress addr = ((InetSocketAddress) sock.socket()
                    .getRemoteSocketAddress()).getAddress();
            authInfo.add(new Id("ip", addr.getHostAddress()));
            sk.interestOps(SelectionKey.OP_READ);// initially only interested in READ (the connection has already been accepted at this point)
        }
    

    The constructor configures the socket channel (TCP no-delay, no lingering close), records the client's IP address as auth info,
    and finally sets the SelectionKey's interest to READ, ready to read subsequent messages.

    I/O

    The core method: doIO

        void doIO(SelectionKey k) throws InterruptedException {
            try {
                if (isSocketOpen() == false) {
                    LOG.warn("trying to do i/o on a null socket for session:0x"
                             + Long.toHexString(sessionId));
    
                    return;
                }
                if (k.isReadable()) {
                    int rc = sock.read(incomingBuffer);
                    if (rc < 0) {
                        throw new EndOfStreamException(
                                "Unable to read additional data from client sessionid 0x"
                                + Long.toHexString(sessionId)
                                + ", likely client has closed socket");
                    }
                    if (incomingBuffer.remaining() == 0) {
                        boolean isPayload;
                        if (incomingBuffer == lenBuffer) { // start of next request
                            incomingBuffer.flip();
                            isPayload = readLength(k);// read len and decide whether this is a payload request
                            incomingBuffer.clear();
                        } else {
                            // continuation
                            isPayload = true;
                        }
                        if (isPayload) { // not the case for 4letterword
                            readPayload();
                        }
                        else {
                            // four letter words take care
                            // need not do anything else
                            return;
                        }
                    }
                }
                if (k.isWritable()) {// the key is writable
                    // ZooLog.logTraceMessage(LOG,
                    // ZooLog.CLIENT_DATA_PACKET_TRACE_MASK
                    // "outgoingBuffers.size() = " +
                    // outgoingBuffers.size());
                    if (outgoingBuffers.size() > 0) {// the outgoing queue is not empty
                        // ZooLog.logTraceMessage(LOG,
                        // ZooLog.CLIENT_DATA_PACKET_TRACE_MASK,
                        // "sk " + k + " is valid: " +
                        // k.isValid());
    
                        /*
                         * This is going to reset the buffer position to 0 and the
                         * limit to the size of the buffer, so that we can fill it
                         * with data from the non-direct buffers that we need to
                         * send.
                         */
                        ByteBuffer directBuffer = factory.directBuffer;
                        directBuffer.clear();
    
                        for (ByteBuffer b : outgoingBuffers) {
                            if (directBuffer.remaining() < b.remaining()) {// not enough room left in the direct buffer, so slice
                                /*
                                 * When we call put later, if the directBuffer is to
                                 * small to hold everything, nothing will be copied,
                                 * so we've got to slice the buffer if it's too big.
                                 */
                                b = (ByteBuffer) b.slice().limit(
                                        directBuffer.remaining());
                            }
                            /*
                             * put() is going to modify the positions of both
                             * buffers, put we don't want to change the position of
                             * the source buffers (we'll do that after the send, if
                             * needed), so we save and reset the position after the
                             * copy
                             */
                            int p = b.position();
                            directBuffer.put(b);
                            b.position(p);
                            if (directBuffer.remaining() == 0) {// the direct buffer is full
                                break;
                            }
                        }
                        /*
                         * Do the flip: limit becomes position, position gets set to
                         * 0. This sets us up for the write.
                         */
                        directBuffer.flip();
    
                        int sent = sock.write(directBuffer);// write to the socket
                        ByteBuffer bb;
    
                        // Remove the buffers that we have sent
                        while (outgoingBuffers.size() > 0) {
                            bb = outgoingBuffers.peek();
                            if (bb == ServerCnxnFactory.closeConn) {
                                throw new CloseRequestException("close requested");
                            }
                            int left = bb.remaining() - sent;
                            if (left > 0) {// part of this buffer is still unsent
                                /*
                                 * We only partially sent this buffer, so we update
                                 * the position and exit the loop.
                                 */
                                bb.position(bb.position() + sent);
                                break;
                            }
                            packetSent();// update statistics
                            /* We've sent the whole buffer, so drop the buffer */
                            sent -= bb.remaining();
                            outgoingBuffers.remove();
                        }
                        // ZooLog.logTraceMessage(LOG,
                        // ZooLog.CLIENT_DATA_PACKET_TRACE_MASK, "after send,
                        // outgoingBuffers.size() = " + outgoingBuffers.size());
                    }
    
                    synchronized(this.factory){
                        if (outgoingBuffers.size() == 0) {
                            if (!initialized
                                    && (sk.interestOps() & SelectionKey.OP_READ) == 0) {
                                throw new CloseRequestException("responded to info probe");
                            }
                            sk.interestOps(sk.interestOps()
                                    & (~SelectionKey.OP_WRITE));
                        } else {
                            sk.interestOps(sk.interestOps()
                                    | SelectionKey.OP_WRITE);
                        }
                    }
                }
            } catch (CancelledKeyException e) {
                LOG.warn("Exception causing close of session 0x"
                        + Long.toHexString(sessionId)
                        + " due to " + e);
                if (LOG.isDebugEnabled()) {
                    LOG.debug("CancelledKeyException stack trace", e);
                }
                close();
            } catch (CloseRequestException e) {
                // expecting close to log session closure
                close();
            } catch (EndOfStreamException e) {
                LOG.warn("caught end of stream exception",e); // tell user why
    
                // expecting close to log session closure
                close();
            } catch (IOException e) {
                LOG.warn("Exception causing close of session 0x"
                        + Long.toHexString(sessionId)
                        + " due to " + e);
                if (LOG.isDebugEnabled()) {
                    LOG.debug("IOException stack trace", e);
                }
                close();
            }
        }
    

    The main logic:

    1. If readable: use the first 4-byte int to decide whether this is a payload request
      If not, it is a command; the matching handler writes its reply through the PrintWriter
      If it is, allocate a buffer of len bytes and read the rest of the request (see the sketch after this list)
    2. If writable:
      Walk outgoingBuffers, filling the 64 KB direct buffer each pass (slicing if needed), and write it to the socket
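
    As a companion to this summary, here is a simplified, hypothetical sketch of the same two-phase, length-prefixed read that doIO and readPayload implement together (the FrameReader class and onFrame method are made up, and four-letter-word handling is left out):

        import java.io.IOException;
        import java.nio.ByteBuffer;
        import java.nio.channels.SocketChannel;

        class FrameReader {
            private final ByteBuffer lenBuffer = ByteBuffer.allocate(4); // fixed 4-byte length prefix
            private ByteBuffer incoming = lenBuffer;                     // points back at lenBuffer between frames

            // called whenever the selector reports the channel as readable
            void doRead(SocketChannel sock) throws IOException {
                if (sock.read(incoming) < 0) {
                    throw new IOException("client closed the socket");
                }
                if (incoming.remaining() > 0) {
                    return;                                  // partial read; wait for the next READ event
                }
                if (incoming == lenBuffer) {                 // phase 1: the length prefix is complete
                    lenBuffer.flip();
                    int len = lenBuffer.getInt();
                    lenBuffer.clear();
                    incoming = ByteBuffer.allocate(len);     // phase 2 buffer sized from the prefix
                } else {                                     // phase 2: the payload is complete
                    incoming.flip();
                    onFrame(incoming);                       // hand off (processConnectRequest / processPacket in the real code)
                    incoming = lenBuffer;                    // reset for the next frame
                }
            }

            void onFrame(ByteBuffer payload) { /* request processing would go here */ }
        }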
    

    Reading

    doIO above calls the following methods.

    readLength

    Reads the first four bytes as an int. If the connection is not yet initialized and the int matches a command, the request is handled as that command; otherwise incomingBuffer is re-allocated to len bytes.

        private boolean readLength(SelectionKey k) throws IOException {
            // Read the length, now get the buffer
            int len = lenBuffer.getInt();
            if (!initialized && checkFourLetterWord(sk, len)) {// not yet initialized and the int matches a command: the reply is written through a PrintWriter
                return false;
            }
            if (len < 0 || len > BinaryInputArchive.maxBuffer) {
                throw new IOException("Len error " + len);
            }
            if (zkServer == null) {
                throw new IOException("ZooKeeperServer not running");
            }
            incomingBuffer = ByteBuffer.allocate(len);// allocate a buffer of len bytes
            return true;
        }
    
    checkFourLetterWord

    Checks whether the int value corresponds to a specific command; if so, the corresponding reply is written to the PrintWriter.

        private boolean checkFourLetterWord(final SelectionKey k, final int len)
        throws IOException
        {
            // We take advantage of the limited size of the length to look
            // for cmds. They are all 4-bytes which fits inside of an int
            String cmd = cmd2String.get(len);
            if (cmd == null) {
                return false;
            }
            LOG.info("Processing " + cmd + " command from "
                    + sock.socket().getRemoteSocketAddress());
            packetReceived();
    
            /** cancel the selection key to remove the socket handling
             * from selector. This is to prevent netcat problem wherein
             * netcat immediately closes the sending side after sending the
             * commands and still keeps the receiving channel open. 
             * The idea is to remove the selectionkey from the selector
             * so that the selector does not notice the closed read on the
             * socket channel and keep the socket alive to write the data to
             * and makes sure to close the socket after its done writing the data
             */
            if (k != null) {
                try {
                    k.cancel();
                } catch(Exception e) {
                    LOG.error("Error cancelling command selection key ", e);
                }
            }
    
            final PrintWriter pwriter = new PrintWriter(
                    new BufferedWriter(new SendBufferWriter()));
            if (len == ruokCmd) {
                RuokCommand ruok = new RuokCommand(pwriter);
                ruok.start();
                return true;
            } else if (len == getTraceMaskCmd) {
                TraceMaskCommand tmask = new TraceMaskCommand(pwriter);
                tmask.start();
                return true;
            } else if (len == setTraceMaskCmd) {
                incomingBuffer = ByteBuffer.allocate(8);
                int rc = sock.read(incomingBuffer);
                if (rc < 0) {
                    throw new IOException("Read error");
                }
    
                incomingBuffer.flip();
                long traceMask = incomingBuffer.getLong();
                ZooTrace.setTextTraceLevel(traceMask);
                SetTraceMaskCommand setMask = new SetTraceMaskCommand(pwriter, traceMask);
                setMask.start();
                return true;
            } else if (len == enviCmd) {
                EnvCommand env = new EnvCommand(pwriter);
                env.start();
                return true;
            } else if (len == confCmd) {
                ConfCommand ccmd = new ConfCommand(pwriter);
                ccmd.start();
                return true;
            } else if (len == srstCmd) {
                StatResetCommand strst = new StatResetCommand(pwriter);
                strst.start();
                return true;
            } else if (len == crstCmd) {
                CnxnStatResetCommand crst = new CnxnStatResetCommand(pwriter);
                crst.start();
                return true;
            } else if (len == dumpCmd) {
                DumpCommand dump = new DumpCommand(pwriter);
                dump.start();
                return true;
            } else if (len == statCmd || len == srvrCmd) {
                StatCommand stat = new StatCommand(pwriter, len);
                stat.start();
                return true;
            } else if (len == consCmd) {
                ConsCommand cons = new ConsCommand(pwriter);
                cons.start();
                return true;
            } else if (len == wchpCmd || len == wchcCmd || len == wchsCmd) {
                WatchCommand wcmd = new WatchCommand(pwriter, len);
                wcmd.start();
                return true;
            } else if (len == mntrCmd) {
                MonitorCommand mntr = new MonitorCommand(pwriter);
                mntr.start();
                return true;
            } else if (len == isroCmd) {
                IsroCommand isro = new IsroCommand(pwriter);
                isro.start();
                return true;
            }
            return false;
        }
    
    readPayload

    Reads a payload request, i.e. a non-command request.

        private void readPayload() throws IOException, InterruptedException {// read a payload request
            if (incomingBuffer.remaining() != 0) { // have we read length bytes?
                int rc = sock.read(incomingBuffer); // sock is non-blocking, so ok
                if (rc < 0) {
                    throw new EndOfStreamException(
                            "Unable to read additional data from client sessionid 0x"
                            + Long.toHexString(sessionId)
                            + ", likely client has closed socket");
                }
            }
    
            if (incomingBuffer.remaining() == 0) { // have we read length bytes?
                packetReceived();// update statistics
                incomingBuffer.flip();
                if (!initialized) {
                    readConnectRequest();// read the connect request
                } else {
                    readRequest();// read an ordinary request
                }
                lenBuffer.clear();
                incomingBuffer = lenBuffer;// reset to the 4-byte length buffer for the next request
            }
        }
    
    readConnectRequest

    Reads the connection request, delegating to ZooKeeperServer.

        private void readConnectRequest() throws IOException, InterruptedException {
            if (zkServer == null) {
                throw new IOException("ZooKeeperServer not running");
            }
            zkServer.processConnectRequest(this, incomingBuffer);
            initialized = true;// mark initialization as complete
        }
    
    readRequest

    Reads a non-connection request.

        private void readRequest() throws IOException {
            zkServer.processPacket(this, incomingBuffer);
        }
    

    Sending

    doIO does not call the sending methods below directly.

    sendCloseSession

    Sends the "close" buffer.

        /**
         * send buffer without using the asynchronous
         * calls to selector and then close the socket
         * @param bb
         */
        public void sendCloseSession() {
            sendBuffer(ServerCnxnFactory.closeConn);
        }
    

    sendBufferSync

    Synchronous send.

        /**
         * send buffer without using the asynchronous
         * calls to selector and then close the socket
         * @param bb
         */
        void sendBufferSync(ByteBuffer bb) {// synchronous send
           try {
               /* configure socket to be blocking
                * so that we dont have to do write in 
                * a tight while loop
                */
               sock.configureBlocking(true);
               if (bb != ServerCnxnFactory.closeConn) {
                   if (sock.isOpen()) {
                       sock.write(bb);
                   }
                   packetSent();
               } 
           } catch (IOException ie) {
               LOG.error("Error sending data synchronously ", ie);
           }
        }
    

    sendBuffer

    The core sending method.

        public void sendBuffer(ByteBuffer bb) {// send
            try {
                if (bb != ServerCnxnFactory.closeConn) {// not the "close connection" marker
                    // We check if write interest here because if it is NOT set,
                    // nothing is queued, so we can try to send the buffer right
                    // away without waking up the selector
                    if ((sk.interestOps() & SelectionKey.OP_WRITE) == 0) {// WRITE not registered yet, so nothing is queued: try a direct write
                        try {
                            sock.write(bb);
                        } catch (IOException e) {
                            // we are just doing best effort right now
                        }
                    }
                    // if there is nothing left to send, we are done
                    if (bb.remaining() == 0) {// everything was written, so just update the packet counter
                        packetSent();
                        return;
                    }
                }
    
                synchronized(this.factory){
                    sk.selector().wakeup();
                    if (LOG.isTraceEnabled()) {
                        LOG.trace("Add a buffer to outgoingBuffers, sk " + sk
                                + " is valid: " + sk.isValid());
                    }
                    outgoingBuffers.add(bb);// queue the remainder for the selector-driven send
                    if (sk.isValid()) {
                        sk.interestOps(sk.interestOps() | SelectionKey.OP_WRITE);// register interest in WRITE
                    }
                }
                
            } catch(Exception e) {
                LOG.error("Unexpected Exception: ", e);
            }
        }
    

    The main logic:
    if the buffer is not the "close" marker and OP_WRITE is not yet registered (so nothing is queued), sendBuffer first tries a direct write; anything left over is added to outgoingBuffers and OP_WRITE is registered so that the selector-driven doIO finishes the send.

    Methods inherited from the parent class ServerCnxn

    There are many; one example is enough here.

    incrOutstandingRequests

    Increments the count of submitted-but-unanswered requests.

        protected void incrOutstandingRequests(RequestHeader h) {
            if (h.getXid() >= 0) {
                synchronized (this) {
                    outstandingRequests++;
                }
                synchronized (this.factory) {        
                    // check throttling
                    if (zkServer.getInProcess() > outstandingLimit) {// over the limit, so stop reading from this client
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Throttling recv " + zkServer.getInProcess());
                        }
                        disableRecv();
                        // following lines should not be needed since we are
                        // already reading
                        // } else {
                        // enableRecv();
                    }
                }
            }
    
        }
    

    Implementing the Watcher interface

    The interface method process

        synchronized public void process(WatchedEvent event) {
            ReplyHeader h = new ReplyHeader(-1, -1L, 0);// xid -1 marks a notification
            if (LOG.isTraceEnabled()) {
                ZooTrace.logTraceMessage(LOG, ZooTrace.EVENT_DELIVERY_TRACE_MASK,
                                         "Deliver event " + event + " to 0x"
                                         + Long.toHexString(this.sessionId)
                                         + " through " + this);
            }
    
            // Convert WatchedEvent to a type that can be sent over the wire
            WatcherEvent e = event.getWrapper();// wrap as a WatcherEvent so it can be sent over the wire
    
            sendResponse(h, e, "notification");// notify the client that the WatchedEvent occurred
        }
    

    process calls sendResponse, which serializes the reply, writes the 4-byte length prefix so the client knows how much to read, and updates statistics and throttling state.

        synchronized public void sendResponse(ReplyHeader h, Record r, String tag) {
            try {
                ByteArrayOutputStream baos = new ByteArrayOutputStream();
                // Make space for length
                BinaryOutputArchive bos = BinaryOutputArchive.getArchive(baos);
                try {
                    baos.write(fourBytes);
                    bos.writeRecord(h, "header");
                    if (r != null) {
                        bos.writeRecord(r, tag);
                    }
                    baos.close();
                } catch (IOException e) {
                    LOG.error("Error serializing response");
                }
                byte b[] = baos.toByteArray();
                ByteBuffer bb = ByteBuffer.wrap(b);
                bb.putInt(b.length - 4).rewind();// write len-4 into the first four bytes: the length of the content that follows
                sendBuffer(bb);
                if (h.getXid() > 0) {
                    synchronized(this){
                        outstandingRequests--;
                    }
                    // check throttling
                    synchronized (this.factory) {        
                        if (zkServer.getInProcess() < outstandingLimit
                                || outstandingRequests < 1) {
                            sk.selector().wakeup();
                            enableRecv();// re-enable reading from this client
                        }
                    }
                }
             } catch(Exception e) {
                LOG.warn("Unexpected exception. Destruction averted.", e);
             }
        }
    

    The sendBuffer call inside it was covered above.

    Command handling

    cleanupWriterSocket closes the PrintWriter and then the connection after a command handler has finished writing its output.

        private void cleanupWriterSocket(PrintWriter pwriter) {
            try {
                if (pwriter != null) {
                    pwriter.flush();
                    pwriter.close();
                }
            } catch (Exception e) {
                LOG.info("Error closing PrintWriter ", e);
            } finally {
                try {
                    close();
                } catch (Exception e) {
                    LOG.error("Error closing a command socket ", e);
                }
            }
        }
    

    Questions

    The server writes a 4-byte len at the start of each response; how does the client handle it?

    As covered in part 18 of this series, ClientCnxnSocketNIO#doIO works as follows:

    the client first reads the length, then allocates a buffer of that size and reads the rest of the response (a sketch follows).
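
    Below is a hypothetical blocking-client sketch (not the actual ClientCnxnSocketNIO code; the ResponseReader name is made up) that consumes the same wire format: a 4-byte big-endian length followed by that many bytes of response.

        import java.io.DataInputStream;
        import java.io.IOException;
        import java.io.InputStream;

        class ResponseReader {
            byte[] readResponse(InputStream in) throws IOException {
                DataInputStream din = new DataInputStream(in);
                int len = din.readInt();       // the length prefix written by sendResponse
                byte[] body = new byte[len];
                din.readFully(body);           // read exactly len bytes of response
                return body;                   // ReplyHeader plus the record are deserialized from this
            }
        }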

    The NIOServerCnxn constructor only registers interest in READ, so where is ACCEPT handled?

    The server first has to accept the client's connection. That step lives in NIOServerCnxnFactory, which is covered later; briefly:

    the server registers for ACCEPT in NIOServerCnxnFactory#configure,
    and NIOServerCnxnFactory#run accepts client connections and then reads their requests.

    In short: the server accepts the connection, then reads the requests.

    doIO函数是哪里调用的

    是NIOServerCnxnFactory#run中根据SelectionKey的attachment获取NIOServerCnxn对象,读写就绪时调用的,对应源码NIOServerCnxnFactory#run,这里不展开

    doIO的调用方
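
    A heavily simplified, hypothetical sketch of that selector loop; it assumes the code sits in the same package as NIOServerCnxn, and the real NIOServerCnxnFactory#run adds connection limits, key validation, and error handling on top of this.

        import java.nio.channels.SelectionKey;
        import java.nio.channels.Selector;
        import java.nio.channels.ServerSocketChannel;
        import java.nio.channels.SocketChannel;
        import java.util.Iterator;

        class SelectorLoopSketch {
            void run(Selector selector, ServerSocketChannel ss) throws Exception {
                while (true) {
                    selector.select(1000);
                    Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                    while (it.hasNext()) {
                        SelectionKey k = it.next();
                        it.remove();
                        if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
                            // accept the client, then register the new channel for READ and
                            // attach a freshly constructed NIOServerCnxn to its key
                            SocketChannel sc = ss.accept();
                            sc.configureBlocking(false);
                            SelectionKey sk = sc.register(selector, SelectionKey.OP_READ);
                            // sk.attach(new NIOServerCnxn(zkServer, sc, sk, factory));
                        } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
                            // read/write ready: hand the key back to the connection it belongs to
                            NIOServerCnxn c = (NIOServerCnxn) k.attachment();
                            c.doIO(k);
                        }
                    }
                }
            }
        }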

    Gripes

    A possible bug in how the request type is determined

    As described above for payload vs. non-payload requests, the decision is based purely on the first 4-byte int and the command mapping: if the int equals 1668247142, the value of "conf", the request is treated as the conf command rather than as a request carrying a 1,668,247,142-byte buffer. This looks like a bug: a payload whose length happens to collide with a command code would be misinterpreted. In practice the window is narrow, since readLength only tries the four-letter-word lookup while the connection is still uninitialized, and with the default jute.maxbuffer a length that large would be rejected by the BinaryInputArchive.maxBuffer check anyway.

    A lot of NIO machinery

    That is unavoidable; this is simply what an NIO implementation looks like.

    Reference

    http://www.cnblogs.com/leesf456/p/6484780.html
