本文参考自慕课网《Java读源码之netty》
如何检测新连接? boss线程轮循出accept事件,通过底层的accept()接收连接
如何注册到NioEventLoop线程? 通过serverSocketChannel的Adptor中,调用workGroup的chooser.next()来确定EventLoop,从而调用其register()绑定selector,并设置对读事件敏感
四步骤:
- 检测新连接的accept事件
- 创建netty封装的channel
- 分配到eventloop线程并注册到selector
- 向selector注册读事件
1.新连接检测
- step1: 在NioEventLoop中
run()
的时候调用了processSelectedKey()
方法;其中对于SelectionKey
这个参数,如果检测到key的事件类型为read || accept事件的时候调用unsafe.read()
函数;
NioEventLoop.java
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
NioUnsafe unsafe = ch.unsafe();
//...
if ((readyOps & 17) != 0 || readyOps == 0) {
unsafe.read();
//...
}
}
- step2: unsafe.read()是父类AbstractNioMessageChannel中的
read()
函数,其调用NioServerSocketChannel中的doReadMessages()
来接收channel;
AbstractNioMessageChannel.java
public void read() {
//...
try {
int localRead;
try {
do {
localRead = AbstractNioMessageChannel.
this.doReadMessages(this.readBuf);
//...当发现读完了,那么localRead就是一个非正数,就break;跳出
allocHandle.incMessagesRead(localRead);//连接计数
} while(allocHandle.continueReading());//一直读到没有限定条件
//....
}
}
}
//可以看到这里限定的是netty单次处理read()事件允许接受的一些最大门限
public boolean continueReading() {
return this.config.isAutoRead()
&& this.totalMessages < this.maxMessagePerRead //这个值默认16
&& this.totalBytesRead < 2147483647;
}
- step3:
doReadMessages()
的方法内,首先把jdk原生的channel拿到,然后封装为netty自己的channel;
NioServerSocketChannel.java
protected int doReadMessages(List<Object> buf) throws Exception {
SocketChannel ch = this.javaChannel().accept(); //接收到ch
try {
if (ch != null) {
buf.add(new NioSocketChannel(this, ch));//并放入上面MessageChannel中的List中
return 1;
}
//否则没有新连接了,返回非正数
}
//...
}
2.NioChannel的创建
通过上述的new NioSocketChannel(this, ch)
创建一个netty封装的channel(可以看到服务端是通过工厂模式反射,而这用的New instance)
new NioSocketChannel(this, ch)
//this就是当前的正在处理新连接的NioServeSocketChannel
//ch就是new出来的jdk自带channel
可以看到NioSocketChannel 的构造函数里面分为了两个步骤
public NioSocketChannel(Channel parent, java.nio.channels.SocketChannel socket) {
super(parent, socket); //调用父类构造函数
this.config = new NioSocketChannel //新建绑定的config类
.NioSocketChannelConfig(this, socket.socket());
}
-
2.1 调用父类构造函数
AbstractNioByteChannel()
protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) { super(parent, ch, 1); // " 1 " 就是OP_READ }
-
2.1.1
configureBlocking(false)
设置非阻塞,save op将感兴趣的事件(OP_READ)保存到成员变量;继续调用其父类构造函数AbstractNioChannel
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) { super(parent); this.ch = ch; this.readInterestOp = readInterestOp; try { ch.configureBlocking(false); //可以看到这里把原生的channel设置为非阻塞 } //...
-
2.1.2 创建与之关联的id ,unsafe ,pipline
基于AbstractNioChannel,继续调用父类AbstractChannel 的构造函数,配置如下
protected AbstractChannel(Channel parent) { //parent是服务端channel this.parent = parent; this.id = this.newId(); this.unsafe = this.newUnsafe(); this.pipeline = this.newChannelPipeline(); }
-
-
2.2 创建与channel绑定的配置类
NioSocketChannelConfig()
,注意这里是一个NioSocketChannel内部类。private final class NioSocketChannelConfig extends DefaultSocketChannelConfig { private NioSocketChannelConfig(NioSocketChannel channel, Socket javaSocket) { super(channel, javaSocket); } //... }
-
setTcpNoDelay(true)
禁止Nagle算法,将小数据包无延迟发送,保证数据实时性;这里的调用是在父类DefaultSocketChannelConfig 构造函数里面实现public DefaultSocketChannelConfig(SocketChannel channel, Socket javaSocket) { super(channel); if (javaSocket == null) { throw new NullPointerException("javaSocket"); } else { this.javaSocket = javaSocket; //这里的值 = !isAndroid(); 因为不是android所以是 !false=true if (PlatformDependent.canEnableTcpNoDelayByDefault()) { try { this.setTcpNoDelay(true);//设置Nodelay=true,调用到jdk底层 } catch (Exception var4) { } } } }
-
X. 题外话 Netty中的 Channel 分类
我们自己所使用的channel有
- 服务端 NioServeSocketChannel 通过工厂 反射实现
- 客户端 NioSocketChannel 通过连接触发serveSocketChannel的read进行 new产生
- unsafe 客户端channel构造的时候 产生
而所有channel的继承关系大致如下
image.png
-
Channel 是所有类的父类,含有
pipline eventloop parent channelId
等,是一个基本骨架的存在
-
AbstractChannel 是Channel的子类,包含 select的监听 ,含有
SelectionKey
SelectableChannel
(jdk底层的key和channel)以及阻塞与否,感兴趣的事件等
-
AbstractNioByteChannel是客户端Channel的父类,依赖于 NioByteUnsafe(将其作为成员变量)
- NioSocketChannel客户端channel 关心的事件是OP_READ
- NioByteUnsafe (通过 newUnsafe()来实例化)
-
AbstractNioMessageChannel是服务端channel的父类,依赖于 NioMessageUnsafe(将其作为成员变量)
- NioServeSocketChannel服务端channel 关心的事件是OP_ACCEPT
- NioMessageUnsafe(通过 newUnsafe()来实例化)
对于unsafe(),封装了channel的读写事件,对于服务端来说,read()会调用doReadMessage()中的jdk read方法接收请求;对客户端来说read()会调用doReadBytes()读取放入ByteBuffer中
3.新连接NioEventLoop的分配和selector的注册
首先我们回顾服务端channel的创建过程;在服务端channel创建后会通过init()
初始化,其最后又几行逻辑:
ServerBootstrap
void init(Channel channel) throws Exception {
//...
ch.eventLoop().execute(new Runnable() {
public void run() {
pipeline.addLast(
new ChannelHandler[]{new ServerBootstrap
.ServerBootstrapAcceptor(
currentChildGroup,
currentChildHandler,
currentChildOptions,
currentChildAttrs)});
}
});
}
可以看到服务端channel的Pipline中加入了一个ServerBootstrapAcceptor作为adptor,这个adptor会在接收到新连接的时候其作用;同时传入的还有用户自定义的Options和Attributes以及Handler,这些自定义的东西会在创建每个新连接的channel中给他们赋值,设置。
对于新连接,首先执行上述两部分的任务:NioEventLoop中拿到Key后会调用processSelectedKey()
来处理channel,里面会调用channel的unsafe来执行unsafe.read()
,unsafe是NioMessageChannel中的内部类read()
函数中通过doReadMessage()
通过jdk底层accept获取到连接的channel;
然后调用pipeline.fireChannelRead()
交给自动添加的adptor进行处理:
AbstractNioMessageChannel
private final class NioMessageUnsafe extends AbstractNioUnsafe {
public void read() {
//...
AbstractNioMessageChannel.this.doReadMessages(this.readBuf);
//...
for(int i = 0; i < localRead; ++i) {
AbstractNioMessageChannel.this.readPending = false;
pipeline.fireChannelRead(this.readBuf.get(i));//传给上份代码中的adptor
}
}
}
ServerBootstrapAcceptor中,主要有三件事:
-
添加childHandler
经历层层调用,到了ServeBootstrap的channelRead()
public void channelRead(ChannelHandlerContext ctx, Object msg) { final Channel child = (Channel)msg; //这里将 用户在初始化Bootstrap的时候,手动通过Initializer添加的handler //放入这个channel的pipline中 //initializer初始化添加之后将自己remove释放 child.pipeline().addLast(new ChannelHandler[]{this.childHandler}); //... }
-
设置options 和attributes
同样在ServeBootstrap 的channelRead()中 ,通过拿到channel的config类对channel进新option的设置,这些option也来自用户代码自定义,attribute同理;这些option和attibute都是通过在创建之前在pipline中添加的ServerBootstrapAcceptor初始化的时候传入的
try { if (!child.config() .setOption((ChannelOption)e //设置option .getKey(), e.getValue())) { ServerBootstrap.logger.warn("Unknown channel option: " + e); } } //... for(i$ = 0; i$ < len$; ++i$) { e = arr$[i$]; //获取自定义属性 child.attr((AttributeKey)e.getKey()).set(e.getValue());//绑定属性 }
-
为新连接选择NioEventLoop并注册selector
同样通过channelRead(),调用
this.childGroup.register(child)
完成这个逻辑,这里的childGroup就是有很多线程(EventLoop)的负责客户端读写的workGroup;workGroup里面通过chooser的.next()来决定是哪个线程/EventLoop来绑定,对应的NioEventLoop通过register注册channel,MultithreadEventExecutorGroup(父类)
//这里的eventloopGroup是workGroup(也就是很多线程响应客户端读写那个) public EventExecutor next() { return this.chooser.next(); //chooser在上一篇文章有讲过 //通过idx++&(length-1)确定是哪个eventLoop }
然后层层调用,就到了最终对jdk底层的调用注册上去,注册是时候通过注册函数中的attachment参数的方式将封装的nettyChannel放进去,当事件发生的时候通过获取attachment获取到netty封装好的channel
4 客户端NioSocketChannel中读事件的注册
上一步绑定到selector后其实并没有结束,如果不做任何操作是不会产生任何事件的,原因在于绑定的channel并没有设置感兴趣的事件,而selector只有感兴趣的事件发生才会检测到;所以在register至置定的EventLoop之后,会通过设置感兴趣的事件为readInterestOp:
AbstractNioChannel
protected void doBeginRead() throws Exception {
SelectionKey selectionKey = this.selectionKey;
if (selectionKey.isValid()) {
this.readPending = true;
int interestOps = selectionKey.interestOps();
//一般情况下interstOps=0 表示对任何事件不敏感
if ((interestOps & this.readInterestOp) == 0) {
selectionKey.interestOps(interestOps | this.readInterestOp);
//所以这里设置了对读事件敏感
}
}
}
网友评论