1. Developing a Spark application
// initialize the SparkSession
val spark = SparkSession.builder.appName("SparkSQL Test").master("local[4]").getOrCreate()
// transformation: produces a DataFrame; further DataFrame DSL APIs can be chained on it
val sqlDf = spark.sql("select count(*) from table")
// action: Spark Core starts executing the job
sqlDf.show(false)
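A minimal end-to-end sketch of the same three steps, runnable with spark-sql on the classpath; the people temp view and its sample rows are made up here so the SQL has something to resolve against:

import org.apache.spark.sql.SparkSession

object SparkSQLTest {
  def main(args: Array[String]): Unit = {
    // initialize the SparkSession
    val spark = SparkSession.builder.appName("SparkSQL Test").master("local[4]").getOrCreate()
    import spark.implicits._

    // register a small in-memory table (hypothetical data, for illustration only)
    Seq(("alice", 30), ("bob", 25)).toDF("name", "age").createOrReplaceTempView("people")

    // transformation: parsing and analysis happen here, nothing is executed yet
    val sqlDf = spark.sql("select count(*) from people")

    // action: triggers optimization, physical planning and job submission
    sqlDf.show(false)

    spark.stop()
  }
}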
2. SparkSession initialization: constructing SessionState
//1: SparkSession lazily initializes its sessionState
lazy val sessionState: SessionState = {
parentSessionState
.map(_.clone(this))
.getOrElse {
val state = SparkSession.instantiateSessionState(
SparkSession.sessionStateClassName(sparkContext.conf),
self)
initialSessionOptions.foreach { case (k, v) => state.conf.setConfString(k, v) }
state
}
}
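To see what per-session state means in practice, a small sketch (reusing the spark session above): newSession() shares the SparkContext and SharedState but builds its own SessionState, so SQL configurations stay isolated; cloneSession(), used internally, is the path that goes through parentSessionState.clone.

val other = spark.newSession()
other.conf.set("spark.sql.shuffle.partitions", "8")
println(other.conf.get("spark.sql.shuffle.partitions")) // 8
println(spark.conf.get("spark.sql.shuffle.partitions")) // unchanged (default 200)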
//2: instantiating the SessionState
/**
* Helper method to create an instance of `SessionState` based on `className` from conf.
* The result is either `SessionState` or a Hive based `SessionState`.
*/
private def instantiateSessionState(
className: String,
sparkSession: SparkSession): SessionState = {
try {
// invoke `new [Hive]SessionStateBuilder(SparkSession, Option[SessionState])`
val clazz = Utils.classForName(className)
val ctor = clazz.getConstructors.head
//default (in-memory): SessionStateBuilder, which extends BaseSessionStateBuilder; hive: HiveSessionStateBuilder
ctor.newInstance(sparkSession, None).asInstanceOf[BaseSessionStateBuilder].build()
...
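Which builder class gets reflected here is driven by spark.sql.catalogImplementation ("hive" when enableHiveSupport() is used). A rough way to check from user code, shown only as a sketch since these are internal APIs:

// "hive" -> HiveSessionStateBuilder (HiveSessionCatalog), otherwise the in-memory
// SessionStateBuilder (plain SessionCatalog)
val catalogImpl = spark.sparkContext.getConf.get("spark.sql.catalogImplementation", "in-memory")
println(s"catalog implementation: $catalogImpl")
println(spark.sessionState.catalog.getClass.getName)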
//3: build the SessionState, wiring up the catalog, sqlParser, analyzer, optimizer, built-in functions, UDF registration, and so on
def build(): SessionState = {
new SessionState(
session.sharedState,
conf,
experimentalMethods,
functionRegistry,
udfRegistration,
() => catalog,
sqlParser,
() => analyzer,
() => optimizer,
planner,
streamingQueryManager,
listenerManager,
() => resourceLoader,
createQueryExecution,
createClone)
}
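The components wired up by build() can be inspected from a live session; they are internal/unstable APIs, so this is only an illustrative sketch:

val ss = spark.sessionState
println(ss.sqlParser.getClass.getName)           // the SQL parser
println(ss.catalog.listDatabases())              // databases known to the SessionCatalog
println(ss.functionRegistry.listFunction().size) // built-in plus registered functions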
Note: how QueryExecution, SessionState, and BaseSessionStateBuilder relate:
(1) QueryExecution's analyzed and optimizedPlan are lazy; when first accessed, they delegate to the analyzer and optimizer held in SessionState to do the actual resolution and optimization.
(2) SessionState's catalog, analyzer, optimizer, and resourceLoader are also lazy; when accessed, they invoke the anonymous functions (thunks) that BaseSessionStateBuilder supplied when constructing the SessionState.
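A sketch of point (1), assuming the people temp view from the sketch in section 1: the lazy plans live on QueryExecution, and first access is what runs the analyzer/optimizer held in SessionState.

val qe = spark.sql("select count(*) from people").queryExecution
println(qe.analyzed)      // resolved by sessionState.analyzer
println(qe.optimizedPlan) // rewritten by sessionState.optimizer
println(qe.executedPlan)  // physical plan from the planner plus preparation rules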
3. Transformation: building the DataFrame (resolved LogicalPlan)
/*
1: parsePlan produces an unresolved LogicalPlan. Dataset.ofRows then calls QueryExecution.assertAnalyzed(),
which boils down to sparkSession.sessionState.analyzer.executeAndCheck(logical); the analyzer applies its
resolution rules, resolving unresolved attributes and relations into a resolved LogicalPlan.
Finally new Dataset[Row](sparkSession, qe, RowEncoder(qe.analyzed.schema)) wraps it as a DataFrame. */
def sql(sqlText: String): DataFrame = {Dataset.ofRows(self, sessionState.sqlParser.parsePlan(sqlText))}
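The two halves of sql() can be reproduced by hand, as a sketch (again assuming the people temp view): parsePlan alone yields an unresolved plan; it only becomes resolved after the analyzer has run.

val unresolved = spark.sessionState.sqlParser.parsePlan("select count(*) from people")
println(unresolved)          // contains 'UnresolvedRelation / unresolved attributes
println(unresolved.resolved) // false: nothing is bound to the catalog yet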
//2: using the visitor pattern, astBuilder walks the ANTLR parse tree and converts it into a Catalyst AST, yielding an unresolved logical plan
override def parsePlan(sqlText: String): LogicalPlan = parse(sqlText) { parser =>
astBuilder.visitSingleStatement(parser.singleStatement()) match {
case plan: LogicalPlan => plan
case _ =>
val position = Origin(None, None)
throw new ParseException(Option(sqlText), "Unsupported SQL statement", position, position)
}
}
//3: hand the SQL text to ANTLR: the lexer SqlBaseLexer and parser SqlBaseParser generated from SqlBase.g4 perform lexical and syntactic validation
protected def parse[T](command: String)(toResult: SqlBaseParser => T): T = {
logDebug(s"Parsing command: $command")
val lexer = new SqlBaseLexer(new UpperCaseCharStream(CharStreams.fromString(command)))
lexer.removeErrorListeners()
lexer.addErrorListener(ParseErrorListener)
lexer.legacy_setops_precedence_enbled = SQLConf.get.setOpsPrecedenceEnforced
val tokenStream = new CommonTokenStream(lexer)
val parser = new SqlBaseParser(tokenStream)
parser.addParseListener(PostProcessor)
parser.removeErrorListeners()
parser.addErrorListener(ParseErrorListener)
parser.legacy_setops_precedence_enbled = SQLConf.get.setOpsPrecedenceEnforced
try {
try {
// first, try parsing with potentially faster SLL mode
parser.getInterpreter.setPredictionMode(PredictionMode.SLL)
toResult(parser)
...
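If the lexer or parser rejects the text, the failure surfaces here as a ParseException, before any analysis or catalog lookup happens; a small sketch:

import org.apache.spark.sql.catalyst.parser.ParseException

try {
  spark.sql("selectt 1") // invalid keyword, rejected by SqlBaseParser
} catch {
  case e: ParseException => println(s"parse error: ${e.getMessage}")
}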
4. Action: triggering execution - optimize the logical plan, generate the physical plan, convert to RDDs and submit via SparkContext
//1: pull up to 20 rows to the driver: show() goes through take(), which ultimately calls head()
def show(): Unit = show(20)
def head(n: Int): Array[T] = withAction("head", limit(n).queryExecution)(collectFromPlan)
//2: wrap an action so the query execution and its elapsed time can be tracked and user-registered callbacks invoked
private def withAction[U](name: String, qe: QueryExecution)(action: SparkPlan => U) = {
try {
/* Accessing qe.executedPlan triggers the optimizer to apply its rule batches (e.g. predicate pushdown)
to the resolved logicalPlan; SparkPlanner then picks the best strategy (e.g. broadcasting a small table)
to turn the optimizedPlan into a SparkPlan, and a final set of preparation rules turns that into an
executable physical plan.
*/
qe.executedPlan.foreach { plan =>
plan.resetMetrics()
}
val start = System.nanoTime()
val result = SQLExecution.withNewExecutionId(sparkSession, qe) {
//call collectFromPlan: the physical plan is handed off to Spark Core and executed as RDD operations
action(qe.executedPlan)
}
val end = System.nanoTime()
sparkSession.listenerManager.onSuccess(name, qe, end - start)
result
} catch {
case e: Exception =>
sparkSession.listenerManager.onFailure(name, qe, e)
throw e
}
}
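The onSuccess/onFailure calls above are what drive user-registered QueryExecutionListeners. A sketch of registering one (Spark 2.x signatures assumed):

import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.util.QueryExecutionListener

spark.listenerManager.register(new QueryExecutionListener {
  override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit =
    println(s"$funcName succeeded in ${durationNs / 1e6} ms")
  override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit =
    println(s"$funcName failed: ${exception.getMessage}")
})
spark.sql("select count(*) from people").show() // the "head" action triggers onSuccess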
//3: QueryExecution's workflow from optimization to the execution-ready physical plan
lazy val optimizedPlan: LogicalPlan = sparkSession.sessionState.optimizer.execute(withCachedData)
lazy val sparkPlan: SparkPlan = {
SparkSession.setActiveSession(sparkSession)
// TODO: We use next(), i.e. take the first plan returned by the planner, here for now,
// but we will implement to choose the best plan.
planner.plan(ReturnAnswer(optimizedPlan)).next()
}
// executedPlan should not be used to initialize any SparkPlan. It should be
// only used for execution.
lazy val executedPlan: SparkPlan = prepareForExecution(sparkPlan)
protected def prepareForExecution(plan: SparkPlan): SparkPlan = {
preparations.foldLeft(plan) { case (sp, rule) => rule.apply(sp) }
}
/** A sequence of rules that will be applied in order to the physical plan before execution. */
protected def preparations: Seq[Rule[SparkPlan]] = Seq(
python.ExtractPythonUDFs,
PlanSubqueries(sparkSession),
EnsureRequirements(sparkSession.sessionState.conf),
CollapseCodegenStages(sparkSession.sessionState.conf),
ReuseExchange(sparkSession.sessionState.conf),
ReuseSubquery(sparkSession.sessionState.conf))
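The same pipeline can be observed from user code: explain(true) prints the parsed, analyzed and optimized logical plans plus the final physical plan, and executedPlan already has the preparation rules applied (e.g. WholeStageCodegen nodes inserted by CollapseCodegenStages). A sketch:

val df = spark.sql("select count(*) from people")
df.explain(true)                        // logical plans plus the physical plan
println(df.queryExecution.executedPlan) // prepared physical plan, typically with WholeStageCodegen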
//4: run the registered action callback (collectFromPlan here); under the hood it ends up calling the SparkPlan's doExecute(), turning the physical plan into RDD operations
/**
* Collect all elements from a spark plan.
*/
private def collectFromPlan(plan: SparkPlan): Array[T] = {
// This projection writes output to a `InternalRow`, which means applying this projection is not
// thread-safe. Here we create the projection inside this method to make `Dataset` thread-safe.
val objProj = GenerateSafeProjection.generate(deserializer :: Nil)
plan.executeCollect().map { row =>
// The row returned by SafeProjection is `SpecificInternalRow`, which ignore the data type
// parameter of its `get` method, so it's safe to use null here.
objProj(row).get(0, null).asInstanceOf[T]
}
}
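For reference, Dataset.collect() goes through the same withAction(...)(collectFromPlan) path, with T being Row for a DataFrame; a one-line sketch:

val rows: Array[org.apache.spark.sql.Row] = spark.sql("select count(*) from people").collect()
println(rows.mkString(", ")) // e.g. [2]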
//5: convert the SparkPlan into an RDD and let SparkContext submit the job
/**
* Runs this query returning the result as an array.
*/
def executeCollect(): Array[InternalRow] = {
//getByteArrayRdd() calls execute(), which in turn calls doExecute(), converting the SparkPlan into an RDD
val byteArrayRdd = getByteArrayRdd()
val results = ArrayBuffer[InternalRow]()
//byteArrayRdd.collect() is an RDD action; it runs sc.runJob() to submit the job to the Spark cluster
byteArrayRdd.collect().foreach { countAndBytes =>
decodeUnsafeRows(countAndBytes._2).foreach(results.+=)
}
results.toArray
}
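The SparkPlan-to-RDD hand-off can also be seen directly: QueryExecution.toRdd is executedPlan.execute(), which internally calls doExecute(); any RDD action on it is what submits the job through SparkContext. A sketch:

val internalRows = spark.sql("select count(*) from people").queryExecution.toRdd // RDD[InternalRow]
println(internalRows.getNumPartitions) // no job has run yet
println(internalRows.count())          // RDD action -> sc.runJob -> job submitted to the cluster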