SparkContext Component Initialization

Author: LZhan | Published on 2019-08-04 21:21

    This article is based on posts from the WeChat official account 暴走大数据.

    1. The role of SparkContext

    SparkContext lives in the Driver and is the main entry point to Spark functionality. It represents the connection to a Spark cluster and can be used to create RDDs, accumulators and broadcast variables on that cluster.
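
    To make this concrete, here is a minimal, self-contained sketch of a Driver program that creates a SparkContext and uses it to build an RDD, a broadcast variable and an accumulator (the object name, master URL and application name are purely illustrative):

    import org.apache.spark.{SparkConf, SparkContext}

    object SparkContextDemo {
      def main(args: Array[String]): Unit = {
        // Build a SparkConf and hand it to the SparkContext constructor (the Driver-side entry point).
        val conf = new SparkConf().setMaster("local[*]").setAppName("SparkContextDemo")
        val sc = new SparkContext(conf)

        // Create an RDD, a broadcast variable and an accumulator through the SparkContext.
        val rdd    = sc.parallelize(1 to 100)
        val factor = sc.broadcast(10)
        val acc    = sc.longAccumulator("processed")

        val result = rdd.map { x => acc.add(1); x * factor.value }.reduce(_ + _)
        println(s"result = $result, processed = ${acc.value}")

        sc.stop()
      }
    }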

    2. The constructors of SparkContext

    The auxiliary constructors:

    class SparkContext(config: SparkConf) extends Logging {
      // ...

      def this() = this(new SparkConf())

      def this(master: String, appName: String, conf: SparkConf) =
        this(SparkContext.updatedConf(conf, master, appName))

      def this(
          master: String,
          appName: String,
          sparkHome: String = null,
          jars: Seq[String] = Nil,
          environment: Map[String, String] = Map()) = {
        this(SparkContext.updatedConf(new SparkConf(), master, appName, sparkHome, jars, environment))
      }

      private[spark] def this(master: String, appName: String) =
        this(master, appName, null, Nil, Map())

      private[spark] def this(master: String, appName: String, sparkHome: String) =
        this(master, appName, sparkHome, Nil, Map())

      private[spark] def this(master: String, appName: String, sparkHome: String, jars: Seq[String]) =
        this(master, appName, sparkHome, jars, Map())

      // ...
    }
    

    The primary constructor, on the other hand, consists mainly of one large try-catch block (lines 362~586 of SparkContext.scala), which contains most of the initialization logic.
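
    As a quick illustration, the sketch below shows how these constructors are typically invoked from user code; the master URL and application name are purely illustrative:

    import org.apache.spark.{SparkConf, SparkContext}

    object ConstructorDemo {
      def main(args: Array[String]): Unit = {
        // Primary constructor: pass a fully configured SparkConf.
        val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("ctor-demo"))

        // Equivalent auxiliary constructor (only one active SparkContext is allowed per JVM,
        // so it is shown commented out):
        // val sc = new SparkContext("local[2]", "ctor-demo", new SparkConf())

        sc.stop()
      }
    }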

    3. Component initialization in SparkContext

    try {
        _conf = config.clone()
        _conf.validateSettings()
    
        if (!_conf.contains("spark.master")) {
          throw new SparkException("A master URL must be set in your configuration")
        }
        if (!_conf.contains("spark.app.name")) {
          throw new SparkException("An application name must be set in your configuration")
        }
    
        // log out spark.app.name in the Spark driver logs
        logInfo(s"Submitted application: $appName")
    
        // System property spark.yarn.app.id must be set if user code ran by AM on a YARN cluster
        if (master == "yarn" && deployMode == "cluster" && !_conf.contains("spark.yarn.app.id")) {
          throw new SparkException("Detected yarn cluster mode, but isn't running on a cluster. " +
            "Deployment to YARN is not supported directly by SparkContext. Please use spark-submit.")
        }
    
        if (_conf.getBoolean("spark.logConf", false)) {
          logInfo("Spark configuration:\n" + _conf.toDebugString)
        }
    
        // Set Spark driver host and port system properties. This explicitly sets the configuration
        // instead of relying on the default value of the config constant.
        _conf.set(DRIVER_HOST_ADDRESS, _conf.get(DRIVER_HOST_ADDRESS))
        _conf.setIfMissing("spark.driver.port", "0")
    
        _conf.set("spark.executor.id", SparkContext.DRIVER_IDENTIFIER)
    
        _jars = Utils.getUserJars(_conf)
        _files = _conf.getOption("spark.files").map(_.split(",")).map(_.filter(_.nonEmpty))
          .toSeq.flatten
    
        _eventLogDir =
          if (isEventLogEnabled) {
            val unresolvedDir = conf.get("spark.eventLog.dir", EventLoggingListener.DEFAULT_LOG_DIR)
              .stripSuffix("/")
            Some(Utils.resolveURI(unresolvedDir))
          } else {
            None
          }
    
        _eventLogCodec = {
          val compress = _conf.getBoolean("spark.eventLog.compress", false)
          if (compress && isEventLogEnabled) {
            Some(CompressionCodec.getCodecName(_conf)).map(CompressionCodec.getShortName)
          } else {
            None
          }
        }
    
        _listenerBus = new LiveListenerBus(_conf)
    
        // Initialize the app status store and listener before SparkEnv is created so that it gets
        // all events.
        _statusStore = AppStatusStore.createLiveStore(conf)
        listenerBus.addToStatusQueue(_statusStore.listener.get)
    
        // Create the Spark execution environment (cache, map output tracker, etc)
        _env = createSparkEnv(_conf, isLocal, listenerBus)
        SparkEnv.set(_env)
    
        // If running the REPL, register the repl's output dir with the file server.
        _conf.getOption("spark.repl.class.outputDir").foreach { path =>
          val replUri = _env.rpcEnv.fileServer.addDirectory("/classes", new File(path))
          _conf.set("spark.repl.class.uri", replUri)
        }
    
        _statusTracker = new SparkStatusTracker(this, _statusStore)
    
        _progressBar =
          if (_conf.get(UI_SHOW_CONSOLE_PROGRESS) && !log.isInfoEnabled) {
            Some(new ConsoleProgressBar(this))
          } else {
            None
          }
    
        _ui =
          if (conf.getBoolean("spark.ui.enabled", true)) {
            Some(SparkUI.create(Some(this), _statusStore, _conf, _env.securityManager, appName, "",
              startTime))
          } else {
            // For tests, do not enable the UI
            None
          }
        // Bind the UI before starting the task scheduler to communicate
        // the bound port to the cluster manager properly
        _ui.foreach(_.bind())
    
        _hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(_conf)
    
        // Add each JAR given through the constructor
        if (jars != null) {
          jars.foreach(addJar)
        }
    
        if (files != null) {
          files.foreach(addFile)
        }
    
        _executorMemory = _conf.getOption("spark.executor.memory")
          .orElse(Option(System.getenv("SPARK_EXECUTOR_MEMORY")))
          .orElse(Option(System.getenv("SPARK_MEM"))
          .map(warnSparkMem))
          .map(Utils.memoryStringToMb)
          .getOrElse(1024)
    
        // Convert java options to env vars as a work around
        // since we can't set env vars directly in sbt.
        for { (envKey, propKey) <- Seq(("SPARK_TESTING", "spark.testing"))
          value <- Option(System.getenv(envKey)).orElse(Option(System.getProperty(propKey)))} {
          executorEnvs(envKey) = value
        }
        Option(System.getenv("SPARK_PREPEND_CLASSES")).foreach { v =>
          executorEnvs("SPARK_PREPEND_CLASSES") = v
        }
        // The Mesos scheduler backend relies on this environment variable to set executor memory.
        // TODO: Set this only in the Mesos scheduler.
        executorEnvs("SPARK_EXECUTOR_MEMORY") = executorMemory + "m"
        executorEnvs ++= _conf.getExecutorEnv
        executorEnvs("SPARK_USER") = sparkUser
    
        // We need to register "HeartbeatReceiver" before "createTaskScheduler" because Executor will
        // retrieve "HeartbeatReceiver" in the constructor. (SPARK-6640)
        _heartbeatReceiver = env.rpcEnv.setupEndpoint(
          HeartbeatReceiver.ENDPOINT_NAME, new HeartbeatReceiver(this))
    
        // Create and start the scheduler
        val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
        _schedulerBackend = sched
        _taskScheduler = ts
        _dagScheduler = new DAGScheduler(this)
        _heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)
    
        // start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
        // constructor
        _taskScheduler.start()
    
        _applicationId = _taskScheduler.applicationId()
        _applicationAttemptId = taskScheduler.applicationAttemptId()
        _conf.set("spark.app.id", _applicationId)
        if (_conf.getBoolean("spark.ui.reverseProxy", false)) {
          System.setProperty("spark.ui.proxyBase", "/proxy/" + _applicationId)
        }
        _ui.foreach(_.setAppId(_applicationId))
        _env.blockManager.initialize(_applicationId)
    
        // The metrics system for Driver need to be set spark.app.id to app ID.
        // So it should start after we get app ID from the task scheduler and set spark.app.id.
        _env.metricsSystem.start()
        // Attach the driver metrics servlet handler to the web ui after the metrics system is started.
        _env.metricsSystem.getServletHandlers.foreach(handler => ui.foreach(_.attachHandler(handler)))
    
        _eventLogger =
          if (isEventLogEnabled) {
            val logger =
              new EventLoggingListener(_applicationId, _applicationAttemptId, _eventLogDir.get,
                _conf, _hadoopConfiguration)
            logger.start()
            listenerBus.addToEventLogQueue(logger)
            Some(logger)
          } else {
            None
          }
    
        // Optionally scale number of executors dynamically based on workload. Exposed for testing.
        val dynamicAllocationEnabled = Utils.isDynamicAllocationEnabled(_conf)
        _executorAllocationManager =
          if (dynamicAllocationEnabled) {
            schedulerBackend match {
              case b: ExecutorAllocationClient =>
                Some(new ExecutorAllocationManager(
                  schedulerBackend.asInstanceOf[ExecutorAllocationClient], listenerBus, _conf,
                  _env.blockManager.master))
              case _ =>
                None
            }
          } else {
            None
          }
        _executorAllocationManager.foreach(_.start())
    
        _cleaner =
          if (_conf.getBoolean("spark.cleaner.referenceTracking", true)) {
            Some(new ContextCleaner(this))
          } else {
            None
          }
        _cleaner.foreach(_.start())
    
        setupAndStartListenerBus()
        postEnvironmentUpdate()
        postApplicationStart()
    
        // Post init
        _taskScheduler.postStartHook()
        _env.metricsSystem.registerSource(_dagScheduler.metricsSource)
        _env.metricsSystem.registerSource(new BlockManagerSource(_env.blockManager))
        _executorAllocationManager.foreach { e =>
          _env.metricsSystem.registerSource(e.executorAllocationManagerSource)
        }
    
        // Make sure the context is stopped if the user forgets about it. This avoids leaving
        // unfinished event logs around after the JVM exits cleanly. It doesn't help if the JVM
        // is killed, though.
        logDebug("Adding shutdown hook") // force eager creation of logger
        _shutdownHookRef = ShutdownHookManager.addShutdownHook(
          ShutdownHookManager.SPARK_CONTEXT_SHUTDOWN_PRIORITY) { () =>
          logInfo("Invoking stop() from shutdown hook")
          stop()
        }
      } catch {
        case NonFatal(e) =>
          logError("Error initializing SparkContext.", e)
          try {
            stop()
          } catch {
            case NonFatal(inner) =>
              logError("Error stopping SparkContext after init error.", inner)
          } finally {
            throw e
          }
      }
    

    <1> SparkConf
    After the SparkConf is passed in as a constructor argument, SparkContext first clones a copy of it, and all subsequent validation is performed on that copy.
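
    The practical consequence of the clone is that changes made to the original SparkConf after the SparkContext has been constructed have no effect on it. A small sketch (object name and configuration values are illustrative):

    import org.apache.spark.{SparkConf, SparkContext}

    object ConfCloneDemo {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .setMaster("local[*]")
          .setAppName("conf-demo")
          .set("spark.executor.memory", "2g")

        val sc = new SparkContext(conf)

        // SparkContext works on a cloned copy, so this later change does not affect the running context.
        conf.set("spark.executor.memory", "4g")
        println(sc.getConf.get("spark.executor.memory"))   // still prints "2g"

        sc.stop()
      }
    }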

    <2> LiveListenerBus
    LiveListenerBus is the event bus inside SparkContext. It asynchronously delivers the events (SparkListenerEvent) produced by event sources to the registered listeners (SparkListener). The listener pattern is used extensively in Spark to report events across a distributed cluster.
    Besides LiveListenerBus, Spark contains several other event buses, all of which extend the ListenerBus trait. The event bus is an important low-level building block of Spark and will be analyzed separately later.
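
    As a rough sketch of how the listener pattern surfaces to user code, a custom SparkListener can be registered through SparkContext.addSparkListener (a developer API), which attaches it to the LiveListenerBus; the listener class and its handlers below are illustrative:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd, SparkListenerStageCompleted}

    // A trivial listener that reports job and stage completion events posted on the bus.
    class LoggingListener extends SparkListener {
      override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit =
        println(s"Job ${jobEnd.jobId} finished: ${jobEnd.jobResult}")

      override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit =
        println(s"Stage ${stageCompleted.stageInfo.stageId} completed")
    }

    object ListenerDemo {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("listener-demo"))
        sc.addSparkListener(new LoggingListener)   // attaches the listener to the LiveListenerBus
        sc.parallelize(1 to 10).count()            // triggers job/stage events
        sc.stop()
      }
    }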

    <3> AppStatusStore
    AppStatusStore provides key-value storage for the monitoring metrics of a running Spark application; essentially all of the data shown in the Web UI lives here. It is created in the try block above by calling AppStatusStore.createLiveStore(conf), and the resulting listener is registered with the LiveListenerBus.

    The createLiveStore method:

    def createLiveStore(conf: SparkConf): AppStatusStore = {
      val store = new ElementTrackingStore(new InMemoryStore(), conf)
      val listener = new AppStatusListener(store, conf, true)
      new AppStatusStore(store, listener = Some(listener))
    }
    

    As can be seen, AppStatusStore is backed by an ElementTrackingStore, a key-value store that can track elements and their counts, which makes it well suited for monitoring. In addition, an AppStatusListener instance is created and registered with the LiveListenerBus described above to collect the monitoring data.

    <4> SparkEnv
    SparkEnv is Spark's execution environment. Both the Driver and the Executors rely on the environment formed by the components SparkEnv provides in order to run.
    The createSparkEnv method is rather complex and will be analyzed later. The initialization of SparkEnv depends on the LiveListenerBus, and during SparkContext initialization only the Driver's execution environment is created; the Executor-side environments come later. Once the Driver environment has been created, it is saved by calling set() on the SparkEnv companion object, so that SparkEnv can be "created in one place, used in many places".
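
    Because SparkEnv is exposed as a developer API, Driver-side code can fetch the environment that was saved via set(). A minimal sketch, assuming an already initialized SparkContext (for example inside spark-shell):

    import org.apache.spark.SparkEnv

    // Fetch the Driver-side environment that SparkContext stored via SparkEnv.set().
    val env = SparkEnv.get
    println(env.executorId)                          // "driver" on the Driver side
    println(env.conf.get("spark.app.name"))          // the conf the environment was built with
    println(env.serializer.getClass.getSimpleName)   // e.g. JavaSerializer or KryoSerializer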

    <5> SparkStatusTracker
    SparkStatusTracker is a low-level API for reporting on recent job execution. It has only six methods, which query the AppStatusStore and return basic data such as Job/Stage IDs, the number of active/completed/failed Tasks, and Executor memory usage. It guarantees only very weak consistency semantics, which means the information it reports may be delayed or incomplete.
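
    The tracker is exposed as SparkContext.statusTracker. A short usage sketch, assuming an active SparkContext sc (for example in spark-shell):

    // Assuming an active SparkContext `sc`:
    val tracker = sc.statusTracker

    tracker.getActiveJobIds().flatMap(id => tracker.getJobInfo(id)).foreach { job =>
      println(s"Job ${job.jobId()}: status=${job.status()}, stages=${job.stageIds().mkString(",")}")
    }
    println(s"Active stages: ${tracker.getActiveStageIds().mkString(",")}")
    println(s"Executors: ${tracker.getExecutorInfos.map(_.host()).mkString(", ")}")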

    <6> ConsoleProgressBar
    ConsoleProgressBar prints the progress of Stages to the console, one line at a time. It periodically queries the AppStatusStore for the Task counts in each state of a Stage and formats them into a string for output. It is controlled by the spark.ui.showConsoleProgress parameter, whose default value is false.

    <7> SparkUI
    SparkUI drives the presentation of the monitoring data in the Spark Web UI. What it looks like was already shown in the figure of article #0, so it will not be repeated here. Its initialization code is as follows.

    _ui =
      if (conf.getBoolean("spark.ui.enabled", true)) {
        Some(SparkUI.create(Some(this), _statusStore, _conf, _env.securityManager, appName, "",
          startTime))
      } else {
        None
      }
    _ui.foreach(_.bind())
    

    Whether the Spark UI is enabled is controlled by the spark.ui.enabled parameter, whose default value is true. The bind() method of SparkUI's parent class WebUI is then called to bind the Spark UI to a specific host:port, such as localhost:4040 in article #0.
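
    The UI-related settings most often touched in practice are sketched below; the values are illustrative, and in unit tests the UI is usually disabled entirely:

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .set("spark.ui.enabled", "true")   // default true; commonly set to false in unit tests
      .set("spark.ui.port", "4040")      // default 4040; Spark retries successive ports if it is taken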

    <8> HeartbeatReceiver
    HeartbeatReceiver is the heartbeat receiver. Executors must periodically send heartbeats to the Driver to show that they are still alive. HeartbeatReceiver is essentially a listener as well: it extends SparkListener. Its initialization code is as follows.

    _heartbeatReceiver = env.rpcEnv.setupEndpoint(
      HeartbeatReceiver.ENDPOINT_NAME, new HeartbeatReceiver(this))
    

    As can be seen, HeartbeatReceiver is ultimately wrapped by the RpcEnv into a reference to an RPC endpoint, i.e. the RpcEndpointRef shown in code listing #2.2.

    The nodes of a Spark cluster inevitably exchange a large amount of network traffic, and the heartbeat mechanism is only one aspect of it. Like the event bus, the RPC framework is therefore an indispensable part of Spark's foundations.
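
    The heartbeat behaviour itself is tunable through ordinary configuration. A sketch of the two settings usually involved (the values shown are the documented defaults):

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .set("spark.executor.heartbeatInterval", "10s")   // how often each Executor heartbeats to the Driver
      .set("spark.network.timeout", "120s")             // an Executor silent for longer than this is considered lost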

    <9> SchedulerBackend
    SchedulerBackend is responsible for allocating compute resources to Tasks waiting to run and for launching those Tasks on Executors. It is a Scala trait with different implementation classes for the various deploy modes. In SparkContext it is initialized together with the TaskScheduler, and the two are returned as a tuple.
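
    Which SchedulerBackend implementation createTaskScheduler picks is driven by the master URL. The sketch below lists the mapping for a few common masters as found in the Spark 2.x source; the class names are for orientation only and vary between versions:

    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf().setAppName("backend-demo")
      .setMaster("local[4]")               // -> LocalSchedulerBackend
      // .setMaster("spark://host:7077")   // -> StandaloneSchedulerBackend
      // .setMaster("yarn")                // -> YarnClientSchedulerBackend / YarnClusterSchedulerBackend
    val sc = new SparkContext(conf)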

    <10> TaskScheduler
    TaskScheduler is the task scheduler. It is also a Scala trait, but it has only one implementation, the TaskSchedulerImpl class. It provides the Task scheduling algorithm and holds an instance of SchedulerBackend, through which it takes effect. The initialization code for the two of them is as follows.

    val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
    _schedulerBackend = sched
    _taskScheduler = ts
    

    <11> DAGScheduler
    DAGScheduler is the directed acyclic graph (DAG) scheduler. The DAG expresses the lineage between RDDs. DAGScheduler is responsible for generating and submitting Jobs and, following the DAG, for dividing the RDDs and operators into Stages and submitting those. Each Stage contains a group of Tasks, called a TaskSet, which is handed over to the TaskScheduler; in other words, the DAGScheduler has to schedule before the TaskScheduler does (a small example of the resulting Stage division follows the code below).
    The DAGScheduler is created with a plain new, but its constructor is also passed SparkContext's reference to the TaskScheduler. The TaskScheduler is therefore only actually started after the DAGScheduler has been created.

        _dagScheduler = new DAGScheduler(this)
        _heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)
        _taskScheduler.start()
    
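    As a worked example of the Stage division described above, and assuming an active SparkContext sc, the word count below forms a DAG that the DAGScheduler splits into two Stages at the shuffle required by reduceByKey:

    val counts = sc.parallelize(Seq("a b", "b c", "a c"))
      .flatMap(_.split(" "))        // Stage 0 (ShuffleMapStage)
      .map(word => (word, 1))       // Stage 0
      .reduceByKey(_ + _)           // shuffle boundary -> Stage 1 (ResultStage)
      .collect()                    // one Job; the DAGScheduler submits Stage 0, then Stage 1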

    <12> EventLoggingListener
    EventLoggingListener is the listener used to persist events. It is controlled by the spark.eventLog.enabled parameter, whose default value is false. When enabled, it is also registered with the LiveListenerBus and writes a specific subset of events to disk.
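
    A sketch of the settings involved when turning event logging on (the directory is illustrative and must already exist; these logs are what the History Server later replays):

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .set("spark.eventLog.enabled", "true")
      .set("spark.eventLog.dir", "hdfs:///spark-history")   // should match the History Server's log directory
      .set("spark.eventLog.compress", "true")               // optional; see _eventLogCodec in the try block above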

    <13> ExecutorAllocationManager
    ExecutorAllocationManager is the Executor allocation manager. It is controlled by the spark.dynamicAllocation.enabled parameter, whose default value is false. When enabled, and provided the SchedulerBackend implementation supports the mechanism, Spark dynamically increases and decreases the number of Executors according to the workload at runtime. Its initialization code can be seen in the try block above.
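
    A sketch of the configuration usually paired with dynamic allocation; the numbers are illustrative, and the external shuffle service is normally required so that shuffle data survives Executor removal:

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .set("spark.dynamicAllocation.enabled", "true")
      .set("spark.dynamicAllocation.minExecutors", "1")
      .set("spark.dynamicAllocation.maxExecutors", "20")
      .set("spark.shuffle.service.enabled", "true")   // external shuffle service, usually required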

    <14> ContextCleaner
    ContextCleaner is the context cleaner. It is controlled by the spark.cleaner.referenceTracking parameter, whose default value is true. Internally it keeps weak references to RDDs, shuffle dependencies and broadcast variables (covered later); once a weakly referenced object goes out of scope in the program, it is cleaned up asynchronously.
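
    The cleaner's behaviour can be tuned with a few related settings; the sketch below shows them with their default values:

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .set("spark.cleaner.referenceTracking", "true")            // master switch for the ContextCleaner
      .set("spark.cleaner.referenceTracking.blocking", "true")   // block on RDD/shuffle cleanup tasks
      .set("spark.cleaner.periodicGC.interval", "30min")         // periodic GC so weak references actually get enqueued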
