在研究NioEventLoop
执行过程的时候,检测IO事件(包括新连接)
,处理IO事件
,执行所有任务
三个过程。其中检测IO事件中通过持有的selector
去轮询事件,检测出新连接。这里复用同一段代码。
今天我们研究的新连接介入的过程大概如下:
- 检测新连接
- 检测新连接之后,创建
NioSocketChannel
,也就是客户端channel
。 - 接着给
channel
分配一个NioEventLoop
,并且把该channel
注册到NioEventLoop
对应的selector
上。至此,这条channel
之后的读写都由该NioEventLoop
进行管理。 - 最后向
selector
注册读写事件,注册的时候和服务端启动注册accept
事件复用同一段逻辑。
netty的多连接复用指的是,多个连接父用一个NioEventLoop
持有的线程。
netty服务端在启动的时候会绑定一个bossGroup
,即NioEventLoop
,在bind()
绑定端口的时候注册accept
(新连接接入)事件。扫描到该事件后,便处理。因此入口从:NioEventLoop#processSelectedKeys()
开始
private void processSelectedKeys() {
if (selectedKeys != null) {
processSelectedKeysOptimized();
} else {
processSelectedKeysPlain(selector.selectedKeys());
}
}
private void processSelectedKeysOptimized() {
for (int i = 0; i < selectedKeys.size; ++i) {
final SelectionKey k = selectedKeys.keys[i];
// null out entry in the array to allow to have it GC'ed once the Channel close
// See https://github.com/netty/netty/issues/2363
selectedKeys.keys[i] = null;
final Object a = k.attachment();
if (a instanceof AbstractNioChannel) {
//真正的处理过程
processSelectedKey(k, (AbstractNioChannel) a);
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}
if (needsToSelectAgain) {
// null out entries in the array to allow to have it GC'ed once the Channel close
// See https://github.com/netty/netty/issues/2363
selectedKeys.reset(i + 1);
selectAgain();
i = -1;
}
}
}
真正的入口NioEventLoop#processSelectedKey(java.nio.channels.SelectionKey, io.netty.channel.nio.AbstractNioChannel)
,改方法整体逻辑我们前面分析过在《NioEventLoop执行之processSelectedKeys()》章节中有介绍,这里我们直接看,新连接处理的逻辑
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
//省略代码
// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
// to a spin loop
//如果当前NioEventLoop是workGroup 则可能是OP_READ,bossGroup是OP_ACCEPT
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
//新连接接入以及读事件处理入口
unsafe.read();
}
}
这里的unsafe
是在《Channel创建过程》的时候,调用了父类AbstractChannel#AbstractChannel()
的构造方法,和pipeline
一起初始化的。
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}
该unsalf
为NioServerSockeChannel
的父类AbstractNioMessageChannel#newUnsafe()
创建,可以看到对应的是AbstractNioMessageChannel.NioMessageUnsafe
,内部类
@Override
protected AbstractNioUnsafe newUnsafe() {
return new NioMessageUnsafe();
}
查看该类的read()
方法,其大致流程如下:
- 循环调用jdk底层的代码创建
channel
,并用netty的NioSocketChannel
包装起来,代表新连接成功接入一个通道。 - 将所有获取到的
channel
存储到一个容器当中,检测接入的连接数,默认是一次接16个连接 - 遍历容器中的
channel
,依次调用方法fireChannelRead
,fireChannelReadComplete
,fireExceptionCaught
来触发对应的传播事件。
private final class NioMessageUnsafe extends AbstractNioUnsafe {
//临时存储读到的连接
private final List<Object> readBuf = new ArrayList<Object>();
@Override
public void read() {
assert eventLoop().inEventLoop();
final ChannelConfig config = config();
final ChannelPipeline pipeline = pipeline();
//服务端接入速率处理器
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
allocHandle.reset(config);
boolean closed = false;
Throwable exception = null;
try {
try {
//while循环调用doReadMessages()创建新连接对象
do {
//获取jdk底层的channel,并加入readBuf容器
int localRead = doReadMessages(readBuf);
if (localRead == 0) {
break;
}
if (localRead < 0) {
closed = true;
break;
}
//把读到的连接做一个累加totalMessages,默认最多累计读取16个连接,结束循环
allocHandle.incMessagesRead(localRead);
} while (allocHandle.continueReading());
} catch (Throwable t) {
exception = t;
}
//触发readBuf容器内所有的传播事件:ChannelRead 读事件
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
readPending = false;
pipeline.fireChannelRead(readBuf.get(i));
}
//清空容器
readBuf.clear();
allocHandle.readComplete();
//触发传播事件:ChannelReadComplete,所有的读事件完成
pipeline.fireChannelReadComplete();
if (exception != null) {
closed = closeOnReadError(exception);
//触发传播事件:exceptionCaught,触发异常
pipeline.fireExceptionCaught(exception);
}
if (closed) {
inputShutdown = true;
if (isOpen()) {
close(voidPromise());
}
}
} finally {
// Check if there is a readPending which was not processed yet.
// This could be for two reasons:
// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
//
// See https://github.com/netty/netty/issues/2254
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}
}
}
- 获取jdk底层的channel,调用的是
NioServerSocketChannel#doReadMessages()
,创建jdk底层channel
并且用NioSocketChannel
包装起来,将该channel
添加到传入的容器保存起来,同时返回一个计数。
@Override
protected int doReadMessages(List<Object> buf) throws Exception {
//获取jdk底层的channel
SocketChannel ch = SocketUtils.accept(javaChannel());
try {
if (ch != null) {
//将jdk底层的channel封装到netty的channel,并存储到传入的容器当中
buf.add(new NioSocketChannel(this, ch));
//成功和创建 客户端接入的一条通道,并返回
return 1;
}
} catch (Throwable t) {
logger.warn("Failed to create a new channel from an accepted socket.", t);
try {
ch.close();
} catch (Throwable t2) {
logger.warn("Failed to close a socket.", t2);
}
}
return 0;
}
下面这段代码allocHandle
是一个服务器接入速率处理器,其实是DefaultMaxMessagesRecvByteBufAllocator.MaxMessageHandle
,通过incMessagesRead()
方法维持一个成员变量totalMessages
,与continueReading()
方法配合,控制一个while循环接入的连接最大数。循环获取了一个批次的连接之后再统一处理这部分连接。
//服务端接入速率处理器
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
@Override
public final void incMessagesRead(int amt) {
totalMessages += amt;
}
@Override
public boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) {
return config.isAutoRead() && (!respectMaybeMoreData || maybeMoreDataSupplier.get()) &&
//判断读取到的连接总数是否大于最大连接数,maxMessagePerRead默认16
totalMessages < maxMessagePerRead &&
totalBytesRead > 0;
}
网友评论