Preface
I wrote an article a while back about StreamingPro's support for Spark Structured Streaming, but it was more of a toy at the time: support for Spark 2.0+ was only tentative, and the focus was still on the Spark 1.6 line. Time moves on, though, and Spark 2.0+ is clearly the way forward, so this release substantially reworks the internals. StreamingPro now supports three engines: Flink, Spark 1.6+, and Spark 2.0+.
Preparation
Download the StreamingPro package for Spark 2.0, then download the Spark 2.1 distribution.
You can also find builds for Spark 1.6+ or Flink in the streamingpro directory. The latest releases will roughly follow this unified naming scheme:
streamingpro-spark-0.4.14-SNAPSHOT.jar: for Spark 1.6+, Scala 2.10
streamingpro-spark-2.0-0.4.14-SNAPSHOT.jar: for Spark 2.0+, Scala 2.11
streamingpro.flink-0.4.14-SNAPSHOT-online-1.2.0.jar: for Flink 1.2.0, Scala 2.10
Test Example
Create a JSON file called ss.json with the following content:
{
  "scalamaptojson": {
    "desc": "test",
    "strategy": "spark",
    "algorithm": [],
    "ref": [],
    "compositor": [
      {
        "name": "ss.sources",
        "params": [
          {
            "format": "socket",
            "outputTable": "test",
            "port": "9999",
            "host": "localhost",
            "path": "-"
          },
          {
            "format": "com.databricks.spark.csv",
            "outputTable": "sample",
            "header": "true",
            "path": "/Users/allwefantasy/streamingpro/sample.csv"
          }
        ]
      },
      {
        "name": "ss.sql",
        "params": [
          {
            "sql": "select city from test left join sample on test.value == sample.name",
            "outputTableName": "test3"
          }
        ]
      },
      {
        "name": "ss.outputs",
        "params": [
          {
            "mode": "append",
            "format": "console",
            "inputTableName": "test3",
            "path": "-"
          }
        ]
      }
    ],
    "configParams": {}
  }
}
This config defines two sources: a socket source and a sample file. The socket source is streaming, while the sample file is a batch source, so the SQL step is a stream-batch join. The contents of sample.csv are as follows:
id,name,city,age
1,david,shenzhen,31
2,eason,shenzhen,27
3,jarry,wuhan,35
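As a point of reference, here is a rough sketch of what this config expresses if you were to write it directly against the Structured Streaming API yourself. This is not StreamingPro's internal code; the host, port, and CSV path are simply copied from the config above.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("ss-sketch")
  .master("local[2]")
  .getOrCreate()

// Streaming source: a single `value` column fed by the socket (ss.sources, first entry)
val test = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", "9999")
  .load()
test.createOrReplaceTempView("test")

// Batch source: the sample CSV (ss.sources, second entry)
val sample = spark.read
  .option("header", "true")
  .csv("/Users/allwefantasy/streamingpro/sample.csv")
sample.createOrReplaceTempView("sample")

// The stream-batch join from the ss.sql step
val test3 = spark.sql(
  "select city from test left join sample on test.value == sample.name")

// Console sink in append mode, as in ss.outputs
val query = test3.writeStream
  .format("console")
  .outputMode("append")
  .start()

query.awaitTermination()
```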
Then run nc -lk 9999 in a terminal.
Then launch the Spark job:
SHome=/Users/allwefantasy/streamingpro
./bin/spark-submit --class streaming.core.StreamingApp \
--master local[2] \
--name test \
$SHome/streamingpro-spark-2.0-0.4.14-SNAPSHOT.jar \
-streaming.name test \
-streaming.platform spark_structrued_streaming \
-streaming.job.file.path file://$SHome/ss.json
Type something like eason into the nc terminal and press Enter; you should see the data show up in the Spark terminal almost immediately.
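For instance, with the sample.csv above, typing eason should produce a micro-batch on the console that looks roughly like this (illustrative only; the exact batch number and rows depend on what you type):

```
-------------------------------------------
Batch: 0
-------------------------------------------
+--------+
|    city|
+--------+
|shenzhen|
+--------+
```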
Reader Comments
William, what's going on here?
```
val df = spark.readStream.format(_cfg("format")).options(
  (_cfg - "format" - "path" - "outputTable")
    .map(f => (f._1.toString, f._2.toString)))  // removing this line should fix it
  .load(sourcePath)
df.createOrReplaceTempView(_cfg("outputTable"))
```
Then you can rebuild the jar yourself. I'll commit the fix to master when I get a chance, or you can open a PR.
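For anyone patching this locally, my reading of the comment above is that the fixed snippet would look roughly like this (a sketch only; whether any of the dropped options are still needed depends on the source format):

```scala
// Sketch of the suggested fix: drop the options(...) call entirely,
// so only the format and path reach the reader.
val df = spark.readStream
  .format(_cfg("format"))
  .load(sourcePath)
df.createOrReplaceTempView(_cfg("outputTable"))
```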
Based on Spark 2.3.0:
{
  "ss-testjob-01": {
    "desc": "ss-testjob-01",
    "strategy": "spark",
    "algorithm": [],
    "ref": [],
    "compositor": [
      {
        "name": "ss.sources",
        "params": [
          {
            "format": "kafka",
            "outputTable": "kafka_source_output",
            "kafka.bootstrap.servers": "172.16.26.6:9092,172.16.26.10:9092,172.16.26.13:9092",
            "subscribe": "streamingpro_source",
            "path": "-"
          }
        ]
      },
      {
        "name": "ss.sql",
        "params": [
          {
            "sql": "select CAST(value AS STRING) AS value from kafka_source_output",
            "outputTableName": "table1"
          }
        ]
      },
      {
        "name": "ss.sql",
        "params": [
          {
            "sql": "select split(value, ',')[1] AS username from table1",
            "outputTableName": "table2"
          }
        ]
      },
      {
        "name": "ss.outputs",
        "params": [
          {
            "mode": "append",
            "format": "kafka",
            "kafka.bootstrap.servers": "172.16.26.6:9092,172.16.26.10:9092,172.16.26.13:9092",
            "topic": "streamingpro_result",
            "inputTableName": "table2",
            "checkpoint": "/tmp/ss-kafka/",
            "path": "/tmp/ss-kafka-data"
          }
        ]
      }
    ],
    "configParams": {}
  }
}
This is the configuration.
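For comparison, the same pipeline written by hand against the Structured Streaming Kafka source and sink would look roughly like the sketch below. The broker list, topics, and checkpoint path are copied from the config above; note that the Kafka sink expects the payload in a column named value.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("ss-testjob-01").getOrCreate()

// Source: subscribe to the input topic (ss.sources)
val source = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers",
    "172.16.26.6:9092,172.16.26.10:9092,172.16.26.13:9092")
  .option("subscribe", "streamingpro_source")
  .load()

// The two ss.sql steps: cast the raw Kafka bytes to a string, then split out the username
val table1 = source.selectExpr("CAST(value AS STRING) AS value")
val table2 = table1.selectExpr("split(value, ',')[1] AS username")

// Sink: write back to Kafka in append mode (ss.outputs); the sink requires
// a `value` column, so rename username accordingly
val query = table2.selectExpr("username AS value")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers",
    "172.16.26.6:9092,172.16.26.10:9092,172.16.26.13:9092")
  .option("topic", "streamingpro_result")
  .option("checkpointLocation", "/tmp/ss-kafka/")
  .outputMode("append")
  .start()

query.awaitTermination()
```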