1. Creating the SQLContext
SQLContext is the entry point for structured data processing in Spark SQL: it is used to create DataFrames and to execute SQL. It is created as follows:
// sc is the SparkContext
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
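As a minimal sketch of what this entry point enables (Spark 1.x style; the input file people.json is hypothetical):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf().setAppName("SQLContextExample").setMaster("local[*]")
val sc = new SparkContext(conf)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

val df = sqlContext.read.json("people.json") // hypothetical input file
df.registerTempTable("people")               // make it queryable by name
sqlContext.sql("SELECT name FROM people WHERE age >= 18").show()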
The corresponding source code is:
@Stable
class SQLContext private[sql](val sparkSession: SparkSession) extends Logging with Serializable {

  def sparkContext: SparkContext = sparkSession.sparkContext
  // ...
}
It invokes the private primary constructor. In earlier (1.x) versions the constructor looked like this:
// 1. The CacheManager parameter in the primary constructor caches query results,
//    so subsequent queries can read them from the cache automatically
// 2. SQLListener listens for Spark scheduler events; it extends SparkListener
// 3. isRootContext indicates whether this is the root SQLContext
class SQLContext private[sql](
    @transient val sparkContext: SparkContext,
    @transient protected[sql] val cacheManager: CacheManager,
    @transient private[sql] val listener: SQLListener,
    val isRootContext: Boolean)
  extends org.apache.spark.Logging with Serializable {
In later versions these operations are all delegated to the underlying SparkSession:
private[sql] def sessionState: SessionState = sparkSession.sessionState
private[sql] def sharedState: SharedState = sparkSession.sharedState
private[sql] def conf: SQLConf = sessionState.conf
def sparkContext: SparkContext = sparkSession.sparkContext
/**
* Returns a [[SQLContext]] as new session, with separated SQL configurations, temporary
* tables, registered functions, but sharing the same `SparkContext`, cached data and
* other things.
*
* @since 1.6.0
*/
def newSession(): SQLContext = sparkSession.newSession().sqlContext
/**
* An interface to register custom [[org.apache.spark.sql.util.QueryExecutionListener]]s
* that listen for execution metrics.
*/
@Experimental
@Evolving
def listenerManager: ExecutionListenerManager = sparkSession.listenerManager
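As a brief sketch of how this interface might be used (Spark 2.x, assuming an existing SparkSession named spark):

import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.util.QueryExecutionListener

spark.listenerManager.register(new QueryExecutionListener {
  // Called when an action completes successfully
  override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit =
    println(s"$funcName finished in ${durationNs / 1e6} ms")
  // Called when an action fails
  override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit =
    println(s"$funcName failed: ${exception.getMessage}")
})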
When spark.sql.allowMultipleContexts is set to true, multiple SQLContexts/HiveContexts may be created; a new one is obtained via newSession:
def newSession(): SQLContext = sparkSession.newSession().sqlContext
In SparkSession.scala:
def newSession(): SparkSession = {
  new SparkSession(sparkContext, Some(sharedState), parentSessionState = None, extensions)
}
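A small sketch of the isolation this provides (the names here are illustrative):

val spark = org.apache.spark.sql.SparkSession.builder()
  .appName("NewSessionExample").master("local[*]").getOrCreate()
val s1 = spark.newSession()
val s2 = spark.newSession()

s1.conf.set("spark.sql.shuffle.partitions", "4") // SQL conf visible only in s1
s1.range(3).createOrReplaceTempView("t")         // temp view scoped to s1

assert(s1.sparkContext eq s2.sparkContext) // the SparkContext is shared
assert(!s2.catalog.tableExists("t"))       // but s2 does not see s1's temp view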
2. Core member variable: catalog
/**
* Catalog interface for Spark. To access this, use `SparkSession.catalog`.
*
* @since 2.0.0
*/
@Stable
abstract class Catalog {
The catalog is used to register tables, unregister tables, check whether a table exists, and so on. For example, the deprecated createExternalTable now simply forwards to sparkSession.catalog.createTable:
@deprecated("use sparkSession.catalog.createTable instead.", "2.2.0")
def createExternalTable(tableName: String, path: String): DataFrame = {
  sparkSession.catalog.createTable(tableName, path)
}
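A short sketch of typical Catalog operations in Spark 2.x (again assuming an existing SparkSession named spark):

spark.range(5).createOrReplaceTempView("nums")

spark.catalog.listTables().show()          // the temp view appears in the listing
println(spark.catalog.tableExists("nums")) // true
spark.catalog.dropTempView("nums")         // unregister the temp view
println(spark.catalog.tableExists("nums")) // false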
The full source of SimpleCatalog (from Spark 1.x) is shown below; it no longer exists in the latest 2.4 release:
import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConverters._

class SimpleCatalog(val conf: CatalystConf) extends Catalog {
  // Thread-safe registry mapping table names to their logical plans
  private[this] val tables = new ConcurrentHashMap[String, LogicalPlan]

  override def registerTable(tableIdent: TableIdentifier, plan: LogicalPlan): Unit = {
    tables.put(getTableName(tableIdent), plan)
  }

  override def unregisterTable(tableIdent: TableIdentifier): Unit = {
    tables.remove(getTableName(tableIdent))
  }

  override def unregisterAllTables(): Unit = {
    tables.clear()
  }

  override def tableExists(tableIdent: TableIdentifier): Boolean = {
    tables.containsKey(getTableName(tableIdent))
  }

  override def lookupRelation(
      tableIdent: TableIdentifier,
      alias: Option[String] = None): LogicalPlan = {
    val tableName = getTableName(tableIdent)
    val table = tables.get(tableName)
    if (table == null) {
      throw new NoSuchTableException
    }
    val tableWithQualifiers = Subquery(tableName, table)

    // If an alias was specified by the lookup, wrap the plan in a subquery so that attributes are
    // properly qualified with this alias.
    alias.map(a => Subquery(a, tableWithQualifiers)).getOrElse(tableWithQualifiers)
  }

  override def getTables(databaseName: Option[String]): Seq[(String, Boolean)] = {
    tables.keySet().asScala.map(_ -> true).toSeq
  }

  override def refreshTable(tableIdent: TableIdentifier): Unit = {
    throw new UnsupportedOperationException
  }
}
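To make the registry pattern above concrete, here is a self-contained toy analogue (this is not Spark code; a String stands in for LogicalPlan):

import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConverters._

class ToyCatalog {
  private val tables = new ConcurrentHashMap[String, String]
  def register(name: String, plan: String): Unit = tables.put(name, plan)
  def unregister(name: String): Unit = tables.remove(name)
  def exists(name: String): Boolean = tables.containsKey(name)
  def all: Seq[String] = tables.keySet().asScala.toSeq
}

val cat = new ToyCatalog
cat.register("people", "LogicalPlan(people)")
assert(cat.exists("people"))
cat.unregister("people")
assert(!cat.exists("people"))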
3. Core member variable: sqlParser
sqlParser is defined in SQLContext as follows:
protected[sql] val sqlParser = new SparkSQLParser(getSQLDialect().parse(_))
SparkSQLParser is the top-level Spark SQL parser. It handles the SQL syntax that Spark SQL itself supports and hands everything else to the fallback parser. It is defined as:
private[sql] class SparkSQLParser(fallback: String => LogicalPlan) extends AbstractSparkSQLParser
The keywords supported by the Spark SQL dialect include:
protected val AS = Keyword("AS")
protected val CACHE = Keyword("CACHE")
protected val CLEAR = Keyword("CLEAR")
protected val DESCRIBE = Keyword("DESCRIBE")
protected val EXTENDED = Keyword("EXTENDED")
protected val FUNCTION = Keyword("FUNCTION")
protected val FUNCTIONS = Keyword("FUNCTIONS")
protected val IN = Keyword("IN")
protected val LAZY = Keyword("LAZY")
protected val SET = Keyword("SET")
protected val SHOW = Keyword("SHOW")
protected val TABLE = Keyword("TABLE")
protected val TABLES = Keyword("TABLES")
protected val UNCACHE = Keyword("UNCACHE")
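These keywords correspond to the statements SparkSQLParser handles directly; anything else falls through to the fallback parser. For example (assuming the sqlContext from section 1 and a registered table named people):

sqlContext.sql("CACHE TABLE people")                  // CACHE [LAZY] TABLE
sqlContext.sql("UNCACHE TABLE people")                // UNCACHE TABLE
sqlContext.sql("CLEAR CACHE")                         // CLEAR CACHE
sqlContext.sql("SHOW TABLES")                         // SHOW TABLES [IN db]
sqlContext.sql("SET spark.sql.shuffle.partitions=8")  // SET key=value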
4. Core member variable: ddlParser
Used for parsing DDL (Data Definition Language) statements:
protected[sql] val ddlParser = new DDLParser(sqlParser.parse(_))
The keywords it supports are:
protected val CREATE = Keyword("CREATE")
protected val TEMPORARY = Keyword("TEMPORARY")
protected val TABLE = Keyword("TABLE")
protected val IF = Keyword("IF")
protected val NOT = Keyword("NOT")
protected val EXISTS = Keyword("EXISTS")
protected val USING = Keyword("USING")
protected val OPTIONS = Keyword("OPTIONS")
protected val DESCRIBE = Keyword("DESCRIBE")
protected val EXTENDED = Keyword("EXTENDED")
protected val AS = Keyword("AS")
protected val COMMENT = Keyword("COMMENT")
protected val REFRESH = Keyword("REFRESH")
It does three main things: create tables, describe tables, and refresh tables:
protected lazy val ddl: Parser[LogicalPlan] = createTable | describeTable | refreshTable
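For example, one statement for each of the three rules might look like this (the table and path names are hypothetical, the data source is the built-in JSON provider, and whether each call succeeds depends on the table type):

sqlContext.sql(
  """CREATE TEMPORARY TABLE events
    |USING org.apache.spark.sql.json
    |OPTIONS (path "events.json")""".stripMargin)  // createTable
sqlContext.sql("DESCRIBE EXTENDED events")         // describeTable
sqlContext.sql("REFRESH TABLE events")             // refreshTable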
The createTable rule is defined as follows (see the comments for the supported statement forms):
/**
* `CREATE [TEMPORARY] TABLE avroTable [IF NOT EXISTS]
* USING org.apache.spark.sql.avro
* OPTIONS (path "../hive/src/test/resources/data/files/episodes.avro")`
* or
* `CREATE [TEMPORARY] TABLE avroTable(intField int, stringField string...) [IF NOT EXISTS]
* USING org.apache.spark.sql.avro
* OPTIONS (path "../hive/src/test/resources/data/files/episodes.avro")`
* or
* `CREATE [TEMPORARY] TABLE avroTable [IF NOT EXISTS]
* USING org.apache.spark.sql.avro
* OPTIONS (path "../hive/src/test/resources/data/files/episodes.avro")`
* AS SELECT ...
*/
protected lazy val createTable: Parser[LogicalPlan] = {
  // TODO: Support database.table.
  (CREATE ~> TEMPORARY.? <~ TABLE) ~ (IF ~> NOT <~ EXISTS).? ~ tableIdentifier ~
    tableCols.? ~ (USING ~> className) ~ (OPTIONS ~> options).? ~ (AS ~> restInput).? ^^ {
      case temp ~ allowExisting ~ tableIdent ~ columns ~ provider ~ opts ~ query =>
        if (temp.isDefined && allowExisting.isDefined) {
          throw new DDLException(
            "a CREATE TEMPORARY TABLE statement does not allow IF NOT EXISTS clause.")
        }

        val options = opts.getOrElse(Map.empty[String, String])
        if (query.isDefined) {
          if (columns.isDefined) {
            throw new DDLException(
              "a CREATE TABLE AS SELECT statement does not allow column definitions.")
          }
          // When IF NOT EXISTS clause appears in the query, the save mode will be ignore.
          val mode = if (allowExisting.isDefined) {
            SaveMode.Ignore
          } else if (temp.isDefined) {
            SaveMode.Overwrite
          } else {
            SaveMode.ErrorIfExists
          }

          val queryPlan = parseQuery(query.get)
          CreateTableUsingAsSelect(tableIdent,
            provider,
            temp.isDefined,
            Array.empty[String],
            mode,
            options,
            queryPlan)
        } else {
          val userSpecifiedSchema = columns.flatMap(fields => Some(StructType(fields)))
          CreateTableUsing(
            tableIdent,
            userSpecifiedSchema,
            provider,
            temp.isDefined,
            options,
            allowExisting.isDefined,
            managedIfNoPath = false)
        }
    }
}
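Tracing the save-mode logic above: a plain CREATE TABLE t USING ... AS SELECT ... defines neither TEMPORARY nor IF NOT EXISTS, so the mode is SaveMode.ErrorIfExists; adding IF NOT EXISTS switches it to SaveMode.Ignore, and TEMPORARY switches it to SaveMode.Overwrite (TEMPORARY combined with IF NOT EXISTS is rejected with a DDLException).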
The describeTable and refreshTable rules are as follows:
/*
 * describe [extended] table avroTable
 * This will display all columns of table `avroTable`, including column_name, column_type and comment
 */
protected lazy val describeTable: Parser[LogicalPlan] =
  (DESCRIBE ~> opt(EXTENDED)) ~ tableIdentifier ^^ {
    case e ~ tableIdent =>
      DescribeCommand(UnresolvedRelation(tableIdent, None), e.isDefined)
  }

protected lazy val refreshTable: Parser[LogicalPlan] =
  REFRESH ~> TABLE ~> tableIdentifier ^^ {
    case tableIdent =>
      RefreshTable(tableIdent)
  }
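For readers unfamiliar with the combinator operators used above (~>, <~, ^^), here is a toy parser built on the same scala.util.parsing.combinator library; ToyDDL and Refresh are illustrative names, not Spark code:

import scala.util.parsing.combinator.JavaTokenParsers

object ToyDDL extends JavaTokenParsers {
  case class Refresh(table: String)

  // a ~> b keeps only b's result; a <~ b keeps only a's; ^^ maps the parsed value
  lazy val refresh: Parser[Refresh] =
    "REFRESH" ~> "TABLE" ~> ident ^^ { name => Refresh(name) }

  def main(args: Array[String]): Unit = {
    println(parseAll(refresh, "REFRESH TABLE events")) // parsed: Refresh(events)
  }
}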
Reference: https://blog.csdn.net/lovehuangjiaju/article/details/50427650
Summary: in the latest 2.4 codebase much of this code has changed; as the snippets above show, SQLContext now simply delegates to SparkSession.