Structured Streaming window: official examples

Author: 会飞的蜗牛66666 | Published 2019-02-02 10:34

Windowed aggregation must use the complete output mode (this query has no watermark and sorts by window, both of which rule out append):
    package com.ky.service

    import java.sql.Timestamp

    import org.apache.spark.sql.{DataFrame, SparkSession}
    import org.apache.spark.sql.functions._

    /**
      * @Author: xwj
      * @Date: 2019/2/2 0002 10:04
      * @Version 1.0
      */
    object WindowExample {

      def main(args: Array[String]): Unit = {

        if (args.length < 3) {
          System.err.println("Usage: StructuredNetworkWordCountWindowed <hostname> <port>" +
            " <window duration in seconds> [<slide duration in seconds>]")
          System.exit(1)
        }

        val host = args(0)
        val port = args(1).toInt
        val windowSize = args(2).toInt
        val slideSize = if (args.length == 3) windowSize else args(3).toInt
        if (slideSize > windowSize) {
          System.err.println("<slide duration> must be less than or equal to <window duration>")
        }
        val windowDuration = s"$windowSize seconds"
        val slideDuration = s"$slideSize seconds"

        val spark = SparkSession
          .builder
          .appName("StructuredNetworkWordCountWindowed")
          .master("local")
          .getOrCreate()

        import spark.implicits._

        // Create a DataFrame representing the stream of input lines from the connection to host:port
        val lines = spark.readStream
          .format("socket")
          .option("host", host)
          .option("port", port)
          .option("includeTimestamp", value = true) // attach an arrival timestamp to each line
          .load()

        // Split the lines into words, retaining the timestamps
        val words = lines.as[(String, Timestamp)].flatMap(line =>
          line._1.split(",").map(word => (word, line._2))
        ).toDF("word", "timestamp")

        // Group the data by window and word and compute the count of each group;
        // windowDuration is the window size, slideDuration is the slide step
        val windowedCounts = words.groupBy(
          window($"timestamp", windowDuration, slideDuration), $"word"
        ).count().orderBy("window")

        // Start running the query that prints the windowed word counts to the console.
        // Because the query aggregates, the "complete" output mode is required;
        // truncate=false only keeps the console from shrinking column widths.
        val query = windowedCounts.writeStream
          .outputMode("complete")
          .format("console")
          .option("truncate", "false")
          .start()

        query.awaitTermination()
      }
    }
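
To see concretely what the groupBy(window(...)) step produces, here is a small batch-mode sketch (my own illustration, not part of the original example; the object name, words, timestamps, and durations are all invented). It applies the same window() function to a static DataFrame: with a 10-second window sliding every 5 seconds, each event falls into two overlapping windows, so every (window, word) pair gets its own count.

    import java.sql.Timestamp

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions._

    object WindowBucketingSketch {

      def main(args: Array[String]): Unit = {
        val spark = SparkSession
          .builder
          .appName("WindowBucketingSketch")
          .master("local")
          .getOrCreate()

        import spark.implicits._

        // three made-up events: two close together, one ten seconds later
        val events = Seq(
          ("spark", Timestamp.valueOf("2019-02-02 10:00:01")),
          ("spark", Timestamp.valueOf("2019-02-02 10:00:02")),
          ("flink", Timestamp.valueOf("2019-02-02 10:00:12"))
        ).toDF("word", "timestamp")

        // 10-second windows sliding every 5 seconds: each event lands in two windows
        events.groupBy(window($"timestamp", "10 seconds", "5 seconds"), $"word")
          .count()
          .orderBy("window")
          .show(truncate = false)

        spark.stop()
      }
    }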

Non-aggregation queries must use the append output mode:
    package com.ky.service

    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.streaming.Trigger
    import org.apache.spark.sql.{DataFrame, SparkSession}

    /**
      * @Author: xwj
      * @Date: 2019/2/2 0002 10:51
      * @Version 1.0
      */
    object StructuredNetworkWordCountWindowed {

      def main(args: Array[String]): Unit = {
        if (args.length < 3) {
          System.err.println("Usage: StructuredNetworkWordCountWindowed <hostname> <port>" +
            " <window duration in seconds> [<slide duration in seconds>] [<trigger interval in seconds>]")
          System.exit(1)
        }
        val host = args(0)
        val port = args(1)
        val windowSize = args(2).toInt
        val slideSize = if (args.length == 3) windowSize else args(3).toInt
        // the original read args(4) unconditionally, which fails when no trigger interval
        // is passed; fall back to the slide duration in that case
        val triggerTime = if (args.length > 4) args(4).toInt else slideSize
        if (slideSize > windowSize) {
          System.err.println("<slide duration> must be less than or equal to <window duration>")
        }
        val windowDuration = s"$windowSize seconds"
        val slideDuration = s"$slideSize seconds"

        val spark = SparkSession
          .builder()
          .appName(s"${this.getClass.getSimpleName}")
          .master("local")
          .getOrCreate()
        spark.sparkContext.setLogLevel(logLevel = "ERROR")

        import spark.implicits._

        val lines = spark.readStream
          .format("socket")
          .option("host", host)
          .option("port", port)
          .option("includeTimestamp", value = true)
          .load()

        // A non-aggregation query: only a select is applied to the input DataFrame, so the
        // result is a plain Dataset with no streaming aggregation. Such queries must use the
        // "append" output mode; any other mode raises an exception when the query starts.
        val wordCounts: DataFrame = lines.select(window($"timestamp", windowDuration, slideDuration), $"value")

        val query = wordCounts.writeStream
          .outputMode("append")
          .format("console")
          .trigger(Trigger.ProcessingTime(s"$triggerTime seconds"))
          .option("truncate", "false")
          .start()

        query.awaitTermination()
      }
    }
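
The two listings differ only in whether the query aggregates, and that is what fixes the output mode. The sketch below (my own summary, not from the original post; the object name and the localhost/9999 endpoint are placeholders) pairs each query shape with the mode Spark accepts; swapping the modes makes start() fail with an AnalysisException before any data is read.

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions._

    object OutputModeSketch {

      def main(args: Array[String]): Unit = {
        val spark = SparkSession
          .builder
          .appName("OutputModeSketch")
          .master("local")
          .getOrCreate()

        import spark.implicits._

        val lines = spark.readStream
          .format("socket")
          .option("host", "localhost") // placeholder endpoint, e.g. fed by `nc -lk 9999`
          .option("port", 9999)
          .option("includeTimestamp", value = true)
          .load()

        // Streaming aggregation (groupBy + count): "complete" is accepted ("update" as well),
        // while "append" without a watermark is rejected with an AnalysisException.
        val counts = lines.groupBy(window($"timestamp", "10 seconds"), $"value").count()
        counts.writeStream.outputMode("complete").format("console").start()

        // Pure projection (select, no aggregation): "append" is accepted, while "complete"
        // is rejected because there is no aggregated state to re-emit on each trigger.
        val projected = lines.select(window($"timestamp", "10 seconds"), $"value")
        projected.writeStream.outputMode("append").format("console").start()

        spark.streams.awaitAnyTermination()
      }
    }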
