美文网首页技术
Netty之旅二:口口相传的高性能Netty到底是什么?

Netty之旅二:口口相传的高性能Netty到底是什么?

作者: 一枝花算不算浪漫 | 来源:发表于2020-08-23 15:34 被阅读0次
    d0iosx.png

    高清思维导图原件(xmind/pdf/jpg)可以关注公众号:一枝花算不算浪漫 回复netty01即可。

    前言

    上一篇文章讲了NIO相关的知识点,相比于传统IONIO已经做得很优雅了,为什么我们还要使用Netty

    上篇文章最后留了很多坑,讲了NIO使用的弊端,也是为了引出Netty而设立的,这篇文章我们就来好好揭开Netty的神秘面纱。

    本篇文章的目的很简单,希望看过后你能看懂Netty的示例代码,针对于简单的网络通信,自己也能用Netty手写一个开发应用出来!

    一个简单的Netty示例

    以下是一个简单聊天室Server端的程序,代码参考自:http://www.imooc.com/read/82/article/2166

    代码有点长,主要核心代码是在main()方法中,这里代码也希望大家看懂,后面也会一步步剖析。

    PS:我是用mac系统,直接在终端输入telnet 127.0.0.1 8007 即可启动一个聊天框,如果提示找不到telnet命令,可以通过brew进行安装,具体步骤请自行百度。

    /**
     * @Description netty简易聊天室
     *
     * @Author 一枝花算不算浪漫
     * @Date 2020/8/10 6:52 上午
     */
    public final class NettyChatServer {
    
        static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
    
        public static void main(String[] args) throws Exception {
            // 1. EventLoopGroup
            EventLoopGroup bossGroup = new NioEventLoopGroup(1);
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            try {
                // 2. 服务端引导器
                ServerBootstrap serverBootstrap = new ServerBootstrap();
                // 3. 设置线bootStrap信息
                serverBootstrap.group(bossGroup, workerGroup)
                        // 4. 设置ServerSocketChannel的类型
                        .channel(NioServerSocketChannel.class)
                        // 5. 设置参数
                        .option(ChannelOption.SO_BACKLOG, 100)
                        // 6. 设置ServerSocketChannel对应的Handler,只能设置一个
                        .handler(new LoggingHandler(LogLevel.INFO))
                        // 7. 设置SocketChannel对应的Handler
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            public void initChannel(SocketChannel ch) throws Exception {
                                ChannelPipeline p = ch.pipeline();
                                // 可以添加多个子Handler
                                p.addLast(new LoggingHandler(LogLevel.INFO));
                                p.addLast(new ChatNettyHandler());
                            }
                        });
    
                // 8. 绑定端口
                ChannelFuture f = serverBootstrap.bind(PORT).sync();
                // 9. 等待服务端监听端口关闭,这里会阻塞主线程
                f.channel().closeFuture().sync();
            } finally {
                // 10. 优雅地关闭两个线程池
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }
    
        private static class ChatNettyHandler extends SimpleChannelInboundHandler<ByteBuf> {
            @Override
            public void channelActive(ChannelHandlerContext ctx) {
                System.out.println("one conn active: " + ctx.channel());
                // channel是在ServerBootstrapAcceptor中放到EventLoopGroup中的
                ChatHolder.join((SocketChannel) ctx.channel());
            }
    
            @Override
            protected void channelRead0(ChannelHandlerContext ctx, ByteBuf byteBuf) throws Exception {
                byte[] bytes = new byte[byteBuf.readableBytes()];
                byteBuf.readBytes(bytes);
                String content = new String(bytes, StandardCharsets.UTF_8);
                System.out.println(content);
    
                if (content.equals("quit\r\n")) {
                    ctx.channel().close();
                } else {
                    ChatHolder.propagate((SocketChannel) ctx.channel(), content);
                }
            }
    
            @Override
            public void channelInactive(ChannelHandlerContext ctx) {
                System.out.println("one conn inactive: " + ctx.channel());
                ChatHolder.quit((SocketChannel) ctx.channel());
            }
        }
    
        private static class ChatHolder {
            static final Map<SocketChannel, String> USER_MAP = new ConcurrentHashMap<>();
    
            /**
             * 加入群聊
             */
            static void join(SocketChannel socketChannel) {
                // 有人加入就给他分配一个id
                String userId = "用户"+ ThreadLocalRandom.current().nextInt(Integer.MAX_VALUE);
                send(socketChannel, "您的id为:" + userId + "\n\r");
    
                for (SocketChannel channel : USER_MAP.keySet()) {
                    send(channel, userId + " 加入了群聊" + "\n\r");
                }
    
                // 将当前用户加入到map中
                USER_MAP.put(socketChannel, userId);
            }
    
            /**
             * 退出群聊
             */
            static void quit(SocketChannel socketChannel) {
                String userId = USER_MAP.get(socketChannel);
                send(socketChannel, "您退出了群聊" + "\n\r");
                USER_MAP.remove(socketChannel);
    
                for (SocketChannel channel : USER_MAP.keySet()) {
                    if (channel != socketChannel) {
                        send(channel, userId + " 退出了群聊" + "\n\r");
                    }
                }
            }
    
            /**
             * 扩散说话的内容
             */
            public static void propagate(SocketChannel socketChannel, String content) {
                String userId = USER_MAP.get(socketChannel);
                for (SocketChannel channel : USER_MAP.keySet()) {
                    if (channel != socketChannel) {
                        send(channel, userId + ": " + content);
                    }
                }
            }
    
            /**
             * 发送消息
             */
            static void send(SocketChannel socketChannel, String msg) {
                try {
                    ByteBufAllocator allocator = ByteBufAllocator.DEFAULT;
                    ByteBuf writeBuffer = allocator.buffer(msg.getBytes().length);
                    writeBuffer.writeCharSequence(msg, Charset.defaultCharset());
                    socketChannel.writeAndFlush(writeBuffer);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
    
    dkeb0s.png

    代码有点长,执行完的效果如上图所示,下面所有内容都是围绕着如何看懂以及如何写出这样的代码来展开的,希望你看完 也能轻松手写Netty服务端代码~。通过简单demo开发让大家体验了Netty实现相比NIO确实要简单的多,但优点不限于此,只需要知道选择Netty就对了。

    Netty核心组件

    对应着文章开头的思维导图,我们知道Netty的核心组件主要有:

    • Bootstrap && ServerBootstrap
    • EventLoopGroup
    • EventLoop
    • ByteBuf
    • Channel
    • ChannelHandler
    • ChannelFuture
    • ChannelPipeline
    • ChannelHandlerContext

    类图如下:

    dk8ZC9.png

    Bootstrap & ServerBootstrap

    一看到BootStrap大家就应该想到启动类、引导类这样的词汇,之前分析过EurekaServer项目启动类时介绍过EurekaBootstrap, 他的作用就是上下文初始化、配置初始化。

    Netty中我们也有类似的类,BootstrapServerBootstrap它们都是Netty程序的引导类,主要用于配置各种参数,并启动整个Netty服务,我们看下文章开头的示例代码:

    ServerBootstrap serverBootstrap = new ServerBootstrap();
    serverBootstrap.group(bossGroup, workerGroup)      
            .channel(NioServerSocketChannel.class)
            .option(ChannelOption.SO_BACKLOG, 100)
            .handler(new LoggingHandler(LogLevel.INFO))
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ChannelPipeline p = ch.pipeline();
                    p.addLast(new LoggingHandler(LogLevel.INFO));
                    p.addLast(new ChatNettyHandler());
                }
            });
    

    BootstrapServerBootstrap是针对于ClientServer端定义的两套启动类,区别如下:

    • Bootstrap是客户端引导类,而ServerBootstrap是服务端引导类。
    • Bootstrap通常使用connect()方法连接到远程的主机和端口,作为一个TCP客户端
    • ServerBootstrap通常使用bind()方法绑定本地的端口,等待客户端来连接。
    • ServerBootstrap可以处理Accept事件,这里面childHandler是用来处理Channel请求的,我们可以查看chaildHandler()方法的注解:

    [图片上传失败...(image-2595fc-1598167949595)]

    • Bootstrap客户端引导只需要一个EventLoopGroup,但是一个ServerBootstrap通常需要两个(上面的boosGroupworkerGroup)。

    EventLoopGroup && EventLoop

    EventLoopGroupEventLoop这两个类名称定义的很奇怪,对于初学者来说往往无法通过名称来了解其中的含义,包括我也是这样。

    EventLoopGroup 可以理解为一个线程池,对于服务端程序,我们一般会绑定两个线程池,一个用于处理 Accept 事件,一个用于处理读写事件,看下EventLoop系列的类目录:

    dU4Roj.png

    通过上面的类图,我们才恍然大悟,我的亲娘咧,这不就是一个线程池嘛?(名字气的犄角拐弯的真是难认)

    EventLoopGroupEventLoop的集合,一个EventLoopGroup 包含一个或者多个EventLoop。我们可以将EventLoop看做EventLoopGroup线程池中的一个个工作线程。

    至于这里为什么要用到两个线程池,具体的其实可以参考Reactor设计模式,这里暂时不做过多的讲解。

    • 一个 EventLoopGroup 包含一个或多个 EventLoop ,即 EventLoopGroup : EventLoop = 1 : n
    • 一个 EventLoop 在它的生命周期内,只能与一个 Thread 绑定,即 EventLoop : Thread = 1 : 1
    • 所有有 EventLoop 处理的 I/O 事件都将在它专有的 Thread 上被处理,从而保证线程安全,即 Thread : EventLoop = 1 : 1
    • 一个 Channel 在它的生命周期内只能注册到一个 EventLoop 上,即 Channel : EventLoop = n : 1
    • 一个 EventLoop 可被分配至一个或多个 Channel ,即 EventLoop : Channel = 1 : n

    当一个连接到达时,Netty 就会创建一个 Channel,然后从 EventLoopGroup 中分配一个 EventLoop 来给这个 Channel 绑定上,在该 Channel 的整个生命周期中都是有这个绑定的 EventLoop 来服务的。

    ByteBuf

    Java NIO中我们有 ByteBuffer缓冲池,对于它的操作我们应该印象深刻,往Buffer中写数据时我们需要关注写入的位置,切换成读模式时我们还要切换读写状态,不然将会出现大问题。

    针对于NIO中超级难用的Buffer类, Netty 提供了ByteBuf来替代。ByteBuf声明了两个指针:一个读指针,一个写指针,使得读写操作进行分离,简化buffer的操作流程。

    dkQocV.png

    另外Netty提供了发几种ByteBuf的实现以供我们选择,ByteBuf可以分为:

    • PooledUnpooled 池化和非池化
    • Heap 和 Direct,堆内存和堆外内存,NIO中创建Buffer也可以指定
    • Safe 和 Unsafe,安全和非安全
    dkJ9TU.png

    对于这么多种创建Buffer的方式该怎么选择呢?Netty也为我们处理好了,我们可以直接使用(真是暖男Ntetty):

    ByteBufAllocator allocator = ByteBufAllocator.DEFAULT;
    ByteBuf buffer = allocator.buffer(length);
    

    使用这种方式,Netty将最大努力的使用池化、Unsafe、对外内存的方式为我们创建buffer。

    Channel

    提起Channel并不陌生,上一篇讲NIO的三大组件提到过,最常见的就是java.nio.SocketChanneljava.nio.ServerSocketChannel,他们用于非阻塞的I/0操作。类似于NIOChannel,Netty提供了自己的Channel和其子类实现,用于异步I/0操作和其他相关的操作。

    Netty 中, Channel 是一个 Socket 连接的抽象, 它为用户提供了关于底层 Socket 状态(是否是连接还是断开) 以及对 Socket 的读写等操作。每当 Netty 建立了一个连接后, 都会有一个对应的 Channel 实例。并且,有父子channel的概念。 服务器连接监听的channel ,也叫 parent channel。 对应于每一个 Socket 连接的channel,也叫 child channel

    既然channel 是 Netty 抽象出来的网络 I/O 读写相关的接口,为什么不使用JDK NIO 原生的 Channel 而要另起炉灶呢,主要原因如下:

    • JDKSocketChannelServersocketChannel没有统一的 Channel 接口供业务开发者使用,对一于用户而言,没有统一的操作视图,使用起来并不方便。
    • JDKSocketChannelScrversockctChannel的主要职责就是网络 I/O 操作,由于他们是SPI 类接口,由具体的虚拟机厂家来提供,所以通过继承 SPI 功能直接实现 ServersocketChannelSocketChannel 来扩展其工作量和重新Channel 功类是差不多的。
    • Netty 的 ChannelPipeline Channel 需要够跟 Netty 的整体架构融合在一起,例如 I/O 模型、基的定制模型,以及基于元数据描述配置化的 TCP 参数等,这些JDK SocketChannelServersocketChannel都没有提供,需要重新封装。
    • 自定义的 Channel ,功实现更加灵活。

    基于上述 4 原因,它的设计原理比较简单, Netty 重新设计了 Channel 接口,并且给予了很多不同的实现。但是功能却比较繁杂,主要的设计理念如下:

    • Channel 接口层,相关联的其他操作封装起来,采用 Facade 模式进行统一封装,将网络 I/O 操作、网络 I/O 统一对外提供。
    • Channel 接口的定义尽量大而全,统一的视图,由不同子类实现不同的功能,公共功能在抽象父类中实现,最大程度上实现接口的重用。
    • 具体实现采用聚合而非包含的方式,将相关的功类聚合在 Channel中,由 Channel 统一负责分配和调度,功能实现更加灵活。

    Channel的实现类非常多,继承关系复杂,从学习的角度我们抽取最重要的两个 NioServerSocketChannelNioSocketChannel

    服务端 NioServerSocketChannel的继承关系类图如下:

    dUn8G4.png

    客户端 NioSocketChannel的继承关系类图如下:

    dUnJz9.png

    后面文章源码系列会具体分析,这里就不进一步阐述分析了。

    ChannelHandler

    ChannelHandlerNetty中最常用的组件。ChannelHandler 主要用来处理各种事件,这里的事件很广泛,比如可以是连接、数据接收、异常、数据转换等。

    ChannelHandler 有两个核心子类 ChannelInboundHandlerChannelOutboundHandler,其中 ChannelInboundHandler 用于接收、处理入站( Inbound )的数据和事件,而 ChannelOutboundHandler 则相反,用于接收、处理出站( Outbound )的数据和事件。

    dkJAp9.png

    ChannelInboundHandler

    ChannelInboundHandler处理入站数据以及各种状态变化,当Channel状态发生改变会调用ChannelInboundHandler中的一些生命周期方法.这些方法与Channel的生命密切相关。

    入站数据,就是进入socket的数据。下面展示一些该接口的生命周期API

    dUntMR.png

    当某个 ChannelInboundHandler的实现重写 channelRead()方法时,它将负责显式地释放与池化的 ByteBuf 实例相关的内存。 Netty 为此提供了一个实用方法ReferenceCountUtil.release()

    @Sharable
    public class DiscardHandler extends ChannelInboundHandlerAdapter {
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            ReferenceCountUtil.release(msg);
        }
    }
    

    这种方式还挺繁琐的,Netty提供了一个SimpleChannelInboundHandler,重写channelRead0()方法,就可以在调用过程中会自动释放资源.

    public class SimpleDiscardHandler
        extends SimpleChannelInboundHandler<Object> {
        @Override
        public void channelRead0(ChannelHandlerContext ctx,
                                        Object msg) {
                // 不用调用ReferenceCountUtil.release(msg)也会释放资源
        }
    }
    

    ChannelOutboundHandler

    出站操作和数据将由 ChannelOutboundHandler 处理。它的方法将被 ChannelChannelPipeline以及 ChannelHandlerContext 调用。
    ChannelOutboundHandler 的一个强大的功能是可以按需推迟操作或者事件,这使得可以通过一些复杂的方法来处理请求。例如, 如果到远程节点的写入被暂停了, 那么你可以推迟冲刷操作并在稍后继续。

    d0PxbT.png

    ChannelPromiseChannelFuture: ChannelOutboundHandler中的大部分方法都需要一个ChannelPromise参数, 以便在操作完成时得到通知。 ChannelPromiseChannelFuture的一个子类,其定义了一些可写的方法,如setSuccess()setFailure(),从而使ChannelFuture不可变。

    ChannelHandlerAdapter

    ChannelHandlerAdapter顾名思义,就是handler的适配器。你需要知道什么是适配器模式,假设有一个A接口,我们需要A的subclass实现功能,但是B类中正好有我们需要的功能,不想复制粘贴B中的方法和属性了,那么可以写一个适配器类Adpter继承B实现A,这样一来Adapter是A的子类并且能直接使用B中的方法,这种模式就是适配器模式。

    就比如Netty中的SslHandler类,想使用ByteToMessageDecoder中的方法进行解码,但是必须是ChannelHandler子类对象才能加入到ChannelPipeline中,通过如下签名和其实现细节(SslHandler实现细节就不贴了)就能够作为一个handler去处理消息了。

    public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundHandler
    

    ChannelHandlerAdapter提供了一些实用方法isSharable()如果其对应的实现被标注为Sharable, 那么这个方法将返回 true, 表示它可以被添加到多个 ChannelPipeline中 。如果想在自己的ChannelHandler中使用这些适配器类,只需要扩展他们,重写那些想要自定义的方法即可。

    ChannelPipeline

    每一个新创建的 Channel 都将会被分配一个新的 ChannelPipeline。这项关联是永久性的; Channel 既不能附加另外一个 ChannelPipeline,也不能分离其当前的。在 Netty 组件的生命周期中,这是一项固定的操作,不需要开发人员的任何干预。

    Netty 的 ChannelHandler 为处理器提供了基本的抽象, 目前你可以认为每个 ChannelHandler 的实例都类似于一种为了响应特定事件而被执行的回调。从应用程序开发人员的角度来看, 它充当了所有处理入站和出站数据的应用程序逻辑的拦截载体。ChannelPipeline提供了 ChannelHandler 链的容器,并定义了用于在该链上传播入站和出站事件流的 API。当 Channel 被创建时,它会被自动地分配到它专属的 ChannelPipeline

    ChannelHandler 安装到 ChannelPipeline 中的过程如下所示:

    • 一个ChannelInitializer的实现被注册到了ServerBootstrap
    • ChannelInitializer.initChannel()方法被调用时,ChannelInitializer将在 ChannelPipeline中安装一组自定义的 ChannelHandler
    • ChannelInitializer 将它自己从 ChannelPipeline中移除
    dkJuTO.png

    如上图所示:这是一个同时具有入站和出站 ChannelHandlerChannelPipeline的布局,并且印证了我们之前的关于 ChannelPipeline主要由一系列的 ChannelHandler 所组成的说法。 ChannelPipeline还提供了通过 ChannelPipeline 本身传播事件的方法。如果一个入站事件被触发,它将被从 ChannelPipeline的头部开始一直被传播到 Channel Pipeline 的尾端。

    你可能会说, 从事件途经 ChannelPipeline的角度来看, ChannelPipeline的头部和尾端取决于该事件是入站的还是出站的。然而 Netty 总是将 ChannelPipeline的入站口(图 的左侧)作为头部,而将出站口(该图的右侧)作为尾端。
    当你完成了通过调用 ChannelPipeline.add*()方法将入站处理器( ChannelInboundHandler)和 出 站 处 理 器 ( ChannelOutboundHandler ) 混 合 添 加 到 ChannelPipeline之 后 , 每 一 个ChannelHandler 从头部到尾端的顺序位置正如同我们方才所定义它们的一样。因此,如果你将图 6-3 中的处理器( ChannelHandler)从左到右进行编号,那么第一个被入站事件看到的 ChannelHandler 将是1,而第一个被出站事件看到的 ChannelHandler将是 5。

    ChannelPipeline 传播事件时,它会测试 ChannelPipeline 中的下一个 Channel�Handler 的类型是否和事件的运动方向相匹配。如果不匹配, ChannelPipeline 将跳过该ChannelHandler 并前进到下一个,直到它找到和该事件所期望的方向相匹配的为止。 (当然, ChannelHandler也可以同时实现ChannelInboundHandler接口和 ChannelOutboundHandler 接口。)

    修改ChannelPipeline

    修改指的是添加或删除ChannelHandler,见代码示例:

    ChannelPipeline pipeline = ..;
    FirstHandler firstHandler = new FirstHandler();
    // 先添加一个Handler到ChannelPipeline中
    pipeline.addLast("handler1", firstHandler);
    // 这个Handler放在了first,意味着放在了handler1之前
    pipeline.addFirst("handler2", new SecondHandler());
    // 这个Handler被放到了last,意味着在handler1之后
    pipeline.addLast("handler3", new ThirdHandler());
    ...
    // 通过名称删除
    pipeline.remove("handler3");
    // 通过对象删除
    pipeline.remove(firstHandler);
    // 名称"handler2"替换成名称"handler4",并切handler2的实例替换成了handler4的实例
    pipeline.replace("handler2", "handler4", new ForthHandler());
    

    ChannelPipeline的出入站API

    入站API所示:

    [图片上传失败...(image-6037f5-1598167949595)]

    出站API所示:

    dUndZ6.png

    ChannelPipeline 这个组件上面所讲的大致只需要记住这三点即可:

    • ChannelPipeline 保存了与 Channel 相关联的 ChannelHandler
    • ChannelPipeline可以根据需要,通过添加或者删除 ChannelHandler 来动态地修改
    • ChannelPipeline有着丰富的API用以被调用,以响应入站和出站事件

    ChannelHandlerContext

    ChannelHandler 被添加到 ChannelPipeline 时,它将会被分配一个 ChannelHandlerContext ,它代表了 ChannelHandlerChannelPipeline 之间的绑定。ChannelHandlerContext 的主要功能是管理它所关联的ChannelHandler和在同一个 ChannelPipeline 中的其他ChannelHandler之间的交互。

    如果调用ChannelChannelPipeline上的方法,会沿着整个ChannelPipeline传播,如果调用ChannelHandlerContext上的相同方法,则会从对应的当前ChannelHandler进行传播。

    ChannelHandlerContext API如下表所示:

    dUn0IO.png
    • ChannelHandlerContextChannelHandler之间的关联(绑定)是永远不会改变的,所以缓存对它的引用是安全的;
    • 如同在本节开头所解释的一样,相对于其他类的同名方法,ChannelHandlerContext的方法将产生更短的事件流, 应该尽可能地利用这个特性来获得最大的性能。

    ChannelHandlerChannelPipeline的关联使用

    dUnDiD.png

    ChannelHandlerContext访问channel

    ChannelHandlerContext ctx = ..;
    // 获取channel引用
    Channel channel = ctx.channel();
    // 通过channel写入缓冲区
    channel.write(Unpooled.copiedBuffer("Netty in Action",
    CharsetUtil.UTF_8));
    

    ChannelHandlerContext访问ChannelPipeline

    ChannelHandlerContext ctx = ..;
    // 获取ChannelHandlerContext
    ChannelPipeline pipeline = ctx.pipeline();
    // 通过ChannelPipeline写入缓冲区
    pipeline.write(Unpooled.copiedBuffer("Netty in Action",
    CharsetUtil.UTF_8));
    
    dUnrJe.png

    有时候我们不想从头传递数据,想跳过几个handler,从某个handler开始传递数据.我们必须获取目标handler之前的handler关联的ChannelHandlerContext

    ChannelHandlerContext ctx = ..;
    // 直接通过ChannelHandlerContext写数据,发送到下一个handler
    ctx.write(Unpooled.copiedBuffer("Netty in Action", CharsetUtil.UTF_8));
    
    dUnyzd.png

    好了,ChannelHandlerContext的基本使用应该掌握了,但是你真的理解ChannelHandlerContext,ChannelPipelineChannelhandler之间的关系了吗?不理解也没关系,因为源码以后会帮你理解的更为深刻。

    核心组件之间的关系

    • 一个 Channel对应一个 ChannelPipeline
    • 一个 ChannelPipeline 包含一条双向的 ChannelHandlerContext
    • 一个 ChannelHandlerContext中包含一个ChannelHandler
    • 一个 Channel会绑定到一个EventLoop
    • 一个 NioEventLoop 维护了一个 Selector(使用的是 Java 原生的 Selector)
    • 一个 NioEventLoop 相当于一个线程

    粘包拆包问题

    粘包拆包问题是处于网络比较底层的问题,在数据链路层、网络层以及传输层都有可能发生。我们日常的网络应用开发大都在传输层进行,由于UDP有消息保护边界,不会发生粘包拆包问题,而因此粘包拆包问题只发生在TCP协议中。具体讲TCP是个”流"协议,只有流的概念,没有包的概念,对于业务上层数据的具体含义和边界并不了解,它只会根据TCP缓冲区的实际情况进行包的划分。所以在业务上认为,一个完整的包可能会被TCP拆分成多个包进行发送,也有可能把多个小的包封装成一个大的数据包发送,这就是所谓的TCP粘包和拆包问题。

    问题举例说明

    下面针对客户端分别发送了两个数据表Packet1Packet2给服务端的时候,TCP粘包和拆包会出现的情况进行列举说明:

    (1)第一种情况,服务端分两次正常收到两个独立数据包,即没有发生拆包和粘包的现象;

    dUncQA.png

    (2)第二种情况,接收端只收到一个数据包,由于TCP是不会出现丢包的,所以这一个数据包中包含了客户端发送的两个数据包的信息,这种现象即为粘包。这种情况由于接收端不知道这两个数据包的界限,所以对于服务接收端来说很难处理。

    dUn2Lt.png

    (3)第三种情况,服务端分两次读取到了两个数据包,第一次读取到了完整的Packet1Packet2包的部分内容,第二次读取到了Packet2的剩余内容,这被称为TCP拆包;

    d0Pq8s.png

    (4)第四种情况,服务端分两次读取到了两个数据包,第一次读取到了部分的Packet1内容,第二次读取到了Packet1剩余内容和Packet2的整包。

    dUn5FS.png

    如果此时服务端TCP接收滑窗非常小,而数据包Packet1Packet2比较大,很有可能服务端需要分多次才能将两个包接收完全,期间发生多次拆包。以上列举情况的背后原因分别如下:

    1. 应用程序写入的数据大于套接字缓冲区大小,这将会发生拆包。
    2. 应用程序写入数据小于套接字缓冲区大小,网卡将应用多次写入的数据发送到网络上,这将会发生粘包。
    3. 进行MSS(最大报文长度)大小的TCP分段,当TCP报文长度-TCP头部长度>MSS的时候将发生拆包。
    4. 接收方法不及时读取套接字缓冲区数据,这将发生粘包。

    如何基于Netty处理粘包、拆包问题

    由于底层的TCP无法理解上层的业务数据,所以在底层是无法保证数据包不被拆分和重组的,这个问题只能通过上层的应用协议栈设计来解决,根据业界的主流协议的解决方案,可以归纳如下:

    1. 消息定长,例如每个报文的大小为固定长度200字节,如果不够,空位补空格;
    2. 在包尾增加回车换行符进行分割,例如FTP协议;
    3. 将消息分为消息头和消息体,消息头中包含表示消息总长度的字段,通常设计思路为消息头的第一个字段使用int32来表示消息的总长度;
    4. 更复杂的应用层协议。

    之前Netty示例中其实并没有考虑读半包问题,这在功能测试往往没有问题,但是一旦请求数过多或者发送大报文之后,就会存在该问题。如果代码没有考虑,往往就会出现解码错位或者错误,导致程序不能正常工作,下面看看Netty是如何根据主流的解决方案进行抽象实现来帮忙解决这一问题的。

    如下表所示,Netty为了找出消息的边界,采用封帧方式:

    方式 解码 编码
    固定长度 FixedLengthFrameDecoder 简单
    分隔符 DelimiterBasedFrameDecoder 简单
    专门的 length 字段 LengthFieldBasedFrameDecoder LengthFieldPrepender

    注意到,Netty提供了对应的解码器来解决对应的问题,有了这些解码器,用户不需要自己对读取的报文进行人工解码,也不需要考虑TCP的粘包和半包问题。为什么这么说呢?下面列举一个包尾增加分隔符的例子:

    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelHandler;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import io.netty.util.CharsetUtil;
    
    import java.util.concurrent.atomic.AtomicInteger;
    
    /**
     * @Author: wuxiaofei
     * @Date: 2020/8/15 0015 19:15
     * @Version: 1.0
     * @Description:入站处理器
     */
    @ChannelHandler.Sharable
    public class DelimiterServerHandler extends ChannelInboundHandlerAdapter {
    
        private AtomicInteger counter = new AtomicInteger(0);
        private AtomicInteger completeCounter = new AtomicInteger(0);
    
        /*** 服务端读取到网络数据后的处理*/
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            ByteBuf in = (ByteBuf)msg;
            String request = in.toString(CharsetUtil.UTF_8);
            System.out.println("Server Accept["+request
                    +"] and the counter is:"+counter.incrementAndGet());
            String resp = "Hello,"+request+". Welcome to Netty World!"
                    + DelimiterEchoServer.DELIMITER_SYMBOL;
            ctx.writeAndFlush(Unpooled.copiedBuffer(resp.getBytes()));
        }
    
        /*** 服务端读取完成网络数据后的处理*/
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx)
                throws Exception {
            ctx.fireChannelReadComplete();
            System.out.println("the ReadComplete count is "
                    +completeCounter.incrementAndGet());
        }
    
        /*** 发生异常后的处理*/
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            ctx.close();
        }
    }
    
    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.codec.DelimiterBasedFrameDecoder;
    
    import java.net.InetSocketAddress;
    
    /**
     * @Author: wuxiaofei
     * @Date: 2020/8/15 0015 19:17
     * @Version: 1.0
     * @Description:服务端
     */
    public class DelimiterEchoServer {
    
        public static final String DELIMITER_SYMBOL = "@~";
        public static final int PORT = 9997;
    
        public static void main(String[] args) throws InterruptedException {
            DelimiterEchoServer delimiterEchoServer = new DelimiterEchoServer();
            System.out.println("服务器即将启动");
            delimiterEchoServer.start();
        }
    
        public void start() throws InterruptedException {
            final DelimiterServerHandler serverHandler = new DelimiterServerHandler();
            EventLoopGroup group = new NioEventLoopGroup();/*线程组*/
            try {
                ServerBootstrap b = new ServerBootstrap();/*服务端启动必须*/
                b.group(group)/*将线程组传入*/
                    .channel(NioServerSocketChannel.class)/*指定使用NIO进行网络传输*/
                    .localAddress(new InetSocketAddress(PORT))/*指定服务器监听端口*/
                    /*服务端每接收到一个连接请求,就会新启一个socket通信,也就是channel,
                    所以下面这段代码的作用就是为这个子channel增加handle*/
                    .childHandler(new ChannelInitializerImp());
                ChannelFuture f = b.bind().sync();/*异步绑定到服务器,sync()会阻塞直到完成*/
                System.out.println("服务器启动完成,等待客户端的连接和数据.....");
                f.channel().closeFuture().sync();/*阻塞直到服务器的channel关闭*/
            } finally {
                group.shutdownGracefully().sync();/*优雅关闭线程组*/
            }
        }
    
        private static class ChannelInitializerImp extends ChannelInitializer<Channel> {
    
            @Override
            protected void initChannel(Channel ch) throws Exception {
                ByteBuf delimiter = Unpooled.copiedBuffer(DELIMITER_SYMBOL
                        .getBytes());
                //服务端收到数据包后经过DelimiterBasedFrameDecoder即分隔符基础框架解码器解码为一个个带有分隔符的数据包。
                ch.pipeline().addLast( new DelimiterBasedFrameDecoder(1024,
                        delimiter));
                ch.pipeline().addLast(new DelimiterServerHandler());
            }
        }
    
    }
    
    

    添加到ChannelPipelineDelimiterBasedFrameDecoder用于对使用分隔符结尾的消息进行自动解码,当然还有没有用到的FixedLengthFrameDecoder用于对固定长度的消息进行自动解码等解码器。正如上门的代码使用案例,有了Netty提供的几码器可以轻松地完成对很多消息的自动解码,而且不需要考虑TCP粘包/拆包导致的读半包问题,极大地提升了开发效率。

    Netty示例代码详解

    相信看完上面的铺垫,你对Netty编码有了一定的了解了,下面再来整体梳理一遍吧。

    dVp7yn.png

    1、设置EventLoopGroup线程组(Reactor线程组)

    EventLoopGroup bossGroup = new NioEventLoopGroup(1);
    EventLoopGroup workerGroup = new NioEventLoopGroup();
    

    上面我们说过Netty中使用Reactor模式,bossGroup表示服务器连接监听线程组,专门接受 Accept 新的客户端client 连接。另一个workerGroup表示处理每一连接的数据收发的线程组,来处理消息的读写事件。

    2、服务端引导器

    ServerBootstrap serverBootstrap = new ServerBootstrap();
    

    集成所有配置,用来启动Netty服务端。

    3、设置ServerBootstrap信息

    serverBootstrap.group(bossGroup, workerGroup);
    

    将两个线程组设置到ServerBootstrap中。

    4、设置ServerSocketChannel类型

    serverBootstrap.channel(NioServerSocketChannel.class);
    

    设置通道的IO类型,Netty不止支持Java NIO,也支持阻塞式IO,例如OIOOioServerSocketChannel.class)

    5、设置参数

    serverBootstrap.option(ChannelOption.SO_BACKLOG, 100);
    

    通过option()方法可以设置很多参数,这里SO_BACKLOG标识服务端接受连接的队列长度,如果队列已满,客户端连接将被拒绝。默认值,Windows为200,其他为128,这里设置的是100。

    6、设置Handler

    serverBootstrap.handler(new LoggingHandler(LogLevel.INFO));
    

    设置 ServerSocketChannel对应的Handler,这里只能设置一个,它会在SocketChannel建立起来之前执行。

    7、设置子Handler

    serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
        @Override
        public void initChannel(SocketChannel ch) throws Exception {
            ChannelPipeline p = ch.pipeline();
            p.addLast(new LoggingHandler(LogLevel.INFO));
            p.addLast(new ChatNettyHandler());
        }
    });
    

    Netty中提供了一种可以设置多个Handler的途径,即使用ChannelInitializer方式。ChannelPipelineNetty处理请求的责任链,这是一个ChannelHandler的链表,而ChannelHandler就是用来处理网络请求的内容的。

    每一个channel,都有一个处理器流水线。装配child channel流水线,调用childHandler()方法,传递一个ChannelInitializer 的实例。

    child channel 创建成功,开始通道初始化的时候,在bootstrap启动器中配置的ChannelInitializer 实例就会被调用。

    这个时候,才真正的执行去执行 initChannel 初始化方法,开始通道流水线装配。

    流水线装配,主要是在流水线pipeline的后面,增加负责数据读写、处理业务逻辑的handler

    处理器 ChannelHandler 用来处理网络请求内容,有ChannelInboundHandlerChannelOutboundHandler两种,ChannlPipeline会从头到尾顺序调用ChannelInboundHandler处理网络请求内容,从尾到头调用ChannelOutboundHandler处理网络请求内容

    8、绑定端口号

    ChannelFuture f = serverBootstrap.bind(PORT).sync();
    

    绑定端口号

    9、等待服务端端口号关闭

    f.channel().closeFuture().sync();
    

    等待服务端监听端口关闭,sync()会阻塞主线程,内部调用的是 Objectwait()方法

    10、关闭EventLoopGroup线程组

    bossGroup.shutdownGracefully();
    workerGroup.shutdownGracefully();
    

    总结

    这篇文章主要是从一个demo作为引子,然后介绍了Netty的包结构、Reactor模型、编程规范等等,目的很简单,希望你能够读懂这段demo并写出来。

    后面开始继续Netty源码解析部分,敬请期待。

    参考资料

    1. 《Netty in Action》书籍
    2. 慕课Netty专栏
    3. 掘金闪电侠Netty小册
    4. 芋道源码Netty专栏
    5. Github[fork from krcys]

    感谢Netty专栏作者们优秀的文章内容~

    相关文章

      网友评论

        本文标题:Netty之旅二:口口相传的高性能Netty到底是什么?

        本文链接:https://www.haomeiwen.com/subject/jaoijktx.html