美文网首页NIO通信 & Netty 4 源码解读
「通信框架Netty4 源码解读(一)」起步,关于IO的简单总结

「通信框架Netty4 源码解读(一)」起步,关于IO的简单总结

作者: 源码之路 | 来源:发表于2020-06-22 11:00 被阅读0次

           Netty是一个高效稳定的NIO应用通信框架,笔者在本专题将带领大家分析Netty底层源码,彻底理解底层通信原理。

    注意,本专题只适宜了解java多线程和java io知识的小伙伴阅读。

    IO

           在计算机系统中I/O就是输入(Input)和输出(Output)的意思,针对不同的操作对象,可以划分为磁盘I/O模型,网络I/O模型,内存映射I/O, Direct I/O、数据库I/O等,只要具有输入输出类型的交互系统都可以认为是I/O系统,也可以说I/O是整个操作系统数据交换与人机交互的通道,这个概念与选用的开发语言没有关系,是一个通用的概念。
           在如今的系统中I/O却拥有很重要的位置,现在系统都有可能处理大量文件,大量数据库操作,而这些操作都依赖于系统的I/O性能,也就造成了现在系统的瓶颈往往都是由于I/O性能造成的。因此,为了解决磁盘I/O性能慢的问题,系统架构中添加了缓存来提高响应速度;或者有些高端服务器从硬件级入手,使用了固态硬盘(SSD)来替换传统机械硬盘;在大数据方面,Spark越来越多的承担了实时性计算任务,而传统的Hadoop体系则大多应用在了离线计算与大量数据存储的场景,这也是由于磁盘I/O性能远不如内存I/O性能而造成的格局(Spark更多的使用了内存,而MapReduece更多的使用了磁盘)。因此,一个系统的优化空间,往往都在低效率的I/O环节上,很少看到一个系统CPU、内存的性能是其整个系统的瓶颈。也正因为如此,Java在I/O上也一直在做持续的优化,从JDK 1.4开始便引入了NIO模型,大大的提高了以往BIO模型下的操作效率。

    BIO、NIO、AIO

    BIO (Blocking I/O):同步阻塞I/O模式,数据的读取写入必须阻塞在一个线程内等待其完成。这里使用那个经典的烧开水例子,这里假设一个烧开水的场景,有一排水壶在烧开水,BIO的工作模式就是, 叫一个线程停留在一个水壶那,直到这个水壶烧开,才去处理下一个水壶。但是实际上线程在等待水壶烧开的时间段什么都没有做。

    NIO (New I/O):同时支持阻塞与非阻塞模式,但这里我们以其同步非阻塞I/O模式来说明,那么什么叫做同步非阻塞?如果还拿烧开水来说,NIO的做法是叫一个线程不断的轮询每个水壶的状态,看看是否有水壶的状态发生了改变,从而进行下一步的操作。

    AIO ( Asynchronous I/O):异步非阻塞I/O模型。异步非阻塞与同步非阻塞的区别在哪里?异步非阻塞无需一个线程去轮询所有IO操作的状态改变,在相应的状态改变后,系统会通知对应的线程来处理。对应到烧开水中就是,为每个水壶上面装了一个开关,水烧开之后,水壶会自动通知我水烧开了。

    进程中的IO调用

    进程中的IO调用步骤大致可以分为以下四步:
       1. 进程向操作系统请求数据 ;
       2. 操作系统把外部数据加载到内核的缓冲区中;
       3. 操作系统把内核的缓冲区拷贝到进程的缓冲区 ;
       4. 进程获得数据完成自己的功能 ;
           当操作系统在把外部数据放到进程缓冲区的这段时间(即上述的第二,三步),如果应用进程是挂起等待的,那么就是同步IO,反之,就是异步IO,也就是AIO 。

    异步、异步、阻塞、非阻塞

    1. 同步阻塞I/O(BIO):
            同步阻塞I/O,服务器实现模式为一个连接一个线程,即客户端有连接请求时服务器就需要启动一个线程进行处理,如果这个连接不做任何事情会造成不必要的线程开销,可以通过线程池机制来改善。BIO方式适用于连接数目比较小且固定的架构,这种方式对服务端资源要求比较高,并发局限于应用中,在jdk1.4以前是唯一的io现在,但程序直观简单易理解
    2. 同步非阻塞I/O(NIO):
            同步非阻塞I/O,服务器实现模式为一个请求一个线程,即客户端发送的连接请求都会注册到多路复用器上,多路复用器轮询到连接有IO请求时才启动一个线程进行处理。NIO方式适用于连接数目多且连接比较短(轻操作)的架构,比如聊天服务器,并发局限于应用中,编程比较复杂,jdk1,4开始支持
    3. 异步非阻塞I/O(AIO):
            异步非阻塞I/O,服务器实现模式为一个有效请求一个线程,客户端的IO请求都是由操作系统先完成了再通知服务器用其启动线程进行处理。AIO方式适用于连接数目多且连接比较长(重操作)的架构,比如相册服务器,充分调用OS参与并发操作,编程比较复杂,jdk1.7开始支持。
    4. IO与NIO区别:
    • IO面向流,NIO面向缓冲区
    • IO的各种流是阻塞的,NIO是非阻塞模式
            Java NIO的选择允许一个单独的线程来监视多个输入通道,可以注册多个通道使用一个选择器,然后使用一个单独的线程来“选择”通道:这些通道里已经有可以处理的输入或选择已准备写入的通道。这种选择机制,使得一个单独的线程很容易来管理多个通道
    1. 同步与异步的区别:
            同步:发送一个请求,等待返回,再发送下一个请求,同步可以避免出现死锁,脏读的发生
            异步:发送一个请求,不等待返回,随时可以再发送下一个请求,可以提高效率,保证并发
    • 同步异步关注点在于消息通信机制,阻塞与非阻塞关注的是程序在等待调用结果时(消息、返回值)的状态。阻塞调用是指调用结果返回之前,当前线程会被挂起。调用线程只有在得到结果之后才会返回。非阻塞调用指在不能立刻得到结果之前,该调用不会阻塞当前线程
    • 不同层次:
      CPU层次:操作系统进行IO或任务调度层次,现代操作系统通常使用异步非阻塞方式进行IO(有少部分IO可能会使用同步非阻塞),即发出IO请求后,并不等待IO操作完成,而是继续执行接下来的指令(非阻塞),IO操作和CPU指令互不干扰(异步),最后通过中断的方式通知IO操作的完成结果。
      线程层次:操作系统调度单元的层次,操作系统为了减轻程序员的思考负担,将底层的异步非阻塞的IO方式进行封装,把相关系统调用(如read和write)以同步的方式展现出来,然而同步阻塞IO会使线程挂起,同步非阻塞IO会消耗CPU资源在轮询上,3个解决方法;
            1. 多线程(同步阻塞)
            2. IO多路复用(select、poll、epoll)
            3. 直接暴露出异步的IO接口,kernel-aio和IOCP(异步非阻塞)

    传统BIO创建服务

           JNIO是jdk1.4以后才有的,之前JAVA IO一直是BIO,C、C++程序员为什么看不起java程序员?我想BIO的低性能就是其中一个重要的原因吧!
           Java BIO其实就是同步阻塞,高并发处理效率低,我们利用JAVA BIO开始一个服务端程序。

    
    public class BioServer {
      public static void main(String[] args) throws IOException {
          //端口
          int port=8080;
          ServerSocket serverSocket=null;
          try {
              //绑定端口
              serverSocket=new ServerSocket(port);
              while (true){
                  //主线程main会阻塞在这里,等待客户端链接
                  Socket socket = serverSocket.accept();
                  processClient(socket);
              }
          } catch (IOException | InterruptedException e) {
              e.printStackTrace();
          }finally {
              if(serverSocket!=null){
                  serverSocket.close();
              }
          }
      }
      public static  void processClient(Socket socket) throws InterruptedException {
          //模拟处理socket
          Thread.sleep(1000);
      }
    }
    

           这段代始终在main线程中执行,就好比公司创建初期只有老板一个人,确实能完成客户端的链接及请求处理,运行程序代码会阻塞在serverSocket=new ServerSocket(port);,一直等到客户端链接成功后,才执行处理函数processClient(socket);,处理结束后,继续循环,此时程序继续阻塞在Socket socket = serverSocket.accept();等待新的客户端链接。processClient花了10秒钟才处理完毕,在此期间,如果其他客户端请链接服务器是不成功的,它必须等上一个客户端请求处理完成了才能继续。假如有1000个客户请求呢?10000个呢?想想你在浏览器页面等待一天才下单成功...于是,这家电商公司倒闭了!
           客户端发链接请求,希望你服务器立马处理我的请求,而不是等你处理完毕了别人的事情再来搭理我!时间很宝贵好吗?服务器很委婉,表示人手不够,没办法处理别人事情的同时再处理你的事情,毕竟一心不可二用。
           那就增加人手!于是线程临危受命(公司开始招人),服务器派主线程接收请求(相当于公司前台),然后将请求交给另一线程(相当于业务人员)处理,服务器继续等待连接,这样的话新的客户端能立马链接上服务器,而不用等待服务器处理完别人的事情再来接待我了,代码如下:

    public class BioServer {
        public static void main(String[] args) throws IOException {
            //端口
            int port=8080;
            ServerSocket serverSocket=null;
            try {
                //绑定端口
                serverSocket=new ServerSocket(port);
                while (true){
                    //主线程main会阻塞在这里,等待客户端链接
                    Socket socket = serverSocket.accept();
                  //请求处理交给别人,主线程继续接待客户端的请求
                    new Thread(()->{
                            processClient(socket);
                    }).start();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }finally {
                if(serverSocket!=null){
                    serverSocket.close();
                }
            }
        }
          public static  void processClient(Socket socket)  {
            //模拟处理socket
            try {
                Thread.sleep(10000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    

           客户端链接成功后,交给一个线程处理请求,主线程继续循环等待客户端的链接?如果,有10000个人来链接,那服务器就要开10000个线程,如果10万呢?你开10万个线程?哇,你服务器性能好高耶!线程的创建与销毁很耗资源的好吗?就好比你的公司,你招10000万个业务人员处理客户需求?正常的做法是,招10个业务人员,轮询处理客户请求,每一个业务人员处理完客户请求后等待服务器分给他下一单任务,于是,线程池登场了:

    public class BioServer {
        public static void main(String[] args) throws IOException {
            //端口
            int port=8080;
            ServerSocket serverSocket=null;
            try {
                //绑定端口
                serverSocket=new ServerSocket(port);
                //创建一个线程池,相当于一个固定规模的业务团队
                TimeServerHandlerExecutorPool pool = new TimeServerHandlerExecutorPool(50, 1000);
                while (true){
                    //主线程main会阻塞在这里,等待客户端链接
                    Socket socket = serverSocket.accept();
                    pool.execute(()->{processClient(socket);});
                }
            } catch (IOException e) {
                e.printStackTrace();
            }finally {
                if(serverSocket!=null){
                    serverSocket.close();
                }
            }
        }
    
        public static  void processClient(Socket socket)  {
            //模拟处理socket
            try {
                Thread.sleep(10000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    
    public class TimeServerHandlerExecutorPool implements Executor{
         private ExecutorService executorService;
        public TimeServerHandlerExecutorPool(int maxPoolSize,int queueSize) {
            /**
             * @param corePoolSize 核心线程数量
             * @param maximumPoolSize 线程创建最大数量
             * @param keepAliveTime 当创建到了线程池最大数量时  多长时间线程没有处理任务,则线程销毁
             * @param unit keepAliveTime时间单位
             * @param workQueue 此线程池使用什么队列
             */
            System.out.println(Runtime.getRuntime().availableProcessors());
            this.executorService = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(),
                    maxPoolSize,120L, TimeUnit.SECONDS,new ArrayBlockingQueue<Runnable>(queueSize));
        }
        @Override
        public void execute(Runnable command) {
            executorService.execute(command);
        }
    }
    

           OK,现在这个公司有模有样了,一个前台,N个业务人员,只要有订单,这N个业务人员可以不睡觉!
           公司规模日益增长,用户量越来越大,N个业务人员已经加班加点累吐血了,公司到了一个瓶颈期,急需改变现状。
           大家有没有发现,当一件事参与的人多了以后,沟通往往会成为事情发展的最大障碍,前台人员需要不断的与业务人员沟通,业务人员来回不断的找前台沟通,前台在多个业务人员之间不断的进行脑力切换。如果前台正在跟业务人员A沟通,这时业务人员B插进来了,前台转而去跟B沟通,沟通完后需要回忆刚才跟A沟通到哪里了,前台想,太累了,有跟业务人员解释的时间还不如我自己干。其实,这就是多线程的上下文切换,CPU通过时间片分配算法来循环执行任务,当前任务执行一个时间片后会切换到下一个任务。但是,在切换前会保存上一个任务的状态,以便下次切换回这个任务时,可以再次加载这个任务的状态,从任务保存到再加载的过程就是一次上下文切换。线程切换时需要知道在这之前当前线程已经执行到哪条指令了,所以需要记录程序计数器的值,另外比如说线程正在进行某个计算的时候被挂起了,那么下次继续执行的时候需要知道之前挂起时变量的值时多少,因此需要记录CPU寄存器的状态。所以一般来说,线程上下文切换过程中会记录程序计数器、CPU寄存器状态等数据。


    image.png

    公司不得不进行改革,对前台人员进行业务培训。前台记录多个用户需求,搜集到一定程度后,暂停收集,对这些需求进行筛选,大部分短期自己能做的任务自己做了,难度大且耗时的任务交给业务人员处理。随着规模的增大,可以分成多个组,每组一个前台和多个业务人员。这就是NIO单线程Reactor模型和多线程Reactor模型,上面的比喻可能不恰当,后面会通过代码的形式详细讲解NIO。

    利用传统BIO手写一个Redis客户端

           Redis作为高性能的缓存数据库,想必大家都用过,应用程序通过Jedis客户端来链接redies,我们就利用java BIO来模拟一个Jedis客户端来向redis发送请求数据。
           通信需要双方定好通信协议和数据格式,这里通信协议就是TCP,我们主要关系数据格式,方法就是查看Jedis发送数据的格式。

    • 首先,准备服务程序,用于接收并查看Jedis发送来的数据:
    
    public class BioServer {
    
        public static void main(String[] args) throws IOException {
            ServerSocket serverSocket = null;
            try {
                serverSocket = new ServerSocket(9999);
                while (true) {
                    Socket socket = serverSocket.accept();
                    System.out.println("客户端" + socket.getRemoteSocketAddress().toString() + "来连接了");
                    InputStream inputStream = null;
                    OutputStream outputStream = null;
                    try {
                        inputStream = socket.getInputStream();
                        outputStream = socket.getOutputStream();
                        int count = 0;
                        byte[] bytes = new byte[1024];
                        while ((count = inputStream.read(bytes)) != -1) {
                            String line = new String(bytes, 0, count, "utf-8");
                           //打印jedis发送过来的数据
                            System.out.println(line);
                            outputStream.write("ok".getBytes());
                            outputStream.flush();
                        }
                    } catch (IOException e) {
                        e.printStackTrace();
                    } finally {
                        socket.close();
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                serverSocket.close();
            }
        }
    }
    
    • 准备Jedis链接程序:
        <dependency>
                <groupId>redis.clients</groupId>
                <artifactId>jedis</artifactId>
                <version>2.8.0</version>
            </dependency>
    
    //模拟jedis
    public class RedisClient {
    
        public static void main(String[] args) {
            Jedis redisClient=new Jedis("localhost",9999);
            System.out.println(redisClient.set("yuanma", "123456"));
            redisClient.close();
        }
    }
    

    服务程序打印结果:

    客户端/127.0.0.1:65257来连接了
    *3
    $3
    SET
    $6
    yuanma
    $6
    123456
    

           上面的服务程序不是真正的Redis服务器,我们只是为了查看Jedis发送的数据格式。Jedis的请求是set yuanma 123456,意思是设置键yuanma的值为123456,Jedis将这个请求封装成了上面的数据格式。我们根据这个数据格式,利用BIO模拟Jedis向真正的Redis服务器发请求,然后再根据键从redis服务器获取值,看是否能成功。
           先分析下上面的数据格式,*3的意思是发送的参数有3个,即setyuanma123456$3表示第一个参数长度是3,SET表示的就是第一个参数;以此类推,$6表示第二个参数长度是6,yuanma表示第二个参数;$6表示第三个参数长度是6,123456表示第三个参数。
           举一反三,如果我想Redis发送'set name netty这条命令,数据格式应该是这样的:

    *3
    $3
    SET
    $4
    name
    $5
    netty
    

           如果,向Redis服务器发送 get name(即获取name的值),数据格式应该是这样的:

    *2
    $3
    GET
    $4
    name
    

           OK,数据格式我们研究清楚了,下面就是模拟Jedis向服务其发送请求并接收返回的数据。

    模拟Jedis客户端来链接Redis服务器

    第一步,定义向Redis发送请求的客户端API:

    import redis.clients.jedis.Jedis;
    //模拟jedis
    public class RedisClient {
        //发送set key value命令
        public String set(String key, String value){
              reutrn null;
        }
        //发送get key命令
        public String get(String key){
            return null;
        }
        //发送incr key命令
        public String incr(String key){
            return  null;
        }
    }
    
    

    第二步,定义Socket通信层:

    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;
    import java.net.Socket;
    
    //socket通信
    public class LubanSocket {
    
        private Socket socket;
        private InputStream inputStream;
        private OutputStream outputStream;
    
    //构造函数,链接Redis服务器,拿到输入流和输出流
        public LubanSocket(String ip,int prot) {
            try {
                if(!isCon()){
                    socket=new Socket(ip,prot);
                    inputStream=socket.getInputStream();
                    outputStream=socket.getOutputStream();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    
    //发送请求
        public void send(String str){
            try {
                outputStream.write(str.getBytes());
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    //读取Redis返回的数据
        public String read(){
            byte[] b=new byte[1024];
            int count=0;
            try {
                count= inputStream.read(b);
            } catch (IOException e) {
                e.printStackTrace();
            }
            return new String(b,0,count);
        }
    
    //判断链接是否断开
        public boolean isCon(){
            return socket!=null && !socket.isClosed() && socket.isConnected();
        }
    //关闭连接
        public void close(){
            if(outputStream!=null){
                try {
                    outputStream.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            if(inputStream!=null){
                try {
                    inputStream.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            if(socket!=null){
                try {
                    socket.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    
    

    第三步,定义数据协议层

    public class Resp {
        /**
         * redis网络通信协议,比如set("name","congzhizhi")
         * *3
         * $3
         * set
         * $4
         * name
         * $10
         * congzhizhi
         * 其中,*3表示发了3个参数,$3表示下面的参数3个字符,以此类推
         *
         *
         */
        public static final String star="*";
        public static final String crlf="\r\n";
        public static final String lengthStart="$";
    //枚举类,定义指令,这里有SET指令、GET指令、INCR指令
        public static enum command{
            SET,GET,INCR
        }
    }
    
    

    下一步就是完善第一步的客户端,组装发送命令。代码也很简单,直接看:

    //模拟jedis
    public class RedisClient {
    
        private LubanSocket lubanSocket;
    //构造函数,链接Redis服务器
        public RedisClient(String ip,int prot) {
            this.lubanSocket=new LubanSocket(ip,prot);
        }
    //发送set命令
        public String set(String key, String value){
            lubanSocket.send(commandStrUtil(Resp.command.SET,key.getBytes(),value.getBytes()));
            return lubanSocket.read();
        }
    //关闭链接
        public void close(){
            lubanSocket.close();
        }
    //发送get命令
        public String get(String key){
            lubanSocket.send(commandStrUtil(Resp.command.GET,key.getBytes()));
            return lubanSocket.read();
        }
    //发送incr命令
        public String incr(String key){
            lubanSocket.send(commandStrUtil(Resp.command.INCR,key.getBytes()));
            return lubanSocket.read();
        }
    
    //组装命令
        public String commandStrUtil(Resp.command command, byte[]... bytes){
        StringBuilder stringBuilder=new StringBuilder();
            //拼接*3,set key value,总共3个,bytes代表键和值参数,注意拼接完要加回车换行
            stringBuilder.append(Resp.star).append(1+bytes.length).append(Resp.crlf);
            //拼接SET的长度,$3
            stringBuilder.append(Resp.lengthStart).append(command.toString().getBytes().length).append(Resp.crlf);
            //拼接SET字符串
            stringBuilder.append(command.toString()).append(Resp.crlf);
            //拼接键和值
            for (byte[] aByte : bytes) {
                stringBuilder.append(Resp.lengthStart).append(aByte.length).append(Resp.crlf);
                stringBuilder.append(new String(aByte)).append(Resp.crlf);
            }
            return stringBuilder.toString();
        }
    }
    
    

    上面的代码很简单,不细讲了,下面我们来做个测试:

    • 首先,我们先启动redis,小编为演示,在这里启动一个windows版本的redis,到安装目录下通过命令redis-server.exe "redis.windows.conf"即可启动,端口号默认为6379:
    • 编写测试
     public static void main(String[] args) {
            RedisClient redisClient=new RedisClient("localhost",6379);
            System.out.println(redisClient.set("yuanma", "123456"));
            System.out.println(redisClient.get("yuanma"));
            redisClient.close();
        }
    

    打印结果:


    这说明,我们成功向Redis发送的set和get命令,并成功接收了Redis返回的数据。
    为进一步证明,我们有可视化客户端连接Redis,然后查看我们刚才set的数据



    完美!其实,不光SET 和GET命令,Redis中大部分常用的命令都可以使用咱们这个手写的客户端都可以。Mysql驱动连接数据库也是这个原理啦,就是TCP通信,只不过数据协议和IO模型(以后详讲)不同而已。这就是传统的JAVA AIO编程,他是同步阻塞的,无法满足高并发链接,下一节我们就开始讲高并发网络通信基础NIO。

    相关文章

      网友评论

        本文标题:「通信框架Netty4 源码解读(一)」起步,关于IO的简单总结

        本文链接:https://www.haomeiwen.com/subject/llhyxktx.html