Unsupported Operations
There are a few DataFrame/Dataset operations that are not supported with streaming DataFrames/Datasets. Some of them are as follows.
- Multiple streaming aggregations (i.e. a chain of aggregations on a streaming DF) are not yet supported on streaming Datasets.
- Limit and take first N rows are not supported on streaming Datasets.
- Distinct operations on streaming Datasets are not supported.
- Sorting operations are supported on streaming Datasets only after an aggregation and in Complete Output Mode.
- A few types of outer joins on streaming Datasets are not supported. See the support matrix in the Join Operations section for more details.
In addition, there are some Dataset methods that will not work on streaming Datasets. They are actions that will immediately run queries and return results, which does not make sense on a streaming Dataset. Rather, those functionalities can be done by explicitly starting a streaming query (see the next section regarding that).
- count() - Cannot return a single count from a streaming Dataset. Instead, use ds.groupBy().count(), which returns a streaming Dataset containing a running count.
- foreach() - Instead use ds.writeStream.foreach(...) (see next section).
- show() - Instead use the console sink (see next section).
If you try any of these operations, you will see an AnalysisException like “operation XYZ is not supported with streaming DataFrames/Datasets”. While some of them may be supported in future releases of Spark, there are others which are fundamentally hard to implement on streaming data efficiently. For example, sorting on the input stream is not supported, as it requires keeping track of all the data received in the stream. This is therefore fundamentally hard to execute efficiently.
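As a quick illustration of the workarounds listed above, the sketch below replaces count() with groupBy().count() and show() with the console sink. It is a minimal, self-contained example; the socket source on localhost:9999 and the application name are hypothetical choices made for this sketch only, not part of the original post.

import org.apache.spark.sql.SparkSession

object UnsupportedOpsWorkaround {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("UnsupportedOpsWorkaround")   // hypothetical app name
      .master("local[*]")
      .getOrCreate()

    // Hypothetical streaming source: lines of text read from a local socket.
    val lines = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", 9999)
      .load()

    // lines.count() would throw AnalysisException on a streaming Dataset;
    // groupBy().count() instead yields a streaming Dataset with a running count.
    val runningCount = lines.groupBy().count()

    // lines.show() is likewise unsupported; the console sink prints each result batch.
    val query = runningCount.writeStream
      .outputMode("complete")
      .format("console")
      .start()

    query.awaitTermination()
  }
}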
Structured Streaming does not support joining two streams, yet a common requirement in streaming business scenarios is to aggregate the same data both by day and by hour and output both results at the same time. What Structured Streaming does allow is grouping and aggregating the same input stream in two different ways by running two queries over it. The code is as follows:
package com.spark.sunny.structuredstreaming

import com.spark.sunny.util.UdfUtil
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.Encoders

case class DeviceData(deviceId: String, usage: String, duration: String, eventBeginTime: String, serviceType: String)

/**
 * <Description> <br>
 *
 * @author Sunny<br>
 * @taskId: <br>
 * @version 1.0<br>
 * @createDate 2018/06/23 11:01 <br>
 * @see com.spark.sunny.structuredstreaming <br>
 */
object StructuredHdfsDevice {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("StructuredHdfsDevice")
      .master("local")
      .getOrCreate()

    import spark.implicits._

    // Derive the JSON schema from the DeviceData case class.
    val schema = Encoders.product[DeviceData].schema
    val lines = spark.readStream
      .format("json")
      .schema(schema)
      .load("C:\\Users\\yaj\\Desktop\\dashboard\\test")

    // Normalize the event time, then derive hour- and day-level keys from it.
    val beginTimeDevice = lines
      .withColumn("eventBeginTime", UdfUtil.fmtTimestampUdf($"eventBeginTime", lit("yyyyMMddHHmmss")))
      .withColumn("eventBeginHour", substring($"eventBeginTime", 0, 10))
      .withColumn("eventBeginDay", substring($"eventBeginTime", 0, 8))

    // Query 1: per-hour aggregation of the stream.
    val hourDevice = beginTimeDevice.groupBy($"deviceId", $"eventBeginHour", $"serviceType")
      .agg("duration" -> "sum")
      .withColumnRenamed("sum(duration)", "durationForHour")

    val queryHour = hourDevice.writeStream
      .outputMode("update")
      .option("truncate", "false")
      .format("console")
      .start()

    // Query 2: per-day aggregation of the same stream.
    val dayDevice = beginTimeDevice.groupBy($"deviceId", $"eventBeginDay", $"serviceType")
      .agg("duration" -> "sum")
      .withColumnRenamed("sum(duration)", "durationForDay")

    val queryDay = dayDevice.writeStream
      .outputMode("update")
      .option("truncate", "false")
      .format("console")
      .start()

    queryHour.awaitTermination()
    queryDay.awaitTermination()
  }
}
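One caveat about the listing above: queryHour.awaitTermination() blocks the main thread indefinitely, so the following queryDay.awaitTermination() is never actually reached (both queries still run, since they were already started). A common alternative, shown below as a minimal sketch rather than as part of the original code, is to wait on the session's StreamingQueryManager instead:

// Block until any active streaming query in this SparkSession terminates or fails.
spark.streams.awaitAnyTermination()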