Netty

作者: Sanisy | 来源:发表于2018-07-04 18:33 被阅读17次
    Netty是什么

    Netty是目前最流行的由JBOSS提供的一个Java开源框架NIO框架,Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。

    为什么选择Netty
    • 事件驱动模型
    • 避免多线程
    • 单线程处理多任务
    • 非阻塞I/O,I/O读写不再阻塞,而是返回0
    • 基于block的传输,通常比基于流的传输更高效
    • 更高级的IO函数,zero-copy
    • IO多路复用大大提高了Java网络应用的可伸缩性和实用性

    什么是NIO
    IO操作有两个步骤:
    1)等待数据准备 (Waiting for the data to be ready)
    2)将数据从内核拷贝到进程中(Copying the data from the kernel to the process)

    image.png

    传统的BIO代码:

    public class BIO {
         ExecutorService executor = Excutors.newFixedThreadPollExecutor(100);//线程池
         ServerSocket serverSocket = new ServerSocket();
         serverSocket.bind(8088);
         while(!Thread.currentThread.isInturrupted()){//主线程死循环等待新连接到来
         Socket socket = serverSocket.accept();
         executor.submit(new ConnectIOnHandler(socket));//为新的连接创建新的线程
    }
    
    class ConnectIOnHandler extends Thread{
        private Socket socket;
        public ConnectIOnHandler(Socket socket){
           this.socket = socket;
        }
        public void run(){
          while(!Thread.currentThread.isInturrupted()&&!socket.isClosed()){死循环处理读写事件
              String someThing = socket.read()....//读取数据
              if(someThing!=null){
                 ......//处理数据
                 socket.write()....//写数据
              }
          }
        }
    }
    

    这是一个经典的每连接每线程的模型,之所以使用多线程,主要原因在于socket.accept()、socket.read()、socket.write()三个主要函数都是同步阻塞的,当一个连接在处理I/O的时候,系统是阻塞的,如果是单线程的话必然就挂死在那里;但CPU是被释放出来的,开启多线程,就可以让CPU去处理更多的事情。其实这也是所有使用多线程的本质:

    • 利用多核。
    • 当I/O阻塞系统,但CPU空闲的时候,可以利用多线程使用CPU资源

    现在的多线程一般都使用线程池,可以让线程的创建和回收成本相对较低。在活动连接数不是特别高(小于单机1000)的情况下,这种模型是比较不错的,可以让每一个连接专注于自己的I/O并且编程模型简单,也不用过多考虑系统的过载、限流等问题。线程池本身就是一个天然的漏斗,可以缓冲一些系统处理不了的连接或请求。

    不过,这个模型最本质的问题在于,严重依赖于线程。但线程是很"贵"的资源,主要表现在:

    • 线程的创建和销毁成本很高,在Linux这样的操作系统中,线程本质上就是一个进程。创建和销毁都是重量级的系统函数。
    • 线程本身占用较大内存,像Java的线程栈,一般至少分配512K~1M的空间,如果系统中的线程数过千,恐怕整个JVM的内存都会被吃掉一半。
    • 线程的切换成本是很高的。操作系统发生线程切换的时候,需要保留线程的上下文,然后执行系统调用。如果线程数过高,可能执行线程切换的时间甚至会大于线程执行的时间,这时候带来的表现往往是系统load偏高、CPU sy使用率特别高(超过20%以上),导致系统几乎陷入不可用的状态。
    • 容易造成锯齿状的系统负载。因为系统负载是用活动线程数或CPU核心数,一旦线程数量高但外部网络环境不是很稳定,就很容易造成大量请求的结果同时返回,激活大量阻塞线程从而使系统负载压力过大。
      所以,当面对十万甚至百万级连接的时候,传统的BIO模型是无能为力的。随着移动端应用的兴起和各种网络游戏的盛行,百万级长连接日趋普遍,此时,必然需要一种更高效的I/O处理模型。

    Java中的NIO使用的是Selector模式,所有的Channel都会向Selector中注册,Selector使用了IO多路复用,可以同时监听处理多个SocketChannel。我们可以通过遍历Selector来检测Channel是否有数据读写操作。NIO模式在等待数据的过程是非阻塞的,而是通过死循环遍历Selector。
    原生NIO代码

    public class BasicNioServer implements Runnable{
    
        //1 多路复用器(管理所有的通道)
        private Selector seletor;
        //2 建立缓冲区
        private ByteBuffer readBuf = ByteBuffer.allocate(1024);
        //3 
    //  private ByteBuffer writeBuf = ByteBuffer.allocate(1024);
        
        public BasicNioServer(int port){
            try {
                //1 打开路复用器
                this.seletor = Selector.open();
                //2 打开服务器通道
                ServerSocketChannel ssc = ServerSocketChannel.open();
                //3 设置服务器通道为非阻塞模式
                ssc.configureBlocking(false);   
                //4 绑定地址
                ssc.bind(new InetSocketAddress(port));
                //5 把服务器通道注册到多路复用器上,并且监听阻塞事件
                ssc.register(this.seletor, SelectionKey.OP_ACCEPT);
                
                System.out.println("Server start, port :" + port);
                
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    
        @Override
        public void run() {
            while(true){
                try {
                    //1 必须要让多路复用器开始监听
                    this.seletor.select();
                    //2 返回多路复用器已经选择的结果集
                    Iterator<SelectionKey> keys = this.seletor.selectedKeys().iterator();
                    //3 进行遍历
                    while(keys.hasNext()){
                        //4 获取一个选择的元素
                        SelectionKey key = keys.next();
                        //5 直接从容器中移除就可以了
                        keys.remove();
                        //6 如果是有效的
                        if(key.isValid()){
                            //7 如果为阻塞状态
                            if(key.isAcceptable()){
                                this.accept(key);
                            }
                            //8 如果为可读状态
                            if(key.isReadable()){
                                this.read(key);
                            }
                            //9 写数据
                            if(key.isWritable()){
                                //this.write(key); //ssc
                            }
                        }
                        
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
        
    //  private void write(SelectionKey key){
    //      //ServerSocketChannel ssc =  (ServerSocketChannel) key.channel();
    //      //ssc.register(this.seletor, SelectionKey.OP_WRITE);
    //  }
    
        private void read(SelectionKey key) {
            try {
                //1 清空缓冲区旧的数据
                this.readBuf.clear();
                //2 获取之前注册的socket通道对象
                SocketChannel sc = (SocketChannel) key.channel();
                //3 读取数据
                int count = sc.read(this.readBuf);
                //4 如果没有数据
                if(count == -1){
                    key.channel().close();
                    key.cancel();
                    return;
                }
                //5 有数据则进行读取 读取之前需要进行复位方法(把position 和limit进行复位)
                this.readBuf.flip();
                //6 根据缓冲区的数据长度创建相应大小的byte数组,接收缓冲区的数据
                byte[] bytes = new byte[this.readBuf.remaining()];
                //7 接收缓冲区数据
                this.readBuf.get(bytes);
                //8 打印结果
                String body = new String(bytes).trim();
                System.out.println("Server : " + body);
                
                // 9..可以写回给客户端数据 
                
            } catch (IOException e) {
                e.printStackTrace();
            }
            
        }
    
        private void accept(SelectionKey key) {
            try {
                //1 获取服务通道
                ServerSocketChannel ssc =  (ServerSocketChannel) key.channel();
                //2 执行阻塞方法
                SocketChannel sc = ssc.accept();
                //3 设置阻塞模式
                sc.configureBlocking(false);
                //4 注册到多路复用器上,并设置读取标识
                sc.register(this.seletor, SelectionKey.OP_READ);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        
        public static void main(String[] args) {
            
            new Thread(new BasicNioServer(8765)).start();;
        }
    }
    
    • 需要了解复杂的API:SocketChannel、ByteBuffer、Selector等
    • ByteBuffer读写索引是同一个,容易操作不当
    • 编写代码复杂
    • 不支持心跳机制、连接监控
    Netty的方式
    public class NettyServer {
    
        private ServerBootstrap serverBootstrap;
    
        private EventLoopGroup bossGroup;
    
        private EventLoopGroup workerGroup;
    
        private ChannelFuture future;
    
    
        private Map<String, ChannelHandlerContext> ctxMap;
    
        public NettyServer() {
            ctxMap = new HashMap<>();
            bossGroup = new NioEventLoopGroup();
            workerGroup = new NioEventLoopGroup();
            serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel channel) throws Exception {
                            ChannelPipeline pipeline = channel.pipeline();
                            pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8))
                                    .addLast(new StringEncoder(CharsetUtil.UTF_8))
                                    .addLast(new ServerHandler(NettyServer.this));
                        }
                    });
        }
    
    
        public void doBind() {
            try {
                future = serverBootstrap.bind("127.0.0.1", 12345).sync();
                future.channel().closeFuture().sync();
            } catch (Exception e) {
    
            } finally {
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }
    
        public Map<String, ChannelHandlerContext> getCtxMap() {
            return this.ctxMap;
        }
    
        /**
         *
         * @param ip
         * @param msg
         */
        public void sendMsg(String ip, String msg) {
            ChannelHandlerContext handlerContext = ctxMap.get(ip);
            if (handlerContext == null) {
                System.out.println("找不到客户端信息");
                return;
            }
    
            handlerContext.writeAndFlush(msg);
        }
    
        public static void main(String[] args) {
            NettyServer nettyServer = new NettyServer();
    
            new Thread(() -> {
                try {
                    Thread.sleep(1500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
    
                Scanner scanner = new Scanner(System.in);
                while (scanner.hasNextLine()) {
                    String input = scanner.nextLine();
                    String[] info = input.split(" ");
                    nettyServer.sendMsg(info[0], info[1]);
                }
            });
    
            nettyServer.doBind();
        }
    }
    
    Netty处理消息
    public class ClientHandler extends ChannelInboundHandlerAdapter {
    
        public NettyClient nettyClient;
    
        public ClientHandler(NettyClient nettyClient) {
            this.nettyClient = nettyClient;
        }
    
        /**
         * channel注册成功
         * @param ctx
         * @throws Exception
         */
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            super.channelActive(ctx);
        }
    
        /**
         * channel被关闭
         * @param ctx
         * @throws Exception
         */
        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            System.out.println("连接被关闭,准备进行重连");
            nettyClient.doConnect();
        }
    
        /**
         * channel读取消息
         * @param ctx
         * @param msg
         * @throws Exception
         */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            System.out.println("收到服务端消息:" + msg);
        }
    
    
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
    
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            super.exceptionCaught(ctx, cause);
        }
    
        /**
         * 事件触发回调
         * @param ctx
         * @param evt
         * @throws Exception
         */
        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
            if (evt instanceof IdleStateEvent) {
                IdleStateEvent event = (IdleStateEvent) evt;
                switch (event.state()) {
                    //读超时
                    case READER_IDLE:
                    //写超时
                    case WRITER_IDLE:
                    //读写都超时
                    case ALL_IDLE:
                        doPing(ctx);
                        break;
                }
            }
        }
    
    
        private void doPing(ChannelHandlerContext ctx) {
            ctx.writeAndFlush("ping");
        }
    }
    

    Netty的核心组件:

    • Bootstrap
    • EventLoopGroup
    • EventLoop
    • SocketChannel
    • ChannelInitializer
    • ChannelPipeline
    • ChannelHandler
    Netty高性能的原因:
    1. 使用nio

    2. Netty使用的是Reactor的线程模型


      Netty线程模型.png
    3. ByteBuf极大地优化了IO操作

    即所谓的 Zero-copy, 就是在操作数据时, 不需要将数据 buffer 从一个内存区域拷贝到另一个内存区域. 因为少了一次内存的拷贝, 因此 CPU 的效率就得到的提升.

    在 OS 层面上的 Zero-copy 通常指避免在 用户态(User-space) 与 内核态(Kernel-space) 之间来回拷贝数据. 例如 Linux 提供的 mmap 系统调用, 它可以将一段用户空间内存映射到内核空间, 当映射成功后, 用户对这段内存区域的修改可以直接反映到内核空间; 同样地, 内核空间对这段区域的修改也直接反映用户空间. 正因为有这样的映射关系, 我们就不需要在 用户态(User-space) 与 内核态(Kernel-space) 之间拷贝数据, 提高了数据传输的效率.

    而需要注意的是, Netty 中的 Zero-copy 与上面我们所提到到 OS 层面上的 Zero-copy 不太一样, Netty的 Zero-coyp 完全是在用户态(Java 层面)的, 它的 Zero-copy 的更多的是偏向于 优化数据操作 这样的概念.


    Netty的事件处理模型:


    Netty事件处理模型.png

    Netty源码分析

    MultithreadEventLoopGroup.java设置默认的线程数
     static {
            DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
                    "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
    
            if (logger.isDebugEnabled()) {
                logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
            }
     }
    
    EventLoopGroup和EventLoop
    protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                                EventExecutorChooserFactory chooserFactory, Object... args) {
            if (nThreads <= 0) {
                throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
            }
    
            if (executor == null) {
                executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
            }
    
            children = new EventExecutor[nThreads];
    
            for (int i = 0; i < nThreads; i ++) {
                boolean success = false;
                try {
                    children[i] = newChild(executor, args);
                    success = true;
                } catch (Exception e) {
                    // TODO: Think about if this is a good exception type
                    throw new IllegalStateException("failed to create a child event loop", e);
                } finally {
                    if (!success) {
                        for (int j = 0; j < i; j ++) {
                            children[j].shutdownGracefully();
                        }
    
                        for (int j = 0; j < i; j ++) {
                            EventExecutor e = children[j];
                            try {
                                while (!e.isTerminated()) {
                                    e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                                }
                            } catch (InterruptedException interrupted) {
                                // Let the caller handle the interruption.
                                Thread.currentThread().interrupt();
                                break;
                            }
                        }
                    }
                }
            }
    
            chooser = chooserFactory.newChooser(children);
    
            final FutureListener<Object> terminationListener = new FutureListener<Object>() {
                @Override
                public void operationComplete(Future<Object> future) throws Exception {
                    if (terminatedChildren.incrementAndGet() == children.length) {
                        terminationFuture.setSuccess(null);
                    }
                }
            };
    
            for (EventExecutor e: children) {
                e.terminationFuture().addListener(terminationListener);
            }
    
            Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
            Collections.addAll(childrenSet, children);
            readonlyChildren = Collections.unmodifiableSet(childrenSet);
    }
    
    
    /*******************************NioEventLoopGroup.java*******************************/
     @Override
     protected EventLoop newChild(Executor executor, Object... args) throws Exception {
          return new NioEventLoop(this, executor, (SelectorProvider) args[0],
                ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
     }
    

    NioEventLoop的代码

        /**
         * The NIO {@link Selector}.
         */
        private Selector selector;
        private Selector unwrappedSelector;
        private SelectedSelectionKeySet selectedKeys;
    
        private final SelectorProvider provider;
    
        protected void run() {
            for (;;) {
                try {
                    switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                        case SelectStrategy.CONTINUE:
                            continue;
                        case SelectStrategy.SELECT:
                            select(wakenUp.getAndSet(false));
                            if (wakenUp.get()) {
                                selector.wakeup();
                            }
                            // fall through
                        default:
                    }
    
                    cancelledKeys = 0;
                    needsToSelectAgain = false;
                    final int ioRatio = this.ioRatio;
                    if (ioRatio == 100) {
                        try {
                            processSelectedKeys();
                        } finally {
                            // Ensure we always run tasks.
                            runAllTasks();
                        }
                    } else {
                        final long ioStartTime = System.nanoTime();
                        try {
                            processSelectedKeys();
                        } finally {
                            // Ensure we always run tasks.
                            final long ioTime = System.nanoTime() - ioStartTime;
                            runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                        }
                    }
                } catch (Throwable t) {
                    handleLoopException(t);
                }
                // Always handle shutdown even if the loop processing threw an exception.
                try {
                    if (isShuttingDown()) {
                        closeAll();
                        if (confirmShutdown()) {
                            return;
                        }
                    }
                } catch (Throwable t) {
                    handleLoopException(t);
                }
            }
        }
        
        
        private void processSelectedKeys() {
            if (selectedKeys != null) {
                processSelectedKeysOptimized();
            } else {
                processSelectedKeysPlain(selector.selectedKeys());
            }
        }
        
        private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
            // check if the set is empty and if so just return to not create garbage by
            // creating a new Iterator every time even if there is nothing to process.
            // See https://github.com/netty/netty/issues/597
            if (selectedKeys.isEmpty()) {
                return;
            }
    
            Iterator<SelectionKey> i = selectedKeys.iterator();
            for (;;) {
                final SelectionKey k = i.next();
                final Object a = k.attachment();
                i.remove();
    
                if (a instanceof AbstractNioChannel) {
                    processSelectedKey(k, (AbstractNioChannel) a);
                } else {
                    @SuppressWarnings("unchecked")
                    NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                    processSelectedKey(k, task);
                }
    
                if (!i.hasNext()) {
                    break;
                }
    
                if (needsToSelectAgain) {
                    selectAgain();
                    selectedKeys = selector.selectedKeys();
    
                    // Create the iterator again to avoid ConcurrentModificationException
                    if (selectedKeys.isEmpty()) {
                        break;
                    } else {
                        i = selectedKeys.iterator();
                    }
                }
            }
        }
    
    Netty粘包拆包

    TCP为确保传输的准确性,设置了ack应答机制;TCP为了减少ack的次数,优化网络传输,使用了一个传输缓冲区,默认机制是一定时间内当缓冲区的数据达到一定大小的时候,才将数据传输出去。当我们频繁传输短消息的时候,就会引发TCP的粘包现象。TCP拆包是相对于粘包的,拆包是指把TCP传输的数据,准确的拆成真实的传输数据。

    Netty提供了解决粘包拆包的解码器:
    • LineBasedFrameDecoder 换行符
    • DelimiterBaseFrameDecoder 分隔符
    • FixdLengthFrameDecoder 定长
    • StringDecoder 将消息解析成字符串
    ###Client端:
     new Thread(() -> {
                try {
                    Thread.sleep(1500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("--------------run----------------------");
                String str = "7月不下雪";
                for (int i = 0; i < 100; i++) {
                    //换行符作为数据完整性的标志
                    client.sendMsg(str + i + System.lineSeparator());
                }
    
                Scanner scanner = new Scanner(System.in);
                while (scanner.hasNext()) {
                    String input = scanner.next();
                    client.sendMsg(input);
                }
            }).start();
    
    ###服务端
    serverBootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel channel) throws Exception {
                            ChannelPipeline pipeline = channel.pipeline();
                            //为了得到正确的拆包,必须把正确的解码器放在第一位
                            pipeline.addLast(new LineBasedFrameDecoder(1024))
                                    .addLast(new StringDecoder(CharsetUtil.UTF_8))
                                    .addLast(new StringEncoder(CharsetUtil.UTF_8))
                                    .addLast(new ServerHandler(NettyServer.this));
                        }
                    });
    

    需要注意的是 group.shutdownGracefully();会关闭EventLoopGroup,Channel依赖于EventLoopGroup来进行事件分发,所以会导致Channel重连失败

        public boolean doConnect() {
            try {
                future = bootstrap.connect("127.0.0.1", 12345).sync();
                future.addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        if (future.isSuccess()) {
                            System.out.println("---------------------重连成功-----------------------");
                        }else {
                            System.out.println("---------------------重连失败-----------------------");
                        }
                    }
                });
                log.error("建立连接------------------------");
                //这是一个阻塞的方法,监听服务端连接断开事件
                future.channel().closeFuture().sync();
            } catch (InterruptedException e) {
                e.printStackTrace();
                return false;
            } finally {
                log.error("连接关闭------------------------");
    //            这段代码会关闭EventLoopGroup,所以不能放在finally,不然服务端关闭连接的时候,会捕获到异常,最后执行这段代码,会导致重连失败
    //            group.shutdownGracefully();
            }
    
            return true;
        }
    

    Netty天然支持心跳机制:

    public NettyClient() {
            group = new NioEventLoopGroup();
            bootstrap = new Bootstrap();
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel channel) throws Exception {
                            ChannelPipeline pipeline = channel.pipeline();
                            pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8))
                                    .addLast(new StringEncoder(CharsetUtil.UTF_8))
                                    //监听Channel读写状态,在ChannelHandler中可以重写回调方法处理对应的事件
                                    .addLast(new IdleStateHandler(5, 5,5))
                                    .addLast(new ClientHandler(NettyClient.this));
                        }
                    });
    }
    
        /**
         * ClientHandler.java
         * 事件触发回调
         * @param ctx
         * @param evt
         * @throws Exception
         */
        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
            super.userEventTriggered(ctx, evt);
            if (evt instanceof IdleStateEvent) {
                IdleStateEvent event = (IdleStateEvent) evt;
                switch (event.state()) {
                    //读超时
                    case READER_IDLE:
                    //写超时
                    case WRITER_IDLE:
                    //读写都超时
                    case ALL_IDLE:
                        //发送心跳
                        doPing(ctx);
                        break;
                }
            }
        }
    

    参考资料:
    5种网络IO模型https://www.cnblogs.com/findumars/p/6361627.html
    美团技术团队https://tech.meituan.com/nio.html?utm_source=tool.lu
    掘金Nettyhttps://juejin.im/search?query=netty
    Netty的ByteBufhttps://blog.csdn.net/u010853261/article/details/53690780
    Netty的ByteBuf的零拷贝https://segmentfault.com/a/1190000007560884
    即时通讯http://www.52im.net/

    相关文章

      网友评论

          本文标题:Netty

          本文链接:https://www.haomeiwen.com/subject/cufuyftx.html