Running the Official Structured Streaming Example and Resolving Issues

Author: lei_charles | Published 2019-12-05 10:54
    1. Example code
      import org.apache.log4j.{Level, Logger}
      import org.apache.spark.SparkConf
      import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery}
      import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
      
      /**
        * Listen for lines arriving on a network socket and run WordCount on them.
        */
      object StructuredStreamingDemo {
      
        def main(args: Array[String]): Unit = {
      
          Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
          val conf = new SparkConf()
            .setIfMissing("spark.master", "local[4]")
            .setAppName("Structured Network Count")
            .set("fs.defaultFS","file://D:/temp/defaultFS/")
      
          // Step 1: create the SparkSession entry point, and import spark.implicits._
          // so Scala objects can be implicitly converted to DataFrames/Datasets
          val spark: SparkSession = SparkSession.builder.config(conf).getOrCreate()
          import spark.implicits._
          
          // Step 2: create the stream, reading socket data from host 192.168.1.101, port 9999
          val lines: DataFrame = spark.readStream.format("socket")
            .option("host", "192.168.1.101")
            .option("port", "9999")
            .load()
      
          // Step 3: run the word count. lines is a DataFrame; as[String] converts it
          // to a Dataset[String], on which the lines are split into words and counted.
          val words: Dataset[String] = lines.as[String].flatMap(_.split(" "))
          val wordcount: DataFrame = words.groupBy("value").count()
      
          // Step 4: create the query handle with writeStream, using Complete output
          // mode to print the full result table to the console, and start the query.
          val query: StreamingQuery = wordcount.writeStream
            .outputMode(OutputMode.Complete)
            .format("console")
            .start()
          // awaitTermination prevents the application from exiting while the stream is being processed
          query.awaitTermination()
        }
      }
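Before starting the job, a TCP server must already be listening on the configured host and port, or the socket source cannot connect. A common way to test locally (assuming the host in the code is changed to the machine where you run the command) is netcat:

```shell
# Start a simple TCP server on port 9999 (run this before launching the Spark job);
# every line typed here becomes one input row of the streaming query.
nc -lk 9999
```

Type lines such as `dog cat dog` and press Enter; each micro-batch picks up the new lines and the console sink prints the updated counts.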
      
    2. Run results
      ...
      Connected to the target VM, address: '127.0.0.1:64497', transport: 'socket'
      Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
      19/12/06 10:41:31 WARN TextSocketSourceProvider: The socket source should not be used for production applications! It does not support recovery.
      -------------------------------------------
      Batch: 0
      -------------------------------------------
      +-----+-----+
      |value|count|
      +-----+-----+
      |  dog|    3|
      |  cat|    1|
      +-----+-----+
      
      -------------------------------------------
      Batch: 1
      -------------------------------------------
      +-----+-----+
      |value|count|
      +-----+-----+
      |  dog|    3|
      |  cat|    2|
      |  owl|    1|
      +-----+-----+
      
      -------------------------------------------
      Batch: 2
      -------------------------------------------
      +-----+-----+
      |value|count|
      +-----+-----+
      |  dog|    4|
      |  cat|    2|
      |  owl|    2|
      +-----+-----+
      ...
      
    3. Error encountered and resolution

      Error log:

      Connected to the target VM, address: '127.0.0.1:64189', transport: 'socket'
      Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
      19/12/06 10:36:54 WARN TextSocketSourceProvider: The socket source should not be used for production applications! It does not support recovery.
      Exception in thread "main" java.lang.IllegalArgumentException: Pathname /C:/Users/admin/AppData/Local/Temp/temporary-58e0d2c8-c72e-4f8d-8670-c0931c2f5bfe/offsets from C:/Users/admin/AppData/Local/Temp/temporary-58e0d2c8-c72e-4f8d-8670-c0931c2f5bfe/offsets is not a valid DFS filename.
       at org.apache.hadoop.hdfs.DistributedFileSystem.getPathName(DistributedFileSystem.java:196)
       at org.apache.hadoop.hdfs.DistributedFileSystem.access$000(DistributedFileSystem.java:105)
       at org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:1118)
       at org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:1114)
       at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
       at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1114)
       at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1400)
       at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:221)
       at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:278)
       at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:282)
       at com.cloudera.StructuredStreamingDemo$.main(StructuredStreamingDemo.scala:40)
       at com.cloudera.StructuredStreamingDemo.main(StructuredStreamingDemo.scala)
      Disconnected from the target VM, address: '127.0.0.1:64189', transport: 'socket'
      
      Process finished with exit code 1
      

      Cause: a core-site.xml on the classpath sets fs.defaultFS to HDFS, so Spark resolves its
      temporary checkpoint directory against HDFS and rejects the local Windows path as an
      invalid DFS filename. Either of the following resolves it:

      1. Remove the core-site.xml configuration file, or comment out its fs.defaultFS property:
        <property>
          <name>fs.defaultFS</name>
          <value>hdfs://cdh01:8020</value>
        </property>
        
      2. Add set("fs.defaultFS","file://D:/temp/defaultFS/") to the SparkConf in the code (as in the example above)
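A third option, not used in the article but worth noting: Spark lets each query specify an explicit checkpoint directory via the checkpointLocation option, which sidesteps the temporary directory that triggered the error. A minimal sketch of Step 4 rewritten this way (the path below is a hypothetical local directory):

```scala
// Hypothetical alternative: give the query an explicit local checkpoint path
// so Spark does not resolve a temporary directory against fs.defaultFS.
val query: StreamingQuery = wordcount.writeStream
  .outputMode(OutputMode.Complete())
  .option("checkpointLocation", "file:///D:/temp/checkpoint")
  .format("console")
  .start()
```

This drops into the example in place of the original Step 4; the rest of the program is unchanged.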

      Original link: https://www.haomeiwen.com/subject/euisgctx.html