概述
Netty的线程模型没什么出彩的地方,旧瓶装新酒,其就是基于Reactor模式
Reactor模式结构
首先用一张许多人都看过的图来开始说明Reactor模式
Netty Reactor.jpeg这张图估计在许多博客和帖子都会看到,但是许多博客却没有详细说明及解释这张图在netty的架构上反应出来
-
Reactor模式角色定义
1:MainReactor: 负责响应client的连接请求,并建立连接,可以有1个或者多个,维护一个独立的NIO Selector
2:SubReactor: 负责对client的读写请求进行处理,可以1个或者多个,并也维护一个独立的NIO Selector
3:Acceptor: 负责MainReactor和SubReactor的桥梁左右,已经准备好的连接转发到SubReactor中进行处理 -
Netty中Reactor模式角色定义
1:MainReactorEventLoopGroup,在Netty4以前叫做BossGroup;
2:SubReactor:EventLoopGroup, 在Netty4以前叫做WorkGroup;
3:Acceptor:ServerBootstrapAcceptor:一个系统自带的ChannelInboundHandler事件拦截器,真正的将已准备好的Channel注册到SubReactor中;
SubReactor:EventLoopGroup
subReactor的任务比较简单,接收Acceptor的Channel,后将Channel重新进行注册,并触发自定义的Handler来处理逻辑
1)接收Acceptor传递过来的Channel通道
2)注册到相应的selector。
private void register0(ChannelPromise promise) {
try {
doRegister();
registered = true;
promise.setSuccess();
pipeline.fireChannelRegistered();
if (isActive()) {
pipeline.fireChannelActive();
}
} catch (Throwable t) {
}
}
3)调用eventLoop.execute用以执行注册任务
4)启动子线程。即启动了subReactor
注意:pipeline.fireChannelRegistered()触发的事件,其实现原理就是初始化不同的ChannelInitializer对象,对不同类型的Channel添加不同的拦截处理
- 启动时,Channel是NioServerSocketChannel,调用的是ServerBootstrapAcceptor
- 连接时,Channel是NioSocketChannel,调用的是用户自定义的InboundHandler
例如:
ServerBootstrap.childHandler(new GearmanServerInitializer())//此用户自定义的的ChannelInitializer
public class GearmanServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
public void initChannel(SocketChannel ch) throws Exception {
pipeline.addLast("decoder", new Decoder());
pipeline.addLast("encoder", new Encoder());
pipeline.addLast("handler", new PacketHandler(networkManager));
}
}
Acceptor:ServerBootstrapAcceptor
public void channelRead(ChannelHandlerContext ctx, Object msg) {
Channel child = (Channel) msg;
child.pipeline().addLast(childHandler);
// ...... 省略无关代码
try {
childGroup.register(child);
} catch (Throwable t) {
// ......省略无关代码
}
}
说明:
当Channel已经Ready后,就会ServerBootstrapAcceptor#channelRead
1:首先把用户自定义的handler注册到pipleline中
2:将已准备好的Channel与childGroup,触发点就是childGroup.register(child);
如果看了上篇文章,会发现两者都是存在注册通道的原理,其实是不同的
- 在server启动时,通过回调bind的监听会把Selector注册事件改为electionKey.OP_ACCEPT
- 而当有连接进来的时候,通过重新注册又把Selector注册事件改为了0
在这一点有点劳民伤财的味道。(其实不是,在下面会专门有注意点提到这点)
MainReactor:EventLoopGroup
如果看了上篇文章,应该知道在server启动时,会启动MainReactor,一直循环执行IO任务和非IO任务
protected void run() {
for (;;) {
oldWakenUp = wakenUp.getAndSet(false);
try {
if (hasTasks()) {
selectNow();
} else {
select();
if (wakenUp.get()) {
selector.wakeup();
}
}
cancelledKeys = 0;
final long ioStartTime = System.nanoTime();
needsToSelectAgain = false;
if (selectedKeys != null) {
processSelectedKeysOptimized(selectedKeys.flip());
} else {
processSelectedKeysPlain(selector.selectedKeys());
}
final long ioTime = System.nanoTime() - ioStartTime;
final int ioRatio = this.ioRatio;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
//..........省略无关代码
} catch (Throwable t) {
//...........省略无关代码
}
}
}
在上篇文章主要提的是runAllTasks这个方法,主要执行非IO任务
这里主要是来说明下IO任务,selectedKeys不为空
if (selectedKeys != null) {
processSelectedKeysOptimized(selectedKeys.flip());
} else {
processSelectedKeysPlain(selector.selectedKeys());
}
执行processSelectedKeysOptimized(selectedKeys.flip());
private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {
for (int i = 0;; i ++) {
final SelectionKey k = selectedKeys[i];
if (k == null) {
break;
}
final Object a = k.attachment();
if (a instanceof AbstractNioChannel) {
processSelectedKey(k, (AbstractNioChannel) a);
} else {
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}
if (needsToSelectAgain) {
selectAgain();
selectedKeys = this.selectedKeys.flip();
i = -1;
}
}
}
注意的地方就是 final Object a = k.attachment();这个attachment是从哪里来的,看如下selectionKey =javaChannel().register(eventLoop().selector, 0, this);
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
selectionKey = javaChannel().register(eventLoop().selector, 0, this);
return;
} catch (CancelledKeyException e) {
}
}
}
- 对于当server启动时,由于该当前对象是NioServerSocketChannel
- 对于当连接进来是,由于当前对象是NioSocketChannel
在注册整个Selector选择器的时候,把当前通道(Channel)也注册进去了,上面那个劳民伤财其实是句玩笑,在这里体现出两次注册的用意来了
继续以上processSelectedKeysOptimized,其中processSelectedKey就是处理网络Io事件,把该事件发给Acceptor的主要触发点,而有点要
代码如下:
private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
int readyOps = -1;
try {
readyOps = k.readyOps();
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
if (!ch.isOpen()) {
return;
}
}
//........省略无关代码
} catch (CancelledKeyException e) {
//.......省略无关代码
}
}
而组装channel,bytebuffer等网络数据是在NioMessageUnsafe#read()中
public void read() {
//... 省略无关代码
try {
for (;;) {
int localRead = doReadMessages(readBuf);
// .......省略无关代码
}
} catch (Throwable t) {
}
for (int i = 0; i < readBuf.size(); i ++) {
pipeline.fireChannelRead(readBuf.get(i));
}
readBuf.clear();
pipeline.fireChannelReadComplete();
// ..... 省略无关代码
}
}
- doReadMessages(readBuf);
把当前请求连接封装成一个Channel(其实就是NioSocketChannel) - pipeline.fireChannelRead(readBuf.get(i));
通知Acceptor来读取,其实就是通知ServerBootstrapAcceptor#channelRead
网友评论