美文网首页MLSQL
StreamingPro 基于Spark 2.1.1版本 支持S

StreamingPro 基于Spark 2.1.1版本 支持S

作者: 祝威廉 | 来源:发表于2017-06-29 22:01 被阅读1307次

源码构建简化

很多人吐槽StreamingPro构建实在太麻烦了。看源码都难。然后花了一天时间做了比较大重构,这次只依赖于ServiceFramework项目。具体构建方式如下:

git clone https://github.com/allwefantasy/ServiceFramework.git
cd ServiceFramework
mvn install -Pscala-2.11 -Pjetty-9 -Pweb-include-jetty-9
mvn install -Pscala-2.10 -Pjetty-9 -Pweb-include-jetty-9

//如果你需要切换scala版本,在构建之前,记得运行下面的命令
./dev/change-version-to-2.10.sh

接着就可以构建StreamingPro了:

git clone https://github.com/allwefantasy/streamingpro.git
// for spark 1.6.*
mvn -DskipTests clean package  -pl streamingpro-spark -am  -Ponline -Pscala-2.10  -Pcarbondata -Phive-thrift-server -Pspark-1.6.1 -Pshade
// for spark 2.*
mvn -DskipTests clean package  -pl streamingpro-spark-2.0 -am  -Ponline -Pscala-2.11  -Phive-thrift-server -Pspark-2.1.0 -Pshade 

基于Spark 2.1.1 的StreamingPro 同时支持Spark Streaming 以及Structured Streaming

Structured Streaming 的支持参看文章:
StreamingPro 再次支持 Structured Streaming

Spark Streaming 则和Structure Streaming的形态一模一样:

我们看具体的配置文件:

{
  "scalamaptojson": {
    "desc": "测试",
    "strategy": "spark",
    "algorithm": [],
    "ref": [
    ],
    "compositor": [
      {
        "name": "stream.sources",
        "params": [
          {
            "format": "socket",
            "outputTable": "test",
            "port": "9999",
            "host": "localhost",
            "path": "-"
          },
          {
            "format": "com.databricks.spark.csv",
            "outputTable": "sample",
            "header": "true",
            "path": "/Users/allwefantasy/streamingpro/sample.csv"
          }
        ]
      },
      {
        "name": "stream.sql",
        "params": [
          {
            "sql": "select city from test left join sample on test.content == sample.name",
            "outputTableName": "test3"
          }
        ]
      },
      {
        "name": "stream.outputs",
        "params": [
          {
            "mode": "Overwrite",
            "format": "console",
            "inputTableName": "test3",
            "path": "-"
          }
        ]
      }
    ],
    "configParams": {
    }
  }
}

只是把 ss 前缀换成了 stream。 启动方式如下:

SHome=/Users/allwefantasy/streamingpro
./bin/spark-submit   --class streaming.core.StreamingApp \
--master local[2] \
--name test \
$SHome/streamingpro-spark-2.0-0.4.15-SNAPSHOT.jar    \
-streaming.name test    \
-streaming.platform spark_streaming \
-streaming.job.file.path file://$SHome/spark-streaming.json

相关文章

网友评论

  • 黑帅:按照github上步骤编译成功,没有在对应的目录下生成streamingpro-spark-2.0-0.4.15-SNAPSHOT.jar这个jar包,--class streaming.core.StreamingApp 这个类我用的是streamingpro-commons下streamingpro-common-1.0.0.jar,使用命令提交然后报错:java.lang.ClassNotFoundException: streaming.core.StreamingApp,,,请问这是哪的问题呢?
    黑帅:@祝威廉 编译时没报错,每个子项目下面都生成了jar包,但是我看streamingpro-commons/target/streamingpro-common-1.0.0.jar包中没有streaming.core.StreamingApp这个.class文件
    祝威廉:@黑帅 有报错么 编译时
  • 黑曼巴_1c12:大神,还能麻烦给个多个filter顺序执行的例子么?自己用SparkStreamingStrategy策略,配置多个Filter Compositor时,发现同个filter执行了多次:disappointed_relieved:
    黑曼巴_1c12:已解决,Filter Compositor返回前做了foreachRDD,将filter后的结果cache就可以了,3q all the same:smile:
  • ad2a188b727a:大神,能不能给一个在IDE比如intellij idea上面build的教程,小白谢了
    祝威廉:@mvpanswer7 是按这里的方式编译的么? https://github.com/allwefantasy/streamingpro/wiki/Build 另外 如果在idea里,你需要运行mvn install -Pscala-2.11 -Pjetty-9 -Pweb-include-jetty-9 和 ./dev/change-version-to-2.10.sh
    mvn install -Pscala-2.10 -Pjetty-9 -Pweb-include-jetty-9 。也就是serviceframework 需要scala-2.10,scala-2.11 两个版本。之后就没啥问题。另外,你给的链接是非常早的版本了。现在spark streaming 的配置已经得到很大的简化。可以参看这篇文章的内容
    ad2a188b727a:@祝威廉 大神,再问你一个问题哈~我在idea里build好之后,按照https://github.com/allwefantasy/streamingpro/wiki/Run-your-first-application 这里说的跑了LocalStreamingApp,结果报错信息是:
    Error:(14, 28) java: 程序包net.csdn.http.server不存在
    Error:(26, 1) java: 程序包org.eclipse.jetty.server不存在
    Error:(27, 40) java: 程序包org.eclipse.jetty.server.handler不存在
    Error:(45, 19) java: 找不到符号
    符号: 类 Server
    位置: 类 net.csdn.modules.http.HttpServer
    ......
    祝威廉:参看楼上的回复。 下载ServiceFramework 框架然后进行编译安装: mvn install -Pscala-2.11 -Pjetty-9 -Pweb-include-jetty-9 之后下载StreamignPro mvn clean package -DskipTests -pl streamingpro-spark-2.0 -am -Ponline -Pscala-2.11 -Phive-thrift-server -Pspark-2.2.0 -Pshade 这里编译的是基于spark 2.2.0 你也可以换成 2.1.0
  • d36c0c56c192:编译spark2.2.0的版本,可以支持么?@祝威廉
    祝威廉:@呵库拉玛塔塔 jetty 版本不对。我今天也编译了基于spark 2.2.0 的版本,并没有上面的问题。编译语句如下: mvn clean package -DskipTests -pl streamingpro-spark-2.0 -am -Ponline -Pscala-2.11 -Phive-thrift-server -Pspark-2.1.0 -Pshade

    而对于依赖的SF,进行如下编译:

    mvn install -Pscala-2.11 -Pjetty-9 -Pweb-include-jetty-9
    d36c0c56c192:@祝威廉 2.2.0下编译后,提交任务会出现
    1) Error injecting constructor, java.lang.NoSuchMethodError: org.eclipse.jetty.server.Server.<init>(Lorg/eclipse/jetty/util/thread/ThreadPool;)V
    at net.csdn.modules.http.HttpServer.<init>(HttpServer.java:74)
    at net.csdn.modules.http.HttpModule.configure(HttpModule.java:15)
    while locating net.csdn.modules.http.HttpServer

    1 error
    at com.google.inject.internal.Errors.throwCreationExceptionIfErrorsExist(Errors.java:435)
    at com.google.inject.internal.InternalInjectorCreator.injectDynamically(InternalInjectorCreator.java:183)
    at com.google.inject.internal.InternalInjectorCreator.build(InternalInjectorCreator.java:109)
    at com.google.inject.Guice.createInjector(Guice.java:95)
    at net.csdn.bootstrap.loader.impl.ControllerLoader.load(ControllerLoader.java:46)
    at net.csdn.bootstrap.Bootstrap.configureSystem(Bootstrap.java:103)
    at net.csdn.bootstrap.Bootstrap.main(Bootstrap.java:41)
    这个是有冲突么?
    祝威廉:@呵库拉玛塔塔 我觉得是可以的 你直接改下spark版本编译应该就可以了
  • d36c0c56c192:如果从kafka中读取的是一个json,怎么通过 transaction配置,完成json到 table的映射? @祝威廉
    d36c0c56c192:@祝威廉 迫切期待中
    祝威廉:@呵库拉玛塔塔 新版本也是可以的 我晚点会出例子
    d36c0c56c192:老版本可以通过各种compositor来完成,但是不支持批量输出,2.1.1的版本用最新的配置就不知道怎么办了

本文标题:StreamingPro 基于Spark 2.1.1版本 支持S

本文链接:https://www.haomeiwen.com/subject/vdbvcxtx.html