outputMode in Structured Streaming


Author: 程序媛啊 | Published 2019-12-04 17:00

    Original article: https://blog.csdn.net/kaaosidao/article/details/85790664

    1. `complete` requires an aggregation: on every trigger it re-aggregates the data from all previous batches together with the current batch and emits the full result. `append`, by contrast, cannot be used with aggregations, because appended rows are emitted once and never rewritten.
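Spark aside, the distinction in point 1 can be simulated with a plain Scala sketch (no Spark required). The batch contents and the `mergeBatch` helper below are made up for illustration: `complete` re-emits the whole cumulative count table on every trigger, while `append` only ever emits each row once, which is why it cannot carry an updating aggregation.

```scala
// Plain-Scala sketch (no Spark) of the two output modes' semantics.
object OutputModeDemo {
  // Cumulative word counts after folding one more micro-batch into the state
  def mergeBatch(state: Map[String, Int], batch: Seq[String]): Map[String, Int] =
    batch.foldLeft(state)((acc, w) => acc + (w -> (acc.getOrElse(w, 0) + 1)))

  def main(args: Array[String]): Unit = {
    val batch1 = Seq("a", "b", "a")
    val batch2 = Seq("b", "c")

    // complete: every trigger emits the whole aggregated result so far
    val afterBatch1 = mergeBatch(Map.empty, batch1)
    val afterBatch2 = mergeBatch(afterBatch1, batch2)
    println(s"complete, trigger 1: $afterBatch1") // a -> 2, b -> 1
    println(s"complete, trigger 2: $afterBatch2") // a -> 2, b -> 2, c -> 1

    // append: only this trigger's new rows are emitted, and a row is never
    // rewritten -- so an ever-updating count has nowhere to go
    println(s"append, trigger 1: $batch1")
    println(s"append, trigger 2: $batch2")
  }
}
```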

    2. Demo: replacing `complete` with `append` in an aggregating query:

    
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.streaming.ProcessingTime

    def main(args: Array[String]): Unit = {
      val spark = SparkSession.builder().master("local[1]").getOrCreate()
      import spark.implicits._

      // Streaming word count: read text files, split lines into words, aggregate
      val wordCounts = spark.readStream.text("D:\\tmp\\streaming\\struct")
        .as[String].flatMap(_.split(" "))
        .groupBy("value").count()

      val query = wordCounts.writeStream
        .foreach(new TestForeachWriter())
        .outputMode("complete") // works; "append" fails, see below
        .trigger(ProcessingTime("10 seconds"))
        .start()
      query.awaitTermination()
    }

    // Runs normally. Changing "complete" to "append" throws:
    org.apache.spark.sql.AnalysisException: Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets;;
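Both demos call `new TestForeachWriter()`, which the original post never shows. Below is a hypothetical sketch of what such a writer could look like. To stay runnable without Spark on the classpath, `SinkWriter` is a simplified stand-in for Spark's `org.apache.spark.sql.ForeachWriter[T]`, which exposes the same open/process/close contract; a real `TestForeachWriter` would extend the Spark trait and typically process `Row` values.

```scala
// Simplified stand-in for org.apache.spark.sql.ForeachWriter[T] (assumption:
// the real Spark trait has the same open/process/close shape).
trait SinkWriter[T] {
  def open(partitionId: Long, version: Long): Boolean // false skips this partition
  def process(value: T): Unit
  def close(errorOrNull: Throwable): Unit
}

// Hypothetical TestForeachWriter: accepts every partition, prints each row,
// and does no cleanup on close.
class TestForeachWriter extends SinkWriter[String] {
  override def open(partitionId: Long, version: Long): Boolean = true
  override def process(value: String): Unit = println(s"row: $value")
  override def close(errorOrNull: Throwable): Unit = ()
}
```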
    

    3. Demo: replacing `append` with `complete` in a non-aggregating query:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.streaming.ProcessingTime

    case class T1(value: String, num: Int)

    def main(args: Array[String]): Unit = {
      val spark = SparkSession.builder().master("local[1]").getOrCreate()
      import spark.implicits._

      // No aggregation here: each word is simply mapped to a (value, num) row
      val wordCounts = spark.readStream.text("D:\\tmp\\streaming\\struct")
        .as[String].flatMap(_.split(" ")).map(T1(_, 1)).toDF()

      val query = wordCounts.writeStream
        .foreach(new TestForeachWriter())
        .outputMode("append") // works; "complete" fails, see below
        .trigger(ProcessingTime("10 seconds"))
        .start()
      query.awaitTermination()
    }

    // Runs normally. Changing "append" to "complete" throws:
    org.apache.spark.sql.AnalysisException: Complete output mode not supported when there are no streaming aggregations on streaming DataFrames/Datasets;;
    

    4. Source code (Spark's `DataStreamWriter.outputMode`):

    
    /**
     * Specifies how data of a streaming DataFrame/Dataset is written to a streaming sink.
     *   - `append`:   only the new rows in the streaming DataFrame/Dataset will be written to
     *                 the sink
     *   - `complete`: all the rows in the streaming DataFrame/Dataset will be written to the sink
     *                 every time there are some updates
     *
     * @since 2.0.0
     */
    def outputMode(outputMode: String): DataStreamWriter[T] = {
      this.outputMode = outputMode.toLowerCase match {
        case "append" =>
          OutputMode.Append
        case "complete" =>
          OutputMode.Complete
        case _ =>
          throw new IllegalArgumentException(s"Unknown output mode $outputMode. " +
            "Accepted output modes are 'append' and 'complete'")
      }
      this
    }
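The string-to-mode mapping above can be exercised without Spark. The sketch below is a standalone replica of just the `match` expression: `OutputMode` is modeled as a plain sealed trait here, whereas in Spark it is a class in `org.apache.spark.sql.streaming`. Note also that this snippet is from the Spark 2.0 era; later Spark versions additionally accept an `update` mode.

```scala
// Standalone replica of the string-to-OutputMode mapping shown above.
// OutputMode is modeled as a sealed trait for the sake of the demo.
sealed trait OutputMode
case object Append extends OutputMode
case object Complete extends OutputMode

def parseOutputMode(outputMode: String): OutputMode =
  outputMode.toLowerCase match {
    case "append"   => Append
    case "complete" => Complete
    case _ =>
      throw new IllegalArgumentException(s"Unknown output mode $outputMode. " +
        "Accepted output modes are 'append' and 'complete'")
  }
```

Because of `toLowerCase`, `parseOutputMode("Complete")` is accepted, while any unrecognized string fails fast with `IllegalArgumentException` before the query even starts.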
    
