美文网首页
Vert.x + Protobuf二进制协议解析

Vert.x + Protobuf二进制协议解析

作者: 司青玄 | 来源:发表于2019-06-26 11:33 被阅读0次

    这一期介绍如何解析二进制私有协议。

    先说几句题外话,就是绝大多数情况下,可能根本用不着使用私有二进制协议,除非你的业务对性能极其敏感,否则HTTP足矣。

    协议

    我们的协议非常简单,先是一个4字节的整数表示数据长度,然后紧接着就是protobuf序列化后的字节数组。proto定义如下:

    syntax = "proto2";
    package cn.fh.vertx.demo.proto;
    
    option java_multiple_files = true;
    
    message Message {
        required int32 type = 1;
        required string content = 2;
    }
    

    在实际业务场景中,消息格式的定义是有讲究的,这里不深究,重点在于vert.x解析。

    RecordParser

    Vert.x中只有一个二进制协议解析辅助类,即RecordParser,可以很好的解决粘包/拆包问题。它有两种工作模式,一是delimited mode, 即通过固定分隔符分隔数据包,这种用的其实比较少;二是fixed size mode,即固定数据长度模式。诈一看可能很多人会有疑虑,多数协议都是可变长度的啊,这两种模式看起来都不能满足要求。其实模式二支持在处理的过程中随时动态的改变size值,这样就可以间接完成对变长协议的解析。

    首先,RecordParser有两类Builder方法,RecordParser.newDelimited()RecordParser.newFixed(4),分别对应上面的模式一和模式二。构造完成以后,跟其它vert.x的方法一样,我们需要定义一个Handler, RecordParser每分割出一段字节数组都会调用一次Handler,业务逻辑处理就在此Handler中进行。对于上面"长度 + 数据"的协议,我们可以首先构造一个以4字节为单位的RecordParser,在收到长度数据后,再动态将解析器修改成指定长度状态,从而完成对数据部分的分割。代码示例如下:

    // 先以长度4构造对象
            RecordParser parser = RecordParser.newFixed(4);
    
            // 设置处理器
            parser.setOutput(new Handler<Buffer>() {
                // 表示当前数据长度
                int size = -1;
    
                @Override
                public void handle(Buffer buffer) {
                    // -1表示当前还没有长度信息,需要从收到的数据中取出长度
                    if (-1 == size) {
                        // 取出长度
                        size = buffer.getInt(0);
                        // 动态修改长度
                        parser.fixedSizeMode(size);
    
                    } else {
                        // 如果size != -1, 说明已经接受到长度信息了,接下来的数据就是protobuf可识别的字节数组
                        byte[] buf = buffer.getBytes();
                        Message msg = null;
                        try {
                            msg = Message.parseFrom(buf);
                        } catch (InvalidProtocolBufferException e) {
                            System.out.println(e.getMessage());
                            socket.close();
                            return;
                        }
                        
                        System.out.println(msg);
                        // 处理完后要将长度改回4
                        parser.fixedSizeMode(4);
                        // 重置size变量
                        size = -1;
                    }
                }
            });
    

    虽然用起来比较别扭,但这的确是标准使用方法。

    如果表示长度的数据不是从0开始的,比如0 ~ 3为消息类型,4 ~ 7才表示body的长度,那么可以在构造解析器时先将长度设为8, 然后取后4字节做为新的长度即可。

    其实这个类所做的事正是Netty里LengthFieldBasedFrameDecoder的功能,希望Vert.x以后能直接提供好类似的处理器,不要让用户再手动取长度了,毕竟低层的Netty都支持,你更高级的封装怎么可以没有呢。

    这个RecordParser怎么用呢?看一下完整的代码吧:

            Vertx vertx = Vertx.vertx();
            
            // 创建TCP Server
            NetServer server = vertx.createNetServer();
            // 设置Handler
            server.connectHandler(socket -> {
                // 构造parser
                RecordParser parser = RecordParser.newFixed(4);
                parser.setOutput(new Handler<Buffer>() {
                    int size = -1;
    
                    @Override
                    public void handle(Buffer buffer) {
                        if (-1 == size) {
                            size = buffer.getInt(0);
                            parser.fixedSizeMode(size);
                            
                        } else {
                            byte[] buf = buffer.getBytes();
                            Message msg = null;
                            try {
                                msg = Message.parseFrom(buf);
                            } catch (InvalidProtocolBufferException e) {
                                System.out.println(e.getMessage());
                                socket.close();
                                return;
                            }
                            parser.fixedSizeMode(4);
                            size = -1;
                        }
                    }
                });
    
                socket.handler(parser);
            });
            
            // 监听
            server.listen(8008, "localhost", res -> {
                if (res.succeeded()) {
                    System.out.println("tcp server is listening at 8008");
    
                } else {
                    System.out.println(res.cause());
                }
            });
    

    写起来会有一点点奇怪,适应就好。

    相关文章

      网友评论

          本文标题:Vert.x + Protobuf二进制协议解析

          本文链接:https://www.haomeiwen.com/subject/vfaqcctx.html