一 整个调用流程图
先放一张调用的流程图,从最下面开始往上调用。
image.png除了最下面的Channel之外,关键的类就是EventLoop,Unsafe,ChannelPipeline,ChannelHandler。下面分别解析一下这四层。
二 EventLoop
从它的名字大概可以看出它是干嘛的:事件循环。实际上,它主要实现了启动新线程不断从Selector中获取可操作的Channel对象的流程。
EventLoop内部有一个Selector,在服务器启动时,会把ServerSocketChannel注册到这个Selector中,之后客户端连接后,会把客户端对应的SocketChannel也注册到Selector中。
EventLoop还会启动一个线程,不断应用Selector.select()方法获取已经准备就绪的Channel,然后根据就绪事件类型交给Unsafe层的不同方法处理
EventLoop是一个接口,具体使用的类是NioEventLoop这个类,它的核心字段如下:
Selector selector;
private final Executor executor;
private final Runnable asRunnable = new Runnable() {
try {
SingleThreadEventExecutor.this.run();
} catch (Throwable t) {
}
}
};
- selector就是实例化的Selector对象,所有的Channel包括ServerSocketChannel都会注册到这个selector上。
- executor是一个线程池,这个线程池会不断的执行asRunnable对象,这个Runable的run方法只是简单的调用了SingleThreadEventExecutor.this.run();
在SingleThreadEventExecutor.this.run();方法中,会不断的从Selector对象中获取可操作的Channel对象,然后交给Unsafe处理(后面会说这个类),代码如下:
protected void run() {
processSelectedKeys();
}
private void processSelectedKeys() {
if (selectedKeys != null) {
processSelectedKeysOptimized(selectedKeys.flip());
} else {
processSelectedKeysPlain(selector.selectedKeys());
}
}
private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {
for (int i = 0;; i ++) {
final SelectionKey k = selectedKeys[i];
if (a instanceof AbstractNioChannel) {
processSelectedKey(k, (AbstractNioChannel) a);
} else {
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}
}
经过一系列简单的调用,最终使用的是processSelectedKey方法来处理可操作的key的,而这个processSelectedKey方法实际上会调用unsafe来处理。
从图中的代码,可以看出具体的逻辑:
- 如果是Channel就绪的是READ或者ACCEPT事件(或者说状态),则调用Unsafe的read()方法
- 如果就绪的WRITE事件,则调用Unsafe的forceFlush方法
- 如果就绪的是CONNECT,则调用Unsafe的finishConnect方法
注:实际上,EventLoop不会单独使用,而是会放在EventLoopGroup中使用,这里先不说,后续再详细解释。
三 Unsafe
这个Unsafe与java api中的那个Unsafe不是同一个东西,这个Unsafe是Netty中用于读取数据的,它与Channel是一一对应的,每个Channel中都会有一个Unsafe类型的字段。
Unsafe是一个接口,具体实现类会定义在Channel的内部,如AbstractNioMessageChannel中有一个Unsafe的实现类NioMessageUnsafe
EventLoop在select出Channel后,会根据不同的事件调用Unsafe不同的方法处理。不同的Channel,使用的Unsafe也不同,但是逻辑基本是一致的:
- 处理好当前的就绪事件,例如如果是ACCEPT事件,会把连接建立好,如果是READ事件,会把数据读到缓冲区里。
- 调用ChannelPipeline,触发各种事件。
以NioMessageUnsafe的read方法为例,我们看看他具体做了什么:
- 首先是第一个红框,调用Channel的doReadMessages方法读取数据(Unsage是Channel的内部类,所以实际上这里调用到的是Channel中的方法),然后把数据保存在readBuf中,readBuf的类型如下:
private final List<Object> readBuf = new ArrayList<Object>();
- 然后是第二个红框,针对读取到的每一条消息,都调用pipeline .fireChannelReadComplete方法处理,这里就是在触发ChannelPipeline的事件了。
- 第三个红框,所有的消息都处理完后调用pipeline.fireChannelReadComplete()方法处理
- 第四个红框,如果出现了异常,则调用pipeline.fireExceptionCaught()方法处理
所以实际上Unsafe就做了两件事:调用Channel中的特定方法处理ACCEPT,READ,WRITE,CONNECT事件,并且在特定时刻然后调用ChannelPipeline的特定事件。
不同的Unsafe有不同的逻辑,这篇文章只是讲一下整体的流程,后续再说每个Unsafe具体的逻辑。
四 ChannelPipeline和ChannelHandler
ChannelPipeline是netty的一层抽象,无论是向Channel中写数据还是从Channel中读数据,都要经过ChannelPipeine的处理。
ChannelPipeline使用了类似于Servlet中Filter的方式,在它内部有一个ChannelHandler类型的双向链表。
ChannelHandler是用于具体处理数据用的,这个就是我们可以自己定义的处理数据的对象,可以说,使用Netty的时候,使用者就是通过ChannelHandler来操作数据的。
当Unsafe从Channel读取到数据后,会交给ChannlPipeline处理,ChannelPipeline会从队头节点开始,今次调用所有ChannelHandler处理数据,任何一个ChannelHandler都可以中断当前的流程。
当我们向ChannelPipeline写数据后,ChannelPipeline会从队尾开始,依次调用所有的ChannelHandler处理数据,同样的,任何ChannelHandler也可以中断流程,如果数据经过所有的ChannelHandler处理完毕,就会Channel.write()写出去。
ChannelPipeline工作流程-图片来源于《Netty权威指南》
ChannelPipeline中的方法与ChannelHandler中的方法是一一对应的,例如Unsafe处理READ事件时会调用ChannelPipeline中的fireChannelRead方法,而在fireChannelRead方法中会依次调用各个ChannelHandler中的channelRead方法。
接下来再看看ChannelHandler中有哪些方法,以及它们具体会在什么时候被调用。
4.1 ChannelHandler中读相关的方法
ACCEPT和READ相关的事件被称为inbound事件
- channelRegistered(): Channel注册事件
...待补充
五 Bootstrap
实际上,有了上面的几个类,我们就已经可以自己写一个服务器了,但是还比较麻烦,Bootstrap这一层帮我们封装了一些比较麻烦的细节,调用上面的四个类来组装一个服务器,可以说是Netty的facade了
网友评论