read

作者: JIU_LV | 来源:发表于2018-12-06 09:44 被阅读0次

    AbstractNioByteChannel.read()

    /**
     * Read loop for a byte-oriented NIO channel.
     *
     * <p>Resets the receive-buffer handle, then repeatedly allocates a buffer through the handle,
     * fills it with {@code doReadBytes}, and fires {@code channelRead} for every read that produced
     * data. The loop ends when a read returns zero or a negative count, or when
     * {@code allocHandle.continueReading()} returns {@code false}. A negative count sets
     * {@code close}, which triggers {@code closeOnRead} after {@code channelReadComplete} has fired.
     * Any {@link Throwable} raised inside the {@code try} is handed to {@code handleReadException}.
     *
     * <p>NOTE(review): as quoted here the snippet ends after the {@code finally} block; the
     * method's own closing brace is not part of the excerpt.
     */
    @Override
            public final void read() {
                final ChannelConfig config = config();
                final ChannelPipeline pipeline = pipeline();
                final ByteBufAllocator allocator = config.getAllocator();
                final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
                allocHandle.reset(config);
    
                // byteBuf is non-null only while a buffer is allocated but not yet handed to the pipeline,
                // so the exception handler receives whatever buffer is still owned by this method.
                ByteBuf byteBuf = null;
                boolean close = false;
                try {
                    do {
                        byteBuf = allocHandle.allocate(allocator);
                        allocHandle.lastBytesRead(doReadBytes(byteBuf));
                        if (allocHandle.lastBytesRead() <= 0) {
                            // nothing was read. release the buffer.
                            byteBuf.release();
                            byteBuf = null;
                            close = allocHandle.lastBytesRead() < 0;
                            if (close) {
                                // There is nothing left to read as we received an EOF.
                                readPending = false;
                            }
                            break;
                        }
    
                        allocHandle.incMessagesRead(1);// each read that produced data adds one to the handle's message count
                        readPending = false;
                        pipeline.fireChannelRead(byteBuf);// fire the channelRead event
                        byteBuf = null;
                    } while (allocHandle.continueReading());
    
                    // For AdaptiveRecvByteBufAllocator's handle, readComplete() calls record(), which adjusts
                    // the size guessed for the next allocation (the article notes the first guess defaults to 1024).
                    allocHandle.readComplete();
                    pipeline.fireChannelReadComplete();// fire the channelReadComplete event
    
                    if (close) {
                        closeOnRead(pipeline);
                    }
                } catch (Throwable t) {
                    handleReadException(pipeline, byteBuf, t, close, allocHandle);
                } finally {
                    // Check if there is a readPending which was not processed yet.
                    // This could be for two reasons:
                    // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
                    // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
                    //
                    // See https://github.com/netty/netty/issues/2254
                    if (!readPending && !config.isAutoRead()) {
                        removeReadOp();
                    }
                }
     
    

    DefaultMaxMessagesRecvByteBufAllocator.MaxMessageHandle

       /**
        * Decides whether the read loop should attempt another read, using
        * {@code defaultMaybeMoreSupplier} as the "maybe more data" check.
        *
        * @return the result of {@code continueReading(defaultMaybeMoreSupplier)}
        */
       @Override
            public boolean continueReading() {
                return continueReading(defaultMaybeMoreSupplier);
            }
    
            /**
             * Decides whether the read loop should attempt another read.
             *
             * <p>The checks run in a fixed order and stop at the first one that fails: auto-read must be
             * enabled, the supplier must report that more data may be available, the number of messages
             * read so far must be below {@code maxMessagePerRead}, and at least one byte must have been
             * read in total.
             *
             * @param maybeMoreDataSupplier consulted only when auto-read is enabled
             * @return {@code true} only when every check passes
             */
            @Override
            public boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) {
                if (!config.isAutoRead()) {
                    return false;
                }
                if (!maybeMoreDataSupplier.get()) {
                    return false;
                }
                // Cap on the number of reads performed in one loop.
                if (totalMessages >= maxMessagePerRead) {
                    return false;
                }
                return totalBytesRead > 0;
            }
         /**
          * Default "maybe more data" check: reports {@code true} only when the last read returned
          * exactly as many bytes as were attempted.
          */
         private final UncheckedBooleanSupplier defaultMaybeMoreSupplier = new UncheckedBooleanSupplier() {
                @Override
                public boolean get() {
                    // Example from the article: 1024 bytes were allocated but only 420 were read,
                    // so the values differ and reading does not continue.
                    return attemptBytesRead == lastBytesRead;
                }
            };
    

    AdaptiveRecvByteBufAllocator.HandleImpl:根据每次实际读取的字节数,调整下一次分配的缓冲区大小(byteSize)

    /**
     * Receive-buffer handle whose size guess follows the amount of data actually read.
     *
     * <p>The guess is always an entry of {@code SIZE_TABLE}; {@code index} is the position of the
     * current guess and is kept between {@code minIndex} and {@code maxIndex} whenever it is moved.
     */
    private final class HandleImpl extends MaxMessageHandle {
        private final int minIndex;
        private final int maxIndex;
        private int index;
        private int nextReceiveBufferSize;
        // Set after one "small" read; the guess only shrinks on the second consecutive one.
        private boolean decreaseNow;

        /**
         * @param minIndex lowest {@code SIZE_TABLE} index the guess may shrink to
         * @param maxIndex highest {@code SIZE_TABLE} index the guess may grow to
         * @param initial  initial buffer size, mapped to a table index via {@code getSizeTableIndex}
         */
        public HandleImpl(int minIndex, int maxIndex, int initial) {
            this.minIndex = minIndex;
            this.maxIndex = maxIndex;

            this.index = getSizeTableIndex(initial);
            this.nextReceiveBufferSize = SIZE_TABLE[this.index];
        }

        /** Returns the buffer size to use for the next allocation. */
        @Override
        public int guess() {
            return nextReceiveBufferSize;
        }

        /**
         * Adjusts the next size guess from the number of bytes actually read. Runs after every
         * completed read loop (the article notes the first guess defaults to 1024).
         *
         * <p>A read at or below the shrink threshold only arms {@code decreaseNow} the first time and
         * lowers the index by {@code INDEX_DECREMENT} the second time. A read at least as large as the
         * current guess raises the index by {@code INDEX_INCREMENT}. Anything in between changes nothing.
         */
        private void record(int actualReadBytes) {
            final int shrinkThreshold = SIZE_TABLE[Math.max(0, index - INDEX_DECREMENT - 1)];
            final int targetIndex;

            if (actualReadBytes <= shrinkThreshold) {
                if (!decreaseNow) {
                    // First small read: remember it, but keep the current guess.
                    decreaseNow = true;
                    return;
                }
                targetIndex = Math.max(index - INDEX_DECREMENT, minIndex);
            } else if (actualReadBytes >= nextReceiveBufferSize) {
                targetIndex = Math.min(index + INDEX_INCREMENT, maxIndex);
            } else {
                return;
            }

            index = targetIndex;
            nextReceiveBufferSize = SIZE_TABLE[targetIndex];
            decreaseNow = false;
        }

        /** Feeds the total number of bytes read in this loop into {@code record}. */
        @Override
        public void readComplete() {
            record(totalBytesRead());
        }
    }
    
        /**
         * Binary-searches {@code SIZE_TABLE} for the slot matching {@code size}.
         *
         * <p>Each step looks at the adjacent pair {@code SIZE_TABLE[mid]} and {@code SIZE_TABLE[mid + 1]}:
         * an exact match on the lower entry returns {@code mid}, any other value that is not outside
         * the pair returns {@code mid + 1}, and otherwise the search range narrows. When the range
         * collapses or crosses, the lower bound is returned.
         *
         * @param size the requested buffer size
         * @return an index into {@code SIZE_TABLE}
         */
        private static int getSizeTableIndex(final int size) {
            int lo = 0;
            int hi = SIZE_TABLE.length - 1;

            while (lo < hi) {
                final int middle = (lo + hi) >>> 1;
                final int lower = SIZE_TABLE[middle];
                final int upper = SIZE_TABLE[middle + 1];

                if (size > upper) {
                    lo = middle + 1;
                    continue;
                }
                if (size < lower) {
                    hi = middle - 1;
                    continue;
                }
                return size == lower ? middle : middle + 1;
            }
            // Reached when hi == lo or hi < lo; both cases yield lo.
            return lo;
        }
    

    导读

    AbstractNioByteChannel.read()
      AdaptiveRecvByteBufAllocator allocHandle = recvBufAllocHandle()
      byteBuf = allocHandle.allocate(allocator);
      allocHandle.lastBytesRead(doReadBytes(byteBuf));
      allocHandle.incMessagesRead(1);
      readPending = false;
      pipeline.fireChannelRead(byteBuf);
      allocHandle.continueReading()
        continueReading(defaultMaybeMoreSupplier)
        config.isAutoRead() && maybeMoreDataSupplier.get() && totalMessages < maxMessagePerRead && totalBytesRead > 0
            UncheckedBooleanSupplier.get()
              return attemptBytesRead == lastBytesRead
      
      allocHandle.readComplete();
         record(totalBytesRead());
           nextReceiveBufferSize = SIZE_TABLE[index]
      pipeline.fireChannelReadComplete();
      closeOnRead(pipeline)
        if (!isInputShutdown0()) {
            if (Boolean.TRUE.equals(config().getOption(ChannelOption.ALLOW_HALF_CLOSURE))) {
               shutdownInput();
               pipeline.fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
           }
        else
          doClose()
              doClose0(promise);
              outboundBuffer.failFlushed(cause, notify);
              outboundBuffer.close(closeCause);
              fireChannelInactiveAndDeregister(wasActive);
        pipeline.fireUserEventTriggered(ChannelInputShutdownReadComplete.INSTANCE)
      if (!readPending && !config.isAutoRead()) {
          removeReadOp();
      }
    

    AbstractNioMessageChannel.NioMessageUnsafe

    
            /**
             * Read loop for a message-oriented NIO channel.
             *
             * <p>Collects messages into {@code readBuf} via {@code doReadMessages} until a call returns
             * zero, returns a negative count (which marks the channel as closed), throws, or
             * {@code allocHandle.continueReading()} returns {@code false}. Every collected message is then
             * fired through the pipeline as {@code channelRead}, followed by {@code channelReadComplete}.
             * A throwable caught while reading is reported with {@code fireExceptionCaught} only after
             * those events, and {@code closeOnReadError} decides whether it closes the channel.
             */
            @Override
            public void read() {
                assert eventLoop().inEventLoop();
                final ChannelConfig config = config();
                final ChannelPipeline pipeline = pipeline();
                final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
                allocHandle.reset(config);
    
                boolean closed = false;
                Throwable exception = null;
                try {
                    try {
                        do {
                            int localRead = doReadMessages(readBuf);
                            if (localRead == 0) {
                                // Nothing was read this time; stop reading.
                                break;
                            }
                            if (localRead < 0) {
                                // A negative count marks the channel as closed.
                                closed = true;
                                break;
                            }
    
                            allocHandle.incMessagesRead(localRead);
                        } while (allocHandle.continueReading());
                    } catch (Throwable t) {
                        // Keep the failure for later so messages already in readBuf are still delivered.
                        exception = t;
                    }
    
                    // The size is captured once; only messages present at this point are fired.
                    int size = readBuf.size();
                    for (int i = 0; i < size; i ++) {
                        readPending = false;
                        pipeline.fireChannelRead(readBuf.get(i));
                    }
                    readBuf.clear();
                    allocHandle.readComplete();
                    pipeline.fireChannelReadComplete();
    
                    if (exception != null) {
                        // Overwrites any earlier value of 'closed' with the helper's decision.
                        closed = closeOnReadError(exception);
    
                        pipeline.fireExceptionCaught(exception);
                    }
    
                    if (closed) {
                        inputShutdown = true;
                        if (isOpen()) {
                            close(voidPromise());
                        }
                    }
                } finally {
                    // Check if there is a readPending which was not processed yet.
                    // This could be for two reasons:
                    // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
                    // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
                    //
                    // See https://github.com/netty/netty/issues/2254
                    if (!readPending && !config.isAutoRead()) {
                        removeReadOp();
                    }
                }
            }
        }
    

    相关文章

      网友评论

        本文标题:read

        本文链接:https://www.haomeiwen.com/subject/asjncqtx.html