美文网首页
Netty笔记-unSafe.read方法

Netty笔记-unSafe.read方法

作者: 兴浩 | 来源:发表于2018-07-12 17:42 被阅读17次

    1. 源码流程

            @Override
            public final void read() {
                final ChannelConfig config = config();
                final ChannelPipeline pipeline = pipeline();
                final ByteBufAllocator allocator = config.getAllocator();
                final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
                allocHandle.reset(config);
    
                ByteBuf byteBuf = null;
                boolean close = false;
                try {
                    do {
                        byteBuf = allocHandle.allocate(allocator);
                        allocHandle.lastBytesRead(doReadBytes(byteBuf));
                        if (allocHandle.lastBytesRead() <= 0) {
                            // nothing was read. release the buffer.
                            byteBuf.release();
                            byteBuf = null;
                            close = allocHandle.lastBytesRead() < 0;
                            break;
                        }
    
                        allocHandle.incMessagesRead(1);
                        readPending = false;
                        pipeline.fireChannelRead(byteBuf);
                        byteBuf = null;
                    } while (allocHandle.continueReading());
    
                    allocHandle.readComplete();
                    pipeline.fireChannelReadComplete();
    
                    if (close) {
                        closeOnRead(pipeline);
                    }
                } catch (Throwable t) {
                    handleReadException(pipeline, byteBuf, t, close, allocHandle);
                } finally {
                    // Check if there is a readPending which was not processed yet.
                    // This could be for two reasons:
                    // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
                    // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
                    //
                    // See https://github.com/netty/netty/issues/2254
                    if (!readPending && !config.isAutoRead()) {
                        removeReadOp();
                    }
                }
            }
        }
    

    2.相关概念

    要读懂上面代码段需要理解其相关概念

    • ChannelConfig:跟Channel相关的配置参数
    • ChannelPipeline:事件传递相关处理
    • ByteBufAllocator:ByteBuf分配器
    • RecvByteBufAllocator:接口缓存分配器,这个是这次理解的重点

    3. RecvByteBufAllocator

    从注释上理解,RecvByteBufAllocator是为了节约内存,为了分配合理的内存做了优化,具体如何分配也是通过猜测

    /**
     * Allocates a new receive buffer whose capacity is probably large enough to read all inbound data and small enough
     * not to waste its space.
     */
    public interface RecvByteBufAllocator {
        /**
         * Creates a new handle.  The handle provides the actual operations and keeps the internal information which is
         * required for predicting an optimal buffer capacity.
         */
        Handle newHandle();
    
        /**
         * @deprecated Use {@link ExtendedHandle}.
         */
        @Deprecated
        interface Handle {
            /**
             * Creates a new receive buffer whose capacity is probably large enough to read all inbound data and small
             * enough not to waste its space.
             */
            ByteBuf allocate(ByteBufAllocator alloc);
    
            /**
             * Similar to {@link #allocate(ByteBufAllocator)} except that it does not allocate anything but just tells the
             * capacity.
             */
            int guess();
    
            /**
             * Reset any counters that have accumulated and recommend how many messages/bytes should be read for the next
             * read loop.
             * <p>
             * This may be used by {@link #continueReading()} to determine if the read operation should complete.
             * </p>
             * This is only ever a hint and may be ignored by the implementation.
             * @param config The channel configuration which may impact this object's behavior.
             */
            void reset(ChannelConfig config);
    
            /**
             * Increment the number of messages that have been read for the current read loop.
             * @param numMessages The amount to increment by.
             */
            void incMessagesRead(int numMessages);
    
            /**
             * Set the bytes that have been read for the last read operation.
             * This may be used to increment the number of bytes that have been read.
             * @param bytes The number of bytes from the previous read operation. This may be negative if an read error
             * occurs. If a negative value is seen it is expected to be return on the next call to
             * {@link #lastBytesRead()}. A negative value will signal a termination condition enforced externally
             * to this class and is not required to be enforced in {@link #continueReading()}.
             */
            void lastBytesRead(int bytes);
    
            /**
             * Get the amount of bytes for the previous read operation.
             * @return The amount of bytes for the previous read operation.
             */
            int lastBytesRead();
    
            /**
             * Set how many bytes the read operation will (or did) attempt to read.
             * @param bytes How many bytes the read operation will (or did) attempt to read.
             */
            void attemptedBytesRead(int bytes);
    
            /**
             * Get how many bytes the read operation will (or did) attempt to read.
             * @return How many bytes the read operation will (or did) attempt to read.
             */
            int attemptedBytesRead();
    
            /**
             * Determine if the current read loop should should continue.
             * @return {@code true} if the read loop should continue reading. {@code false} if the read loop is complete.
             */
            boolean continueReading();
    
            /**
             * The read has completed.
             */
            void readComplete();
        }
    }
    

    3.1 lastBytesRead方法

    记录此处读到的buffer字节大小,
    比如"Netty rocks!"字节长度为12

    3.2 incMessagesRead方法

    增加读取的次数

    3.3 continueReading方法

    判断当前循环读取是否继续下去

    3.4 Handle源码相关源码

    重点看continueReading方法的实现

    • attemptedBytesRead表示希望尝试读取的字节,比如想要读取1024字节
    • lastBytesRead表示实际读取到的字节
    • totalBytesRead表示累计读到在字节数

    所以当attemptedBytesRead==lastBytesRead的时候,表明后续有可能还有字节没有读完,否则就认为读完了,不需要继续,

    到这里可以理解RecvByteBufAllocator.Handle实际上实现了一个循环读取ByeBuf的流程

       public abstract class MaxMessageHandle implements ExtendedHandle {
            private ChannelConfig config;
            private int maxMessagePerRead;
            private int totalMessages;
            private int totalBytesRead;
            private int attemptedBytesRead;
            private int lastBytesRead;
            private final UncheckedBooleanSupplier defaultMaybeMoreSupplier = new UncheckedBooleanSupplier() {
                @Override
                public boolean get() {
                    return attemptedBytesRead == lastBytesRead;
                }
            };
    
            /**
             * Only {@link ChannelConfig#getMaxMessagesPerRead()} is used.
             */
            @Override
            public void reset(ChannelConfig config) {
                this.config = config;
                maxMessagePerRead = maxMessagesPerRead();
                totalMessages = totalBytesRead = 0;
            }
    
            @Override
            public ByteBuf allocate(ByteBufAllocator alloc) {
                return alloc.ioBuffer(guess());
            }
    
            @Override
            public final void incMessagesRead(int amt) {
                totalMessages += amt;
            }
    
            @Override
            public final void lastBytesRead(int bytes) {
                lastBytesRead = bytes;
                if (bytes > 0) {
                    totalBytesRead += bytes;
                }
            }
    
            @Override
            public final int lastBytesRead() {
                return lastBytesRead;
            }
    
            @Override
            public boolean continueReading() {
                return continueReading(defaultMaybeMoreSupplier);
            }
    
            @Override
            public boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) {
                return config.isAutoRead() &&
                       maybeMoreDataSupplier.get() &&
                       totalMessages < maxMessagePerRead &&
                       totalBytesRead > 0;
            }
    
            @Override
            public void readComplete() {
            }
    
            @Override
            public int attemptedBytesRead() {
                return attemptedBytesRead;
            }
    
            @Override
            public void attemptedBytesRead(int bytes) {
                attemptedBytesRead = bytes;
            }
    
            protected final int totalBytesRead() {
                return totalBytesRead < 0 ? Integer.MAX_VALUE : totalBytesRead;
            }
        }
    

    4.真正读取ByteBuf

    由于NioSocketChannel对象中存有nio的SocketChannel实例,

    • 其调用了doReadBytes方法进行读取
    • doReadBytes方法中调用了ByteBuf的writeBytes方法
    • writeBytes方法内部调用了ScatteringByteChannel的read方法
        //NioSocketChannel
        protected int doReadBytes(ByteBuf byteBuf) throws Exception {
            final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
            allocHandle.attemptedBytesRead(byteBuf.writableBytes());
            return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead());
        }
    
        //ByteBuf
        public int writeBytes(ScatteringByteChannel in, int length) throws IOException {
            ensureWritable(length);
            int writtenBytes = setBytes(writerIndex, in, length);
            if (writtenBytes > 0) {
                writerIndex += writtenBytes;
            }
            return writtenBytes;
        }
    
        @Override
        public int setBytes(int index, ScatteringByteChannel in, int length) throws IOException {
            checkIndex(index, length);
            ByteBuffer tmpBuf = internalNioBuffer();
            index = idx(index);
            tmpBuf.clear().position(index).limit(index + length);
            try {
                return in.read(tmpBuf);
            } catch (ClosedChannelException ignored) {
                return -1;
            }
        }
    

    参考:

    Netty 源码剖析之 unSafe.read 方法
    netty学习系列四:读操作

    相关文章

      网友评论

          本文标题:Netty笔记-unSafe.read方法

          本文链接:https://www.haomeiwen.com/subject/qaerpftx.html