美文网首页
SparkConf解析

SparkConf解析

作者: LZhan | 来源:发表于2019-07-28 19:28 被阅读0次

参考博客来自微信公众号暴走大数据

1.SparkCon基本作用
SparkConf负责管理所有Spark的配置项,我们在使用Spark的过程中,经常需要灵活配置各种参数,来使程序更好、更快地运行。

2.SparkConf的构造方法

class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Serializable {

  import SparkConf._

  /** Create a SparkConf that loads defaults from system properties and the classpath */
  def this() = this(true)

  private val settings = new ConcurrentHashMap[String, String]()

// ...

  if (loadDefaults) {
    loadFromSystemProperties(false)
  }

  private[spark] def loadFromSystemProperties(silent: Boolean): SparkConf = {
    // Load any spark.* system properties
    for ((key, value) <- Utils.getSystemProperties if key.startsWith("spark.")) {
      set(key, value, silent)
    }
    this
  }

// ...
}

分析:
<1>
extends ... with ...,这里继承了多个特质,物质就类似于Java中的interface;

(1)Logging用来打日志;
(2)Serializable,序列化,分布式环境,SparkConf对象传来传去的,当然需要序列化;
(3)Conleable,看到下面其重载了clone()方法,其实就是生成了一个配置一样的SparkConf对象。
目的是避免多个组件共用同一个SparkConf对象时出现的并发问题,不同组件都使用,clone一个给你,任何地方要使用SparkConf对象,调用clone方法复制一个,十分优雅
<2>
SparkConf类有一个主构造方法参数loadDefaults,它指示是否要从Java系统属性(即System.getProperties()取得的属性)加载默认的与Spark相关的配置。
当loadDafaults为true时,才会去调用loadFromSystemProperties方法


3.Spark配置项的存储
SparkConf内部是采用ConcurrentHashMap来维护所有配置项键值的。
设置配置项的3种方法:
<1> 直接使用set()方法

def set(key: String, value: String): SparkConf = {
    set(key, value, false)
  }

  private[spark] def set(key: String, value: String, silent: Boolean): SparkConf = {
    if (key == null) {
      throw new NullPointerException("null key")
    }
    if (value == null) {
      throw new NullPointerException("null value for " + key)
    }
    if (!silent) {
      logDeprecationWarning(key)
    }
    settings.put(key, value)
    this
  }

可见配置项的键值都不能为null。并且包括set()在内的所有Set类方法都返回this,所以支持链式调用,这样使用起来比较简洁。
另外,还有一些方法可以快速设置常用配置项,比如上篇代码#0.1中出现过的setMaster()与setAppName()。它们最终也会调用set()方法。

  /**
   * The master URL to connect to, such as "local" to run locally with one thread, "local[4]" to
   * run locally with 4 cores, or "spark://master:7077" to run on a Spark standalone cluster.
   */
  def setMaster(master: String): SparkConf = {
    set("spark.master", master)
  }

  /** Set a name for your application. Shown in the Spark web UI. */
  def setAppName(name: String): SparkConf = {
    set("spark.app.name", name)
  }

<2> 通过系统属性加载
直接调用SparkConf的构造方法,会默认将loadDafaults设置为true,这时候就会加载System.setProperty()方法中,以spark.开头的属性

private[spark] def loadFromSystemProperties(silent: Boolean): SparkConf = {
    // Load any spark.* system properties
    for ((key, value) <- Utils.getSystemProperties if key.startsWith("spark.")) {
      set(key, value, silent)
    }
    this
  }

使用通用工具类Utils中的方法取得系统属性,过滤出以字符串“spark.”为前缀的键,然后调用set()方法设置键值。由于系统属性相关的参数是一次性初始化的,所以用Set类方法设置的值可以覆盖它们

<3> 克隆SparkConf

override def clone: SparkConf = {
    val cloned = new SparkConf(false)
    settings.entrySet().asScala.foreach { e =>
      cloned.set(e.getKey(), e.getValue(), true)
    }
    cloned
  }

虽然ConcurrentHashMap保证线程安全,不会影响SparkConf实例共享,但在高并发的情况下,锁机制可能会带来性能问题。我们就可以克隆SparkConf到多个组件中,以让它们获得相同的配置参数。

4.获取Spark配置项
获取配置项只有一个途径,即调用Get类方法。Get类方法同样有很多实现,基础的get()与getOption()如下所示。

5.检验配置项
SparkConf中有一个方法validateSettings(),用来校验配置项。它的源码很长,但是逻辑比较简单,主要是对过期配置项进行警告,以及对非法设置或不兼容的配置项抛出异常。

相关文章

网友评论

      本文标题:SparkConf解析

      本文链接:https://www.haomeiwen.com/subject/dcbprctx.html