美文网首页
nio和netty

nio和netty

作者: M问道 | 来源:发表于2018-02-19 10:45 被阅读0次

    nio为所有的原始类型(boolean类型除外)提供缓存支持的数据容器,提供多路(no-blocking)非阻塞式的高伸缩性网络I/O

    本文先介绍NIO三大组件 Buffr、Channel、Selector

    Buffer

    Buffer其实本质就是内存块,所有的数据读写都要依赖这个,我们可以先将数据读取到这个内存块,然后从这个内存块读取数据。


    nio buffer实现类

    我们可以把Buffer理解为数组,Buffer有几个重要的属性:position、limit、capacity。



    position 的初始值是 0,每往 Buffer 中写入一个值,position 就自动加 1,代表下一次的写入位置。读操作的时候也是类似的,每读一个值,position 就自动加 1。
    Buffer可以读写模式切换,从写操作模式到读操作模式切换的时候(flip),position 都会归零,这样就可以从头开始读写了。

    Limit:写操作模式下,limit 代表的是最大能写入的数据,这个时候 limit 等于 capacity。写结束后,切换到读模式,此时的 limit 等于 Buffer 中实际的数据大小,因为 Buffer 不一定被写满了。

    读写模式切换flip,实际就是limit和position值交换,position值清零

    public final Buffer flip() {
    limit = position; // 将 limit 设置为实际写入的数据数量
    position = 0; // 重置 position 为 0
    mark = -1; // mark 之后再说
    return this;
    }

    初始化Buffer方法

    public static ByteBuffer wrap(byte[] array) {
    ...
    }

    ByteBuffer buffer = ByteBuffer.allocate(1024);

    读取数据到Bufer方法

    int num = channel.read(buf);

    将Buffer数据写入channel

    int num = channel.write(buf);

    附加一个Buffer经常使用方法

    new String(buffer.array()).trim();

    Channel

    所有NIO操作都始于通过,发起请求时会选择相应的通道通信,通道是数据来源地或写入的目的地。



    *FileChannel:文件通道,用于文件的读写
    *DatagramChannel:用于UDP连接的接收和发送
    *SocketChannel:TCP客户端
    *ServerSocketChannel:TCP服务端



    Selector

    Selector建立在非阻塞的模式下,所以注册到Selector的channel必须要支持非阻塞模式。Selector实现了多路复用,用于一个线程可以管理多个Channel。

    Selector selector = Selector.open();
    // 将通道设置为非阻塞模式,因为默认都是阻塞模式的
    channel.configureBlocking(false);
    // 注册
    SelectionKey key = channel.register(selector, SelectionKey.OP_READ);

    SelectKey有四个事件

    • SelectKey.OP_ACCEPT:接收TCP连接
    • SelectKey.OP_READ:通道有数据可以读
    • SelectKey.OP_WRITE:通道有数据可以写
    • SelectKey.OP_CONNECT:成功建立TCP连接

    上面介绍了NIO三大组件,这边介绍NIO特性以及阻塞和异步概念以及实现。
    NIO,JDK1.4,New IO,Non-Blocking IO
    NIO.2,JDK7,More New IO,Asynchronous IO,严格地说 NIO.2 不仅仅引入了 AIO

    NIO的工作原理

    1. 由一个专门的线程来处理所有的 IO 事件,并负责分发。
    2. 事件驱动机制:事件到的时候触发,而不是同步的去监视事件。
    3. 线程通讯:线程之间通过 wait,notify 等方式通讯。保证每次上下文切换都是有意义的。减少无谓的线程切换。
    阻塞IO模式

    服务端

    public class Server {
        public static void main(String[] args) throws IOException {
            ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
            // 监听 8080 端口进来的 TCP 链接
            serverSocketChannel.socket().bind(new InetSocketAddress(8080));
            while (true) {
                // 这里会阻塞,直到有一个请求的连接进来
                SocketChannel socketChannel = serverSocketChannel.accept();
                // 开启一个新的线程来处理这个请求,然后在 while 循环中继续监听 8080 端口
                SocketHandler handler = new SocketHandler(socketChannel);
                new Thread(handler).start();
            }
        }
    }
    

    SocketHandler实现

    public class SocketHandler implements Runnable {
        private SocketChannel socketChannel;
        public SocketHandler(SocketChannel socketChannel) {
            this.socketChannel = socketChannel;
        }
        @Override
        public void run() {
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            try {
                // 将请求数据读入 Buffer 中
                int num;
                while ((num = socketChannel.read(buffer)) > 0) {
                    // 读取 Buffer 内容之前先 flip 一下
                    buffer.flip();
                    // 提取 Buffer 中的数据
                    byte[] bytes = new byte[num];
                    buffer.get(bytes);
                    String re = new String(bytes, "UTF-8");
                    System.out.println("收到请求:" + re);
                    // 回应客户端
                    ByteBuffer writeBuffer = ByteBuffer.wrap(("我已经收到你的请求,你的请求内容是:" + re).getBytes());
                    socketChannel.write(writeBuffer);
                    buffer.flip();
                }
            } catch (IOException e) {
                IOUtils.closeQuietly(socketChannel);
            }
        }
    }
    

    客户端

    public class SocketChannelTest {
        public static void main(String[] args) throws IOException {
            SocketChannel socketChannel = SocketChannel.open();
            socketChannel.connect(new InetSocketAddress("localhost", 8080));
            // 发送请求
            ByteBuffer buffer = ByteBuffer.wrap("1234567890".getBytes());
            socketChannel.write(buffer);
            // 读取响应
            ByteBuffer readBuffer = ByteBuffer.allocate(1024);
            int num;
            if ((num = socketChannel.read(readBuffer)) > 0) {
                readBuffer.flip();
                byte[] re = new byte[num];
                readBuffer.get(re);
                String result = new String(re, "UTF-8");
                System.out.println("返回值: " + result);
            }
        }
    }
    

    阻塞模式的IO其实就是服务端为每次客户端请求分配一个线程去执行,首先accept是个阻塞操作,当有请求到达时才会返回。然后立即分配一个线程去处理这个请求。请注意这个线程不会立即读写,还需要等到通道读写准备就绪才可以读写,在这之前会一直阻塞。在多线程高并发的情况,线程创建过多,内存开销过大,以及线程切换上下文开销太大,导致系统假死。这种方式不可取。

    非阻塞IO

    非阻塞IO核心是一个Selector管理多个通道,将各个通道注册到 Selector 上,指定监听的事件,之后可以只用一个线程来轮询这个 Selector,看看上面是否有通道是准备好的,当通道准备好可读或可写,然后才去开始真正的读写,这样速度就很快了。我们就完全没有必要给每个通道都起一个线程。
    Selector底层实现的三种方式

    • select

    缺点:
    1.单个进程能够监视的文件描述符的数量存在最大限制
    2.内核 / 用户空间内存拷贝问题,select需要复制大量的句柄数据结构,产生巨大的开销
    3.select返回的是含有整个句柄的数组,应用程序需要遍历整个数组才能发现哪些句柄发生了事件
    4.select的触发方式是水平触发,应用程序如果没有完成对一个已经就绪的文件描述符进行IO操作,那么之后每次select调用还是会将这些文件描述符通知进程。

    • poll

    相比select模型,poll使用链表保存文件描述符,因此没有了监视文件数量的限制,但其他三个缺点依然存在。

    • epoll

    通过红黑树和双链表数据结构,并结合回调机制,造就了epoll的高效,解决了select/poll缺点。
    每一个epoll对象都有一个独立的eventpoll结构体,用于存放通过epoll_ctl方法向epoll对象中添加进来的事件,这些事件都会挂载在红黑树中。而所有添加到epoll中的事件都会与设备(网卡)驱动程序建立回调关系,也就是说,当相应的事件发生时会调用这个回调方法。这个回调方法在内核中叫ep_poll_callback,它会将发生的事件添加到rdlist双链表中。
    1)调用epoll_create()建立一个epoll对象(在epoll文件系统中为这个句柄对象分配资源)
    2)调用epoll_ctl向epoll对象中添加这100万个连接的套接字
    3)调用epoll_wait收集发生的事件的连接

    select 和 poll 都有一个共同的问题,那就是它们都只会告诉你有几个通道准备好了,但是不会告诉你具体是哪几个通道。所以,一旦知道有通道准备好以后,自己还是需要进行一次扫描,显然这个不太好,通道少的时候还行,一旦通道的数量是几十万个以上的时候,扫描一次的时间都很可观了,时间复杂度 O(n)。

    服务端

    public class SelectorServer {
        public static void main(String[] args) throws IOException {
            Selector selector = Selector.open();
            ServerSocketChannel server = ServerSocketChannel.open();
            server.socket().bind(new InetSocketAddress(8080));
            // 将其注册到 Selector 中,监听 OP_ACCEPT 事件
            server.configureBlocking(false);
            server.register(selector, SelectionKey.OP_ACCEPT);
            while (true) {
                // 需要不断地去调用 select() 方法获取最新的准备好的通道
                int readyChannels = selector.select();
                if (readyChannels == 0) {
                    continue;
                }
                Set<SelectionKey> readyKeys = selector.selectedKeys();
                // 遍历
                Iterator<SelectionKey> iterator = readyKeys.iterator();
                while (iterator.hasNext()) {
                    SelectionKey key = iterator.next();
                    iterator.remove();
                    if (key.isAcceptable()) {
                        // 有已经接受的新的到服务端的连接
                        SocketChannel socketChannel = server.accept();
                        // 有新的连接并不代表这个通道就有数据,
                        // 这里将这个新的 SocketChannel 注册到 Selector,监听 OP_READ 事件,等待数据
                        socketChannel.configureBlocking(false);
                        socketChannel.register(selector, SelectionKey.OP_READ);
                    } else if (key.isReadable()) {
                        // 有数据可读
                        // 上面一个 if 分支中注册了监听 OP_READ 事件的 SocketChannel
                        SocketChannel socketChannel = (SocketChannel) key.channel();
                        ByteBuffer readBuffer = ByteBuffer.allocate(1024);
                        int num = socketChannel.read(readBuffer);
                        if (num > 0) {
                            // 处理进来的数据...
                            System.out.println("收到数据:" + new String(readBuffer.array()).trim());
                            socketChannel.register(selector, SelectionKey.OP_WRITE);
                        } else if (num == -1) {
                            // -1 代表连接已经关闭
                            socketChannel.close();
                        }
                    }
                    else if (key.isWritable()) {
                        // 通道可写
                        // 给用户返回数据的通道可以进行写操作了
                        SocketChannel socketChannel = (SocketChannel) key.channel();
                        ByteBuffer buffer = ByteBuffer.wrap("返回给客户端的数据...".getBytes());
                        socketChannel.write(buffer);
                        // 重新注册这个通道,监听 OP_READ 事件,客户端还可以继续发送内容过来
                        socketChannel.register(selector, SelectionKey.OP_READ);
                    }
                }
            }
        }
    }
    

    客户端代码同阻塞模式客户端代码

    NIO.2 异步 IO

    异步IO两种实现方式
    1.返回 Future 实例
    2.提供 CompletionHandler 回调函数

    代码实现
    服务端

    public class Server {
        public static void main(String[] args) throws IOException {
              // 实例化,并监听端口
            AsynchronousServerSocketChannel server =
                    AsynchronousServerSocketChannel.open().bind(new InetSocketAddress(8080));
            // 自己定义一个 Attachment 类,用于传递一些信息
            Attachment att = new Attachment();
            att.setServer(server);
            server.accept(att, new CompletionHandler<AsynchronousSocketChannel, Attachment>() {
                @Override
                public void completed(AsynchronousSocketChannel client, Attachment att) {
                    try {
                        SocketAddress clientAddr = client.getRemoteAddress();
                        System.out.println("收到新的连接:" + clientAddr);
                        // 收到新的连接后,server 应该重新调用 accept 方法等待新的连接进来
                        att.getServer().accept(att, this);
                        Attachment newAtt = new Attachment();
                        newAtt.setServer(server);
                        newAtt.setClient(client);
                        newAtt.setReadMode(true);
                        newAtt.setBuffer(ByteBuffer.allocate(2048));
                        // 这里也可以继续使用匿名实现类,不过代码不好看,所以这里专门定义一个类
                        client.read(newAtt.getBuffer(), newAtt, new ChannelHandler());
                    } catch (IOException ex) {
                        ex.printStackTrace();
                    }
                }
                @Override
                public void failed(Throwable t, Attachment att) {
                    System.out.println("accept failed");
                }
            });
            // 为了防止 main 线程退出
            try {
                Thread.currentThread().join();
            } catch (InterruptedException e) {
            }
        }
    }
    

    ChannelHandler

    public class ChannelHandler implements CompletionHandler<Integer, Attachment> {
        @Override
        public void completed(Integer result, Attachment att) {
            if (att.isReadMode()) {
                // 读取来自客户端的数据
                ByteBuffer buffer = att.getBuffer();
                buffer.flip();
                byte bytes[] = new byte[buffer.limit()];
                buffer.get(bytes);
                String msg = new String(buffer.array()).toString().trim();
                System.out.println("收到来自客户端的数据: " + msg);
                // 响应客户端请求,返回数据
                buffer.clear();
                buffer.put("Response from server!".getBytes(Charset.forName("UTF-8")));
                att.setReadMode(false);
                buffer.flip();
                // 写数据到客户端也是异步
                att.getClient().write(buffer, att, this);
            } else {
                // 到这里,说明往客户端写数据也结束了,有以下两种选择:
                // 1. 继续等待客户端发送新的数据过来
    //            att.setReadMode(true);
    //            att.getBuffer().clear();
    //            att.getClient().read(att.getBuffer(), att, this);
                // 2. 既然服务端已经返回数据给客户端,断开这次的连接
                try {
                    att.getClient().close();
                } catch (IOException e) {
                }
            }
        }
        @Override
        public void failed(Throwable t, Attachment att) {
            System.out.println("连接断开");
        }
    }
    

    客户端

    public class Client {
        public static void main(String[] args) throws Exception {
            AsynchronousSocketChannel client = AsynchronousSocketChannel.open();
              // 来个 Future 形式的
            Future<?> future = client.connect(new InetSocketAddress(8080));
            // 阻塞一下,等待连接成功
            future.get();
            Attachment att = new Attachment();
            att.setClient(client);
            att.setReadMode(false);
            att.setBuffer(ByteBuffer.allocate(2048));
            byte[] data = "I am obot!".getBytes();
            att.getBuffer().put(data);
            att.getBuffer().flip();
            // 异步发送数据到服务端
            client.write(att.getBuffer(), att, new ClientChannelHandler());
            // 这里休息一下再退出,给出足够的时间处理数据
            Thread.sleep(2000);
        }
    }
    

    ClientChannelHandler

    public class ClientChannelHandler implements CompletionHandler<Integer, Attachment> {
        @Override
        public void completed(Integer result, Attachment att) {
            ByteBuffer buffer = att.getBuffer();
            if (att.isReadMode()) {
                // 读取来自服务端的数据
                buffer.flip();
                byte[] bytes = new byte[buffer.limit()];
                buffer.get(bytes);
                String msg = new String(bytes, Charset.forName("UTF-8"));
                System.out.println("收到来自服务端的响应数据: " + msg);
                // 接下来,有以下两种选择:
                // 1. 向服务端发送新的数据
    //            att.setReadMode(false);
    //            buffer.clear();
    //            String newMsg = "new message from client";
    //            byte[] data = newMsg.getBytes(Charset.forName("UTF-8"));
    //            buffer.put(data);
    //            buffer.flip();
    //            att.getClient().write(buffer, att, this);
                // 2. 关闭连接
                try {
                    att.getClient().close();
                } catch (IOException e) {
                }
            } else {
                // 写操作完成后,会进到这里
                att.setReadMode(true);
                buffer.clear();
                att.getClient().read(buffer, att, this);
            }
        }
        @Override
        public void failed(Throwable t, Attachment att) {
            System.out.println("服务器无响应");
        }
    }
    

    从代码可以看出来,阻塞IO从连接请求(accept)就开始阻塞一直到通道读写完成;非阻塞IO从连接请求(accept)之前一直阻塞,连接之后非阻塞,通过注册读写事件,委托工作线程执行;异步IO从连接请求(accept)之前就是异步的,通过回调函数或者Future实现。

    这边开始介绍netty 一个高性能NIO网络通信框架,这p边以应用为主,理论上文已经说得很多了。

    Netty搭建一个构建Http RPC框架
    服务端NettyHttpServer

    @Component
    public class NettyHttpServer implements ApplicationListener<ContextRefreshedEvent>,Ordered{
        public  void start() {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            EventLoopGroup childGroup = new NioEventLoopGroup();
            EventLoopGroup parentGroup = new NioEventLoopGroup();
            //accept,read,write
            serverBootstrap.group(parentGroup, childGroup);
            serverBootstrap.channel(NioServerSocketChannel.class) // (3)
            .childHandler(new ChannelInitializer<SocketChannel>() { // (4)
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    //http解码,编码器
                    ch.pipeline().addLast(new HttpRequestDecoder());
                    ch.pipeline().addLast(new HttpResponseEncoder());
                    ch.pipeline().addLast(new HttpObjectAggregator(1048576));
                    ch.pipeline().addLast(new HttpServerHandler());
                }
            })
            .option(ChannelOption.SO_BACKLOG, 128)          // (5)
            .childOption(ChannelOption.SO_KEEPALIVE, true); //
            ChannelFuture future = null;
            try {
                future = serverBootstrap.bind(8080).sync();
                future.channel().closeFuture().sync();
            } catch (InterruptedException e) {
            }
        }
    
        public int getOrder() {
            return 20;
        }
    
        public void onApplicationEvent(ContextRefreshedEvent arg0) {
            start();
        }
    }
    

    HttpServerHandler

    public class HttpServerHandler extends ChannelInboundHandlerAdapter {
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            Object result = new Object();
            try {
                if(msg instanceof  FullHttpRequest){
                    String content = ((FullHttpRequest)msg).content().toString(Charset.defaultCharset());
                    System.out.println(content);
                     //首先根据request content获取是哪个controller,并且要获取对应的请求方法
                    RequestParam requestParam = JSONObject.parseObject(content, RequestParam.class);
                    String command = requestParam.getCommand();
                     //然后去执行相对应的 方法
                    BeanMethod beanMethod = Media.commandBeans.get(command);
                    if(beanMethod !=null){
                        Object bean = beanMethod.getBean();
                        Method m = beanMethod.getM();
                        Class<?> paramType = m.getParameterTypes()[0];
                        Object param=null;
                        if(paramType.isAssignableFrom(List.class)){
                            param = JSONArray.parseArray(JSONArray.toJSONString(requestParam.getContent()), paramType);
                        }else{
                             param = JSON.parseObject(JSONObject.toJSONString(requestParam.getContent()), paramType);
                        }
                        result = m.invoke(bean, param);
                        ResponseParam responseParam = new ResponseParam();
                        responseParam.setCode("00000");
                        responseParam.setResult(result);
                        result = responseParam;
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
                ResponseParam responseParam = new ResponseParam();
                String failMsg = "您的请求异常!";
                responseParam.setCode("33333");
                responseParam.setResult(failMsg);
                result = responseParam;
            }
             DefaultFullHttpResponse response =new  DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus .OK,                Unpooled.wrappedBuffer(JSONObject.toJSONString(result).getBytes(Charset.defaultCharset())));       response.headers().set(HttpHeaderNames.CONNECTION,HttpHeaderValues.KEEP_ALIVE);     response.headers().set(HttpHeaderNames.CONTENT_LENGTH,response.content().readableBytes());      response.headers().set(HttpHeaderNames.CONTENT_TYPE,"text/plain");
            ctx.channel().writeAndFlush(response);
        }
    }
    

    MediaInit 初始化Mapping

    @Component
    public class MediaInit implements ApplicationListener<ContextRefreshedEvent>,Ordered{
        public void onApplicationEvent(ContextRefreshedEvent event) {
            //根据Spring容器,找到包含有Controller注解的所有bean
            Map<String,Object> beans = event.getApplicationContext().getBeansWithAnnotation(Controller.class);
            Map<String,BeanMethod> commandBeans = Media.commandBeans;
            for(String key : beans.keySet()){
                Object bean = beans.get(key);
                Method[] ms = bean.getClass().getDeclaredMethods();
                for(Method m : ms){
                    if(m.isAnnotationPresent(Remote.class)){
                        Remote remote = m.getAnnotation(Remote.class);
                        String command = remote.value();
                        BeanMethod  beanMethod = new BeanMethod();
                        beanMethod.setBean(bean);
                        beanMethod.setM(m);
                        commandBeans.put(command, beanMethod);
                    }
                }           
            }
        }
        public int getOrder() {
            return 0;
        }
    }
    

    客户端NettyClient

    public class NettyHttpClient {
        public static void main(String[] args) {
            Bootstrap b = new Bootstrap();
            EventLoopGroup workerGroup = new NioEventLoopGroup();
             try {
                b.group(workerGroup); // (2)
                 b.channel(NioSocketChannel.class); // (3)
                 b.option(ChannelOption.SO_KEEPALIVE, true); // (4)
                 b.handler(new ChannelInitializer<SocketChannel>() {
                     @Override
                     public void initChannel(SocketChannel ch) throws Exception {
    
                         ch.pipeline().addLast(new HttpRequestEncoder());
                         ch.pipeline().addLast(new HttpResponseDecoder());
                          ch.pipeline().addLast(new HttpObjectAggregator(1048576));
                         ch.pipeline().addLast(new HttpClientHandler());
                     }
                 });
                 // Start the client.
                 ChannelFuture f = b.connect("localhost", 8080).sync(); // (5)
                 String uri ="http://localhost:8080/";
                RequestParam requestParam = new RequestParam();
                requestParam.setCommand("productPlanSearch");
                requestParam.setContent("1");
                String requestContent = JSONObject.toJSONString(requestParam);
                ByteBuf content = Unpooled.wrappedBuffer(requestContent.getBytes(Charset.defaultCharset()));
                DefaultFullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1,
                                                      HttpMethod.POST, uri , content );
                request.headers().set(HttpHeaderNames.CONNECTION,HttpHeaderValues.KEEP_ALIVE);
                request.headers().set(HttpHeaderNames.CONTENT_LENGTH,request.content().readableBytes());
                request.headers().set(HttpHeaderNames.CONTENT_TYPE,"text/plain");
    
                f.channel().writeAndFlush(request);
                f.channel().closeFuture().sync();
                ResponseParam  response = (ResponseParam)f.channel().attr(AttributeKey.valueOf("httpResultKey")).get();
    
                if(response.getCode().equals("00000")){
                    System.out.println(response.getResult());
                }
    
            } catch (InterruptedException e) {
                e.printStackTrace();
            }finally{
                workerGroup.shutdownGracefully();
            }
    
        }
    }
    

    HttpClientHandler

    public class HttpClientHandler extends ChannelInboundHandlerAdapter {
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            if(msg instanceof DefaultHttpResponse){
    
            }
            if(msg instanceof  FullHttpResponse){
                String result = ((FullHttpResponse)msg).content().toString(Charset.defaultCharset());
                ResponseParam response = JSONObject.parseObject(result,ResponseParam.class);
                ctx.channel().attr(AttributeKey.valueOf("httpResultKey")).set(response);
                ctx.channel().close();
            }
        }
    }
    

    补充:dubbo底层网络通信也用得是netty,dubbo协议默认是长连接,客户端一次连接会发送多个数据包,当客户端闲置的时候通过心跳检测来维持长连接通信。至于dubbo如何解决多线程 粘包拆包问题,这个会在下一个文章介绍。

    相关文章

      网友评论

          本文标题:nio和netty

          本文链接:https://www.haomeiwen.com/subject/dbditftx.html