美文网首页Spark学习之路
Structured-Streaming 学习三-exactly

Structured-Streaming 学习三-exactly

作者: 九七学姐 | 来源:发表于2017-07-24 17:04 被阅读464次

    #一. 流处理的三个步骤:
    1). Receiving the data
    2). Transforming the data
    3). Pushing out the data

    二. 三种类型保证数据一致性

    1). At most once:每条记录最多被处理一次或不被处理,数据有可能丢失。
    2).At least once:每条记录至少被处理一次,保证数据不丢失,但是有可能重复。
    3).Exactly once:每条记录只被处理一次,保证数据不丢失的同时数据也不重复。

    三. structured streaming如何保证end to end exactly-once?

    1. structured streaming exactly-once的流程

    structured streaming通过三个组件(source、sink、StreamExection),每个组件保证其内部的容错性,来保证structured streaming整体的end to end exactly-once。与Spark Streaming相比,最重要的是state management的区别,加了个全局变量——state store。其内存储了operationID+partitionID,且是存储于HDFS中分布式实现,使用时与version进一步确定要操作的数据。对,其operator也是分区的!
    下图是StreamExecution的持续查询的实现:

    StreamingExecution的持续查询

    exactly-once执行的过程如上图:
    这里有 6 个关键步骤:
    1). StreamExecution 通过 Source.getOffset() 获取最新的 offsets,即最新的数据进度;
    2). StreamExecution 将 offsets 等写入到 offsetLog 里, 这里的 offsetLog 是一个持久化的 WAL (Write-Ahead-Log),是将来可用作故障恢复;
    3). StreamExecution 构造本次执行的 LogicalPlan
    (3a) 将预先定义好的逻辑(即 StreamExecution 里的 logicalPlan 成员变量)制作一个副本出来
    (3b) 给定刚刚取到的 offsets,通过 Source.getBatch(offsets) 获取本执行新收到的数据的 Dataset/DataFrame 表示,并替换到 (3a) 中的副本里
    经过 (3a), (3b) 两步,构造完成的 LogicalPlan 就是针对本执行新收到的数据的 Dataset/DataFrame 变换(即整个处理逻辑)了
    4). 触发对本次执行的 LogicalPlan 的优化,得到 IncrementalExecution
    逻辑计划的优化:通过 Catalyst 优化器完成
    物理计划的生成与选择:结果是可以直接用于执行的 RDD DAG
    逻辑计划、优化的逻辑计划、物理计划、及最后结果 RDD DAG,合并起来就是 IncrementalExecution
    5). 将表示计算结果的 Dataset/DataFrame (包含 IncrementalExecution) 交给 Sink,即调用 Sink.add(ds/df)
    6). 计算完成后的 commit
    (6a) 通过 Source.commit() 告知 Source 数据已经完整处理结束;Source 可按需完成数据的 garbage-collection
    (6b) 将本次执行的批次 id 写入到 batchCommitLog 里

    2. StreamExecution 的持续查询(增量)

    StreamExecution 的持续查询(增量)

    Structured Streaming 在编程模型上暴露给用户的是,每次持续查询看做面对全量数据(而不仅仅是本次执行信收到的数据),所以每次执行的结果是针对全量数据进行计算的结果。

    但是在实际执行过程中,由于全量数据会越攒越多,那么每次对全量数据进行计算的代价和消耗会越来越大。

    Structured Streaming 的做法是:

    引入全局范围、高可用的 StateStore
    转全量为增量,即在每次执行时:
        先从 StateStore 里 restore 出上次执行后的状态
        然后加入本执行的新数据,再进行计算
        如果有状态改变,将把改变的状态重新 save 到 StateStore 里
    为了在 Dataset/DataFrame 框架里完成对 StateStore 的 restore 和 save 操作,引入两个新的物理计划节点 —— StateStoreRestoreExec 和 StateStoreSaveExec
    

    所以 Structured Streaming 在编程模型上暴露给用户的是,每次持续查询看做面对全量数据,但在具体实现上转换为增量的持续查询。

    3. 故障恢复

    通过前面小节的解析,我们知道存储 source offsets 的 offsetLog,和存储计算状态的 StateStore,是全局高可用的。仍然采用前面的示意图,offsetLog 和 StateStore 被特殊标识为紫色,代表高可用。

    Spark 1.0Spark 1.0
    由于 exectutor 节点的故障可由 Spark 框架本身很好的 handle,不引起可用性问题,我们本节的故障恢复只讨论 driver 故障恢复。
    如果在某个执行过程中发生 driver 故障,那么重新起来的 StreamExecution:
    读取 WAL offsetlog 恢复出最新的 offsets 等;相当于取代正常流程里的 (1)(2) 步
    读取 batchCommitLog 决定是否需要重做最近一个批次
    如果需要,那么重做 (3a), (3b), (4), (5), (6a), (6b) 步这里第 (5) 步需要分两种情况讨论(i) 如果上次执行在 (5) 结束前即失效,那么本次执行里 sink 应该完整写出计算结果
    (ii) 如果上次执行在 (5) 结束后才失效,那么本次执行里 sink 可以重新写出计算结果(覆盖上次结果),也可以跳过写出计算结果(因为上次执行已经完整写出过计算结果了)

    这样即可保证每次执行的计算结果,在 sink 这个层面,是 不重不丢 的 —— 即使中间发生过 1 次或以上的失效和恢复。

    四. 聚合(aggregation)

    聚合操作,处理数据记录并返回计算结果,可将多个文档中的值组合起来,对成组数据执行各种操作,返回单一的结果,相当于SQL中的count(*)组合groupby.
    注意:Append模式不支持基于数据流上的聚合操作(Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets)

    主要参考文章:
    Structured Streaming 源码解析系列

    相关文章

      网友评论

        本文标题:Structured-Streaming 学习三-exactly

        本文链接:https://www.haomeiwen.com/subject/qbmskxtx.html