概述

新连接接入流程大致分为4步:
- 检测新连接:通过服务端channel绑定的selector轮询出accept事件
- 创建NioSocketChannel:基于jdk的socketChannel创建netty的NioSocketChannel
- 分配线程及注册selector:给新创建的NioSocketChannel分配NioEventLoop,以后该channel的所有事件都通过该loop来管理,并注册到对应的selector上
- 向selector注册读事件:注册过程跟服务端启动时注册accept事件复用同一段逻辑
1. 检测新连接
注意:服务端channel和客户端channel都有NioEventLoop,很多代码是公用的,通过策略模式执行到不通channel的逻辑,看代码时要通过channel的不同进行区分(channel的结构可以参考《netty源码分析(一) - 基本组件》)
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
//unsafe:NioMessageUnsafe
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
try {
int readyOps = k.readyOps();
// read 和 accept事件都通过unsafe.read();进行处理;ServerSocketChannel的read是指读取一条连接;SocketChannel是指读取IO数据
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
- unsafe:当前是服务端channel的NioEventLoop,unsafe是NioMessageUnsafe
- unsafe.read():read 和 accept事件都通过unsafe.read()进行处理;服务端Channel的read是指读取一条连接;客户端是指读取IO数据
NioMessageUnsafe:read
public void read() {
//服务端channel config NioServerSocketChannel启动时创建
final ChannelConfig config = config();
//服务端channel pipeline NioServerSocketChannel启动时创建
final ChannelPipeline pipeline = pipeline();
//allocHandle控制连接读取的速率,默认是16
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
allocHandle.reset(config);
boolean closed = false;
Throwable exception = null;
try {
try {
do {
//循环读取连接并放到readBuf中(List)
int localRead = doReadMessages(readBuf);
if (localRead == 0) {
break;
}
if (localRead < 0) {
closed = true;
break;
}
allocHandle.incMessagesRead(localRead);
} while (allocHandle.continueReading());
} catch (Throwable t) {
exception = t;
}
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
readPending = false;
//遍历readBuf,通过pipeline传播到ServerBootstrapAcceptor中执行
pipeline.fireChannelRead(readBuf.get(i));
}
readBuf.clear();
allocHandle.readComplete();
//服务端channel通过pipeline传播channelReadComplete事件
pipeline.fireChannelReadComplete();
if (exception != null) {
closed = closeOnReadError(exception);
//读取连接报错时传播fireExceptionCaught事件
pipeline.fireExceptionCaught(exception);
}
} finally {
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}
}
- 循环读取连接并放到readBuf中(List)
- 遍历readBuf,pipeline传播fireChannelRead事件到ServerBootstrapAcceptor中执行
- 通过pipeline传播channelReadComplete事件
- 读取连接报错时传播fireExceptionCaught事件
NioServerSocketChannel:doReadMessages
protected int doReadMessages(List<Object> buf) throws Exception {
//1. 调用jdk底层accept方法获取jdk channel即SocketChannel
SocketChannel ch = SocketUtils.accept(javaChannel());
try {
if (ch != null) {
//2. 创建客户端channel(NioSocketChannel),把jdk channel包装在里面
buf.add(new NioSocketChannel(this, ch));
return 1;
}
} catch (Throwable t) {
//忽略非重点代码
}
return 0;
}
- 调用jdk底层accept方法获取jdk channel即SocketChannel
- 创建客户端channel(NioSocketChannel),把jdk channel包装在里面
- 创建的NioSocketChannel放到buf中返回
2. 创建NioSocketChannel
this.parent = parent;
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
this.ch = ch;
this.readInterestOp = readInterestOp;
ch.configureBlocking(false);
config = new NioSocketChannelConfig(this, socket.socket());
- 创建id/unsafe/pipeline,
- ch:jdk SocketChannel
- readInterestOp:感兴趣的事件-read事件
- NioSocketChannelConfig:会默认关闭Nagle算法(setTcpNoDelay(true)),小的数据包尽可能发出去,降低延迟
3. 分配线程及注册selector
前面init流程中分析过服务端channel初始化时会添加一个handle即ServerBootstrapAcceptor,其中重写了channelRead(详细分析参考《netty源码分析(二) - 服务端启动 - 2》,下面只分析下ServerBootstrapAcceptor创建过程)
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
//workGroup工作线程组
private final EventLoopGroup childGroup;
//childHandler:启动类中配置(EchoServer)
private final ChannelHandler childHandler;
//childOptions:启动类中配置
private final Entry<ChannelOption<?>, Object>[] childOptions;
//childAttrs:启动类中配置
private final Entry<AttributeKey<?>, Object>[] childAttrs;
- childGroup:workGroup工作线程组
- childHandler:启动类中配置(EchoServer)
- childOptions:启动类中配置
- childAttrs:启动类中配置
public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel) msg;
//把childHandler加到客户端channel(NioSocketChannel) pipeline上
child.pipeline().addLast(childHandler);
setChannelOptions(child, childOptions, logger);
setAttributes(child, childAttrs);
try {
//NioSocketChannel注册到childGroup(实际就是NioEventLoop数组)的selectors上
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}
- 把childHandler加到客户端channel(NioSocketChannel) pipeline上
- 配置childOptions/childAttrs到NioSocketChannel上
- 从childGroup中选择NioEventLoop并注册selector;register过程跟NioServerSocketChannel启动时注册selector时流程一致
只分析下最终执行的register0方法
private void register0(ChannelPromise promise) {
try {
boolean firstRegistration = neverRegistered;
//调用jdk底层进行注册感兴趣的事件,但是此时注册的实际是0,不是read事件
doRegister();
neverRegistered = false;
//执行pipeline上handlerAdded方法
pipeline.invokeHandlerAddedIfNeeded();
//cas设置注册成功状态
safeSetSuccess(promise);
//执行pipeline上channelRegistered方法
pipeline.fireChannelRegistered();
//判断当前channel是否以及open
if (isActive()) {
//firstRegistration 默认为true,执一次(注册完成)之后变为false
if (firstRegistration) {
//执行pipeline上channelActive方法,在HeadContext中执行到readIfIsAutoRead()真正注册read事件
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
beginRead();
}
}
} catch (Throwable t) {
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
- 调用jdk底层进行注册感兴趣的事件,但是此时注册的实际是0,不是read事件
- 执行pipeline上handlerAdded方法
- 执行pipeline上channelRegistered方法
- 首次执行时执行pipeline上channelActive方法,在HeadContext中执行到readIfIsAutoRead()真正注册read事件
protected void doRegister() throws Exception {
for (;;) {
try {
//注册 0 事件
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
//忽略非重点代码
}
}
}
- register三个参数:分别为当前NioEventLoop中的selector、感兴趣的事件 0、attachment为当前NioSocketChannel
4. 向selector注册读事件
注册读事件流程在HeadContext的readIfIsAutoRead()方法中,执行逻辑跟《netty源码分析(二) - 服务端启动 - 2》服务端绑定端口后的注册逻辑一致;唯一区别是当前是客户端channel,注册的是read事件
至此新连接接入流程分析完毕
网友评论