1. Source code flow
@Override
public final void read() {
    final ChannelConfig config = config();
    final ChannelPipeline pipeline = pipeline();
    final ByteBufAllocator allocator = config.getAllocator();
    final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
    allocHandle.reset(config);
    ByteBuf byteBuf = null;
    boolean close = false;
    try {
        do {
            byteBuf = allocHandle.allocate(allocator);
            allocHandle.lastBytesRead(doReadBytes(byteBuf));
            if (allocHandle.lastBytesRead() <= 0) {
                // nothing was read. release the buffer.
                byteBuf.release();
                byteBuf = null;
                close = allocHandle.lastBytesRead() < 0;
                break;
            }
            allocHandle.incMessagesRead(1);
            readPending = false;
            pipeline.fireChannelRead(byteBuf);
            byteBuf = null;
        } while (allocHandle.continueReading());
        allocHandle.readComplete();
        pipeline.fireChannelReadComplete();
        if (close) {
            closeOnRead(pipeline);
        }
    } catch (Throwable t) {
        handleReadException(pipeline, byteBuf, t, close, allocHandle);
    } finally {
        // Check if there is a readPending which was not processed yet.
        // This could be for two reasons:
        // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
        // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
        //
        // See https://github.com/netty/netty/issues/2254
        if (!readPending && !config.isAutoRead()) {
            removeReadOp();
        }
    }
}
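2. Related concepts
To follow the code above, a few related concepts are needed:
- ChannelConfig: the configuration parameters associated with a Channel
- ChannelPipeline: handles the propagation of events
- ByteBufAllocator: the allocator for ByteBuf instances
- RecvByteBufAllocator: the receive-buffer allocator, which is the focus of this article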
3. RecvByteBufAllocator
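Judging from its Javadoc, RecvByteBufAllocator exists to save memory: it tries to allocate a receive buffer that is large enough to hold the inbound data yet small enough not to waste space. The right capacity is not known in advance, so it is obtained by guessing.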
/**
 * Allocates a new receive buffer whose capacity is probably large enough to read all inbound data and small enough
 * not to waste its space.
 */
public interface RecvByteBufAllocator {
    /**
     * Creates a new handle. The handle provides the actual operations and keeps the internal information which is
     * required for predicting an optimal buffer capacity.
     */
    Handle newHandle();
    /**
     * @deprecated Use {@link ExtendedHandle}.
     */
    @Deprecated
    interface Handle {
        /**
         * Creates a new receive buffer whose capacity is probably large enough to read all inbound data and small
         * enough not to waste its space.
         */
        ByteBuf allocate(ByteBufAllocator alloc);
        /**
         * Similar to {@link #allocate(ByteBufAllocator)} except that it does not allocate anything but just tells the
         * capacity.
         */
        int guess();
        /**
         * Reset any counters that have accumulated and recommend how many messages/bytes should be read for the next
         * read loop.
         * <p>
         * This may be used by {@link #continueReading()} to determine if the read operation should complete.
         * </p>
         * This is only ever a hint and may be ignored by the implementation.
         * @param config The channel configuration which may impact this object's behavior.
         */
        void reset(ChannelConfig config);
        /**
         * Increment the number of messages that have been read for the current read loop.
         * @param numMessages The amount to increment by.
         */
        void incMessagesRead(int numMessages);
        /**
         * Set the bytes that have been read for the last read operation.
         * This may be used to increment the number of bytes that have been read.
         * @param bytes The number of bytes from the previous read operation. This may be negative if an read error
         * occurs. If a negative value is seen it is expected to be return on the next call to
         * {@link #lastBytesRead()}. A negative value will signal a termination condition enforced externally
         * to this class and is not required to be enforced in {@link #continueReading()}.
         */
        void lastBytesRead(int bytes);
        /**
         * Get the amount of bytes for the previous read operation.
         * @return The amount of bytes for the previous read operation.
         */
        int lastBytesRead();
        /**
         * Set how many bytes the read operation will (or did) attempt to read.
         * @param bytes How many bytes the read operation will (or did) attempt to read.
         */
        void attemptedBytesRead(int bytes);
        /**
         * Get how many bytes the read operation will (or did) attempt to read.
         * @return How many bytes the read operation will (or did) attempt to read.
         */
        int attemptedBytesRead();
        /**
         * Determine if the current read loop should should continue.
         * @return {@code true} if the read loop should continue reading. {@code false} if the read loop is complete.
         */
        boolean continueReading();
        /**
         * The read has completed.
         */
        void readComplete();
    }
}
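To make the idea of "guessing" a capacity concrete, here is a minimal sketch of a handle that adapts its next guess to the size of the previous read. It is a toy, not Netty's actual algorithm: the class name ToyGuessingHandle, the record method, the 64 and 65536 bounds, the 1024 starting size, and the doubling/halving rule are all assumptions made for illustration.

```java
class ToyGuessingHandle {
    // Hypothetical bounds and starting size, chosen only for this example.
    private static final int MIN = 64;
    private static final int MAX = 65536;
    private int nextSize = 1024;

    // Tells the capacity that would be allocated next, without allocating anything.
    int guess() {
        return nextSize;
    }

    // Records how many bytes the last read produced and adjusts the next guess.
    void record(int bytesRead) {
        if (bytesRead >= nextSize) {
            // The buffer was filled completely: more data is likely, so guess bigger.
            nextSize = Math.min(nextSize * 2, MAX);
        } else if (bytesRead < nextSize / 2) {
            // Less than half of the buffer was used: guess smaller to save memory.
            nextSize = Math.max(nextSize / 2, MIN);
        }
    }

    public static void main(String[] args) {
        ToyGuessingHandle handle = new ToyGuessingHandle();
        handle.record(1024);                 // full buffer
        System.out.println(handle.guess());  // 2048
        handle.record(100);                  // mostly empty buffer
        System.out.println(handle.guess());  // 1024
    }
}
```

The point of the sketch is only that the handle keeps state between reads and uses it to predict the next capacity, which is what the Javadoc of newHandle() describes.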
3.1 The lastBytesRead method
Records the number of bytes obtained by the most recent read.
For example, reading "Netty rocks!" yields 12 bytes.
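The byte count in this example can be checked with plain JDK code. This is a small sketch that assumes the text is encoded as UTF-8; the class name ByteLengthDemo and the helper utf8Length are made up for illustration.

```java
import java.nio.charset.StandardCharsets;

class ByteLengthDemo {
    // Returns the number of bytes the text occupies when encoded as UTF-8.
    static int utf8Length(String text) {
        return text.getBytes(StandardCharsets.UTF_8).length;
    }

    public static void main(String[] args) {
        // Every character here is ASCII, so each one takes exactly one byte.
        System.out.println(utf8Length("Netty rocks!")); // prints 12
    }
}
```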
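3.2 The incMessagesRead method
Increments the number of reads (messages) performed in the current read loop.
3.3 The continueReading method
Decides whether the current read loop should keep going.
3.4 Source code of the Handle implementation
The part to focus on is the implementation of continueReading:
- attemptedBytesRead is the number of bytes the read tries to obtain, for example 1024
- lastBytesRead is the number of bytes actually read
- totalBytesRead is the cumulative number of bytes read so far
When attemptedBytesRead == lastBytesRead, the buffer was filled completely, so more bytes may still be waiting; otherwise the data is considered fully read and the loop stops. The loop also stops when auto-read is disabled or when the number of reads reaches maxMessagePerRead.
In other words, RecvByteBufAllocator.Handle drives the loop that repeatedly reads into a ByteBuf.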
public abstract class MaxMessageHandle implements ExtendedHandle {
    private ChannelConfig config;
    private int maxMessagePerRead;
    private int totalMessages;
    private int totalBytesRead;
    private int attemptedBytesRead;
    private int lastBytesRead;
    private final UncheckedBooleanSupplier defaultMaybeMoreSupplier = new UncheckedBooleanSupplier() {
        @Override
        public boolean get() {
            return attemptedBytesRead == lastBytesRead;
        }
    };
    /**
     * Only {@link ChannelConfig#getMaxMessagesPerRead()} is used.
     */
    @Override
    public void reset(ChannelConfig config) {
        this.config = config;
        maxMessagePerRead = maxMessagesPerRead();
        totalMessages = totalBytesRead = 0;
    }
    @Override
    public ByteBuf allocate(ByteBufAllocator alloc) {
        return alloc.ioBuffer(guess());
    }
    @Override
    public final void incMessagesRead(int amt) {
        totalMessages += amt;
    }
    @Override
    public final void lastBytesRead(int bytes) {
        lastBytesRead = bytes;
        if (bytes > 0) {
            totalBytesRead += bytes;
        }
    }
    @Override
    public final int lastBytesRead() {
        return lastBytesRead;
    }
    @Override
    public boolean continueReading() {
        return continueReading(defaultMaybeMoreSupplier);
    }
    @Override
    public boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) {
        return config.isAutoRead() &&
               maybeMoreDataSupplier.get() &&
               totalMessages < maxMessagePerRead &&
               totalBytesRead > 0;
    }
    @Override
    public void readComplete() {
    }
    @Override
    public int attemptedBytesRead() {
        return attemptedBytesRead;
    }
    @Override
    public void attemptedBytesRead(int bytes) {
        attemptedBytesRead = bytes;
    }
    protected final int totalBytesRead() {
        return totalBytesRead < 0 ? Integer.MAX_VALUE : totalBytesRead;
    }
}
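The effect of continueReading on the read loop can be tried out without Netty. The following is a simplified stand-in, not the real class: SimpleHandle and ReadLoopDemo are made-up names, the socket is replaced by a counter of pending bytes, and the buffer size is fixed instead of guessed. Only the four conditions of continueReading are taken from the source above.

```java
class SimpleHandle {
    private final boolean autoRead;
    private final int maxMessagesPerRead;
    private int totalMessages;
    private int totalBytesRead;
    private int attemptedBytesRead;
    private int lastBytesRead;

    SimpleHandle(boolean autoRead, int maxMessagesPerRead) {
        this.autoRead = autoRead;
        this.maxMessagesPerRead = maxMessagesPerRead;
    }

    void attemptedBytesRead(int bytes) {
        attemptedBytesRead = bytes;
    }

    void lastBytesRead(int bytes) {
        lastBytesRead = bytes;
        if (bytes > 0) {
            totalBytesRead += bytes;
        }
    }

    void incMessagesRead(int amt) {
        totalMessages += amt;
    }

    // Same four conditions as MaxMessageHandle.continueReading above.
    boolean continueReading() {
        return autoRead
                && attemptedBytesRead == lastBytesRead
                && totalMessages < maxMessagesPerRead
                && totalBytesRead > 0;
    }
}

class ReadLoopDemo {
    // Simulates draining 'available' pending bytes with a fixed-size buffer
    // and returns how many reads actually delivered data.
    static int countReads(int available, int bufferSize, int maxMessagesPerRead) {
        SimpleHandle handle = new SimpleHandle(true, maxMessagesPerRead);
        int reads = 0;
        do {
            handle.attemptedBytesRead(bufferSize);
            int n = Math.min(available, bufferSize); // stand-in for doReadBytes
            handle.lastBytesRead(n);
            if (n <= 0) {
                break; // nothing was read
            }
            available -= n;
            handle.incMessagesRead(1);
            reads++;
        } while (handle.continueReading());
        return reads;
    }

    public static void main(String[] args) {
        System.out.println(countReads(2500, 1024, 16));  // 3
        System.out.println(countReads(10000, 1024, 4));  // 4
    }
}
```

With 2500 pending bytes and a 1024-byte buffer, the loop performs two full reads and one partial read of 452 bytes; the partial read makes attemptedBytesRead differ from lastBytesRead, which ends the loop. With 10000 pending bytes and a limit of 4, the loop stops after four reads even though data remains.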
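4. Actually reading into the ByteBuf
The NioSocketChannel object holds the underlying NIO SocketChannel instance, and the actual read goes through it:
- read() calls the doReadBytes method to perform the read
- doReadBytes calls the writeBytes method of ByteBuf
- writeBytes calls setBytes, which in turn calls the read method of ScatteringByteChannel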
// NioSocketChannel
protected int doReadBytes(ByteBuf byteBuf) throws Exception {
    final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
    allocHandle.attemptedBytesRead(byteBuf.writableBytes());
    return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead());
}
// ByteBuf
public int writeBytes(ScatteringByteChannel in, int length) throws IOException {
    ensureWritable(length);
    int writtenBytes = setBytes(writerIndex, in, length);
    if (writtenBytes > 0) {
        writerIndex += writtenBytes;
    }
    return writtenBytes;
}
@Override
public int setBytes(int index, ScatteringByteChannel in, int length) throws IOException {
    checkIndex(index, length);
    ByteBuffer tmpBuf = internalNioBuffer();
    index = idx(index);
    tmpBuf.clear().position(index).limit(index + length);
    try {
        return in.read(tmpBuf);
    } catch (ClosedChannelException ignored) {
        return -1;
    }
}
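The last step, reading from a ScatteringByteChannel into a window of a ByteBuffer, can be reproduced with the JDK alone. This is a sketch under assumptions: a java.nio.channels.Pipe stands in for the socket, and ChannelReadDemo, readInto and demo are illustrative names that do not exist in Netty.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.ScatteringByteChannel;
import java.nio.charset.StandardCharsets;

class ChannelReadDemo {
    // Reads up to 'length' bytes from the channel into buf, starting at 'index'.
    // The position/limit window mirrors the setBytes method shown above.
    static int readInto(ByteBuffer buf, ScatteringByteChannel in, int index, int length)
            throws IOException {
        buf.clear().position(index).limit(index + length);
        return in.read(buf);
    }

    // Writes a short message into a pipe, reads it back, and returns "<byteCount>:<text>".
    static String demo() {
        try {
            Pipe pipe = Pipe.open();
            try (Pipe.SinkChannel sink = pipe.sink();
                 Pipe.SourceChannel source = pipe.source()) {
                sink.write(ByteBuffer.wrap("Netty rocks!".getBytes(StandardCharsets.UTF_8)));
                ByteBuffer buf = ByteBuffer.allocate(64);
                // Attempt to fill the whole buffer; the channel returns what is available.
                int n = readInto(buf, source, 0, buf.capacity());
                return n + ":" + new String(buf.array(), 0, n, StandardCharsets.UTF_8);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Here the attempted size is 64 bytes while fewer bytes are available, which is exactly the situation in which continueReading would report that the read loop is complete.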