StreamingPro Supports Structured Streaming Again

Author: 祝威廉 | Published 2017-03-28 11:23

Introduction

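I previously wrote an article, "StreamingPro Supports Spark Structured Streaming", but that support was little more than an experiment. At the time I was only trying out Spark 2.0+, and my focus remained on the Spark 1.6 series. As time goes on, however, Spark 2.0+ is clearly where things are heading. This release therefore substantially refactors the underlying architecture, and StreamingPro now supports three engines: Flink, Spark 1.6+, and Spark 2.0+.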

Preparation

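Download the StreamingPro package for Spark 2.0, then download the Spark 2.1 distribution.
You can also find the Spark 1.6+ and Flink builds in the streamingpro directory. From now on, the artifacts will follow roughly this naming convention: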

streamingpro-spark-0.4.14-SNAPSHOT.jar  for Spark 1.6+, Scala 2.10
streamingpro-spark-2.0-0.4.14-SNAPSHOT.jar  for Spark 2.0+, Scala 2.11
streamingpro.flink-0.4.14-SNAPSHOT-online-1.2.0.jar  for Flink 1.2.0, Scala 2.10

Example

Write a JSON file named ss.json with the following content:

{
  "scalamaptojson": {
    "desc": "test",
    "strategy": "spark",
    "algorithm": [],
    "ref": [
    ],
    "compositor": [
      {
        "name": "ss.sources",
        "params": [
          {
            "format": "socket",
            "outputTable": "test",
            "port":"9999",
            "host":"localhost",
            "path": "-"
          },
          {
            "format": "com.databricks.spark.csv",
            "outputTable": "sample",
            "header":"true",
            "path": "/Users/allwefantasy/streamingpro/sample.csv"
          }
        ]
      },
      {
        "name": "ss.sql",
        "params": [
          {
            "sql": "select city from test left join sample on test.value == sample.name",
            "outputTableName": "test3"
          }
        ]
      },
      {
        "name": "ss.outputs",
        "params": [
          {
            "mode": "append",
            "format": "console",
            "inputTableName": "test3",
            "path": "-"
          }
        ]
      }
    ],
    "configParams": {
    }
  }
}
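The three compositor stages are connected only by table names. Each source registers its data under outputTable, the SQL stage queries those tables and registers its result under outputTableName, and the output stage reads the table named in inputTableName. Below is a minimal Python sketch, not part of StreamingPro, that walks a job file shaped like ss.json and reports any inputTableName that no earlier stage produced. The helper name check_wiring and the trimmed JSON are illustrative assumptions. Table names referenced inside the SQL text are not parsed.

```python
import json

# Trimmed copy of ss.json above: only the keys that wire stages together.
SS_JSON = """
{"scalamaptojson": {"compositor": [
  {"name": "ss.sources", "params": [
     {"format": "socket", "outputTable": "test"},
     {"format": "com.databricks.spark.csv", "outputTable": "sample"}]},
  {"name": "ss.sql", "params": [
     {"sql": "select city from test left join sample on test.value == sample.name",
      "outputTableName": "test3"}]},
  {"name": "ss.outputs", "params": [
     {"format": "console", "inputTableName": "test3"}]}
]}}
"""


def check_wiring(config):
    """Hypothetical helper: for each job, return (tables produced in order,
    inputTableName values that no earlier stage produced)."""
    report = {}
    for job_name, job in config.items():
        produced, missing = [], []
        for stage in job["compositor"]:
            for params in stage["params"]:
                # An input table must already have been produced upstream.
                wanted = params.get("inputTableName")
                if wanted is not None and wanted not in produced:
                    missing.append(wanted)
                # Sources use outputTable; SQL stages use outputTableName.
                for key in ("outputTable", "outputTableName"):
                    if key in params:
                        produced.append(params[key])
        report[job_name] = (produced, missing)
    return report


if __name__ == "__main__":
    print(check_wiring(json.loads(SS_JSON)))
```

For this file the sketch reports the tables test, sample and test3, and nothing missing. If the output stage pointed at a misspelled table, that name would appear in the second list.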

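The job has two inputs: a socket source and a sample file. The socket source is streaming, while the sample file is a static (batch) table. The SQL left-joins each line received on the socket against the name column of the sample file and outputs the matching city. sample.csv contains: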

id,name,city,age
1,david,shenzhen,31
2,eason,shenzhen,27
3,jarry,wuhan,35
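To show what the query should print, the following plain-Python sketch mimics one micro-batch of "select city from test left join sample on test.value == sample.name". It only illustrates the semantics and is not Spark or StreamingPro code. The helper names are hypothetical. The socket source delivers each line as a single string column named value, so every line typed into nc becomes one row on the left side of the join. A line with no matching name still yields a row with a null city, shown here as None.

```python
import csv
import io

# sample.csv from the article (header=true)
SAMPLE_CSV = """id,name,city,age
1,david,shenzhen,31
2,eason,shenzhen,27
3,jarry,wuhan,35
"""


def load_sample(text):
    """Parse the static CSV table into a list of dicts keyed by header."""
    return list(csv.DictReader(io.StringIO(text)))


def left_join_city(lines, sample_rows):
    """Mimic the left join for one micro-batch of socket lines.
    Every line yields at least one row; unmatched lines yield None
    (Spark's console sink would print null)."""
    result = []
    for value in lines:
        cities = [row["city"] for row in sample_rows if row["name"] == value]
        result.extend(cities if cities else [None])
    return result


if __name__ == "__main__":
    rows = load_sample(SAMPLE_CSV)
    print(left_join_city(["eason", "tom"], rows))  # ['shenzhen', None]
```

Typing eason into nc therefore yields the city shenzhen. A name missing from sample.csv still produces an output row, because the streaming side is the left side of the join.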

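Next, run nc -lk 9999 in a terminal. Start it before submitting the Spark job, because the socket source connects to localhost:9999 when the query starts.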

Then run the Spark job:

SHome=/Users/allwefantasy/streamingpro
./bin/spark-submit   --class streaming.core.StreamingApp \
--master local[2] \
--name test \
$SHome/streamingpro-spark-2.0-0.4.14-SNAPSHOT.jar    \
-streaming.name test    \
-streaming.platform spark_structrued_streaming \
-streaming.job.file.path file://$SHome/ss.json

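In the nc terminal, type a name such as eason and press Enter. Almost immediately, the Spark terminal shows the data it received: the console sink prints the joined row (city shenzhen for eason).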

Reader comments

  • CoderJed: org.apache.spark.sql.streaming.StreamingQueryException: Required attribute 'value' not found;
    William, what is causing this?
    祝威廉: @CoderJed Open streaming.core.compositor.spark.ss.source.MultiSQLSourceCompositor:

    ```
    val df = spark.readStream.format(_cfg("format")).options(
    (_cfg - "format" - "path" - "outputTable")
    .map(f => (f._1.toString, f._2.toString))) // removing this line should make it work
    .load(sourcePath)
    df.createOrReplaceTempView(_cfg("outputTable"))
    ```

    Then rebuild the package. I will commit the change to master when the time is right, or you can open a PR.
    CoderJed: @祝威廉
    This is on Spark 2.3.0:
    {
      "ss-testjob-01": {
        "desc": "ss-testjob-01",
        "strategy": "spark",
        "algorithm": [],
        "ref": [],
        "compositor": [
          {
            "name": "ss.sources",
            "params": [
              {
                "format": "kafka",
                "outputTable": "kafka_source_output",
                "kafka.bootstrap.servers": "172.16.26.6:9092,172.16.26.10:9092,172.16.26.13:9092",
                "subscribe": "streamingpro_source",
                "path": "-"
              }
            ]
          },
          {
            "name": "ss.sql",
            "params": [
              {
                "sql": "select CAST(value AS STRING) AS value from kafka_source_output",
                "outputTableName": "table1"
              }
            ]
          },
          {
            "name": "ss.sql",
            "params": [
              {
                "sql": "select split(value, ',')[1] AS username from table1",
                "outputTableName": "table2"
              }
            ]
          },
          {
            "name": "ss.outputs",
            "params": [
              {
                "mode": "append",
                "format": "kafka",
                "kafka.bootstrap.servers": "172.16.26.6:9092,172.16.26.10:9092,172.16.26.13:9092",
                "topic": "streamingpro_result",
                "inputTableName": "table2",
                "checkpoint": "/tmp/ss-kafka/",
                "path": "/tmp/ss-kafka-data"
              }
            ]
          }
        ],
        "configParams": {}
      }
    }
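    That is my configuration.
    祝威廉: @CoderJed Could you post your configuration or code? Also, are you on Spark 2.2?
  • 90ab3115005c: What are MultiSQLSourceCompositor and MultiSQLOutputCompositor in the code mainly for, and how are they used?
    90ab3115005c: Is MongoDB supported as an output too?
    90ab3115005c: Can StreamingPro read from one source and write to multiple outputs?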

Article link: https://www.haomeiwen.com/subject/lvqjottx.html