- SparkContext重要源码
1.1 SparkContext 注释
Main entry point for Spark functionality.
spark功能的入口点(也就是说必须要存在SparkContext才能继续向下执行)
1.2 SparkContext的构造函数
class SparkContext(config: SparkConf)
从上可知,如果想要使用SparkContext,那么必须要有SparkConf
如果使用spark-shell,spark-shell程序启动时就创建了一个SparkContext,如下图所示
- SparkConf重要源码
2.1 注释
(1) Configuration for a Spark application
Spark应用程序的配置
(2) Most of the time, you would create a SparkConf object with
`new SparkConf()`, which will load values from any `spark.*`
Java system properties set in your application as well
大多数情况下,你将会使用new SparkConf()作为参数来创建SparkContext对象,
SparkConf 将导入在应用中的java的system properties中任何以spark.开头的配置
(不论配置是官方提供的,还是自定义的都是以spark.开头的)
2.1.1 自定义spark参数
命令行启动spark-shell
./spark-shell --master local[2] --conf name=henry --conf spark.age=18
任何 --conf之后的key-value对都会加载到sparkConf中
执行spark-shell
上图中第一行,name=henry是non-spark的config, 被spark-shell忽略掉
从spark web ui上也可以清楚看见只生效了spark.age
2.1.2 从源码角度上分析只生效以spark.开头的配置
在SparkSubmitArguments的loadEnvironmentArguments方法中
2.1.2.1 以--master为例
//加载环境变量
private def loadEnvironmentArguments(): Unit{
master = Option(master)
.orElse(sparkProperties.get("spark.master"))
.orElse(env.get("MASTER"))
.orNull
}
首先去寻找spark.master
(wordcount代码第一行的setMaster底层就是给spark.master赋值)
如果没有找到spark.master的值,就去环境变量里去找MASTER
2.1.2.2 以--executor-memory为例
--executor-memory参数是设置每个executor使用的内存大小,默认是1G
private def loadEnvironmentArguments(): Unit{
executorMemory = Option(executorMemory)
.orElse(sparkProperties.get("spark.executor.memory"))
.orElse(env.get("SPARK_EXECUTOR_MEMORY"))
.orNull
}
与--master类似,也是先寻找spark.executor.memory
setMaster()源码
def setMaster(master: String): SparkConf = {
set("spark.master", master)
}
2.1.3 如何获得spark的默认配置参数
lazy val defaultSparkProperties: HashMap[String, String] = {
val defaultProperties = new HashMap[String, String]()
//注释1
Option(propertiesFile).foreach { filename =>
//注释2
val properties = Utils.getPropertiesFromFile(filename)
properties.foreach { case (k, v) =>
defaultProperties(k) = v
}
if (verbose) {
Utils.redact(properties).foreach { case (k, v) =>
logInfo(s"Adding default property: $k=$v")
}
}
}
defaultProperties
}
mergeDefaultSparkProperties()
//注释3
ignoreNonSparkProperties()
private def mergeDefaultSparkProperties(): Unit = {
propertiesFile = Option(propertiesFile).
getOrElse(Utils.getDefaultPropertiesFile(env))
defaultSparkProperties.foreach { case (k, v) =>
if (!sparkProperties.contains(k)) {
sparkProperties(k) = v
}
}
}
def getDefaultPropertiesFile(env: Map[String, String] = sys.env): String = {
env.get("SPARK_CONF_DIR")
.orElse(env.get("SPARK_HOME").map { t => s"$t${File.separator}conf" })
.map { t => new File(s"$t${File.separator}spark-defaults.conf")}
.filter(_.isFile)
.map(_.getAbsolutePath)
.orNull
}
private def ignoreNonSparkProperties(): Unit = {
sparkProperties.foreach { case (k, v) =>
if (!k.startsWith("spark.")) {
sparkProperties -= k
logWarning(s"Ignoring non-spark config property: $k=$v")
}
}
}
2.1.3.1 注释1
(1) 作用
```md
明确filename代表什么
(2) 作用代码
Option(propertiesFile).foreach { filename => {}
}
(3) 调用情况
defaultSparkProperties()==>mergeDefaultSparkProperties()==>getDefaultPropertiesFile()
(4) 流程简要说明
首先需要知道filename,也就是propertiesFile来自于哪里
之后在该类中发现, 如果不是人为设置,
该类里只有在mergeDefaultSparkProperties()对propertiesFile赋值
mergeDefaultSparkProperties()得到getDefaultPropertiesFile()的返回值
也就是说默认配置文件的路径
(5) 相关方法重要代码说明
getDefaultPropertiesFile
先从环境中找到SPARK_CONF_DIR
如果没有配置的话,就去环境变量中找SPARK_HOME
然后找到其conf文件夹中spark-defaults.conf文件
getDefaultPropertiesFile返回的就是默认的spark配置文件的路径
也就是说defaultSparkProperties的filename就是默认的spark配置文件的路径
2.1.3.2 注释2
(1) 作用
根据默认配置文件路径,获得键值对的properties
(2)作用代码
val properties = Utils.getPropertiesFromFile(filename)
(3) 调用情况
defaultSparkProperties() ==> getDefaultPropertiesFile()
(4) 流程简要说明
作用代码调用Utils.getPropertiesFromFile(filename)
properties是一个键值对
(5)相关方法重要代码说明
getPropertiesFromFile()
拿到文件名后,读取该文件,将其每一行都变成key-value pair
返回的是一个map
2.1.3.3 注释3
(1) 作用
从properties变量中移除key不是以spark.开头的key
(2) 作用代码
ignoreNonSparkProperties()
(3) 调用情况
defaultSparkProperties() ==> Utils.ignoreNonSparkProperties()
(4) 相关方法重要代码说明
ignoreNonSparkProperties()
循环sparkProperties取出key不是以spark.开头的
2.1.3.4 mergeDefaultSparkProperties()
(1) 作用
将命令行 --conf后添加的自定义spark参数放入sparkProperties中
(2) 调用情况
mergeDefaultSparkProperties ==> defaultSparkProperties()
(3) 相关代码重要说明
遍历defaultSparkProperties返回的defaultProperties(key-value pair), 变成sparkProperties的key-value pair
2.2 spark-defaults.conf
2.2.1 作用
保存spark所有公共属性
将需要配置的属性配置在该文件中
之后就不需要写命令行参数(./saprk-shell --master local[2])
因为系统会自动找到配置在该文件中的参数
2.2.2 为什么要自定义spark参数
当连接数据库时,数据库的url需要灵活改变,此时自定义参数发挥优势
2.3 如何得到自定义的参数的值
sc.getConf.get("spark.henry")
2.4 参数定义的位置与优先级
2.4.1 源码注释
In this case, parameters you set directly on the `SparkConf`
object take priority over system properties
2.4.2 翻译
配置在SparkConf的优先级比系统配置高
2.4.3 如何理解
首先在命令行输入
2.5 理解上述可以做什么
在文件夹henrydata-etl下有lib、shell、logs、conf文件夹
在conf下文件夹有etl-default.conf文件,里面保存的是需要的key-value
此时直接按照之前代码大意,调用scala工具类就可以解析出来
省去人为解析的麻烦
- sparkContext的重要源码解析
3.1 创建spark运行环境
_env = createSparkEnv(_conf, isLocal, listenerBus)
private[spark] def createSparkEnv(
conf: SparkConf,
isLocal: Boolean,
listenerBus: LiveListenerBus): SparkEnv = {
SparkEnv.createDriverEnv(conf, isLocal, listenerBus, SparkContext.numDriverCores(master, conf))
}
private[spark] def createDriverEnv(
conf: SparkConf,
isLocal: Boolean,
listenerBus: LiveListenerBus,
numCores: Int,
mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv = {
assert(conf.contains(DRIVER_HOST_ADDRESS),
s"${DRIVER_HOST_ADDRESS.key} is not set on the driver!")
assert(conf.contains("spark.driver.port"), "spark.driver.port is not set on the driver!")
val bindAddress = conf.get(DRIVER_BIND_ADDRESS)
val advertiseAddress = conf.get(DRIVER_HOST_ADDRESS)
val port = conf.get("spark.driver.port").toInt
val ioEncryptionKey = if (conf.get(IO_ENCRYPTION_ENABLED)) {
Some(CryptoStreamUtils.createKey(conf))
} else {
None
}
create(
conf,
SparkContext.DRIVER_IDENTIFIER,
bindAddress,
advertiseAddress,
Option(port),
isLocal,
numCores,
ioEncryptionKey,
listenerBus = listenerBus,
mockOutputCommitCoordinator = mockOutputCommitCoordinator
)
}
//定义driver端口
//key为spark.driver.bindAddress的值为端口值
private[spark] val DRIVER_BIND_ADDRESS = ConfigBuilder("spark.driver.bindAddress")
.doc("Address where to bind network listen sockets on the driver.")
.fallbackConf(DRIVER_HOST_ADDRESS)
3.1.2 create函数重要代码
使用反射创建实例
//使用反射使用指定类名创建实例
def instantiateClass[T](className: String): T = {
val cls = Utils.classForName(className)
// Look for a constructor taking a SparkConf and a boolean isDriver, then one taking just
// SparkConf, then one taking no arguments
try {
cls.getConstructor(classOf[SparkConf], java.lang.Boolean.TYPE)
.newInstance(conf, new java.lang.Boolean(isDriver))
.asInstanceOf[T]
} catch {
case _: NoSuchMethodException =>
try {
cls.getConstructor(classOf[SparkConf]).newInstance(conf).asInstanceOf[T]
} catch {
case _: NoSuchMethodException =>
cls.getConstructor().newInstance().asInstanceOf[T]
}
}
}
3.2 WEB UI
private[spark] def ui: Option[SparkUI] = _ui
//默认情况下为true
_ui =
if (conf.getBoolean("spark.ui.enabled", true)) {
Some(SparkUI.create(Some(this), _statusStore, _conf, _env.securityManager, appName, "",
startTime))
} else {
// For tests, do not enable the UI
None
}
def create(
sc: Option[SparkContext],
store: AppStatusStore,
conf: SparkConf,
securityManager: SecurityManager,
appName: String,
basePath: String,
startTime: Long,
appSparkVersion: String = org.apache.spark.SPARK_VERSION): SparkUI = {
//使用new sparkUI的方式创建UI
new SparkUI(store, sc, conf, securityManager, appName, basePath, startTime, appSparkVersion)
}
private[spark] class SparkUI private(
//存储信息
val store: AppStatusStore,
//环境变量内的信息
val sc: Option[SparkContext],
//配置信息
val conf: SparkConf,
securityManager: SecurityManager,
var appName: String,
val basePath: String,
val startTime: Long,
//版本信息,对应spark web ui的导航栏的版本号
val appSparkVersion: String
)
private[spark] abstract class WebUI(
val securityManager: SecurityManager,
val sslOptions: SSLOptions,
port: Int,
conf: SparkConf,
basePath: String = "",
name: String = ""
)
extends Logging {
//
protected val publicHostName = Option(conf.getenv("SPARK_PUBLIC_DNS"))
.getOrElse(conf.get(DRIVER_HOST_ADDRESS))
def webUrl: String = s"http://$publicHostName:$boundPort"
}
网友评论