美文网首页
CockRoachDB-DistSQL设计文档

CockRoachDB-DistSQL设计文档

作者: 吕信 | 来源:发表于2019-04-16 11:51 被阅读0次

    写在前面

    本文是对CockRoachDB的设计文档:https://github.com/cockroachdb/cockroach/blob/master/docs/RFCS/20160421_distributed_sql.md
    的翻译。

    内容目录

    概述

    在该设计文档中,我们提出了一种分布式执行SQL处理的方法,本文的目的是为了发起一次分布式SQL执行的讨论,而不是一个完整的详细设计文档。

    词汇

    • KV - CockRoachDB的KV系统,由CockRoachDB的key-value、range和batch API构成
    • k/v - 指一个key-value对,通常被用来引用KV系统中的一个entry。
    • Node - 集群中的一台服务器。
    • Client / Client-side - SQL客户端。
    • Gateway node / Gateway-side - 集群中首先接收到客户端SQL查询的节点。
    • Leader node / Leader-side - 集群中直接进行KV操作并且针对本地KV数据进行本地访问的节点。
      本文的大部分内容都是从gateway-side的视角进行描述的,gateway-side上负责进行查询的解析和并行执行。

    动机

    通过distsql我们期望带来如下提升和改进:

      1. remote-side filter(源端过滤)

    当通过过滤条件查询一些数据的时候,我们目前都是先将特定range中的所有key对应的数据通过网络全部都拿到gateway node上来,然后在gateway node上再对这些数据进行过滤。可以看出这种方式太低效了,占用了大量的网络资源,因此我们想要将这些过滤条件下推到源端(leaseholder),从而极大的节省网络资源,并提升数据处理效率。

    在源端不需要支持所有的sql表达式-源端只需要支持常用表达式的一个子集(例如:所有的操作都可以转换成基于string字符串的操作),只要源端支持的这些子集能够完成数据读取过程中最终的过滤条件即可。

      1. 源端更新和删除

    对于例如: UPDATE .. WHEREDELETE .. WHERE 这样的语句,我们目前的实现方式是:先根据where条件查询到相应的数据并返回到gateway上,然后在gateway上执行update或者delete操作。这种实现方式涉及到了太多次的网络传输,性能低下;因此我们希望直接在可以访问到数据的node上直接执行update或者delete操作,这样就可以极大地节省网络传输,进而提升性能。

    这里,也不需要实现所有的SQL语法,但是除了实现一些简单的过滤表达式之外,我们还需要实现一些其他的语法(例如:UPDATE 中经常使用的用于计算新值的方法和函数等)

      1. 分布式SQL操作

    现在的sql操作都是在一个节点上进行处理的,所以sql操作的性能并不能通过集群规模的增大而提升。我们想要在多个node上对sql操作进行分布式的处理 (sql操作的性能就可以与集群规模正相关)。

    3.1. Distributed joins

    Join:多个table基于关联列进行查询,通常关联列的关联条件是“相等”,然后返回相关结果。
    一种分布式出列这种计算的策略就是基于hash的分布式:
    选择K个node(这K个node中的每个node,我们都称之为:k-node),集群中可以访问到数据的各个node,将访问到的数据根据关联列中的值进行hash,
    然后根据hash值对K取模后的值,确定将每行数据发送到哪个k-node上(这样就可以保证关联列的值相同的行,肯定可以发送到同一个k-node上)。
    Hash-join在F1中被广泛采用。
    
    Distributed joins 和 remote-side filtering 可以一起使用:
    例如下面的SQL语句将会查询出所有在用户生日附近产生的订单。
    
    注意:本查询语句需要对最后的结果进行过滤。因为若只采用简单的相等过滤条件,那么该过滤条件将会下推到源端进行过滤。
    
    SELECT * FROM Customers c INNER JOIN Orders o ON c.ID = i.CustomerID
      WHERE DayOfYear(c.birthday) - DayOfYear(o.date) < 7
    

    3.2. Distributed aggregation

    当使用`GROUP BY`时,我们根据一组列或者表达式来分组计算出每组的结果。可以采用与hash-join类似的思想进行aggregation的分布式处理,只不过计算hash值的因子变为了分组元素
    
    1. Distributed sorting
    当进行排序时,我们希望能够进行分布式排序,从而分散排序的压力。各个node可以对自己的数据集先进行排序,然后有一个或多个节点将各个部分排序的结果进行合并。
    
    

    设计细节

    Overview

    实现方法最初的灵感来自Sawzall-- Rob Pike等人在google的一个项目。Sawzall提出了一个“框架”(高级语言解释器)来减少MapReduce的使用。它的主要创新是通过一个简洁的语法来定义“本地”进程,来处理一部分本地数据并产生零个或多个结果(这些结果被转换为Map逻辑);然后采用另一种语法从本地转换中获取结果并以不同的方式聚合它们(这被转换为Reduce逻辑)。简而言之:MapReduce + high-level syntax + new terminology(以更简单的方式实现分布式计算)。

    我们提出了一些与Map-Reduce类似,但与Map-Reduce的执行模型又完全不同的概念。

    1. 一组预定义的聚合器,通过这些聚合器提供SQL所需的功能。大多数聚合器都是可配置的,但不是完全可编程的。
    2. 一个特殊的聚合器,即“evaluator”,可以使用非常简单的语言进行编程,但仅限于一次操作一行数据。
    3. 将聚合器的结果路由到查询管道中的下一个聚合器。
    4. 一个允许以与数据位置无关的方式编译SQL的逻辑模型,但它需要获得足够多的信息以便我们可以进行分布式计算。

    除了累积或聚合数据之外,聚合器还可以将其结果提供给另一个node或node集合,也有可能作为其他程序的输入。具有批量处理结果数据和执行KV命令等特殊功能的聚合器可用于读取数据或对数据库进行更新。
    关键思想就是:是我们可以将SQL映射成逻辑模型,然后我们可以将该逻辑模型转换为分布式执行计划。

    Logical model 和 logical plans

    我们将SQL编译成 logical plan (从表面来看,类似于当前的planNode树),logical plan代表着各个计算阶段的抽象数据处理流。 logical plan与数据在集群中分区和分布无关;但是,它包含足够多的有关计划计算结构的信息,后续我们可以利用这些信息进行数据并行处理 - 在后续阶段,logical plan将转换为physical plan:将抽象计算和数据流映射成具体的数据处理器和它们之间的数据传输通道。

    logical plan由聚合器(aggregators)组成。每个聚合器消费行的input stream(或者更多用于join的流)并产生行的output stream。每行都是含有多个列值的元组;输入和输出流都有一个对应的schema信息。schema是一组column和type,每行都具有各个列的基准。在这里,我们强调stream是逻辑概念,可能不会映射到实际计算中的单个数据流。

    我们引入了grouping的概念来表征聚合器内部发生的分组计算。这些groups是基于group key定义的,group key是输入stream schema中列的子集。对每个group进行的计算独立于其他group中的数据,并且聚合器会将所有组结果串联后进行输出。各个不同的group结果之间顺序不是固定的 - 一些聚合器可能保证某种顺序,而另一些则可能不保证。

    更确切地说,我们可以使用函数agg在聚合器中定义计算,该函数采用单个group(相同的group key)中的一系列输入行并生成一组输出行。聚合器的输出是所有组的“agg”输出的串联,并按某种顺序排列。

    当我们稍后决定如何对聚合器中的计算进行分布式处理时,你会发现grouping特性有用:由于每个group的结果是独立的,因此可以在不同节点上处理不同的组。我们拥有的group越多越好。当使用单group聚合器(group key是空组列 - “group key:[]”,表示所有数据都在同一组中)时,不能进行分布式处理。另一方面,不存在可以任意并行化的group聚合器。请注意, no-grouping aggregators与group key为所有列的aggregators是不同的。no-grouping的聚合器是一个特殊但重要的情况,在这种情况下,我们不会聚合多片数据,但我们可能会对各片数据进行过滤,转换或重新排序。

    聚合器可以使用SQL表达式,输入多个input,并计算出相应的值。例如,所有聚合器都可以选择使用output filter表达式 - 一个布尔函数,用于丢弃原本属于输出流的元素。

    一种特殊类型的聚合器是evaluator聚合器,它是一个“可编程”的聚合器,它按顺序处理输入流(一次一个元素),可能会产生输出元素。这是一个没有分组的聚合器(group key是完整的列集);每行都是独立处理。例如,一个evaluator可以使用求值程序从任意表达式生成新值(如SELECT a + b FROM ..中的a + b);或根据谓词过滤行。

    特殊table reader聚合器,没有输入,这种聚合器可以作为数据源;table reader可以配置为根据需要仅输出某些列。还有一种特殊的** final **聚合器,这种聚合器没有任何输出,通常被用来表示查询/语句的结果。

    某些聚合器(final,limit)在要求输入流有序(要求1列或者多列升序/降序)。一些聚合器(如table reader)可以保证其输出流上的某种排序,称为ordering guarantee(与当前代码中的orderingInfo相同)。所有聚合器都有一个相关的排序特征函数ord(input_order) - > output_order,它将input_order(输入流​​上的排序保证)映射到output_order(输出流的排序保证) - 意思是如果输入流中的行是根据input_order排序的,那么输出流中的行将根据output_order进行排序。

    表读取器的排序保证以及特征函数可用于在逻辑计划中传播排序信息。当存在不匹配(聚合器具有与保证不匹配的排序要求)时,我们插入sorting aggregator - 这是一个非分组聚合器,其输出schema与输入流中的重新排序元素的输入schema相同,无论输入顺序如何,都一定能提供保证输出的顺序。我们可以在逻辑计划级别进行排序来执行优化 - 我们可以将排序聚合器放在整个pipeline的早期,或者将其拆分为多个节点(其中一个节点在前一阶段执行初步排序)。

    我们使用一些简单的查询来介绍聚合器的主要类型。

    例 1

    TABLE Orders (OId INT PRIMARY KEY, CId INT, Value DECIMAL, Date DATE)
    
    SELECT CID, SUM(VALUE) FROM Orders
      WHERE DATE > 2015
      GROUP BY CID
      ORDER BY 1 - SUM(Value)
    

    这是聚合器和流的潜在描述:

    TABLE-READER src
      Table: Orders
      Table schema: Oid:INT, Cid:INT, Value:DECIMAL, Date:DATE
      Output filter: (Date > 2015)
      Output schema: Cid:INT, Value:DECIMAL
      Ordering guarantee: Oid
    
    AGGREGATOR summer
      Input schema: Cid:INT, Value:DECIMAL
      Output schema: Cid:INT, ValueSum:DECIMAL
      Group Key: Cid
      Ordering characterization: if input ordered by Cid, output ordered by Cid
    
    EVALUATOR sortval
      Input schema: Cid:INT, ValueSum:DECIMAL
      Output schema: SortVal:DECIMAL, Cid:INT, ValueSum:DECIMAL
      Ordering characterization:
        ValueSum -> ValueSum and -SortVal
        Cid,ValueSum -> Cid,ValueSum and Cid,-SortVal
        ValueSum,Cid -> ValueSum,Cid and -SortVal,Cid
      SQL Expressions: E(x:INT) INT = (1 - x)
      Code {
        EMIT E(ValueSum), CId, ValueSum
      }
    
    AGGREGATOR final:
      Input schema: SortVal:DECIMAL, Cid:INT, ValueSum:DECIMAL
      Input ordering requirement: SortVal
      Group Key: []
    
    Composition: src -> summer -> sortval -> final
    
    

    请注意,逻辑描述不包括排序聚合器。当加入排序操作的时候,这个初步的逻辑执行计划将会变成一个完整的逻辑计划。我们必须在最终之前插入一个排序聚合器:

    src -> summer -> sortval -> sort(OrderSum) -> final
    
    

    每个箭头都表示一个逻辑上的数据流,这就是一个完整的逻辑执行计划。
    在这个例子中,我们只有一个排序列,让我们在看另一个例子。

    例 2

    TABLE People (Age INT, NetWorth DECIMAL, ...)
    
    SELECT Age, Sum(NetWorth) FROM People GROUP BY AGE ORDER BY AGE
    

    初步的逻辑计划如下:

    TABLE-READER src
      Table: People
      Table schema: Age:INT, NetWorth:DECIMAL
      Output schema: Age:INT, NetWorth:DECIMAL
      Ordering guarantee: XXX  // will consider different cases later
    
    AGGREGATOR summer
      Input schema: Age:INT, NetWorth:DECIMAL
      Output schema: Age:INT, NetWorthSum:DECIMAL
      Group Key: Age
      Ordering characterization: if input ordered by Age, output ordered by Age
    
    AGGREGATOR final:
      Input schema: Age:INT, NetWorthSum:DECIMAL
      Input ordering requirement: Age
      Group Key: []
    
    Composition: src -> summer -> final
    
    

    summer聚合器可以通过两种方式执行聚合操作-若输入没有按照age排序,summer聚合器将会使用一个无序的map来存储输入:每个age一个数据实体,最终会以order乱序的方式产生输出;若输入是按照age排序的,聚合器将会按照age的顺序,每次sum一个age的数据,产生的输出也会与输入中age的排序一致。
    我们看看下面两种情况:

    1. 输入按照 Age排序

      在这种情况下,我们发现summer聚合器会保存并传递age的顺序,这时我们不需要添加额外的排序聚合器。

    2. 数据没有按照任何字段排序

      在这种情况下,summer聚合器不会保证输出的任何顺序并且我们需要在final聚合器之前需要自己添加一个排序聚合器。

      src -> summer -> sort(Age) -> final
      
      

      我们应该意识到在summer聚合器执行summer操作之前,可以先对age进行排序:

      src -> sort(Age) -> summer -> final
      
      

      上述两种逻辑执行计划都没有问题,我们可以任选其一。

    也有可能summer使用一个有序的map,这样可以保证所有的输出结果都是按照age排序的。这样就可以保证:无论输入是否有序,我们都可以保证输出是基于某个字段有序的。

    排序的反向传递

    在前面的例子中,我们看到可以通过table reader流中的排序以及排序保证来避免排序。初步的逻辑计划将尽可能地保持顺序,从而尽可能的减少额外的排序。

    但是,在某些情况下,保持顺序可能需要一些额外的开销;某些聚合器可以配置为保持顺序或不保持。为了避免不必要地排序保持,在排序聚合器ready之后,我们重新分析logical plan并尽可能的去掉对streams的排序。具体来说,我们检查每个logical stream(以相反的拓扑顺序)并检查若删除其排序是否仍然可以得到正确的logical plan;这就导致了排序的反向传播。

    总结一下,逻辑执行计划有三个阶段:

    1. 初步的逻辑计划,尽可能保留排序,没有排序节点
    2. 满足排序的逻辑计划,根据需要添加排序节点
    3. 最终的逻辑计划,将所有排序操作统一,尽可能的优化整个排序的性能,并减少性能消耗

    例 3

    TABLE v (Name STRING, Age INT, Account INT)
    
    SELECT COUNT(DISTINCT(account)) FROM v
      WHERE age > 10 and age < 30
      GROUP BY age HAVING MIN(Name) > 'k'
    
    TABLE-READER src
      Table: v
      Table schema: Name:STRING, Age:INT, Account:INT
      Filter: (Age > 10 AND Age < 30)
      Output schema: Name:STRING, Age:INT, Account:INT
      Ordering guarantee: Name
    
    AGGREGATOR countdistinctmin
      Input schema: Name:String, Age:INT, Account:INT
      Group Key: Age
      Group results: distinct count as AcctCount:INT
                     MIN(Name) as MinName:STRING
      Output filter: (MinName > 'k')
      Output schema: AcctCount:INT
      Ordering characterization: if input ordered by Age, output ordered by Age
    
    AGGREGATOR final:
      Input schema: AcctCount:INT
      Input ordering requirement: none
      Group Key: []
    
    Composition: src -> countdistinctmin -> final
    
    

    聚合器的类型

    • TABLE READER 是一个特殊的聚合器,它没有input stream. 主要用来扫描一个table或者index 以及它需要读取的schema信息。和其他的aggregator一样,它也可以通过配置搭配上一个可编程的output filter一起使用。

    • EVALUATOR 是一个完全可编程的非分组的聚合器.它作用于每一个单独的行。通过该聚合器可以删除行或者任意地修改行。

    • JOIN对两个输入流进行连接,在某些列之间具有相等约束。聚合器分组只能作用于join列上。详细信息请查看 Stream joins.

    • JOIN READER 从input stream中根据指定的Key值进行点查。可以通过执行KV读取(潜在的远程读取)或者建立远程数据流来进行Key值的点查。详情请查看 Join-by-lookupOn-the-fly flows setup.

    • MUTATE 执行基于KV的插入/删除/更新。

    • SET OPERATION 可以将任意的值设置为OPTION。

    • AGGREGATOR 在SQL语义中做聚合的聚合器。它会对每一行分组并且针对于每一组进行聚合操作。分组是根据group key进行的, AGGREGATOR可以配置为通过一个或者多个聚合函数进行聚合:

      • SUM
      • COUNT
      • COUNT DISTINCT
      • DISTINCT

      AGGREGATOR的输出schema信息包括group key, 生成的一些生成列以及原有的一些列。可以添加一些输出过滤规则(当然也可以不添加)针对group key和生成的值进行过滤。 (例如:可以对最终没有输出的值进行过滤).

    • SORT 对输入的数据针对配置的列进行排序。需要注意:这是一个 no-grouping 的聚合器, 因此该聚合器可以分配给任意的数据生产者。当然,这也就意味着它不会产生任何的全局排序结果,而只是针对于每个数据流的内部进行排序。可以通过分组处理器的输入同步器实现全局排序(例如:LIMIT 或者 FINAL)。

    • LIMIT是一个 single-group aggregator,在读取到limit指定的行数之后,就会停止读入。

    • INTENT-COLLECTOR 是在网关上执行的single-group aggregator,它接收由MUTATE聚合器生成的所有意图,并在内存中跟踪它们,直到提交事务为止。

    • FINAL 是在网关上执行的single-group aggregator,用于收集查询结果。此聚合器通过pg协议连接到客户端。

    从逻辑执行计划到物理执行计划

    我们基于以下事实,进行聚合器和逻辑流中的计算的分布式处理:

    • 对于任何聚合器,只要组的所有处理都发生在单个节点上,就可以将组划分为子集并并行处理。

    • 聚合器的排序特征适用于具有特定顺序的任一输入流;聚合器的排序特征甚至可以用在逻辑节点上的多个并行计算实例:如果所有并行实例中的物理输入流和逻辑输入流中的顺序一样(在逻辑计划中),则所有实例中的物理输出流将具有逻辑输出流中保证的输出顺序。如果在后续阶段中将这些流合并为单个流(合并排序),则该物理流将具有和逻辑流一样的正确顺序。

    • 具有空分组键的聚合器(limitfinal)必须在单个节点上进行最终处理(但它们可以具有初始的分布式处理阶段)。
      因此,每个逻辑聚合器可以对应于多个分布式实例,并且每个逻辑流可以对应于多个具有顺序保证物理流。

    我们可以基于一些简单的规则进行分布式处理:

    • 可以有多个实例从而进行并行处理,根据range分割;每个table reader实例由相关range的raft leader处理,并且该实例是 physical stream的开始。

    • streams继续在程序中并行处理。当streams到达聚合器时,可以基于group key的值进行hash 散列将streams重新分布到到任意数量的node上。具有空group key的聚合器将只有一个实例(node),并且根据期望的顺序合并上游过来的多个输入流。如上所述,每个物理流其实早已被排好序了(因为它们都对应于有序的逻辑流)。

    • 排序聚合器作用于每个physical stream(与logical stream的排序一致)。排序聚合器不会导致将结果合并到单个节点中。

    需要注意:按照range的边界进行分布式处理并不是为了保证结果正确的必要条件 - 即使在我们进行query plan的时候,出现了range的分裂或者移动,也不会产生错误的结果。对于一些key的读取可能较慢,这主要是因为对他们是远程读取的,但只要大多数时间,大多数key都在本地读取,性能都不是问题。

    Assume that we run the Example 1 query on a Gateway node and the table has data that on two nodes A and B (i.e. these two nodes are masters for all the relevant range). The logical plan is:
    假设我们在** Gateway **节点上运行示例1中的查询,并且该表的数据分布在两个节点:A和B上(即,这两个节点是所有相关range的主节点)。那么逻辑执行计划应该是:

    原始查询语句及表结构:
    TABLE Orders (OId INT PRIMARY KEY, CId INT, Value DECIMAL, Date DATE)
    
    SELECT CID, SUM(VALUE) FROM Orders
      WHERE DATE > 2015
      GROUP BY CID
      ORDER BY 1 - SUM(Value)
    
    逻辑执行计划:
    TABLE-READER src
      Table: Orders
      Table schema: Oid:INT, Cid:INT, Value:DECIMAL, Date:DATE
      Output filter: (Date > 2015)
      Output schema: Cid:INT, Value:DECIMAL
      Ordering guarantee: Oid
    
    AGGREGATOR summer
      Input schema: Cid:INT, Value:DECIMAL
      Output schema: Cid:INT, ValueSum:DECIMAL
      Group Key: Cid
      Ordering characterization: if input ordered by Cid, output ordered by Cid
    
    EVALUATOR sortval
      Input schema: Cid:INT, ValueSum:DECIMAL
      Output schema: SortVal:DECIMAL, Cid:INT, ValueSum:DECIMAL
      Ordering characterization: if input ordered by [Cid,]ValueSum[,Cid], output ordered by [Cid,]-ValueSum[,Cid]
      SQL Expressions: E(x:INT) INT = (1 - x)
      Code {
        EMIT E(ValueSum), CId, ValueSum
      }
    
    

    上面的逻辑计划可以实例化为以下物理计划:

    physical plan中的每个方格都是一个 processor:

    • src是一个table reader,执行KV Get操作并形成行;它被用于读取属于相应节点的上的所有数据。它在输出行之前会首先通过Date> 2015过滤器对结果进行过滤。
    • summer-stage1 is the first stage of the summer aggregator; its purpose is to do the aggregation it can do locally and distribute the partial results to the summer-stage2 processes, such that all values for a certain group key (CId) reach the same process (by hashing CId to one of two "buckets").
      summer-stage1summer聚合器的第一个阶段;它的作用是进行本地聚合并将部分结果分发到summer-stage2进程,这样某个group key(CId)的所有值都会被分发到同一个目的端(summer-stage2的特定的process)(通过哈希CId到两个“桶”中的一个)。
    • summer-stage2 执行实际的sum操作并输出index (CId) 和相应的sum值:ValueSum
    • sortval 计算出额外的列:SortVal的值,以及CIdValueSum
    • sort 基于SortVal的值对stream中的数据进行排序。
    • final 合并前面的两个输入数据流以产生最终的排序结果。

    注意:summer聚合器的第二阶段不需要在相同的节点上运行;例如,可以采用另外一种物理执行计划:可以在单个节点上执行summer-stage2 processor:


    所有的processor总是形成有向无环图。

    Processors

    Processor 通常由三个部分组成:


    1. input synchronizer: 将所有的input streams 合并成单独的一个data stream。主要如以下几种 input synchronizer:
    *   single-input (pass-through)
    *   unsynchronized:从所有输入流传递行,任意交错,不保证顺序,也不做排序。
    *   ordered:  input physical stream本身就已经排好序了(与physical stream的顺序一致);synchronizer小心交错读取流中的数据,以便merged stream具有相同的顺序(该过程实际上就是一个K路归并排序)。
    
    1. data processor :数据转换或聚合逻辑的核心实现部分(在某些情况下执行KV操作)。

    2. output router : 将processor的输出分成多个流;主要有以下几种output router:

      • single-output (pass-through)
      • mirror: 将每一行都输出到所有的output stream中
      • hashing: 每行输出到output stream,根据应用于数据元组的某些元素的散列函数进行选择目标output stream。
      • by range: router配置了range信息(与特定的某个表有关),并且能够将行发送到相应的range的leaseholder节点(对于JoinReader节点很有用(将索引值发送到负责PK的节点)和INSERT(将新行添加到它们的leaseholder上))。

    Joins

    Join-by-lookup

    join-by-lookup方法涉及从一个表接收数据并从另一个表中查找相应的行。它通常用于将索引与表连接,但它们可以在适当的情况下可用于任何连接,例如,将一个表中的少量行与另一个表的主键连接。我们引入了一个TABLE-READER的变体,它有一个输入流。针对输入流的每个元素都在另一个表或索引中进行点查,并在输出中产生相应的值。在内部,聚合器分批执行查找,例如:

    TABLE t (k INT PRIMARY KEY, u INT, v INT, INDEX(u))
    SELECT k, u, v FROM t WHERE u >= 1 AND u <= 5
    

    Logical plan:

    TABLE-READER indexsrc
    Table: t@u, span /1-/6
    Output schema: k:INT, u:INT
    Output ordering: u
    
    JOIN-READER pksrc
    Table: t
    Input schema: k:INT, u:INT
    Output schema: k:INT, u:INT, v:INT
    Ordering characterization: preserves any ordering on k/u
    
    AGGREGATOR final
    Input schema: k:INT, u:INT, v:INT
    
    indexsrc -> pksrc -> final
    
    

    也可以用在join查询中:

    TABLE t1 (k INT PRIMARY KEY, v INT, INDEX(v))
    TABLE t2 (k INT PRIMARY KEY, w INT)
    SELECT t1.k, t1.v, t2.w FROM t1 INNER JOIN t2 ON t1.k = t2.k WHERE t1.v >= 1 AND t1.v <= 5
    
    

    Logical plan:

    TABLE-READER t1src
    Table: t1@v, span /1-/6
    Output schema: k:INT, v:INT
    Output ordering: v
    
    JOIN-READER t2src
    Table: t2
    Input schema: k:INT, v:INT
    Output schema: k:INT, v:INT, w:INT
    Ordering characterization: preserves any ordering on k
    
    AGGREGATOR final
    Input schema: k:INT, u:INT, v:INT
    
    t1src -> t2src -> final
    
    

    注意,JOIN-READER具有将输入列直接转换为输出列的能力(在本示例中“v”就属于这种情况)。在基于索引join的情况下,它仅用于跳过读取或解码“v”的值;但在一般情况下,有必要从第一个表中传递列。
    在“JOIN-READER”的物理实现方面,有两种实现方式:

    1.它可以在接收物理输入流的节点上执行KV查询(分批);并在同一节点上继续输出。
    这种方式这很简单,但会涉及到在node和range leaseholder之间的多于的网络开销。我们可能会有限采用此策略进行实现。

    1. 可以通过基于range的router将每个输入路由到其对应的range的leaseholder节点上的JOIN-READER实例;然后在range的leaseholder节点上继续后续的数据处理。
      这种做法避免了过多的网络往返开销,但是存在其他的问题,因为我们可能在太多节点上创建这种处理流(对于大型表,群集中的许多/所有节点都会存在range)。为了有效地实现上述特性,只有当我们实际找到需要经过某个特定流的行时,我们才会“lazy”的(根据需要)创建流。当t1t2的顺序相关时(例如,t1可以按日期排序,t2可以由隐式主键排序),这种策略特别有用。
      即使进行了这种优化,如果我们只进行少量的一些查询,也会涉及到很多remote node,这就存在极大地浪费。因此我们可以研究一种混合方法,根据处理数据的大小以及这些数据跨越多少range和node,在两种策略之间进行选择。

    Stream joins

    join聚合器在两个流上执行join,在join列之间存在的是相等约束。聚合器基于join key进行分组。

    TABLE People (First STRING, Last STRING, Age INT)
    TABLE Applications (College STRING PRIMARY KEY, First STRING, Last STRING)
    SELECT College, Last, First, Age FROM People INNER JOIN Applications ON First, Last
    
    TABLE-READER src1
    Table: People
    Output Schema: First:STRING, Last:STRING, Age:INT
    Output Ordering: none
    
    TABLE_READER src2
    Table: Applications
    Output Schema: College:STRING, First:STRING, Last:STRING
    Output Ordering: none
    
    JOIN AGGREGATOR join
    Input schemas:
      1: First:STRING, Last:STRING, Age:INT
      2: College:STRING, First:STRING, Last:STRING
    Output schema: First:STRING, Last:STRING, Age:INT, College:STRING
    Group key: (1.First, 1.Last) = (2.First, 2.Last)  // we need to get the group key from either stream
    Order characterization: no order preserved  // could also preserve the order of one of the streams
    
    AGGREGATOR final
      Ordering requirement: none
      Input schema: First:STRING, Last:STRING, Age:INT, College:STRING
    
    

    stream join aggregators的物理实现的核心是 join processor 。 join processor 通常将来自一个流的所有行放入哈希映射中,然后处理另一个流。如果两个流都按group column 排序,则它可以消耗很少内存的前提下就可以执行merge-join。执行需要较少内存的合并连接。
    即使采用相同的join processor 实现,我们也可以根据我们创建physical streams和routers的方式不同而采用不同的分布式处理策略:

    • router可以根据group key元素的散列值将每一行分配给多个join processor之一;这可确保group key的值相同的所有元素到达同一join processor,从而实现hash-join。一个示例物理计划如下:


    • router可以复制来自一个表的物理流的所有行,并将副本分发给所有processor实例;另一个表的流在各自的节点上处理。当我们使用小表join大表时,此策略非常有用,并且对于子查询尤其有用,例如: SELECT ... WHERE ... AND x IN(SELECT ...)
      上面的查询,若src2中的数据比较少,那么这样的物理执行计划的效率会更高:

      image
      这种物理执行计划的不同之处在于第一个表的数据不会通过网络传输到其他节点,而src2table reader之后的router会将src2的结果向每个src1所在的节点都广播一份(而不是在前一种物理执行计划下通过hash-join将两个表的所有数据全部进行hash分布)。

    Inter-stream ordering

    这是一个优化功能特性,不会改变逻辑或物理计划的结构。也不是初始执行计划的一部分,但是会在后续的执行计划优化中用到该功能特性。
    示例如下:

    TABLE t (k INT PRIMARY KEY, v INT)
    SELECT k, v FROM t WHERE k + v > 10 ORDER BY k
    

    最简单的执行计划如下:

    READER src
      Table: t
      Output filter: (k + v > 10)
      Output schema: k:INT, v:INT
      Ordering guarantee: k
    
    AGGREGATOR final:
      Input schema: k:INT, v:INT
      Input ordering requirement: k
      Group Key: []
    
    Composition: src -> final
    
    
        现在假设该表跨越两个不同节点上的两个range - 一个range的key范围是:“k <= 10”,一个range的key范围是:“k> 10”。在物理计划中,将存在两个table reader产生的两个stream;在“final”stage之前,两个stream将合并为单个stream。但是我们知道,在合并之前,其实在两个stream中,每个stream中的元素是已经排好序的了- 这种情况我们称之为:**inter-stream ordering**。这样我们在合并时(在`final`之前)会更加高效:我们只需要简单地先读取第一个流中的所有元素,然后再读取第二个流中的所有元素。在第一个流被消耗完毕之前不需要调度第二个流的读取器和其他处理器。特别是,当我们使用`ORDER BY`和`LIMIT`进行查询时:可以使用具有单个group的aggregator来表示limit,通过该aggregator合并所有的physical stream;通过`inter-physical-stream`我们就可以仅仅通过从一个range中读取数据,就可以实现limit功能。
    

    我们在logical plan中加入了 inter-physical-stream ordering的概念,将其作为logical stream的属性(即使它指的是与该logical stream相关联的多个physical stream)。我们使用inter-stream ordering characterization function来注释所有聚合器(类似于上面描述的"intra-stream"排序特征)。 inter-stream排序函数将输入流的排序映射到输出流的排序,其含义是:若输入流中的数据本来就已经有序了,经过了aggregator的处理后,其输出流中元素的顺序会和输入流中的顺序一致。
    如果logical stream具有适当的相关联的inter-stream ordering,则可以通过顺序读取流来实现物理流的合并。

    执行架构

    一旦生成了物理计划,系统就需要将其拆分并分布到各个node之间进行运行。每个node负责本地调度data processors 和 input synchronizers。node还需要能够彼此通信以将输出output router连接到input synchronizer。特别是,需要一个streaming interface来连接这些组件。为了避免额外的同步成本,需要足够灵活的执行环境以满足上面的所有这些操作,以便不同的node除了执行计划初始的调度之外,可以相对独立的启动相应的数据处理工作,而不会受到gateway节点的其他编排影响。

    创建一个local plan: ScheduleFlows RPC

        当开始对执行计划开始进行分布式执行的时候,首先由gateway向每个节点发送request,由每个node执行其负责的部分计划,并要求node调度它负责的sub-plan(详情可以参考下文的:模数“on-the-fly”flows)。一个node可能负责整个DAG的多个不同部分。我们将DAG中的每个部分称为*flow*。一个flow由其中的physical plan节点序列,它们之间的连接(input synchronizers, output routers)加上physical plan中起始节点的输入流的标识符和(可能是多个)结束节点的输出流组成。集群中的一个node可能同时负责多个不同的flow。更常见的情况是,当集群中的一个node是一个query中涉及到的多个range的leaseholder的时候,它将负责一组相同类型的flow,每个range对一个一个同类的flow,全部以“TableReader”处理器开始。在开始时,我们将所有这些`TableReader'合并为一个,由这一个TableReader读取所有位于该node上的ranges,但是这也就意味着我们将不会进行流间排序(因为我们已将所有内容都转换为单个流)。稍后我们可能会在每个range内使用一个“TableReader”,以便我们可以进行并行读取,从而加快读取速度。
    

    因此,集群中的node实现了一个ScheduleFlows RPC,它接受一组flow,设置输入和输出相关的信息(见下文),创建本地processor并开始执行。在node对输入和输出数据进行处理的时候,我们需要对flow进行一些控制,通过这种控制,我们可以拒绝request中的某些请求。

    flow的本地调度

    在本地节点调度不同processor的最简单方法就是并行执行:每个processor,synchronizer和router都可以作为goroutine运行,它们之间由channel互联。这些channel可以缓冲信道以使生产者和消费者同步。

    Mailboxes

    不同节点上的flow通过GRPC stream相互通信。为了允许生产者和消费者可以在不同时间启动,ScheduleFlows为所有输入和输出流都创建了命名mailboxes。这些命名mailboxes实际上有一个内部队列用于临时保留一定数量的数据,直到建立GRPC流来传输它们。一旦建立起GRPC流,GRPC flow control 将会控制同步生产者和消费者的数据同步传输。消费者根据mailbox id(与已传递给ScheduleFlows的流中已使用的相同)使用StreamMailbox RPC建立GRPC流。mailboxes是动态创建的,一旦创建之后希望很快就会出现ScheduleFlows。如果在超时时间内没有创建ScheduleFlows,则mailbox将停用。邮箱代表了本地处理器的channel接口。如果我们想要每个节点的多个TableReader/flow合并成一个,那么我们也要将这些flow的输出也合并成一个。(如果一个节点有10个range,我们可以通过合并来减少所需要的mailbox/stream的个数)。此时,我们可能希望将进入同一个mailbox的不同stream打上不同的tag,以便消费者仍可以进行流间排序。
    使用mailbox执行的简单查询的执行图:

    创建即时flows

    在一些情况下,我们不希望从一开始就将所有的flow创建起来。 PointLookupMutate操作通常从几个range开始,然后将数据发送到任意node。要发送到每个节点的数据量通常非常小(例如,PointLookup可能会在表A上执行少量查找,因此我们不希望在所有节点上都为这些查找设置receiver,来接受这些少量的数据。相反,物理执行计划将只包含一个processor,使PointLookup聚合器成为只有一个stage;此node可以选择是否针对该查找直接执行KV操作(对于查找次数较少的range)或者使用ScheduleFlows RPC动态设置远程流量来查找大量的查找range。在这种情况下,最好的解决方案就是将计算发送到数据所在的node,因此传递给ScheduleFlows的flow将是聚合器下游的物理节点上的计算逻辑,包括过滤和聚合。一旦processor发现它在同一range内需要批量进行大量的点查,它将会进行下游计算逻辑的发送。

    废弃 flows

    在很多情况下Processor和mailbox都需要销毁:

    1.processor在其所有输入流上接收到一个哨兵标记并将哨兵标记以及其之前的所有数据都已经输出完毕后,会进行销毁。
    2.一旦processor的输入或输出流关闭,processor就会退出。消费者可以使用它来告知其生产者它已获得所需的所有数据。
    3.input mailbox将会在将哨兵标记发送出去或远程关闭GRPC流后退出。
    4.output mailbox一旦将哨兵标记传递给reader就会退出,一旦所有输入通道都关闭,它就会退出(请记住,output mailbox可能会接收来自多个channel的输入,每个同类的flow一个channel)如果其GRPC流远程关闭,它也会销毁。
    5.TableReader一旦它在其range内传递了最后一个元组(+一个哨兵标记)就销毁。

    错误处理

    最初,执行计划是没有错误恢复的(执行期间出现任何问题,查询失败并且事务被回滚)。唯一的问题是释放执行计划的各个处理节点采用的所有资源。可以通过在任意的GRPC流突然关闭时发送一个error signal来完成资源释放。类似地,可以通过在“FINAL”processor中关闭其输入通道来取消正在运行的查询。此关闭将向后传播到所有查询执行计划的节点。

    一个更加复杂的例子: Daily Promotion

    我们尝试一个更复杂的查询。查询的目的是帮助推广每日发布的广告,定位去年花费超过1000美元的客户,'DailyPromotion`中存储了每个客户以及他们最近的订单。

    TABLE DailyPromotion (
      Email TEXT,
      Name TEXT,
      OrderCount INT
    )
    
    TABLE Customers (
      CustomerID INT PRIMARY KEY,
      Email TEXT,
      Name TEXT
    )
    
    TABLE Orders (
      CustomerID INT,
      Date DATETIME,
      Value INT,
    
      PRIMARY KEY (CustomerID, Date),
      INDEX date (Date)
    )
    
    INSERT INTO DailyPromotion
    (SELECT c.Email, c.Name, os.OrderCount FROM
          Customers AS c
        INNER JOIN
          (SELECT CustomerID, COUNT(*) as OrderCount FROM Orders
            WHERE Date >= '2015-01-01'
            GROUP BY CustomerID HAVING SUM(Value) >= 1000) AS os
        ON c.CustomerID = os.CustomerID)
    

    Logical plan:

    TABLE-READER orders-by-date
      Table: Orders@OrderByDate /2015-01-01 -
      Input schema: Date: Datetime, OrderID: INT
      Output schema: Cid:INT, Value:DECIMAL
      Output filter: None (the filter has been turned into a scan range)
      Intra-stream ordering characterization: Date
      Inter-stream ordering characterization: Date
    
    JOIN-READER orders
      Table: Orders
      Input schema: Oid:INT, Date:DATETIME
      Output filter: None
      Output schema: Cid:INT, Date:DATETIME, Value:INT
      // TODO: The ordering characterizations aren't necessary in this example
      // and we might get better performance if we remove it and let the aggregator
      // emit results out of order. Update after the  section on backpropagation of
      // ordering requirements.
      Intra-stream ordering characterization: same as input
      Inter-stream ordering characterization: Oid
    
    AGGREGATOR count-and-sum
      Input schema: CustomerID:INT, Value:INT
      Aggregation: SUM(Value) as sumval:INT
                   COUNT(*) as OrderCount:INT
      Group key: CustomerID
      Output schema: CustomerID:INT, OrderCount:INT
      Output filter: sumval >= 1000
      Intra-stream ordering characterization: None
      Inter-stream ordering characterization: None
    
    JOIN-READER customers
      Table: Customers
      Input schema: CustomerID:INT, OrderCount: INT
      Output schema: e-mail: TEXT, Name: TEXT, OrderCount: INT
      Output filter: None
      // TODO: The ordering characterizations aren't necessary in this example
      // and we might get better performance if we remove it and let the aggregator
      // emit results out of order. Update after the section on backpropagation of
      // ordering requirements.
      Intra-stream ordering characterization: same as input
      Inter-stream ordering characterization: same as input
    
    INSERT inserter
      Table: DailyPromotion
      Input schema: email: TEXT, name: TEXT, OrderCount: INT
      Table schema: email: TEXT, name: TEXT, OrderCount: INT
    
    INTENT-COLLECTOR intent-collector
      Group key: []
      Input schema: k: TEXT, v: TEXT
    
    AGGREGATOR final:
      Input schema: rows-inserted:INT
      Aggregation: SUM(rows-inserted) as rows-inserted:INT
      Group Key: []
    
    Composition:
    order-by-date -> orders -> count-and-sum -> customers -> inserter -> intent-collector
                                                                      \-> final (sum)
    
    

    可能的物理执行计划如下:


    实现策略

    实现的时候,有两个设计原则:

    • Milestone M1: 将过滤尽量下发到源端
    • Milestone M2: 将更新下发到源端

    逻辑执行计划

    基于查询语句构建逻辑执行计划会涉及到很多方面:

    • 索引选择
    • 查询优化
    • 选择各种排序,聚合策略
    • 选择各种join策略

    基于语句构建查询执行计划涉及到的领域很多,我们可以从基于现有代码的基本实现开始,随着时间的推移,不断改进。

    物理执行计划

    物理执行计划阶段的许多决策都是“强制”的 - table reader根据range的分布情况进行分布式分发,并且大部分物理执行计划都遵循这一点。

    物理执行计划涉及到的困难的决策是在对aggregation 或者join操作的第二阶段的分布式处理的时候-我们可以在任意nodes上设置任意数量的“buckets”(以及后续的flow)。例如。 summer的例子。幸运的是,我们可以从一个简单的策略开始 - 使用尽可能多的桶作为输入流并在相同的节点之间分配它们。此策略可以根据查询的数据量的大小很好地进行扩展:如果查询从单个节点中提取数据,我们将在该节点上进行所有聚合;如果查询从许多节点中提取数据,我们将在这些节点之间进行分布式聚合。

    我们还将支持通过配置的方式以最小化分布式处理 - 尽可能快地在单个网关节点上获取所有内容。这也是一个保守的做法,可以避免在太多节点之间进行分布式查询出现的一些问题。

    “阶段2”将一直检测何时计算(和后续阶段)可能足够快以及消耗的资源足够小从而以不需要进行分布式处理,而自动切换为在gateway节点上直接执行聚合。可以在以后研究进一步的改进(基于统计)。

    我们应该添加扩展的SQL语法,以允许查询发起者控制其中一些参数。

    Processor 架构及实现

    本节会详细说明processor的架构。我们可以从table reader开始(足够实现M1)。

    Joins

    正交工作流是为了支持join(最初是非分布式的)。这会涉及构建processor,该processor将成为hash join实现的核心,并将该代码与当前的“planNode”树集成。

    调度

    有效排队和processor调度的问题将不断改善。但我们可以从一个基于简单策略的基本实现开始:

    • 只会在事务中才会进行队列排序;我们在单个processor内是不进行排序的
      任何时候都要限制事务的个数或者运行的processor的总数
    • txn排队的排序是基于txn时间戳及其优先级的,允许节点自动就事务的相对排序达成一致,消除死锁情况(死锁的示例:txn A有一些processor在节点1上运行并等待,在节点2上被txnB使用的processor;并且txn B也有一些处理器在节点2上运行,同时等待在节点1上运行的被txn A使用的processor)

    KV 整合

    我们不建议引入任何新的KV Get / Put API。当前的这些API已经够用了。当在lease holder上运行查询的时候,查询的速度将会和在本地运行的速度一样快。
    但是,我们还需要对KV层进行一些其他的集成:

    1. 查找range信息

      在物理执行计划阶段,我们需要将关键span拆分为range,并确定每个range的lease holder。我们还可以在逻辑执行阶段使用range信息来帮助评估table大小(用于索引选择,join顺序等)。 KV层已经有一个range cache来维护这些信息,但我们需要根据我们维护的cache中的信息量以及更新/过期cache信息的方式来更加主动的对cache中的信息进行维护。

    2. 分布式读

      KV层几乎不需要更改就可以支持分布式读取

    3. 分布式写

    事务协调器会跟踪所有修改的key或者修改的range。sql分发层会将修改后的key的信息汇集回gateway节点(充当事务协调器)。我们需要做些集成的工作,以便我们将此信息传递给KV层。最终我们需要一个可以用于range的流式读取接口。

    实现说明

    可视化/追踪

    必须提供有关逻辑和物理计划的详细信息,以及查询所有阶段的详细跟踪,包括执行时间,统计信息等。

    一些简单查询的数据流向说明

    一个简单查询(基于过滤的查询或者更新)的数据流:

    Node A Node B Node C Node D
    Receives statement
    Finds that the table data spans three ranges on B, C, D
    Sends scan requests to B, C, D
    Starts scan (w/filtering, updates) Starts scan (w/filtering, updates) Starts scan (w/filtering, updates)
    Sends results back to A Sends results back to A Sends results back to A
    Aggregates and returns results.

    hash join的数据流:

    Node A Node B Node C Node D
    Receives statement
    Finds that the table data spans three ranges on B, C, D
    Sets up 3 join buckets on B, C, D
    Expects join data for bucket 0 Expects join data for bucket 1 Expects join data for bucket 2
    Sends scan requests to B, C, D
    Starts scan (w/ filtering). Results are sent to the three buckets in batches Starts scan (w/ filtering) Results are sent to the three buckets in batches Starts scan (w/ filtering). Results are sent to the three buckets in batches
    Tells A scan is finished Tells A scan is finished Tells A scan is finished
    Sends finalize requests to the buckets
    Sends bucket data to A Sends bucket data to A Sends bucket data to A
    Returns results

    复杂性

    我们需要为SQL-to-SQL构建新的基础结构和API。 API需要支持SQL表达式,可以是SQL字符串(需要每个节点重新解析表达式),也可以是更有效的AST序列化。
    API还需要包含有关请求应限制到哪些key range的信息。由于表可以跨越许多raft ranges,因此该信息可包括大量不相交的key range。
    在未来,我们可能会实现像EPaxos这样的共识算法,它允许直接在副本上直接操作,这样就为我们的分布式处理提供了更多的选择。
    最后,API的设计必须要考虑到数据处理,网络传输和存储操作之间的并行- 应该可以在结果全部可用之前对结果进行流式传输(F1通过取消流结果的排序而加快了获取结果的速度)。

    Spark: 将SQL编译成一个运行于分布式执行环境之上的数据并行处理语言

    这里我们介绍一个新系统- 一个分布式计算的执行环境。计算采用类似于M/R的编程模型,或者是更流水线化的模型 - Spark, 或者 Google's Dataflow (Dataflow的一部分作为apache的项目,可以运行在其他执行环境(例如:spark)之上)。

    在这些模型中,您可以考虑使用可以并行操作的数据结构,如:数据和map。对于这些数据结构的存储是分布式的。您所做的就是对这些数组或map进行操作 - 对它们进行排序,按key对它们进行分组,转换和过滤。您还可以对多个数据集进行操作以进行join操作。

    这些模型试图拥有 * a)执行符号执行的智能编译器,例如融合尽可能多的操作 - “map(f,map(g,dataset))== map(f●g,dataset)`和 * b)动态运行。运行时会一遍根据部分输入的计算结果动态的决定后续的计算流程。

    我们的想法是将SQL编译成这种语言,考虑到我们以一个大的有序map作为数据集,然后在该数据集上运行查询和计算。如果执行环境良好,它会利用数据拓扑。这与“分布式sql”不同,因为* a)执行环境是动态的,所以你不需要预先提出一个执行计划,说明哪个节点会向其他节点发出什么命令和 b )*数据可以从一个节点推送到另一个节点,而不仅仅是拉动。

    我们可以从小的开始 - 不执行分布式运行,只需过滤“SELECTS”并过滤“UPDATE,DELETE,INSERT FROM SELECT”。

    相关文章

      网友评论

          本文标题:CockRoachDB-DistSQL设计文档

          本文链接:https://www.haomeiwen.com/subject/kmgbiqtx.html