美文网首页
Spark SQL + Streaming

Spark SQL + Streaming

作者: 歌哥居士 | 来源:发表于2019-03-26 07:01 被阅读0次
    <!-- Spark SQL module: required for SparkSession / DataFrame / toDF() used below.
         The _2.11 suffix is the Scala binary version; ${spark.version} is a Maven property
         expected to be defined in the project's <properties> section. -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
    
    启动测试数据源（netcat 监听 9999 端口，程序运行后在该终端输入文本即作为流数据）：

    $ nc -lk 9999
    
    
    import org.apache.spark.SparkConf
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.streaming.{Seconds, StreamingContext, Time}
    
    /**
      * Word-frequency counting that combines Spark Streaming with Spark SQL:
      * each 5-second micro-batch RDD of words is lifted into a DataFrame,
      * registered as a temp view, and aggregated with a plain SQL query.
      */
    object SqlNetworkWordCount {

      def main(args: Array[String]): Unit = {
        // Two local threads: one for the socket receiver, one for processing.
        val sparkConf = new SparkConf()
          .setMaster("local[2]")
          .setAppName("SqlNetworkWordCount")
          .set("spark.driver.host", "localhost")
        val streamingContext = new StreamingContext(sparkConf, Seconds(5))

        // Text lines arrive over a socket (feed them with `nc -lk 9999`);
        // split every line on single spaces to get a stream of words.
        val wordStream = streamingContext
          .socketTextStream("localhost", 9999)
          .flatMap(_.split(" "))

        // For each micro-batch, run a SQL word count over that batch's words.
        wordStream.foreachRDD { (batch: RDD[String], time: Time) =>
          // One SparkSession is shared across all batches (see singleton below).
          val session = SparkSessionSingleton.getInstance(batch.sparkContext.getConf)
          import session.implicits._

          // RDD[String] -> RDD[Record] -> DataFrame, then expose it as a view.
          val wordsFrame = batch.map(Record.apply).toDF()
          wordsFrame.createOrReplaceTempView("words")

          // Aggregate with SQL and print this batch's result to stdout.
          val countsFrame = session.sql("select word, count(*) as total from words group by word")
          println(s"========= $time =========")
          countsFrame.show()
        }

        streamingContext.start()
        streamingContext.awaitTermination()
      }

      /** Row type used to lift each word into a single-column DataFrame (`word`). */
      case class Record(word: String)

      /** Lazily created, driver-side SparkSession shared by every micro-batch. */
      object SparkSessionSingleton {

        // Not captured by closure serialization; created on first use on the driver.
        @transient  private var instance: SparkSession = _

        /** Returns the shared SparkSession, building it from `sparkConf` on first call. */
        def getInstance(sparkConf: SparkConf): SparkSession = {
          if (instance == null) {
            instance = SparkSession.builder
              .config(sparkConf)
              .getOrCreate()
          }
          instance
        }
      }
    }
    

    相关文章

      网友评论

          本文标题:Spark SQL + Streaming

          本文链接:https://www.haomeiwen.com/subject/aricvqtx.html