美文网首页
Protobuf结合Spark Structured Strea

Protobuf结合Spark Structured Strea

作者: 小段DSH12138 | 来源:发表于2021-08-01 17:03 被阅读0次

    背景

        在项目开发中对流式数据使用Spark Structured Streaming进行处理,处理流程为:消息中间件(source) -> Spark Structured Streaming(work) -> AFS(sink)。
        在source->work这个过程中,消息以protobuf形式存储,其中 Spark Structured Streaming接受到的数据格式为Array[Byte],所以我们需要将Array[Byte]形式的数据通过protobuf反序列化出来,得到最终完整的String。
        PS:一开始比较懒,直接使用new String(Array[Byte])的方式企图将数据粗暴的转成String,最后果然只能看到其中一部分数据,剩下的另外很大一部分数据全都是乱码,或者直接是空格,导致我拿不到我想要的字段内容,所以最后还是老老实实使用protobuf进行反序列化。


    头秃

    心酸历程

        废话少说,过程如下:

    需准备的东西

    • 一个可以拿到protoBuf格式数据的消息中间件;
    • Spark Structured Streaming运行环境,我是用的是Hadoop环境;
    • 消息中间件中protobuf数据对应的.proto文件;
    • 本地可以执行protoc命令的protobuf编译器;

    protobuf及本地环境相关准备

        protobuf相关知识、proto语法等proto相关基础知识见proto官网:https://developers.google.cn/protocol-buffers/ 或者可以自己去搜一搜一些别人的博客之类的。
        我的test.proto文件如下:

    syntax = "proto2";
    
    //语言种类枚举,后续可持续补充
    enum LanguageTypes {
        CH = 0; //中文
        ENGLISH = 1;    //英文
        //以下补充标准列表
    }
    
    //日志字符编码枚举
    enum CodeType {
        CODE_TYPE_UNKNOWN = 0;  //为止类型
        UTF8 = 1;   //utf8
        GBK = 2;    //gbk
    }
    
    message Log {
        //以上ID小于129的字段为预留字段,不能添加,用户自有字段ID从130开始
        optional int64 connection_code = 130;   //用户连接号
        optional string action_json = 131;  //用户行为数据
        required string send_time = 132;    //请求发送时间
    }
    

        自己电脑安装proto编译器,Window、Mac环境的安装教程一搜一大堆,自己去下载,安装就行。
        因为我的程序运行环境问题,我在本机上安装了2.4.1版本,下载路径如下:https://github.com/protocolbuffers/protobuf/releases/tag/v2.4.1
        我的系统是MAC,所以安装完成后,在终端输入命令

    protoc --version
    

    结果显示为

    libprotoc 2.4.1
    

    就成功了。

    protobuf编译成Java类

    在本机终端输入命令:

    protoc -I=proto文件存储目录 --java_out=最终Java类想要存放目标目录的绝对路径 proto文件的绝对路径
    

    就会在--java_out参数指定的路径下生成目标Java类,我使用test.proto生成了Test.java类;将生成的Test.java类移到代码中指定目录下。

    Maven依赖

    我的项目是Scala项目,所以需要在maven项目中需添加protobuf-java依赖才可以使用上面的Java类,我的POM依赖如下:

    <dependency>
        <groupId>com.google.protobuf</groupId>
        <artifactId>protobuf-java</artifactId>
        <version>2.4.1</version>
    </dependency>
    

    Maven依赖的坑

    • Maven依赖版本一定要和本地生成Java类的protoc编译器版本保持一致,否则将造成proto对应的Java类不可用;
    • 我们的Hadoop环境中依赖的protobuf版本为2.5.0,但是引入的spark-core依赖,使用的是protobuf2.4.1,其中,2.4.1和2.5.0不可兼容。
      • 可在项目路径下使用命令mvn dependency:tree来查看maven依赖自身还依赖了哪些包及具体版本;
      • 一开始我安装了最新的3.5.1版本,但是编译运行时,报错信息为:
    java.lang.NoSuchMethodError
    

    但是我在本地跳转却可以跳转到相应的方法中去,造成这种情况是因为包冲突,所以我将protoc版本降到了2.5.0,但是报错信息为:

    java.lang.VerifyError:class com.XX.XX.Test$Log overrides final method......
    

    这是因为虽然Hadoop环境中用到了2.5.0,但在运行Spark程序的时候,还是会去调用2.4.1,所以我又将版本降成了2.4.1。在降为2.4.1后,对一些与2.4.1版本无法兼容的其他依赖的版本做了相应修改。
    调用成功!

    Spark代码

    val inputStream = spark.readStream
        ......
        .load
        .as[Array[Byte]]
        .map(row => {
            val log = Test.Log.parseFrom(row)
            val action_json = log.getActionJson
            action_json
        })
        .toDF("value")
    

    [图片上传失败...(image-b2d825-1627808612072)]

    相关文章

      网友评论

          本文标题:Protobuf结合Spark Structured Strea

          本文链接:https://www.haomeiwen.com/subject/ejyimltx.html