美文网首页
205、Spark 2.0之Structured Streami

205、Spark 2.0之Structured Streami

作者: ZFH__ZJ | 来源:发表于2019-02-12 17:27 被阅读0次

    output操作

    定义好了各种计算操作之后,就需要启动这个应用。此时就需要使用DataStreamWriter,通过spark.writeStream()方法返回。此时需要指定以下一些信息:
    output sink的一些细节:数据格式、位置等。
    output mode:以哪种方式将result table的数据写入sink。
    query name:指定查询的标识。
    trigger interval:如果不指定,那么默认就会尽可能快速地处理数据,只要之前的数据处理完,就会立即处理下一条数据。如果上一个数据还没处理完,而这一个trigger也错过了,那么会一起放入下一个trigger再处理。
    checkpoint地址:对于某些sink,可以做到一次且仅一次的语义,此时需要指定一个目录,进而可以将一些元信息写入其中。一般会是类似hdfs上的容错目录。

    output mode

    目前仅仅支持两种output mode
    append mode:仅适用于不包含聚合操作的查询。
    complete mode:仅适用于包含聚合操作的查询。

    output sink

    目前有一些内置支持的sink
    file sink:在spark 2.0中,仅仅支持parquet文件,以及append模式
    foreach sink
    console sink:仅供调试
    memory sink:仅供调试


    output sink.png

    代码

    val noAggDF = deviceDataDf.select("device").where("signal > 10")   
    
    noAggDF
       .writeStream
       .format("console")
       .start()
    
    noAggDF
       .writeStream
       .parquet("path/to/destination/directory")
       .start()
       
    val aggDF = df.groupBy(“device”).count()
    
    aggDF
       .writeStream
       .outputMode("complete")
       .format("console")
       .start()
    
    aggDF
       .writeStream
       .queryName("aggregates")    // this query name will be the table name
       .outputMode("complete")
       .format("memory")
       .start()
    
    spark.sql("select * from aggregates").show()   // interactively query in-memory table
    

    foreach sink详解

    使用foreach sink时,我们需要自定义ForeachWriter,并且自定义处理每条数据的业务逻辑。每次trigger发生后,根据output mode需要写入sink的数据,就会传递给ForeachWriter来进行处理。使用如下方式来定义ForeachWriter:

    datasetOfString.write.foreach(new ForeachWriter[String] {
      def open(partitionId: Long, version: Long): Boolean = {
        // open connection
      }
      def process(record: String) = {
        // write string to connection
      }
      def close(errorOrNull: Throwable): Unit = {
        // close the connection
      }
    })
    

    需要有如下一些注意点:

    1. ForeachWriter必须支持序列化,因为该对象会被序列化后发送到executor上去执行。
    2. open、process和close这三个方法都会给executor调用。
    3. ForeachWriter所有的初始化方法,必须创建数据库连接,开启一个事务,打开一个IO流等等,都必须在open方法中完成。必须注意,如果在ForeachWriter的构造函数中进行初始化,那么这些操作都是在driver上发生的。
    4. open中有两个参数,version和partition,可以唯一标识一批需要处理的数据。每次发生一次trigger,version就会自增长一次。partition是要处理的结果数据的分区号。因为output操作是分布式执行的,会分布在多个executor上并行执行。
    5. open可以使用version和partition来决定,是否要处理这一批数据。此时可以选择返回true或false。如果返回false,那么process不会被调用。举个例子来说,有些partition的数据可能已经被持久化了,而另外一些partiton的处理操作由于失败被重试,此时之前已经被持久化的数据可以不再次进行持久化,避免重复计算。
    6. close方法中,需要处理一些异常,以及一些资源的释放。

    相关文章

      网友评论

          本文标题:205、Spark 2.0之Structured Streami

          本文链接:https://www.haomeiwen.com/subject/dygceqtx.html