AbstractNioByteChannel.read()
@Override
public final void read() {
final ChannelConfig config = config();
final ChannelPipeline pipeline = pipeline();
final ByteBufAllocator allocator = config.getAllocator();
final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
allocHandle.reset(config);
ByteBuf byteBuf = null;
boolean close = false;
try {
do {
byteBuf = allocHandle.allocate(allocator);
allocHandle.lastBytesRead(doReadBytes(byteBuf));
if (allocHandle.lastBytesRead() <= 0) {
// nothing was read. release the buffer.
byteBuf.release();
byteBuf = null;
close = allocHandle.lastBytesRead() < 0;
if (close) {
// There is nothing left to read as we received an EOF.
readPending = false;
}
break;
}
allocHandle.incMessagesRead(1);//每读一次,会加一次read
readPending = false;
pipeline.fireChannelRead(byteBuf);//触发事件
byteBuf = null;
} while (allocHandle.continueReading());
allocHandle.readComplete();//调用record()每次读完会进行,预分配调整,默认第一次是1024
pipeline.fireChannelReadComplete();//触发事件
if (close) {
closeOnRead(pipeline);
}
} catch (Throwable t) {
handleReadException(pipeline, byteBuf, t, close, allocHandle);
} finally {
// Check if there is a readPending which was not processed yet.
// This could be for two reasons:
// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
//
// See https://github.com/netty/netty/issues/2254
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}
DefaultMaxBytesRecvByteBufAllocator
@Override
public boolean continueReading() {
return continueReading(defaultMaybeMoreSupplier);
}
@Override
public boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) {
return config.isAutoRead() &&
maybeMoreDataSupplier.get() &&
totalMessages < maxMessagePerRead && //有判断读的次数
totalBytesRead > 0;
}
private final UncheckedBooleanSupplier defaultMaybeMoreSupplier = new UncheckedBooleanSupplier() {
@Override
public boolean get() {
return attemptBytesRead == lastBytesRead;//默认分配了1024 只读了420,所以不继续读
}
};
AdaptiveRecvByteBufAllocator.HandleImpl计算每次需要的byteSize平均值
private final class HandleImpl extends MaxMessageHandle {
private final int minIndex;
private final int maxIndex;
private int index;
private int nextReceiveBufferSize;
private boolean decreaseNow;
public HandleImpl(int minIndex, int maxIndex, int initial) {
this.minIndex = minIndex;
this.maxIndex = maxIndex;
index = getSizeTableIndex(initial);
nextReceiveBufferSize = SIZE_TABLE[index];
}
@Override
public int guess() {
return nextReceiveBufferSize;
}
//每次读完会进行,预分配调整,默认第一次是1024
private void record(int actualReadBytes) {
if (actualReadBytes <= SIZE_TABLE[Math.max(0, index - INDEX_DECREMENT - 1)]) {
if (decreaseNow) {
index = Math.max(index - INDEX_DECREMENT, minIndex);
nextReceiveBufferSize = SIZE_TABLE[index];
decreaseNow = false;
} else {
decreaseNow = true;
}
} else if (actualReadBytes >= nextReceiveBufferSize) {
index = Math.min(index + INDEX_INCREMENT, maxIndex);
nextReceiveBufferSize = SIZE_TABLE[index];
decreaseNow = false;
}
}
@Override
public void readComplete() {
record(totalBytesRead());
}
}
private static int getSizeTableIndex(final int size) {
for (int low = 0, high = SIZE_TABLE.length - 1;;) {
if (high < low) {
return low;
}
if (high == low) {
return high;
}
int mid = low + high >>> 1;
int a = SIZE_TABLE[mid];
int b = SIZE_TABLE[mid + 1];
if (size > b) {
low = mid + 1;
} else if (size < a) {
high = mid - 1;
} else if (size == a) {
return mid;
} else {
return mid + 1;
}
}
}
导读
AbstractNioByteChannel.read()
AdaptiveRecvByteBufAllocator allocHandle = recvBufAllocHandle()
byteBuf = allocHandle.allocate(allocator);
allocHandle.lastBytesRead(doReadBytes(byteBuf));
allocHandle.incMessagesRead(1);
readPending = false;
pipeline.fireChannelRead(byteBuf);
allocHandle.continueReading()
continueReading(defaultMaybeMoreSupplier)
bytesToRead > 0 && maybeMoreDataSupplier.get()
UncheckedBooleanSupplier.get()
return attemptBytesRead == lastBytesRead
allocHandle.readComplete();
record(totalBytesRead());
nextReceiveBufferSize = SIZE_TABLE[index]
pipeline.fireChannelReadComplete();
closeOnRead(pipeline)
if (!isInputShutdown0()) {
if (Boolean.TRUE.equals(config().getOption(ChannelOption.ALLOW_HALF_CLOSURE))) {
shutdownInput();
pipeline.fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
}
else
doClose()
doClose0(promise);
outboundBuffer.failFlushed(cause, notify);
outboundBuffer.close(closeCause);
fireChannelInactiveAndDeregister(wasActive);
pipeline.fireUserEventTriggered(ChannelInputShutdownReadComplete.INSTANCE)
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
AbstractNioMessageChannel.NioMessageUnsafe
@Override
public void read() {
assert eventLoop().inEventLoop();
final ChannelConfig config = config();
final ChannelPipeline pipeline = pipeline();
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
allocHandle.reset(config);
boolean closed = false;
Throwable exception = null;
try {
try {
do {
int localRead = doReadMessages(readBuf);
if (localRead == 0) {
break;
}
if (localRead < 0) {
closed = true;
break;
}
allocHandle.incMessagesRead(localRead);
} while (allocHandle.continueReading());
} catch (Throwable t) {
exception = t;
}
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
readPending = false;
pipeline.fireChannelRead(readBuf.get(i));
}
readBuf.clear();
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
if (exception != null) {
closed = closeOnReadError(exception);
pipeline.fireExceptionCaught(exception);
}
if (closed) {
inputShutdown = true;
if (isOpen()) {
close(voidPromise());
}
}
} finally {
// Check if there is a readPending which was not processed yet.
// This could be for two reasons:
// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
//
// See https://github.com/netty/netty/issues/2254
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}
}
}
网友评论