什么是Netty
Netty可以帮助我们快速搭建服务端与客户端的Socket网络通信库,他是对JDK的NIO的封装
问?什么是IO,什么是NIO
NIO:非阻塞IO编程
IO:阻塞IO编程
什么是IO编程
- IO编程中服务端实现
public class IOServer {
public static void main(String[] args) throws Exception {
ServerSocket serverSocket = new ServerSocket(8000);
// (1) 接收新连接线程
new Thread(() -> {
while (true) {
try {
// (1) 阻塞方法获取新的连接
Socket socket = serverSocket.accept();
// (2) 每一个新的连接都创建一个线程,负责读取数据
new Thread(() -> {
try {
byte[] data = new byte[1024];
InputStream inputStream = socket.getInputStream();
while (true) {
int len;
// (3) 按字节流方式读取数据
while ((len = inputStream.read(data)) != -1) {
System.out.println(new String(data, 0, len));
}
}
} catch (IOException e) {
}
}).start();
} catch (IOException e) {
}
}
}).start();
}
}
- IO编程中客户端实现
public class IOClient {
public static void main(String[] args) {
new Thread(() -> {
try {
Socket socket = new Socket("127.0.0.1", 8000);
while (true) {
try {
socket.getOutputStream().write((new Date() + ": hello world").getBytes());
socket.getOutputStream().flush();
Thread.sleep(2000);
} catch (Exception e) {
}
}
} catch (IOException e) {
}
}).start();
}
}
缺点:IO编程模型在客户端较少的情况下运行良好,对于客户端比较多的业务来说,单机服务端可能需要支撑成千上万的连接,IO模型可能就不太合适了
原因:每个连接创建成功之后都需要一个线程来维护,每个线程包含一个while死循环,那么1w个连接对应1w个线程,继而1w个while死循环,这就带来如下几个问题:
1.线程资源受限:线程是操作系统中非常宝贵的资源,同一时刻有大量的线程处于阻塞状态是非常严重的资源浪费,操作系统耗不起
2.线程切换效率低下:单机cpu核数固定,线程爆炸之后操作系统频繁进行线程切换,应用性能急剧下降。
3.IO编程中,我们看到数据读写是以字节流为单位,效率不高。
NIO编程
为了解决上面三个问题,NIO横空出世
线程资源受限
NIO编程模型中,新来一个连接不再创建一个新的线程,而是可以把这条连接直接绑定到某个固定的线程,然后这条连接所有的读写都由这个线程来负责。
imageNIO模型中selector:把连接注册到selector上,然后,通过检查这个selector来批量监测是否有数据可读的连接,进而读取数据。
举例。
实际开发过程中,我们会开多个线程,每个线程都管理着一批连接,相对于IO模型中一个线程管理一条连接,消耗的线程资源大幅减少
线程切换效率低下
由于NIO模型中线程数量大大降低,线程切换效率因此也大幅度提高
IO读写以字节为单位
NIO解决这个问题的方式是数据读写不再以字节为单位,而是以字节块为单位。
IO:每次都是从操作系统底层一个字节一个字节地读取数据。
NIO:维护一个缓冲区,每次可以从这个缓冲区里面读取一块的数据。
NIO服务端实现
public class NIOServer {
public static void main(String[] args) throws IOException {
Selector serverSelector = Selector.open();
Selector clientSelector = Selector.open();
new Thread(() -> {
try {
// 对应IO编程中服务端启动
ServerSocketChannel listenerChannel = ServerSocketChannel.open();
listenerChannel.socket().bind(new InetSocketAddress(8000));
listenerChannel.configureBlocking(false);
listenerChannel.register(serverSelector, SelectionKey.OP_ACCEPT);
while (true) {
// 监测是否有新的连接,这里的1指的是阻塞的时间为1ms
if (serverSelector.select(1) > 0) {
Set<SelectionKey> set = serverSelector.selectedKeys();
Iterator<SelectionKey> keyIterator = set.iterator();
while (keyIterator.hasNext()) {
SelectionKey key = keyIterator.next();
if (key.isAcceptable()) {
try {
// (1) 每来一个新连接,不需要创建一个线程,而是直接注册到clientSelector
SocketChannel clientChannel = ((ServerSocketChannel) key.channel()).accept();
clientChannel.configureBlocking(false);
clientChannel.register(clientSelector, SelectionKey.OP_READ);
} finally {
keyIterator.remove();
}
}
}
}
}
} catch (IOException ignored) {
}
}).start();
new Thread(() -> {
try {
while (true) {
// (2) 批量轮询是否有哪些连接有数据可读,这里的1指的是阻塞的时间为1ms
if (clientSelector.select(1) > 0) {
Set<SelectionKey> set = clientSelector.selectedKeys();
Iterator<SelectionKey> keyIterator = set.iterator();
while (keyIterator.hasNext()) {
SelectionKey key = keyIterator.next();
if (key.isReadable()) {
try {
SocketChannel clientChannel = (SocketChannel) key.channel();
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
// (3) 读取数据以块为单位批量读取
clientChannel.read(byteBuffer);
byteBuffer.flip();
System.out.println(Charset.defaultCharset().newDecoder().decode(byteBuffer)
.toString());
} finally {
keyIterator.remove();
key.interestOps(SelectionKey.OP_READ);
}
}
}
}
}
} catch (IOException ignored) {
}
}).start();
}
}
- NIO模型中通常会有两个线程,每个线程绑定一个轮询器selector。
例子中的:
serverSelector
负责轮询是否有新的连接
clientSelector
负责轮询连接是否有数据可读 - 服务端监测到新的连接之后,不再创建一个新的线程,而是直接将新连接绑定到
clientSelector
上。 -
clientSelector
被一个while死循环包裹着,如果在某一时刻有多条连接有数据可读,那么通过clientSelector.select(1)
方法可以轮询出来,进而批量处理。 - 数据的读写以内存块为单位。
原生NIO缺点:
1、JDK的NIO编程需要了解很多的概念,编程复杂,对NIO入门非常不友好,编程模型不友好,ByteBuffer的api简直反人类
2、对NIO编程来说,一个比较合适的线程模型能充分发挥它的优势,而JDK没有给你实现,你需要自己实现,就连简单的自定义协议拆包都要你自己实现
3、JDK的NIO的selector该实现饱受诟病的空轮训bug会导致cpu飙升100%
Netty编程
简单来说:Netty封装了JDK的NIO
官方说明:Netty是一个异步事件驱动的网络应用框架,用于快速开发可维护的高性能服务器和客户端。
使用Netty不使用JDK原生NIO的原因:
- 编程简单:使用JDK自带的NIO需要了解太多的概念,编程复杂,一不小心bug横飞
- io切换:Netty底层IO模型随意切换,而这一切只需要做微小的改动,改改参数,Netty可以直接从NIO模型变身为IO模型
- Netty自带的拆包解包:异常检测等机制让你从NIO的繁重细节中脱离出来,让你只需要关心业务逻辑
- niobug:Netty解决了JDK的很多包括空轮询在内的bug
- nio优化:Netty底层对线程,selector做了很多细小的优化,精心设计的reactor线程模型做到非常高效的并发处理
- 协议栈:自带各种协议栈让你处理任何一种通用协议都几乎不用亲自动手
Netty服务端
public class NettyServer {
public static void main(String[] args) {
ServerBootstrap serverBootstrap = new ServerBootstrap();
NioEventLoopGroup boos = new NioEventLoopGroup();
NioEventLoopGroup worker = new NioEventLoopGroup();
serverBootstrap
.group(boos, worker)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
protected void initChannel(NioSocketChannel ch) {
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new SimpleChannelInboundHandler<String>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) {
System.out.println(msg);
}
});
}
})
.bind(8000);
}
}
1.boos对应,IOServer.java中的接受新连接线程,主要负责创建新连接
2.worker对应 IOClient.java中的负责读取数据的线程,主要用于读取数据以及业务逻辑处理
Netty客户端
public class NettyClient {
public static void main(String[] args) throws InterruptedException {
Bootstrap bootstrap = new Bootstrap();
NioEventLoopGroup group = new NioEventLoopGroup();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) {
ch.pipeline().addLast(new StringEncoder());
}
});
Channel channel = bootstrap.connect("127.0.0.1", 8000).channel();
while (true) {
channel.writeAndFlush(new Date() + ": hello world!");
Thread.sleep(2000);
}
}
}
Netty重要类介绍
EventLoopGroup
事件循环组(NioEventLoopGroup是异步事件循环组)。EventLoopGroup中包含了多个EventLoop。主要提供了两类方法:
① next()方法用于返回下一个EventLoop来使用。
② register方法,来将一个Channel注册到EventLoop当中,同时返回一个ChannelFuture,当注册完成的时候这个ChannelFuture将得到一个通知。可以见该方法是一个异步的方法,在调用完register后会立即返回,然后我们根据ChannelFuture中的相关方法来判断注册操作是否完成。
NioEventLoopGroup ——— 异步事件循环组
它是MultithreadEventLoopGroup的一个实现,它是用于基于Channel的NIO Selector。
EventLoop
事件循环类。将处理一个已经注册到该EventLoop的Channel的所有I/O操作。
服务I/O和Channels事件的EventLoops包含在一个EventLoopGroup里。EventLoops被创建和分配的方式是根据传输实现而有所不同。
NioEventLoop
- NioEventLoop是一个基于JDK NIO的异步事件循环类,它负责处理注册在它其中的Channel的所有事件。
- NioEventLoop的整个生命周期只会依赖于一个单一的线程来完成。
- 一个NioEventLoop可以分配给多个Channel,NioEventLoop通过JDK Selector来实现I/O多路复用,以对多个Channel进行管理。
- 如果调用Channel操作的线程是EventLoop所关联的线程(注册到Selector的Channel的监控如连接、读、写操作)那么该操作会被立即执行。否则会将该操作封装成任务放入EventLoop的任务队列中。
SingleThreadEventExecutor
- 它会执行所有提交的任务在一个单一的线程中。而OrderedEventExecutor作为一个标记接口,它会执行所有提交的任务以有序/连续的方式
- 持有一个MpscQueue taskQueue成员变量,来维护提交上来的任务。
- SingleThreadEventExecutor中还维护有该线程的五个状态:a)ST_NOT_STARTED;b)ST_STARTED;c)ST_SHUTTING_DOWN;d)ST_SHUTDOWN;e)ST_TERMINATED。
SingleThreadEventExecutor的execute(Runnable task)方法:
- execute方法会接收一个个任务,将任务依次放入taskQueue中
- ThreadPerTaskExecutor.execute(Runnable)来创建并启动执行任务的唯一线程
任务线程会在满足如下条件时被创建并执行: - a) 提交任务的线程不为EventLoop所关联的线程
b) EventLoop所关联的线程还不存在,即EventLoop所关联的线程的状态为ST_NOT_STARTED.
如我们的启动程序“serverBootstrap.bind(8080)”就会触发EventLoop所关联的线程创建并执行。 - SingleThreadEventExecutor的doStartThread()方法:会调用SingleThreadEventExecutor.this.run();而这是SingleThreadEventExecutor的一个抽象方法,实际上会调用NioEventLoop类的run()方法,是的我们又回到了NioEventLoop类中,这是一个很重要的方法。
Run事件循环
run为NioEventLoop内的事件循环方法
image imageNioEventLoop的事件循环主要完成下面几件事:
- 根据当前NioEventLoop中是否有待完成的任务得出select策略,进行相应的select操作
- 如果‘selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())’操作返回的是一个>0的值,则说明有就绪的的I/O事件待处理,则直接进入流程②处理。
- 否则,如果返回的是’SelectStrategy.SELECT’则进行select(wakenUp.getAndSet(false))操作:
首先先通过自旋锁(自旋 + CAS)方式获得wakenUp当前的标识,并再将wakenUp标识设置为false。将wakenUp作为参数传入select(boolean oldWakenUp)方法中。 - 注意这个select方法不是JDK NIO的Selector.select方法,是NioEventLoop类自己实现的一个方法,只是方法名一样而已。NioEventLoop的这个select方法还做了一件很重要的时,就是解决“JDK NIO类库的selector bug”问题。
- 处理select操作得到的已经准备好处理的I/O事件,以及处理提交到当前EventLoop的任务(包括定时和周期任务)。
- 如果NioEventLoop所在线程执行了关闭操作,则执行相关的关闭操作处理
ChannelHandler 和 ChannelPipeline
ChannelHandler
Channel生命周期
Channel接口定义了简单但强大的状态模式来紧密的联系ChannelInboundHandler API。包括了4种状态
image
Channel正常生命周期状态改变如下图:
imageChannelPipeline
- ChannelPipeline是一个ChannelHandler的集合,这些ChannelHandler会处理或拦截一个Channel的入站事件或出站操作。
- 每一个新的Channel被创建时都会分配一个新的ChannelPipeline。他们的关系是永久不变的,Channel既不能依附于其他ChannelPipeline也不能和当前ChannelPipeline分离。
- ChannelPipeline根本上是一系列的ChannelHandlers。ChannelPipeline还提供了方法用于传播事件通过ChannelPipeline本身。
- ChannelPipeline是一个实现了拦截过滤器模式的高级形式,它使得用户能够完全控制事件的处理方式以及ChannelPipeline中的ChannelHandler间的交互。
- 随后该事件通过一个ChannelHandlerContext来实现传递给下一个具有一样父类的处理器,即同一个方向的处理器。
- 一个事件要么被一个ChannelInboundHandler处理要么被一个ChannelOutboundHandler处理。随后该事件通过一个ChannelHandlerContext来实现传递给ChannelPipeline中的下一个具有一样父类的处理器,即同一个方向的处理器。
- ChannelHandler在ChannelPipeline中的顺序是由我们通过ChannelPipeline.add*()方法来确定的。
下图描述了ChannelPipeline中的ChannelHandlers是如何处理I/O事件的
下表列出了ChannelInboundHandler接口生命周期的所有方法。这些方法将被调用在接收数据或者Channel相关状态改变时。正如我们早前提及的,这些方法与Channel的生命周期紧密映射。
image相同的方法通过ChannelHandlerContext被调用时,它将从当前关联的ChannelHandler开始并传播给管道中下一个能够处理该事件的ChannelHandler
HeadContext 与 TailContext
image- 赋值NioServerSocketChannel给成员变量channel
- 根据channel构建SucceededChannelFuture、VoidChannelPromise成员变量
- 构建ChannelHandler链表头HeadContext和链表尾TailContext,并赋值给成员变量head、tail。将head.next指向tail,将tail.prev指向head。
HeadConext作为链表头,它同时实现了ChannelOutboundHandler和ChannelInboundHandler,也就是说它即会处理入站数据也会处理出站数据、它持有NioMessageUnsafe对象,该类用于完成Channel真实的I/O操作和传输。
TailContext作为ChannelHandler链中的最后一个ChannelHandler,它仅实现了ChannelInboundHandler,因此TailContext是入站事件的最后一个ChannelHandler,它主要完成了一些资源的释放工作。
入站事件会依次被从head ——> ... ——> tail中的所有ChannelInboundHandler处理。
出站事件会依次被从tail ——> ... ——> head中的所有ChannelOutboundHandler处理。
注意,我们程序中通过add*(...)加进来的ChannelHandler都会处于head和tail之间,也就是说链表头是HeadConext,链表尾是TailContext,这是固定不会改变的。
Bootstrapping
-
引导客户端和服务器端
image.png
1.ServerBootstrap绑定本地port, Bootstrap绑定远程host和port
2.服务端(ServerBootstrap)需要两个EventLoopGroup( 这两个可以是同一个实例 )。客户端需要一个EventLoopGroup。
一个服务端需要两个不同的Channel集合。第一个集合包含了ServerChannel,该ServerChannel代表服务自己所监听的绑定本地端口的socket。第二个集合将包含所有已经创建了的Channel,这些Channel ( 该Channel由ServerChannel创建 )用于处理客户端连接,服务端收到的每一个客户端的连接都将创建一个Channel。
引用参考:
网友评论