美文网首页MLSQL数据科学家
StreamingPro添加Scala script 模块支持

StreamingPro添加Scala script 模块支持

作者: 祝威廉 | 来源:发表于2016-09-14 22:32 被阅读820次

    SQL 在解析字符串方面,能力还是有限,因为支持的算子譬如substring,split等有限,且不具备复杂的流程表达能力。我们内部有个通过JSON描述的DSL引擎方便配置化解析,然而也有一定的学习时间成本。

    我们当然可以通过SQL的 UDF函数等来完成字符串解析,在streamingpro中也很简单,只要注册下你的UDF函数库即可:

    "udf_register": {
        "desc": "测试",
        "strategy": "spark",
        "algorithm": [],
        "ref": [],
        "compositor": [
          {
            "name": "sql.udf",
            "params": [
              {
                "analysis": "streaming.core.compositor.spark.udf.func.MLFunctions"
              }
            ]
          }
        ]
      }
    
    

    这样你就可以在SQL中使用MLfunctions里面所有的udf函数了。然而为此专门提供一个jar包也是略显麻烦。

    这个时候如果能直接写脚本解析就好了,最好是能支持各种脚本,比如groovy,javascript,python,scala,java等。任何一个会编程的人都可以实现一个比较复杂的解析逻辑。

    核心是ScriptCompositor模块:

     {
            "name": "batch.script",
            "params": [
              {
                "inputTableName": "test",
                "outputTableName": "test3"
              },
              {
                "raw": [
                  "val Array(a,b)=rawLine.split(\"\t\");",
                  "Map(\"a\"->a,\"b\"->b)"
                ]
              }
            ]
          }
    

    如果我想在代码里直接处理所有的列,则如下:

    {
            "name": "batch.script",
            "params": [
              {
                "inputTableName": "test2",
                "outputTableName": "test3",
                "useDocMap": true
              },
              {
                "anykey": "val Array(a,b)=doc(\"raw\").toString.split(\"\t\");Map(\"a\"->a,\"b\"->b)"
              }
            ]
    }
    

    通过添加useDocMap为true,则你在代码里可以通过doc(doc是个Map[String,Any]) 来获取你想要的任何字段,然后形成一个新的Map。

    如果你只要新生成Map里的字段,忽略掉旧的,则设置ignoreOldColumns=true 即可。

    你可以把代码放到一个文件里,如下:

    {
            "name": "batch.script",
            "params": [
              {
                "inputTableName": "test",
                "outputTableName": "test3"
              },
              {
                "raw": "file:///tmp/raw_process.scala"
              }
            ]
          }
    

    通过inputTableName指定输入的表,outputTableName作为输出结果表。 raw代表inputTableName中你需要解析的字段,然后通过你的scala脚本进行解析。在脚本中 rawLine 是固定的,对应raw字段(其他字段也是一样)的值。脚本只有一个要求,最后的返回结果暂时需要是个Map[String,Any]。

    这里,你只是提供了一个map作为返回值,作为一行,然后以outputTableName指定的名字输出,作为下一条SQL的输入,所以StreamingPro需要推测出你的Schema。 数据量大到一定程度,推测Schema的效率就得不到保证,这个时候,你可以通过配置schema来提升性能:

    {
            "name": "batch.script",
            "params": [
              {
                "inputTableName": "test",
                "outputTableName": "test3",
                "schema": "file:///tmp/schema.scala",
                "useDocMap": true
              },
              {
                "raw": "file:///tmp/raw_process.scala"
              }
            ]
          }
    

    schema.scala的内容大致如下:

    Some(
    StructType(
    Array(
    StructField("a", StringType, true),
    StructField("b", StringType, true)))
    )
    

    后续roadmap是:

    1. 支持外部脚本,比如放在hdfs或者http服务器上。
    2. 支持java 脚本
    3. 支持javascript脚本
    4. 支持 python 脚本
    5. 支持 ruby脚本
    6. 支持 groovy 脚本

    举个案例,从HDFS读取一个文件,并且映射为只有一个raw字段的表,接着通过ScriptCompositor配置的scala代码解析raw字段,展开成a,b两个字段,然后继续用SQL继续处理,最后输出。

    {
      "convert_data_parquet": {
        "desc": "测试",
        "strategy": "spark",
        "algorithm": [],
        "ref": [],
        "compositor": [
          {
            "name": "batch.sources",
            "params": [
              {
                "path": "file:///tmp/hdfsfile",
                "format": "org.apache.spark.sql.execution.datasources.hdfs",
                "fieldName": "raw",
                "outputTableName":"test"
              }
            ]
          },     
          {
            "name": "batch.script",
            "params": [
              {
                "inputTableName": "test",
                "outputTableName": "test3"
              },
              {
                "raw": [
                  "val Array(a,b)=rawLine.split(\"\t\");",
                  "Map(\"a\"->a,\"b\"->b)"
                ]
              }
            ]
          },
          {
            "name": "batch.sql",
            "params": [
              {
                "sql": "select a,b  from test3 "
              }
            ]
          },
          {
            "name": "batch.outputs",
            "params": [
              {
               "format":"console"
              }
            ]
          }
        ],
        "configParams": {
        }
      }
    }
    
    

    相关文章

      网友评论

      • carrie_chh:请问下,streamingpro是否支持mongo的数据输出,是要用SQLParquetOutputCompositor么,请问是否有相关例子
      • b8c416eb0383:@William,ref算子是可以构建dag图的吗?有没有使用ref的例子呢?
        祝威廉:@morenn520 不是 主要是为了引用需要注册的udf函数库,相关表 等 未来可能会进一步扩展其功能。举个例子 你可以申明多张表 然后在主流程join这些表
      • spark学习:那么reduce这些个操作也是一样的吧?
        祝威廉:@spark学习 嗯 是的 十一快乐 :smile:
        spark学习: @祝威廉 就是对应的spark数据操作,使用的时候,定义好sql语句就可以了,是吧。打扰了,祝十一快乐😊
        祝威廉:@spark学习 reduce 应该不行 对应需要SQL来完成
      • spark学习:求问,这个支持map数据处理的操作吗
        祝威廉:@spark学习 支持的就是map doc就是个map对象

      本文标题:StreamingPro添加Scala script 模块支持

      本文链接:https://www.haomeiwen.com/subject/ictbettx.html