原文链接:https://blog.csdn.net/kaaosidao/article/details/85790664
1. complete 模式要求查询中包含聚合操作：每个批次都会把之前所有批次的数据与本批次的数据一起聚合，并输出完整的结果表；而 append 模式只输出新增的行，不能用于流式聚合（注：Spark 2.1 起，设置了 watermark 的聚合查询可以使用 append 模式）。
2. 将 complete 换成 append 的代码演示（下面的代码包含聚合，使用 complete 模式）：
/**
 * Demo: streaming word count written to a foreach sink in `complete` mode.
 *
 * The query contains a streaming aggregation (`groupBy("value").count()`), so
 * `complete` mode is accepted. Switching the mode to "append" is rejected by
 * Spark with an `AnalysisException`.
 *
 * NOTE(review): `TestForeachWriter` is defined elsewhere; presumably a
 * `ForeachWriter` for the (value, count) rows — confirm.
 */
def main(args: Array[String]): Unit = {
  val spark = SparkSession.builder().master("local[1]").getOrCreate()
  import spark.implicits._

  // Read the watched directory as a stream of text lines, then split each line into words.
  val lines = spark.readStream.text("D:\\tmp\\streaming\\struct").as[String]
  val words = lines.flatMap(line => line.split(" "))

  // Streaming aggregation: running count per distinct word.
  val wordCounts = words.groupBy("value").count()

  val writer = wordCounts.writeStream
    .foreach(new TestForeachWriter())
    .outputMode("complete") // "append" would be rejected: this query aggregates
    .trigger(ProcessingTime("10 seconds"))

  val query = writer.start()
  query.awaitTermination()
}
// 以上代码可正常运行；若将 complete 改为 append，将报以下错误：
org.apache.spark.sql.AnalysisException: Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets;;
3. 将 append 换成 complete 的代码演示（下面的代码不包含聚合，使用 append 模式）：
/**
 * Demo: a non-aggregating stream of (word, 1) rows written to a foreach sink
 * in `append` mode.
 *
 * Because the query has no streaming aggregation, `append` is accepted.
 * Switching the mode to "complete" is rejected by Spark with an `AnalysisException`.
 *
 * NOTE(review): `TestForeachWriter` is defined elsewhere; presumably a
 * `ForeachWriter` for the resulting rows — confirm.
 */
def main(args: Array[String]): Unit = {
  val spark = SparkSession.builder().master("local[1]").getOrCreate()
  import spark.implicits._

  // Split every incoming text line into words.
  val words = spark.readStream
    .text("D:\\tmp\\streaming\\struct")
    .as[String]
    .flatMap(line => line.split(" "))

  // One T1 row per word, paired with a constant 1; nothing is aggregated.
  val rows = words.map(word => T1(word, 1)).toDF()

  val writer = rows.writeStream
    .foreach(new TestForeachWriter())
    .outputMode("append") // "complete" would be rejected: there is no aggregation
    .trigger(ProcessingTime("10 seconds"))

  writer.start().awaitTermination()
}
/**
 * Row type used by the append-mode demo: a word (`value`) paired with an
 * integer (`num`, always 1 in the demo).
 *
 * Marked `final`: case classes are plain immutable data and are not meant to be extended.
 */
final case class T1(value: String, num: Int)
// 以上代码可正常运行；若将 append 改为 complete，将报以下错误：
org.apache.spark.sql.AnalysisException: Complete output mode not supported when there are no streaming aggregations on streaming DataFrames/Datasets;;
4. 源码（Spark 2.0.0 中的 DataStreamWriter.outputMode）：
/**
 * Specifies how data of a streaming DataFrame/Dataset is written to a streaming sink.
 *  - `append`: only the new rows in the streaming DataFrame/Dataset will be written to
 *    the sink
 *  - `complete`: all the rows in the streaming DataFrame/Dataset will be written to the sink
 *    every time there are some updates
 *
 * The mode name is matched case-insensitively, independent of the JVM default locale.
 *
 * @param outputMode name of the output mode: "append" or "complete" (any letter case)
 * @return this writer, so that builder calls can be chained
 * @throws IllegalArgumentException if `outputMode` is not one of the accepted names
 * @since 2.0.0
 */
def outputMode(outputMode: String): DataStreamWriter[T] = {
  // Locale.ROOT keeps case folding stable regardless of the default locale
  // (e.g. under a Turkish locale, plain toLowerCase() maps 'I' to a dotless 'ı').
  this.outputMode = outputMode.toLowerCase(java.util.Locale.ROOT) match {
    case "append" =>
      OutputMode.Append
    case "complete" =>
      OutputMode.Complete
    case _ =>
      throw new IllegalArgumentException(s"Unknown output mode $outputMode. " +
        "Accepted output modes are 'append' and 'complete'")
  }
  this
}
网友评论