美文网首页
【Netty】从Java.IO到Java.NIO再到Netty

【Netty】从Java.IO到Java.NIO再到Netty

作者: 小圣996 | 来源:发表于2020-02-17 22:36 被阅读0次

    我们应该已经知道,Netty是一个基于NIO的异步事件驱动的网络应用框架,用于快速开发可维护的高性能协议服务器和客户端。Netty在Java NIO的基础上提供了更高层的抽象和封装,因此要想对Netty有所深入了解,势必要对Java.NIO有所了解,而NIO是对传统IO由阻塞向异步非阻塞IO的巨大跨越,因此了解传统Java.IO对了解Java.NIO也大有裨益。

    传统IO弊端
    首先我们看下传统IO的网络编程的一个简单例子,由此将进入对传统Java.IO的介绍:

        public static void main(String[] agrs) throws Exception{
            ServerSocket serverSocket = new ServerSocket(8899);
            while (true) {    
                Socket socket = serverSocket.accept();      
                new Thread(() -> {
                    try {
                        BufferedReader input = new BufferedReader(new InputStreamReader(socket.getInputStream()));
                        String clientInputStr = input.readLine(); 
                        
                        System.out.println("客户端"+socket.getRemoteSocketAddress()+"发过来的内容:" + clientInputStr); 
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }).start();    
            } 
        }
    

    java.io最核心的一个概念是流(Stream),java.io可以看成是面向流的编程,它将数据的输入输出抽象为流,流是一组有顺序的,单向的,有起点和终点的数据集合,就像水流。在Java中,一个流要么是输入流,要么是输出流。按照流中的最小数据单元又分为字节流和字符流。

    字节流:以 8 位(即1byte=8bit)作为一个数据单元,数据流中最小的数据单元是字节。它的顶级父类是InputStream和OutputStream。
    字符流:以 16 位(即1char=2byte=16bit)作为一个数据单元,数据流中最小的数据单元是字符, Java 中的字符是 Unicode 编码,一个字符占用两个字节。它的顶级父类是Reader和Writer。

    所有的Java IO流都是阻塞的,这意味着,当一条线程执行accept(),read()或者write()方法时,这条线程会一直阻塞直到有连接请求,或读取到了一些数据或者要写出去的数据已经全部写出,在这期间这条线程不能做任何其他的事情,而如果想支持多个连接,那就需为每个连接新开个线程去支持读写操作,如上代码所示。这种模式在用户负载增加时,性能将下降非常的快(大家应该都知道无限开线程的后果)。

    非阻塞Reactor模式引入
    随着网络应用的发展和网络服务的用户逐渐增多,需要有一种新的方案去解决传统网络的这种问题,在上世纪90年代,便提出了一种Reactor模式,Reactor模式是一种高并发事件驱动的网络服务模式,它的实现可以用Java实现、C++实现或其他语言实现,而java.nio就是依照reactor模式设计的,此外,其他的一些框架也采用(或实现)了Reactor模式,如Redis,Nginx,Netty(Netty是Java NIO更高层的抽象)等。Reactor的框架及流程图如下所示(参照Douglas C. Schmidt 的《Reactor》):

    reactor模式及逻辑流程.png

    它的结构包括了5个部分,因为这些结构和Java nio的实现有些出入(有些对不上号,有些需用户自己实现),此不介绍了,感兴趣的读者可以参考Douglas C. Schmidt 的《Reactor》一文介绍。而大家对Reactor模式的认识更喜好Doug Lea 《Scalable IO in Java》中的一文介绍。如下图所示:

    单线程的Reactor模式

    Java NIO网络编程
    已知了java.nio就是依照reactor模式设计的,我们再看一个Java NIO网络编程的一个简单例子,由此将进入对传统Java NIO的介绍:

    public static void main(String[] agrs) throws Exception{
            int portsNum = 5;
            int[] ports = new int[portsNum];
            for (int i = 0; i < portsNum; i++){
                ports[i] = 9000 + i;
            }
            
            Selector selector = Selector.open();
            
            for (int i = 0; i < ports.length; i++){
                ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
                serverSocketChannel.configureBlocking(false);
                serverSocketChannel.bind(new InetSocketAddress(ports[i]));
                
                serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
                System.out.println("服务端监听端口:" + ports[i]);
            }
            
            while(true) {
                selector.select();
                
                Set<SelectionKey> selectedKeys = selector.selectedKeys();
                for(Iterator<SelectionKey> it = selectedKeys.iterator(); it.hasNext();){
                    SelectionKey selectionKey = it.next();
                    
                    if (selectionKey.isAcceptable()){
                        ServerSocketChannel serverSocketChannel = (ServerSocketChannel)selectionKey.channel();
                        SocketChannel socketChannel = serverSocketChannel.accept();
                        socketChannel.configureBlocking(false);
                        
                        socketChannel.register(selector, SelectionKey.OP_READ);
                        
                        it.remove();
                        System.out.println("接受客户端连接:" + socketChannel);
                    }else if (selectionKey.isReadable()){
                        SocketChannel socketChannel = (SocketChannel)selectionKey.channel();
                        
                        int byteRead = 0;
                        while(true){
                            ByteBuffer byteBuffer = ByteBuffer.allocate(512);
                            byteBuffer.clear();
                            
                            int read = socketChannel.read(byteBuffer);
                            if (read <= 0){
                                break;
                            }
                            
                            byteBuffer.flip();
                            socketChannel.write(byteBuffer);
                            byteRead += read;
                        }
                        
                        it.remove();
                        System.out.println("读取:" + byteRead + ", 来自:" + socketChannel);
                    }
                }
            }
        }
    

    从上述代码可以看出,一个单线程的java.nio的网络编程流程基本为:
    1.创建一个selector;
    2.创建一个或多个Channel通道,注册到selector,并注册关心事件;
    3.调用select()方法,阻塞等待关心事件发生,关心事件发生后,通过循环SelectionKey集合,再通过SelectionKey获取相关联的通道处理相应事件。
    (tips:即使上面的serverSocketChannel.configureBlocking(false);及关心事件用完即删it.remove();在Netty中也会有相应源码)

    由此再看[reactor模式及逻辑流程.png]一图,java.nio的流程和该图中的结构及流程图是非常相似的,
    Selector:相当于Synchronous Event Demultiplexer。
    SelectionKey: 相当于event,和一个SocketChannel关联。
    SocketChannel:相当于handle。
    java nio中没有提供initial dispatcher的抽象,这部分功能需要用户自行实现。
    java nio中没有提供event handler的抽象,这部分功能需要用户自行实现。

    Java NIO有三大核心组件:
    1.Channel
    Java NIO中的所有I/O操作都基于Channel对象,就像流操作都要基于Stream对象一样。一个Channel(通道)代表和某一实体的连接,这个实体可以是文件、网络套接字等。也就是说,通道是Java NIO提供的一座桥梁,用于我们的程序和操作系统底层I/O服务进行交互。但是,一个通道,既可以读又可以写,而一个Stream是单向的。

    2.Buffer
    NIO中所使用的缓冲区不是一个简单的byte数组,而是封装过的Buffer类,通过它提供的API,我们可以灵活的操纵数据。在Java NIO当中,我们是面向(块)或是缓冲区(buffer)编程的。Buffer本身就是一块内存,底层实现除了数组之外,Buffer还提供了对于数据的结构化访问方式,并且可以追踪到系统的读写过程。
    NIO提供了多种 Buffer 类型与Java基本类型相对应(但没有BoolBuffer),如ByteBuffer、CharBuffer、IntBuffer等,区别就是读写缓冲区时的单位长度不一样。Buffer中有3个很重要的变量,它们是理解Buffer工作机制的关键,分别是capacity (总容量)、position (指针当前位置)、limit (读/写边界位置)。

    3.Selector
    Selector(选择器)是一个特殊的组件,用于采集各个通道的状态(或者说事件)。我们先将通道注册到选择器,并设置好关心的事件,然后就可以通过调用select()方法,静静地等待事件发生。
    通道有如下4个事件可供我们监听:
    Accept:有可以接受的连接
    Connect:连接成功
    Read:有数据可读
    Write:可以写入数据了

    为什么要用Selector?
    如果用阻塞I/O,需要多线程(浪费内存),如果用非阻塞I/O,需要不断重试(耗费CPU)。Selector的出现解决了这尴尬的问题,非阻塞模式下,通过Selector,我们的线程只为已就绪的通道工作,不用盲目的重试了。比如,当所有通道都没有数据到达时,也就没有Read事件发生,我们的线程会在select()方法处被挂起,从而让出了CPU资源。
    【注:从上述三大组件至此大部分摘自 一文让你彻底理解 Java NIO 核心组件

    Netty对Java NIO的抽象
    了解了Java NIO及知道java.nio的网络编程流程后,我们再看Netty源码就显得轻松点了,下面我们将在netty源码中找出java.nio网络编程的影子。

    1.创建一个selector;
    在创建boss线程组时,我们会调用EventLoopGroup bossGroup = new NioEventLoopGroup();方法,一直点NioEventLoopGroup实现,最后会来到NioEventLoop实现,由selector = openSelector();一句,每个NioEventLoop都会创建一个selector。

        NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
                     SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
            super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
            if (selectorProvider == null) {
                throw new NullPointerException("selectorProvider");
            }
            if (strategy == null) {
                throw new NullPointerException("selectStrategy");
            }
            provider = selectorProvider;
            selector = openSelector();
            selectStrategy = strategy;
        }
    

    2.创建一个或多个Channel通道,注册到selector,并注册关心事件;
    Netty中创建NioServerSocketChannel通道,并注册到boss线程的selector中的源码在bootstrap.bind(address)中,即服务端的ip和端口绑定方法中,先在initAndRegister()方法的

        final ChannelFuture initAndRegister() {
            Channel channel = null;
            try {
                channel = channelFactory.newChannel();
                init(channel);
            } catch (Throwable t) {
                ...
            }
    
            ChannelFuture regFuture = config().group().register(channel);
        }
    

    ChannelFuture regFuture = config().group().register(channel);一句,由register(channel)方法及类的继承关系,进入SingleThreadEventLoop的register(Channel channel)方法

    public ChannelFuture register(Channel channel) {
            return register(new DefaultChannelPromise(channel, this));
        }
    

    然后在在AbstractChannel的register(EventLoop eventLoop, final ChannelPromise promise)方法中

            public final void register(EventLoop eventLoop, final ChannelPromise promise) {
                ...
                AbstractChannel.this.eventLoop = eventLoop;
    
                if (eventLoop.inEventLoop()) {
                    register0(promise);
                } else {
                    try {
                        eventLoop.execute(new Runnable() {
                            @Override
                            public void run() {
                                register0(promise);
                            }
                        });
                    } catch (Throwable t) {
                        ...
                    }
                }
            }
    

    进入该类下私有的register0(promise);方法,

    private void register0(ChannelPromise promise) {
                try {
                    ...
                    boolean firstRegistration = neverRegistered;
                    doRegister();
                    neverRegistered = false;
                    registered = true;
                    ...
                } catch (Throwable t) {
                }
            }
    

    最后进入doRegister();的实现类重写方法,即AbstractNioChannel下的doRegister()方法中,

    protected void doRegister() throws Exception {
            boolean selected = false;
            for (;;) {
                try {
                    selectionKey = javaChannel().register(eventLoop().selector, 0, this);
                    return;
                } catch (CancelledKeyException e) {
                    if (!selected) {
                        eventLoop().selectNow();
                        selected = true;
                    } else {
                        throw e;
                    }
                }
            }
        }
    

    由javaChannel().register(eventLoop().selector, 0, this);一句,可见将NioServerSocketChannel通道,注册到了boss线程的selector中。

    我们再看客户端的channel注册:
    boss线程启动后,会监听客户端的连接,主要是在下面代码中监听的(见《Netty的启动过程二》一文介绍):

                if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                    unsafe.read();
                    if (!ch.isOpen()) {
                        // Connection already closed - no need to handle write.
                        return;
                    }
                }
    

    在《Netty的启动过程二》中知道,当有客户端请求连接时,会进入内部类NioMessageUnsafe的read()方法中,继而在doReadMessages(readBuf);方法,然后在buf.add(new NioSocketChannel(this, ch));一句中,创建用于读取和写入数据的NioSocketChannel,并注册关心事件SelectionKey.OP_READ;此后,仍然在内部类NioMessageUnsafe的read()方法中,在pipeline.fireChannelRead(readBuf.get(i))方法,在经历NioServerSocketChannel的pipeline中首尾handler的read方法,最终来到了ServerBootstrapAcceptor的channelRead(ChannelHandlerContext ctx, Object msg)方法(上述过程详见《Netty的启动过程二):

            public void channelRead(ChannelHandlerContext ctx, Object msg) {
                final Channel child = (Channel) msg;
                child.pipeline().addLast(childHandler);
                for (Entry<ChannelOption<?>, Object> e: childOptions) {
                    if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
                        logger.warn("Unknown channel option: " + e);
                    }
                }
                for (Entry<AttributeKey<?>, Object> e: childAttrs) {
                    child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
                }
    
                childGroup.register(child).addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        if (!future.isSuccess()) {
                            forceClose(child, future.cause());
                        }
                    }
                });
            }
    

    由childGroup.register(child)一句,便和NioServerSocketChannel通道注册一样,将NioSocketChannel注册到了worker线程的selector中。

    3.调用select()方法,阻塞等待关心事件发生,关心事件发生后,通过循环SelectionKey集合,再通过SelectionKey获取相关联的通道处理相应事件。
    由《Netty的启动过程二》一文其实已经写明了,它是在这里无限循环读取NioServerSocketChannel(NioSocketChannel)上发生的关心事件然后各自channel的handler处理的。

        @Override
        protected void run() {
            for (;;) {
                    switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                        case SelectStrategy.SELECT:
                            select(wakenUp.getAndSet(false));
                            ...
                    }
                    processSelectedKeys();
            }
        }
    

    从上述分析可知,Netty很巧妙的封装了Java NIO支持,提供了reactor的所有封装,在一定程度上简化了nio网络编程,用户在使用中只需实现网络数据包的event handler即可。当然,还需了解服务端和客户端的启动模式,并知晓如何监听连接的,如何读取数据的,及数据在ChannelPipeline中的流向,再以netty自带的编解码工具,出站入站适配工具,便可对Netty上手了。

    相关文章

      网友评论

          本文标题:【Netty】从Java.IO到Java.NIO再到Netty

          本文链接:https://www.haomeiwen.com/subject/uytvfhtx.html