Netty是一个高效稳定的NIO应用通信框架,笔者在本专题将带领大家分析Netty底层源码,彻底理解底层通信原理。
注意,本专题只适宜了解java多线程和java io知识的小伙伴阅读。
IO
在计算机系统中I/O就是输入(Input)和输出(Output)的意思,针对不同的操作对象,可以划分为磁盘I/O模型,网络I/O模型,内存映射I/O, Direct I/O、数据库I/O等,只要具有输入输出类型的交互系统都可以认为是I/O系统,也可以说I/O是整个操作系统数据交换与人机交互的通道,这个概念与选用的开发语言没有关系,是一个通用的概念。
在如今的系统中I/O却拥有很重要的位置,现在系统都有可能处理大量文件,大量数据库操作,而这些操作都依赖于系统的I/O性能,也就造成了现在系统的瓶颈往往都是由于I/O性能造成的。因此,为了解决磁盘I/O性能慢的问题,系统架构中添加了缓存来提高响应速度;或者有些高端服务器从硬件级入手,使用了固态硬盘(SSD)来替换传统机械硬盘;在大数据方面,Spark越来越多的承担了实时性计算任务,而传统的Hadoop体系则大多应用在了离线计算与大量数据存储的场景,这也是由于磁盘I/O性能远不如内存I/O性能而造成的格局(Spark更多的使用了内存,而MapReduece更多的使用了磁盘)。因此,一个系统的优化空间,往往都在低效率的I/O环节上,很少看到一个系统CPU、内存的性能是其整个系统的瓶颈。也正因为如此,Java在I/O上也一直在做持续的优化,从JDK 1.4开始便引入了NIO模型,大大的提高了以往BIO模型下的操作效率。
BIO、NIO、AIO
BIO (Blocking I/O):同步阻塞I/O模式,数据的读取写入必须阻塞在一个线程内等待其完成。这里使用那个经典的烧开水例子,这里假设一个烧开水的场景,有一排水壶在烧开水,BIO的工作模式就是, 叫一个线程停留在一个水壶那,直到这个水壶烧开,才去处理下一个水壶。但是实际上线程在等待水壶烧开的时间段什么都没有做。
NIO (New I/O):同时支持阻塞与非阻塞模式,但这里我们以其同步非阻塞I/O模式来说明,那么什么叫做同步非阻塞?如果还拿烧开水来说,NIO的做法是叫一个线程不断的轮询每个水壶的状态,看看是否有水壶的状态发生了改变,从而进行下一步的操作。
AIO ( Asynchronous I/O):异步非阻塞I/O模型。异步非阻塞与同步非阻塞的区别在哪里?异步非阻塞无需一个线程去轮询所有IO操作的状态改变,在相应的状态改变后,系统会通知对应的线程来处理。对应到烧开水中就是,为每个水壶上面装了一个开关,水烧开之后,水壶会自动通知我水烧开了。
进程中的IO调用
进程中的IO调用步骤大致可以分为以下四步:
1. 进程向操作系统请求数据 ;
2. 操作系统把外部数据加载到内核的缓冲区中;
3. 操作系统把内核的缓冲区拷贝到进程的缓冲区 ;
4. 进程获得数据完成自己的功能 ;
当操作系统在把外部数据放到进程缓冲区的这段时间(即上述的第二,三步),如果应用进程是挂起等待的,那么就是同步IO,反之,就是异步IO,也就是AIO 。
异步、异步、阻塞、非阻塞
- 同步阻塞I/O(BIO):
同步阻塞I/O,服务器实现模式为一个连接一个线程,即客户端有连接请求时服务器就需要启动一个线程进行处理,如果这个连接不做任何事情会造成不必要的线程开销,可以通过线程池机制来改善。BIO方式适用于连接数目比较小且固定的架构,这种方式对服务端资源要求比较高,并发局限于应用中,在jdk1.4以前是唯一的io现在,但程序直观简单易理解 - 同步非阻塞I/O(NIO):
同步非阻塞I/O,服务器实现模式为一个请求一个线程,即客户端发送的连接请求都会注册到多路复用器上,多路复用器轮询到连接有IO请求时才启动一个线程进行处理。NIO方式适用于连接数目多且连接比较短(轻操作)的架构,比如聊天服务器,并发局限于应用中,编程比较复杂,jdk1,4开始支持 - 异步非阻塞I/O(AIO):
异步非阻塞I/O,服务器实现模式为一个有效请求一个线程,客户端的IO请求都是由操作系统先完成了再通知服务器用其启动线程进行处理。AIO方式适用于连接数目多且连接比较长(重操作)的架构,比如相册服务器,充分调用OS参与并发操作,编程比较复杂,jdk1.7开始支持。 - IO与NIO区别:
- IO面向流,NIO面向缓冲区
- IO的各种流是阻塞的,NIO是非阻塞模式
Java NIO的选择允许一个单独的线程来监视多个输入通道,可以注册多个通道使用一个选择器,然后使用一个单独的线程来“选择”通道:这些通道里已经有可以处理的输入或选择已准备写入的通道。这种选择机制,使得一个单独的线程很容易来管理多个通道
- 同步与异步的区别:
同步:发送一个请求,等待返回,再发送下一个请求,同步可以避免出现死锁,脏读的发生
异步:发送一个请求,不等待返回,随时可以再发送下一个请求,可以提高效率,保证并发
- 同步异步关注点在于消息通信机制,阻塞与非阻塞关注的是程序在等待调用结果时(消息、返回值)的状态。阻塞调用是指调用结果返回之前,当前线程会被挂起。调用线程只有在得到结果之后才会返回。非阻塞调用指在不能立刻得到结果之前,该调用不会阻塞当前线程
- 不同层次:
CPU层次:操作系统进行IO或任务调度层次,现代操作系统通常使用异步非阻塞方式进行IO(有少部分IO可能会使用同步非阻塞),即发出IO请求后,并不等待IO操作完成,而是继续执行接下来的指令(非阻塞),IO操作和CPU指令互不干扰(异步),最后通过中断的方式通知IO操作的完成结果。
线程层次:操作系统调度单元的层次,操作系统为了减轻程序员的思考负担,将底层的异步非阻塞的IO方式进行封装,把相关系统调用(如read和write)以同步的方式展现出来,然而同步阻塞IO会使线程挂起,同步非阻塞IO会消耗CPU资源在轮询上,3个解决方法;
1. 多线程(同步阻塞)
2. IO多路复用(select、poll、epoll)
3. 直接暴露出异步的IO接口,kernel-aio和IOCP(异步非阻塞)
传统BIO创建服务
JNIO是jdk1.4以后才有的,之前JAVA IO一直是BIO,C、C++程序员为什么看不起java程序员?我想BIO的低性能就是其中一个重要的原因吧!
Java BIO其实就是同步阻塞,高并发处理效率低,我们利用JAVA BIO开始一个服务端程序。
public class BioServer {
public static void main(String[] args) throws IOException {
//端口
int port=8080;
ServerSocket serverSocket=null;
try {
//绑定端口
serverSocket=new ServerSocket(port);
while (true){
//主线程main会阻塞在这里,等待客户端链接
Socket socket = serverSocket.accept();
processClient(socket);
}
} catch (IOException | InterruptedException e) {
e.printStackTrace();
}finally {
if(serverSocket!=null){
serverSocket.close();
}
}
}
public static void processClient(Socket socket) throws InterruptedException {
//模拟处理socket
Thread.sleep(1000);
}
}
这段代始终在main线程中执行,就好比公司创建初期只有老板一个人,确实能完成客户端的链接及请求处理,运行程序代码会阻塞在serverSocket=new ServerSocket(port);
,一直等到客户端链接成功后,才执行处理函数processClient(socket);
,处理结束后,继续循环,此时程序继续阻塞在Socket socket = serverSocket.accept();
等待新的客户端链接。processClient
花了10秒钟才处理完毕,在此期间,如果其他客户端请链接服务器是不成功的,它必须等上一个客户端请求处理完成了才能继续。假如有1000个客户请求呢?10000个呢?想想你在浏览器页面等待一天才下单成功...于是,这家电商公司倒闭了!
客户端发链接请求,希望你服务器立马处理我的请求,而不是等你处理完毕了别人的事情再来搭理我!时间很宝贵好吗?服务器很委婉,表示人手不够,没办法处理别人事情的同时再处理你的事情,毕竟一心不可二用。
那就增加人手!于是线程临危受命(公司开始招人),服务器派主线程接收请求(相当于公司前台),然后将请求交给另一线程(相当于业务人员)处理,服务器继续等待连接,这样的话新的客户端能立马链接上服务器,而不用等待服务器处理完别人的事情再来接待我了,代码如下:
public class BioServer {
public static void main(String[] args) throws IOException {
//端口
int port=8080;
ServerSocket serverSocket=null;
try {
//绑定端口
serverSocket=new ServerSocket(port);
while (true){
//主线程main会阻塞在这里,等待客户端链接
Socket socket = serverSocket.accept();
//请求处理交给别人,主线程继续接待客户端的请求
new Thread(()->{
processClient(socket);
}).start();
}
} catch (IOException e) {
e.printStackTrace();
}finally {
if(serverSocket!=null){
serverSocket.close();
}
}
}
public static void processClient(Socket socket) {
//模拟处理socket
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
客户端链接成功后,交给一个线程处理请求,主线程继续循环等待客户端的链接?如果,有10000个人来链接,那服务器就要开10000个线程,如果10万呢?你开10万个线程?哇,你服务器性能好高耶!线程的创建与销毁很耗资源的好吗?就好比你的公司,你招10000万个业务人员处理客户需求?正常的做法是,招10个业务人员,轮询处理客户请求,每一个业务人员处理完客户请求后等待服务器分给他下一单任务,于是,线程池登场了:
public class BioServer {
public static void main(String[] args) throws IOException {
//端口
int port=8080;
ServerSocket serverSocket=null;
try {
//绑定端口
serverSocket=new ServerSocket(port);
//创建一个线程池,相当于一个固定规模的业务团队
TimeServerHandlerExecutorPool pool = new TimeServerHandlerExecutorPool(50, 1000);
while (true){
//主线程main会阻塞在这里,等待客户端链接
Socket socket = serverSocket.accept();
pool.execute(()->{processClient(socket);});
}
} catch (IOException e) {
e.printStackTrace();
}finally {
if(serverSocket!=null){
serverSocket.close();
}
}
}
public static void processClient(Socket socket) {
//模拟处理socket
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public class TimeServerHandlerExecutorPool implements Executor{
private ExecutorService executorService;
public TimeServerHandlerExecutorPool(int maxPoolSize,int queueSize) {
/**
* @param corePoolSize 核心线程数量
* @param maximumPoolSize 线程创建最大数量
* @param keepAliveTime 当创建到了线程池最大数量时 多长时间线程没有处理任务,则线程销毁
* @param unit keepAliveTime时间单位
* @param workQueue 此线程池使用什么队列
*/
System.out.println(Runtime.getRuntime().availableProcessors());
this.executorService = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(),
maxPoolSize,120L, TimeUnit.SECONDS,new ArrayBlockingQueue<Runnable>(queueSize));
}
@Override
public void execute(Runnable command) {
executorService.execute(command);
}
}
OK,现在这个公司有模有样了,一个前台,N个业务人员,只要有订单,这N个业务人员可以不睡觉!
公司规模日益增长,用户量越来越大,N个业务人员已经加班加点累吐血了,公司到了一个瓶颈期,急需改变现状。
大家有没有发现,当一件事参与的人多了以后,沟通往往会成为事情发展的最大障碍,前台人员需要不断的与业务人员沟通,业务人员来回不断的找前台沟通,前台在多个业务人员之间不断的进行脑力切换。如果前台正在跟业务人员A沟通,这时业务人员B插进来了,前台转而去跟B沟通,沟通完后需要回忆刚才跟A沟通到哪里了,前台想,太累了,有跟业务人员解释的时间还不如我自己干。其实,这就是多线程的上下文切换,CPU通过时间片分配算法来循环执行任务,当前任务执行一个时间片后会切换到下一个任务。但是,在切换前会保存上一个任务的状态,以便下次切换回这个任务时,可以再次加载这个任务的状态,从任务保存到再加载的过程就是一次上下文切换。线程切换时需要知道在这之前当前线程已经执行到哪条指令了,所以需要记录程序计数器的值,另外比如说线程正在进行某个计算的时候被挂起了,那么下次继续执行的时候需要知道之前挂起时变量的值时多少,因此需要记录CPU寄存器的状态。所以一般来说,线程上下文切换过程中会记录程序计数器、CPU寄存器状态等数据。
image.png
公司不得不进行改革,对前台人员进行业务培训。前台记录多个用户需求,搜集到一定程度后,暂停收集,对这些需求进行筛选,大部分短期自己能做的任务自己做了,难度大且耗时的任务交给业务人员处理。随着规模的增大,可以分成多个组,每组一个前台和多个业务人员。这就是NIO单线程Reactor模型和多线程Reactor模型,上面的比喻可能不恰当,后面会通过代码的形式详细讲解NIO。
利用传统BIO手写一个Redis客户端
Redis作为高性能的缓存数据库,想必大家都用过,应用程序通过Jedis客户端来链接redies,我们就利用java BIO来模拟一个Jedis客户端来向redis发送请求数据。
通信需要双方定好通信协议和数据格式,这里通信协议就是TCP,我们主要关系数据格式,方法就是查看Jedis发送数据的格式。
- 首先,准备服务程序,用于接收并查看Jedis发送来的数据:
public class BioServer {
public static void main(String[] args) throws IOException {
ServerSocket serverSocket = null;
try {
serverSocket = new ServerSocket(9999);
while (true) {
Socket socket = serverSocket.accept();
System.out.println("客户端" + socket.getRemoteSocketAddress().toString() + "来连接了");
InputStream inputStream = null;
OutputStream outputStream = null;
try {
inputStream = socket.getInputStream();
outputStream = socket.getOutputStream();
int count = 0;
byte[] bytes = new byte[1024];
while ((count = inputStream.read(bytes)) != -1) {
String line = new String(bytes, 0, count, "utf-8");
//打印jedis发送过来的数据
System.out.println(line);
outputStream.write("ok".getBytes());
outputStream.flush();
}
} catch (IOException e) {
e.printStackTrace();
} finally {
socket.close();
}
}
} catch (IOException e) {
e.printStackTrace();
} finally {
serverSocket.close();
}
}
}
- 准备Jedis链接程序:
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.8.0</version>
</dependency>
//模拟jedis
public class RedisClient {
public static void main(String[] args) {
Jedis redisClient=new Jedis("localhost",9999);
System.out.println(redisClient.set("yuanma", "123456"));
redisClient.close();
}
}
服务程序打印结果:
客户端/127.0.0.1:65257来连接了
*3
$3
SET
$6
yuanma
$6
123456
上面的服务程序不是真正的Redis服务器,我们只是为了查看Jedis发送的数据格式。Jedis的请求是set yuanma 123456
,意思是设置键yuanma
的值为123456
,Jedis将这个请求封装成了上面的数据格式。我们根据这个数据格式,利用BIO模拟Jedis向真正的Redis服务器发请求,然后再根据键从redis服务器获取值,看是否能成功。
先分析下上面的数据格式,*3
的意思是发送的参数有3个,即set
、yuanma
和123456
,$3
表示第一个参数长度是3,SET
表示的就是第一个参数;以此类推,$6
表示第二个参数长度是6,yuanma
表示第二个参数;$6
表示第三个参数长度是6,123456
表示第三个参数。
举一反三,如果我想Redis发送'set name netty
这条命令,数据格式应该是这样的:
*3
$3
SET
$4
name
$5
netty
如果,向Redis服务器发送 get name
(即获取name的值),数据格式应该是这样的:
*2
$3
GET
$4
name
OK,数据格式我们研究清楚了,下面就是模拟Jedis向服务其发送请求并接收返回的数据。
模拟Jedis客户端来链接Redis服务器
第一步,定义向Redis发送请求的客户端API:
import redis.clients.jedis.Jedis;
//模拟jedis
public class RedisClient {
//发送set key value命令
public String set(String key, String value){
reutrn null;
}
//发送get key命令
public String get(String key){
return null;
}
//发送incr key命令
public String incr(String key){
return null;
}
}
第二步,定义Socket通信层:
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
//socket通信
public class LubanSocket {
private Socket socket;
private InputStream inputStream;
private OutputStream outputStream;
//构造函数,链接Redis服务器,拿到输入流和输出流
public LubanSocket(String ip,int prot) {
try {
if(!isCon()){
socket=new Socket(ip,prot);
inputStream=socket.getInputStream();
outputStream=socket.getOutputStream();
}
} catch (IOException e) {
e.printStackTrace();
}
}
//发送请求
public void send(String str){
try {
outputStream.write(str.getBytes());
} catch (IOException e) {
e.printStackTrace();
}
}
//读取Redis返回的数据
public String read(){
byte[] b=new byte[1024];
int count=0;
try {
count= inputStream.read(b);
} catch (IOException e) {
e.printStackTrace();
}
return new String(b,0,count);
}
//判断链接是否断开
public boolean isCon(){
return socket!=null && !socket.isClosed() && socket.isConnected();
}
//关闭连接
public void close(){
if(outputStream!=null){
try {
outputStream.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if(inputStream!=null){
try {
inputStream.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if(socket!=null){
try {
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
第三步,定义数据协议层
public class Resp {
/**
* redis网络通信协议,比如set("name","congzhizhi")
* *3
* $3
* set
* $4
* name
* $10
* congzhizhi
* 其中,*3表示发了3个参数,$3表示下面的参数3个字符,以此类推
*
*
*/
public static final String star="*";
public static final String crlf="\r\n";
public static final String lengthStart="$";
//枚举类,定义指令,这里有SET指令、GET指令、INCR指令
public static enum command{
SET,GET,INCR
}
}
下一步就是完善第一步的客户端,组装发送命令。代码也很简单,直接看:
//模拟jedis
public class RedisClient {
private LubanSocket lubanSocket;
//构造函数,链接Redis服务器
public RedisClient(String ip,int prot) {
this.lubanSocket=new LubanSocket(ip,prot);
}
//发送set命令
public String set(String key, String value){
lubanSocket.send(commandStrUtil(Resp.command.SET,key.getBytes(),value.getBytes()));
return lubanSocket.read();
}
//关闭链接
public void close(){
lubanSocket.close();
}
//发送get命令
public String get(String key){
lubanSocket.send(commandStrUtil(Resp.command.GET,key.getBytes()));
return lubanSocket.read();
}
//发送incr命令
public String incr(String key){
lubanSocket.send(commandStrUtil(Resp.command.INCR,key.getBytes()));
return lubanSocket.read();
}
//组装命令
public String commandStrUtil(Resp.command command, byte[]... bytes){
StringBuilder stringBuilder=new StringBuilder();
//拼接*3,set key value,总共3个,bytes代表键和值参数,注意拼接完要加回车换行
stringBuilder.append(Resp.star).append(1+bytes.length).append(Resp.crlf);
//拼接SET的长度,$3
stringBuilder.append(Resp.lengthStart).append(command.toString().getBytes().length).append(Resp.crlf);
//拼接SET字符串
stringBuilder.append(command.toString()).append(Resp.crlf);
//拼接键和值
for (byte[] aByte : bytes) {
stringBuilder.append(Resp.lengthStart).append(aByte.length).append(Resp.crlf);
stringBuilder.append(new String(aByte)).append(Resp.crlf);
}
return stringBuilder.toString();
}
}
上面的代码很简单,不细讲了,下面我们来做个测试:
- 首先,我们先启动redis,小编为演示,在这里启动一个windows版本的redis,到安装目录下通过命令
redis-server.exe "redis.windows.conf"
即可启动,端口号默认为6379:
- 编写测试
public static void main(String[] args) {
RedisClient redisClient=new RedisClient("localhost",6379);
System.out.println(redisClient.set("yuanma", "123456"));
System.out.println(redisClient.get("yuanma"));
redisClient.close();
}
打印结果:
这说明,我们成功向Redis发送的set和get命令,并成功接收了Redis返回的数据。
为进一步证明,我们有可视化客户端连接Redis,然后查看我们刚才set的数据
完美!其实,不光SET 和GET命令,Redis中大部分常用的命令都可以使用咱们这个手写的客户端都可以。Mysql驱动连接数据库也是这个原理啦,就是TCP通信,只不过数据协议和IO模型(以后详讲)不同而已。这就是传统的JAVA AIO编程,他是同步阻塞的,无法满足高并发链接,下一节我们就开始讲高并发网络通信基础NIO。
网友评论