美文网首页
2023-11-30跟着源码学IM(十二):基于Netty打造一

2023-11-30跟着源码学IM(十二):基于Netty打造一

作者: jackjiang20212 | 来源:发表于2023-11-29 11:43 被阅读0次

    本文由竹子爱熊猫分享,原题“(十一)Netty实战篇:基于Netty框架打造一款高性能的IM即时通讯程序”,本文有修订和改动。

    1、引言

    关于Netty网络框架的内容,前面已经讲了两个章节,但总归来说难以真正掌握,毕竟只是对其中一个个组件进行讲解,很难让诸位将其串起来形成一条线,所以本章中则会结合实战案例,对Netty进行更深层次的学习与掌握,实战案例也并不难,一个非常朴素的IM聊天程序。

    原本打算做个多人斗地主练习程序,但那需要织入过多的业务逻辑,因此一方面会带来不必要的理解难度,让案例更为复杂化,另一方面代码量也会偏多,所以最终依旧选择实现基本的IM聊天程序,既简单,又能加深对Netty的理解。

    2、配套源码

    本文配套源码的开源托管地址是:

    1)主地址:https://github.com/liuhaijieAdmin/springboot-netty

    2)备地址:https://github.com/52im/springboot-netty

    3、知识准备

    关于 Netty 是什么,这里简单介绍下:

    Netty 是一个 Java 开源框架。Netty 提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。

    也就是说,Netty 是一个基于 NIO 的客户、服务器端编程框架,使用Netty 可以确保你快速和简单的开发出一个网络应用,例如实现了某种协议的客户,服务端应用。

    Netty 相当简化和流线化了网络应用的编程开发过程,例如,TCP 和 UDP 的 Socket 服务开发。

    有关Netty的入门文章:

    1)新手入门:目前为止最透彻的的Netty高性能原理和框架架构解析

    2)写给初学者:Java高性能NIO框架Netty的学习方法和进阶策略

    3)史上最通俗Netty框架入门长文:基本介绍、环境搭建、动手实战

    如果你连Java NIO都不知道,下面的文章建议优先读:

    1)少啰嗦!一分钟带你读懂Java的NIO和经典IO的区别

    2)史上最强Java NIO入门:担心从入门到放弃的,请读这篇!

    3)Java的BIO和NIO很难懂?用代码实践给你看,再不懂我转行!

    Netty源码和API 在线查阅地址:

    1)Netty-4.1.x 完整源码(在线阅读版)

    2)Netty-4.1.x API文档(在线版)

    4、基于Netty设计通信协议

    协议,这玩意儿相信大家肯定不陌生了,简单回顾一下协议的概念:网络协议是指一种通信双方都必须遵守的约定,两个不同的端,按照一定的格式对数据进行“编码”,同时按照相同的规则进行“解码”,从而实现两者之间的数据传输与通信。

    当自己想要打造一款IM通信程序时,对于消息的封装、拆分也同样需要设计一个协议,通信的两端都必须遵守该协议工作,这也是实现通信程序的前提。

    但为什么需要通信协议呢?

    因为TCP/IP中是基于流的方式传输消息,消息与消息之间没有边界,而协议的目的则在于约定消息的样式、边界等。

    5、Redis通信的RESP协议参考学习

    不知大家是否还记得之前我聊到的RESP客户端协议,这是Redis提供的一种客户端通信协议。如果想要操作Redis,就必须遵守该协议的格式发送数据。

    这个协议特别简单,如下:

    1)首先要求所有命令,都以*开头,后面跟着具体的子命令数量,接着用换行符分割;

    2)接着需要先用$符号声明每个子命令的长度,然后再用换行符分割;

    3)最后再拼接上具体的子命令,同样用换行符分割。

    这样描述有些令人难懂,那就直接看个案例,例如一条简单set命令。

    如下:

    客户端命令:

        setname ZhuZi

    转变为RESP指令:

        *3

        $3

        set

        $4

        name

        $5

        ZhuZi

    按照Redis的规定,但凡满足RESP协议的客户端,都可以直接连接并操作Redis服务端,这也就意味着咱们可以直接通过Netty来手写一个Redis客户端。

    代码如下:

    // 基于Netty、RESP协议实现的Redis客户端

    publicclassRedisClient {

        // 换行符的ASCII码

        staticfinalbyte[] LINE = {13, 10};

        publicstaticvoidmain(String[] args) {

            EventLoopGroup worker = newNioEventLoopGroup();

            Bootstrap client = newBootstrap();

            try{

                client.group(worker);

                client.channel(NioSocketChannel.class);

                client.handler(newChannelInitializer<SocketChannel>() {

                    @Override

                    protectedvoidinitChannel(SocketChannel socketChannel)

                                                            throwsException {

                        ChannelPipeline pipeline = socketChannel.pipeline();

                        pipeline.addLast(newChannelInboundHandlerAdapter(){

                            // 通道建立成功后调用:向Redis发送一条set命令

                            @Override

                            publicvoidchannelActive(ChannelHandlerContext ctx)

                                                                throwsException {

                                String command = "set name ZhuZi";

                                ByteBuf buffer = respCommand(command);

                                ctx.channel().writeAndFlush(buffer);

                            }

                            // Redis响应数据时触发:打印Redis的响应结果

                            @Override

                            publicvoidchannelRead(ChannelHandlerContext ctx,

                                                    Object msg) throwsException {

                                // 接受Redis服务端执行指令后的结果

                                ByteBuf buffer = (ByteBuf) msg;

                                System.out.println(buffer.toString(CharsetUtil.UTF_8));

                            }

                        });

                    }

                });

                // 根据IP、端口连接Redis服务端

                client.connect("192.168.12.129", 6379).sync();

            } catch(Exception e){

                e.printStackTrace();

            }

        }

        privatestaticByteBuf respCommand(String command){

            // 先对传入的命令以空格进行分割

            String[] commands = command.split(" ");

            ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();

            // 遵循RESP协议:先写入指令的个数

            buffer.writeBytes(("*"+ commands.length).getBytes());

            buffer.writeBytes(LINE);

            // 接着分别写入每个指令的长度以及具体值

            for(String s : commands) {

                buffer.writeBytes(("$"+ s.length()).getBytes());

                buffer.writeBytes(LINE);

                buffer.writeBytes(s.getBytes());

                buffer.writeBytes(LINE);

            }

            // 把转换成RESP格式的命令返回

            returnbuffer;

        }

    }

    在上述这个案例中,也仅仅只是通过respCommand()这个方法,对用户输入的指令进行了转换。同时在上面通过Netty,与Redis的地址、端口建立了连接。在连接建立成功后,就会向Redis发送一条转换成RESP指令的set命令。接着等待Redis的响应结果并输出,如下:

    +OK

    因为这是一条写指令,所以当Redis收到执行完成后,最终就会返回一个OK,大家也可直接去Redis中查询,也依旧能够查询到刚刚写入的name这个键值。

    6、HTTP超文本传输协议参考学习

    前面咱们自己针对于Redis的RESP协议,对用户指令进行了封装,然后发往Redis执行。

    但对于这些常用的协议,Netty早已提供好了现成的处理器,想要使用时无需从头开发,可以直接使用现成的处理器来实现。

    比如现在咱们可以基于Netty提供的处理器,实现一个简单的HTTP服务器。

    代码如下:

    // 基于Netty提供的处理器实现HTTP服务器

    publicclassHttpServer {

        publicstaticvoidmain(String[] args) throwsInterruptedException {

            EventLoopGroup boss = newNioEventLoopGroup();

            EventLoopGroup worker = newNioEventLoopGroup();

            ServerBootstrap server = newServerBootstrap();

            server

                .group(boss,worker)

                .channel(NioServerSocketChannel.class)

                .childHandler(newChannelInitializer<NioSocketChannel>() {

                    @Override

                    protectedvoidinitChannel(NioSocketChannel ch) {

                        ChannelPipeline pipeline = ch.pipeline();

                        // 添加一个Netty提供的HTTP处理器

                        pipeline.addLast(newHttpServerCodec());

                        pipeline.addLast(newChannelInboundHandlerAdapter() {

                            @Override

                            publicvoidchannelRead(ChannelHandlerContext ctx,

                                                    Object msg) throwsException {

                                // 在这里输出一下消息的类型

                                System.out.println("消息类型:"+ msg.getClass());

                                super.channelRead(ctx, msg);

                            }

                        });

                        pipeline.addLast(newSimpleChannelInboundHandler<HttpRequest>() {

                            @Override

                            protectedvoidchannelRead0(ChannelHandlerContext ctx,

                                                        HttpRequest msg) throwsException {

                                System.out.println("客户端的请求路径:"+ msg.uri());

                                // 创建一个响应对象,版本号与客户端保持一致,状态码为OK/200

                                DefaultFullHttpResponse response =

                                        newDefaultFullHttpResponse(

                                                msg.protocolVersion(),

                                                HttpResponseStatus.OK);

                                // 构造响应内容

                                byte[] content = "<h1>Hi, ZhuZi!</h1>".getBytes();

                                // 设置响应头:告诉客户端本次响应的数据长度

                                response.headers().setInt(

                                    HttpHeaderNames.CONTENT_LENGTH,content.length);

                                // 设置响应主体

                                response.content().writeBytes(content);

                                // 向客户端写入响应数据

                                ctx.writeAndFlush(response);

                            }

                        });

                    }

                })

                .bind("127.0.0.1",8888)

                .sync();

        }

    }

    在该案例中,咱们就未曾手动对HTTP的数据包进行拆包处理了,而是在服务端的pipeline上添加了一个HttpServerCodec处理器,这个处理器是Netty官方提供的。

    其类继承关系如下:

    publicfinalclassHttpServerCodec

        extendsCombinedChannelDuplexHandler<HttpRequestDecoder, HttpResponseEncoder>

        implementsSourceCodec {

        // ......

    }

    观察会发现,该类继承自CombinedChannelDuplexHandler这个组合类,它组合了编码器、解码器。

    这也就意味着HttpServerCodec即可以对客户端的数据做解码,也可以对服务端响应的数据做编码。

    同时除开添加了这个处理器外,在第二个处理器中打印了一下客户端的消息类型,最后一个处理器中,对客户端的请求做出了响应,其实也就是返回了一句话而已。

    此时在浏览器输入http://127.0.0.1:8888/index.html,结果如下:

    消息类型:classio.netty.handler.codec.http.DefaultHttpRequest

    消息类型:classio.netty.handler.codec.http.LastHttpContent$1

    客户端的请求路径:/index.html

    此时来看结果,客户端的请求会被解析成两个部分:

    1)第一个是请求信息;

    2)第二个是主体信息。

    但按理来说浏览器发出的请求,属于GET类型的请求,GET请求是没有请求体信息的,但Netty依旧会解析成两部分~,只不过GET请求的第二部分是空的。

    在第三个处理器中,咱们直接向客户端返回了一个h1标签,同时也要记得在响应头里面,加上响应内容的长度信息,否则浏览器的加载圈,会一直不同的转动,毕竟浏览器也不知道内容有多长,就会一直反复加载,尝试等待更多的数据。

    在第三个处理器中,咱们直接向客户端返回了一个h1标签,同时也要记得在响应头里面,加上响应内容的长度信息,否则浏览器的加载圈,会一直不同的转动,毕竟浏览器也不知道内容有多长,就会一直反复加载,尝试等待更多的数据。

    7、自定义消息传输协议

    7.1概述

    Netty除开提供了HTTP协议的处理器外,还提供了DNS、HaProxy、MemCache、MQTT、Protobuf、Redis、SCTP、RTSP.....一系列协议的实现,具体定义位于io.netty.handler.codec这个包下,当然,咱们也可以自己实现自定义协议,按照自己的逻辑对数据进行编解码处理。

    很多基于Netty开发的中间件/组件,其内部基本上都开发了专属的通信协议,以此来作为不同节点间通信的基础,所以解下来咱们基于Netty也来自己设计一款通信协议,这也会作为后续实现聊天程序时的基础。

    所谓的协议设计,其实仅仅只需要按照一定约束,实现编码器与解码器即可,发送方在发出数据之前,会经过编码器对数据进行处理,而接收方在收到数据之前,则会由解码器对数据进行处理。

    7.2自定义协议的要素

    在自定义传输协议时,咱们必然需要考虑几个因素,如下:

    1)魔数:用来第一时间判断是否为自己需要的数据包;

    2)版本号:提高协议的拓展性,方便后续对协议进行升级;

    3)序列化算法:消息正文具体该使用哪种方式进行序列化传输,例如Json、ProtoBuf、JDK...;

    4)消息类型:第一时间判断出当前消息的类型;

    5)消息序号:为了实现双工通信,客户端和服务端之间收/发消息不会相互阻塞;

    6)正文长度:提供给LTC解码器使用,防止解码时出现粘包、半包的现象;

    7)消息正文:本次消息要传输的具体数据。

    在设计协议时,一个完整的协议应该涵盖上述所说的几方面,这样才能提供双方通信时的基础。

    基于上述几个字段,能够在第一时间内判断出:

    1)消息是否可用;

    2)当前协议版本;

    3)消息的具体类型;

    4)消息的长度等各类信息。

    从而给后续处理器使用(自定义的协议规则本身就是一个编解码处理器而已)。

    7.3自定义协议实战

    前面简单聊到过,所谓的自定义协议就是自己规定消息格式,以及自己实现编/解码器对消息实现封装/拆解,所以这里想要自定义一个消息协议,就只需要满足前面两个条件即可。

    因此实现如下:

    @ChannelHandler.Sharable

    publicclassChatMessageCodec extendsMessageToMessageCodec<ByteBuf, Message> {

        // 消息出站时会经过的编码方法(将原生消息对象封装成自定义协议的消息格式)

        @Override

        protectedvoidencode(ChannelHandlerContext ctx, Message msg,

                              List<Object> list) throwsException {

            ByteBuf outMsg = ctx.alloc().buffer();

            // 前五个字节作为魔数

            byte[] magicNumber = newbyte[]{'Z','h','u','Z','i'};

            outMsg.writeBytes(magicNumber);

            // 一个字节作为版本号

            outMsg.writeByte(1);

            // 一个字节表示序列化方式  0:JDK、1:Json、2:ProtoBuf.....

            outMsg.writeByte(0);

            // 一个字节用于表示消息类型

            outMsg.writeByte(msg.getMessageType());

            // 四个字节表示消息序号

            outMsg.writeInt(msg.getSequenceId());

            // 使用Java-Serializable的方式对消息对象进行序列化

            ByteArrayOutputStream bos = newByteArrayOutputStream();

            ObjectOutputStream oos = newObjectOutputStream(bos);

            oos.writeObject(msg);

            byte[] msgBytes = bos.toByteArray();

            // 使用四个字节描述消息正文的长度

            outMsg.writeInt(msgBytes.length);

            // 将序列化后的消息对象作为消息正文

            outMsg.writeBytes(msgBytes);

            // 将封装好的数据传递给下一个处理器

            list.add(outMsg);

        }

        // 消息入站时会经过的解码方法(将自定义格式的消息转变为具体的消息对象)

        @Override

        protectedvoiddecode(ChannelHandlerContext ctx,

                              ByteBuf inMsg, List<Object> list) throwsException {

            // 读取前五个字节得到魔数

            byte[] magicNumber = newbyte[5];

            inMsg.readBytes(magicNumber,0,5);

            // 再读取一个字节得到版本号

            byteversion = inMsg.readByte();

            // 再读取一个字节得到序列化方式

            byteserializableType = inMsg.readByte();

            // 再读取一个字节得到消息类型

            bytemessageType = inMsg.readByte();

            // 再读取四个字节得到消息序号

            intsequenceId = inMsg.readInt();

            // 再读取四个字节得到消息正文长度

            intmessageLength = inMsg.readInt();

            // 再根据正文长度读取序列化后的字节正文数据

            byte[] msgBytes = newbyte[messageLength];

            inMsg.readBytes(msgBytes,0,messageLength);

            // 对于读取到的消息正文进行反序列化,最终得到具体的消息对象

            ByteArrayInputStream bis = newByteArrayInputStream(msgBytes);

            ObjectInputStream ois = newObjectInputStream(bis);

            Message message = (Message) ois.readObject();

            // 最终把反序列化得到的消息对象传递给后续的处理器

            list.add(message);

        }

    }

    上面自定义的处理器中,继承了MessageToMessageCodec类,主要负责将数据在原生ByteBuf与Message之间进行相互转换,而Message对象是自定义的消息对象,这里暂且无需过多关心。

    其中主要实现了两个方法:

    1)encode():出站时会经过的编码方法,会将原生消息对象按自定义的协议封装成对应的字节数据;

    2)decode():入站时会经过的解码方法,会将协议格式的字节数据,转变为具体的消息对象。

    上述自定义的协议,也就是一定规则的字节数据,每条消息数据的组成如下:

    1)魔数:使用第1~5个字节来描述,这个魔数值可以按自己的想法自定义;

    2)版本号:使用第6个字节来描述,不同数字表示不同版本;

    3)序列化算法:使用第7个字节来描述,不同数字表示不同序列化方式;

    4)消息类型:使用第8个字节来描述,不同的消息类型使用不同数字表示;

    5)消息序号:使用第9~12个字节来描述,其实就是一个四字节的整数;

    6)正文长度:使用第13~16个字节来描述,也是一个四字节的整数;

    7)消息正文:长度不固定,根据每次具体发送的数据来决定。

    在其中,为了实现简单,这里的序列化方式,则采用的是JDK默认的Serializable接口方式,但这种方式生成的对象字节较大,实际情况中最好还是选择谷歌的ProtoBuf方式,这种算法属于序列化算法中,性能最佳的一种落地实现。

    当然,这个自定义的协议是提供给后续的聊天业务使用的,但这种实战型的内容分享,基本上代码量较高,所以大家看起来会有些枯燥,而本文所使用的聊天室案例,是基于《B站-黑马Netty视频教程》二次改良的,因此如若感觉文字描述较为枯燥,可直接点击前面给出的链接,观看P101~P121视频进行学习。

    最后来观察一下,大家会发现,在咱们定义的这个协议编解码处理器上,存在着一个@ChannelHandler.Sharable注解,这个注解的作用是干吗的呢?其实很简单,用来标识当前处理器是否可在多线程环境下使用,如果带有该注解的处理器,则表示可以在多个通道间共用,因此只需要创建一个即可,反之同理,如果不带有该注解的处理器,则每个通道需要单独创建使用。

    PS:如果你想系统学习Protobuf,可以从以下文章入手:

    如何选择即时通讯应用的数据传输格式

    强列建议将Protobuf作为你的即时通讯应用数据传输格式

    IM通讯协议专题学习(一):Protobuf从入门到精通,一篇就够!

    IM通讯协议专题学习(二):快速理解Protobuf的背景、原理、使用、优缺点

    IM通讯协议专题学习(三):由浅入深,从根上理解Protobuf的编解码原理

    IM通讯协议专题学习(四):从Base64到Protobuf,详解Protobuf的数据编码原理

    IM通讯协议专题学习(八):金蝶随手记团队的Protobuf应用实践(原理篇)

    最后来观察一下,大家会发现,在咱们定义的这个协议编解码处理器上,存在着一个@ChannelHandler.Sharable注解,这个注解的作用是干吗的呢?其实很简单,用来标识当前处理器是否可在多线程环境下使用,如果带有该注解的处理器,则表示可以在多个通道间共用,因此只需要创建一个即可,反之同理,如果不带有该注解的处理器,则每个通道需要单独创建使用。

    PS:如果你想系统学习Protobuf,可以从以下文章入手:

    如何选择即时通讯应用的数据传输格式

    强列建议将Protobuf作为你的即时通讯应用数据传输格式

    IM通讯协议专题学习(一):Protobuf从入门到精通,一篇就够!

    IM通讯协议专题学习(二):快速理解Protobuf的背景、原理、使用、优缺点

    IM通讯协议专题学习(三):由浅入深,从根上理解Protobuf的编解码原理

    IM通讯协议专题学习(四):从Base64到Protobuf,详解Protobuf的数据编码原理

    IM通讯协议专题学习(八):金蝶随手记团队的Protobuf应用实践(原理篇)

    12、系列文章

    跟着源码学IM(一):手把手教你用Netty实现心跳机制、断线重连机制

    跟着源码学IM(二):自已开发IM很难?手把手教你撸一个Andriod版IM

    跟着源码学IM(三):基于Netty,从零开发一个IM服务端

    跟着源码学IM(四):拿起键盘就是干,教你徒手开发一套分布式IM系统

    跟着源码学IM(五):正确理解IM长连接、心跳及重连机制,并动手实现

    跟着源码学IM(六):手把手教你用Go快速搭建高性能、可扩展的IM系统

    跟着源码学IM(七):手把手教你用WebSocket打造Web端IM聊天

    跟着源码学IM(八):万字长文,手把手教你用Netty打造IM聊天

    跟着源码学IM(九):基于Netty实现一套分布式IM系统

    跟着源码学IM(十):基于Netty,搭建高性能IM集群(含技术思路+源码)

    跟着源码学IM(十一):一套基于Netty的分布式高可用IM详细设计与实现(有源码)

    跟着源码学IM(十二):基于Netty打造一款高性能的IM即时通讯程序》(* 本文

    SpringBoot集成开源IM框架MobileIMSDK,实现即时通讯IM聊天功能

    13、参考资料

    [1]浅谈IM系统的架构设计

    [2]简述移动端IM开发的那些坑:架构设计、通信协议和客户端

    [3]一套海量在线用户的移动端IM架构设计实践分享(含详细图文)

    [4]一套原创分布式即时通讯(IM)系统理论架构方案

    [5]一套亿级用户的IM架构技术干货(上篇):整体架构、服务拆分等

    [6]一套亿级用户的IM架构技术干货(下篇):可靠性、有序性、弱网优化等

    [7]史上最通俗Netty框架入门长文:基本介绍、环境搭建、动手实战

    [8]强列建议将Protobuf作为你的即时通讯应用数据传输格式

    [9]IM通讯协议专题学习(一):Protobuf从入门到精通,一篇就够!

    [10]融云技术分享:全面揭秘亿级IM消息的可靠投递机制

    [11]IM群聊消息如此复杂,如何保证不丢不重?

    [12]零基础IM开发入门(四):什么是IM系统的消息时序一致性?

    [13]如何保证IM实时消息的“时序性”与“一致性”?

    [14]微信的海量IM聊天消息序列号生成实践(算法原理篇)

    [15]网易云信技术分享:IM中的万人群聊技术方案实践总结

    [16]融云IM技术分享:万人群聊消息投递方案的思考和实践

    [17]为何基于TCP协议的移动端IM仍然需要心跳保活机制?

    [18]一文读懂即时通讯应用中的网络心跳包机制:作用、原理、实现思路等

    [19]微信团队原创分享:Android版微信后台保活实战分享(网络保活篇)

    [20]融云技术分享:融云安卓端IM产品的网络链路保活技术实践

    [21]彻底搞懂TCP协议层的KeepAlive保活机制

    [22]深度解密钉钉即时消息服务DTIM的技术设计

    (本文已同步发布于:http://www.52im.net/thread-4530-1-1.html

    相关文章

      网友评论

          本文标题:2023-11-30跟着源码学IM(十二):基于Netty打造一

          本文链接:https://www.haomeiwen.com/subject/jwqfgdtx.html