美文网首页
SparkContxt重要源码

SparkContxt重要源码

作者: 436048bfc6a1 | 来源:发表于2019-02-24 18:05 被阅读0次
    1. SparkContext重要源码

    1.1 SparkContext 注释

    Main entry point for Spark functionality.
    spark功能的入口点(也就是说必须要存在SparkContext才能继续向下执行)
    

    1.2 SparkContext的构造函数

    class SparkContext(config: SparkConf)
    
    从上可知,如果想要使用SparkContext,那么必须要有SparkConf
    如果使用spark-shell,spark-shell程序启动时就创建了一个SparkContext,如下图所示
    
    
    1. SparkConf重要源码

    2.1 注释

    (1) Configuration for a Spark application
    
        Spark应用程序的配置
    
    (2) Most of the time, you would create a SparkConf object with 
        `new SparkConf()`, which will load values from any `spark.*` 
        Java system properties set in your application as well
    
        大多数情况下,你将会使用new SparkConf()作为参数来创建SparkContext对象,
        SparkConf 将导入在应用中的java的system properties中任何以spark.开头的配置
        (不论配置是官方提供的,还是自定义的都是以spark.开头的)
    
    

    2.1.1 自定义spark参数

    命令行启动spark-shell
      ./spark-shell --master local[2] --conf name=henry --conf spark.age=18 
    任何 --conf之后的key-value对都会加载到sparkConf中
    

    执行spark-shell


    上图中第一行,name=henry是non-spark的config, 被spark-shell忽略掉
    
    从spark web ui上也可以清楚看见只生效了spark.age
    

    2.1.2 从源码角度上分析只生效以spark.开头的配置

    在SparkSubmitArguments的loadEnvironmentArguments方法中
    

    2.1.2.1 以--master为例

    //加载环境变量
    private def loadEnvironmentArguments(): Unit{
      master = Option(master)
          .orElse(sparkProperties.get("spark.master"))
          .orElse(env.get("MASTER"))
          .orNull
    }
    
    首先去寻找spark.master
    (wordcount代码第一行的setMaster底层就是给spark.master赋值)
    如果没有找到spark.master的值,就去环境变量里去找MASTER
    
    

    2.1.2.2 以--executor-memory为例

    --executor-memory参数是设置每个executor使用的内存大小,默认是1G
    
    private def loadEnvironmentArguments(): Unit{
        executorMemory = Option(executorMemory)
          .orElse(sparkProperties.get("spark.executor.memory"))
          .orElse(env.get("SPARK_EXECUTOR_MEMORY"))
          .orNull
    }
    
    与--master类似,也是先寻找spark.executor.memory
    

    setMaster()源码

      def setMaster(master: String): SparkConf = {
        set("spark.master", master)
      }
    

    2.1.3 如何获得spark的默认配置参数

      lazy val defaultSparkProperties: HashMap[String, String] = {
        val defaultProperties = new HashMap[String, String]()
        //注释1
        Option(propertiesFile).foreach { filename =>
        //注释2
          val properties = Utils.getPropertiesFromFile(filename)
          properties.foreach { case (k, v) =>
            defaultProperties(k) = v
          }
          if (verbose) {
            Utils.redact(properties).foreach { case (k, v) =>
              logInfo(s"Adding default property: $k=$v")
            }
          }
        }
        defaultProperties
      }
    mergeDefaultSparkProperties()
    //注释3
    ignoreNonSparkProperties()
    
    
      private def mergeDefaultSparkProperties(): Unit = {
        propertiesFile = Option(propertiesFile).
                         getOrElse(Utils.getDefaultPropertiesFile(env))
        defaultSparkProperties.foreach { case (k, v) =>
          if (!sparkProperties.contains(k)) {
            sparkProperties(k) = v
          }
        }
      }
    
      def getDefaultPropertiesFile(env: Map[String, String] = sys.env): String = {
        env.get("SPARK_CONF_DIR")
          .orElse(env.get("SPARK_HOME").map { t => s"$t${File.separator}conf" })
          .map { t => new File(s"$t${File.separator}spark-defaults.conf")}
          .filter(_.isFile)
          .map(_.getAbsolutePath)
          .orNull
      }
    
      private def ignoreNonSparkProperties(): Unit = {
        sparkProperties.foreach { case (k, v) =>
          if (!k.startsWith("spark.")) {
            sparkProperties -= k
            logWarning(s"Ignoring non-spark config property: $k=$v")
          }
        }
      }
    
    2.1.3.1 注释1
    (1) 作用
    ```md
    明确filename代表什么
    

    (2) 作用代码

    Option(propertiesFile).foreach { filename => {}
    }
    

    (3) 调用情况

    defaultSparkProperties()==>mergeDefaultSparkProperties()==>getDefaultPropertiesFile()
    

    (4) 流程简要说明

    首先需要知道filename,也就是propertiesFile来自于哪里
    之后在该类中发现, 如果不是人为设置,
       该类里只有在mergeDefaultSparkProperties()对propertiesFile赋值
    mergeDefaultSparkProperties()得到getDefaultPropertiesFile()的返回值
       也就是说默认配置文件的路径
    

    (5) 相关方法重要代码说明

    getDefaultPropertiesFile
        先从环境中找到SPARK_CONF_DIR
        如果没有配置的话,就去环境变量中找SPARK_HOME
        然后找到其conf文件夹中spark-defaults.conf文件
        getDefaultPropertiesFile返回的就是默认的spark配置文件的路径
        也就是说defaultSparkProperties的filename就是默认的spark配置文件的路径
        
    

    2.1.3.2 注释2
    (1) 作用

     根据默认配置文件路径,获得键值对的properties
    

    (2)作用代码

    val properties = Utils.getPropertiesFromFile(filename)
    

    (3) 调用情况

    defaultSparkProperties() ==> getDefaultPropertiesFile()
    

    (4) 流程简要说明

    作用代码调用Utils.getPropertiesFromFile(filename)
    properties是一个键值对
    

    (5)相关方法重要代码说明

    getPropertiesFromFile()
        拿到文件名后,读取该文件,将其每一行都变成key-value pair
        返回的是一个map
    

    2.1.3.3 注释3
    (1) 作用

    从properties变量中移除key不是以spark.开头的key
    

    (2) 作用代码

    ignoreNonSparkProperties()
    

    (3) 调用情况

    defaultSparkProperties() ==> Utils.ignoreNonSparkProperties()
    

    (4) 相关方法重要代码说明

    ignoreNonSparkProperties()
      循环sparkProperties取出key不是以spark.开头的
        
    

    2.1.3.4 mergeDefaultSparkProperties()
    (1) 作用

    将命令行 --conf后添加的自定义spark参数放入sparkProperties中
    

    (2) 调用情况

    mergeDefaultSparkProperties ==> defaultSparkProperties()
    

    (3) 相关代码重要说明

    遍历defaultSparkProperties返回的defaultProperties(key-value pair),  变成sparkProperties的key-value pair
    

    2.2 spark-defaults.conf

    2.2.1 作用

    保存spark所有公共属性
    将需要配置的属性配置在该文件中
    之后就不需要写命令行参数(./saprk-shell --master local[2])
    因为系统会自动找到配置在该文件中的参数
    

    2.2.2 为什么要自定义spark参数

    当连接数据库时,数据库的url需要灵活改变,此时自定义参数发挥优势
    

    2.3 如何得到自定义的参数的值

    sc.getConf.get("spark.henry")
    

    2.4 参数定义的位置与优先级

    2.4.1 源码注释

    In this case, parameters you set directly on the `SparkConf` 
    object take priority over system properties
    

    2.4.2 翻译

    配置在SparkConf的优先级比系统配置高
    

    2.4.3 如何理解

    首先在命令行输入
    

    2.5 理解上述可以做什么

    在文件夹henrydata-etl下有lib、shell、logs、conf文件夹
    在conf下文件夹有etl-default.conf文件,里面保存的是需要的key-value
    此时直接按照之前代码大意,调用scala工具类就可以解析出来
    省去人为解析的麻烦
    
    1. sparkContext的重要源码解析

    3.1 创建spark运行环境

    _env = createSparkEnv(_conf, isLocal, listenerBus)
    
      private[spark] def createSparkEnv(
          conf: SparkConf,
          isLocal: Boolean,
          listenerBus: LiveListenerBus): SparkEnv = {
        SparkEnv.createDriverEnv(conf, isLocal, listenerBus, SparkContext.numDriverCores(master, conf))
      }
    
    private[spark] def createDriverEnv(
          conf: SparkConf,
          isLocal: Boolean,
          listenerBus: LiveListenerBus,
          numCores: Int,
          mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv = {
        assert(conf.contains(DRIVER_HOST_ADDRESS),
          s"${DRIVER_HOST_ADDRESS.key} is not set on the driver!")
        assert(conf.contains("spark.driver.port"), "spark.driver.port is not set on the driver!")
        val bindAddress = conf.get(DRIVER_BIND_ADDRESS)
        val advertiseAddress = conf.get(DRIVER_HOST_ADDRESS)
        val port = conf.get("spark.driver.port").toInt
        val ioEncryptionKey = if (conf.get(IO_ENCRYPTION_ENABLED)) {
          Some(CryptoStreamUtils.createKey(conf))
        } else {
          None
        }
        create(
          conf,
          SparkContext.DRIVER_IDENTIFIER,
          bindAddress,
          advertiseAddress,
          Option(port),
          isLocal,
          numCores,
          ioEncryptionKey,
          listenerBus = listenerBus,
          mockOutputCommitCoordinator = mockOutputCommitCoordinator
        )
      }
    
     //定义driver端口
    //key为spark.driver.bindAddress的值为端口值
      private[spark] val DRIVER_BIND_ADDRESS = ConfigBuilder("spark.driver.bindAddress")
        .doc("Address where to bind network listen sockets on the driver.")
        .fallbackConf(DRIVER_HOST_ADDRESS)
    

    3.1.2 create函数重要代码

    使用反射创建实例
    
    //使用反射使用指定类名创建实例
    def instantiateClass[T](className: String): T = {
          val cls = Utils.classForName(className)
          // Look for a constructor taking a SparkConf and a boolean isDriver, then one taking just
          // SparkConf, then one taking no arguments
          try {
            cls.getConstructor(classOf[SparkConf], java.lang.Boolean.TYPE)
              .newInstance(conf, new java.lang.Boolean(isDriver))
              .asInstanceOf[T]
          } catch {
            case _: NoSuchMethodException =>
              try {
                cls.getConstructor(classOf[SparkConf]).newInstance(conf).asInstanceOf[T]
              } catch {
                case _: NoSuchMethodException =>
                  cls.getConstructor().newInstance().asInstanceOf[T]
              }
          }
        }
    

    3.2 WEB UI

    private[spark] def ui: Option[SparkUI] = _ui
    
       //默认情况下为true
        _ui =
          if (conf.getBoolean("spark.ui.enabled", true)) {
            Some(SparkUI.create(Some(this), _statusStore, _conf, _env.securityManager, appName, "",
              startTime))
          } else {
            // For tests, do not enable the UI
            None
          }
    
    def create(
          sc: Option[SparkContext],
          store: AppStatusStore,
          conf: SparkConf,
          securityManager: SecurityManager,
          appName: String,
          basePath: String,
          startTime: Long,
          appSparkVersion: String = org.apache.spark.SPARK_VERSION): SparkUI = {
        //使用new sparkUI的方式创建UI
        new SparkUI(store, sc, conf, securityManager, appName, basePath, startTime, appSparkVersion)
      }
    
    private[spark] class SparkUI private(
        //存储信息
        val store: AppStatusStore,
        //环境变量内的信息
        val sc: Option[SparkContext],
        //配置信息
        val conf: SparkConf,
        securityManager: SecurityManager,
        var appName: String,
        val basePath: String,
        val startTime: Long,
        //版本信息,对应spark web ui的导航栏的版本号
        val appSparkVersion: String
    )
    
    private[spark] abstract class WebUI(
        val securityManager: SecurityManager,
        val sslOptions: SSLOptions,
        port: Int,
        conf: SparkConf,
        basePath: String = "",
        name: String = ""
    )
    extends Logging {
      //
       protected val publicHostName = Option(conf.getenv("SPARK_PUBLIC_DNS"))
            .getOrElse(conf.get(DRIVER_HOST_ADDRESS))
       def webUrl: String = s"http://$publicHostName:$boundPort"
    }
    

    相关文章

      网友评论

          本文标题:SparkContxt重要源码

          本文链接:https://www.haomeiwen.com/subject/pgebyqtx.html