

作者: persisting_ | 来源:发表于2019-05-23 00:18 被阅读3次

    1 概述


    public int writeBytes(ScatteringByteChannel in, int length) throws IOException {
        int writtenBytes = setBytes(writerIndex, in, length);
        if (writtenBytes > 0) {
            writerIndex += writtenBytes;
        return writtenBytes;
    public ByteBuf ensureWritable(int minWritableBytes) {
        return this;
    final void ensureWritable0(int minWritableBytes) {
        if (minWritableBytes <= writableBytes()) {
        // Normalize the current capacity to the power of 2.
        int newCapacity = alloc().calculateNewCapacity(writerIndex + minWritableBytes, maxCapacity);
        // Adjust to the new capacity.



    2 RecvByteBufAllocator接口相关实现介绍


    Allocates a new receive buffer whose capacity is probably large enough to read all inbound data and small enough not to waste its space.


    public interface RecvByteBufAllocator {
        * Creates a new handle.  The handle provides the actual operations and keeps the internal information which is
        * required for predicting an optimal buffer capacity.
        Handle newHandle();
         * @deprecated Use {@link ExtendedHandle}.
        interface Handle {
             * Creates a new receive buffer whose capacity is probably large enough to read all inbound data and small
             * enough not to waste its space.
            ByteBuf allocate(ByteBufAllocator alloc);
             * Similar to {@link #allocate(ByteBufAllocator)} except that it does not allocate anything but just tells the
             * capacity.
            int guess();
             * Reset any counters that have accumulated and recommend how many messages/bytes should be read for the next
             * read loop.
             * <p>
             * This may be used by {@link #continueReading()} to determine if the read operation should complete.
             * </p>
             * This is only ever a hint and may be ignored by the implementation.
             * @param config The channel configuration which may impact this object's behavior.
            void reset(ChannelConfig config);
             * Increment the number of messages that have been read for the current read loop.
             * @param numMessages The amount to increment by.
            void incMessagesRead(int numMessages);
             * Set the bytes that have been read for the last read operation.
             * This may be used to increment the number of bytes that have been read.
             * @param bytes The number of bytes from the previous read operation. This may be negative if an read error
             * occurs. If a negative value is seen it is expected to be return on the next call to
             * {@link #lastBytesRead()}. A negative value will signal a termination condition enforced externally
             * to this class and is not required to be enforced in {@link #continueReading()}.
            void lastBytesRead(int bytes);
             * Get the amount of bytes for the previous read operation.
             * @return The amount of bytes for the previous read operation.
            int lastBytesRead();
             * Set how many bytes the read operation will (or did) attempt to read.
             * @param bytes How many bytes the read operation will (or did) attempt to read.
            void attemptedBytesRead(int bytes);
             * Get how many bytes the read operation will (or did) attempt to read.
             * @return How many bytes the read operation will (or did) attempt to read.
            int attemptedBytesRead();
             * Determine if the current read loop should should continue.
             * @return {@code true} if the read loop should continue reading. {@code false} if the read loop is complete.
            boolean continueReading();
             * The read has completed.
            void readComplete();











    that limits the number of read operations that will be attempted when a read operation is attempted by the event loop.


    public DefaultMaxMessagesRecvByteBufAllocator() {
    public DefaultMaxMessagesRecvByteBufAllocator(int maxMessagesPerRead) {



    * Focuses on enforcing the maximum messages per read condition for {@link #continueReading()}.
    public abstract class MaxMessageHandle implements ExtendedHandle {
        private ChannelConfig config;
        private int maxMessagePerRead;
        private int totalMessages;
        private int totalBytesRead;
        private int attemptedBytesRead;
        private int lastBytesRead;
        private final boolean respectMaybeMoreData = DefaultMaxMessagesRecvByteBufAllocator.this.respectMaybeMoreData;
        private final UncheckedBooleanSupplier defaultMaybeMoreSupplier = new UncheckedBooleanSupplier() {
            public boolean get() {
                return attemptedBytesRead == lastBytesRead;
        * Only {@link ChannelConfig#getMaxMessagesPerRead()} is used.
        public void reset(ChannelConfig config) {
            this.config = config;
            maxMessagePerRead = maxMessagesPerRead();
            totalMessages = totalBytesRead = 0;
        public ByteBuf allocate(ByteBufAllocator alloc) {
            return alloc.ioBuffer(guess());
        public final void incMessagesRead(int amt) {
            totalMessages += amt;
        public void lastBytesRead(int bytes) {
            lastBytesRead = bytes;
            if (bytes > 0) {
                totalBytesRead += bytes;
        public final int lastBytesRead() {
            return lastBytesRead;
        public boolean continueReading() {
            return continueReading(defaultMaybeMoreSupplier);
        //totalMessages < maxMessagePerRead && totalBytesRead > 0
        public boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) {
            return config.isAutoRead() &&
                    (!respectMaybeMoreData || maybeMoreDataSupplier.get()) &&
                    totalMessages < maxMessagePerRead &&
                    totalBytesRead > 0;
        public void readComplete() {
        public int attemptedBytesRead() {
            return attemptedBytesRead;
        public void attemptedBytesRead(int bytes) {
            attemptedBytesRead = bytes;
        protected final int totalBytesRead() {
            return totalBytesRead < 0 ? Integer.MAX_VALUE : totalBytesRead;


    3 AdaptiveRecvByteBufAllocator实现



    The {@link RecvByteBufAllocator} that automatically increases and decreases the predicted buffer size on feed back. It gradually increases the expected number of readable bytes if the previous read fully filled the allocated buffer. It gradually decreases the expected number of readable bytes if the read operation was not able to fill a certain amount of the allocated buffer two times consecutively. Otherwise, it keeps returning the same prediction.



    static final int DEFAULT_MINIMUM = 64;
    static final int DEFAULT_INITIAL = 1024;
    static final int DEFAULT_MAXIMUM = 65536;
    private static final int INDEX_INCREMENT = 4;
    private static final int INDEX_DECREMENT = 1;
    private static final int[] SIZE_TABLE;
    //[16, 32, 48, 64, 80, 96, 112, 128, 144, 160, 176, 192, 208, 224,
    // 240, 256, 272, 288, 304, 320, 336, 352, 368, 384, 400, 416, 
    // 432, 448, 464, 480, 496, 512, 1024, 2048, 4096, 8192, 16384, 
    // 32768, 65536, 131072, 262144, 524288, 1048576, 2097152, 
    // 4194304, 8388608, 16777216, 33554432, 67108864, 134217728, 
    // 268435456, 536870912, 1073741824]
    static {
        List<Integer> sizeTable = new ArrayList<Integer>();
        for (int i = 16; i < 512; i += 16) {
        for (int i = 512; i > 0; i <<= 1) {
        SIZE_TABLE = new int[sizeTable.size()];
        for (int i = 0; i < SIZE_TABLE.length; i ++) {
            SIZE_TABLE[i] = sizeTable.get(i);


    private static int getSizeTableIndex(final int size) {
        for (int low = 0, high = SIZE_TABLE.length - 1;;) {
            if (high < low) {
                return low;
            if (high == low) {
                return high;
            int mid = low + high >>> 1;
            int a = SIZE_TABLE[mid];
            int b = SIZE_TABLE[mid + 1];
            if (size > b) {
                low = mid + 1;
            } else if (size < a) {
                high = mid - 1;
            } else if (size == a) {
                return mid;
            } else {
                return mid + 1;


    private final int minIndex;
    private final int maxIndex;
    private final int initial;
    public AdaptiveRecvByteBufAllocator() {
        //上面列出的默认值,分别为64 1024 65536
    public AdaptiveRecvByteBufAllocator(int minimum, int initial, int maximum) {
        if (minimum <= 0) {
            throw new IllegalArgumentException("minimum: " + minimum);
        if (initial < minimum) {
            throw new IllegalArgumentException("initial: " + initial);
        if (maximum < initial) {
            throw new IllegalArgumentException("maximum: " + maximum);
        int minIndex = getSizeTableIndex(minimum);
        if (SIZE_TABLE[minIndex] < minimum) {
            this.minIndex = minIndex + 1;
        } else {
            this.minIndex = minIndex;
        int maxIndex = getSizeTableIndex(maximum);
        if (SIZE_TABLE[maxIndex] > maximum) {
            this.maxIndex = maxIndex - 1;
        } else {
            this.maxIndex = maxIndex;
        this.initial = initial;


    private final class HandleImpl extends MaxMessageHandle {
        private final int minIndex;
        private final int maxIndex;
        private int index;
        private int nextReceiveBufferSize;
        private boolean decreaseNow;
        public HandleImpl(int minIndex, int maxIndex, int initial) {
            this.minIndex = minIndex;
            this.maxIndex = maxIndex;
            index = getSizeTableIndex(initial);
            nextReceiveBufferSize = SIZE_TABLE[index];
        public void lastBytesRead(int bytes) {
            // If we read as much as we asked for we should check if we need to ramp up the size of our next guess.
            // This helps adjust more quickly when large amounts of data is pending and can avoid going back to
            // the selector to check for more data. Going back to the selector can add significant latency for large
            // data transfers.
            if (bytes == attemptedBytesRead()) {
        public int guess() {
            return nextReceiveBufferSize;
        private void record(int actualReadBytes) {
            if (actualReadBytes <= SIZE_TABLE[max(0, index - INDEX_DECREMENT - 1)]) {
                if (decreaseNow) {
                    index = max(index - INDEX_DECREMENT, minIndex);
                    nextReceiveBufferSize = SIZE_TABLE[index];
                    decreaseNow = false;
                } else {
                    decreaseNow = true;
            //下面的else if则表明实际读取的大小大于上次预测的缓冲区大小,
            } else if (actualReadBytes >= nextReceiveBufferSize) {
                index = min(index + INDEX_INCREMENT, maxIndex);
                nextReceiveBufferSize = SIZE_TABLE[index];
                decreaseNow = false;
        public void readComplete() {


    public ByteBuf allocate(ByteBufAllocator alloc) {
        return alloc.ioBuffer(guess());


    4 实际使用


     public final void read() {
            final ChannelConfig config = config();
            if (shouldBreakReadReady(config)) {
            final ChannelPipeline pipeline = pipeline();
            final ByteBufAllocator allocator = config.getAllocator();
            //Netty Channel只会有一个HandleImpl实例,第一次调用此方法时
            final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
            ByteBuf byteBuf = null;
            boolean close = false;
            try {
                do {
                    byteBuf = allocHandle.allocate(allocator);
                    if (allocHandle.lastBytesRead() <= 0) {
                        // nothing was read. release the buffer.
                        byteBuf = null;
                        close = allocHandle.lastBytesRead() < 0;
                        if (close) {
                            // There is nothing left to read as we received an EOF.
                            readPending = false;
                    readPending = false;
                    byteBuf = null;
                } while (allocHandle.continueReading());
                if (close) {
            } catch (Throwable t) {
                handleReadException(pipeline, byteBuf, t, close, allocHandle);
            } finally {
                // Check if there is a readPending which was not processed yet.
                // This could be for two reasons:
                // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
                // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
                // See https://github.com/netty/netty/issues/2254
                if (!readPending && !config.isAutoRead()) {



