Netty NioSocketChannel Read Data Source Code Analysis

Author: whateverblake | Published 2020-07-13 17:13

    In the previous article we analyzed how a Netty NioSocketChannel writes data. In this article we pick up where we left off and look at how a NioSocketChannel reads data: one end of the connection sent "hello world" to the other end, so how does the receiving end actually get that "hello world"?

    Triggering the read event

    Whether on the client side or the server side, a NioSocketChannel goes through the same initialization, registration and connection steps after it is created. Once the connection completes, the NioSocketChannel registers interest in OP_READ with its selector, so a read event fires whenever the peer writes data to it. The NioEventLoop the NioSocketChannel is bound to keeps calling selector.select(); when read or write events become ready, the NioEventLoop invokes its processSelectedKeys method to handle them. For an in-depth look at NioEventLoop see https://www.jianshu.com/p/732f9dea34d7
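    To make the setup concrete, here is a minimal sketch of the receiving side, assuming an ordinary Bootstrap-based client. The class name, handler, host and port (ReadDemo, 127.0.0.1:8080) are made up for illustration and are not part of the article's code; the point is only that once connect() completes, the resulting NioSocketChannel has OP_READ registered and the handler below is invoked via the read path analyzed in this article.

    import io.netty.bootstrap.Bootstrap;
    import io.netty.buffer.ByteBuf;
    import io.netty.channel.*;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.util.CharsetUtil;

    // Hypothetical demo client showing where the read path begins.
    public final class ReadDemo {
        public static void main(String[] args) throws Exception {
            NioEventLoopGroup group = new NioEventLoopGroup(1);
            try {
                Bootstrap b = new Bootstrap()
                        .group(group)
                        .channel(NioSocketChannel.class)
                        .handler(new ChannelInitializer<NioSocketChannel>() {
                            @Override
                            protected void initChannel(NioSocketChannel ch) {
                                ch.pipeline().addLast(new SimpleChannelInboundHandler<ByteBuf>() {
                                    @Override
                                    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) {
                                        // invoked from pipeline.fireChannelRead(byteBuf) inside unsafe.read()
                                        System.out.println("received: " + msg.toString(CharsetUtil.UTF_8));
                                    }
                                });
                            }
                        });
                // After connect() the channel is registered with a NioEventLoop's selector and
                // OP_READ interest is set, so an incoming "hello world" triggers unsafe.read().
                ChannelFuture f = b.connect("127.0.0.1", 8080).sync();
                f.channel().closeFuture().sync();
            } finally {
                group.shutdownGracefully();
            }
        }
    }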

    processSelectedKeys
    //selectedKeys is replaced by Netty with a SelectedSelectionKeySet instance, so processSelectedKeysOptimized is the branch that runs
    private void processSelectedKeys() {
            if (selectedKeys != null) {
                processSelectedKeysOptimized();
            } else {
                processSelectedKeysPlain(selector.selectedKeys());
            }
        }
    
    private void processSelectedKeysOptimized() {
            for (int i = 0; i < selectedKeys.size; ++i) {
                //get the SelectionKey corresponding to each ready IO event
                final SelectionKey k = selectedKeys.keys[i];
                // null out entry in the array to allow to have it GC'ed once the Channel close
                // See https://github.com/netty/netty/issues/2363
               //clear this slot in selectedKeys so the SelectionKey can be GC'ed once the channel closes
                selectedKeys.keys[i] = null;
    
                final Object a = k.attachment();
                //we are analyzing the NIO transport, so the attachment is an AbstractNioChannel and we enter processSelectedKey
                if (a instanceof AbstractNioChannel) {
                    processSelectedKey(k, (AbstractNioChannel) a);
                } else {
                    @SuppressWarnings("unchecked")
                    NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                    processSelectedKey(k, task);
                }
    
                if (needsToSelectAgain) {
                    // null out entries in the array to allow to have it GC'ed once the Channel close
                    // See https://github.com/netty/netty/issues/2363
                    selectedKeys.reset(i + 1);
    
                    selectAgain();
                    i = -1;
                }
            }
        }
    

    processSelectedKey is the method that actually handles the IO events.

    processSelectedKey
    private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        //get the channel's unsafe
            final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
            if (!k.isValid()) {
                final EventLoop eventLoop;
                try {
                    eventLoop = ch.eventLoop();
                } catch (Throwable ignored) {
                    // If the channel implementation throws an exception because there is no event loop, we ignore this
                    // because we are only trying to determine if ch is registered to this event loop and thus has authority
                    // to close ch.
                    return;
                }
                // Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
                // and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
                // still healthy and should not be closed.
                // See https://github.com/netty/netty/issues/5125
                if (eventLoop == this) {
                    // close the channel if the key is not valid anymore
               //the SelectionKey is no longer valid, so close the channel
                    unsafe.close(unsafe.voidPromise());
                }
                return;
            }
    
            try {
            //get the bit set describing which operations are ready
                int readyOps = k.readyOps();
                // We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
                // the NIO JDK channel implementation may throw a NotYetConnectedException.
            //handle the OP_CONNECT event
                if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                    // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
                    // See https://github.com/netty/netty/issues/924
                    int ops = k.interestOps();
                //remove OP_CONNECT from the interest set
                    ops &= ~SelectionKey.OP_CONNECT;
                    k.interestOps(ops);
                //finish the connection
                    unsafe.finishConnect();
                }
    
                // Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
                if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                    // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
                //OP_WRITE is ready, so forceFlush the outbound buffer to the network
                    ch.unsafe().forceFlush();
                }
    
                // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
                // to a spin loop
                if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
               //handle the read (or accept) event
                    unsafe.read();
                }
            } catch (CancelledKeyException ignored) {
                unsafe.close(unsafe.voidPromise());
            }
        }
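    As an aside, the interestOps manipulation above is plain bit arithmetic on the SelectionKey constants. The tiny standalone snippet below is only an illustration (not Netty code) of how ops &= ~SelectionKey.OP_CONNECT drops the connect bit while leaving read interest in place:

    import java.nio.channels.SelectionKey;

    public class InterestOpsDemo {
        public static void main(String[] args) {
            // suppose the key is currently interested in both connect and read events
            int ops = SelectionKey.OP_CONNECT | SelectionKey.OP_READ; // 8 | 1 = 9
            // clearing OP_CONNECT stops Selector.select(..) from returning immediately once the connect is done
            ops &= ~SelectionKey.OP_CONNECT;                          // 9 & ~8 = 1
            System.out.println(ops == SelectionKey.OP_READ);          // true
        }
    }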
    
    

    Next we analyze the source code of unsafe.read().

    unsafe.read()
    @Override
            public final void read() {
                final ChannelConfig config = config();
            //shouldBreakReadReady checks whether the channel's input side has been shut down;
            //if so there is nothing left to read, so clear the pending read (OP_READ) and return
                if (shouldBreakReadReady(config)) {
                    clearReadPending();
                    return;
                }
                final ChannelPipeline pipeline = pipeline();
            //get the ByteBufAllocator; pooled allocation here borrows ideas from jemalloc, see https://www.jianshu.com/p/550704d5a628
                final ByteBufAllocator allocator = config.getAllocator();
    //the default Handle implementation is HandleImpl; it decides how many bytes to read from the channel per read call and how many reads are allowed in one read loop (16 by default)
     //the first read uses a 1024-byte buffer by default; afterwards the size is adjusted dynamically based on how much data was actually read
                final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
                allocHandle.reset(config);
    
                ByteBuf byteBuf = null;
                boolean close = false;
                try {
                    do {
                        //ask the allocator for a ByteBuf whose capacity is the size computed by allocHandle
                        byteBuf = allocHandle.allocate(allocator);
                       //doReadBytes is the entry point where the SocketChannel actually reads data
                       //lastBytesRead records how many bytes this read returned; if the read filled the whole byteBuf, allocHandle will enlarge the ByteBuf used for the next read
                        allocHandle.lastBytesRead(doReadBytes(byteBuf));
                        if (allocHandle.lastBytesRead() <= 0) {
                           //the read returned 0 or -1 (EOF), so release the byteBuf
                            // nothing was read. release the buffer.
                            byteBuf.release();
                            byteBuf = null;
                            close = allocHandle.lastBytesRead() < 0;
                            if (close) {
                                // There is nothing left to read as we received an EOF.
                                readPending = false;
                            }
                            break;
                        }
                       //allocHandle increments the count of messages read in this read loop
                        allocHandle.incMessagesRead(1);
                        readPending = false;
                        //fire the channelRead event down the pipeline
                        pipeline.fireChannelRead(byteBuf);
                        byteBuf = null;
                      //allocHandle.continueReading() decides whether to keep reading from the channel:
                     //it keeps reading while the last read filled the attempted size and the number of reads is still below the per-loop maximum
                    } while (allocHandle.continueReading());
                   //based on the total bytes read in this loop, the handle adjusts the ByteBuf size for the next read
                    allocHandle.readComplete();
                 //fire the channelReadComplete event
                    pipeline.fireChannelReadComplete();
    
                    if (close) {
                        closeOnRead(pipeline);
                    }
                } catch (Throwable t) {
                    handleReadException(pipeline, byteBuf, t, close, allocHandle);
                } finally {
                    // Check if there is a readPending which was not processed yet.
                    // This could be for two reasons:
                    // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
                    // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
                    //
                    // See https://github.com/netty/netty/issues/2254
                    if (!readPending && !config.isAutoRead()) {
                        removeReadOp();
                    }
                }
            }
        }
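    The finally block above matters when auto-read is turned off. As a hedged illustration using standard Netty API (this handler is not from the article), an application can disable autoRead and drive reads one at a time; if neither channelRead nor channelReadComplete calls read() again, readPending stays false and removeReadOp() drops OP_READ interest until the next explicit read():

    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import io.netty.util.ReferenceCountUtil;

    // Hypothetical back-pressure style handler: reads are requested explicitly, one at a time.
    public class ManualReadHandler extends ChannelInboundHandlerAdapter {
        @Override
        public void channelActive(ChannelHandlerContext ctx) {
            ctx.channel().config().setAutoRead(false); // unsafe.read() will remove OP_READ when no read is pending
            ctx.read();                                // request the first read explicitly
        }

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            try {
                // ... process msg ...
            } finally {
                ReferenceCountUtil.release(msg);
            }
        }

        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) {
            ctx.read(); // request the next read; this sets readPending so OP_READ stays registered
        }
    }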
    

    Let's analyze the doReadBytes method and the call chain inside it.

    doReadBytes
      protected int doReadBytes(ByteBuf byteBuf) throws Exception {
            final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
         //record in the handle how many bytes this read will attempt (the buffer's writable bytes)
            allocHandle.attemptedBytesRead(byteBuf.writableBytes());
            
            return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead());
        }
    
    //ByteBuf.writeBytes source
       public int writeBytes(ScatteringByteChannel in, int length) throws IOException {
            ensureWritable(length);
           
            int writtenBytes = setBytes(writerIndex, in, length);
            if (writtenBytes > 0) {
          //advance the ByteBuf's writerIndex by the number of bytes written
                writerIndex += writtenBytes;
            }
            return writtenBytes;
        }
    
    //ByteBuf.setBytes source
     @Override
        public final int setBytes(int index, ScatteringByteChannel in, int length) throws IOException {
            try {
           //internalNioBuffer returns the ByteBuffer that backs this ByteBuf; the SocketChannel then read()s data directly into that ByteBuffer
                return in.read(internalNioBuffer(index, length));
            } catch (ClosedChannelException ignored) {
                return -1;
            }
        }
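    The return value of that in.read(...) call is what ultimately feeds allocHandle.lastBytesRead(). As a reminder of the plain JDK NIO contract (standard java.nio behaviour, illustration only, not Netty code): a positive value is the number of bytes read, 0 means nothing was available on this non-blocking read, and -1 means the peer closed the stream, which is why unsafe.read() sets close when lastBytesRead() < 0.

    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.SocketChannel;

    // Minimal raw-NIO sketch of the read contract; host and port are placeholders.
    public class RawNioReadDemo {
        public static void main(String[] args) throws Exception {
            try (SocketChannel ch = SocketChannel.open(new InetSocketAddress("127.0.0.1", 8080))) {
                ch.configureBlocking(false);
                ByteBuffer buffer = ByteBuffer.allocate(1024);
                int n = ch.read(buffer); // > 0: bytes read, 0: nothing available, -1: EOF
                if (n < 0) {
                    // the peer closed the connection -> Netty's read loop sets close = true
                    System.out.println("peer closed the connection");
                }
            }
        }
    }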
    

    After Netty finishes reading a batch of data from the channel and wraps it in a ByteBuf, it fires the channelRead event. The channelRead method of the developer's custom inbound handler is then invoked, and there the developer can process the received data according to the business logic, for example as in the sketch below.
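    A minimal sketch of such a handler (the name HelloWorldHandler is made up; the API shown is standard Netty). Note that with a plain ChannelInboundHandlerAdapter the handler takes ownership of the ByteBuf allocated in unsafe.read() and must release it unless it passes the message further down the pipeline:

    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import io.netty.util.CharsetUtil;

    public class HelloWorldHandler extends ChannelInboundHandlerAdapter {
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            ByteBuf buf = (ByteBuf) msg; // the ByteBuf allocated in unsafe.read()
            try {
                String text = buf.toString(CharsetUtil.UTF_8);
                System.out.println("received: " + text); // e.g. "hello world"
            } finally {
                buf.release(); // release the buffer since we do not propagate it further
            }
        }
    }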


    That is the whole process by which a Netty NioSocketChannel reads data from the network.


    Next, let's look at how Netty dynamically adjusts the size of the ByteBuf used for each read.
    This logic lives in allocHandle.readComplete(); here is its source:

    readComplete
       public void readComplete() {
            //totalBytesRead() returns the total number of bytes read during this read event
                record(totalBytesRead());
        }
    

    The source code of the record method:

    record
     private void record(int actualReadBytes) {
            //SIZE_TABLE holds a set of candidate buffer sizes stored in ascending order
           //exactly which values it contains is explained below
         //index is the position in SIZE_TABLE of the ByteBuf size currently used when reading from the channel
          //INDEX_DECREMENT defaults to 1
           //so actualReadBytes <= SIZE_TABLE[max(0, index - INDEX_DECREMENT)] means this read event read no more than SIZE_TABLE[index - 1] bytes
                if (actualReadBytes <= SIZE_TABLE[max(0, index - INDEX_DECREMENT)]) {
                    if (decreaseNow) {
                   //shrink: the ByteBuf for the next read becomes SIZE_TABLE[max(index - INDEX_DECREMENT, minIndex)]
    
                        index = max(index - INDEX_DECREMENT, minIndex);
                        nextReceiveBufferSize = SIZE_TABLE[index];
                        decreaseNow = false;
                    } else {
                   //the first time actualReadBytes <= SIZE_TABLE[index - 1] holds, the buffer is not shrunk immediately; decreaseNow is only set to true,
                   //so the next read buffer shrinks only after two consecutive reads that each read no more than SIZE_TABLE[index - 1] bytes
                        decreaseNow = true;
                    }
                } else if (actualReadBytes >= nextReceiveBufferSize) {
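                    //grow: the read filled the current estimate, so jump up INDEX_INCREMENT (4) steps at once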
                    index = min(index + INDEX_INCREMENT, maxIndex);
                    nextReceiveBufferSize = SIZE_TABLE[index];
                    decreaseNow = false;
                }
            }
    

    SIZE_TABLE is an int array of length 53.
    The values it stores fall into two parts:

    • Part 1: starting at 16 and increasing by 16 up to 496, 31 entries in total
    • Part 2: starting at 512, each entry is double the previous one, up to the largest power of two an int can hold (2^30); see the sketch below, which reproduces this construction
      (Figure: SIZE_TABLE.png — the values described above)
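    To see both the table construction and the two-strike shrink rule in action, here is a small standalone simulation. It is an illustrative re-implementation of the rule described above, not Netty's HandleImpl; the starting index and the minIndex/maxIndex bounds are made up for the demo (the indices follow from the table layout: SIZE_TABLE[31] = 512, SIZE_TABLE[32] = 1024, SIZE_TABLE[33] = 2048, ...).

    import java.util.ArrayList;
    import java.util.List;

    // Toy re-implementation of the record() rule, only to illustrate its behaviour.
    public class AdaptiveSizeDemo {
        static final int INDEX_INCREMENT = 4;
        static final int INDEX_DECREMENT = 1;
        static int[] sizeTable;

        static int index = 33;                   // assume the current estimate is SIZE_TABLE[33] = 2048
        static boolean decreaseNow = false;
        static int minIndex = 3, maxIndex = 38;  // made-up bounds for the demo

        static void record(int actualReadBytes) {
            if (actualReadBytes <= sizeTable[Math.max(0, index - INDEX_DECREMENT)]) {
                if (decreaseNow) {
                    index = Math.max(index - INDEX_DECREMENT, minIndex); // second small read: shrink
                    decreaseNow = false;
                } else {
                    decreaseNow = true;                                  // first small read: remember it, don't shrink yet
                }
            } else if (actualReadBytes >= sizeTable[index]) {
                index = Math.min(index + INDEX_INCREMENT, maxIndex);     // read filled the estimate: grow 4 steps
                decreaseNow = false;
            }
        }

        public static void main(String[] args) {
            List<Integer> t = new ArrayList<>();
            for (int i = 16; i < 512; i += 16) t.add(i); // part 1: 16, 32, ..., 496
            for (int i = 512; i > 0; i <<= 1) t.add(i);  // part 2: 512, 1024, ..., 2^30 (stops on overflow)
            sizeTable = t.stream().mapToInt(Integer::intValue).toArray();
            System.out.println(sizeTable.length);        // 53

            record(100);   // small read #1 -> decreaseNow = true, estimate still 2048
            record(100);   // small read #2 -> shrink to SIZE_TABLE[32] = 1024
            record(1024);  // read filled the estimate -> grow to SIZE_TABLE[36] = 16384
            System.out.println(sizeTable[index]);        // 16384
        }
    }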

    That's all for this analysis. Thanks for reading.
