我们应该已经知道,Netty是一个基于NIO的异步事件驱动的网络应用框架,用于快速开发可维护的高性能协议服务器和客户端。Netty在Java NIO的基础上提供了更高层的抽象和封装,因此要想对Netty有所深入了解,势必要对Java.NIO有所了解,而NIO是对传统IO由阻塞向异步非阻塞IO的巨大跨越,因此了解传统Java.IO对了解Java.NIO也大有裨益。
传统IO弊端
首先我们看下传统IO的网络编程的一个简单例子,由此将进入对传统Java.IO的介绍:
public static void main(String[] agrs) throws Exception{
ServerSocket serverSocket = new ServerSocket(8899);
while (true) {
Socket socket = serverSocket.accept();
new Thread(() -> {
try {
BufferedReader input = new BufferedReader(new InputStreamReader(socket.getInputStream()));
String clientInputStr = input.readLine();
System.out.println("客户端"+socket.getRemoteSocketAddress()+"发过来的内容:" + clientInputStr);
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
}
java.io最核心的一个概念是流(Stream),java.io可以看成是面向流的编程,它将数据的输入输出抽象为流,流是一组有顺序的,单向的,有起点和终点的数据集合,就像水流。在Java中,一个流要么是输入流,要么是输出流。按照流中的最小数据单元又分为字节流和字符流。
字节流:以 8 位(即1byte=8bit)作为一个数据单元,数据流中最小的数据单元是字节。它的顶级父类是InputStream和OutputStream。
字符流:以 16 位(即1char=2byte=16bit)作为一个数据单元,数据流中最小的数据单元是字符, Java 中的字符是 Unicode 编码,一个字符占用两个字节。它的顶级父类是Reader和Writer。
所有的Java IO流都是阻塞的,这意味着,当一条线程执行accept(),read()或者write()方法时,这条线程会一直阻塞直到有连接请求,或读取到了一些数据或者要写出去的数据已经全部写出,在这期间这条线程不能做任何其他的事情,而如果想支持多个连接,那就需为每个连接新开个线程去支持读写操作,如上代码所示。这种模式在用户负载增加时,性能将下降非常的快(大家应该都知道无限开线程的后果)。
非阻塞Reactor模式引入
随着网络应用的发展和网络服务的用户逐渐增多,需要有一种新的方案去解决传统网络的这种问题,在上世纪90年代,便提出了一种Reactor模式,Reactor模式是一种高并发事件驱动的网络服务模式,它的实现可以用Java实现、C++实现或其他语言实现,而java.nio就是依照reactor模式设计的,此外,其他的一些框架也采用(或实现)了Reactor模式,如Redis,Nginx,Netty(Netty是Java NIO更高层的抽象)等。Reactor的框架及流程图如下所示(参照Douglas C. Schmidt 的《Reactor》):
reactor模式及逻辑流程.png
它的结构包括了5个部分,因为这些结构和Java nio的实现有些出入(有些对不上号,有些需用户自己实现),此不介绍了,感兴趣的读者可以参考Douglas C. Schmidt 的《Reactor》一文介绍。而大家对Reactor模式的认识更喜好Doug Lea 《Scalable IO in Java》中的一文介绍。如下图所示:
单线程的Reactor模式
Java NIO网络编程
已知了java.nio就是依照reactor模式设计的,我们再看一个Java NIO网络编程的一个简单例子,由此将进入对传统Java NIO的介绍:
public static void main(String[] agrs) throws Exception{
int portsNum = 5;
int[] ports = new int[portsNum];
for (int i = 0; i < portsNum; i++){
ports[i] = 9000 + i;
}
Selector selector = Selector.open();
for (int i = 0; i < ports.length; i++){
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
serverSocketChannel.bind(new InetSocketAddress(ports[i]));
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("服务端监听端口:" + ports[i]);
}
while(true) {
selector.select();
Set<SelectionKey> selectedKeys = selector.selectedKeys();
for(Iterator<SelectionKey> it = selectedKeys.iterator(); it.hasNext();){
SelectionKey selectionKey = it.next();
if (selectionKey.isAcceptable()){
ServerSocketChannel serverSocketChannel = (ServerSocketChannel)selectionKey.channel();
SocketChannel socketChannel = serverSocketChannel.accept();
socketChannel.configureBlocking(false);
socketChannel.register(selector, SelectionKey.OP_READ);
it.remove();
System.out.println("接受客户端连接:" + socketChannel);
}else if (selectionKey.isReadable()){
SocketChannel socketChannel = (SocketChannel)selectionKey.channel();
int byteRead = 0;
while(true){
ByteBuffer byteBuffer = ByteBuffer.allocate(512);
byteBuffer.clear();
int read = socketChannel.read(byteBuffer);
if (read <= 0){
break;
}
byteBuffer.flip();
socketChannel.write(byteBuffer);
byteRead += read;
}
it.remove();
System.out.println("读取:" + byteRead + ", 来自:" + socketChannel);
}
}
}
}
从上述代码可以看出,一个单线程的java.nio的网络编程流程基本为:
1.创建一个selector;
2.创建一个或多个Channel通道,注册到selector,并注册关心事件;
3.调用select()方法,阻塞等待关心事件发生,关心事件发生后,通过循环SelectionKey集合,再通过SelectionKey获取相关联的通道处理相应事件。
(tips:即使上面的serverSocketChannel.configureBlocking(false);及关心事件用完即删it.remove();在Netty中也会有相应源码)
由此再看[reactor模式及逻辑流程.png]一图,java.nio的流程和该图中的结构及流程图是非常相似的,
Selector:相当于Synchronous Event Demultiplexer。
SelectionKey: 相当于event,和一个SocketChannel关联。
SocketChannel:相当于handle。
java nio中没有提供initial dispatcher的抽象,这部分功能需要用户自行实现。
java nio中没有提供event handler的抽象,这部分功能需要用户自行实现。
Java NIO有三大核心组件:
1.Channel
Java NIO中的所有I/O操作都基于Channel对象,就像流操作都要基于Stream对象一样。一个Channel(通道)代表和某一实体的连接,这个实体可以是文件、网络套接字等。也就是说,通道是Java NIO提供的一座桥梁,用于我们的程序和操作系统底层I/O服务进行交互。但是,一个通道,既可以读又可以写,而一个Stream是单向的。
2.Buffer
NIO中所使用的缓冲区不是一个简单的byte数组,而是封装过的Buffer类,通过它提供的API,我们可以灵活的操纵数据。在Java NIO当中,我们是面向(块)或是缓冲区(buffer)编程的。Buffer本身就是一块内存,底层实现除了数组之外,Buffer还提供了对于数据的结构化访问方式,并且可以追踪到系统的读写过程。
NIO提供了多种 Buffer 类型与Java基本类型相对应(但没有BoolBuffer),如ByteBuffer、CharBuffer、IntBuffer等,区别就是读写缓冲区时的单位长度不一样。Buffer中有3个很重要的变量,它们是理解Buffer工作机制的关键,分别是capacity (总容量)、position (指针当前位置)、limit (读/写边界位置)。
3.Selector
Selector(选择器)是一个特殊的组件,用于采集各个通道的状态(或者说事件)。我们先将通道注册到选择器,并设置好关心的事件,然后就可以通过调用select()方法,静静地等待事件发生。
通道有如下4个事件可供我们监听:
Accept:有可以接受的连接
Connect:连接成功
Read:有数据可读
Write:可以写入数据了
为什么要用Selector?
如果用阻塞I/O,需要多线程(浪费内存),如果用非阻塞I/O,需要不断重试(耗费CPU)。Selector的出现解决了这尴尬的问题,非阻塞模式下,通过Selector,我们的线程只为已就绪的通道工作,不用盲目的重试了。比如,当所有通道都没有数据到达时,也就没有Read事件发生,我们的线程会在select()方法处被挂起,从而让出了CPU资源。
【注:从上述三大组件至此大部分摘自 一文让你彻底理解 Java NIO 核心组件】
Netty对Java NIO的抽象
了解了Java NIO及知道java.nio的网络编程流程后,我们再看Netty源码就显得轻松点了,下面我们将在netty源码中找出java.nio网络编程的影子。
1.创建一个selector;
在创建boss线程组时,我们会调用EventLoopGroup bossGroup = new NioEventLoopGroup();方法,一直点NioEventLoopGroup实现,最后会来到NioEventLoop实现,由selector = openSelector();一句,每个NioEventLoop都会创建一个selector。
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
if (selectorProvider == null) {
throw new NullPointerException("selectorProvider");
}
if (strategy == null) {
throw new NullPointerException("selectStrategy");
}
provider = selectorProvider;
selector = openSelector();
selectStrategy = strategy;
}
2.创建一个或多个Channel通道,注册到selector,并注册关心事件;
Netty中创建NioServerSocketChannel通道,并注册到boss线程的selector中的源码在bootstrap.bind(address)中,即服务端的ip和端口绑定方法中,先在initAndRegister()方法的
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
channel = channelFactory.newChannel();
init(channel);
} catch (Throwable t) {
...
}
ChannelFuture regFuture = config().group().register(channel);
}
ChannelFuture regFuture = config().group().register(channel);一句,由register(channel)方法及类的继承关系,进入SingleThreadEventLoop的register(Channel channel)方法
public ChannelFuture register(Channel channel) {
return register(new DefaultChannelPromise(channel, this));
}
然后在在AbstractChannel的register(EventLoop eventLoop, final ChannelPromise promise)方法中
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
...
AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
...
}
}
}
进入该类下私有的register0(promise);方法,
private void register0(ChannelPromise promise) {
try {
...
boolean firstRegistration = neverRegistered;
doRegister();
neverRegistered = false;
registered = true;
...
} catch (Throwable t) {
}
}
最后进入doRegister();的实现类重写方法,即AbstractNioChannel下的doRegister()方法中,
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
selectionKey = javaChannel().register(eventLoop().selector, 0, this);
return;
} catch (CancelledKeyException e) {
if (!selected) {
eventLoop().selectNow();
selected = true;
} else {
throw e;
}
}
}
}
由javaChannel().register(eventLoop().selector, 0, this);一句,可见将NioServerSocketChannel通道,注册到了boss线程的selector中。
我们再看客户端的channel注册:
boss线程启动后,会监听客户端的连接,主要是在下面代码中监听的(见《Netty的启动过程二》一文介绍):
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
if (!ch.isOpen()) {
// Connection already closed - no need to handle write.
return;
}
}
在《Netty的启动过程二》中知道,当有客户端请求连接时,会进入内部类NioMessageUnsafe的read()方法中,继而在doReadMessages(readBuf);方法,然后在buf.add(new NioSocketChannel(this, ch));一句中,创建用于读取和写入数据的NioSocketChannel,并注册关心事件SelectionKey.OP_READ;此后,仍然在内部类NioMessageUnsafe的read()方法中,在pipeline.fireChannelRead(readBuf.get(i))方法,在经历NioServerSocketChannel的pipeline中首尾handler的read方法,最终来到了ServerBootstrapAcceptor的channelRead(ChannelHandlerContext ctx, Object msg)方法(上述过程详见《Netty的启动过程二):
public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel) msg;
child.pipeline().addLast(childHandler);
for (Entry<ChannelOption<?>, Object> e: childOptions) {
if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
logger.warn("Unknown channel option: " + e);
}
}
for (Entry<AttributeKey<?>, Object> e: childAttrs) {
child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
}
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
}
由childGroup.register(child)一句,便和NioServerSocketChannel通道注册一样,将NioSocketChannel注册到了worker线程的selector中。
3.调用select()方法,阻塞等待关心事件发生,关心事件发生后,通过循环SelectionKey集合,再通过SelectionKey获取相关联的通道处理相应事件。
由《Netty的启动过程二》一文其实已经写明了,它是在这里无限循环读取NioServerSocketChannel(NioSocketChannel)上发生的关心事件然后各自channel的handler处理的。
@Override
protected void run() {
for (;;) {
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.SELECT:
select(wakenUp.getAndSet(false));
...
}
processSelectedKeys();
}
}
从上述分析可知,Netty很巧妙的封装了Java NIO支持,提供了reactor的所有封装,在一定程度上简化了nio网络编程,用户在使用中只需实现网络数据包的event handler即可。当然,还需了解服务端和客户端的启动模式,并知晓如何监听连接的,如何读取数据的,及数据在ChannelPipeline中的流向,再以netty自带的编解码工具,出站入站适配工具,便可对Netty上手了。
网友评论