美文网首页
Presto/Trino权威指南及官方设计文档解读

Presto/Trino权威指南及官方设计文档解读

作者: Caucher | 来源:发表于2021-05-03 21:26 被阅读0次

    官网地址:https://trino.io/docs/current/
    参阅书目《Trino: The definitive guide》
    开源社区博客地址:https://blog.starburstdata.com/
    Trino博客地址:https://trino.io/blog/
    作者均是presto创始人三位,因此这两份材料+2019年的presto论文(见我另一篇博客),是最权威的presto技术文档。然而经过编者阅读,这些还不足够精细,更多的技术文档,在开源社区的博客中有描述,编者会将这些信息补充都相应位置,以使读者获得最新的技术理解。
    trino就是presto的别名,历史原因不细说,为了叙述方便,以下统称presto。
    本文写作时,是trino356版本。写作日期:2021.5.3
    presto的基本术语和概念、安装教程、用途,本文不再赘述,相关文档/教程/博客在中文互联网上有很多,可移步查阅。本文的技术深度在user和contributer之间,不涉及源码阅读,只对软件设计的idea和trade-off做重点解读。之后如有机会,会对源码部分进行解读,更新博客。

    第四章 Presto的架构

    Presto是典型的主从式架构,总结构图如下。


    image.png

    4.3 节点发现服务

    节点启动时会向coordinator discovery service报告/通信,之后定期向coordinator发送心跳以证明存活。
    这个节点发现服务专门用于追踪存活节点,和worker节点列表维护的,这个服务被嵌入到presto内部,所以worker节点直接和coordinator的http服务通信即可。

    4.4 基于连接器的架构

    connector用于连接外部数据源。只要外部数据可以用Presto支持的数据类型表示成行、列、表,就可以创建connector与外部数据源相连接。
    Presto提供了SPI(service provider interface),用于实现一个连接器,只要连接器实现了SPI,就可以将Presto引擎和外部数据连接起来。SPI需要实现三个部分:

    • 获取表/视图/schema元数据的操作;
    • 产生数据分区的逻辑单元的操作,基于这些逻辑单元,presto能够并行读写;
    • 在源数据和Presto内存数据格式之间进行转换的操作。

    Coordinator和worker需要SPI提供的不同接口,如下图:


    image.png

    Presto是以插件形式,动态加载这些Connector。插件的架构,在Presto中得到广泛使用,比如事件监听器、数据类型、函数类型、访问控制等。

    4.7 查询执行模型

    image.png

    在物理计划生成之前,会用到多个SPI接口辅助指定计划:

    • Metadata SPI在Parse过程中可以进行语义校验、类型检查、安全检查等;
    • Data Statistics SPI在plan指定时提供一些表的统计信息,基于此制定基于代价的查询优化;
    • Data Location SPI在制定物理计划时提供数据位置,方便scheduler切片。

    源数据以Page形式产生数据,Page就是按照列式存储格式的多个行。
    task实际上是在具体一个worker node上,给定具体split的一个stage。task创建时,会为每个split初始化一个Driver,每个driver是一个operator流水线的一个实例,负责处理split中的数据。一个task可能会创建多个driver。
    oprertor,即算子,流水线上的基本组成单位,表示一个计算处理过程,常见的包括TableScan、Filter、Join、Aggregator等。


    image.png

    Coordinator首先根据MetaData SPI创建一个splits列表,基于该列表进行任务分配,在查询执行期间,coordinator跟踪所有的task和splits执行进度,一些worker完成了任务,产生了更多的splits待下游处理,那么coordinator继续进行分配任务。

    4.8 查询优化

    这里我们基于一个具体的SQL例子,对查询优化进行讲解。
    如下图,这是一个以地区、国家为维度统计销量的SQL语句,基于国家表、地区表、订单表、用户表来完成,是TPC-H上的一个SQL,我们以它作为例子。


    image.png

    4.8.2 初始查询计划

    我们根据字面含义,直接对上面这条SQL进行查询计划指定,得到如下的树状图。这样的树状图是可以执行出结果的,但是简单评估下,两次CrossJoin的复杂度就是\Omega(N*O*C)级别的操作,这样的查询计划不能在人类有生之年执行完毕。而查询优化器的任务,就是从这样的初始查询计划转化成一个等价的查询计划,能更高效的完成任务。理论上来说,等价的查询计划有指数级别个,optimizer的任务,是在一定的时间内,选取尽量最优的查询计划。

    image.png

    4.9-4.10 优化规则/启发式方法

    optimizer是根据一些优化规则对初始查询计划进行优化的。我们可以称之为启发式的方法。

    4.9.1 谓词下推

    谓词下推很容易理解,尽可能地把谓词下推到靠近叶子节点的位置,使得不会在结果中产生的Records不会过多地在执行过程中被处理。
    以刚才的例子,我们可以让crossjoin算子和filter算子部分进行合并,变成inner join,进行谓词下推。

    image.png
    谓词下推之后,查询计划的复杂度降低到了\Theta(N*O)

    4.9.2 Cross Join消除

    Cross Join一般没有什么业务含义,只会是一些SQL新手写出来的SQL语句,而Cross Join一般代价极高,很难完成,所以要对Cross join进行特定消除。
    Cross join的消除,主要是利用filter的筛选条件,对join的表顺序进行重新排序,力图将所有Cross join都变成非cross join。
    以刚才的例子,通过表顺序重排,我们可以将两个谓词全部下推,消除Cross join。

    image.png
    cross join之后,查询计划的复杂度不再主要由join支配,join的复杂度为\Theta(O)

    4.9.3 TopN

    对于order by + limit的组合在SQL使用中是非常常见的。naive的想法是先排序,再取前N个Record,排序很重因而复杂度较高。而利用堆进行TopN的分布式查询算法,已经比较成熟了,所以ORDER BY + LIMIT的操作,一般会被简化为TopN算子。
    TopN简化之后,排序不再是限速步,排序的复杂度是\Theta(N) * log(limit)

    By the way,在SQL中,ORDER BY 后面只有跟LIMIT或者FETCH FIRST才有用,其他情况下,Presto直接忽视ORDER BY。

    4.9.4 局部聚合

    一般来说,参与Join的表不会用到它所有的明细信息,尤其是在有GROUP BY查询进行聚合时,通常只会用到join表的有限几列的统计(聚合)信息,那么我们可以在join之前先对join表的相关列进行预聚合,然后再join,减少数据传递过程中的流量。
    为了提高并行性,预聚合通常采用局部聚合的方式来实现。当局部聚合能显著减少数据传递量时,能够显著提升性能,但是如果不能有效减少数据量,有可能造成反面效应。(因此presto默认关闭了这条规则,可以手动开启)


    image.png

    4.10.1 Leteral Join去关联化

    Leteral Join就是在SELECT位置的关联子查询,对于这种简化写法,Presto Parser会首先把它改写成标准的Left Join形式,如下图:


    image.png

    这种改写在语义上并不一致,尤其在空值和重复值上,所以为了保证语义一致,在查询计划上,要为原查询结果分配unique id进行区分,join结果要进行重复性检测,一旦有重复就要报错。


    image.png

    4.10.2 Semi-Join(IN) 去关联化

    这里指的是关联子查询,就是在IN子查询内部,需要用到外部表的信息,即产生了关联。

    image.png
    Presto会去除这种关联化,首先让子查询去除与外部的关联条件(本例子中是WHERE p.partkey=l.partkey)之后,计算一次结果,然后利用该结果和外部关联表进行inner join,最后根据IN的key进行filter,最终需要手动去重得到唯一结果。

    By the way, 对于不关联的Semi-Join,Presto采用哈希表来完成。

    4.11 基于代价的优化器

    cost-based优化器(CBO)的一个重要特征:不仅考虑query语句的“形状”,还要考虑输入数据的特征。
    Presto的代价由三个部分组成:CPU时间+内存需求+网络带宽占用。

    4.11.2 Join的代价

    Presto可以基于一个扩展的Hash Join算法进行分布式join。主要思路是,将join的一对表分成build side和probe side。首先基于build side的join key构建哈希表,然后probe side去和hash table进行探测,匹配成功的emit。具体来讲,分成三阶段,构成三层哈希以支持并行:

    1. 首先根据join key的hash范围将build side 和 probe side两张表的数据发送到各个worker node上去,这是第一层哈希f(x);
    2. 在每个worker node内部,将build side按照join key再次哈希分散到各个线程上去,这是第二层哈希g(x);
    3. 在每个线程内部,对build side的join key构建哈希表,这是第三层哈希h(x)。第三层哈希表构建结束之后,交给worker node进行合并;合并完成之后,probe side将join key按照h(x)哈希并在本地哈希表中查找,返回结果,这个探测过程可以直接将probe side数据等分给各个线程查找即可。

    可能会有疑问,第三层的哈希为什么要合并?不合并也同样可以分布式查询,即将probe side数据按照g(x)分散给各个线程,再按照h(x)探测各个线程的哈希表,这样就不必合并了。
    这确实是一种选择,但是考虑到这种方案probe side首先要按照g(x)在本地分散数据,这是一个额外的负担,而presto的方案直接将probe side数据等分即可,考虑到probe side一般是个更大的表,所以presto的选择是更佳的。
    另外,为了合并的方便,可以直接取g(x)=h(x),这样合并就是个O(1)的操作。

    接下来对分布式hash join进行代价分析:build side的数据必须全部放置于内存以快速构建和emit,这个内存开销是很大的;join两侧的表都必须通过网络shuffle广播,这个代价也非常大。
    为了控制内存开销,CBO会选择小表来当build side。这依赖于表的统计信息。

    Join的优化:动态过滤

    一般来说,小表的join key值域范围比大表的join key值域范围要小,对于这些超出值域范围的大表Record,1)成段/分区的数据都没有必要读到Presto中来;2)散装的数据没有必要通过网络shuffle。而build side在构建hash table时统计下值域范围,几乎是0代价的,只需要告诉下上游(一般还是本节点,所以是被一个driver控制的进程)分发大表数据的节点即可。
    由于这种筛选条件,ORC和Parquet能更好地对动态过滤进行利用。目前仅广播式join才有这种优化,分布式join汇总数据范围会很麻烦(不是本节点控制大表读取)。


    image.png
    image.png
    Join的优化:动态分区剪裁

    在数据仓库(比如Hive)中,举一个例子,事实表(订单表)和维度表(时间表)相join的例子,下面这个SQL。

    SELECT COUNT(*) FROM 
    store_sales JOIN date_dim ON store_sales.ss_sold_date_sk = date_dim.d_date_sk
    WHERE d_following_holiday='Y' AND d_year = 2000;
    

    维度表的谓词会下推到Hive ORC,进行分区剪裁,惰性读取等各种优化(后面讲Hive Connector会讲到),会提取出来一张小表,很高效,没问题。但是对于庞大的事实表呢,会全量读取,这太可怕了。
    这时候大家会想到,刚才有一个优化动态过滤技术,正好用的上。确实,但是只能用在广播式join和ORC/Parquet上,还是有限制,而且即使使用了,也要逐个分区逐个文件扫描一下,才能确定剪裁,我们想从分区层面直接剪裁掉。
    具体实现:对动态过滤进行扩展,首先collect小表的join key range,将其传递给coordiantor,然后让coordinator根据HMS的信息去裁剪,然后再进行splits分配。这样的话,完全没有对join方式和存储格式的限制。

    4.11.3 表统计信息

    SPI Statistics包括以下信息:

    1. 行的数量;
    2. 列中唯一值的数量(COUNT(DISTINCT ));
    3. 列中空值比例;
    4. 列中极值;
    5. 列的平均数据大小。

    CBO可以利用这些统计信息进行代价度量。
    举一个例子,对于刚才的join过程,CBO可以利用这些统计信息中的行数量、列平均数据大小,判断表的大小,以此来决定join的顺序。
    比如,如下的join顺序就是纯基于内存考虑的,从最大的lineitem开始进行join,build side依次用更小的表:


    image.png

    然而,如果从总的代价上来考虑,我们开启join顺序重排参数,则会选择小表先join,然后是大表,以避免大表造成多次网络shuffle传输:


    image.png
    对于不同的表和不同的集群,就会有不同的查询方案,这就是CBO的价值。

    4.11.4 过滤统计信息

    刚才我们用的两条统计信息来进行join顺序重排是不够的,举个例子,如果最大的lineitem表有一个等值谓词限定条件,他会从最大表,立即变成最小表(谓词下推),那么join顺序就完全不一样了,如下图(编者注:图中的join条件应该是写错了,图来自书中原图)


    image.png

    我们可以利用lineitem partkey的空值比例,极值,唯一值个数,算出平均一个值有几行(假设均匀分布):


    image.png

    当然,当数据明显偏斜时,这个方案就会产生问题,如果数据源能提供histogram,那么CBO就可以更精准,避免skew的问题。
    对于有分区表和分区统计信息的外部源,CBO也会直接利用分区统计。

    4.11.6 join顺序枚举

    • 分区枚举:在多个表进行join的场景,顺序至关重要,而n个表join的顺序有\Theta (n!)种。Presto默认会每9个表,优化一次顺序,否则代价太高了。
    • 动态规划:join顺序枚举问题,存在最优子结构,Presto采用自顶向下(递归式)的动态规划来解决这个问题。
    • 忽略Cross-join:几乎所有的optimizer都会跳过cross-join,包括Presto,所以cross-join的表不在这里参与排序。
    • 两种search space:一般的优化器只考虑left deep模式的join方案,Presto发现Bushy tree的join方案在分布式场景下可能会更好,而且枚举代价也不大,所以Presto枚举这两种搜索空间。


      image.png

    了解了上述基本思路,我们来描述枚举流程:

    1. 把可以重新定序的nodes和它们之间的谓词限定关系,放到一个multi-join node上去;也就是在一个multi-join node中,所有node是可以自由乱序的,任两个表之间都可以进行join(不包括cross-join)。注意到不是所有的joins都能自由乱序,那他们就不在multi-join nodes里。再注意下,任意两个表之间的join谓词关系,实际上是通过谓词同等推断(equality inference)算法导出的,这个算法就不在这里详述了。


      image.png
    2. 接下来,对于每个multi-join node,我们都采用带备忘录的分治策略,将node中的tables和谓词partition,同时保留每个子过程返回的结果,写进内存备忘录,之后会用到;注意到,这里的partition共有2^{n-1}种方案(Cn1+Cn2+...+Cn (n/2)),然而由于我们只搜索两种树形,所以只会搜索杨辉三角一行中的末端(Cn1)和中间部分区域(Cn n/2),由于中间部分最大,所以方案数仍然是\Theta(2^{n}),但是由于过渡段的大部分剪枝,所以前面的常量因子会很小,仍然在接受的范围内;另一方面,由于动态规划的备忘录,会有大量的剪枝;
      image.png
    3. 对于每一种partition方案,我们都要将两个partition合并join,合并join有两种物理执行方式:分布式和广播式(下一节讲到),再次做个选择,选好了之后,记录在备忘录里;
      image.png
    4. 备忘录需要的部分填充好了之后,我们就可以选择最好的方案实施join,实施过程中,仍要不断查看备忘录。


      image.png

    4.11.7 广播式Join和分布式join策略

    这里是说Hash Join的两种具体策略:

    • 广播式:每个worker node都保存build side的一个完整副本和一个完整的哈希表,worker node直接从外部数据源取数据进行join,无需在网络中广播分区probe side。


      image.png
    • 分布式:和之前我们说的hash join流程一致,首先按照f(x)对两侧的表分区,分到不同的worker node,在各个worker node完全独立并行地执行join。

    优劣比较:对于build side非常小或者probe side非常大的情况下,广播式一般会更好,因为避免了probe side在网络中全量传输一次;但是如果两侧表都很大,必须分区入驻内存的话,就要分布式join了。
    可以看到,这种选择必须用CBO,还要把filter信息考虑进去。
    Presto对外部数据源的ANALYZE,可以对统计信息进行收集,对于通过Presto ETL来写入的数据,可以在写入时直接ANALYZE,更方便。

    第六章-第七章 连接器

    6.2 RDBMS-PostgreSQL Connector

    一般来说,任务下发给一个worker node,worker node与postgres server通过JDBC相连接,一个表就会有一个JDBC连接,对于涉及多个表的SQL,可能就会有多个worker noder和多条JDBC连接。这也是RDBMS connector提供的最多的并行性,无论底层的RDBMS是不是分布式的,并行读取数据是不存在的。

    6.2.1 查询下推

    Presto可以将一条SQL或SQL的部分下推到数据源,让数据源先做一部分查询的处理,减少传输给Presto的数据,减少开销。
    举个例子,对于Presto连接Postgres,Presto收到这样一条SQL:


    image.png

    Presto的Worker node会让postgres执行下面这个SQL,告诉自己结果:


    image.png
    可以看到,下推了两个部分:投影列和筛选条件。
    Trino新版本中,聚合、Join、LIMIT、TopN也可以下推。
    具体某个Connector是否支持,要看具体说明了。

    6.4 DFS-Hive Connector

    • Hive Connector不仅用于Hive,而且可以用于各种分布式存储平台。非Hadoop分布式存储平台要想办法搞一个HMS的替代品,要不然就不能用Hive Connector。
    • presto仅仅查看Hive MetaStore中的信息,而对Hive引擎却从不使用,当然也不会有任何查询下推,只是读取数据而已。
    • Presto可以利用到Hive的分区特性和SQL中的分区列,这是Hive最重要的Feature。
    • Presto为Hive中的ORC Parquet RCBinary RCText几种文件格式做了大量的优化,保证读取效率。
    image.png
    image.png

    虽然Hive引擎不会参与到Presto的SQL计算中,但是Hive中特定的文件格式可以参与进来,因为很多流行的格式如ORC RCBinary Parquet等提供了很多统计信息可以利用。
    我们以最常用的ORC为例,ORC提供向量化列式读取的接口,这正好presto的需求;ORC Reader还做了两个重要的优化,一个是谓词下推,一个是惰性读取。

    • 谓词下推:ORC File中从文件到每10000行在三个级别上都会保存极值,谓词下推可以让ORC Reader跳过很多无关的数据段;
    • 惰性读取:对于随机性强的谓词列,下推没有太大意义,为了保证效率,ORC Reader会现在谓词列进行读取然后计算谓词,计算完之后找到相关的Segments,去其他列对应位置补充信息。


      image.png

      这两条性质,让Presto for Hive Connector有了3~8倍的性能提升。

    • 谓词过滤顺序:不同的谓词有不同的筛选性,先选择筛选性强的,后选择筛选性弱的,能获得更好的性能,ORC Reader还需要对谓词的筛选性进行评估。另外,是否对筛选通过的数据缓存,也是需要考量的。


      image.png
    • 复杂类型查询下推:通常对于一个复杂类型,我们只需要它的某些子字段,那么就可以把这种投影操作直接下推到TableScan算子的直接下游去;对于复杂类型的查询,也可以进行同样的谓词下推。

    7.2 Nosql-Accumulo

    Accumulo在架构上和Hbase几乎是一模一样。但是Hbase已经有Phoneix了,所以presto直接连Phoneix连接Hbase即可。所以这里挑了个替代品Accumulo来说明些问题。

    • Presto将Nosql中的rowkey拆分以打平一条key-value record成多条关系型数据,以此对应关系型模型。
    • 对于NoSQL,coordiator首先和数据库连接获取元数据,然后根据查询范围确定region,对于不同的region server,可以并行读取
      image.png
    • Presto可以利用Nosql的二级索引进行查询下推;

    7.6 联邦查询

    Presto可以在一条SQL内同时对多个数据源的表进行查询,对用户来讲非常透明。


    image.png

    联邦查询也有查询下推,原则仍然是,数仓类型的不下推;Nosql只下推投影和过滤;RDBMS部分可以下推join和聚合。


    image.png

    7.7 ETL

    Presto也支持ETL,不过不如只做ETL的一些商业化工具做得好。但是Presto也有自己的优势,第一就是通过联邦查询你有可能不需要ETL了,第二就是Presto的ETL全部都是SQL,在多个数据源上切换很方便。

    第十二章 生产环境中的Presto

    12.2 Presto SQL查询调优

    这里主要讲了几个调优的抓手点:

    • EXPLAIN Join的顺序
    • Join预聚合
    • GROUP BY key预聚合
    • Join散列表大小

    12.3 内存管理

    Presto的内存管理都是基于worker node JVM来说的。
    用户内存:客户端查询中的操作,如聚合、排序等占用的内存;
    系统内存:查询引擎用的内存,比如input/output buffer, table scan buffer等。
    Presto有个初始散列分区数的概念,如果设为8的话,假设query.max-memory(用户内存)=50GB,平均每个worker node用户内存占用为50GB/8=6.25GB,如果query.max-memory-per-node设为13GB的话,那么这个配置可允许超过平均值两倍的数据倾斜。
    Presto的所有Worker node应该同等配置,否则无法妥善利用。

    12.4 任务并发性

    • task work threads:默认是CPU核数的2倍,也可以手动增加,不过要考虑上下文切换的代价是否能被cover;
    • operator concurrecy:这里的并发实际上说的是并行度,算子(比如join,aggregation)在本地的一个并行度,并行度越高,上下文切换时代价也越大。

    12.5 工作节点调度

    • 每个工作节点处理split的数量上限、每个task的splits排队上限都可以适当提高以增加单节点负载;
    • 如果有数据缓存(RubiX),或者存储和计算节点不是完全隔离的话,可以让Presto调度器的队列中保存更多空间给本地数据,减少网络传播,通常这个模式都会被打开。

    12.6 网络数据交换

    • 下游向上游拉数据的线程数,可能会增强吞吐量,但是注意内存使用;
    • input/output buffer大小,默认32MB,确实太小了,可以放大一些,避免back pressure机制生效,降低查询效率。

    性能评估(Benchmark)

    对于CBO性能提升的评估:

    实验和数据来自starburst blog:https://blog.starburstdata.com/technical-blog/presto-cost-based-optimizer-rocks-the-tpc-benchmarks
    实验在8个HDFS数据节点上部署presto worker,1个coordiantor和NameNode、HMS共享节点。
    每个节点24核、256GB内存,160GB堆内存;ORC格式,Zlib压缩。
    80%以上的查询都在1TB数据集(TPC-DS)以上,包括10TB。

    image.png image.png

    相关文章

      网友评论

          本文标题:Presto/Trino权威指南及官方设计文档解读

          本文链接:https://www.haomeiwen.com/subject/mexzrltx.html