美文网首页
Structured Streaming同一个进程支持多维度的统

Structured Streaming同一个进程支持多维度的统

作者: SunnyMore | 来源:发表于2018-07-04 09:25 被阅读56次

Unsupported Operations

There are a few DataFrame/Dataset operations that are not supported with streaming DataFrames/Datasets. Some of them are as follows.

  • Multiple streaming aggregations (i.e. a chain of aggregations on a streaming DF) are not yet supported on streaming Datasets.

  • Limit and take first N rows are not supported on streaming Datasets.

  • Distinct operations on streaming Datasets are not supported.

  • Sorting operations are supported on streaming Datasets only after an aggregation and in Complete Output Mode.

  • Few types of outer joins on streaming Datasets are not supported. See the support matrix in the Join Operations section for more details.

In addition, there are some Dataset methods that will not work on streaming Datasets. They are actions that will immediately run queries and return results, which does not make sense on a streaming Dataset. Rather, those functionalities can be done by explicitly starting a streaming query (see the next section regarding that).

  • count() - Cannot return a single count from a streaming Dataset. Instead, use ds.groupBy().count() which returns a streaming Dataset containing a running count.

  • foreach() - Instead use ds.writeStream.foreach(...) (see next section).

  • show() - Instead use the console sink (see next section).

If you try any of these operations, you will see an AnalysisException like “operation XYZ is not supported with streaming DataFrames/Datasets”. While some of them may be supported in future releases of Spark, there are others which are fundamentally hard to implement on streaming data efficiently. For example, sorting on the input stream is not supported, as it requires keeping track of all the data received in the stream. This is therefore fundamentally hard to execute efficiently.

由于Structured Streaming不支持两个流之间的join,但是我们在流计算的业务场景中经常需要,既要按天统计,也需要按小时来统计,并同时输出的场景;但是Structured Streaming 可以支持同一个流分成两个相同的流去分组聚合,跑两个query;代码如下:

package com.spark.sunny.structuredstreaming

import com.spark.sunny.util.UdfUtil
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.Encoders

case class DeviceData(deviceId : String, usage : String,duration : String,eventBeginTime : String, serviceType : String)

/**
  * <Description> <br>
  *
  * @author Sunny<br>
  * @taskId: <br>
  * @version 1.0<br>
  * @createDate 2018/06/23 11:01 <br>
  * @see com.spark.sunny.structuredstreaming <br>
  */
object StructuredHdfsDevice {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("StructuredHdfsDevice")
      .master("local")
      .getOrCreate()
    val schema = Encoders.product[DeviceData].schema
    val lines =  spark.readStream
      .format("json")
      .schema(schema)
      .load("C:\\Users\\yaj\\Desktop\\dashboard\\test")
    import spark.implicits._
    val beginTimeDevice = lines
      .withColumn("eventBeginTime", UdfUtil.fmtTimestampUdf($"eventBeginTime", lit("yyyyMMddHHmmss")))
      .withColumn("eventBeginHour", substring($"eventBeginTime", 0, 10))
      .withColumn("eventBeginDay", substring($"eventBeginTime", 0, 8))

    val hourDevice = beginTimeDevice.groupBy($"subsId",$"eventBeginHour",$"serviceType")
      .agg("duration" -> "sum").withColumnRenamed("sum(duration)", "durationForHour")

    val queryHour = hourDevice.writeStream
      .outputMode("update")
      .option("truncate", "false")
      .format("console")
      .start()

    val dayDevice = beginTimeDevice.groupBy($"subsId",$"eventBeginDay",$"serviceType")
      .agg("duration" -> "sum").withColumnRenamed("sum(duration)", "durationForDay")

    val queryDay = dayDevice.writeStream
      .outputMode("update")
      .option("truncate", "false")
      .format("console")
      .start()

    queryHour.awaitTermination()
    queryDay.awaitTermination()
  }
}

需要注意的是:不能使用Query start之后,马上去调用awaitTermination,因为之会阻塞第二个分支Query的执行,而应该在所有的Query执行完start之后,使用sparkSession.streams.awaitAnyTermination(),只有这样才能确保两个分支Query都能启动。

相关文章

网友评论

      本文标题:Structured Streaming同一个进程支持多维度的统

      本文链接:https://www.haomeiwen.com/subject/hwwsuftx.html