美文网首页
Structured Streaming

Structured Streaming

作者: 丹之 | 来源:发表于2019-01-06 10:25 被阅读86次

编程模型

Structured Streaming 的关键思想是将持续不断的数据当做一个不断追加的表。

基本概念

将输入的流数据当做一张 “输入表”。把每一条到达的数据作为输入表的新的一行来追加。



在输入表上执行的查询将会生成 “结果表”。每个触发间隔(trigger interval)(例如 1s),新的行追加到输入表,最终更新结果表。无论何时更新结果表,我们都希望将更改的结果行 output 到外部存储/接收器(external sink)。



output 有以下三种模式:
  • Complete Mode:整个更新的结果表将被写入外部存储。由存储连接器(storage connector)决定如何处理整个表的写入
  • Append Mode:只有结果表中自上次触发后附加的新行将被写入外部存储。这仅适用于不期望更改结果表中现有行的查询。
  • Update Mode:只有自上次触发后结果表中更新的行将被写入外部存储(自 Spark 2.1.1 起可用)。 请注意,这与完全模式不同,因为此模式仅输出自上次触发以来更改的行。如果查询不包含聚合操作,它将等同于附加模式。
    请注意,每种模式适用于某些类型的查询。

为了说明这个模型的使用,让我们来进一步理解上面的快速示例:

  • 最开始的 DataFrame lines 为输入表
  • 最后的 DataFrame wordCounts 为结果表
    在流上执行的查询将 DataFrame lines 转化为 DataFrame wordCounts 与在静态 DataFrame 上执行的操作完全相同。当启动计算后,Spark 会不断从 socket 连接接收数据。如果有新的数据到达,Spark将运行一个 “增量” 查询,将以前的 counts 与新数据相结合,以计算更新的 counts,如下所示:



    这种模式与许多其他流处理引擎有显著差异。许多流处理引擎要求用户自己维护运行的状态,因此必须对容错和数据一致性(at-least-once, or at-most-once, or exactly-once)进行处理。 在这个模型中,当有新数据时,Spark负责更新结果表,从而减轻用户的工作。

流式 DataFrames/Datasets 上的操作

可以在流式 DataFrames/Datasets 上应用各种操作:从无类型,类似 SQL 的操作(比如 select、where、groupBy),到类似有类型的 RDD 操作(比如 map、filter、flatMap)

基本操作 - Selection, Projection, Aggregation

大部分常见的 DataFrame/Dataset 操作也支持流式的 DataFrame/Dataset。

case class DeviceData(device: String, deviceType: String, signal: Double, time: DateTime)

val df: DataFrame = ... // streaming DataFrame with IOT device data with schema { device: string, deviceType: string, signal: double, time: string }
val ds: Dataset[DeviceData] = df.as[DeviceData]    // streaming Dataset with IOT device data

// Select the devices which have signal more than 10
df.select("device").where("signal > 10")      // using untyped APIs   
ds.filter(_.signal > 10).map(_.device)         // using typed APIs

// Running count of the number of updates for each device type
df.groupBy("deviceType").count()                          // using untyped API

// Running average signal for each device type
import org.apache.spark.sql.expressions.scalalang.typed
ds.groupByKey(_.deviceType).agg(typed.avg(_.signal))    // using typed API

event-time(事件时间)上的 window 操作

使用 Structured Streaming 进行滑动的 event-time 窗口聚合是很简单的,与分组聚合非常类似。在分组聚合中,为用户指定的分组列中的每个唯一值维护一个聚合值(例如计数)。在基于 window 的聚合的情况下,为每个 window 维护聚合(aggregate values),流式追加的行根据 event-time 落入相应的聚合。让我们通过下图来理解。

想象下,我们的快速示例现在改成了包含数据生成的时间。现在我们想在 10 分钟的 window 内计算 word count,每 5 分钟更新一次。比如 12:00 - 12:10, 12:05 - 12:15, 12:10 - 12:20 等。12:00 - 12:10 是指数据在 12:00 之后 12:10 之前到达。现在,考虑一个 word 在 12:07 的时候接收到。该 word 应当增加 12:00 - 12:10 和 12:05 - 12:15 相应的 counts。所以 counts 会被分组的 key 和 window 分组。

结果表将如下所示:



由于这里的 window 与 group 非常类似,在代码上,你可以使用 groupBy 和 window 来表达 window 聚合。例子如下:

import spark.implicits._

val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }

// Group the data by window and word and compute the count of each group
val windowedCounts = words.groupBy(
  window($"timestamp", "10 minutes", "5 minutes"),
  $"word"
).count()
Watermark 和延迟数据处理

现在考虑一个数据延迟到达会怎么样。例如,一个在 12:04 生成的 word 在 12:11 被接收到。application 会使用 12:04 而不是 12:11 去更新 12:00 - 12:10的 counts。这在基于 window 的分组中很常见。Structured Streaming 会长时间维持部分聚合的中间状态,以便于后期数据可以正确更新旧 window 的聚合,如下所示:



然后,当 query 运行了好几天,系统必须限制其累积的内存中中间状态的数量。这意味着系统需要知道什么时候可以从内存状态中删除旧的聚合,因为 application 不会再为该聚合更晚的数据进行聚合操作。为启动此功能,在Spark 2.1中,引入了 watermark(水印),使引擎自动跟踪数据中的当前事件时间,并相应地清理旧状态。你可以通过指定事件时间列来定义一个 query 的 watermark 和 late threshold(延迟时间阈值)。对于一个开始于 T 的 window,引擎会保持中间状态并允许后期的数据对该状态进行更新直到 max event time seen by the engine - late threshold > T。换句话说,在延迟时间阈值范围内的延迟数据会被聚合,但超过该阈值的数据会被丢弃。让我们以一个例子来理解这一点。我们可以使用 withWatermark() 定义一个 watermark,如下所示:

import spark.implicits._

val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }

// Group the data by window and word and compute the count of each group
val windowedCounts = words
    .withWatermark("timestamp", "10 minutes")
    .groupBy(
        window($"timestamp", "10 minutes", "5 minutes"),
        $"word")
    .count()

在这个例子中,我们定义了基于 timestamp 列定义了 watermark,并且将 10 分钟定义为允许数据延迟的阈值。如果该数据以 update 输出模式运行:

  • 引擎将不断更新结果表中 window 中的 counts 直到该 window 比 watermark 更旧
  • 数据中的 timestamp 值比当前的最大 event-time 落后 10 分钟以上的数据将被丢弃
    以下为示图:



    如图所示,引擎跟踪的最大 event-time 是蓝色虚线,并且在每个 trigger 开始时设置 watermark 为 (max event time - '10 mins') 的红线例如,当引擎发现 (12:14, dog) 时将下次 trigger 的 watermark 设置为 12:04。然后,当 watermark 更新为 12:11 时,window (12:00 - 12:10) 的中间状态被清除,所有后续数据(例如(12:04,donkey))被认为是“太晚”,因此被丢弃。根据 output 模式,每次触发后,更新的计数(即紫色行)都将作为触发输出进行写入到 sink。

某些 sink(例如文件)可能不支持 update mode 所需的细粒度更新。所以,我们还支持 append 模式,只有最后确定的计数被写入。这如下图所示。

注意,在非流式 Dataset 上使用 withWatermark 是无效的空操作。



与之前的 update mode 类似,引擎维护每个 window 的中间计数。只有当 window < watermark 时才会删除 window 的中间状态数据,并将该 window 最终的 counts 追加到结果表或 sink 中。例如,window 12:00 - 12:10 的最终结果将在 watermark 更新到 12:11 后再追加到结果表中。
watermark 清除聚合状态的条件十分重要,为了清理聚合状态,必须满足以下条件(自 Spark 2.1.1 起,将来可能会有变化):

  • output mode 必须为 append 或 update:complete mode 需要保留所有的聚合数据,因此 watermark 不能用来清理聚合数据
  • 聚合必须具有 event-time 列或基于 event-time 的 window
  • withWatermark 必须调用在用来聚合的时间列上。比如 df.withWatermark("time", "1 min").groupBy("time2").count() 是无效的
  • withWatermark 必须在调用聚合前调用来说明 watermark 的细节。比如,df.groupBy("time").count().withWatermark("time", "1 min") 是无效的

Output Modes

有几种类型的输出模式:

Append mode(默认的):这是默认模式,其中只有从上次触发后添加到结果表的新行将被输出到 sink。适用于那些添加到结果表中的行从不会更改的查询。只有 select、where、map、flatMap、filter、join 等查询会支持 Append mode
Complete mode:每次 trigger 后,整个结果表将被输出到 sink。聚合查询(aggregation queries)支持该模式
Update mode:(自 Spark 2.1.1 可用)。只有结果表中自上次 trigger 后更新的行将被输出到 sink
不同类型的流式 query 支持不同的 output mode。以下是兼容性:


https://github.com/xy2953396112/spark-sourcecodes-analysis/blob/master/structured-streaming/Structured-Streaming-%E7%BC%96%E7%A8%8B%E6%8C%87%E5%8D%97.md

相关文章

网友评论

      本文标题:Structured Streaming

      本文链接:https://www.haomeiwen.com/subject/yjajrqtx.html