Catalyst
Catalyst是与Spark解耦的一个独立库,是一个impl-free的执行计划的生成和优化框架。
目前与Spark Core还是耦合的,对此user邮件组里有人对此提出疑问,见mail。
以下是Catalyst较早时候的架构图,展示的是代码结构和处理流程。
Catalyst定位
其他系统如果想基于Spark做一些类sql、标准sql甚至其他查询语言的查询,需要基于Catalyst提供的解析器、执行计划树结构、逻辑执行计划的处理规则体系等类体系来实现执行计划的解析、生成、优化、映射工作。
对应上图中,主要是左侧的TreeNodelib及中间三次转化过程中涉及到的类结构都是Catalyst提供的。至于右侧物理执行计划映射生成过程,物理执行计划基于成本的优化模型,具体物理算子的执行都由系统自己实现。
Catalyst现状
在解析器方面提供的是一个简单的scala写的sql parser,支持语义有限,而且应该是标准sql的。
在规则方面,提供的优化规则是比较基础的(和Pig/Hive比没有那么丰富),不过一些优化规则其实是要涉及到具体物理算子的,所以部分规则需要在系统方那自己制定和实现(如spark-sql里的SparkStrategy)。
Catalyst也有自己的一套数据类型。
TreeNode体系
TreeNode是Catalyst执行计划表示的数据结构,是一个树结构,具备一些scala collection的操作能力和树遍历能力。这棵树一直在内存里维护,不会dump到磁盘以某种格式的文件存在,且无论在映射逻辑执行计划阶段还是优化逻辑执行计划阶段,树的修改是以替换已有节点的方式进行的。
TreeNode,内部带一个children: Seq[BaseType]表示孩子节点,具备foreach、map、collect等针对节点操作的方法,以及transformDown(默认,前序遍历)、transformUp这样的遍历树上节点,对匹配节点实施变化的方法。
提供UnaryNode,BinaryNode, LeafNode三种trait,即非叶子节点允许有一个或两个子节点。
TreeNode提供的是范型。
TreeNode有两个子类继承体系,QueryPlan和Expression。QueryPlan下面是逻辑和物理执行计划两个体系,前者在Catalyst里有详细实现,后者需要在系统自己实现。Expression是表达式体系,后面章节都会展开介绍。
Tree的transformation实现:
传入PartialFunction[TreeType,TreeType],如果与操作符匹配,则节点会被结果替换掉,否则节点不会变动。整个过程是对children递归执行的。
执行计划表示模型
逻辑执行计划
QueryPlan继承自TreeNode,内部带一个output: Seq[Attribute],具备transformExpressionDown、transformExpressionUp方法。
在Catalyst中,QueryPlan的主要子类体系是LogicalPlan,即逻辑执行计划表示。其物理执行计划表示由使用方实现(spark-sql项目中)。
LogicalPlan继承自QueryPlan,内部带一个reference:Set[Attribute],主要方法为resolve(name:String): Option[NamedeExpression],用于分析生成对应的NamedExpression。
LogicalPlan有许多具体子类,也分为UnaryNode, BinaryNode, LeafNode三类,具体在org.apache.spark.sql.catalyst.plans.logical路径下。
逻辑执行计划实现
LeafNode主要子类是Command体系:
各command的语义可以从子类名字看出,代表的是系统可以执行的non-query命令,如DDL。
UnaryNode的子类:
BinaryNode的子类:
物理执行计划
另一方面,物理执行计划节点在具体系统里实现,比如spark-sql工程里的SparkPlan继承体系。
物理执行计划实现
每个子类都要实现execute()方法,大致有以下实现子类(不全)。
LeafNode的子类:
UnaryNode的子类:
BinaryNode的子类:
提到物理执行计划,还要提一下Catalyst提供的分区表示模型。
执行计划映射
Catalyst还提供了一个QueryPlanner[Physical <: TreeNode[PhysicalPlan]]抽象类,需要子类制定一批strategies: Seq[Strategy],其apply方法也是类似根据制定的具体策略来把逻辑执行计划算子映射成物理执行计划算子。由于物理执行计划的节点是在具体系统里实现的,所以QueryPlanner及里面的strategies也需要在具体系统里实现。
在spark-sql项目中,SparkStrategies继承了QueryPlanner[SparkPlan],内部制定了LeftSemiJoin, HashJoin,PartialAggregation, BroadcastNestedLoopJoin, CartesianProduct等几种策略,每种策略接受的都是一个LogicalPlan,生成的是Seq[SparkPlan],每个SparkPlan理解为具体RDD的算子操作。
比如在BasicOperators这个Strategy里,以match-case匹配的方式处理了很多基本算子(可以一对一直接映射成RDD算子),如下:
case logical.Project(projectList, child) =>
execution.Project(projectList, planLater(child)) :: Nil
case logical.Filter(condition, child) =>
execution.Filter(condition, planLater(child)) :: Nil
case logical.Aggregate(group, agg, child) =>
execution.Aggregate(partial = false, group, agg, planLater(child))(sqlContext) :: Nil
case logical.Sample(fraction, withReplacement, seed, child) =>
execution.Sample(fraction, withReplacement, seed, planLater(child)) :: Nil
Expression体系
Expression,即表达式,指不需要执行引擎计算,而可以直接计算或处理的节点,包括Cast操作,Projection操作,四则运算,逻辑操作符运算等。
具体可以参考org.apache.spark.sql.expressionspackage下的类。
Rules体系
凡是需要处理执行计划树(Analyze过程,Optimize过程,SparkStrategy过程),实施规则匹配和节点处理的,都需要继承RuleExecutor[TreeType]抽象类。
RuleExecutor内部提供了一个Seq[Batch],里面定义的是该RuleExecutor的处理步骤。每个Batch代表着一套规则,配备一个策略,该策略说明了迭代次数(一次还是多次)。
protected case class Batch(name: String, strategy: Strategy, rules: Rule[TreeType]*)
Rule[TreeType <: TreeNode[_]]是一个抽象类,子类需要复写apply(plan: TreeType)方法来制定处理逻辑。
RuleExecutor的apply(plan: TreeType): TreeType方法会按照batches顺序和batch内的Rules顺序,对传入的plan里的节点迭代处理,处理逻辑为由具体Rule子类实现。
https://blog.csdn.net/pelick/article/details/37809681?utm_source=tuicool
网友评论