NIO深入理解
- 零拷贝
-
在理解0拷贝之前 我们应该先需要了解传统IO的一个操作流程
1. 传统的io操作:首先需要进行一个 read操作 这里会发生一次用户空间切换到内核空间 内核会采用DMA(直接内存访问的方式)从磁盘读取数据到内核缓冲区
2. 内核缓冲区将数据拷贝到用户空间 同时再次上下文切换到 用户空间
3. wirte 操作 也会发生一次上下文切换到内核空间 同时将数据拷贝到内核缓冲区
4. 内核空间会将数据拷贝到 socket buffer 在由 socket buffer 写入到协议引擎进行数据的发送(这里有一个异步的过程 写操作后 会切换到用户空间)- 用户发送 sendfile 命令 进行一次上下文切换到内核空间 并采用DMA方式将数据拷贝到内核缓冲区
- 这里内核缓冲区的数据不会拷贝到 socket buffer而是将文件的描述信息放入如 内存地址 以及文件长度信息(异步 进行以此上下文切换到用户空间)
- 协议引擎会根据 socketbuffer 中的文件描述信息 直接从内核缓冲区将数据写入 协议引擎发送出去
-
什么是Reactor模式
-
深入理解 Reactor模式对理解netty的设计模式有莫大的帮助,所以在理解netty的设计模式时,我们需要理解Reactor他的一个设计理念:Reactor的五大角色构成
- Handle (句柄或是描述符)
- 即操作系统中的句柄,是操作系统对资源的一种抽象,可以是打开的文件、一个连接(Socket)、Timer等。在网络编程中,一般指Socket Handle,文件描述符(fd)。将这个Handle注册到Synchronous Event Demultiplexer中,就可以它发生的事件,如READ、WRITE、CLOSE等事件;handle本身是事件产生的发源地
- Synchronous Event Demultiplexer
- 同步事件多路分用器,本质上是系统调用。比如linux中的select、poll、epoll等。它会一直阻塞直在handle上,直到有事件发生时才会返回 (在java nio领域中对应的组件是selector,对应的阻塞方法就是 select方法)
- Initiation Dispatcher
- 初始分发器,它提供了注册、删除与转发event handler的方法。当Synchronous Event Demultiplexer检测到handle上有事件发生时,便会通知initiation dispatcher调用特定的event handler的回调(handle_event())方法。
- Event Handler
- 事件处理器,定义事件处理的回调方法:handle_event(),以供InitiationDispatcher回调使用。(Netty相对于java nio在事件处理器上进行了升级,它为我们开发者提供了大量的回调方法,供我们在特定的事件产生时实现相应的回调方法进行业务逻辑的处理)
- Concrete Event Handler
- 具体的事件处理器,继承自Event Handler,在回调方法中会实现具体的业务逻辑
-
Reactor模式的流程
- 当应用向Initiation Dispatcher 注册具体的事件处理器时,应用会标识出该事件处理器希望Initiation Dispatcher在某个事件发生时向其通知的该事件,该事件与handle关联
- Initiation Dispatcher会要求每个事件处理器向其传递内部handle。该handle向操作系统标识了事件处理器
- 当所有的事件处理器注册完毕后,应用会调用handle_events方法来启动Initiation Dispatcher的事件循环。这时,Initiation Dispatcher会将每个注册的事件管理器的handle合并起来,并使用同步事件分离器等待这些事件的发生。比如说:TCP协议层会使用select同步事件分离器操作来等待客户端发送数据到达连接的socket handle上
- 当与某个事件源对应的Handle变为ready状态时(比如:TCP socket变为等待读状态时)。同步事件分离器就会通知Initiation Dispatcher
- Initiation Dispatcher会触发事件处理器的回调方法,从而响应这个处于ready状态的Handle。当事件发生时,Initiation Dispatcher会将被事件源激活的Handle作为【key】来寻找并分发恰当的事件处理器的回调方法。
- Initiation Dispatcher会回调事件处理器的handle_events回调方法来执行特定与应用的功能(开发者自己编写功能)从而响应这个事件。所发生的事件类型可以作为该方法的参数并被该方法内部使用来执行额外的特定于服务的分离与分发。
-
Netty 核心的几个概念
- 一个EventLoopGroup当中包含一个或多个EventLoop
- 一个EventLoop在它的整个生命周期当中都只会与唯一一个Thread进行绑定
- 所有由EventLoop所处理的各种I/O事件都将在它所关联的那个Thread上进行处理
- 一个Channel在它的整个生命周期中只会注册在一个EventLoop上
- 一个EventLoop在运行过程当中,会被分配到多个Channel
-
结论:
- 在Netty中,Channel的实现一定是线程安全的,基于此,我们可以存储一个Channel的引用,并且在需要向远端发送数据时,通过整个引用来调用Channel的相应方法;即便当时有很多线程都在使用它也不会出现线程问题;而且消息一定会按照顺序发送出去。
- 我们在业务开发中,不要将长时间执行的耗时任务放入到EventLoop的执行队列中,因为它将会一直阻塞该线程所对应的所有Channel上的其他执行任务,如果我们需要进行阻塞调用或是耗时的操作,那么我们就需要使用一个专门的EventExecutor(业务线程池)
- JDK所提供的Future只能通过手工的方式检查执行结果,而这个操作是会阻塞的;Netty则对ChannelFuture进行了增强,通过ChannelFutureListener以回调的方式获取结果,去除了手工检查的操作(观察者模式);值得注意的是:ChannelFutureListener的operationComplete方法是由I/O线程执行的,因此要注意的是不要在这里执行耗时的操作,否则需要通过另外的线程或线程池来执行。
- 在Netty中有两种发送消息的方式,可以直接写到Channel中,也可以写到ChannelHandler所关联的那个ChannelHandlerContext中。对于前一种方式来说,消息会从ChannelPipline的末尾开始流动;对于后一种方式来说,消息将从ChannelPipline中的下一个ChannelHandler开始流动。
- ChannelHandlerContext与ChannelHandler之间的关联绑定关系是永远都不会发生改变的,因此对其进行缓存是没有任何问题的。
- 对于Channel的同名方法来说,ChannelHandlerContext的方法将会产生更短的事件流,所以我们应该在可能的情况下利用这个特性来提升应用的性能。
- 在实际的开发中我们经常会遇到一个服务端可能会去要调用另外一个客户端,这时这个服务端的角色就相当于即作为服务端也作为客户端。这时我们需要注意在我们作为客户端时我们应该将对服务端和客户端的channel绑定在同一个eventLoop上;
-
Netty 提供的三种缓冲区类型
- heap buffer 堆缓冲区
- 优点:由于数据是存储在JVM的堆中,因此可以快速的创建于快速的释放,并且他提供了直接访问内部字节数组的方法
- 缺点:每次的读写操作,都需要先将数据复制到直接缓冲区中在进行网络传输
- direct buffer 直接缓冲区(在堆之外直接分配内存空间,直接缓冲区不会占用堆的容量空间,因为他是由操作系统在本地内存进行的数据分配)
- 优点:在使用Sockte进行数据传递时,性能非常好,因为数据直接位于操作系统的本地内存中,所以不需要从JVM将数据复制到直接缓冲区,性能很好。
- 缺点:因为Direct Buffer是直接在操作系统内存中,所以内存空间的分配与释放要比堆空间更加复杂,而且速度慢一些。(Netty通过提供内存池来解决这个问题)
- 注意:直接缓冲区不支持通过字节数组的方式来直接访问数据(对于后端的业务消息的编解码来说,推荐使用HeapByteBuf;对于I/O通信线程在读写缓冲区时,推荐使用DirectByteBuf)
- composite buffer 复合缓冲区
- heap buffer 堆缓冲区
-
JDK的ByteBuffer和Netty的ByteBuf的差异比对
- Netty的ByteBuf采用读写索引分离的策略(readerIndex与writerIndex),一个初始化(里面尚未有任何数据)的ByteBuf的readerIndex与witerIndex值都为0
- 当读索引和写索引处于同一个位置时,如果我们继续读取,那么就会抛出IndexOutofBoundsException
- 对于ByteBuf的任何读写操作都会分别单独维护读索引与写索引。maxCapacity最大的容量默认是 Integer.MAX_VALUE。
-
-
代码实例:
public static void main(String[] args) {
// 创建一个长度为10 的Bytebuf
ByteBuf byteBuf = Unpooled.buffer(10);
for (int i=0;i<10;i++) {
byteBuf.writeByte(i);
}
for ( int i=0; i<byteBuf.capacity();i++){
System.out.println(byteBuf.getByte(i));
}
}
public static void main(String[] args) {
ByteBuf byteBuf = Unpooled.copiedBuffer("hello你 world", Charset.forName("utf-8"));
if( byteBuf.hasArray()){
// hasArray 判断这个byteBuf背后真正的支持是不是一个字节数组 如果是 表示这是一个堆上的缓冲
// 获取byteBuf背后的真正数据载体
byte[] array = byteBuf.array();
System.out.println(new String(array,Charset.forName("utf-8")));
System.out.println(byteBuf);
System.out.println(byteBuf.arrayOffset());
System.out.println(byteBuf.readerIndex());
System.out.println(byteBuf.writerIndex());
System.out.println(byteBuf.capacity());
System.out.println(byteBuf.readableBytes());
while (byteBuf.isReadable()){
System.out.println((char) byteBuf.readByte());
}
}
}
public static void main(String[] args) {
// 创建一个 CompositeBuffer 复合缓冲区
CompositeByteBuf compositeByteBuf = Unpooled.compositeBuffer();
ByteBuf heapBuf = Unpooled.buffer(10);
ByteBuf directBuf = Unpooled.directBuffer(8);
// 将堆缓冲 和直接缓冲添加到复合缓冲区中
compositeByteBuf.addComponents(heapBuf,directBuf);
//compositeByteBuf.removeComponent(0);
Iterator<ByteBuf> iterator = compositeByteBuf.iterator();
while (iterator.hasNext()){
System.out.println(iterator.next());
}
compositeByteBuf.forEach(System.out::println);
}
网友评论