美文网首页
Spark源码分析七-SparkContext 的初始化过程

Spark源码分析七-SparkContext 的初始化过程

作者: 无色的叶 | 来源:发表于2020-07-07 16:21 被阅读0次

    参考:https://www.cnblogs.com/johnny666888/p/11116052.html

    spark任务入口

    从Spark 2.0 开始,引入了 SparkSession的概念,创建或使用已有的session 代码如下:

    val spark = SparkSession
      .builder
     .appName("SparkTC")
     .getOrCreate()
    

    使用了 builder 模式来创建或使用已存在的SparkSession,org.apache.spark.sql.SparkSession.Builder#getOrCreate 代码如下:

     1 def getOrCreate(): SparkSession = synchronized {
     2   assertOnDriver() // 注意,spark session只能在 driver端创建并访问
     3   // Get the session from current thread's active session.
     4 // activeThreadSession 是一个InheritableThreadLocal(继承自ThreadLocal)对象。因为数据在 ThreadLocal中存放着,所以不需要加锁
     5   var session = activeThreadSession.get()
     6 // 如果session不为空,且session对应的sparkContext没有停止,可以使用现有的session
     7   if ((session ne null) && !session.sparkContext.isStopped) {
     8     options.foreach { case (k, v) => session.sessionState.conf.setConfString(k, v) }
     9     if (options.nonEmpty) {
    10       logWarning("Using an existing SparkSession; some configuration may not take effect.")
    11     }
    12     return session
    13   }
    14 
    15   // 给SparkSession 对象加锁,防止重复初始化 session
    16 SparkSession.synchronized {
    17     // If the current thread does not have an active session, get it from the global session.
    18 // 如果默认session 中有session存在,且其sparkContext 没有停止,也可以使用
    19     session = defaultSession.get()
    20     if ((session ne null) && !session.sparkContext.isStopped) {
    21       options.foreach { case (k, v) => session.sessionState.conf.setConfString(k, v) }
    22       if (options.nonEmpty) {
    23         logWarning("Using an existing SparkSession; some configuration may not take effect.")
    24       }
    25       return session
    26     }
    27 
    28     // 创建session
    29     val sparkContext = userSuppliedContext.getOrElse { // 默认情况下userSuppliedContext中没有SparkContext对象
    30       val sparkConf = new SparkConf()
    31       options.foreach { case (k, v) => sparkConf.set(k, v) }
    32 
    33       // set a random app name if not given.
    34       if (!sparkConf.contains("spark.app.name")) {
    35         sparkConf.setAppName(java.util.UUID.randomUUID().toString)
    36       }
    37 
    38       SparkContext.getOrCreate(sparkConf)
    39       // Do not update `SparkConf` for existing `SparkContext`, as it's shared by all sessions.
    40     }
    41 
    42     // Initialize extensions if the user has defined a configurator class.
    43     val extensionConfOption = sparkContext.conf.get(StaticSQLConf.SPARK_SESSION_EXTENSIONS)
    44     if (extensionConfOption.isDefined) {
    45       val extensionConfClassName = extensionConfOption.get
    46       try {
    47         val extensionConfClass = Utils.classForName(extensionConfClassName)
    48         val extensionConf = extensionConfClass.newInstance()
    49           .asInstanceOf[SparkSessionExtensions => Unit]
    50         extensionConf(extensions)
    51       } catch {
    52         // Ignore the error if we cannot find the class or when the class has the wrong type.
    53         case e @ (_: ClassCastException |
    54                   _: ClassNotFoundException |
    55                   _: NoClassDefFoundError) =>
    56           logWarning(s"Cannot use $extensionConfClassName to configure session extensions.", e)
    57       }
    58     }
    59    // 初始化 SparkSession,并把刚初始化的 SparkContext 传递给它
    60     session = new SparkSession(sparkContext, None, None, extensions)
    61     options.foreach { case (k, v) => session.initialSessionOptions.put(k, v) }
    62 // 设置 default session
    63     setDefaultSession(session)
    64 // 设置 active session
    65 setActiveSession(session)
    66 
    67     // Register a successfully instantiated context to the singleton. This should be at the
    68     // end of the class definition so that the singleton is updated only if there is no
    69     // exception in the construction of the instance.
    70     // 设置 spark listener ,当application 结束时,default session 重置
    71 sparkContext.addSparkListener(new SparkListener {
    72       override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
    73         defaultSession.set(null)
    74       }
    75     })
    76   }
    77 
    78   return session
    79 }
    
    

    org.apache.spark.SparkContext#getOrCreate方法如下:

    1 def getOrCreate(config: SparkConf): SparkContext = {
     2   // Synchronize to ensure that multiple create requests don't trigger an exception
     3   // from assertNoOtherContextIsRunning within setActiveContext
     4 // 使用Object 对象锁
     5   SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized {
     6 // activeContext是一个AtomicReference 实例,它的数据set或update都是原子性的
     7     if (activeContext.get() == null) {
     8 // 同一个 JVM 中只允许有一个活动的 SparkContext 上下文对象(allowMultipleContexts = false)
     9       setActiveContext(new SparkContext(config), allowMultipleContexts = false)
    10     } else {
    11       if (config.getAll.nonEmpty) {
    12         logWarning("Using an existing SparkContext; some configuration may not take effect.")
    13       }
    14     }
    15     activeContext.get()
    16   }
    17 }
    

    SparkContext初始化

    SparkContext 代表到 spark 集群的连接,它可以用来在spark集群上创建 RDD,accumulator和broadcast 变量。一个JVM 只能有一个活动的 SparkContext 对象,在创建一个新的 SparkContext 之前,必须先调用stop 方法停止当前活动的 SparkContext。
    当调用了构造方法后,会初始化类的成员变量,然后进入初始化过程。由 try catch 块包围,这个 try catch 块是在执行构造函数时执行的,参照我写的一篇文章:scala class中孤立代码块揭秘

    这块孤立的代码块如下:

    1 try {
      2   // 1. 初始化 configuration
      3   _conf = config.clone()
      4   _conf.validateSettings()
      5 
      6   if (!_conf.contains("spark.master")) {
      7     throw new SparkException("A master URL must be set in your configuration")
      8   }
      9   if (!_conf.contains("spark.app.name")) {
     10     throw new SparkException("An application name must be set in your configuration")
     11   }
     12 
     13   // log out spark.app.name in the Spark driver logs
     14   logInfo(s"Submitted application: $appName")
     15 
     16   // System property spark.yarn.app.id must be set if user code ran by AM on a YARN cluster
     17   if (master == "yarn" && deployMode == "cluster" && !_conf.contains("spark.yarn.app.id")) {
     18     throw new SparkException("Detected yarn cluster mode, but isn't running on a cluster. " +
     19       "Deployment to YARN is not supported directly by SparkContext. Please use spark-submit.")
     20   }
     21 
     22   if (_conf.getBoolean("spark.logConf", false)) {
     23     logInfo("Spark configuration:\n" + _conf.toDebugString)
     24   }
     25 
     26   // Set Spark driver host and port system properties. This explicitly sets the configuration
     27   // instead of relying on the default value of the config constant.
     28   _conf.set(DRIVER_HOST_ADDRESS, _conf.get(DRIVER_HOST_ADDRESS))
     29   _conf.setIfMissing("spark.driver.port", "0")
     30 
     31   _conf.set("spark.executor.id", SparkContext.DRIVER_IDENTIFIER)
     32 
     33   _jars = Utils.getUserJars(_conf)
     34   _files = _conf.getOption("spark.files").map(_.split(",")).map(_.filter(_.nonEmpty))
     35     .toSeq.flatten
     36   // 2. 初始化日志目录并设置压缩类
     37   _eventLogDir =
     38     if (isEventLogEnabled) {
     39       val unresolvedDir = conf.get("spark.eventLog.dir", EventLoggingListener.DEFAULT_LOG_DIR)
     40         .stripSuffix("/")
     41       Some(Utils.resolveURI(unresolvedDir))
     42     } else {
     43       None
     44     }
     45 
     46   _eventLogCodec = {
     47     val compress = _conf.getBoolean("spark.eventLog.compress", false)
     48     if (compress && isEventLogEnabled) {
     49       Some(CompressionCodec.getCodecName(_conf)).map(CompressionCodec.getShortName)
     50     } else {
     51       None
     52     }
     53   }
     54   // 3. LiveListenerBus负责将SparkListenerEvent异步地传递给对应注册的SparkListener.
     55   _listenerBus = new LiveListenerBus(_conf)
     56 
     57   // Initialize the app status store and listener before SparkEnv is created so that it gets
     58   // all events.
     59   // 4. 给 app 提供一个 kv store(in-memory)
     60   _statusStore = AppStatusStore.createLiveStore(conf)
     61   // 5. 注册 AppStatusListener 到 LiveListenerBus 中
     62   listenerBus.addToStatusQueue(_statusStore.listener.get)
     63 
     64   // Create the Spark execution environment (cache, map output tracker, etc)
     65   // 6. 创建 driver端的 env
     66   // 包含一个运行中的 spark 实例(master 或 worker)的所有运行时环境对象,包括序列化器,RpcEnv,block manager, map output tracker等等。
     67   // 当前 spark 代码通过一个全局变量找到 SparkEnv,所有的线程可以访问同一个SparkEnv,
     68   // 创建SparkContext之后,可以通过 SparkEnv.get方法来访问它。
     69   _env = createSparkEnv(_conf, isLocal, listenerBus)
     70   SparkEnv.set(_env)
     71 
     72   // If running the REPL, register the repl's output dir with the file server.
     73   _conf.getOption("spark.repl.class.outputDir").foreach { path =>
     74     val replUri = _env.rpcEnv.fileServer.addDirectory("/classes", new File(path))
     75     _conf.set("spark.repl.class.uri", replUri)
     76   }
     77   // 7. 从底层监控 spark job 和 stage 的状态并汇报的 API
     78   _statusTracker = new SparkStatusTracker(this, _statusStore)
     79 
     80   // 8. console 进度条
     81   _progressBar =
     82     if (_conf.get(UI_SHOW_CONSOLE_PROGRESS) && !log.isInfoEnabled) {
     83       Some(new ConsoleProgressBar(this))
     84     } else {
     85       None
     86     }
     87 
     88   // 9. spark ui, 使用jetty 实现
     89   _ui =
     90     if (conf.getBoolean("spark.ui.enabled", true)) {
     91       Some(SparkUI.create(Some(this), _statusStore, _conf, _env.securityManager, appName, "",
     92         startTime))
     93     } else {
     94       // For tests, do not enable the UI
     95       None
     96     }
     97   // Bind the UI before starting the task scheduler to communicate
     98   // the bound port to the cluster manager properly
     99   _ui.foreach(_.bind())
    100 
    101   // 10. 创建 hadoop configuration
    102   _hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(_conf)
    103 
    104   // 11. Add each JAR given through the constructor
    105   if (jars != null) {
    106     jars.foreach(addJar)
    107   }
    108 
    109   if (files != null) {
    110     files.foreach(addFile)
    111   }
    112   // 12. 计算 executor 的内存
    113   _executorMemory = _conf.getOption("spark.executor.memory")
    114     .orElse(Option(System.getenv("SPARK_EXECUTOR_MEMORY")))
    115     .orElse(Option(System.getenv("SPARK_MEM"))
    116     .map(warnSparkMem))
    117     .map(Utils.memoryStringToMb)
    118     .getOrElse(1024)
    119 
    120   // Convert java options to env vars as a work around
    121   // since we can't set env vars directly in sbt.
    122   for { (envKey, propKey) <- Seq(("SPARK_TESTING", "spark.testing"))
    123     value <- Option(System.getenv(envKey)).orElse(Option(System.getProperty(propKey)))} {
    124     executorEnvs(envKey) = value
    125   }
    126   Option(System.getenv("SPARK_PREPEND_CLASSES")).foreach { v =>
    127     executorEnvs("SPARK_PREPEND_CLASSES") = v
    128   }
    129   // The Mesos scheduler backend relies on this environment variable to set executor memory.
    130   // TODO: Set this only in the Mesos scheduler.
    131   executorEnvs("SPARK_EXECUTOR_MEMORY") = executorMemory + "m"
    132   executorEnvs ++= _conf.getExecutorEnv
    133   executorEnvs("SPARK_USER") = sparkUser
    134 
    135   // We need to register "HeartbeatReceiver" before "createTaskScheduler" because Executor will
    136   // retrieve "HeartbeatReceiver" in the constructor. (SPARK-6640)
    137   // 13. 创建 HeartbeatReceiver endpoint
    138   _heartbeatReceiver = env.rpcEnv.setupEndpoint(
    139     HeartbeatReceiver.ENDPOINT_NAME, new HeartbeatReceiver(this))
    140 
    141   // Create and start the scheduler
    142   // 14. 创建 task scheduler 和 scheduler backend
    143   val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
    144   _schedulerBackend = sched
    145   _taskScheduler = ts
    146   // 15. 创建DAGScheduler实例
    147   _dagScheduler = new DAGScheduler(this)
    148   _heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)
    149 
    150   // start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
    151   // constructor
    152   // 16. 启动 task scheduler
    153   _taskScheduler.start()
    154 
    155   // 17. 从task scheduler 获取 application ID
    156   _applicationId = _taskScheduler.applicationId()
    157   // 18. 从 task scheduler 获取 application attempt id
    158   _applicationAttemptId = taskScheduler.applicationAttemptId()
    159   _conf.set("spark.app.id", _applicationId)
    160   if (_conf.getBoolean("spark.ui.reverseProxy", false)) {
    161     System.setProperty("spark.ui.proxyBase", "/proxy/" + _applicationId)
    162   }
    163   // 19. 为ui 设置 application id
    164   _ui.foreach(_.setAppId(_applicationId))
    165   // 20. 初始化 block manager
    166   _env.blockManager.initialize(_applicationId)
    167 
    168   // The metrics system for Driver need to be set spark.app.id to app ID.
    169   // So it should start after we get app ID from the task scheduler and set spark.app.id.
    170   // 21. 启动 metricsSystem
    171   _env.metricsSystem.start()
    172   // Attach the driver metrics servlet handler to the web ui after the metrics system is started.
    173   // 22. 将 metricSystem 的 servlet handler 给 ui 用
    174   _env.metricsSystem.getServletHandlers.foreach(handler => ui.foreach(_.attachHandler(handler)))
    175 
    176   // 23. 初始化 event logger listener
    177   _eventLogger =
    178     if (isEventLogEnabled) {
    179       val logger =
    180         new EventLoggingListener(_applicationId, _applicationAttemptId, _eventLogDir.get,
    181           _conf, _hadoopConfiguration)
    182       logger.start()
    183       listenerBus.addToEventLogQueue(logger)
    184       Some(logger)
    185     } else {
    186       None
    187     }
    188 
    189   // Optionally scale number of executors dynamically based on workload. Exposed for testing.
    190   // 24. 如果启用了动态分配 executor, 需要实例化 executorAllocationManager 并启动之
    191   val dynamicAllocationEnabled = Utils.isDynamicAllocationEnabled(_conf)
    192   _executorAllocationManager =
    193     if (dynamicAllocationEnabled) {
    194       schedulerBackend match {
    195         case b: ExecutorAllocationClient =>
    196           Some(new ExecutorAllocationManager(
    197             schedulerBackend.asInstanceOf[ExecutorAllocationClient], listenerBus, _conf,
    198             _env.blockManager.master))
    199         case _ =>
    200           None
    201       }
    202     } else {
    203       None
    204     }
    205   _executorAllocationManager.foreach(_.start())
    206 
    207   // 25. 初始化 ContextCleaner,并启动之
    208   _cleaner =
    209     if (_conf.getBoolean("spark.cleaner.referenceTracking", true)) {
    210       Some(new ContextCleaner(this))
    211     } else {
    212       None
    213     }
    214   _cleaner.foreach(_.start())
    215   // 26. 建立并启动 listener bus
    216   setupAndStartListenerBus()
    217   // 27.  task scheduler 已就绪,发送环境已更新请求
    218   postEnvironmentUpdate()
    219   // 28.  发送 application start 请求事件
    220   postApplicationStart()
    221 
    222   // Post init
    223   // 29.等待 直至task scheduler backend 准备好了
    224   _taskScheduler.postStartHook()
    225   // 30. 注册 dagScheduler metricsSource
    226   _env.metricsSystem.registerSource(_dagScheduler.metricsSource)
    227   // 31. 注册 metric source
    228   _env.metricsSystem.registerSource(new BlockManagerSource(_env.blockManager))
    229   //32. 注册 metric source
    230   _executorAllocationManager.foreach { e =>
    231     _env.metricsSystem.registerSource(e.executorAllocationManagerSource)
    232   }
    233 
    234   // Make sure the context is stopped if the user forgets about it. This avoids leaving
    235   // unfinished event logs around after the JVM exits cleanly. It doesn't help if the JVM
    236   // is killed, though.
    237   logDebug("Adding shutdown hook") // force eager creation of logger
    238   // 33. 设置 shutdown hook, 在spark context 关闭时,要做的回调操作
    239   _shutdownHookRef = ShutdownHookManager.addShutdownHook(
    240     ShutdownHookManager.SPARK_CONTEXT_SHUTDOWN_PRIORITY) { () =>
    241     logInfo("Invoking stop() from shutdown hook")
    242     try {
    243       stop()
    244     } catch {
    245       case e: Throwable =>
    246         logWarning("Ignoring Exception while stopping SparkContext from shutdown hook", e)
    247     }
    248   }
    249 } catch {
    250   case NonFatal(e) =>
    251     logError("Error initializing SparkContext.", e)
    252     try {
    253       stop()
    254     } catch {
    255       case NonFatal(inner) =>
    256         logError("Error stopping SparkContext after init error.", inner)
    257     } finally {
    258       throw e
    259     }
    260 } 
    

    从上面可以看出,spark context 的初始化是非常复杂的,涉及的spark 组件很多,包括 异步事件总线系统LiveListenerBus、SparkEnv、SparkUI、DAGScheduler、metrics监测系统、EventLoggingListener、TaskScheduler、ExecutorAllocationManager、ContextCleaner等等。先暂且当作是总述,后面对部分组件会有比较全面的剖析。

    相关文章

      网友评论

          本文标题:Spark源码分析七-SparkContext 的初始化过程

          本文链接:https://www.haomeiwen.com/subject/iboaqktx.html