论文概要
"Realtime Data Processing at Facebook"是Facebook 在2016年发表的论文。论文中介绍了Facebook是如何构建分布式实时数据处理系统的。论文的亮点在于:着重介绍了Facebook在设计实时数据处理系统过程中,针对系统部分关键点的做了哪些设计、以及为什么这样设计,同时业界是怎么做的,这些设计点也是当前实时计算领域关键设计点。
Facebook实时处理场景
-
实时数据聚合报表。
-
移动应用内分析。
-
Facebook页面数据统计。
Facebook实时系统现状
-
秒级延迟,而非毫米级(秒级延迟几乎能够支撑内部所有业务,毫秒级没有场景);
-
借助可持久化消息总线(persistent message bus)来进行数据传输,这种传输机制为实现容错、扩展性和多种正确语义打下了基础。
-
每秒数百GB数据处理吞吐。
Facebook的消息总线就是一套分布式消息队列,基本和kafka类似。
系统架构
Facebook内部的实时数据处理系统实际是由多个系统组成
-
左侧的mobile或web数据被送入到Scribe消息总线。
-
实时数据处理系统Puma、Stylus和Swift从Scribe中读取数据进行处理,然后将处理之后的数据在写入到Scribe中。Puma、Stylus和Swift可以分别单独使用,可以通过Scribe来来构成一个复杂的DAG。
-
右侧Laser、Suba和Hive是用于提供不同类型查询的存储系统,他们的数据也是从Scribe中摄取。Laser还可以将数据提供给流处理系统和线上产品。
分布式数据通信Scribe
Scribe是一个秒级延迟、高吞吐、可持久化的分布式消息系统。Scribe中数据组织方式和Kafka类似,数据通过category(对应kafka中的topic)组织,category内部bucket(对应kafka中的partition)是数据处理单位。Scribe将数据持久化到HDFS中,具备回读、重复读能力。
Scribe在Facebook也称为分布式数据传输系统,意思就是专门用于系统内或系统间的数据传输解决方案。
流处理引擎Puma
Puma是一个流处理系统,应用程序使用类SQL语言编写,UDF使用Java编写。它的优点就是能在很段的时间(1h内)完成application的编写、测试和部署。
使用Puma编写的应用主要有两个应用目的:
- Stateful计算与服务:为简单的聚合查询提供预计算,查询延迟就等于聚合窗口的大小,查询结果是通过Thrift API来查询Puma,Puma的数据结果是存储在HBase上的。(有点类似Queryable state,但是Queryable state不绑定窗口,可查询窗口内数据、并且数据可以存储在本地)
计算top K 事件,5min时间窗口
- Stateless计算:Puma提供了Scribe流的过滤和处理能力。
Puma不具备ad-hoc查询能力,因为在编译阶段对查询进行了优化,所以Puma应用都是部署几个月甚至几年的应用。
流处理引擎Swift
Swift是一个具备Checkpoint功能的基础流处理引擎,它的使用非常简单灵活,可以指定从Scribe中读取固定字符串或者字节来作为一个Checkpoint。如果Swift application 挂掉后,可以从最后一次Checkpoint读取数据。所以能够保证数据至少被处理一次(at least once)。
Swift一般使用Python脚本语言来编写流应用处理程序。
流处理引擎Stylus
Stylus是一个使用C++编写的low level的流处理框架,Processor是Stylus的基础组件。Processor可以是Stateless或Stateful,Processor可以组成一个复杂的DAG。
Processor API基本和我们现在各个流处理引擎一致,Stylus也支持event time和wartmark。
高性能存储服务Laser
Laser是一个在RocksDB之上构建的,高查询吞吐量、低延迟的ky存储服务。Laser的数据来源于Scribe或者Hive(每天读取一次)。Laser能够被Facebook线上产品以及Puma和Stylus所访问。
Laser主要有两个用途:
- 能够将Puma和Stylus处理结果(Scribe)应用到Facebook的线上产品。
- 将Hive中复杂查询结果和Scribe数据存储起来,供Puma或Stylus使用。(有点缓存中间结果的意思)
OLAP Store Scuba
Scuba是一个快速切片的分析数据存储,Scuba具有每秒摄入数百万行数据并插入到数千个表中的能力。Scuba中的数据是由线上产品输出到Scribe,然后在摄入到Scuba中,这个过程大概有1min延迟。Scuba也支持Puma、Stylus和Swify中的输出数据。
Scuba具备ad-hoc的查询能力,查询延迟一般在1s以内。同时Scuba通过UI可以展示查询结果(支持各种图标)。
数据仓库Hive
Hive在Facebook中是用于存储EB级别的数据仓库,每天会接收几PB的数据写入。Hive中的数据,Facebook使用Presto查询,Presto提供了完成ANSI SQL查询语义,查询结果可以存储到Laser中,被线上产品或者其它流处理引擎所使用。
设计决策
Paper中提到了Facebook在做流处理系统过程中,在一些关键点上做了一些设计决策,这些设计决策对比了业界已有的方案,并且给出了这些决策对Streaming系统的影响。
Paper中首先说明了流处理系统的存在五方面的重要设计:易用性(Easy of use)、性能(Performance)、容错(Fault-tolerance)、扩展性(Scalability)和正确性(Correctness)。
易用性:处理是否复杂?SQL是否够用?是否还需要通用语言(比如java或C++)?用于编写、测试和部署的速度有多快。
性能:多少延迟是ok的,毫秒级、秒级还是分钟级?需要多高的吞吐?
容错:可以容忍什么类型的失败?数据处理或者输出的次数保证了什么语义?系统如何存储和恢复内存中的状态数据。
扩展性:数据能不能分片和重分片来并行处理?系统能否根据流量扩缩容?
正确性:是否需要ACID?所有输入数据是否都需要输出?
这易用性、性能、容错、扩展性和正确性基本是现在流系统的对比标准。
Paper分别从语言范式、数据传输、处理语义、状态存储机制和再处理五方面来说明Facebook在流系统设计之初的设计决策。
image.png-
语言范式会对系统易用性和性能有所影响。
-
数据传输会对易用性(比如debug)、性能、容错和扩展性有影响。
-
处理语义会对容错和正确性有影响。
-
状态存储机制会对易用性、性能、容错、扩展性和正确性五方面都有所影响。
-
再处理对系统易用性、扩展性和正确性有所影响。
Paper对业界已有流处理引擎和Facebook流处理引擎在上面设计决策进行了对比。
image.png语言范式设计决策
语言范式是指用户编写流处理应用时所使用的语言。对于语言范式的选择决策,决定了应用应用程序编写的难易以及编写者对引擎性能的控制粒度。
在流处理场景,可选择的通用语言范式有三种:
-
声明式语言(declarative),主要以SQL为代表。它的优势在于简单可快速上手,但是表达能力有限,许多系统需要额外增加一些UDF。
-
函数式(functional),函数式编程模型将应用程序表示为一系列预定义的operator。它方便编写,并且有更多操作(operator)可用,而且能够控制这些operator的顺序。
-
程序语言(procedural),像c++、java、python都是通用的程序语言,它们具备非常高的灵活性和性能。但是它们往往需要更长时间的编写和测试。比如Storm、Heron、Samza等。
在Facebook内部,由因为没有单一的语言范式能够满足所有场景,所以需要多种语言范式的流处理引擎(针对不同易用性和性能),这也是为什么他们内部有三套流处理引擎。Puma使用Sql、Swift使用Python、Stylus使用C++来编写应用程序。对于函数式编程,FB内部还没有支持,他们在调研Spark Streaming这些流处理引擎。
目前主流流计算引擎已经能一套runtime支持多种level的api,比如Flink的SQL/Table API 、DataStream/DataSet API和ProcessFunction。
数据传输设计决策
流处理应用通常是由多个节点组成的一个复杂的DAG,所以需要在节点间进行数据传输。数据传输对于系统的容错、性能和扩展性都是非常重要功能。由于应用程序调试,所以对易用性也有影响。
通用的传输机制主要有以下三类:
-
直接消息传输,通常使用RPC或者内存消息队列来实现进程间数据传输,比如MillWheel、Spark、Flink等都采用RPC进行通信,而Storm则使用ZeroMQ进行通信。直接消息传输的优点在于性能非常好,基本能保证端到端在10ms以内。
-
基于代理的消息传输,通过单独的代理节点来链接流处理节点,并进行消息的转发。该模式会增加系统开销,但系统会有很好的扩展性。代理模式的消息传输,还可以将输入流复用到多个输出的Processor中,同时也具备背压的能力。比如Heron所使用的stream manager就是该模式。
-
基于持久存储的消息传输,streaming的processor链接持久化的消息总线(消息队列),读写都直接操作该消息队列。该模式是可靠性最强的传输方式,不仅具备多路复用的能力,数据写入和读取还能以不同速率处理。该模式也具备完整的单点故障恢复能力。比如Samza就是使用kafka来做数据传输。
我们上面说了Facebook使用Scribe来做数据传输,该模式大概会有1s左右的数据延迟(并且会落盘,所以也受限于磁盘和网络io)。之所以Facebook会选用这种模式,主要考虑在当时Facebook内部大部分场景都能够容忍这个延迟,并且该模式为容错、易用性、扩展性和性能带来很大便利。
处理语义
处理语义的选择会影响流系统的容错和正确性能。
paper首先将流处理系统所做的事情总结为三部分。
-
处理输入数据,比如反序列化、查询外部系统、更新内存状态等。
-
生成输出,基于输入数据和内存状态,为下游系统生成输出数据。
-
保存Checkpoint到数据库,用于故障恢复。需要保存的内容有三部分:
-
内存中的状态。
-
输入流的offset。
-
输出数据。
上面这三部分可以总结为两种类型的处理语义:
- 状态语义,每个输入事件至少被处理一次(at-least-once)、最多处理一次(at-most-once)和只处理一次(exactly-once)。
- 输出语义,给定的输出值至少出现一次、最多出现一次和只出现一次。
状态语义就是我们平时所讨论的流处理引擎的数据处理语义;输出语义就是我们所说的端到端的数据一致性语义。
对于Stateless处理节点只有输出语义,对于Stateful处理节点两种处理语义都存在。
状态语义
状态语义只取决于存储offset和存储内存状态的顺序。
-
at-least-once:先存储内存中的状态,然后在存储offset。
-
at-most-once:先存储offset,然后存储内存中状态。
-
exactly-once:内存状态和offset存储是原子语义的,比如通过事务。
下图是当发生fo后,不同状态语义所带来的结果。
image.png输出语义
输出语义,除了依赖内存中状态和offset,还取决输出值的保存。
-
at-least-once:将结果发射到输出流,然后保存内存状态和offset。(性能高,无需等待检查点保存)
-
at-most-once:为offset和内存状态保存检查点,然后发射输出结果。(先保存Checkpoint,然后发射结果,需要缓存处理数据)
-
exactly-once:为offset和内存状态保存检查点,并且发射输出结果是一个事务中的原子操作。(processor依赖事务,有性能损耗)
状态语义与输出语义的对应关系:
image.png在Facebook内部,虽然对于各种语义需求都有,但是从论文看是非常重视at-least-once处理的。因为at-least-once一方面能提供极致的性能体验,另一面exactly-once需要借助事务存储系统。
状态存储机制
状态存储的目的是当发生fo后,来恢复状态。对于状态的存储和恢复,有多种方式。
-
状态节点多副本,通过启动多个状态节点,来达到状态节点多副本的能力。该方式会带来额外的硬件开销。
-
本地状态存储,类似Samza将状态存储到本地db,并同时将数据写到kafka。
-
远端数据库持久化,将Checkpoint数据存储到远端数据库中,比如HBase。
-
上游备份,事件缓存在上游节点,当当前节点fo后,上游重放。
-
全局一致性快照,Flink所使用的分布式快照算法,当一个节点fo后,需要将多个节点恢复到一致(联动fo)。
在Facebook 内部对流处理系统的容错有着不同的需求,Puma提供了有状态聚合的容错,Stylus提供了local database模式和remote database模式。
image.pnglocal database 首先定时将内存数据写到local database,后端进程在使用一个更大的时间间隔来将local database数据异步同步到HDFS。当处理进程fo时优先使用本地数据恢复,如果恢复后的进程不在之前的机器,则从HDFS并行拉取数据。
image.pngremote database( ZippyDB) 不会将状态存储在本地,使用remote database一个主要优势就是快速FO,因为不需要等从远端下载完整的状态数据。
回填处理(Backfill processing)
回填处理也就是需要重新处理旧数据。
-
仅流处理,需要数据传输机制保存足够长的数据,以便当需要回放时进行重新处理。
-
双系统(lambda架构),流批双系统进行处理。
-
批处理环境可运行流应用的系统,比如Spark Streaming、Flink等。
Facebook内部使用MapReduce读取Hive数据方式来进行旧数据处理。
一些经验教训
多系统能够快速实现和迭代(分别针对易用性、容错、扩展等)、流系统的易用性非常重要(编写、调试、部署、监控)。
总结
-
设计目标是秒级延迟而不是毫秒级延迟,是一个重要的设计决策。在Facebook还有很多系统能够提供毫秒级或微妙级延迟。秒级延迟允许使用消息队列来做数据传输,这种传输机制方便系统实现容错、扩展和多种语义的正确性。
-
易用性和其它特性一样重要。在Facebook内部的黑客文化中,快速实现是非常重要的,产品需要具备正确的学习曲线。简单的调试、部署和监控,能够增加系统的使用率。
-
提供一个正确性的选择范围。并不是所有的case都需要ACID语义的,用户根据自己需要选择正确性语义。
网友评论