美文网首页手写RPC框架
手写RPC框架(5)-Netty入门了解和实践

手写RPC框架(5)-Netty入门了解和实践

作者: jwfy | 来源:发表于2019-11-03 17:52 被阅读0次

    手写RPC框架
    1、手写一个RPC框架,看看100个线程同时调用效果如何
    2、手写RPC框架(2)-引入zookeeper做服务治理
    3、手写RPC框架(3)-引入Hessian序列化工具
    4、手写RPC框架(4)-重写服务治理,开启1000个线程看看netty的执行调用情况

    Netty是基于NIO的的服务框架,屏蔽了使用Java原生NIO网络模型的各种问题,对外提供灵活的Reactor模型配置,也提供了插拔式的Handler处理器,便于支持各种网络协议和特定业务等操作,也是异步事件驱动,使得性能能够更高。此前RPC中关于Netty的代码逻辑存在些问题,对Netty的一些概念也没有理解到位,所以这次就一起再学习Netty,先写一个demo有大致的了解和印象,随后通过问题介绍各个组件的功能和特点,其原因是什么。

    • 粘包、拆包是什么情况,为什么会发生这种情况?
    • pipeline 和 handler是什么关系?
    • pipeline.addLast的顺序是如何执行的?
    • handler中的各个fireXXX执行顺序是怎样的?
    • 为什么server是2个EventLoopGroup,而client却只有1个EventLoopGroup?

    Demo

    运行效果如下图

    image

    服务端

    public class Server {
        public void run(int port) throws InterruptedException {
            EventLoopGroup workGroup = new NioEventLoopGroup();
            EventLoopGroup bossGroup = new NioEventLoopGroup();
    
            try {
                ServerBootstrap serverBootstrap = new ServerBootstrap();
                serverBootstrap
                        .group(bossGroup, workGroup)
                        .channel(NioServerSocketChannel.class)
                        .option(ChannelOption.SO_BACKLOG, 1024)
                        .childHandler(new ChannelInitializer<SocketChannel>() {
    
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
                                ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
                                ch.pipeline().addLast(new StringDecoder());
                                ch.pipeline().addLast(new TimeServerHandler());
                            }
                        });
    
                ChannelFuture cf = serverBootstrap.bind(port).sync();
                cf.channel().closeFuture().sync();
            } finally {
                workGroup.shutdownGracefully();
                bossGroup.shutdownGracefully();
            }
        }
    
        public static void main(String[] args) throws InterruptedException {
            new Server().run(10002);
        }
    }
    
    public class TimeServerHandler extends ChannelInboundHandlerAdapter {
    
        @Override
        public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
            System.out.println("channelRegistered ...");
        }
    
        @Override
        public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
            System.out.println("channelUnregistered ...");
        }
    
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            System.out.println("channelActive ...");
        }
    
        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            System.out.println("channelInactive ...");
        }
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            String body = (String) msg;
            System.out.println("Client:[" + body + "]");
    
            String cur = ("Hello, My name is jwfy".equalsIgnoreCase(body) ? "OK" : "ERROR") + System.getProperty("line.separator");
            ctx.writeAndFlush(Unpooled.copiedBuffer(cur.getBytes()));
        }
    
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            System.out.println("channelReadComplete ...");
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            ctx.close();
        }
    }
    

    客户端

    public class Client {
        public void connection(String host, int port) throws InterruptedException {
            EventLoopGroup group = new NioEventLoopGroup();
            try {
                Bootstrap bootstrap = new Bootstrap();
                bootstrap
                        .group(group)
                        .channel(NioSocketChannel.class)
                        .option(ChannelOption.TCP_NODELAY, true)
                        .handler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
                                ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
                                ch.pipeline().addLast(new StringDecoder());
                                ch.pipeline().addLast(new TimeClientHandler());
                            }
                        });
    
                ChannelFuture cf = bootstrap.connect(host, port).sync();
                cf.channel().closeFuture().sync();
            } finally {
                group.shutdownGracefully();
            }
        }
    
        public static void main(String[] args) throws InterruptedException {
            new Client().connection("127.0.0.1", 10002);
        }
    }
    
    public class TimeClientHandler extends ChannelInboundHandlerAdapter {
    
        @Override
        public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
            System.out.println("channelRegistered ...");
        }
    
        @Override
        public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
            System.out.println("channelUnregistered ...");
        }
    
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            System.out.println("channelActive ...");
            for(int i = 0; i < 1; i++ ) {
                ctx.writeAndFlush(Unpooled.copiedBuffer(("Hello, My name is jwfy" + System.getProperty("line.separator")).getBytes()));
            }
        }
    
        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            System.out.println("channelInactive ...");
        }
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            String body = (String) msg;
            System.out.println("Server:[" + body + "]");
        }
    
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            super.channelReadComplete(ctx);
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            ctx.close();
        }
    }
    

    EventLoopGroup

    EventLoopGroup 是一种Reactor多线程模型的抽象,具体实现一般都是NioEventLoopGroup。而Reactor模型又有单线程、多线程、以及主从多线程模型,他们有什么区别呢?

    单线程模型

    image

    1个NIO线程原则上可以负责所有IO相关的请求操作,通过acceptor接收客户端发生的TCP请求,当链接建立成功之后,通过Dispatch将对于的请求数据包装成bytebuf指派给相关的handler处理。但是这在某些场景下也不太合适。

    • 一个NIO线程同时管理成百上千的客户端链接,会严重影响性能
    • 当NIO线程的负载很高时,导致处理速度变慢,同时还可能因为某一请求影响整个NIO线程的工作,进而影响其他端口的处理请求。故有了多线程模型。

    多线程模型

    image

    同样是一个NIO线程接收客户端的请求调用,当链接完成后请求会分配给一个NIO线程池,具体的消息序列化反序列化、数据处理等任务可有NIO线程池中的线程完成。

    同样的基本情况下是没有问题的,但是多个客户端连接依旧可能出现性能问题,故有了主从多线程模型

    主从多线程模型

    主从情况就是从一个NIO线程变成了一个NIO线程池,可同时由多个NIO线程处理客户端的请求连接操作,减少因为性能不足导致的问题,这也是netty推荐的使用方法

    EventLoopGroup 则也是一个NIO线程池,即可用于客户端的TCP请连接求,也可用于数据的IO处理,所以在上述代码中观察发现服务端和客户端的EventLoopGroup个数不一样也是这个道理,服务端一个线程池用来接收客户端连接,另一个则用来进行读写IO操作。

    粘包、拆包

    众所周知,网络上的传输的都是字节流,从TCP/IP协议角度出发无法知道具体的业务数据组装情况,所以实际场景中一个请求可能被分批次传输,也有可能因为请求数据太少故打包多个请求统一传输

    image

    如上图,正常的情况是分别有D1和D2两个数据包发送到服务端,但是因为网络拥塞比较严重,滑动窗口自适应的缩小,使得1个缓冲区的大小无法装满整个请求体,就会出现拆包的情况;又例如请求体内容较少,无法填充完整缓冲区,那么就会等待多个请求把缓冲区填满再发送出去,就会出现粘包的情况,如下距离:

    • D2和D1 同时发送到服务端,那么服务端则需要正确的进行拆分处理,否则反序列化会失败
    • D1和D2 的一部分D2_1 同时发送到服务端,服务端除了需要把D1拆出来,还需要等待D2_2的到来才能开始处理D2数据

    必须首先处理好拆包和粘包问题,才能保证收到正常的完整的消息,而netty则帮我们解决了大部分问题了,例如根据长度拆分(FixedLengthFrameDecoder),根据换行符拆分(LineBasedFrameDecoder),又或者分割符拆分(DelimiterBasedFrameDecoder),只是在本Demo中使用的是换行符切分的LineBasedFrameDecoder

    ChannelPipeline 和 ChannelHandler

    image

    ChannelPipeline 是一个拥有头(Head)和尾(Tail)的双向链式容器,可自由添加不同的handler处理器以满足不同的业务需求。同时因为有从外界读取数据和发送数据两种场景,所以有inbound和outbound两种情况

    image

    ChannelHandler 则是具体的处理器,可通过addLast方式添加到pipeline管道链路上,如粘包说的LineBasedFrameDecoder也是一种具体的handler处理器。demo中提到的添加自定义handler代码块如下所示

    ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
    ch.pipeline().addLast(new StringDecoder());
    ch.pipeline().addLast(new TimeServerHandler());
    

    通过这种添加方式形成了下面的链路

    HEAD                                             TAIL
    LineBasedFrameDecoder -> StringDecoder ->  TimeServerHandler
    
    • netty读取的规则是从head开始的,先进行拆分、粘包的处理,再反序列化,后面交由具体的业务处理器,是inbound类处理器
    • netty写出的规则是从tail开始的,先进行数据的序列号,再发送出去,是outbound类处理器

    这整个链路是比较清晰完整的,如果把StringDecoder和LineBasedFrameDecoder的处理器顺序换一下,则会发现出现错误,如下图

    image

    圈住的地方换行符就是我们代码中添加的 System.getProperty("line.separator") 换行导致,因为这个就1次调用,所以会发现只进行了字符串的转换,并没有进行拆包处理,再次把请求的数据量加大些,再测试看看

    image

    会发现服务端接收到的数据全部错误了,没有一个正确,切记不要把handler处理器顺序搞错,如下图是netty源码中关于顺序的说明情况。

    image

    Handler 生命周期

    handler在处理的时候是有着一定的顺序,例如服务端先接收请求的注册,等到TCP/IP三次握手完成后,相当于channel激活完成,开始接受客户端正常的请求调用,然后返回响应结果等,客户端关闭后,服务端也需要进行取消激活,关闭注册的操作,以放弃该channel的管理操作。通过对其各个步骤的生命周期的管理,可以实现自定义的各种管理和控制。fireXXX又被包装成类似于channelRegistered的名字,如下图的调用过程

    **该图来自 http://www.jiangxindc.com/view/2398**

    如本demo的运行结果也可以很明显的看出其执行链路。

    image

    结束

    到此netty的学习就结束了,并没有介绍的太深入,也只是把常用的组件知识梳理了一遍,以便于我们在使用netty的时候注意到这些问题,以发挥netty的最大功效,文中很多内容都参考自《Netty权威指南》,大家如果有兴趣的话可以自行阅读学习和加强理解,下一期将会进行RPC代码中的netty部门的改造。

    如代码存在的问题欢迎提出~

    相关文章

      网友评论

        本文标题:手写RPC框架(5)-Netty入门了解和实践

        本文链接:https://www.haomeiwen.com/subject/xoklbctx.html