背景
Facebook的数据仓库存储在少量大型Hadoop/HDFS集群。Hive是Facebook在几年前专为Hadoop打造的一款数据仓库工具。在以前,Facebook的科学家和分析师一直依靠Hive来做数据分析。但Hive使用MapReduce作为底层计算框架,是专为批处理设计的。但随着数据越来越多,使用Hive进行一个简单的数据查询可能要花费几分到几小时,显然不能满足交互式查询的需求。Facebook也调研了其他比Hive更快的工具,但它们要么在功能有所限制要么就太简单,以至于无法操作Facebook庞大的数据仓库。
2012年开始试用的一些外部项目都不合适,他们决定自己开发,这就是Presto。2012年秋季开始开发,目前该项目已经在超过 1000名Facebook雇员中使用,运行超过30000个查询,每日数据在1PB级别。Facebook称Presto的性能比Hive要好上10倍多。2013年Facebook正式宣布开源Presto。
Presto架构
presto架构
Presto查询引擎是一个Master-Slave的架构,由一个Coordinator节点,一个Discovery Server节点,多个Worker节点组成,Discovery Server通常内嵌于Coordinator节点中。Coordinator负责解析SQL语句,生成执行计划,分发执行任务给Worker节点执行。Worker节点负责实际执行查询任务。Worker节点启动后向Discovery Server服务注册,Coordinator从Discovery Server获得可以正常工作的Worker节点。如果配置了Hive Connector,需要配置一个Hive MetaStore服务为Presto提供Hive元信息,Worker节点与HDFS交互读取数据。
Presto是一个交互式的查询引擎,我们最关心的就是Presto实现低延时查询的原理,特点如下:
1、完全基于内存的并行计算
2、流水线
3、本地化计算
4、动态编译执行计划
5、小心使用内存和数据结构
6、类BlinkDB的近似查询
7、GC控制
Presto查询执行过程
和常用的数据查询引擎一致,查询过程包括以下几个部分。
1、提交查询
Presto Cli提交一个查询语句后,Cli使用HTTP协议与Coordinator通信,Coordinator收到查询请求后调用SqlParser解析SQL语句得到Statement对象,并将Statement封装成一个QueryStarter对象放入线程池中等待执行。
提交查询2、编译、解析为逻辑执行计划
和hive一样,presto使用Antlr编写SQL语法,语法规则定义在Statement.g和StatementBuilder.g两个文件中。
从SQL编译为最终的物理执行计划大概分为5部,最终生成在每个Worker节点上运行的LocalExecutionPlan,SQL解析为逻辑执行计划的过程,通过一个SQL语句来理解查询计划生成之后的计算过程。
解析流程样例SQL:
selectc 1.rank, count(*) from dim.city c1 join dim.city c2 on c1.id = c2.id where c1.id > 10 group byc 1.rank limit10;
逻辑执行计划3、生成物理执行计划
生成的逻辑执行计划根据是否有shuffle数据拆分为多个sunplan,每一个SubPlan都会提交到一个或者多个Worker节点上执行。
物理执行计划
完全基于内存的并行计算
Presto SQL的执行流程如下图所示。
1.Cli通过HTTP协议提交SQL查询之后,查询请求封装成一个SqlQueryExecution对象交给Coordinator的SqlQueryManager#queryExecutor线程池去执行
2.每个SqlQueryExecution线程(图中Q-X线程)启动后对查询请求的SQL进行语法解析和优化并最终生成多个Stage的SqlStageExecution任务,每个SqlStageExecution任务仍然交给同样的线程池去执行
3.每个SqlStageExecution线程(图中S-X线程)启动后每个Stage的任务按PlanDistribution属性构造一个或者多个RemoteTask通过HTTP协议分配给远端的Worker节点执行
4.Worker节点接收到RemoteTask请求之后,启动一个SqlTaskExecution线程(图中T-X线程)将这个任务的每个Split包装成一个PrioritizedSplitRunner任务(图中SR-X)交给Worker节点的TaskExecutor#executor线程池去执行
查询执行流程上面的执行计划实际执行效果如下图所示。
1.Coordinator通过HTTP协议调用Worker节点的 /v1/task 接口将执行计划分配给所有Worker节点(图中蓝色箭头)
2.SubPlan1的每个节点读取一个Split的数据并过滤后将数据分发给每个SubPlan0节点进行Join操作和Partial Aggr操作
3.SubPlan1的每个节点计算完成后按GroupBy Key的Hash值将数据分发到不同的SubPlan2节点
4.所有SubPlan2节点计算完成后将数据分发到SubPlan3节点
5.SubPlan3节点计算完成后通知Coordinator结束查询,并将数据发送给Coordinator
执行计划计算流程待补充完善。。。。
网友评论