IO/NIO/AIO 的区别:
- IO和NIO 又称为Blocking IO 和 No Blocking IO 即为阻塞,非阻塞IO。
- IO每一个连接都需要建立一个线程,NIO不需要。
- NIO主要有buffer、channel、selector三种技术的整合,通过零拷贝的buffer取得数据,每一个客户端通过channel在selector(多路复用器)上进行注册。服务端不断轮询channel来获取客户端的信息。channel上有connect,accept(阻塞)、read(可读)、write(可写)四种状态标识。根据标识来进行后续操作。所以一个服务端可接收无限多的channel。不需要新开一个线程。大大提升了性能。
- AIO--NIO升级版:AIO 通过调用accept方法,一个会话接入之后再次调用(递归)accept方法,监听下一次会话,读取也不再阻塞,回调complete方法异步进行。不再需要selector 使用channel线程组来接收。
- 但是不管是哪种IO技术,编码都很复杂。故出现了netty。基于NIO。提供更简单的API让网络编程代码更容易,只需关注业务逻辑。
Netty
主流的rpc框架都使用了该框架。Hadoop的Avro 、Dubbo 、RocketMQ等。 建议学习网站 http://ifeve.com/netty5-user-guide/
相关原理
- 底层为NIO
- 核心:基于reactor 模式。Reactor模式基于事件驱动, “don't call us,we will call u ”的思想,事件都是由注册器回调,不主动取。适合处理海量I/O事件。连接发起时客户端的channel在reactor thread pool上注册,注册成功后,进入IO pool,后续读写操作都在IO pool中进行。Server端有两个线程组,一个用于接收客户端的连接,一个用于已连接客户端的读写操作。故我们在建server时,都是建立两个工作组的:
//1 用于接受客户端连接的线程工作组
EventLoopGroup boss = new NioEventLoopGroup(); //NioEventLoop聚合了多路复用器selector。可处理大量的客户端连接
//2 用于对接受客户端连接读写操作的线程工作组
EventLoopGroup work = new NioEventLoopGroup();
//TWO:
//3 辅助类。用于帮助我们创建NETTY服务
ServerBootstrap b = new ServerBootstrap();
b.group(boss, work) //绑定两个工作线程组
- netty底层是zero-copy原理。网络socket直接从内核空间中读取buffer。而不是传统的先读取到application中。再传送出去。
-
传统方式:
中间需要过一次Application -
zero-copy方式:
网络socket直接从内核空间中读取buffer
应用场景
- websocket 服务器客户端连接、推送
- 压缩。大文件传输
- 加密 SSL
- RTSP restful文件传输方式
TCP 拆包、粘包解决方案
- 消息定长,比如固定为200字节,如果不够,空位补空格
- 在包尾部增加特殊字符进行分割,例如回车等
- 将消息分为消息头和消息体,在消息头中包含表示消息长度的字段,然后进行业务逻辑处理
/**
*特殊字符分隔方式: 在client端和server端的initChannel中加入配置
*以”$_”为分隔符
*/
ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes());
sc.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter));
/**
*定长分隔方式: 在client端和server端的initChannel中加入配置
*/
sc.pipeline().addLast(new FixedLengthFrameDecoder(5));
序列化(对象)传输
- java中序列化传输一般使用 JBossMarshalling,需要双方都为java。跨语言使用Protobuff。ProtoBuff序列化后的码流比java序列化的小很多,如java序列化int,不管值的大小都会占用全部的4字节空间,而protobuff是按实际int大小去占用相应的空间
/**
*client和server端init Chanel时都引入Marshalling转码方法
*/
sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
/**
*server端可直接接收对象
*/
Req req = (Req)msg;
注意事项
- 传输的buffer流需要被释放
try{
...
}finally{
ReferenceCountUtil.release(msg);
}
若msg被writeAndFlush发送出去。不需要释放。因为方法中已实现释放代码。
查看源码可发现:
@Override
public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
if (msg == null) {
throw new NullPointerException("msg");
}
if (isNotValidPromise(promise, true)) {
ReferenceCountUtil.release(msg);//此处被释放
// cancelled
return promise;
}
write(msg, true, promise);
return promise;
}
网友评论