论文概要
该论文是Spark团队在2018年发表的一篇基于Spark引擎之上新构建的一套Streaming api,Structured Streaming项目在2016年就已经开始开发了。
Structured Streaming采用了不同于Spark Streaming、Flink这类DataStream的角度来处理流数据。Structured Streaming视图通过“增量查询模型”来处理流数据,将流式数据作为增量数据(也就是分批)进行处理。
待定,看他API。
流处理现状与挑战
虽然近些年分布式流处理系统取得了巨大的进步,但是我们基于我们在Spark Streaming上的经验,认为在实际使用过程中仍然具有很多挑战。
复杂低级的API(Complex and Low-Level APIs)
流处理系统之所以在使用上相较批处理复杂很多,主要的原因就是流处理系统中的复杂API语义。这些复杂的语义需要用户感知和并且指定具体的物理执行操作,比如物理执行计划编排、at-least-once语义、state storage和触发模式等。
比如在Google Dataflow模型中,能够处理基于event-time聚合、窗口和乱序数据。但是需要用户指定窗口模式、触发模式等,这样就需要用户手写物理算子执行计划图。向Spark Streaming和Apache Flink的DataStream API,也需要编排任务的物理执行DAG,并且管理复杂的状态。
对于这类问题,Structured Streaming使用增量查询模型来解决。对于简单的流应用,直接使用增量查询模型来进行表达;对于复杂的应用,也可以在该模型之上很容易的添加自定Stateful Operator来满足。(感觉就是Flink的High-level SQL和Stateful Stream Processing API)
Structured Streaming的自动增量查询模型就是通过对Spark SQL和DataFrame/DataSet API描述的静态数据集进行增量查询。所以用户只需要了解Spark 的批处理API就可以编写流处理作业。
不太同意论文这里的描述。首先,Structured Streaming增量模型就是直接使用的Saprk SQL和DataFrame API。但像Flink不止有DataStream 这类low level api,也很早就支持SQL、Table API这类High-level API了。
端到端集成挑战
在实际使用中,流处理系统往往是大型业务系统的一部分,业务系统同时也包括批处理、join静态数据和交互查询等。传统的流处理系统API主要关注的是读取数据源数据,并将流数据输出到接收器中,但作为端到端的业务应用程序,可能还需要执行其他任务。
比如在ETL场景,任务可能需要去join另一个存储系统或批计算转换出来的静态数据。这时候保证两个系统数据的一致性,以及能够使用一套API来将业务表达出来,就非常重要了。再比如在我一个流处理应用中,我需要通过批出的能力来处理一些历史数据(追数据)。
对于这类问题,我们将Structured Streaming和Spark 中的批处理和交互式API 进行了紧密的集成。
我理解Google Dataflow模型视图要解决的其中一个问题就是将通过一套执行引擎来支持批、微批、流等场景,所以应该能够很容易做流批这类集成。但是目前Flink貌似确实不支持流批的集成。
运维挑战
部署完流应用后,如何管理和运维这些任务也是一项非常大的挑战,比如失败处理(Failover,包括单点故障、优雅停止作业、作业重启继续处理等)、代码更新(应用代码变更、系统版本升级,这类作业重启后需要继续从之前的位置进行数据处理)、弹性伸缩(Rescaling)、热点处理(Stragglers)和监控等等。
对于运维挑战,Structured Streaming通过以下几种方式来处理:
-
容错处理,通过WAL和state store 进行回滚和恢复。
-
节点crash、伸缩和慢节点处理,通过自动新增节点来支持。
-
UDF代码更新,通过作业停止、重启解决。
-
微批处理模式,通过自适应批大小(adaptively batch)来应对负载高峰和追历史数据。
成本和性能挑战
流应用往往是7 * 24h运行,如果系统不具备动态伸缩能力,那么就需要以高峰流量的资源来长时间运行任务,即便具备动态伸缩能力,连续计算结果的成本可能也比运行定期批处理作业昂贵。
对于这类问题,Structured Streaming复用Spark SQL中所有执行优化能力,来提升系统的吞吐能力。
Structured Streaming组件架构
下面是Structured Streaming组件构成图,主要包括:数据流输入和输出connector、API、Structured Streaing执行引擎。
![](https://img.haomeiwen.com/i4120356/5bb40b177a7f1c07.png)
输入输出Connector
Structured Streaming 提供了用于I/O的各类输入数据源和输出接收器,并且提供了“exactly-once”语义的输出和容错。为了保证“exactly-once”语义,Structured Streaming系统和其它流系统基本类似,对输入source和输出sink有进行了限制。
- 输入source必须具备重放功能(replayable),允许重复读取最近数据。
- 输出sink必须支持幂等写(idempotent writes),如果节点在写入sink时发生fo,能够确保可靠恢复。
上面组件图我们看到,Structured Streaming除了能够接收外部数据流(比如Kafka)系统外,还支持接收Spark SQL中的Table。而且Structured Streaming也能将数据流输出到Spark Table中,这样用户就可以对该table进行交互查询了。(这就是Structured Streaming中端到端集成的方式,能够接收batch 类型的table,也能输出table进行交互查询)。
API
上面我们提到过,Structured Streaming作业是可以通过Spark SQL的batch API(SQL和DataFrame)来描述任务的。
Structured Streaming为了能够支持一些Streaming场景,在原有Spark SQL API之上,增加了一些新API,这些新API也能在batch场景工作。
-
触发控制(Triggers control),触发引擎计算新的结果集并且更新到输出接收器中。
-
指定event time字段和设置watermark 策略。
-
Stateful operator操作,允许用户实现复杂任务处理,类似Spark Streaming中的"updateStateByKey" API。
上面的1和2是Structured Streaming从Google Dataflow 模型中借鉴的思想。
执行引擎
执行引擎主要分为三部分:递增器(Incrementalizer)、优化器(Optimizer)和处理器。递增器和优化器用于对用户编写的query进行处理。
Structured Streaming提供了两类处理器:微批模式的Microbatch Execution和连续处理模式的Coniuous Processing。默认系统使用微批模式,微批模式支持动态负载、伸缩、容错恢复和慢节点处理。这两类处理器都能支持容错,都是用两种形式的持久存储来实现容错
- WAL(write-ahead log),WAL用于跟踪数据是否可靠处理完成。对于一些sink,可以集成WAL来实现原子接收器(sink atomic)。
- 大规模状态存储(state store),采用状态存储的方式来保存长时间运行的聚合算子状态快照。
值得吐槽的是Spark 2.3开始支持连续处理模式,但是该执行器一直处于试验阶段。
无论WAL还是Checkpoint,都是记录每次触发器中正在处理数据的偏移范围。
Structured Streaming编程模型
Structured Streaming的核心思想就是将数据流抽象为一个连续不断追加的表。这是一种新流处理模型,通过使用类似批处理的查询,来查询这个静态无界的表,查询过程就是Spark对表中的增量数据进行查询。
数据流中的数据,作为这个无界表的新纪录供Spark进行增量查询处理。
![](https://img.haomeiwen.com/i4120356/cd66325e1adbece4.png)
下面Structured Streaming的增量编程模型,每个触发间隔(比如下面的1s),新数据追加到input table中,然后对input table中的增量部分进行查询(这个查询就是map、flatmap这类算子),并将增量结果更新到result table中,最后将result table根据不同的输出模式,将result table输出。
![](https://img.haomeiwen.com/i4120356/a268a5e591ddc9d6.png)
下面是这张带有实例数据的图,更容易理解。
-
第一次触发时,输入数据“cat dog”和“dog dog”,查询逻辑对新增数据进行word count查询计算,得到“cat=1, dog = 3”。
-
第二次输入数据触发,读取“owl cat”加入到input table中(此时input table 中包括了原始数据“cat dog”、“dog dog”和增量数据“owl cat”),这时候Spark在进行增量查询(查询新加入表中的“owl cat”),并和result table进行合并。
-
第三次输入“dog” 和 “owl”,查询计算逻辑和上面一样。
![](https://img.haomeiwen.com/i4120356/814f7af0b94db8f0.png)
下面是一段Structured Streaming的DataFrame API代码示例,我们从代码上来理解Structured Streaming的编程模型:
-
读取指定目录下的json数据(注意这里是流式读取,也就是这个目录会一直新增json文件),data就是对应了上面input table。
-
对data中增量数据进行聚合查询计数(按照数据中的"contry"字段),counts对应了上面的result table。
-
将result table以parquet文件格式输出,输出模型为“complete”。
![](https://img.haomeiwen.com/i4120356/ae45c5b11cbaab82.png)
上面的模型图和实例代码,我们都看到在result table到输出到外部存储时,有一个output mode参数。该参数定义了Structured Streaming的输出模式。
-
complete mode,将整个更新的结果表全量写入到外部存储。如何存储整个表取决于sink connector。比如上面将每次更新的全量结果都写到一个parquet文件中。
-
append mode,只将上次result table中追加的内容写入到外部存储。该模式只适用于结果表不会有更新行的查询场景。
-
update mode,只将上次result table中更新的内容写入到外部存储。
![](https://img.haomeiwen.com/i4120356/b7fdb1592dd96967.png)
查询计划
Structured Streaming的查询计划是通过Spark SQL中的Catalyst可扩展优化器来实现的。查询计划的处理主要分为三个阶段:
-
分析阶段(Analysis),分析阶段主要用于校验用户查询是否能够被增量执行引擎执行和解析查询引用中的属性和数据类型。比如对于append 模式的输出类型,只能用于查询输出结果是单调的查询算子,也就是输出结果不能移除。
-
增量化(Incrementalization),增量化的过程就是递增用户提供的静态查询,以便更新结果来响应新数据。(我理解就是比如之前有一组查询了,新的查询要和之前查询逻辑合并,已达到更新处理逻辑的目的)。在这步通过Catalyst转换规则,将将查询映射执行计算和状态管理的算子树中,也就是翻译成执行树(对应Flink中的DAG)。比如aggregation会翻译为StatefulAggregate算子。
-
查询优化(Query Optimization),直接使用Spark SQL中的优化规则,比如谓词下推、简化表达之类的。
应用执行
状态管理和容错
上面我们提到过,Structured Streaming目前支持两种类型的执行模式,微批执行模式和连续处理模式。无论哪种模式都是通过两种外部存储来管理状态,支持低延迟原子写入的WAL和能够存储大量数据的state store,比如HDFS、S3等。Structured Streaming这种状态管理方式基本和Spark Streaming类似。
![](https://img.haomeiwen.com/i4120356/a37ffa1461f464a2.png)
上图是Structured Streaming的状态管理逻辑图,可以看到:
-
Input operator负责定义每个epoch(纪元、时代)并且保存这个epoch的相关信息到WAL,比如offset(有一点类似Flink中Checkpoint对应的offset)。
-
Stateful operator根据当前的epoch也异步检查点状态(这个操作其实是将算子内存数据写到state中),需要注意的是这里不会每个epoch都去做Checkpoint(epoch很短)。
-
Output operator记录哪些epoch的数据成功输出到sink中,成功的epoch数据会commit到WAL中。因为epoch是串行的,也就是只有上一个epoch commit后才能commit下一个epoch,所以当节点fo时,最后一个epoch数据可能会重新写。
当作业重启或恢复时,应用从WAL读取最后一次没有成功commit的epoch,包括start offset和end offset。对于Stateful operator会加载最近一次的epoch的状态数据,上面我们说过Stateful operator并不会每次epoch都生成检查点,所以Stateful operator从state store加载最近的状态数据同时,以这个状态数据的offset重新进行计算(这时候会禁掉output),等状态数据恢复到和input source 相同offset后,开始处理最后一次没有commit成功的epoch数据。
上面的状态管理与容错,对于用户来说都是透明的,用户代码逻辑不需要针对这些做任何事情。
这种机制和Storm的ack机制有点类似。
微批执行模式
微批执行模式就是Spark Streaming中的离散流执行模式,该模式能够进行dynamic load balancing、rescaling、缓解慢节点和细粒度容错恢复(无需全局回滚)。
该模式下epoch一般是几百毫秒到几秒钟,并且每个epoch就是一个传统Spark job组成的 DAG任务(也就是攒一批数据,启动一个DAG任务)。比如一个查询操作在select 后跟随了一个aggregation操作,实际执行时就select可能就转换为了一组map task,aggregation转换为了reduce task,这时这组查询就对应了map和task组成的DAG。 reduce中的状态数据是在内存中的,并且会定期checkpoint到state store中。
使用该模式的优点:
-
Dynamic load balancing,因为每个operator都会被拆分为很小并且相互独立的task,所以他们能够调度到任意节点,所以当一个node执行缓慢时,可以在其它节点启动满节点的copy back task来执行任务。
-
细粒度容错恢复,当一个节点挂掉,只需要重新执行这个节点的task(因为他就是batch task),而不用向其它长时间存活operator的流处理引擎那样,全局恢复到上一次检查点。而且Structured Streaming恢复时,可以增加并发度,来加速恢复。
-
Rescaling,增加和删除节点都很简单,因为当下一次epoch dag调度的时候,只会调度到存活的节点。
-
规模和吞吐,以为直接复用Spark的批出引擎,所以spark的所有优化都能利用,比如高性能shuffle实现等。
该模式的主要缺点就是延迟高,因为每次启动DAG task时都存在开销。
连续处理模式
连续处理模式是Spark 2.3版本时加入的,Structured Streaming任务使用long-lived operator来执行。该模式延迟非常低,但是失去了灵活度,比如限制了运行时的rescaling。
连续模式设计和提出的原因并不是想要提供一种独立的执行策略,而是想要推广Structured Streaming API。因为早期的Spark Streaming API在编程模型上,一些操作对用户透露了微批语义,用户很难将程序移动到其它引擎之上。而Structured Streaming API是与引擎无关的实现,能够迁移到其它引擎之上。
略微牵强,一方面很少见直接将当前引擎代码迁移到其它引擎之上,另一方面既然Structured Streaming API已经与引擎不是强绑定,运行在微批模式不也是ok的?
在这个版本的连续处理模式只支持简单类似map操作的任务,不支持shuffle操作。在目前最新的Structured Streaming文档中(好像一直停留在2.4),该模式还是处于试验阶段,并且只能保证at-least-once,算子支持上也还不能支持shuffle类型。
下面是官网的在该模式下的demo,相较mico-batch模式,只是将Trigger修改为连续模式。下面的1秒代表,每秒触发一次Checkpoint,FO恢复时从上一次Checkpoint恢复。
<pre data-language="java" id="ZSynd" class="ne-codeblock language-java" style="border: 1px solid #e8e8e8; border-radius: 2px; background: #f9f9f9; padding: 16px; font-size: 13px; color: #595959">mport org.apache.spark.sql.streaming.Trigger;
spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load()
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "topic1")
.trigger(Trigger.Continuous("1 second")) // only change in query
.start();</pre>
有文章说该模式也是使用的Chandy-Lamport分布式快照算法,待确定。
网友评论