Netty介绍
Netty是由JBOSS提供的一个java开源框架,是业界最流行的NIO框架,整合了多种协议(包括FTP、SMTP、HTTP等各种二进制文本协议)的实现经验,精心设计的框架,在多个大型商业项目中得到充分验证。
那些主流框架产品在用?
- 搜索引擎框架 ElasticSerach
- Hadopp子项目Avro项目,使用Netty作为底层通信框架
- 阿里巴巴开源的RPC框架 Dubbo
补充:netty4是dubbo2.5.6后引入的,2.5.6之前的netty用的是netty3Netty在Dubbo里面使用的地址 https://github.com/apache/incubator-dubbo/tree/master/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4
BIO时间返回器
public class BioServer {
public static final int PORT=3456;
public static void main(String[] args) throws IOException {
ServerSocket server=null;
try {
server=new ServerSocket(PORT);
Socket socket=null;
while (true) {
socket= server.accept();
new Thread(new TimerServerHandler(socket)).start();
}
} catch (Exception e) {
e.printStackTrace();
}finally {
if (server != null) {
server.close();
}
}
}
}
public class TimerServerHandler implements Runnable {
private Socket socket;
public TimerServerHandler(Socket socket) {
this.socket = socket;
}
@Override
public void run() {
BufferedReader in = null;
PrintWriter out = null;
try {
in = new BufferedReader(new InputStreamReader(this.socket.getInputStream()));
out = new PrintWriter(this.socket.getOutputStream(), true);
String body=null;
while ((body = in.readLine()) != null && body.length() != 0) {
System.out.println("客户端发送:"+body);
out.println(new Date().toString());
}
} catch (Exception e) {
} finally {
if (in!=null) {
try {
in.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if (out!=null) {
try {
out.close();
} catch (Exception e) {
e.printStackTrace();
}
}
if (this.socket != null) {
try {
this.socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
public class BioClient {
public static final int PORT=3456;
public static void main(String[] args) {
Socket socket=null;
BufferedReader in=null;
PrintWriter out=null;
try {
socket=new Socket("127.0.0.1",PORT);
in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
out = new PrintWriter(socket.getOutputStream(), true);
out.println("i am client");
String s = in.readLine();
System.out.println("服务器当前时间:"+s);
} catch (Exception e) {
} finally {
if (in!=null) {
try {
in.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if (out!=null) {
try {
out.close();
} catch (Exception e) {
e.printStackTrace();
}
}
if (socket != null) {
try {
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
BIO优缺点
-
优点
- 模型简单
- 编码简单
-
缺点:性能瓶颈,请求数和线程数 N:N关系高并发情况下,CPU切换线程上下文损耗大
案例:web服务器Tomcat7之前,都是使用BIO,7之后就使用NIO
改进:伪NIO,使用线程池去处理业务逻辑
网络IO模型
同步异步、堵塞和非堵塞
-
洗衣机洗衣服
- 洗衣机洗衣服(无论阻塞式IO还是非阻塞式IO,都是同步IO模型)
-
同步阻塞:你把衣服丢到洗衣机洗,然后看着洗衣机洗完,洗好后再去晾衣服(你就干等,啥都不做,阻塞在那边)
-
同步非阻塞:你把衣服丢到洗衣机洗,然后会客厅做其他事情,定时去阳台看洗衣机是不是洗完了,洗好后再去晾衣服,这之间可以干其他事情
-
异步阻塞: 你把衣服丢到洗衣机洗,然后看着洗衣机洗完,洗好后再去晾衣服(几乎没这个情况,几乎没这个说法,可以忽略)
-
异步非阻塞:你把衣服丢到洗衣机洗,然后会客厅做其他事情,洗衣机洗好后会自动去晾衣服,晾完成后放个音乐告诉你洗好衣服并晾好了
IO详解
- IO操作分两步:发起IO请求等待数据准备,实际IO操作(洗衣服,晾衣服)同步须要主动读写数据,在读写数据的过程中还是会阻塞(好比晾衣服阻塞了你) 异步仅仅须要I/O操作完毕的通知。并不主动读写数据,由操作系统内核完毕数据的读写(机器人帮你自动晾衣服)
- 五种IO的模型:阻塞IO、非阻塞IO、多路复用IO、信号驱动IO和异步IO,前四种都是同步IO,在内核数据copy到用户空间时都是阻塞的
权威:RFC标准,或者书籍 《UNIX Network Programming》中文名《UNIX网络编程-卷一》第六章
1)阻塞式I/O;
2)非阻塞式I/O;
3)I/O复用(select,poll,epoll...);
I/O多路复用是阻塞在select,epoll这样的系统调用没有阻塞在真正的I/O系统调用如recvfrom进程受阻于select,等待可能多个套接口中的任一个变为可读
IO多路复用使用两个系统调用(select和recvfrom)
blocking IO只调用了一个系统调用(recvfrom)
select/epoll 核心是可以同时处理多个connection,而不是更快,所以连接数不高的话,性能不一定比多线程+阻塞IO好
多路复用模型中,每一个socket,设置为non-blocking,
阻塞是在select这
-
信号驱动式I/O(SIGIO)
-
异步I/O(POSIX的aio_系列函数)Future-Listener机制
-
IO操作分为两步
- 发起IO请求,等待数据准备(Waiting for the data to be ready)
- 实际的IO操作,将数据从内核拷贝到进程中(Copying the data from the kernel to the process)
-
前四种IO模型都是同步IO操作,区别在于第一阶段,而他们的第二阶段是一样的:在数据从内核复制到应用缓冲区期间(用户空间),进程阻塞于recvfrom调用或者select()函数。相反,异步I/O模型在这两个阶段都要处理。
-
阻塞IO和非阻塞IO的区别在于第一步,发起IO请求是否会被阻塞,如果阻塞直到完成那么就是传统的阻塞IO,如果不阻塞,那么就是非阻塞IO。同步IO和异步IO的区别就在于第二个步骤是否阻塞,如果实际的IO读写阻塞请求进程,那么就是同步IO,因此阻塞IO、非阻塞IO、IO复用、信号驱动IO都是同步IO,如果不阻塞,而是操作系统帮你做完IO操作再将结果返回给你,那么就是异步IO。
几个核心点:
阻塞非阻塞说的是线程的状态(重要)
同步和异步说的是消息的通知机制(重要)
同步需要主动读写数据,异步是不需要主动读写数据
同步IO和异步IO是针对用户应用程序和内核的交互
异步需要内核层次的支持
IO多路复用技术
什么是IO多路复用:I/O多路复用,I/O是指网络I/O, 多路指多个TCP连接(即socket或者channel),复用指复用一个或几个线程。简单来说:就是使用一个或者几个线程处理多个TCP连接,最大优势是减少系统开销小,不必创建过多的进程/线程,也不必维护这些进程/线程
select:
基本原理:监视文件3类描述符: writefds、readfds、和exceptfds,调用后select
函数会阻塞住,等有数据 可读、可写、出异常 或者 超时 就会返回,select函数正常返回后,通过遍历fdset整个数组才能发现哪些句柄发生了事件,来找到
就绪的描述符fd,然后进行对应的IO操作,几乎在所有的平台上支持,跨平台支持性好
缺点:
1)select采用轮询的方式扫描文件描述符,全部扫描,随着文件描述符FD数量增多而性能下降
2)每次调用 select(),需要把 fd 集合从用户态拷贝到内核态,并进行遍历(消息传递都是从内核到用户空间)
3)最大的缺陷就是单个进程打开的FD有限制,默认是1024,这个指的是jvm的限制,而不是linux的限制(可修改宏定义,但是效率仍然慢)
static final int MAX_FD = 1024
poll:
基本流程:
select() 和 poll() 系统调用的大体一样,处理多个描述符也是使用轮询的方式,根据描述符的状态进行处理,一样需要把 fd 集合从用户态拷贝到内核态,并进行遍历。最大区别是: poll没有最大文件描述符限制(使用链表的方式存储fd)
select和poll基本没啥区别,主要是一个链表一个数组。
Epoll讲解
epoll 基本原理:
在2.6内核中提出的,对比select和poll,epoll更加灵活,没有描述符限制,用户态拷贝到内核态只需要一次
使用事件通知,通过epoll_ctl注册fd,一旦该fd就绪,内核就会采用callback的回调机制来激活对应的fd
优点:
1)没fd这个限制,所支持的FD上限是操作系统的最大文件句柄数,1G内存大概支持10万个句柄
2)效率提高,使用回调通知而不是轮询的方式,不会随着FD数目的增加效率下降
3)通过callback机制通知,内核和用户空间mmap同一块内存实现
Linux内核核心函数
1)epoll_create() 在Linux内核里面申请一个文件系统 B+树,返回epoll对象,也是一个fd
2)epoll_ctl() 操作epoll对象,在这个对象里面修改添加删除对应的链接fd, 绑定一个callback函数
3)epoll_wait() 判断并完成对应的IO操作
缺点:
编程模型比select/poll 复杂
例子:100万个连接,里面有1万个连接是活跃,在 select、poll、epoll分别是怎样的表现
select:不修改宏定义,则需要 1000个进程才可以支持 100万连接
poll:100万个链接,遍历都响应不过来了,还有空间的拷贝消耗大量的资源
epoll:通过回调通知,性能相比之下提升很大
Java的I/O演进历史
- jdk1.4之前是采用同步阻塞模型,也就是BIO 大型服务一般采用C或者C++, 因为可以直接操作系统提供的异步IO,AIO
- jdk1.4推出NIO,支持非阻塞IO,jdk1.7升级,推出NIO2.0,提供AIO的功能,支持文件和网络套接字的异步IO
Netty线程模型和Reactor模式
- 设计模式——Reactor模式(反应器设计模式),是一种基于事件驱动的设计模式,在事件驱动的应用中,将一个或多个客户的服务请求分离(demultiplex)和调度(dispatch)给应用程序。在事件驱动的应用中,同步地、有序地处理同时接收的多个服务请求一般出现在高并发系统中,比如Netty,Redis等
- 优点
- 1)响应快,不会因为单个同步而阻塞,虽然Reactor本身依然是同步的
- 2)编程相对简单,最大程度的避免复杂的多线程及同步问题,并且避免了多线程/进程的切换开销;
- 3)可扩展性,可以方便的通过增加Reactor实例个数来充分利用CPU资源;
- 缺点
- 1)相比传统的简单模型,Reactor增加了一定的复杂性,因而有一定的门槛,并且不易于调试。
- 2)Reactor模式需要系统底层的的支持,比如Java中的Selector支持,操作系统的select系统调用支持
- 通俗理解:KTV例子前台接待,服务人员带领去开机器
- Reactor模式基于事件驱动,适合处理海量的I/O事件,属于同步非阻塞IO(NIO)
- Reactor单线程模型(比较少用)
- 1)作为NIO服务端,接收客户端的TCP连接;作为NIO客户端,向服务端发起TCP连接;
- 2)服务端读请求数据并响应;客户端写请求并读取响应
使用场景: 对应小业务则适合,编码简单;对于高负载、大并发的应用场景不适合,一个NIO线程处理太多请求,则负载过高,并且可能响应变慢,导致大量请求超时,而且万一线程挂了,则不可用了
- Reactor多线程模型
- 内容:Acceptor不在是一个线程,而是一组NIO线程;IO线程也是一组NIO线程,这样就是两个线程池去处理接入连接和处理IO
- 使用场景:满足目前的大部分场景,也是Netty推荐使用的线程模型
实际上的Reactor模式,是基于Java NIO的,在他的基础上,抽象出来两个组件——Reactor和Handler两个组件:
(1)Reactor:负责响应IO事件,当检测到一个新的事件,将其发送给相应的Handler去处理;新的事件包含连接建立就绪、读就绪、写就绪等。
(2)Handler:将自身(handler)与事件绑定,负责事件的处理,完成channel的读入,完成处理业务逻辑后,负责将结果写出channel。
总结:上面的单线程Reactor其实就可以看着一个特殊的handler。而多线程Reactor则分为两部分,一部分是Reactor(可以为多线程,线程组或者单线程),而handler也就是上面说的IO线程,必须是线程组或者多线程。
附属资料:
为什么Netty使用NIO而不是AIO,是同步非阻塞还是异步非阻塞?
答案:
在Linux系统上,AIO的底层实现仍使用EPOLL,与NIO相同,因此在性能上没有明显的优势
Netty整体架构是reactor模型,采用epoll机制,所以往深的说,还是IO多路复用模式,所以也可说netty是同步非阻塞模型(看的层次不一样)
很多人说这是netty是基于Java NIO 类库实现的异步通讯框架
特点:异步非阻塞、基于事件驱动,性能高,高可靠性和高可定制性。
参考资料:
https://github.com/netty/netty/issues/2515
基于netty搭建echo服务
常用服务组件
- EventLoop和EventLoopGroup
- Bootstrapt启动引导类
- Channel 生命周期,状态变化
- ChannelHandler和ChannelPipline
代码
public class EchoServer {
private int port;
public EchoServer(int port) {
this.port = port;
}
/**
* 启动流程
*/
public void run() throws InterruptedException {
//配置服务端线程组
EventLoopGroup bossGroup=new NioEventLoopGroup();
EventLoopGroup workGroup=new NioEventLoopGroup();
try {
//启动类
ServerBootstrap serverBootstrap=new ServerBootstrap();
serverBootstrap.group(bossGroup,workGroup).channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG,1024)
.option(ChannelOption.TCP_NODELAY,true)
.childHandler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel ch) throws Exception {
//串联很多要处理的handler
ch.pipeline().addLast(new EchoHandler());
}
});
//绑定端口,同步等待成功
ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
//等待服务端监听端口关闭
channelFuture.channel().closeFuture().sync();
}finally {
//优雅退出,释放线程池
workGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws InterruptedException {
int port=8080;
if (args.length > 0) {
port=Integer.parseInt(args[0]);
}
new EchoServer(port).run();
}
}
public class EchoHandler extends ChannelInboundHandlerAdapter {
//读取数据
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// Channel channel = ctx.channel();
// channel.writeAndFlush()
// ChannelPipeline pipeline = ctx.pipeline();
// pipeline.writeAndFlush()
ByteBuf data= (ByteBuf) msg;
System.out.println("服务端收到数据:"+data.toString(CharsetUtil.UTF_8));
ctx.writeAndFlush(data);
}
//读取完成
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
System.out.println("EchoServerHandler channelReadComplete");
}
//异常捕获
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();//关闭管道
}
}
public class EchoClient {
private String host;
private int port;
public EchoClient(String host, int port) {
this.host = host;
this.port = port;
}
//https://blog.csdn.net/fd2025/article/details/79740226
public void start() throws InterruptedException {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.remoteAddress(new InetSocketAddress(host, port))
.handler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel channel) throws Exception {
channel.pipeline().addLast(new EchoClientHandler());
}
});
//连接到服务端,connect是异步连接,再调用同步async,等待连接成功从
ChannelFuture channelFuture = bootstrap.connect().sync();
//阻塞,直到客户端通道关闭
channelFuture.channel().closeFuture().sync();
} finally {
//优雅退出,释放nio线程
group.shutdownGracefully();
}
}
public static void main(String[] args) throws InterruptedException {
new EchoClient("127.0.0.1", 8080).start();
}
}
public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf msg) throws Exception {
System.out.println("Client Received: "+msg.toString(CharsetUtil.UTF_8));
}
//channel激活的时候
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelActive");
ctx.writeAndFlush(Unpooled.copiedBuffer("哈哈测试",CharsetUtil.UTF_8));
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
System.out.println("EchoClientHandler Complate");
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>net.xdclass</groupId>
<artifactId>echo-project</artifactId>
<version>1.0-SNAPSHOT</version>
<!-- https://mvnrepository.com/artifact/io.netty/netty-all -->
<dependencies>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.32.Final</version>
</dependency>
</dependencies>
</project>
Netty的核心链路源码
剖析EventLoop和EventLoopGroup线程模型
- 高性能RPC框架的3个要素:IO模型(linux的IO模型五种)、数据协议(http,rpc等)、线程模型
线程模型
1. 传统IO模型:
每个请求都分配一个线程用来处理该请求,关于该请求
的read,handle,和send都放在一个线程中进行处理
2. 基于线程池的伪异步IO模型
针对传统IO模型中会造成线程资源极大浪费的缺点,通
过线程池来复用线程处理客户端连接和数据处理.
* 会有一个阻塞线程负责socket连接,即acceptor;
*会有一个线程池维护n个活跃线程和一个消息队列,来
处理socketTask,所以资源是可控的,所以无论客户端
多少并发连接,都会导致系统资源耗尽和宕机;
缺点:
- 无法解决通信阻塞的问题,因为socket.read()方法是
流式数据读取,因此只能读取完所有数据后才能正确处理,如果一个socket发送数据需要60秒那么该线程处理数
据至少要60秒,那么这段时间内的io事件,该线程是
无法及时处理的,如果这样的io事件出现多次,很可
能造成消息队列阻塞;
- 只有一个acceptor负责socket连接,如果线程池阻塞队列阻塞之后,那么所有新的客户端连接也将会被拒绝;如果大量连接拒绝,就可能会认定为系统故障;
3. Reactor模型(实时响应)
前面已经讲过这个模型;
IO复用结合线程池复用就是Reactor模型设计的基本思想
总结:线程模型其实就是IO模型的相关运用,可能还会搭配线程池服用,例如Reactor模型
-
EventLoop好比一个线程,1个EventLoop可以服务多个Channel,1个Channel只有一个EventLoop可以创建多个 EventLoop 来优化资源利用,也就是EventLoopGroup
-
EventLoopGroup 负责分配 EventLoop 到新创建的 Channel,里面包含多个EventLoop
- EventLoopGroup -> 多个 EventLoop
- EventLoop -> 维护一个Selector(其实就是遍历器)
- 学习资料:http://ifeve.com/selectors/
-
EventLoopGroup默认线程池数量是系统核数*2
Bootstrap模块讲解
- 服务器启动引导类ServerBootstrap
- group :设置线程组模型,Reactor线程模型对比EventLoopGroup
- 单线程
- 多线程
- 主从线程
- 参考:https://blog.csdn.net/QH_JAVA/article/details/78443646
- group :设置线程组模型,Reactor线程模型对比EventLoopGroup
- channel:
设置channel通道类型NioServerSocketChannel、OioServerSocketChannel
-
option: 作用于每个新建立的channel,设置TCP连接中的一些参数,如下
- ChannelOption.SO_BACKLOG: 存放已完成三次握手的请求的等待队列的最大长度;
- Linux服务器TCP连接底层知识:
- syn queue:半连接队列,洪水攻击,tcp_max_syn_backlog
- accept queue:全连接队列, net.core.somaxconn
- 系统默认的somaxconn参数要足够大 ,如果backlog比somaxconn大,则会优先用后者 https://github.com/netty/netty/blob/4.1/common/src/main/java/io/netty/util/NetUtil.java#L250
- ChannelOption.TCP_NODELAY: 为了解决Nagle的算法问题,默认是false, 要求高实时性,有数据时马上发送,就将该选项设置为true关闭Nagle算法;如果要减少发送次数,就设置为false,会累积一定大小后再发送
- 知识拓展: https://baike.baidu.com/item/Nagle%E7%AE%97%E6%B3%95/5645172 https://www.2cto.com/article/201309/241096.html
-
childOption: 作用于被accept之后的连接
-
childHandler: 用于对每个通道里面的数据处理
粗略的理解为option是给bossGroup配置的,childOption是给workerGroup配置的;这两个线程组对应reactor模型的Acceptor和handler
- 客户端启动引导类Bootstrap
- remoteAddress: 服务端地址
- handler:和服务端通信的处理器
Channel模块
- 什么是Channel: 客户端和服务端建立的一个连接通道
- 什么是ChannelHandler: 负责Channel的逻辑处理
- 什么是ChannelPipeline:负责管理ChannelHandler的有序容器
- 他们是什么关系
一个Channel包含一个ChannelPipeline,所有ChannelHandler都会顺序加入到ChannelPipeline中 创建Channel时会自动创建一个ChannelPipeline,每个Channel都有一个管理它的pipeline,这关联是永久性的
- Channel当状态出现变化,就会触发对应的事件
- 状态:
- channelRegistered: channel注册到一个EventLoop
- channelActive: 变为活跃状态(连接到了远程主机),可以接受和发送数据
- channelInactive: channel处于非活跃状态,没有连接到远程主机
- channelUnregistered: channel已经创建,但是未注册到一个EventLoop里面,也就是没有和Selector绑定
- 状态:
特别注意:执行顺序channelRegistered-》channelActive=》channelInactive=》channelUnregistered
ChannelHandler和ChannelPipeline模块讲解
- 方法: handlerAdded : 当 ChannelHandler 添加到 ChannelPipeline 调用; handlerRemoved : 当 ChannelHandler 从 ChannelPipeline 移除时调用; exceptionCaught : 执行抛出异常时调用;
- ChannelHandler下主要是两个子接口
- ChannelInboundHandler:(入站) 处理输入数据和Channel状态类型改变, 适配器ChannelInboundHandlerAdapter(适配器设计模式) 常用的:SimpleChannelInboundHandler
- ChannelOutboundHandler:(出站) 处理输出数据,适配器ChannelOutboundHandlerAdapter
- ChannelPipeline: 好比厂里的流水线一样,可以在上面添加多个ChannelHandler,也可看成是一串 ChannelHandler实例,拦截穿过 Channel 的输入输出 event,ChannelPipeline实现了拦截器的一种高级形式,使得用户可以对事件的处理以及ChannelHanler之间交互获得完全的控制权
ChannelHandlerContext模块
- ChannelHandlerContext是连接ChannelHandler和ChannelPipeline的桥梁,ChannelHandlerContext部分方法和Channel及ChannelPipeline重合,好比调用write方法
- Channel、ChannelPipeline、ChannelHandlerContext 都可以调用此方法,前两者都会在整个管道流里传播,而ChannelHandlerContext就只会在后续的Handler里面传播
- AbstractChannelHandlerContext类双向链表结构,next/prev分别是后继节点,和前驱节点
- DefaultChannelHandlerContext 是实现类,但是大部分都是父类那边完成,这个只是简单的实现一些方法 主要就是判断Handler的类型
- ChannelInboundHandler之间的传递,主要通过调用ctx里面的FireXXX()方法来实现下个handler的调用
入站出站Handler执行顺序
- InboundHandler顺序执行,OutboundHandler逆序执行
- InboundHandler之间传递数据,通过ctx.fireChannelRead(msg)
- InboundHandler通过ctx.write(msg),则会传递到outboundHandler
- 使用ctx.write(msg)传递消息,Inbound需要放在结尾,在Outbound之后,不然outboundhandler会不执行;但是使用channel.write(msg)、pipline.write(msg)情况会不一致,outboundhandler都会执行
- outBound和Inbound谁先执行,针对客户端和服务端而言,客户端是发起请求再接受数据,先outbound再inbound,服务端则相反
总结:需要保证最后一个outhandler的的上下文可以有next的指向,否则最后一个outhandler就不会执行了,也就是说最后一个inhanlder之后的outhandler都不会执行。所以一般最后都要有一个inhandler。
模块ChannelFuture
- Netty中的所有I/O操作都是异步的,这意味着任何I/O调用都会立即返回,而ChannelFuture会提供有关的信息I/O操作的结果或状态。
- ChannelFuture状态
- 未完成:当I/O操作开始时,将创建一个新的对象,新的最初是未完成的 - 它既没有成功,也没有成功,也没有被取消,因为I/O操作尚未完成。
- 已完成:当I/O操作完成,不管是成功、失败还是取消,Future都是标记为已完成的, 失败的时候也有具体的信息,例如原因失败,但请注意,即使失败和取消属于完成状态
- 注意:不要在IO线程内调用future对象的sync或者await方法。不能在channelHandler中调用sync或者await方法,会阻塞
- ChannelPromise:继承于ChannelFuture,进一步拓展用于设置IO操作的结果
Netty网络数据传输编解码
- 最开始接触的编码码:java序列化/反序列化(就是编解码)、url编码、base64编解码
- 为啥jdk有编解码,还要netty自己开发编解码?
- java自带序列化的缺点
1)无法跨语言
2) 序列化后的码流太大,也就是数据包太大
3) 序列化和反序列化性能比较差
- 业界里面也有其他编码框架: google的 protobuf(PB)、Facebook的Trift、Jboss的Marshalling、Kyro等
- Netty里面的编解码:
- 解码器:负责处理“入站 InboundHandler”数据
- 编码器:负责“出站 OutboundHandler” 数据
- Netty里面提供默认的编解码器,也支持自定义编解码器
- Encoder:编码器
- Decoder:解码器
- Codec:编解码器
解码器Decoder
- Decoder对应的就是ChannelInboundHandler,主要就是字节数组转换为消息对象
- 主要是两个方法 decode decodeLast
- 抽象解码器
- ByteToMessageDecoder用于将字节转为消息,需要检查缓冲区是否有足够的字节
- ReplayingDecoder继承ByteToMessageDecoder,不需要检查缓冲区是否有足够的字节,但是ReplayingDecoder速度略满于ByteToMessageDecoder,不是所有的ByteBuf都支持
- 选择:项目复杂性高则使用ReplayingDecoder,否则使用 ByteToMessageDecoder
- MessageToMessageDecoder用于从一种消息解码为另外一种消息(例如POJO到POJO)
- 解码器具体的实现,用的比较多的是(更多是为了解决TCP底层的粘包和拆包问题)
- DelimiterBasedFrameDecoder: 指定消息分隔符的解码器
- LineBasedFrameDecoder: 以换行符为结束标志的解码器
- FixedLengthFrameDecoder:固定长度解码器
- LengthFieldBasedFrameDecoder:message = header+body, 基于长度解码的通用解码器
- StringDecoder:文本解码器,将接收到的对象转化为字符串,一般会与上面的进行配合,然后在后面添加业务handle
编码器Encoder
- Encoder对应的就是ChannelOutboundHandler,消息对象转换为字节数组
- Netty本身未提供和解码一样的编码器,是因为场景不同,两者非对等的(也就是不见得是一对一的关系)
- MessageToByteEncoder消息转为字节数组,调用write方法,会先判断当前编码器是否支持需要发送的消息类型,如果不支持,则透传;
- MessageToMessageEncoder用于从一种消息编码为另外一种消息(例如POJO到POJO)
编解码器类Codec
组合解码器和编码器,以此提供对于字节和消息都相同的操作
优点:成对出现,编解码都是在一个类里面完成
缺点:耦合在一起,拓展性不佳
Codec:组合编解码
1)ByteToMessageCodec
2)MessageToMessageCodec
decoder:解码
1)ByteToMessageDecoder
2)MessageToMessageDecoder
encoder:编码
1)ByteToMessageEncoder
2)MessageToMessageEncoder
TCP粘包拆包
什么是粘包拆包
1)TCP拆包: 一个完整的包可能会被TCP拆分为多个包进行发送
2)TCP粘包: 把多个小的包封装成一个大的数据包发送, client发送的若干数据包 Server接收时粘成一包
发送方和接收方都可能出现这个原因
发送方的原因:TCP默认会使用Nagle算法
接收方的原因: TCP接收到数据放置缓存中,应用程序从缓存中读取
UDP: 是没有粘包和拆包的问题,有边界协议
TCP半包读写常见解决方案
发送方:可以关闭Nagle算法
接受方: TCP是无界的数据流,并没有处理粘包现象的机制, 且协议本身无法避免粘包,半包读写的发生需要在应用层进行处理
应用层解决半包读写的办法
1)设置定长消息 (10字符)
xdclass000xdclass000xdclass000xdclass000
2)设置消息的边界 ($$ 切割)
sdfafwefqwefwe$$dsafadfadsfwqehidwuehfiw$$879329832r89qweew$$
3)使用带消息头的协议,消息头存储消息开始标识及消息的长度信息
Header+Body
Netty自带解决TCP半包读写方案
DelimiterBasedFrameDecoder: 指定消息分隔符的解码器
- LineBasedFrameDecoder:以换行符为结束标志的解码器
- FixedLengthFrameDecoder:固定长度解码器
- LengthFieldBasedFrameDecoder:message = header+body, 基于长度解码的通用解码器
public void run() throws Exception{
//配置服务端的线程组
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workGroup = new NioEventLoopGroup();
try{
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup,workGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG,128)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new ServerHandler());
}
});
System.out.println("Echo 服务器启动");
//绑定端口,同步等待成功
ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
//等待服务端监听端口关闭
channelFuture.channel().closeFuture().sync();
}finally {
//优雅退出,释放线程池
workGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
LineBasedFrameDecoder解决TCP半包读写
- LineBaseFrameDecoder 以换行符为结束标志的解码器 ,构造函数里面的数字表示最长遍历的帧数
- StringDecoder解码器将对象转成字符串
自定义分隔符解决TCP读写问题
- maxLength:表示一行最大的长度,如果超过这个长度依然没有检测自定义分隔符,将会抛出TooLongFrameException
- failFast:如果为true,则超出maxLength后立即抛出TooLongFrameException,不进行继续解码.如果为false,则等到完整的消息被解码后,再抛出TooLongFrameException异常
- stripDelimiter:解码后的消息是否去除掉分隔符
- delimiters:分隔符,ByteBuf类型
自定义长度半包读写器LengthFieldBasedFrameDecoder
maxFrameLength 数据包的最大长度
lengthFieldOffset 长度字段的偏移位,长度字段开始的地方,意思是跳过指定长度个字节之后的才是消息体字段
lengthFieldLength 长度字段占的字节数, 帧数据长度的字段本身的长度
lengthAdjustment
一般 Header + Body,添加到长度字段的补偿值,如果为负数,开发人员认为这个 Header的长度字段是整个消息包的长度,则Netty应该减去对应的数字
initialBytesToStrip 从解码帧中第一次去除的字节数, 获取完一个完整的数据包之后,忽略前面的指定位数的长度字节,应用解码器拿到的就是不带长度域的数据包
failFast 是否快速失败
缓冲ByteBuf
ByteBuf是为解决ByteBuffer的问题和满足网络应用程序开发人员的日常需求而设计的
JDK ByteBuffer的缺点:
-
无法动态扩容:长度固定,不能动态扩展和收缩,当数据大于ByteBuffer容量时,会发生索引越界异常
-
API使用复杂:读写的时候需要手工调用flip()和rewind()等方法,使用时需要非常谨慎的使用这些API,否则很容易出现错误
-
ByteBuf:是数据容器(字节容器)
-
JDK ByteBuffer:共用读写索引,每次读写操作都需要Flip(复位,因为读索引和写索引是同一个)扩容麻烦,而且扩容后容易造成浪费
-
Netty ByteBuf: 读写使用不同的索引,所以操作便捷自动扩容,使用便捷
增强
- API操作便捷性
- 动态扩容
- 多种ByteBuf实现
- 高效的零拷贝机制
ByteBuf操作
1.png 2.pngByteBuf动态扩容
capacity默认值:256字节,最大值:Integet.MAX_VALUE(2GB)
write*方法调用时,通过AbstractByteBuf.ensureWritable0进行检查
容量计算方法:AbstractByteBufAllocator.calculateNewCapacity(新capacity的最小要求,capacity最大值)
根据新capacity的最小值要求,对应有两套计算方法:
没超过4M:从64字节开始,每次增加一倍,直至计算出来的newCpacity满足新容量最小要求
示例:当前大小256,写250,继续写10字节数据,需要的容量最小要求是261,则新容量是6422*2=512
超过4M:新容量=新容量最小要求/4M*4M+4M
示例:当前大小3M,已写3M,继续写2M数据,需要的容量最小要求是5M,则新容量是9M(不能超过最大值)
4M的来源:一个固定的阈值AbstractByteBufAllocator.CALCULATE_THRESHOLD
ByteBuf实现
3.png所谓池化,其实就是内存复用
Unsafe的实现
4.pngPooledByteBuf对象、内存复用
5.png零拷贝机制
Netty的零拷贝机制,是一种应用层的实现。和底层JVM、操作系统内存机制并无过多的关联。
使用ByteBuf时netty高性能很重哟的一个原因。
6.png
说明:例如2.就是buffer持有array的引用,实际上数据没动,3也是,数据没动,只是其中ll的引用被buffer持有;还有1,如果是常规jdk的数组合并,其实是拷贝数据,同时新开内存生成新的数组
ByteBuf创建方法和常用的模式
ByteBuf:传递字节数据的容器
ByteBuf的创建方法
1)ByteBufAllocator
池化(Netty4.x版本后默认使用 PooledByteBufAllocator提高性能并且最大程度减少内存碎片
非池化UnpooledByteBufAllocator: 每次返回新的实例
2)Unpooled: 提供静态方法创建未池化的ByteBuf,可以创建堆内存和直接内存缓冲区
ByteBuf使用模式
堆缓存区HEAP BUFFER:
优点:存储在JVM的堆空间中,可以快速的分配和释放
缺点:每次使用前会拷贝到直接缓存区(也叫堆外内存)
直接缓存区DIRECR BUFFER:
优点:存储在堆外内存上,堆外分配的直接内存,不会占用堆空间
缺点:内存的分配和释放,比在堆缓冲区更复杂
复合缓冲区COMPOSITE BUFFER:
可以创建多个不同的ByteBuf,然后放在一起,但是只是一个视图
选择:大量IO数据读写,用“直接缓存区”; 业务消息编解码用“堆缓存区”
Netty内部设计模式
Builder构造器模式:ServerBootstap
责任链设计模式:pipeline的事件传播
工厂模式: 创建Channel
适配器模式:HandlerAdapter
单机百万连接
必备知识
- 网络IO模型
- Linux文件描述符
- 单进程文件句柄数(默认1024,不同系统不一样,每个进程都有最大的文件描述符限制)
- 全局文件句柄数
- 如何确定一个唯一的TCP连接.
- TCP四元组:源IP地址、源端口、目的ip、目的端口
Netty单机百万连接Linux内核参数优化
局部文件句柄限制(单个进程最大文件打开数)
ulimit -n 一个进程最大打开的文件数 fd 不同系统有不同的默认值
root身份编辑 vim /etc/security/limits.conf
增加下面
root soft nofile 1000000
root hard nofile 1000000
* soft nofile 1000000
* hard nofile 1000000
* 表示当前用户,修改后要重启
全局文件句柄限制(所有进程最大打开的文件数,不同系统是不一样,可以直接echo临时修改)
查看命令
cat /proc/sys/fs/file-max
永久修改全局文件句柄, 修改后生效 sysctl -p
vim /etc/sysctl.conf
增加 fs.file-max = 1000000
启动
java -jar millionServer-1.0-SNAPSHOT.jar -Xms5g -Xmx5g -XX:NewSize=3g -XX:MaxNewSize=3g
网友评论