《F1 Query: Declarative Querying

作者: xumingmingv | 来源:发表于2018-08-31 23:02 被阅读27次

    导读: Google 的 F1 Query 论文详细阐述了 Google 对于企业数据处理领域三大类需求的解决办法。即使你对这个主题不感兴趣,单纯的对 Presto 感兴趣你应该读一读 F1 Query 的论文,可以解答很多 Presto 设计上的疑问。

    概述

    最近 Google 发表了一篇《F1 Query: Declarative Querying at Scale》的论文来详细阐述了一个叫做 F1 Query 的大数据处理系统的设计。F1 Query 是Google内部进行异构查询的引擎,它支持对各种不同的文件格式、各种不同的存储系统( Bigtable, Spanner, Google Spreadsheets ) 的数据进行联合查询。听起来跟 Presto 很像对吧,这确实也是我看到这篇论文介绍的第一反应,但是随着你看得更深入一点你就会发现这篇论文的着重点完全不在于对多数据源的支持,它甚至完全没有描述是怎么做到支持多种不同异构数据源的。F1 Query 更引以为傲的是:

    We present F1 Query, an SQL query engine that is unique not because of its focus on doing one thing well, but instead because it aims to cover all corners of the requirements space for enterprise data processing and analysis. (F1 Query 能够覆盖企业级大数据处理和分析领域所有数据处理需求。)

    注意这里的“所有”这个词, 太霸气了,初听起来有点反智,因为常识告诉我们没有什么东西是全能的,那么我们就来仔细看看 F1 Query 到底是怎么实现“全能”的。这里说的“全能” 在企业级数据处理领域的主要对应三大类需求:

    • 支持对小规模的 OLTP 式的数据进行高效查询。
    • 支持低延迟地对大批量的(异构)数据进行快速即席查询。
    • 支持对超大规模数据进行可靠的 ETL 处理。

    可以看出 Presto 的能力只涵盖其中的第二项,第一项和第三项都是 Presto 所没有的。

    设计初衷

    F1 Query 之所以被设计出来是因为 Google 内部一些业务需求驱动。

    首先数据一定是碎片化的。即使是单个应用也是如此: 一部分数据可能保存在关系型数据库里面,一部分存在 KV Store 里面,还有些可能以日志的形式保存在文件系统里面,那么一个统一的总体数据视图就十分必要了。

    设计一定要充分考虑现代数据中心的架构。在这一点上,F1 Query 主要想强调的是,它设计的视野不是某一台机器,或者某几台紧密关联的机器,而是跨数据中心的机器集群。传统的设计方法都是把计算跟存储尽量绑定在一起的,这种架构在数据量不大的时候是很好的选择,但是当发展到如今这种超大数据规模的时代,这种架构已经不是最优的了;而且 Google 机房内带宽很高,要访问的数据到底在计算节点本地还是在远端几乎没有太大的区别,而且数据在分布式文件系统上多副本保存反而可以让我们以更大的并行度去访问,得到更好的查询性能。

    这里其实主要就是在说我们也经常说的计算与存储分离啦。

    可伸缩性: 客户的需求各种各样,从只影响一两条数据的 OLTP 类需求,到大规模、超大规模的数据处理,不应该随着数据量、请求延迟性的变化而要用完全不同的数据处理引擎来处理,这里面有很大的迁移成本。

    可扩展性:用户的需求千奇百怪,用户可能需要支持新的存储格式、存储系统、嵌入新的业务的逻辑等等,一个理想的系统要支持这些扩展性。

    其实这些需求都不是 Google 特有的,任何一个大公司甚至任何一个公司在这个大数据的时代都需要这些数据处理的能力,那么我们一起来看看 F1 Query是怎么做到如上的这些特点的。

    整体架构

    整体架构

    整个架构从纵向来看分为三层: F1 Client, F1集群以及各种异构的数据源。而F1集群内部主要的角色主要是5个。

    首先是 F1 Master , 它负责对所有的查询进行监控并且管理所有 F1 Worker 。然后是 F1 Server ,F1 Server 在角色上有点像我们 Data Lake Analytics 的 FrontNode 的角色, 在请求真正执行之前做一些执行计划编译、优化的工作,是整个系统的“前端”,而真正的数据处理是由 F1 Worker 来完成的。

    Catalog Service 扮演的元数据中心的角色,各种异构数据的元信息都保存在这个服务里面,也就形成了一个全局的统一视图 -- 不管你数据是保存在什么介质里面。(我们 Data Lake Analytics 和 AWS的 Athena Glue都有类似的服务)。

    Batch Metadata 保存的是 Batch Execution 模式下任务的一些元信息,比如执行计划之类的。

    UDF Server 是 Google 比较创新的一个概念,它是一个 UDF 的仓库,而且是在执行引擎之外的,执行引擎通过 RPC 与 UDF Server 进行交互。

    由于整体架构上存储和计算的分离,F1 ServerF1 Worker 都是无状态的,当需要水平扩展的时候,只需要向集群里面加入新的机器就好了,数据层面不需要做任何重新分布的工作。

    查询的执行

    因为 F1 Query 强调的是跨机房部署,因此查询的请求跟实际的数据很可能不在一个集群里面,当请求到达一台 F1 Server的时候,它首先对查询进行解析,看看查询里面涉及哪些数据源,如果有任何数据源不在这个数据中心里面, 它会看看哪些 F1 Server 离这些数据更近,然后返回一个 F1 Server 的列表给客户端,客户端接到之后,把这个请求重新发给这些新的 F1 Server 进行查询。F1 Query 强调虽然把计算和存储分离了,并且借助高效的网络设置,已经解决了很多数据本地化的问题,但是数据还是离计算越近,性能越好。

    一个查询过来之后,首先在接到请求的 F1 Server 上进行编译和优化,然后把这个优化好的执行计划推到执行层,而执行的时候根据客户端指定的模式偏好来选择到底用何种模式来执行。

    数据源

    F1 ServerF1 Worker 不止可以访问本数据中心的数据,还可以跨数据中心访问数据。F1 Query 同时也像 Presto 一样,可以支持对各种异构数据源的查询。而且跟 Presto 一样,F1 Query 把所有的数据源都抽象成一个关系型的表(因为最终使用的查询语言是SQL嘛),因此隐藏掉了数据源本身的实现细节。不同的数据源之间可以进行关联的JOIN查询,同时借助前面提到的 Catalog Service 来统一管理这些异构数据源的元数据。整个就是一个企业级的大数据库啊,可以看到整个企业里面的所有数据。

    除了查询 Catalog Service 管理的数据, F1 Query 还能查询不在这个元数据中心里面的数据,通过一个叫做 DEFINE TABLE(而不是 CREATE TABLE )的语句来对这个要查询的数据源进行描述,描述之后就可以进行查询了:

    DEFINE TABLE People(
       format = 'csv',
       path = '/path/to/peoplefile',
       columns = 'name:STRING, DateOfBirth:DATE'
    );
    
    SELECT Name, DateOfBirth FROM People WHERE Name = 'John Doe';
    

    其实本质上就是创建一个临时表,只在当前的 session 有效,为什么不用 CREATE TEMP TABLE 这种更容易理解的语法呢?这是我始终不大明白的地方。

    我们 Data Lake Analytics 也有类似的直接查询裸数据的语法,可以说英雄所见略同啊:

    SELECT count(*) FROM
    TABLE temp_1
    (
      col1 int,
      col2 string
    )
    LOCATION 'oss://test-oss-bucket/tbl1_part/kv1.txt';
    

    如果要支持一种新的数据源的话,在Presto里面,我们是实现一个 Connector , 而在 F1 Query 里面是实现一个 Table-Valued Function(TVF)。

    Data Sink

    数据查询出来之后可以直接返回给客户端显示,也可以根据客户端的语句直接插入到另外一个表,这个表可以是被 Catalog Service 管理的表,也可以不是。如果是被管理的表,那么是通过 CREATE TABLE 语法创建出来的。而这个 Data Sink 的表默认是的实现是保存到 Google 的 Colossus 分布式文件系统上面去了。而用户也可以像 DEFINE TABLE 语法一样,可以用 EXPORT DATA 语法指定输出到自定义的表里面去。

    在这一点上 F1 Query 貌似没有 Presto 来的灵活,Presto 里面的 Data Sink 可以是任何类型的存储,并且不需要什么特殊的 EXPORT DATA 的语法。

    查询语言

    F1 Query的查询语言是 SQL 2011 , 他们在这上面做了一些扩展以进行嵌套结构的数据查询。比较值得一提的是,F1 Query 的SQL方言跟 Big QueryDremel 以及 Spanner SQL 是一样的,这样用户可以在这些系统之间很容易进行迁移 -- 统一是主旋律啊。

    三大执行模式

    前面也提到过,F1 Query 支持三种执行模式,他们的名字分别为 Centralized Execution , Distributed Execution 以及 Batch Execution。其中 Centralized ExecutionDistributed Execution 都属于交互式(Interative)的执行模式。

    交互式执行模式

    所谓的交互式执行模式很容易理解:用户是“在线等”的,因此要求响应时间要短,F1 Query内部对于这种执行模式使用的都是完全基于内存的流式执行策略的。

    Centralized Execution

    对于中心化的执行,接到这个请求的 F1 Server 直接就执行掉了, 因为这种请求处理的数据量不大,对于资源的要求不高,因此 F1 Server 内部其实是以单线程的 pull-based 模式来执行的:

    pull-based模型

    之所以把它叫做 pull-based 模型,是因为当这个计划开始执行的时候,上层的算子递归地调用底层算子的 GetNext() 方法来获取它自己的输入。总体来说数据都是被从下向上“拉”出来的,因此叫 pull-based

    Distributed Execution

    对于 Distributed Execution ,第一个接到这个查询请求的 F1 Server 只是充当一个调度者的角色,真正的执行是由一组 F1 Worker 共同执行。

    这种模式的架构就跟 Presto 很像了,这两个角色在 Presto 里面分别叫做 CoordinatorWorker

    那么什么时候用 Centralized 模式,什么时候用 Distributed 模式呢? 优化器对 SQL 进行解析,如果发现这个查询最好要用大并发进行分区读的话,那么它会走 Distributed 的模式,否则走的就是 Centralized 模式。

    在分布式的执行计划里面,整个执行计划会被分拆成一些执行计划片段( Fragments ), 每个片段由一组 F1 Worker 来执行,这些片段是同时并发执行的,并且内部可能会应用流水线技术。

    优化器是怎么把整个执行计划拆分成多个 Fragments 的呢? 优化器使用的是自底向上的策略来拆分的,每个单独的算子对于输入数据的分布(Data Distribution)都会可以有一定的要求的。一般来说这种要求是指数据是否按照某个字段进行分片。典型的例子是 Hash Join , Hash Join 需要数据按照 Group Key 或者 Join Key 进行 hash 分片 -- 这就是 HashJoin 算子的数据分布需求。如果当前的数据分布策略能够满足这个算子的要求,那么这个算子保留在当前的 Fragment 里面,否则我们就要在执行计划当中插入一个 Exchange 节点来进行数据的重新分布,同时也划分了Fragment 之间的边界。

    分布式模式下的执行计划分片

    划分了 Fragment 边界之后下面一件事件就是决定这些 Fragment 的并行度, 并行度的计算也是自底向上的过程,首先最底层的 TableScan 决定了最初的并行度,然后这种并行度的信息会被一层一层地上推给一个叫做 Width Calculator 的模块来逐步计算每个 Fragment 的并行度。比如一个 HashJoin 在一个 50 并行度和一个100 并行度的两个输入 Fragment 之间进行的话,那么这个 HashJoin 算子会选用 100 并行度以照顾比较大的那个输入算子。

    感觉这就是在描述Presto的实现啊。在读这篇论文之前我一直搞不清楚的就是这个神奇的 Exchange 算子是怎么来的,看了这篇论文总算搞清楚了。

    数据重分布(Reparitition)策略

    F1 Query 里面的 Fragment 是并行执行的,整个执行的数据流可以看作一个DAG,数据在流经 Fragment 边界的时候会被一个 Exchange 算子进行重新分布(repartition), 对于每条数据, 数据的发送者利用一个分区函数来计算它的目的地(一个分区值: partition number ),而每个 partition number 对应到目标 Fragment 里面的一个具体的 Worker

    而这个 Exchange 的算子是通过 RPC 来实现的(Presto里面也是这样的), 而且数据的发送和接收之间还有流控的机制,这种基于 RPC 的通信机制的并发性还是挺好的,可以做到每个 Fragment 几千个分区,如果要求更高的并发度,那么就要使用 Batch Execution 模式来执行了。

    为了达到高效的查询,查询优化器会要求最底层的 TableScan 算子把数据切分成指定的并发度,而具体的 TableScan 算子就会产生 N 个分片描述,然后集群的调度器就会起N个 Worker ,来执行这N个分片的数据扫描操作。有时候数据的分片的个数会比 Worker 的数量要大,这样调度器会动态的把数据分片交给比较空闲的 Worker 去做,这样可以避免数据倾斜。

    有些算子本身会作为当前 Fragment 的一个输入,比如 LookupJoin 会作为所在 Fragment 的左边输入,因为 LookupJoin 的两个输入的数据分布规则是一样的(左边输入的数据是根据右边输入数据查询出来的)。相对应的 HashJoin 则需要多个属于不同的Fragment,并且都有自己的多个分区。

    一般来说优化器会把 HashJoin 每个输入放在一个单独的 Fragment 里面,除非它本身的数据分布跟 HashJoin 算子已经一样了。HashJoin 的两个输入根据相同的分片函数把数据发送到 HashJoin 所在的 Fragment 里面, 这样才能保证相同的 Key 的数据最后是在同一个分区里面,从而可以让每个分区可以处理一段独立的 Key 的取值空间。(否则数据就 Join 不上啊)

    跟 HashJoin 类似, 聚合操作通常也需要对输入进行重新分布, 只不过聚合操作是根据要聚合的 Key 进行数据重分布。而如果聚合函数不是针对特定的 Key 进行聚合(比如 count(*) ), 那么所有的数据会被发送一个分区。这种情况是可以优化的,通常会在目标 Aggregation 算子之前生成另外一个 PartialAggregation 算子,这样做的好处一是提高了总体的并行度,因为多个Worker参与了聚合操作;另外因为做了部分聚合之后,要往下游发的数据变少了,Worker 间传送的总数据也就少了。

    前面也说过 F1 Query 的执行是一个可能会有多个根节点的DAG, 一个上游节点的数据可能会流向多个下游的 Fragment , 比如对同一份输入进行多种聚合,F1 Query在实现这种执行计划的时候上游的 Fragment 只会执行一次,只是把数据发往多个下游而已。这种方式对于下游数据消费速度非常敏感,因为多个不同分支可能以不同的速度消费数据,任何一个有问题就可能造成上游 Fragment 数据的堆积。F1 Query 规避这个问题的方法是把数据在内存里面进行缓冲,让下游 Fragment 慢慢消费;如果所有的下游都 Block 住的话,那么它会把数据吐到文件系统上面去避免上游 Fragment 内存爆掉。

    这貌似在描述我们要做的多路输出的技术方案啊。

    性能考虑

    F1 Query 里面性能问题的主要诱因是数据倾斜以及不理想的数据访问模式。比如 HashJoin 就对热点数据比较敏感,因为比较热门的 Key 的数据被读入到 HashTable 里面,数据太多的时候可能会被吐到磁盘上面去,导致性能的下降。

    如果 HashJoin 的一个输入很小的话,那么F1 Query支持把这个输入完全读入内存,并且把这个输入发送到所有的 HashJoin Worker 的内存里面,Broadcast HashJoin 对于数据倾斜天生免疫,因为数据是可以随机发的,但是对于 Build Input 的大小比较敏感。

    对于 LookupJoin ,比较初级的做法是来一条数据我们查询一下 BuildInput ,这样显而易见性能会很差,时间可能都花在查询 BuildInput 上面了。F1 Query 当然不会这么做,F1 Query会做批量、异步处理,它会 batch 一堆数据,一次性的发给 BuildInput 去一次性查询,因为是批量查询,中间如果有重复的key也可以自动去重,节省总体的执行时间。而查询 BuildInput 的时候它会继续消费上游过来的数据,而不会堵住,保证整个过程的流水线式的执行。

    在 LookupJoin 中如果我们不做任何优化直接对 Join 的左边输入进行查询的话可能也会产生性能问题,因为同一个 Key 可能被分配到不同的 Worker 去做,从而使得单个 Worker 里面的去重效果大大降低,如果一个查询里面有多个这种 LookupJoin 累加在一起的话就可能导致数据倾斜。

    对于这种问题 F1 Query 的优化器会对 LookupJoin 的左边输入的数据进行重新分布,比如进行 Hash 分布,这样相同的 Key 被分配到同一个worker,去重效果就能提高。但是因为进行了 Hash 分布,同一个 Worker 里面对 Lookup 数据源访问会呈现出类似随机访问的特性,使得 Lookup 数据源的查询完全没有本地性可言,效率会比较低。

    一个解法是静态的给不同的 Worker 分配不同的 Key 的分区,这样因为 Key 是连续的,因此可以得到比较好的数据本地性,但是如果某些 Key 过热的话,又会出现数据倾斜。

    F1 Query 发明了一种叫做 动态KeyRange 的数据分布算法,上游的数据发送者根据它看到的数据的分布动态地对数据的KeyRange进行分配,这个做法的依据是它本地看到的数据分布情况应该跟总体数据的分布情况类似,因此可以得到比较好的数据分布效果,避免数据倾斜。F1 Query没有透露关于这个算法更详细的信息。

    在交互式的执行模式下, 查询基本都是在内存里面执行的,因为没有中间落盘的过程,纯内存的计算速度非常的快,这样才可以保证“交互性”。再加上数据源端比较激进的缓存策略,对于分布式的复杂查询,影响时间可以做到几十到几百毫秒;同时也因为中间数据不落盘,纯内存的计算对于局部的 Worker 的失败比较敏感,只能依赖客户端的自动重试。在实际过程中,执行时间在一个小时内的查询还是比较可靠的,超过一个小时的查询往往会不停的失败,这种情况下使用 Batch Execution 更好。

    Batch Execution

    在Google内部,ETL的 Pipeline 基本都是用 FlumeJava 或者 MapReduce 写出来, 而不是SQL,MapReduce (以及 FlumeJava ) 代码主要的问题主要在于开发和维护的成本太高,而且SQL优化器层面可以做很多优化的事情比如属性裁剪、条件下推等等,手写的 MapReduce 都是享受不到的。

    这一点貌似我们阿里巴巴倒是更现代化,阿里巴巴内部绝大多数的这种ETL工作都是用SQL来写,通过UDF来支持特定的业务逻辑,实在复杂的才用 MapReduce 任务来做。

    Batch Execution 模式下接受的查询语言还是SQL,但是它后台会把SQL任务翻译成 MapReduce 的任务来执行。我们知道跟流式执行不一样,MapReduce 的不同 Stage 不是同时执行的,后一个Stage 必须等前一个 Stage 完全成功之后才能开始,因此中间结果全部落盘(Colossus分布式文件系统),这使得 MapReduce 的不同的 Stage 可以异步交互,而不需要同时在线。同时这种机制又提供了一定的容错性,如果一个 Stage 出错了,我们不需要重跑整个任务,因为 Stage 的输入保存在文件系统上,我们重跑这个失败的 Stage 就好了。

    Batch Execution 不止可以自动处理服务端的异常,它还能自动规避客户端的异常,客户端可以提交异步的查询,然后断开连接,而在服务器端查询会继续执行不会终止。而交互式查询都是同步执行的,客户端一旦断开整个查询也就失败了。

    虽然 Interative ExecutionBatch Execution 执行方式迥异,但是他们的查询解析、查询优化等等前端模块是完全一样的,区别只在于最后的执行阶段,如下图所示·:

    三种模式共享同一个前端

    个人理解这样的好处很多,一是省时省力,一份代码可以供底层不同的核心引擎共享,上层有改进所有的引擎都受益;另外对用户来说也有好处,因为整个系统的前端完全一样,那么对用户来说体验也是完全一样的,不管你使用哪种模式。只是要能做到这一点应该也不是很容易的事情,要求设计者对于几种执行模式特点的深刻理解以及高超的抽象、设计能力。

    可扩展性

    F1 Query的可扩展性主要表现在两个方面

    • 支持自定义的数据源
    • 支持新的UDF, UDAF, TVF

    UDF,UDAF 本身并不是什么新概念,很多系统都有。但是F1 Query还是有一些创新在上面。一是跟其它系统 UDF 必须用高级编程语言来编写相比,F1 Query可以用高级编程语言Lua来编写,也可以用 SQL 直接描述,简单的 UDF 用 SQL来描述无疑会更便捷。

    另外 F1 Query 还支持一种叫做 UDF Server 的服务,UDF Server 是一个远程的 RPC 服务,它里面承载着 UDF,而这些 UDF 可以用任何语言比如 C++, Java, Go 来写。UDF Server 的概念第一次听说是在 Apache Beam 里面,Apache Beam 也是出自 Google 之手,可见 UDF Server 在 Google 内部已经是个很成熟的概念了。这个概念还是很创新的,以前总感觉UDF这种东西性能一定要高,不能有远程调用,否则性能会很差,没想到 Google 干脆把真个 UDF 的实现都放到远端了。而性能问题则通过之前解决 HashJoin 里面 解决 BuildInput 性能类似的手段,通过 批量化 + 异步化 + 流水线化,使得远端 UDF Server 的延迟完全被掩盖掉了。跟UDF类似,UDAF也是采用类似的策略,只不过调用UDAF 远程服务的时候除了要传递当前要聚合的输入数据,还要传当前已经聚合的结果,远程的UDAF服务则会返回新的聚合结果。因为UDF Server都是无状态的,使得F1 Query可以很好地把整体的流量分布到整个 UDF Server 集群里面,提高整体的性能。

    UDF Server 这样设计的好处我理解有两个;

    • 把 UDF 的概念从具体的执行引擎里面拿出来了,使得各种不同的数据执行引擎可以共用同一个 UDF Server,而不需要重复开发。用户在编写 UDF 的时候也只需要编写一份,因为业务处理的逻辑都是一样的,没必要为了每种引擎单独适配。
    • 因为引擎与 UDF Server 通过 RPC 进行交互,这就不限定 UDF 到底用什么编程语言进行编写了,给了 UDF 编写者更大的自由度。

    再说说 Table Valued Function , 这种函数比较有意思,它的输入是一张表(当然还可以有其它普通的参数),输出是另外一张表,这种给了用户更大的自由度,对于一些新兴的场景比如机器学习就特别适合: 机器学习在模型训练的时候就是把一张表作为输入,然后输出一张新的表。比如下面的例子:

    SELECT * FROM EventsFromPastDays( 3, TABLE Clicks);
    

    通过从 Clicks 表查数据,通过一个TVF( EventsFromPastDays ) 产生新的一张临时表( EventsFromPastDays( 3, TABLE Clicks) ),最后再用 SELECT 查询出来进行展现。

    F1 Query 也支持通过SQL定义 TVF:

    CREATE TABLE FUNCTION EventsFromPastDays(
       num_days INT64,
       events ANY TABLE
    ) AS
    SELECT * FROM
    events
    WHERE date >= DATE_SUB( CURRENT_DATE(), INTERVAL num_days DAY);
    

    总结

    F1 Query 比较创新的点在于通过一个引擎 + 内嵌的三种不同执行模式解决了企业数据领域的所有需求。通过支持对异构数据源的联合查询把企业内部的所有散落的数据组成一个整体,一个大的数据库,这个在数据时代对所有的公司都有绝对的价值。另外F1 Query还提出一些比较有创新的点子比如 UDF Server, 也提出了一些性能优化的手段,个人感触最深的是多次出现的 批量化 + 异步化 + 流水线 的优化策略, 另外动态 KeyRange 的点子也是蛮有意思的。

    Presto 社区应该好好感谢下这篇论文,个人感觉 Presto 的代码非常的晦涩难懂,因为使用了 Guice 这种依赖注入的框架,Presto 代码的作者在很多类的设计上开始有点肆无忌惮,一个构造函数10几个参数是常有的事情,这样的代码写的人是挺爽的,读代码的人就难受了;很多核心的接口、类都没有注释,更使大家要深入理解 Presto 的原理变得困难。不过现在好了,F1 Query 这篇论文里面描述 Distribution Execution 的部分几乎就是在描述 Presto 的实现,想了解 Presto 大体原理的同学可以从这篇论文开始。

    相关文章

      网友评论

      • haitaoyao:UDF server 效率应该比直接在 worker node 上执行低, 并且要单独的集群进行扩展需要集群量足够大才值得, 隔离性也要做的足够好才行. 话说 aliyun 的服务可以考虑做一个 UDF server 哈哈

      本文标题:《F1 Query: Declarative Querying

      本文链接:https://www.haomeiwen.com/subject/rpaewftx.html