structured streaming window官方例子

package com.ky.service

import java.sql.Timestamp

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._


    /**
     * @Author: xwj
     * @Date: 2019/2/2 0002 10:04
     * @Version 1.0
     */
    /**
     * Windowed word count over a socket text stream, following the official
     * Spark Structured Streaming window example.
     *
     * Each received line is split on commas; every word keeps the arrival
     * timestamp of its line, and words are counted per (time window, word).
     */
    object WindowExample {

      /**
       * Program entry point.
       *
       * @param args `<hostname> <port> <window duration in seconds> [<slide duration in seconds>]`;
       *             when the slide duration is omitted it defaults to the window
       *             duration (tumbling windows).
       */
      def main(args: Array[String]): Unit = {
        if (args.length < 3) {
          System.err.println("Usage: WindowExample <hostname> <port>" +
            " <window duration in seconds> [<slide duration in seconds>]")
          System.exit(1)
        }

        val host = args(0)
        val port = args(1).toInt
        val windowSize = args(2).toInt
        val slideSize = if (args.length == 3) windowSize else args(3).toInt
        if (slideSize > windowSize) {
          System.err.println("<slide duration> must be less than or equal to <window duration>")
          // Stop here instead of letting the window() call below fail later with a
          // less obvious error.
          System.exit(1)
        }
        val windowDuration = s"$windowSize seconds"
        val slideDuration = s"$slideSize seconds"

        // No master is hard-coded here; it is expected to come from spark-submit
        // or the `spark.master` configuration.
        val spark = SparkSession
          .builder
          .appName("WindowExample")
          .getOrCreate()

        import spark.implicits._

        // Create DataFrame representing the stream of input lines from connection to host:port
        val lines = spark.readStream
          .format("socket")
          .option("host", host)
          .option("port", port)
          .option("includeTimestamp", value = true) // each row also carries its arrival timestamp
          .load()

        // Split the lines into words, retaining timestamps
        val words = lines.as[(String, Timestamp)].flatMap(line =>
          line._1.split(",").map(word => (word, line._2))
        ).toDF("word", "timestamp")

        // Group the data by window and word and compute the count of each group
        val windowedCounts = words.groupBy(
          window($"timestamp", windowDuration, slideDuration), $"word"
        ).count().orderBy("window")

        // Start running the query that prints the windowed word counts to the console.
        // Sorting a streaming aggregation requires the "complete" output mode.
        val query = windowedCounts.writeStream
          .outputMode("complete")
          .format("console")
          .option("truncate", "false")
          .start()

        // Block the main thread until the streaming query stops or fails.
        query.awaitTermination()
      }
    }




package com.ky.service

import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.{DataFrame, SparkSession}


    /**
     * @Author: xwj
     * @Date: 2019/2/2 0002 10:51
     * @Version 1.0
     */
    /**
     * Non-aggregating variant of the windowed socket example: every received
     * line is tagged with the time window(s) its arrival timestamp falls into
     * and printed to the console; no counting is performed.
     */
    object StructuredNetworkWordCountWindowed {

      /**
       * Program entry point.
       *
       * @param args `<hostname> <port> <window duration in seconds>
       *             [<slide duration in seconds> [<trigger interval in seconds>]]`;
       *             the slide duration defaults to the window duration and the
       *             trigger interval defaults to 0.
       */
      def main(args: Array[String]): Unit = {
        if (args.length < 3) {
          System.err.println("Usage: StructuredNetworkWordCountWindowed <hostname> <port>" +
            " <window duration in seconds> [<slide duration in seconds>" +
            " [<trigger interval in seconds>]]")
          System.exit(1)
        }

        val host = args(0)
        val port = args(1)
        val windowSize = args(2).toInt
        val slideSize = if (args.length > 3) args(3).toInt else windowSize
        // The fifth argument is optional; reading args(4) unconditionally would throw
        // ArrayIndexOutOfBoundsException for the 3- and 4-argument forms.
        // NOTE(review): a 0-second ProcessingTime trigger is expected to behave like
        // Spark's default trigger — confirm for the Spark version in use.
        val triggerTime = if (args.length > 4) args(4).toInt else 0
        if (slideSize > windowSize) {
          System.err.println("<slide duration> must be less than or equal to <window duration>")
          System.exit(1)
        }
        val windowDuration = s"$windowSize seconds"
        val slideDuration = s"$slideSize seconds"

        // No master is hard-coded here; it is expected to come from spark-submit
        // or the `spark.master` configuration.
        val spark = SparkSession
          .builder
          .appName("StructuredNetworkWordCountWindowed")
          .getOrCreate()
        spark.sparkContext.setLogLevel(logLevel = "error")

        import spark.implicits._

        // Stream of input lines from host:port; each row also carries its arrival timestamp.
        val lines = spark.readStream
          .format("socket")
          .option("host", host)
          .option("port", port)
          .option("includeTimestamp", value = true)
          .load()

        // Attach the window to every line without aggregating.
        val wordCounts: DataFrame =
          lines.select(window($"timestamp", windowDuration, slideDuration), $"value")

        // This query only projects columns (no aggregation), so "complete" output mode
        // is rejected by Spark; "append" is used to emit each new row once.
        val query = wordCounts.writeStream
          .outputMode("append")
          .format("console")
          .trigger(Trigger.ProcessingTime(s"$triggerTime seconds"))
          .option("truncate", "false")
          .start()

        // Block the main thread until the streaming query stops or fails.
        query.awaitTermination()
      }
    }




