上一遍文章我们分析了netty NioSocketChannel写数据的过程,接下来我们分析netty NioSocketChannel读数据过程,我们接着上一遍去分析,一端的NioSocketChannel向另一端发送了hello world,那么另一端是如何接受到hello world的呢?
读事件的触发
不论是客户端还是服务端在创建NioSocketChannel实例之后,同样会经历初始化,注册和连接的过程,当连接完成之后NioSocketChannel会向selector注册OP_READ事件,当一端向另一端写数据的时候会触发读事件。NioSocketChannel绑定的NioEventLoop会执行selector.select(),当有读写事件发生的时候,NioEventLoop执行 processSelectedKeys方法来处理这些事件。关于NioEventLoop的解析请参考https://www.jianshu.com/p/732f9dea34d7
processSelectedKeys
//selectedKeys会被netty设置成SelectedSelectionKeySet,所以会执行processSelectedKeysOptimized
private void processSelectedKeys() {
if (selectedKeys != null) {
processSelectedKeysOptimized();
} else {
processSelectedKeysPlain(selector.selectedKeys());
}
}
private void processSelectedKeysOptimized() {
for (int i = 0; i < selectedKeys.size; ++i) {
//取得每个IO事件对应的SelectionKey
final SelectionKey k = selectedKeys.keys[i];
// null out entry in the array to allow to have it GC'ed once the Channel close
// See https://github.com/netty/netty/issues/2363
//selectedKeys清空对应的SelectionKey
selectedKeys.keys[i] = null;
final Object a = k.attachment();
//我们基于NIO分析,所以进入processSelectedKey方法
if (a instanceof AbstractNioChannel) {
processSelectedKey(k, (AbstractNioChannel) a);
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}
if (needsToSelectAgain) {
// null out entries in the array to allow to have it GC'ed once the Channel close
// See https://github.com/netty/netty/issues/2363
selectedKeys.reset(i + 1);
selectAgain();
i = -1;
}
}
}
processSelectedKey是真正处理IO事件的方法
processSelectedKey
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
//获取channel的unsafe
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
if (!k.isValid()) {
final EventLoop eventLoop;
try {
eventLoop = ch.eventLoop();
} catch (Throwable ignored) {
// If the channel implementation throws an exception because there is no event loop, we ignore this
// because we are only trying to determine if ch is registered to this event loop and thus has authority
// to close ch.
return;
}
// Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
// and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
// still healthy and should not be closed.
// See https://github.com/netty/netty/issues/5125
if (eventLoop == this) {
// close the channel if the key is not valid anymore
//如果selectionKey不合法,关闭channel
unsafe.close(unsafe.voidPromise());
}
return;
}
try {
//获取事件类型的编码
int readyOps = k.readyOps();
// We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
// the NIO JDK channel implementation may throw a NotYetConnectedException.
//如果是OP_CONNECT事件
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
// remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
// See https://github.com/netty/netty/issues/924
int ops = k.interestOps();
//OP_CONNECT从感兴趣的事件中清除
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
//完成连接
unsafe.finishConnect();
}
// Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
//如果是OP_WRITE事件,那么执行forceFlush,把写缓存刷到网络
ch.unsafe().forceFlush();
}
// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
// to a spin loop
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
//处理读事件
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
我们分析unsafe.read源代码
unsafe.read()
@Override
public final void read() {
final ChannelConfig config = config();
//shouldBreakReadReady用于判断channel的input是不是关闭了,
//如果关闭了那么就不应该去读数据了,同时清除OP_READ事件
if (shouldBreakReadReady(config)) {
clearReadPending();
return;
}
final ChannelPipeline pipeline = pipeline();
//获取ByteBuffer分配器,这里会涉及到jemalloc的知识,请看我写的另一篇文章[https://www.jianshu.com/p/550704d5a628]
final ByteBufAllocator allocator = config.getAllocator();
//Handle的默认实现是HandleImpl,它的作用是用来决定每次从channel读取多少数据,每次可以最多读取多少条数据(默认16条)
//首次默认读取1024个字节,之后根据实际读取到的数据量动态更改每次应该读取的数据量
final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
allocHandle.reset(config);
ByteBuf byteBuf = null;
boolean close = false;
try {
do {
//向netty申请一个ByteByf,这个ByteBuf大小是通过allocHandle计算得到
byteBuf = allocHandle.allocate(allocator);
//doReadBytes是SocketChannel真正读数据的入口方法
//lastBytesRead是记录这次读到的的数据量,如果实际读到的数据量等于byteBuf大小,那么allocHandle会增加下次读取数据用的ByteBuf的大小
allocHandle.lastBytesRead(doReadBytes(byteBuf));
if (allocHandle.lastBytesRead() <= 0) {
//如果读取的数据量是0,那么释放byteBuf
// nothing was read. release the buffer.
byteBuf.release();
byteBuf = null;
close = allocHandle.lastBytesRead() < 0;
if (close) {
// There is nothing left to read as we received an EOF.
readPending = false;
}
break;
}
//allocHandle更新本次读取到的数据总数
allocHandle.incMessagesRead(1);
readPending = false;
//触发fireChannelRead事件
pipeline.fireChannelRead(byteBuf);
byteBuf = null;
//allocHandle.continueReading() 会判断需不需要继续从channel中读数据
//如果实际读取数据量等于打算读取的数据量,同时读取的数据条数小于单次读取容许的最大条数那么会继续读取
} while (allocHandle.continueReading());
//handle根据本次读取的数据总量,动态调整下次读取数据的ByteBuf大小
allocHandle.readComplete();
//触发数据读取完成事件
pipeline.fireChannelReadComplete();
if (close) {
closeOnRead(pipeline);
}
} catch (Throwable t) {
handleReadException(pipeline, byteBuf, t, close, allocHandle);
} finally {
// Check if there is a readPending which was not processed yet.
// This could be for two reasons:
// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
//
// See https://github.com/netty/netty/issues/2254
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}
}
}
我们分析下doReadBytes方法以及内部的调用链
doReadBytes
protected int doReadBytes(ByteBuf byteBuf) throws Exception {
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
//设置handle试图读取的数据量
allocHandle.attemptedBytesRead(byteBuf.writableBytes());
return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead());
}
//ByteBuf.writeBytes解析
public int writeBytes(ScatteringByteChannel in, int length) throws IOException {
ensureWritable(length);
int writtenBytes = setBytes(writerIndex, in, length);
if (writtenBytes > 0) {
//更新ByteBuf的writerIndex
writerIndex += writtenBytes;
}
return writtenBytes;
}
//ByteBuf.setBytes解析
@Override
public final int setBytes(int index, ScatteringByteChannel in, int length) throws IOException {
try {
//internalNioBuffer返回的是ByteBuf对应的ByteBuffer,之后SocketChannel通过read()把数据读取到ByteBuffer中
return in.read(internalNioBuffer(index, length));
} catch (ClosedChannelException ignored) {
return -1;
}
}
netty从channel中读取完一批数据后包装成ByteBuf,接着触发channelRead事件,开发者自定义的inboundHandler的channelRead就会被调用,在这个方法中开发者可以根据业务罗处理接受到的这个网络数据
上面就是netty NioSocketChannel从网络上读取数据的过程
接下来我们看下netty是如何动态修改每次读取数据ByteBuf大小,
这个逻辑在allocHandle.readComplete()实现,我们解析readComplete的源代码
readComplete
public void readComplete() {
//totalBytesRead()用于获取这次IO读事件中读取到的数据总量
record(totalBytesRead());
}
record方法源代码
record
private void record(int actualReadBytes) {
//SIZE_TABLE数组记录了一组数,这些数代表着不同大小的ByteBuf,数组元素按照从小到大的顺序存放在SIZE_TABLE中
//数组中的存的是哪些数据下面会解析
//index就是记录每次从channel读取数据ByteBuf的大小在SIZE_TABLE中的索引
//INDEX_DECREMENT默认是1
//如果actualReadBytes <= SIZE_TABLE[max(0, index - INDEX_DECREMENT)] 说明这次IO事件读取到的数据量小于SIZE_TABLE[index-1]
if (actualReadBytes <= SIZE_TABLE[max(0, index - INDEX_DECREMENT)]) {
if (decreaseNow) {
//下一次读取channel数据的ByteBuf容量缩减为max (SIZE_TABLE[index -1],minIndex)
index = max(index - INDEX_DECREMENT, minIndex);
nextReceiveBufferSize = SIZE_TABLE[index];
decreaseNow = false;
} else {
//当actualReadBytes <[index-1]在首次成立的时候,并不会立马减少下次读取ByteBuf的容量,而只是把decreaseNow设置成true,
//说明只有连续两次读取到的实际数据量都小于SIZE_TABLE[index -1]才会使得下次读取数据ByteBuf的容量缩减
decreaseNow = true;
}
} else if (actualReadBytes >= nextReceiveBufferSize) {
index = min(index + INDEX_INCREMENT, maxIndex);
nextReceiveBufferSize = SIZE_TABLE[index];
decreaseNow = false;
}
}
SIZE_TABLE是一个长度为53的int数组
它存的数据分成两个部分
- 第一部分:从16开始每次增加16直到496,总共31个数据项
- 第二部分:从512开始,下一个数是前一个数的2倍,直到达到int能表示的最大的powerOfTwo()
SIZE_TABLE.png
就分析这么多了,谢谢阅读
网友评论