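The previous section ended by noting that the last step of accepting a new connection is registering the channel with the Selector. That registration works much like the one described in the earlier chapter "Registering the selector", but the way the read event is set up differs.
Entry point: ServerBootstrap.ServerBootstrapAcceptor#channelRead()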
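// Register the accepted child channel with the child EventLoopGroup;
// if registration fails, close the channel.
childGroup.register(child).addListener(new ChannelFutureListener() {
    @Override
    public void operationComplete(ChannelFuture future) throws Exception {
        if (!future.isSuccess()) {
            forceClose(child, future.cause());
        }
    }
});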
This ends up calling AbstractChannel.AbstractUnsafe#register0(), which fires the channel-active event:
if (isActive()) {
    if (firstRegistration) {
        // Fire the channel-active event; this reaches HeadContext#channelActive()
        pipeline.fireChannelActive();
    } else if (config().isAutoRead()) {
        // This channel was registered before and autoRead() is set. This means we need to begin read
        // again so that we process inbound data.
        //
        // See https://github.com/netty/netty/issues/4805
        beginRead();
    }
}
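The branch above can be read as a small decision table. The sketch below is a toy model, not Netty code: the class name RegisterOutcomeSketch, the Action enum and the method afterRegistration are made up for illustration. It only mirrors the three conditions visible in register0(). An accepted connection is normally already connected when it is registered for the first time, so it should take the first branch.

```java
// Toy model of the branch in register0(); all names here are hypothetical.
final class RegisterOutcomeSketch {
    enum Action { FIRE_CHANNEL_ACTIVE, BEGIN_READ, NOTHING }

    // Mirrors: if (isActive()) { if (firstRegistration) ... else if (autoRead) ... }
    static Action afterRegistration(boolean active, boolean firstRegistration, boolean autoRead) {
        if (!active) {
            return Action.NOTHING;              // not active yet: nothing to fire at this point
        }
        if (firstRegistration) {
            return Action.FIRE_CHANNEL_ACTIVE;  // first registration of an active channel
        }
        // Re-registration: only restart reading when autoRead is enabled.
        return autoRead ? Action.BEGIN_READ : Action.NOTHING;
    }

    public static void main(String[] args) {
        System.out.println(afterRegistration(true, true, true));
        System.out.println(afterRegistration(true, false, true));
        System.out.println(afterRegistration(false, true, true));
    }
}
```

In this model a freshly accepted, already-connected channel maps to FIRE_CHANNEL_ACTIVE, which is the path the rest of this section follows.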
This method starts at the head of the pipeline, i.e. DefaultChannelPipeline.HeadContext#channelActive(), which in turn calls readIfIsAutoRead():
// DefaultChannelPipeline#fireChannelActive
@Override
public final ChannelPipeline fireChannelActive() {
    AbstractChannelHandlerContext.invokeChannelActive(head);
    return this;
}

// DefaultChannelPipeline.HeadContext#channelActive
@Override
public void channelActive(ChannelHandlerContext ctx) {
    ctx.fireChannelActive();
    readIfIsAutoRead();
}
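To make the role of readIfIsAutoRead() concrete, here is a minimal sketch under simplifying assumptions. AutoReadSketch and its fields are hypothetical stand-ins for the channel, its autoRead setting and the resulting read request. It is not how Netty stores this state. The point is only the ordering: the active event is handled first, and a read is requested afterwards only when autoRead is on.

```java
// Toy stand-in for a channel with an autoRead flag; names are hypothetical.
final class AutoReadSketch {
    private final boolean autoRead;
    private boolean readRequested;

    AutoReadSketch(boolean autoRead) {
        this.autoRead = autoRead;
    }

    // Stands in for HeadContext#channelActive(): propagate, then maybe request a read.
    void channelActive() {
        // (the inbound event would be passed on to later handlers here)
        readIfIsAutoRead();
    }

    private void readIfIsAutoRead() {
        if (autoRead) {
            read();
        }
    }

    // Stands in for an explicit read request on the channel.
    void read() {
        readRequested = true;
    }

    boolean isReadRequested() {
        return readRequested;
    }

    public static void main(String[] args) {
        AutoReadSketch auto = new AutoReadSketch(true);
        auto.channelActive();
        System.out.println("autoRead=true  -> read requested: " + auto.isReadRequested());

        AutoReadSketch manual = new AutoReadSketch(false);
        manual.channelActive();
        System.out.println("autoRead=false -> read requested: " + manual.isReadRequested());
    }
}
```

With autoRead disabled in this model, nothing is requested until read() is called explicitly, which is the usual reason for turning autoRead off: the application decides when it is ready for more data.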
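At this point readIfIsAutoRead() calls channel.read(), which delegates to pipeline.read(), so the read request enters the pipeline at the tail through tail.read() on the TailContext. Each ctx.read() then hands the request to the next outbound handler toward the head, so the read() of every outboundHandler is invoked in turn, starting from the tail and ending at the head.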
// DefaultChannelPipeline#read
@Override
public final ChannelPipeline read() {
    tail.read();
    return this;
}

// AbstractChannelHandlerContext#read
@Override
public ChannelHandlerContext read() {
    // Find the nearest outbound handler context
    final AbstractChannelHandlerContext next = findContextOutbound();
    EventExecutor executor = next.executor();
    // Invoke its read(), on that handler's own executor
    if (executor.inEventLoop()) {
        next.invokeRead();
    } else {
        Tasks tasks = next.invokeTasks;
        if (tasks == null) {
            next.invokeTasks = tasks = new Tasks(next);
        }
        executor.execute(tasks.invokeReadTask);
    }
    return this;
}
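The tail-to-head walk can be sketched with a tiny linked list. This is a simplified model, not Netty's implementation: the Ctx class, the handler names (decoder, encoder, businessHandler) and the helper methods are invented for illustration, and it assumes every outbound handler simply forwards the read by calling read() on its own context. Inbound-only contexts are skipped, which is what findContextOutbound() is doing in the snippet above.

```java
import java.util.ArrayList;
import java.util.List;

// Toy pipeline: contexts linked toward the head; names are hypothetical.
final class OutboundReadSketch {
    static final class Ctx {
        final String name;
        final boolean outbound;
        Ctx prev; // neighbour in the direction of the head

        Ctx(String name, boolean outbound) {
            this.name = name;
            this.outbound = outbound;
        }
    }

    // Links the contexts in head-to-tail order and returns the tail.
    static Ctx link(Ctx... ctxs) {
        for (int i = 1; i < ctxs.length; i++) {
            ctxs[i].prev = ctxs[i - 1];
        }
        return ctxs[ctxs.length - 1];
    }

    // Walks toward the head until an outbound context is found (or none is left).
    static Ctx findContextOutbound(Ctx from) {
        Ctx ctx = from;
        do {
            ctx = ctx.prev;
        } while (ctx != null && !ctx.outbound);
        return ctx;
    }

    // Records which contexts see read() when it starts at the tail.
    static List<String> readFromTail(Ctx tail) {
        List<String> visited = new ArrayList<>();
        for (Ctx c = findContextOutbound(tail); c != null; c = findContextOutbound(c)) {
            visited.add(c.name);
        }
        return visited;
    }

    static List<String> demo() {
        Ctx tail = link(
                new Ctx("head", true),             // the head handles outbound operations
                new Ctx("decoder", false),         // inbound only, skipped
                new Ctx("encoder", true),          // outbound, visited
                new Ctx("businessHandler", false), // inbound only, skipped
                new Ctx("tail", false));           // starting point
        return readFromTail(tail);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [encoder, head]
    }
}
```

The head is always the last stop in this model, which matches the text: it is the handler that finally talks to the unsafe and the selector.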
The request finally reaches HeadContext#read(), which ultimately modifies the selectionKey and registers interest in read events with the selector:
// DefaultChannelPipeline.HeadContext#read
@Override
public void read(ChannelHandlerContext ctx) {
    unsafe.beginRead();
}

// AbstractChannel.AbstractUnsafe#beginRead
@Override
public final void beginRead() {
    assertEventLoop();
    if (!isActive()) {
        return;
    }
    try {
        doBeginRead();
    } catch (final Exception e) {
        invokeLater(new Runnable() {
            @Override
            public void run() {
                pipeline.fireExceptionCaught(e);
            }
        });
        close(voidPromise());
    }
}
// io.netty.channel.nio.AbstractNioMessageChannel#doBeginRead
@Override
protected void doBeginRead() throws Exception {
    if (inputShutdown) {
        return;
    }
    super.doBeginRead();
}

// io.netty.channel.nio.AbstractNioChannel#doBeginRead
@Override
protected void doBeginRead() throws Exception {
    // Channel.read() or ChannelHandlerContext.read() was called
    final SelectionKey selectionKey = this.selectionKey;
    if (!selectionKey.isValid()) {
        return;
    }
    readPending = true;
    // Add the read interest bit only if it is not already set
    final int interestOps = selectionKey.interestOps();
    if ((interestOps & readInterestOp) == 0) {
        selectionKey.interestOps(interestOps | readInterestOp);
    }
}
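The effect of that last interestOps update can be tried with plain java.nio, without Netty. The sketch below is an illustration under assumptions: it uses a Pipe instead of a socket so it needs no network, the class and method names are made up, and SelectionKey.OP_READ plays the role of readInterestOp. The source channel is registered with interest set 0, so the selector should ignore the pending byte until OP_READ is added with the same bitwise OR used above.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

// Plain-NIO illustration; the class and method names are hypothetical.
final class InterestOpsSketch {
    // Returns {keys selected before enabling OP_READ, keys selected after}.
    static int[] readyBeforeAndAfter() {
        try {
            Pipe pipe = Pipe.open();
            try (Selector selector = Selector.open();
                 Pipe.SourceChannel source = pipe.source();
                 Pipe.SinkChannel sink = pipe.sink()) {
                source.configureBlocking(false);
                // Register with an empty interest set: the key exists but selects nothing.
                SelectionKey key = source.register(selector, 0);
                // Make one byte available on the readable end.
                sink.write(ByteBuffer.wrap(new byte[] {1}));
                int before = selector.selectNow();

                // Same pattern as doBeginRead(): add the read bit only if it is missing.
                int interestOps = key.interestOps();
                if ((interestOps & SelectionKey.OP_READ) == 0) {
                    key.interestOps(interestOps | SelectionKey.OP_READ);
                }
                int after = selector.selectNow();
                return new int[] {before, after};
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        int[] result = readyBeforeAndAfter();
        System.out.println("selected before OP_READ: " + result[0]);
        System.out.println("selected after OP_READ:  " + result[1]);
    }
}
```

Separating "register the channel" from "enable read interest" is what lets the pipeline, and settings such as autoRead, decide when reading actually starts.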