参考博客来自微信公众号暴走大数据
1.SparkCon基本作用
SparkConf负责管理所有Spark的配置项,我们在使用Spark的过程中,经常需要灵活配置各种参数,来使程序更好、更快地运行。
2.SparkConf的构造方法
class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Serializable {
import SparkConf._
/** Create a SparkConf that loads defaults from system properties and the classpath */
def this() = this(true)
private val settings = new ConcurrentHashMap[String, String]()
// ...
if (loadDefaults) {
loadFromSystemProperties(false)
}
private[spark] def loadFromSystemProperties(silent: Boolean): SparkConf = {
// Load any spark.* system properties
for ((key, value) <- Utils.getSystemProperties if key.startsWith("spark.")) {
set(key, value, silent)
}
this
}
// ...
}
分析:
<1>
extends ... with ...,这里继承了多个特质,物质就类似于Java中的interface;
(1)Logging用来打日志;
(2)Serializable,序列化,分布式环境,SparkConf对象传来传去的,当然需要序列化;
(3)Conleable,看到下面其重载了clone()方法,其实就是生成了一个配置一样的SparkConf对象。
目的是避免多个组件共用同一个SparkConf对象时出现的并发问题,不同组件都使用,clone一个给你,任何地方要使用SparkConf对象,调用clone方法复制一个,十分优雅
<2>
SparkConf类有一个主构造方法参数loadDefaults,它指示是否要从Java系统属性(即System.getProperties()取得的属性)加载默认的与Spark相关的配置。
当loadDafaults为true时,才会去调用loadFromSystemProperties方法。
3.Spark配置项的存储
SparkConf内部是采用ConcurrentHashMap来维护所有配置项键值的。
设置配置项的3种方法:
<1> 直接使用set()方法
def set(key: String, value: String): SparkConf = {
set(key, value, false)
}
private[spark] def set(key: String, value: String, silent: Boolean): SparkConf = {
if (key == null) {
throw new NullPointerException("null key")
}
if (value == null) {
throw new NullPointerException("null value for " + key)
}
if (!silent) {
logDeprecationWarning(key)
}
settings.put(key, value)
this
}
可见配置项的键值都不能为null。并且包括set()在内的所有Set类方法都返回this,所以支持链式调用,这样使用起来比较简洁。
另外,还有一些方法可以快速设置常用配置项,比如上篇代码#0.1中出现过的setMaster()与setAppName()。它们最终也会调用set()方法。
/**
* The master URL to connect to, such as "local" to run locally with one thread, "local[4]" to
* run locally with 4 cores, or "spark://master:7077" to run on a Spark standalone cluster.
*/
def setMaster(master: String): SparkConf = {
set("spark.master", master)
}
/** Set a name for your application. Shown in the Spark web UI. */
def setAppName(name: String): SparkConf = {
set("spark.app.name", name)
}
<2> 通过系统属性加载
直接调用SparkConf的构造方法,会默认将loadDafaults设置为true,这时候就会加载System.setProperty()方法中,以spark.开头的属性
private[spark] def loadFromSystemProperties(silent: Boolean): SparkConf = {
// Load any spark.* system properties
for ((key, value) <- Utils.getSystemProperties if key.startsWith("spark.")) {
set(key, value, silent)
}
this
}
使用通用工具类Utils中的方法取得系统属性,过滤出以字符串“spark.”为前缀的键,然后调用set()方法设置键值。由于系统属性相关的参数是一次性初始化的,所以用Set类方法设置的值可以覆盖它们。
<3> 克隆SparkConf
override def clone: SparkConf = {
val cloned = new SparkConf(false)
settings.entrySet().asScala.foreach { e =>
cloned.set(e.getKey(), e.getValue(), true)
}
cloned
}
虽然ConcurrentHashMap保证线程安全,不会影响SparkConf实例共享,但在高并发的情况下,锁机制可能会带来性能问题。我们就可以克隆SparkConf到多个组件中,以让它们获得相同的配置参数。
4.获取Spark配置项
获取配置项只有一个途径,即调用Get类方法。Get类方法同样有很多实现,基础的get()与getOption()如下所示。
5.检验配置项
SparkConf中有一个方法validateSettings(),用来校验配置项。它的源码很长,但是逻辑比较简单,主要是对过期配置项进行警告,以及对非法设置或不兼容的配置项抛出异常。
网友评论