不多BB,我们开门见山
Netty是一个提供了易于使用的API的NIO框架,具有高并发、高性能的特点
Netty的Reactor线程模型
https://blog.csdn.net/bingxuesiyang/article/details/89888664
单线程模型
所有的IO操作都是由一个NIO线程处理。
Reactor内部通过selector 监控连接事件,收到事件后通过dispatch进行分发,如果是连接建立的事件,则由Acceptor处理,Acceptor通过accept接受连接,并创建一个Handler来处理连接后续的各种事件,如果是读写事件,直接调用连接对应的Handler来处理。
Handler完成read->业务处理(decode->compute->encode)->send的全部流程。
这种模型好处是简单,坏处却很明显,当某个Handler阻塞时,会导致其他客户端的handler和accpetor都得不到执行,无法做到高性能,只适用于业务处理非常快速的场景。
缺点:单线程NIO负载过重,并发高时产生任务堆积,延迟过高,不适合并发高的场景
多线程模型
由一个NIO线程处理客户端连接,由一组NIO线程池处理IO操作
主线程中,Reactor对象通过selector监控连接事件,收到事件后通过dispatch进行分发,如果是连接建立事件,则由Acceptor处理,Acceptor通过accept接收连接,并创建一个Handler来处理后续事件,而Handler只负责响应事件,不进行业务操作,也就是只进行read读取数据和write写出数据,业务处理交给一个线程池进行处理。
线程池分配一个线程完成真正的业务处理,然后将响应结果交给主进程的Handler处理,Handler将结果send给client。
单Reactor承当所有事件的监听和响应,而当我们的服务端遇到大量的客户端同时进行连接,或者在请求连接时执行一些耗时操作,比如身份认证,权限检查等,这种瞬时的高并发就容易成为性能瓶颈。
缺点:连接处理能力有限,客户连接并发高时,会产生连接延迟过高
主从线程模型
一组NIO线程池处理连接,一组NIO线程池处理IO操作
存在多个Reactor,每个Reactor都有自己的selector选择器,线程和dispatch。
主线程中的mainReactor通过自己的selector监控连接建立事件,收到事件后通过Accpetor接收,将新的连接分配给某个子线程。
子线程中的subReactor将mainReactor分配的连接加入连接队列中通过自己的selector进行监听,并创建一个Handler用于处理后续事件
Handler完成read->业务处理->send的完整业务流程。
缺点:无
Netty的核心概念
EventLoop和EventLoopGroup
EventLoopGroup是一个线程组,包含了一组NIO线程,主要用于处理网络事件,例如客户端连接,Channel的事件。每个EventLoop负责处理多个 Channel 上的事件,而一个 Channel 只对应于一个EventLoop。EventLoopGroup实际上就是Reactor线程组,Netty 的 IO 线程 NioEventLoop 由于聚合了多路复用器 Selector,一组EventLoopGroup可以同时并发处理成百上千个客户端连接,另一组EventLoopGroup可以处理客户端的通道事件ChannelHandler和ChannelPipeline
ChannelHandler是一个接口,专门处理 I/O 或拦截 I/O 操作,并将其转发到其 ChannelPipeline(业务处理链)中的下一个处理程序。ChannelPipeline是ChannelHandler的容器,而每个Channel都会有一个ChannelPipeline,负责ChannelHandler的管理和事件拦截与调度。Channel会将事件扔到ChannelPipeline中,然后事件会被ChannelPipeline安排一系列的ChannelHandler拦截处理,例如编解码事件、TCP的粘包拆包事件、用户自定义Handler等,经过一系列加工后,事件的消息会被添加缓冲区中等待Channel的刷新和发送。
事件出站(Outbound)和入站(Inbound)
- Inbound事件:通常由I/O线程触发,例如TCP链路连接、链路关闭、读事件、异常通知等,它们对应下图的左半部分,这些方法被封装在ChannelInboundHandler里面
- Outbound事件:通常是用户主动发起的网络IO操作,例如发起连接操作、绑定操作、消息发送等,它们对应下图的右半部分,这些方法被封装在ChannelOutboundHandler里面
ChannelPipeline为ChannelHandler链提供了一个容器并定义了用于沿着链传播入站和出站事件流的API,当一个数据流进入 ChannlePipeline 时,它会从 ChannelPipeline 头部开始传给第一个 ChannelInboundHandler ,当第一个处理完后再传给下一个,一直传递到管道的尾部。
与之相对应的是,当数据被写出时,它会从管道的尾部开始,先经过管道尾部的 “最后” 一个ChannelOutboundHandler,当它处理完成后会传递给前一个 ChannelOutboundHandler 。
ChannelHandlerContext
保存 Channel 相关的所有上下文信息,同时关联一个 ChannelHandler 对象。I/O 事件由 ChannelInboundHandler 或 ChannelOutboundHandler 处理,并通过调用 ChannelHandlerContext 中定义的事件传播方法。
ServerBootstrap和Bootstrap
ServerBootstrap是服务端的启动助手,Bootstrap是客户端的启动助手,它们的目的主要是降低开发的复杂度
TCP的粘包和拆包
在TCP协议中一个完整的数据包可能会被TCP拆分为多个包发送,或者将多个小的数据包封装成大的数据包发送,这就会发生TCP的粘包和拆包的问题。
产生原因:
- 要发送的数据大于TCP发送缓冲区剩余空间大小,将会发生拆包。
- 待发送数据大于MSS(最大报文长度),TCP在传输前将进行拆包。
- 要发送的数据小于TCP发送缓冲区的大小,TCP将多次写入缓冲区的数据一次发送出去,将会发生粘包。
- 接收数据端的应用层没有及时读取接收缓冲区中的数据,将发生粘包。
TCP的粘包和拆包的解决方法
粘包问题解决
- FixedLengthFrameDecoder:定长协议解码器,我们可以指定固定的字节数算一个完整的报文
- LineBasedFrameDecoder:行分隔符解码器,遇到\n或者\r\n,则认为是一个完整的报文
- DelimiterBasedFrameDecoder:分隔符解码器,与LineBasedFrameDecoder类似,只不过分隔符可以自己指定
- LengthFieldBasedFrameDecoder:长度编码解码器,将报文划分为报文头/报文体,根据报文头中的Length字段确定报文体的长度,因此报文提的长度是可变的
- JsonObjectDecoder:json格式解码器,当检测到匹配数量的"{" 、”}”或”[””]”时,则认为是一个完整的json对象或者json数组。
Netty的默认线程数
Netty 默认是 CPU 处理器数的两倍,bind 完之后启动。
Netty高性能的原因
- IO 线程模型:同步非阻塞,用最少的资源做更多的事。
- 内存零拷贝:尽量减少不必要的内存拷贝,实现了更高效率的传输。
- 内存池设计:申请的内存可以重用,主要指直接内存。内部实现是用一颗二叉查找树管理内存分配情况。
- 串形化处理读写:避免使用锁带来的性能开销。
- 高性能序列化协议:支持 protobuf 等高性能序列化协议。
Netty的序列化协议:
-
XML,优点:人机可读性好,可指定元素或特性的名称。缺点:序列化数据只包含数据本身以及类的结构,不包括类型标识和程序集信息;只能序列化公共属性和字段;不能序列化方法;文件庞大,文件格式复杂,传输占带宽。适用场景:当做配置文件存储数据,实时数据转换。
-
JSON,是一种轻量级的数据交换格式,优点:兼容性高、数据格式比较简单,易于读写、序列化后数据较小,可扩展性好,兼容性好、与XML相比,其协议比较简单,解析速度比较快。缺点:数据的描述性比XML差、不适合性能要求为ms级别的情况、额外空间开销比较大。适用场景(可替代XML):跨防火墙访问、可调式性要求高、基于Web browser的Ajax请求、传输数据量相对小,实时性要求相对低(例如秒级别)的服务。
-
Fastjson,采用一种“假定有序快速匹配”的算法。优点:接口简单易用、目前java语言中最快的json库。缺点:过于注重快,而偏离了“标准”及功能性、代码质量不高,文档不全。适用场景:协议交互、Web输出、Android客户端
-
Thrift,不仅是序列化协议,还是一个RPC框架。优点:序列化后的体积小, 速度快、支持多种语言和丰富的数据类型、对于数据字段的增删具有较强的兼容性、支持二进制压缩编码。缺点:使用者较少、跨防火墙访问时,不安全、不具有可读性,调试代码时相对困难、不能与其他传输层协议共同使用(例如HTTP)、无法支持向持久层直接读写数据,即不适合做数据持久化序列化协议。适用场景:分布式系统的RPC解决方案
-
Avro,Hadoop的一个子项目,解决了JSON的冗长和没有IDL的问题。优点:支持丰富的数据类型、简单的动态语言结合功能、具有自我描述属性、提高了数据解析速度、快速可压缩的二进制数据形式、可以实现远程过程调用RPC、支持跨编程语言实现。缺点:对于习惯于静态类型语言的用户不直观。适用场景:在Hadoop中做Hive、Pig和MapReduce的持久化数据格式。
-
Protobuf,将数据结构以.proto文件进行描述,通过代码生成工具可以生成对应数据结构的-POJO对象和Protobuf相关的方法和属性。优点:序列化后码流小,性能高、结构化数据存储格式(XML JSON等)、通过标识字段的顺序,可以实现协议的前向兼容、结构化的文档更容易管理和维护。缺点:需要依赖于工具生成代码、支持的语言相对较少,官方只支持Java 、C++ 、python。适用场景:对性能要求高的RPC调用、具有良好的跨防火墙的访问属性、适合应用层对象的持久化
-
protostuff 基于protobuf协议,但不需要配置proto文件,直接导包即可
-
Jboss marshaling 可以直接序列化java类, 无须实java.io.Serializable接口
-
MessagePack 一个高效的二进制序列化格式
-
Hessian 采用二进制协议的轻量级remoting onhttp工具
-
kryo 基于protobuf协议,只支持java语言,需要注册(Registration),然后序列化(Output),反序列化(Input)
Netty实战:1、利用Netty实现的聊天室 2、利用Netty实现Http协议 3、利用Netty实现Websocket协议
1、利用Netty实现的聊天室:
Sever端
public class ChatServer {
private int port;
public ChatServer(int port) {
this.port = port;
}
public void start(){
EventLoopGroup boss=new NioEventLoopGroup();
EventLoopGroup works=new NioEventLoopGroup();
try {
ServerBootstrap boot=new ServerBootstrap();
boot.group(boss,works)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel sc) throws Exception {
ChannelPipeline pipeline = sc.pipeline();
pipeline.addLast("line",new LineBasedFrameDecoder(1024));
pipeline.addLast("encode", new StringEncoder());
pipeline.addLast("decode", new StringDecoder());
pipeline.addLast(new ServerHander());
}
})
.option(ChannelOption.SO_BACKLOG,128)
.option(ChannelOption.SO_KEEPALIVE,true);
ChannelFuture future = boot.bind(this.port).sync();
System.out.println("服务已经启动...............");
future.channel().closeFuture().sync();
System.out.println("服务已经关闭...............");
}catch (Exception e){
}finally {
boss.shutdownGracefully();
works.shutdownGracefully();
}
}
public static void main(String[] args) {
new ChatServer(8888).start();
}
}
SeverHander
public class ServerHander extends SimpleChannelInboundHandler<String> {
public static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
/**
* 每当从客户端有消息写入时
*
*/
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
Channel inComing = channelHandlerContext.channel();
for (Channel channel : channels) {
if (channel != inComing) {
channel.writeAndFlush("[用户" + inComing.remoteAddress() + " 说:]" + s + "\n");
} else {
channel.writeAndFlush("[我说:]" + s + "\n");
}
}
}
/**
* 当有客户端连接时,handlerAdded会执行,就把该客户端的通道记录下来,加入队列
*
*/
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
Channel inComing = ctx.channel();//获得客户端通道
//通知其他客户端有新人进入
for (Channel channel : channels) {
if (channel != inComing) {
channel.writeAndFlush("[欢迎: " + inComing.remoteAddress() + "] 进入聊天室!\n");
}
}
channels.add(inComing);//加入队列
}
/**
* 断开连接
*
*/
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
Channel outComing = ctx.channel();//获得客户端通道
//通知其他客户端有人离开
for (Channel channel : channels) {
if (channel != outComing) {
channel.writeAndFlush("[再见: ]" + outComing.remoteAddress() + " 离开聊天室!\n");
}
}
channels.remove(outComing);
}
/**
* 当服务器监听到客户端活动时
*
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
Channel inComing = ctx.channel();
System.out.println("[" + inComing.remoteAddress() + "]: 进入聊天室");
}
/**
* 离线
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
Channel inComing = ctx.channel();
System.out.println("[" + inComing.remoteAddress() + "]: 离线");
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
Channel inComing = ctx.channel();
System.out.println(inComing.remoteAddress() + "通讯异常!");
ctx.close();
}
}
Client端
public class ChatClient {
private String host;
private int port;
public ChatClient(String host, int port) {
this.host = host;
this.port = port;
}
public void start() {
EventLoopGroup works = new NioEventLoopGroup();
try {
Bootstrap boot = new Bootstrap();
boot.group(works).channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel sc) throws Exception {
ChannelPipeline pipeline = sc.pipeline();
pipeline.addLast("line",new LineBasedFrameDecoder(1024));
pipeline.addLast("encode", new StringEncoder());//编码器
pipeline.addLast("decode", new StringDecoder());//解码器
pipeline.addLast(new ClientHander());
}
});
ChannelFuture future = boot.connect(this.host, this.port).sync();
System.out.println("客户端已经连接");
future.channel().closeFuture().sync();
System.out.println("客户端已经关闭");
} catch (Exception e) {
} finally {
works.shutdownGracefully();
}
}
public static void main(String[] args) {
new ChatClient("localhost",8888).start();
}
}
ClientHander
public class ClientHander extends SimpleChannelInboundHandler<String> {
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
System.out.println(s);
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
new Thread(()-> {
Scanner scanner = new Scanner(System.in);
while (scanner.hasNextLine()) {
String s = scanner.nextLine()+"\n";
ByteBuf buffer = Unpooled.buffer(s.length());
buffer.writeBytes(s.getBytes());
ctx.writeAndFlush(buffer);
}
}).start();
}
}
启动Sever
image.png启动一个Client
image.png启动第二个Client
image.png image.png image.png image.png2、利用Netty实现的Http协议:
Server端
public class HttpServer {
public static void main(String[] args) throws Exception {
// 定义一对线程组
// 主线程组, 用于接受客户端的连接,但是不做任何处理,跟老板一样,不做事
EventLoopGroup bossGroup = new NioEventLoopGroup();
// 从线程组, 老板线程组会把任务丢给他,让手下线程组去做任务
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
// netty服务器的创建, ServerBootstrap 是一个启动类
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup) // 设置主从线程组
.channel(NioServerSocketChannel.class)
.childHandler(new HttpServerInitializer());
// 启动server,并且设置8088为启动的端口号,同时启动方式为同步
ChannelFuture channelFuture = serverBootstrap.bind(8088).sync();
// 监听关闭的channel,设置位同步方式
channelFuture.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
public class HttpServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel channel) throws Exception {
// 通过SocketChannel去获得对应的管道
ChannelPipeline pipeline = channel.pipeline();
// 通过管道,添加handler
// HttpServerCodec是由netty自己提供的助手类,可以理解为拦截器
// 当请求到服务端,我们需要做解码,响应到客户端做编码
pipeline.addLast("HttpServerCodec", new HttpServerCodec());
// 添加自定义的助手类,返回 "hello netty~"
pipeline.addLast("customHandler", new CustomHandler());
}
}
public class CustomHandler extends SimpleChannelInboundHandler<HttpObject> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg)
throws Exception {
// 获取channel
Channel channel = ctx.channel();
if (msg instanceof HttpRequest) {
// 显示客户端的远程地址
System.out.println(channel.remoteAddress());
// 定义发送的数据消息
ByteBuf content = Unpooled.copiedBuffer("Hello netty~", CharsetUtil.UTF_8);
// 构建一个http response
FullHttpResponse response =
new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,
HttpResponseStatus.OK,
content);
// 为响应增加数据类型和长度
response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain");
response.headers().set(HttpHeaderNames.CONTENT_LENGTH, content.readableBytes());
// 把响应刷到客户端
ctx.writeAndFlush(response);
}
}
/*
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
System.out.println("channel注册到NioEventLoop");
super.channelRegistered(ctx);
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
System.out.println("channel取消和NioEventLoop的绑定");
super.channelUnregistered(ctx);
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("channel准备就绪");
super.channelActive(ctx);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("channel被关闭");
super.channelInactive(ctx);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
System.out.println("channel读数据完成");
super.channelReadComplete(ctx);
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
System.out.println("用户事件触发。。。");
super.userEventTriggered(ctx, evt);
}
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
System.out.println("channel可写更改");
super.channelWritabilityChanged(ctx);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("捕获到异常");
super.exceptionCaught(ctx, cause);
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
System.out.println("新事件被添加");
super.handlerAdded(ctx);
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
System.out.println("事件被移除");
super.handlerRemoved(ctx);
}
*/
}
3、利用Netty实现的Websocket协议:
由于Websocket的握手需要使用http,所以在pipline中需要注册支持http的事件HttpServerCodec
Server端
public class WSServer {
public static void main(String[] args) throws Exception {
EventLoopGroup mainGroup = new NioEventLoopGroup();
EventLoopGroup subGroup = new NioEventLoopGroup();
try {
ServerBootstrap server = new ServerBootstrap();
server.group(mainGroup, subGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new WSServerInitialzer());
ChannelFuture future = server.bind(8088).sync();
future.channel().closeFuture().sync();
} finally {
mainGroup.shutdownGracefully();
subGroup.shutdownGracefully();
}
}
}
public class WSServerInitialzer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// websocket 基于http协议,所以要有http编解码器
pipeline.addLast(new HttpServerCodec());
// 对写大数据流的支持
pipeline.addLast(new ChunkedWriteHandler());
// 对httpMessage进行聚合,聚合成FullHttpRequest或FullHttpResponse
// 几乎在netty中的网络编程,都会使用到此hanler
pipeline.addLast(new HttpObjectAggregator(1024*64));
// 针对客户端,如果在1分钟时没有向服务端发送读写心跳(ALL),则主动断开
// 如果是读空闲或者写空闲,不处理
pipeline.addLast(new IdleStateHandler(8, 10, 12));
// 自定义的空闲状态检测
pipeline.addLast(new HeartBeatHandler());
/**
* websocket 服务器处理的协议,用于指定给客户端连接访问的路由 : /ws
* 本handler会帮你处理一些繁重的复杂的事
* 会帮你处理握手动作: handshaking(close, ping, pong) ping + pong = 心跳
* 对于websocket来讲,都是以frames进行传输的,不同的数据类型对应的frames也不同
*/
pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
// 自定义的handler
pipeline.addLast(new ChatHandler());
}
}
TextWebSocketFrame: 在netty中,是用于为websocket专门处理文本的对象,frame是消息的载体
public class ChatHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
// 用于记录和管理所有客户端的channle
private static ChannelGroup clients =
new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg)
throws Exception {
// 获取客户端传输过来的消息
String content = msg.text();
System.out.println("接受到的数据:" + content);
// for (Channel channel: clients) {
// channel.writeAndFlush(
// new TextWebSocketFrame(
// "[服务器在]" + LocalDateTime.now()
// + "接受到消息, 消息为:" + content));
// }
// 下面这个方法,和上面的for循环效果是一样的
clients.writeAndFlush(
new TextWebSocketFrame(
"[服务器在]" + LocalDateTime.now()
+ "接受到消息, 消息为:" + content));
}
/**
* 当客户端连接服务端之后(打开连接)
* 获取客户端的channle,并且放到ChannelGroup中去进行管理
*/
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
clients.add(ctx.channel());
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
String channelId = ctx.channel().id().asShortText();
System.out.println("客户端被移除,channelId为:" + channelId);
//System.out.println("客户端断开,channle对应的长id为:" + ctx.channel().id().asLongText());
//System.out.println("客户端断开,channle对应的短id为:" + ctx.channel().id().asShortText());
// 当触发handlerRemoved,ChannelGroup会自动移除对应客户端的channel
clients.remove(ctx.channel());
}
/**
发生异常时进行捕获
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
// 发生异常之后关闭连接(关闭channel),随后从ChannelGroup中移除
ctx.channel().close();
clients.remove(ctx.channel());
}
}
心跳检测
/**
* @Description: 用于检测channel的心跳handler
* 继承ChannelInboundHandlerAdapter,从而不需要实现channelRead0方法
*/
public class HeartBeatHandler extends ChannelInboundHandlerAdapter {
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
// 判断evt是否是IdleStateEvent(用于触发用户事件,包含 读空闲/写空闲/读写空闲 )
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent)evt; // 强制类型转换
if (event.state() == IdleState.READER_IDLE) {
System.out.println("进入读空闲...");
} else if (event.state() == IdleState.WRITER_IDLE) {
System.out.println("进入写空闲...");
} else if (event.state() == IdleState.ALL_IDLE) {
System.out.println("channel关闭前,users的数量为:" + ChatHandler.users.size());
Channel channel = ctx.channel();
// 关闭无用的channel,以防资源浪费
channel.close();
System.out.println("channel关闭后,users的数量为:" + ChatHandler.users.size());
}
}
}
}
网友评论