美文网首页
流式计算论文阅读

流式计算论文阅读

作者: 灯火gg | 来源:发表于2022-06-27 20:14 被阅读0次

    一、S4:Distributed Stream Computing Platform

    一个开源系统 Apache S4。和同样孵化自 Yahoo 的 Hadoop 不同,S4 虽然是最早发布的开源分布式流式数据处理系统,但是在市场上最终却没有占有一席之地。

    1、 s4逻辑模型

    S4将所有的计算过程,都抽象成了一个个PE(Processing Element)元素,这里PE对象就是面向对象里的实际对象。

    每个对象有四部分要素:

    • 本身功能函数(functionality);
    • 事件类型(types of events);
    • 键(keyed attribute);
    • 值(value)。

    外部发送来的事件流组成由不同PE对象构成的有向无环图(DAG)处理数据的 PE 对象,可以选择处理完之后立刻发送一个新的事件出去;也可以选择在对象内部来维护一个状态,然后当处理了一定数量的消息之后,或者过了一个固定的事件间隔之后把消息发送出去,最后DAG的终点也是类似的发布频率。

    官方举例说明

    image.png

    S4把整个数据处理流程变成了一个DAG,开发只需要做两件事:

    • 设计计算逻辑DAG
    • 实现每个节点中的业务逻辑

    另外S4会有一些内置的PE用来处理像count、aggregate、join等标准任务,也可以使用S4的SDK来编写定制的PE。

    2、 缺点

    • 海量PE数据,设置TTL定期处理
    • 没有时间窗口概念
    • 计算节点挂掉数据丢失
    • 不支持动态扩容

    二、Storm @Twitter

    1、Topology 的逻辑模型

    • Storm弥补了S4的哪些不足?
    • Storm的容错机制、性能提升?

    和 S4 类似,Storm 系统的抽象模型,也是一个有向无环图。在 Storm 里,这样一个有向无环图,叫做 Topology,也就是拓扑图。整个图里有这样几个元素:

    • Spouts(流的源头) 对应S4里的无键PE
    • Tuple(元组) 传输数据的最小粒度单元,对应S4的事件
    • Stream(数据流) 包含无限个Tuple
    • Bolts(计算逻辑处理) 分布式处理任意数量的输入输出流
    image.png

    Storm 的抽象模型里,和 S4 的最大不同就在 Bolts 上。S4 的 PE,不仅是一个功能逻辑的单元,也是一个 KV 对的数据。同样类型的事件下,所有相同的 Key 的数据,都会聚合到同一个 PE 下。这就使得整个系统里有大量的 PE 对象,也导致 S4 的整个系统有几个显著的设计问题。

    2、Storm如何解耦数据分发

    Storm 里的 Bolt 更像是 MapReduce 里的 Map 或者Reduce 函数。我们可以在 Topology 里面,去设置不同 Bolt 的并行度,以及设置数据流是如何分组的。但是,每个 Bolt 输出的 Tuple 本身,却不需要通过生成一个类似于(SortID, N)这样一个特殊的 Key,来定义下一层的 Bolt 的并行度。在 Storm 里面,对应的数据流可以进行这样几种分组(Grouping):

    • 随机分组(Shuffle Grouping) 随机分发给下游Bolt,每个Bolt会收到数量接近的Tuple
    • 字段分组(Fields Grouping) 对所选字段分发Bolt
    • 全局分组(Global Grouping) 全部分发到下游一个Bolt
    • 无分组
    • 指向分组
    • 本地或随机分组 可以在同一台机器同一个进程内通信
    image.png

    3、At Least Once

    Storm 选择的解决方案,是把从 Spout 发起的第一个 Tuple 作为一棵树的根。下游所有衍生出来发送的 Tuple,都是这棵树的一部分。任何一个 Tuple 处理失败或者超时了,那么就从 Spout 重新发送消息。而要做到这一点,Storm 需要在系统里引入一个特殊的 Bolt,叫做 AckerBoltSpout 发送出去的消息,同时会通知给到 AckerBolt。而 Bolt 一旦处理完根 Tuple 相关的消息,也会通知给到 Acker。

    image.png
    16 bytes * 100000 * 30 =48MB
    1秒钟10万条,30秒也就是48兆
    

    异或xor是一个数据运算符,用于逻辑运算,同为0,异为1.
    A xor B…xor B xor A = 0,其中每一个操作数出现且仅出现两次。

    https://blog.csdn.net/dog250/article/details/79673952

    三、Kafka: a Distributed Messaging System for Log Processing

    有了 Kafka 之后,通过Hadoop/Spark 进行批数据处理,通过 Hive 搭建数据仓库,通过 Storm 进行流式数据处理,然后通过 Kafka 作为业务系统和大数据系统之间的消息管道,已经是一个完整而成熟的“标准方案”了。可以说,随着 Kafka 的发布,整个大数据领域开始迈入一个成熟的阶段。大部分公司都可以通过组合开源框架,搭建起完善的大数据系统,而不再需要自己去“造轮子”了。

    1、数据从哪来

    kafka与scribe、flume重要的不同点之一是拉数据而不是推数据,kafka具有优秀的消息存储性能和容错机制,而且不需要维护下游是否成功处理了这个状态。

    • 让所有的 Consumer 来“拉取”数据,而不是主动“推送”数据给到
      Consumer。并且,Consumer 到底消费完了哪些数据,是由 Consumer 自己维护的,而不是由 Kafka 这个消息队列来进行维护。
    • 采用了一个非常简单的追加文件写的方式来直接作为我们的消息队列。在Kafka 里,每一条消息并没有通过一个唯一的 message-id,来标识或者维护。整个消息队列也没有维护什么复杂的内存里的数据结构。下游的消费者,只需要维护一个此时它处理到的日志,在这个日志文件中的偏移量(offset)就好了。

    2、kafka的单个partition的读写实现

    • 在实际的实现上,Kafka 是这么做的。每一个 Topic 会有很多个 Partition,分布到不同的物理机器上。一个物理机上,可能会分配到多个 Partition。实际存储的时候,我们的一个Partition 是一个逻辑上的日志文件。在物理上,这个日志文件会给实现成一组大小基本相同的 Segment 文件,比如每个 Segment 是 1GB 大小。每当有新消息从 Producer 发过来的时候,Broker 就会把消息追加写入到最后那个 Segment 文件里。
    • 而为了性能考虑,Kafka 支持我们自己设置,是每次写入到把数据刷新到硬盘里,还是在写入了一定数量的日志或者经过一个固定的时间的时候,才把文件刷新到硬盘里。
    • Broker 会在内存里维护一个简单的索引,这个索引其实就是每个通过一个虚拟的偏移量,指向一个具体的 Segment 文件。那么在 Consumer 要消费数据的时候,就是根据Consumer 本地维护的已经处理完的偏移量,在索引里找到实际的 Segment 文件,然后去读取数据就好了
    image.png

    3、利用Linux Page Cache

    Kafka 写入的数据,本质上都还是在 Page Cache。而且因为我们是进行流式数据处理,读写的数据有很强的时间局部性,Broker 刚刚写入的数据,几乎立刻会被下游的Consumer 读取访问,所以大量的数据读写都会命中缓存。

    image.png

    避免了两个问题

    • JVM 里面的 GC(垃圾回收)的开销
    • 缓存的“冷启动问题”

    4、动态扩容

    • 利用zk监听-通知机制吧Broker注册到节点中,记录主机名、端口、主题、分区
    • Rebalance
      如果我们有 X 个分区和 Y 个 Consumer,那么 Kafka 会计算出 N=X/Y,然后把 0 到 N-1的分区分配给第一个 Consumer,N 到 2N-1 的分配给第二个 Consumer,依此类推。而因为之前 Partition 的数据处理到了哪个 Offset 是有记录的,所以新的 Consumer 很容易就能知道从哪里开始处理消息。
    image.png

    5、Kafka的限制

    • Kafka 很难提供针对单条消息的事务机制
    • Kafka 里,对于消息是没有严格的“顺序”定义的


      image.png

    四、数据处理的 Lambda 、Kappa 架构

    我们还需要MR吗? 需要。

    • 流处理保证了At Least Once 但是批处理做到了Exactly Once
    • 若没有kafka,流处理的容错就是一个最大的挑战

    1、Lambda 架构的基本思想

    Storm 的作者南森·马茨(Nathan Marz)提出了 Lamda 架构,把大数据的批处理和实时数据结合在一起,变成一个统一的架构。

    结构组成

    • 输入数据 Master Data
    • 批处理层和实时处理层对应MR和Topology
    • 服务层通常是一个数据库,将数据处理结果存储下来,后续生成的批处理结果会不断替代实时处理的结果。

    2、 Kappa架构的基本思想

    image.png

    lambda的缺点

    • 实时处理层计算一次,又要在批处理层计算一次
    • 写一套MR 、写一套Storm的Topology

    Kafka 的作者杰伊·克雷普斯(Jay Kreps)就提出了一个新的数据计算框架,称之为 Kappa 架构。

    通过改进流计算系统来解决数据全量处理的问题,使得实时计算和批处理过程使用同一套代码。此外Kappa架构认为只有在有必要的时候才会对历史数据进行重复计算,而如果需要重复计算时,Kappa架构下可以启动很多个实例进行重复计算,方式是通过上游重放完成(从数据源拉取数据重新计算)。

    • 选择一个具有重放功能,能够保存历史数据的消息队列,根据要求设置历史数据保存时长,例如:Kafka,可以设置保存全部历史数据。
    • 当某个或某些指标有重新处理的需求时,按照新逻辑编写新的作业,然后从上游消息队列最开始地方重新消费数据,把结果写往一个新的下游结果表。
    • 当新作业赶上进度后,切换数据源,读取新作业产生的结果表。
      停止老的作业,删除老的结果表。
    • 另外,Kappa 架构并不是中间结果完全不落地,现在很多大数据系统都需要支持机器学习(离线训练),所以实时中间结果需要落地对应的存储引擎供机器学习使用,另外有时候还需要对明细数据查询,这种场景也需要把实时明细层写出到对应的引擎中。

    缺点开发周期长

    五、Dataflow

    • storm论文里巧妙的异或(XOR)完成了At Least Once
    • kafka论文里利用zk记录的消息偏移量完成了kappa架构

    那google的MillWheel、Dataflow,到开源的 Apache Flink解决了哪些?

    • 为什么“Exactly Once”的消息处理是困难的,但又是必须的。
    • 为什么 Storm 的容错机制,比起实际需求远远不够,和 MapReduce 这样同样粗糙而原始的框架比起来,Storm 的容错机制缺失了什么。
    • “时间窗口(Time Window)”是一个什么样的概念,为什么这个概念对于流式数据处理系统如此重要

    1、MillWheel: Fault-Tolerant Stream Processing at Internet Scale

    • S4 和 Storm 的组合模型
    • 计算和流
    • Computation 定义不同的 key_extractor来复用流
    • Computation+key
    • Bigtable Tablet 分段key实现
    image.png

    从这个角度,MillWheel 的系统逻辑其实更像是 Storm,而 Computation + 一段 Key 的组合,其实就是一个 Bolt,需要处理某一段 Key。

    2、低水位(Low Watermarks)和定时器(Timers)

    每个 Computation 进程在解析消息时会把每条解析成(Key,Value,TimeStamp)这样一个三元组。TimeStamp就是事件发生的时间。

    image.png
    min(oldest work of A, low watermark of C:C outputs to A)
    

    这些水位信息的计算,以及根据水位信息来决定数据是否计算完成了,并不需要应用开发人员关心,而都是系统内建的。

    MillWheel 提供了一组定时器(Timer)的 API。根据日志里的时间戳,你能拿到这条日志对应的时间窗口是哪一个。然后把对应的数据更新,再根据时间窗口,设置到对应的 Timer 上。系统自己会根据水位信息,触发 Timer 执行,Timer 执行的时候,会把对应的统计结果输出出去。

    3、Strong Production 和状态持久化

    MillWheel 也封装掉了整个的数据持久化层,你不需要自己有一个外部数据库的连接,而是直接通过 MillWheel 提供的 API,进行数据的读写。很大程度上依赖了强大的基建,也就是自家的 Bigtable 和 Spanner。

    每一个 Computation + Key 的组合,在接收到一条消息的处理过程是这样的:

    • 利用分段BloomFilter过滤去重
    • 处理用户业务逻辑,所有的更新都是"状态"变更
    • 状态变更都会被一次性提交给后端存储层

    这个被持久化的内容,在 MillWheel 中被称为是检查点(Checkpoint),正是有了这一步,整个 MillWheel 系统才有了容错能力和在线迁移计算节点的能力。而为了性能考虑,在实践上,MillWheel 会把多个记录的操作,放在一个 Checkpoint 里。

    总结:

    • 为了解决数据去重,MillWheel 通过为每一个收发的记录都创建了一个唯一的 ID,然后在每个 Computation 的每一个 Key 上,都通过 Bloomfilter 对处理过的消息进行去重,确保所有的操作都是幂等的。
    • 而为了解决流式计算的容错和扩容问题,MillWheel 会通过 Strong Production 这个方式,对于所有向下游发送的数据先创建 Checkpoint。这个 Checkpoint,其实就是类似于数据库里面的 WAL,通过记录日志的方式,确保即使计算节点挂掉了,也能够在新起来的计算节点上重放WAL,来重新向下游发送数据。
    • 然后,为了解决事件创建时间和处理时间之间的差异,MillWheel 引入了一个独立的Injector 模块。Injector 模块,一方面会收集所有计算节点当前处理数据的进度,另一方面也会反馈给各个节点,当前数据处理的最新“低水位”是什么。这样,对于需要按照时间窗口进行统计分析的数据,我们就可以在所有数据都已经被处理完之后,再输出一个准确的计算结果。
    • 对于流式计算的容错问题,一个很重要的挑战,就是避免僵尸进程仍然在往Bigtable/Spanner 这样的持久化层里面写入数据。这一点,MillWheel 是通过为每个工作进程注册一个 Sequencer,确保所有的 Key 对应的数据只有一个写入者来做到的。这个处理逻辑,也是通过一个租约机制来做到的,和我们之前见过的 GFS/Bigtable 这样的系统非常类似。

    六、2015 The Dataflow Model @Google

    1、Dataflow 的基础模型

    Dataflow 的核心计算模型非常简单,它只有两个概念,一个叫做ParDo,顾名思义,也就是并行处理的意思。另一个叫做 GroupByKey,也就是按照 Key 进行分组数据处理的问题。

    2、时间窗口的分配与合并

    在 MillWheel 的论文里,我们已经看到了一个非常完善的流式数据处理系统了。不过,在这个流式处理系统里,对于“时间”的处理还非常粗糙。MillWheel 的确已经开始区分事件的处理时间(Processing Time)和事件的发生时间(Event Time)了,也引入了时间窗口的概念。但是,对于计算结果何时输出,它仍然采用的是一个简单的定时器(Timer)的方案。而到了 Dataflow 论文里,对这些概念的梳理和抽象就变成了重中之重。

    • 固定窗口
    • 滑动窗口
    • 会话窗口

    所以,不只GroupBykey,实际在统计数据时必须要有时间概念,需要的是GroupByKEyAndWindow。

    而在实际的逻辑实现层面,Dataflow 最重要的两个函数,也就是AssignWindows 函数和 MergeWindows 函数。每一个原始的事件,在我们的业务处理函数之前,其实都是(key, value, event_time)这样一个三元组。而 AssignWindows 要做的,就是把这个三元组,根据我们的处理逻辑,变成(key, value, event_time, window)这样的四元组。

    举例会话窗口


    image.png

    3、触发器和增量数据处理

    在 MillWheel 里,我们向下游输出数据,只能通过定时器(Timer)来触发,本质上也就是通过“时间”这一个维度而已。这个定时器,在 Millwheel 里其实就被改造成了完成度触发器,我们可以根据当前的水位和时间,来判断日志处理的进度进而决定是否触发向下游输出的动作。而在Dataflow 里,除了内置的基于水位信息的完成度触发器,它还能够支持基于处理时间、记录数等多个参数组合触发。而且用户可以实现自定义触发器,完全
    根据自己的需要来实现触发器逻辑。

    PCollection<String> pc = ...; 
    pc.apply(Window.<String>into(FixedWindows.of(1, TimeUnit.MINUTES)) .triggering(AfterProcessingTime.pastFirstElementInPane()
    .plusDelayOf(Duration.standardMinutes(1))) .discardingFiredPanes());
    

    我们可以看一下 Apache Beam 项目里的一段示例代码。可以看到,在这段代码里,先是设立了一个 1 分钟的固定窗口。然后在触发器层面,则是设置了在对应的窗口的第一条数据被处理之后,延迟一分钟触发。在 Apache Beam 的文档里,你还能看到更多不同的触发器策略,你也可以根据自己的需要,来撰写专属于你自己的触发器代码.

    • 抛弃策略
    • 累计策略
    • 累计并撤回

    相关文章

      网友评论

          本文标题:流式计算论文阅读

          本文链接:https://www.haomeiwen.com/subject/lirhvrtx.html