SparkContext Source Code Analysis (version 2.3.1)


Author: 白面葫芦娃92 | Published: 2019-07-21 16:04
/**
 * Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
 * cluster, and can be used to create RDDs, accumulators and broadcast variables on that cluster.
 *
 * Only one SparkContext may be active per JVM.  You must `stop()` the active SparkContext before
 * creating a new one.  This limitation may eventually be removed; see SPARK-2243 for more details.
 *
 * @param config a Spark Config object describing the application configuration. Any settings in
 *   this config overrides the default configs as well as system properties.
 */

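SparkContext is the entry point to a Spark cluster. It is used to create RDDs, accumulators, and broadcast variables on that cluster. It is also one of the most important objects in a Spark application and can fairly be called the core of the application's scheduling. During initialization it creates the core components the application needs in order to run, including the high-level scheduler (DAGScheduler), the low-level scheduler (TaskScheduler), and the scheduler's communication endpoint (SchedulerBackend). It is also responsible for registering the application with the Master.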

1. Initialize the configuration

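_conf.set("spark.executor.id", SparkContext.DRIVER_IDENTIFIER): note this line. Internally Spark has no separate notion of a driver here; every process is treated as an executor, and the driver is simply the one whose id is tagged "driver".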

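2. Initialize the event log directory and set the compression codec

3. Create the LiveListenerBus, which asynchronously delivers each SparkListenerEvent to the registered SparkListeners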

_listenerBus = new LiveListenerBus(_conf)
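
To make "asynchronous delivery" concrete, here is a minimal sketch in plain Scala. It is not Spark's LiveListenerBus: the Event, Listener and AsyncBus types are hypothetical stand-ins, and the model uses a single queue drained by one dispatcher thread. It only illustrates the idea that posting an event returns immediately while a separate thread hands events to listeners in order.

```scala
import java.util.concurrent.{CopyOnWriteArrayList, LinkedBlockingQueue}
import scala.collection.mutable.ArrayBuffer

object ListenerBusSketch {
  // Hypothetical event and listener types, standing in for
  // SparkListenerEvent and SparkListener.
  sealed trait Event
  final case class JobStarted(jobId: Int) extends Event
  final case class JobEnded(jobId: Int) extends Event
  private case object Stop extends Event // poison pill that ends the dispatcher

  trait Listener { def onEvent(event: Event): Unit }

  final class AsyncBus {
    private val queue = new LinkedBlockingQueue[Event]()
    private val listeners = new CopyOnWriteArrayList[Listener]()

    // A single dispatcher thread drains the queue and calls every listener.
    private val dispatcher = new Thread(new Runnable {
      override def run(): Unit = {
        var event = queue.take()
        while (event != Stop) {
          val it = listeners.iterator()
          while (it.hasNext) it.next().onEvent(event)
          event = queue.take()
        }
      }
    }, "listener-bus-sketch")
    dispatcher.setDaemon(true)

    def addListener(listener: Listener): Unit = listeners.add(listener)
    def start(): Unit = dispatcher.start()
    // post() only enqueues, so the caller never waits for listeners to run.
    def post(event: Event): Unit = queue.put(event)
    // stop() enqueues the poison pill and waits until earlier events are delivered.
    def stop(): Unit = { queue.put(Stop); dispatcher.join() }
  }

  // Posts one event before start() and one after, then returns what the listener saw.
  def demo(): List[Event] = {
    val seen = ArrayBuffer.empty[Event]
    val bus = new AsyncBus
    bus.addListener(new Listener {
      override def onEvent(event: Event): Unit = seen.synchronized { seen += event }
    })
    bus.post(JobStarted(1)) // waits in the queue until the bus is started
    bus.start()
    bus.post(JobEnded(1))
    bus.stop()
    seen.synchronized(seen.toList)
  }
}
```

In this model an event posted before start() simply waits in the queue, which is why a bus can be created and given listeners early in initialization and only started much later without dropping anything.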

4. Create an in-memory key-value store for the application's status

_statusStore = AppStatusStore.createLiveStore(conf)

5. Register the AppStatusListener with the LiveListenerBus

listenerBus.addToStatusQueue(_statusStore.listener.get)

6. Create the driver-side environment (see SparkEnv.scala)

    // Create the Spark execution environment (cache, map output tracker, etc)
    _env = createSparkEnv(_conf, isLocal, listenerBus)
    SparkEnv.set(_env)
  // This function allows components created by SparkEnv to be mocked in unit tests:
  private[spark] def createSparkEnv(
      conf: SparkConf,
      isLocal: Boolean,
      listenerBus: LiveListenerBus): SparkEnv = {
    SparkEnv.createDriverEnv(conf, isLocal, listenerBus, SparkContext.numDriverCores(master))
  }

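conf: the SparkConf holding the application's configuration.
isLocal: whether the application runs in local mode.
listenerBus: the event listener bus.
SparkContext.numDriverCores(master): the number of cores available to the driver.
According to the comments in the source, the driver and the executors are both set up by the same underlying creation method: createDriverEnv and createExecutorEnv each delegate to SparkEnv.create.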

class SparkEnv (
    val executorId: String,
    private[spark] val rpcEnv: RpcEnv,
    val serializer: Serializer,
    val closureSerializer: Serializer,
    val serializerManager: SerializerManager,
    val mapOutputTracker: MapOutputTracker, // caches MapStatus information; on executors, fetches it from the MapOutputTrackerMaster
    val shuffleManager: ShuffleManager, // manages shuffle writes and reads
    val broadcastManager: BroadcastManager, // broadcast variables
    val blockManager: BlockManager, // block management
    val securityManager: SecurityManager, // security management
    val metricsSystem: MetricsSystem, // metrics
    val memoryManager: MemoryManager,
    val outputCommitCoordinator: OutputCommitCoordinator,
    val conf: SparkConf // configuration
)

6.1 Create the SecurityManager

    val securityManager = new SecurityManager(conf, ioEncryptionKey)
    if (isDriver) {
      securityManager.initializeAuth()
    }

6.2 Create the Netty-based RpcEnv, set up RPC communication, and record the driver port

    val rpcEnv = RpcEnv.create(systemName, bindAddress, advertiseAddress, port.getOrElse(-1), conf,
      securityManager, numUsableCores, !isDriver)

    // Figure out which port RpcEnv actually bound to in case the original port is 0 or occupied.
    if (isDriver) {
      conf.set("spark.driver.port", rpcEnv.address.port.toString)
    }

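On the driver, this RpcEnv receives the messages that executors report back.
Creation ends with a call to startServiceOnPort, which starts listening on the port.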

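6.3 Create the SerializerManager (default serializer class: org.apache.spark.serializer.JavaSerializer)

The serializer is instantiated reflectively from the class named by spark.serializer, which defaults to org.apache.spark.serializer.JavaSerializer. The closureSerializer is always a JavaSerializer, constructed directly, and is used to serialize Scala closures.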

    val serializer = instantiateClassFromConf[Serializer](
      "spark.serializer", "org.apache.spark.serializer.JavaSerializer")
    logDebug(s"Using serializer: ${serializer.getClass}")

    val serializerManager = new SerializerManager(serializer, conf, ioEncryptionKey)

    val closureSerializer = new JavaSerializer(conf)
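
The "class name from configuration, with a default" pattern used for the serializer can be sketched in a few lines of plain Scala. This is an illustration only: instantiateFromConf and the key sketch.list.class are hypothetical, the configuration is modelled as a plain Map, and JDK list classes stand in for serializer implementations. Spark's own helper additionally tries constructors that accept a SparkConf, which this sketch leaves out.

```scala
object ReflectiveInstantiationSketch {
  // Look up a class name in the configuration (falling back to a default)
  // and create an instance through its public no-argument constructor.
  def instantiateFromConf[T](
      conf: Map[String, String],
      key: String,
      defaultClass: String): T = {
    val className = conf.getOrElse(key, defaultClass)
    Class.forName(className)
      .getDeclaredConstructor()
      .newInstance()
      .asInstanceOf[T]
  }

  // With no override the default class is used.
  val defaultList: java.util.List[String] =
    instantiateFromConf[java.util.List[String]](
      Map.empty, "sketch.list.class", "java.util.ArrayList")

  // With an override, the configured class wins.
  val overriddenList: java.util.List[String] =
    instantiateFromConf[java.util.List[String]](
      Map("sketch.list.class" -> "java.util.LinkedList"),
      "sketch.list.class", "java.util.ArrayList")
}
```

The caller only depends on the interface type, so swapping the implementation is purely a configuration change, which is the property the spark.serializer setting relies on.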

6.4 Create the BroadcastManager

    val broadcastManager = new BroadcastManager(isDriver, conf, securityManager)

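6.5 Create the MapOutputTracker and register its RPC endpoint

MapOutputTracker tracks the output status of map-stage tasks so that reduce-stage tasks can find where the intermediate output lives and fetch it. Earlier Spark versions kept this state in mapStatuses: TimeStampedHashMap[Int, Array[MapStatus]], where the key is the shuffleId and the array holds the MapStatus of each map task; in the 2.3 line MapOutputTrackerMaster keeps the equivalent per-shuffle state in a ShuffleStatus object for each shuffleId.
How it is created depends on whether the process is the driver:
  On the driver, a MapOutputTrackerMaster is created, then a MapOutputTrackerMasterEndpoint, which is registered with the RpcEnv.
  On an executor, a MapOutputTrackerWorker is created, and it holds a reference to the MapOutputTrackerMasterEndpoint obtained through the RpcEnv.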

    val mapOutputTracker = if (isDriver) {
      new MapOutputTrackerMaster(conf, broadcastManager, isLocal)
    } else {
      new MapOutputTrackerWorker(conf)
    }

    // Have to assign trackerEndpoint after initialization as MapOutputTrackerEndpoint
    // requires the MapOutputTracker itself
    mapOutputTracker.trackerEndpoint = registerOrLookupEndpoint(MapOutputTracker.ENDPOINT_NAME,
      new MapOutputTrackerMasterEndpoint(
        rpcEnv, mapOutputTracker.asInstanceOf[MapOutputTrackerMaster], conf))
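
The master/worker split described above can be modelled in plain Scala. This is a simplified sketch, not Spark's implementation: MapStatus here is a hypothetical two-field case class, the master stores one slot per map task keyed by shuffleId, and a direct method call stands in for the RPC round trip a real executor would make to the driver endpoint.

```scala
import scala.collection.mutable

object MapOutputSketch {
  // Hypothetical simplification of MapStatus: where a map task wrote its output,
  // and how many bytes it produced for each reduce partition.
  final case class MapStatus(location: String, sizes: Vector[Long])

  // Driver side: shuffleId -> one slot per map task.
  final class TrackerMaster {
    private val statuses = mutable.Map.empty[Int, Array[Option[MapStatus]]]

    def registerShuffle(shuffleId: Int, numMaps: Int): Unit = synchronized {
      require(!statuses.contains(shuffleId), s"shuffle $shuffleId already registered")
      statuses(shuffleId) = Array.fill[Option[MapStatus]](numMaps)(None)
    }

    def registerMapOutput(shuffleId: Int, mapId: Int, status: MapStatus): Unit =
      synchronized { statuses(shuffleId)(mapId) = Some(status) }

    def getStatuses(shuffleId: Int): Vector[Option[MapStatus]] =
      synchronized { statuses(shuffleId).toVector }
  }

  // Executor side: asks the master on a cache miss, then serves from the cache.
  final class TrackerWorker(master: TrackerMaster) {
    private val cache = mutable.Map.empty[Int, Vector[MapStatus]]

    // For one reduce partition, list (location, bytes) for every map output.
    def outputsFor(shuffleId: Int, reduceId: Int): Vector[(String, Long)] = synchronized {
      val known = cache.getOrElseUpdate(shuffleId, {
        val fetched = master.getStatuses(shuffleId)
        require(fetched.forall(_.isDefined), s"missing map output for shuffle $shuffleId")
        fetched.flatten
      })
      known.map(status => (status.location, status.sizes(reduceId)))
    }
  }
}
```

A reduce task for partition 1 would call outputsFor(shuffleId, 1) and then fetch each listed segment from the reported location.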

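6.6 Create the ShuffleManager

ShuffleManager manages shuffle operations on local and remote block data. The default SortShuffleManager uses the IndexShuffleBlockResolver it holds to work indirectly with the DiskBlockManager inside BlockManager: map results are written to local disk together with an index file keyed by shuffleId and mapId. It can also read those files from the local node or from remote nodes, using the map statuses maintained by MapOutputTrackerMaster.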

    // Let the user specify short names for shuffle managers
    val shortShuffleMgrNames = Map(
      "sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName,
      "tungsten-sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName)
    val shuffleMgrName = conf.get("spark.shuffle.manager", "sort")
    val shuffleMgrClass =
      shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase(Locale.ROOT), shuffleMgrName)
    val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)
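
The short-name lookup above is easy to try in isolation. The following sketch copies the lookup logic into plain Scala, with class names written as strings so it runs without Spark on the classpath; resolve is a hypothetical helper name, and com.example.MyShuffleManager is a made-up class name used only to show the fall-through case.

```scala
import java.util.Locale

object ShuffleNameSketch {
  // Both short names point at the same class, as in the excerpt above.
  private val shortNames = Map(
    "sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager",
    "tungsten-sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager")

  // A known short name (case-insensitive) is expanded; anything else is
  // assumed to already be a fully qualified class name and is returned as is.
  def resolve(configured: String): String =
    shortNames.getOrElse(configured.toLowerCase(Locale.ROOT), configured)

  val fromShortName: String = resolve("SORT")
  val fromClassName: String = resolve("com.example.MyShuffleManager")
}
```

Note that only the lookup key is lower-cased; a custom class name is passed through with its original casing so it can still be loaded by name.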

6.7 Create the MemoryManager (UnifiedMemoryManager by default)

    val useLegacyMemoryManager = conf.getBoolean("spark.memory.useLegacyMode", false)
    val memoryManager: MemoryManager =
      if (useLegacyMemoryManager) {
        new StaticMemoryManager(conf, numUsableCores)
      } else {
        UnifiedMemoryManager(conf, numUsableCores)
      }

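6.8 Create the BlockManagerMaster and set up its RPC endpoint

BlockManagerMaster manages block information for the whole cluster. A block is Spark's unit of data management and is independent of where the data is stored: it may live on disk or in memory.
The block transfer service (blockTransferService: BlockTransferService) is a NettyBlockTransferService. It is built on Netty's asynchronous, event-driven networking framework and provides both a server and a client for fetching sets of blocks from remote nodes.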

    val blockManagerPort = if (isDriver) {
      conf.get(DRIVER_BLOCK_MANAGER_PORT)
    } else {
      conf.get(BLOCK_MANAGER_PORT)
    }

    val blockTransferService =
      new NettyBlockTransferService(conf, securityManager, bindAddress, advertiseAddress,
        blockManagerPort, numUsableCores)

    val blockManagerMaster = new BlockManagerMaster(registerOrLookupEndpoint(
      BlockManagerMaster.DRIVER_ENDPOINT_NAME,
      new BlockManagerMasterEndpoint(rpcEnv, isLocal, conf, listenerBus)),
      conf, isDriver)

6.9 Create the BlockManager

BlockManager is responsible for managing blocks. It is not valid until its initialize method has been called.

    // NB: blockManager is not valid until initialize() is called later.
    val blockManager = new BlockManager(executorId, rpcEnv, blockManagerMaster,
      serializerManager, conf, memoryManager, mapOutputTracker, shuffleManager,
      blockTransferService, securityManager, numUsableCores)

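Among the members of BlockManager:
BlockManagerMaster manages the BlockManagers of the whole cluster;
serializerManager wraps the default serializer;
MemoryManager handles memory management;
MapOutputTracker records where each ShuffleMapTask wrote its shuffle output, so that the next stage can find it;
BlockTransferService performs the network operations: reading from or writing to another BlockManager goes through it. A block is the smallest unit of data abstraction at Spark runtime; it may be held in memory or on disk (and, in earlier versions, in an external store such as Alluxio);
SecurityManager handles security;
numUsableCores is the number of usable cores.
Inside BlockManager, DiskBlockManager handles disk reads and writes. It creates and maintains the mapping between logical blocks and their physical locations on disk. Each block maps to one file named after its BlockId, and block files are hashed among the directories listed in spark.local.dir (or in SPARK_LOCAL_DIRS, if it is set).
BlockManager also owns a thread pool, block-manager-future, as well as the memoryStore and diskStore.
Shuffle reads and writes are likewise managed through BlockManager.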

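6.10 Create the MetricsSystem

MetricsSystem is Spark's metrics system: it collects metrics from registered sources and reports them to the configured sinks.

    val metricsSystem = if (isDriver) {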
      // Don't start metrics system right now for Driver.
      // We need to wait for the task scheduler to give us an app ID.
      // Then we can start the metrics system.
      MetricsSystem.createMetricsSystem("driver", conf, securityManager)
    } else {
      // We need to set the executor ID before the MetricsSystem is created because sources and
      // sinks specified in the metrics configuration file will want to incorporate this executor's
      // ID into the metrics they report.
      conf.set("spark.executor.id", executorId)
      val ms = MetricsSystem.createMetricsSystem("executor", conf, securityManager)
      ms.start()
      ms
    }

6.11 Create the OutputCommitCoordinator

    val outputCommitCoordinator = mockOutputCommitCoordinator.getOrElse {
      new OutputCommitCoordinator(conf, isDriver)
    }
    val outputCommitCoordinatorRef = registerOrLookupEndpoint("OutputCommitCoordinator",
      new OutputCommitCoordinatorEndpoint(rpcEnv, outputCommitCoordinator))
    outputCommitCoordinator.coordinatorRef = Some(outputCommitCoordinatorRef)

6.12 Create the envInstance and the temporary directory

    val envInstance = new SparkEnv(
      executorId,
      rpcEnv,
      serializer,
      closureSerializer,
      serializerManager,
      mapOutputTracker,
      shuffleManager,
      broadcastManager,
      blockManager,
      securityManager,
      metricsSystem,
      memoryManager,
      outputCommitCoordinator,
      conf)

7. Create the low-level API for monitoring and reporting the status of Spark jobs and stages

    _statusTracker = new SparkStatusTracker(this, _statusStore)

8. Create the console progress bar

    _progressBar =
      if (_conf.get(UI_SHOW_CONSOLE_PROGRESS) && !log.isInfoEnabled) {
        Some(new ConsoleProgressBar(this))
      } else {
        None
      }

9. Create the Spark UI (this starts a Jetty server hosting a web application, listening on port 4040 by default)

    _ui =
      if (conf.getBoolean("spark.ui.enabled", true)) {
        Some(SparkUI.create(Some(this), _statusStore, _conf, _env.securityManager, appName, "",
          startTime))
      } else {
        // For tests, do not enable the UI
        None
      }
    // Bind the UI before starting the task scheduler to communicate
    // the bound port to the cluster manager properly
    _ui.foreach(_.bind())

10. Create the Hadoop configuration

    _hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(_conf)

11. Add each JAR and file given through the constructor

    if (jars != null) {
      jars.foreach(addJar)
    }

    if (files != null) {
      files.foreach(addFile)
    }

12. Determine the executor memory

    _executorMemory = _conf.getOption("spark.executor.memory")
      .orElse(Option(System.getenv("SPARK_EXECUTOR_MEMORY")))
      .orElse(Option(System.getenv("SPARK_MEM"))
      .map(warnSparkMem))
      .map(Utils.memoryStringToMb)
      .getOrElse(1024)
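
The fallback order above (configuration first, then two environment variables, then a 1024 MB default) can be exercised with a small plain-Scala sketch. Everything here is an assumption made for illustration: the configuration and environment are plain Maps, toMb is a deliberately tiny stand-in for Utils.memoryStringToMb that only understands an "m" or "g" suffix, and the deprecation warning that the real code attaches to SPARK_MEM is omitted.

```scala
object ExecutorMemorySketch {
  // Simplified stand-in for Utils.memoryStringToMb: handles "<n>m" and "<n>g" only.
  def toMb(size: String): Int = {
    val lower = size.trim.toLowerCase
    if (lower.endsWith("g")) lower.dropRight(1).toInt * 1024
    else if (lower.endsWith("m")) lower.dropRight(1).toInt
    else throw new IllegalArgumentException(s"unsupported size in this sketch: $size")
  }

  // The first source that is defined wins; later ones are not consulted.
  def executorMemoryMb(conf: Map[String, String], env: Map[String, String]): Int =
    conf.get("spark.executor.memory")
      .orElse(env.get("SPARK_EXECUTOR_MEMORY"))
      .orElse(env.get("SPARK_MEM"))
      .map(toMb)
      .getOrElse(1024)

  // Configuration beats the environment.
  val fromConf: Int = executorMemoryMb(
    Map("spark.executor.memory" -> "2g"),
    Map("SPARK_EXECUTOR_MEMORY" -> "512m"))

  // With nothing in the configuration, the environment variable is used.
  val fromEnv: Int = executorMemoryMb(Map.empty, Map("SPARK_EXECUTOR_MEMORY" -> "512m"))

  // With nothing set anywhere, the default applies.
  val fallback: Int = executorMemoryMb(Map.empty, Map.empty)
}
```

Chaining orElse on Option keeps the precedence visible in reading order, which is why the original expresses the lookup this way rather than with nested if statements.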

13. Create the HeartbeatReceiver endpoint

    // We need to register "HeartbeatReceiver" before "createTaskScheduler" because Executor will
    // retrieve "HeartbeatReceiver" in the constructor. (SPARK-6640)
    _heartbeatReceiver = env.rpcEnv.setupEndpoint(
      HeartbeatReceiver.ENDPOINT_NAME, new HeartbeatReceiver(this))

14. Create the task scheduler and scheduler backend, then the DAGScheduler instance

    // Create and start the scheduler
    val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
    _schedulerBackend = sched
    _taskScheduler = ts
    _dagScheduler = new DAGScheduler(this)
    _heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)

TaskScheduler schedules the tasks of each stage for execution.
SchedulerBackend acquires resources (executors) for the current application.
DAGScheduler splits each job into stages.
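
To illustrate what "splitting a job into stages" means, here is a minimal plain-Scala sketch that cuts a lineage at shuffle boundaries. It is not the DAGScheduler's algorithm: Node, Narrow and Shuffle are hypothetical stand-ins for RDDs and their dependencies, and the sketch assumes a tree-shaped lineage in which no parent is shared between two children.

```scala
object StageSplitSketch {
  sealed trait Dep { def parent: Node }
  final case class Narrow(parent: Node) extends Dep  // can be pipelined in the same stage
  final case class Shuffle(parent: Node) extends Dep // forces a stage boundary
  final case class Node(name: String, deps: List[Dep] = Nil)

  // Returns stages in execution order (parents first); each stage lists the
  // node names that are pipelined together.
  def stages(last: Node): List[List[String]] = {
    // Follow narrow dependencies to gather this stage's members, and note
    // every node that sits on the far side of a shuffle dependency.
    def collect(node: Node): (List[String], List[Node]) = {
      val parts: List[(List[String], List[Node])] = node.deps.map {
        case Narrow(parent) => collect(parent)
        case Shuffle(parent) => (Nil, List(parent))
      }
      (parts.flatMap(_._1) :+ node.name, parts.flatMap(_._2))
    }
    val (members, shuffleParents) = collect(last)
    shuffleParents.flatMap(stages) :+ members
  }

  // A word-count style lineage: textFile -> map -> reduceByKey -> filter.
  val textFile: Node = Node("textFile")
  val mapped: Node = Node("map", List(Narrow(textFile)))
  val reduced: Node = Node("reduceByKey", List(Shuffle(mapped)))
  val filtered: Node = Node("filter", List(Narrow(reduced)))

  val plan: List[List[String]] = stages(filtered)
}
```

For this lineage the result has two stages: textFile and map run together before the shuffle, and reduceByKey and filter run together after it. In Spark terms the TaskScheduler would then receive one set of tasks per stage.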

15. Start the task scheduler

    // start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
    // constructor
    _taskScheduler.start()

16. Get the application ID and application attempt ID from the task scheduler

    _applicationId = _taskScheduler.applicationId()
    _applicationAttemptId = taskScheduler.applicationAttemptId()
    _conf.set("spark.app.id", _applicationId)

17. Set the application ID on the UI

    _ui.foreach(_.setAppId(_applicationId))

18. Initialize the block manager

    _env.blockManager.initialize(_applicationId)

19. Start the metricsSystem and attach its servlet handlers to the UI

    // The metrics system for Driver need to be set spark.app.id to app ID.
    // So it should start after we get app ID from the task scheduler and set spark.app.id.
    _env.metricsSystem.start()
    // Attach the driver metrics servlet handler to the web ui after the metrics system is started.
    _env.metricsSystem.getServletHandlers.foreach(handler => ui.foreach(_.attachHandler(handler)))

20. Initialize the event logging listener

    _eventLogger =
      if (isEventLogEnabled) {
        val logger =
          new EventLoggingListener(_applicationId, _applicationAttemptId, _eventLogDir.get,
            _conf, _hadoopConfiguration)
        logger.start()
        listenerBus.addToEventLogQueue(logger)
        Some(logger)
      } else {
        None
      }

21. If dynamic executor allocation is enabled, create and start the ExecutorAllocationManager

    // Optionally scale number of executors dynamically based on workload. Exposed for testing.
    val dynamicAllocationEnabled = Utils.isDynamicAllocationEnabled(_conf)
    _executorAllocationManager =
      if (dynamicAllocationEnabled) {
        schedulerBackend match {
          case b: ExecutorAllocationClient =>
            Some(new ExecutorAllocationManager(
              schedulerBackend.asInstanceOf[ExecutorAllocationClient], listenerBus, _conf,
              _env.blockManager.master))
          case _ =>
            None
        }
      } else {
        None
      }
    _executorAllocationManager.foreach(_.start())

22. Create and start the ContextCleaner

    _cleaner =
      if (_conf.getBoolean("spark.cleaner.referenceTracking", true)) {
        Some(new ContextCleaner(this))
      } else {
        None
      }
    _cleaner.foreach(_.start())

23. Set up and start the listener bus

    setupAndStartListenerBus()

24. With the task scheduler ready, post the environment update event

    postEnvironmentUpdate()

25. Post the application start event

    postApplicationStart()

26. Wait until the scheduler backend is ready

    // Post init
    _taskScheduler.postStartHook()

27. Register the DAGScheduler metrics source

    _env.metricsSystem.registerSource(_dagScheduler.metricsSource)

28. Register the BlockManager metrics source

    _env.metricsSystem.registerSource(new BlockManagerSource(_env.blockManager))

29. Register the ExecutorAllocationManager metrics source, if one was created

    _executorAllocationManager.foreach { e =>
      _env.metricsSystem.registerSource(e.executorAllocationManagerSource)
    }

30. Install a shutdown hook, the callback that runs when the JVM exits and stops the SparkContext

    // Make sure the context is stopped if the user forgets about it. This avoids leaving
    // unfinished event logs around after the JVM exits cleanly. It doesn't help if the JVM
    // is killed, though.
    logDebug("Adding shutdown hook") // force eager creation of logger
    _shutdownHookRef = ShutdownHookManager.addShutdownHook(
      ShutdownHookManager.SPARK_CONTEXT_SHUTDOWN_PRIORITY) { () =>
      logInfo("Invoking stop() from shutdown hook")
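      try {
        stop()
      } catch {
        case e: Throwable =>
          logWarning("Ignoring Exception while stopping SparkContext from shutdown hook", e)
      }
    }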
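
The idea of a prioritized shutdown hook that tolerates failures can be sketched in plain Scala. This is a simplified model rather than Spark's ShutdownHookManager: HookRegistry is a hypothetical class, the priorities 50 and 25 are illustrative values, and the sketch assumes that hooks with a higher priority run first.

```scala
import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal

object ShutdownHookSketch {
  final class HookRegistry {
    private val hooks = ArrayBuffer.empty[(Int, () => Unit)]

    def add(priority: Int)(hook: () => Unit): Unit =
      hooks.synchronized { hooks += ((priority, hook)) }

    // Run hooks from highest to lowest priority. A failing hook is reported
    // and skipped so that the remaining hooks still get their turn.
    def runAll(): Unit = {
      val ordered = hooks.synchronized(hooks.toList).sortBy { case (priority, _) => -priority }
      ordered.foreach { case (_, hook) =>
        try hook()
        catch {
          case NonFatal(e) => System.err.println(s"Ignoring exception in shutdown hook: $e")
        }
      }
    }
  }

  // Ties the registry to JVM exit; not called in the demo below so the
  // ordering can be observed directly.
  def installJvmHook(registry: HookRegistry): Thread = {
    val thread = new Thread(new Runnable {
      override def run(): Unit = registry.runAll()
    })
    Runtime.getRuntime.addShutdownHook(thread)
    thread
  }

  // Registers three hooks and returns the order in which the surviving ones ran.
  def demo(): List[String] = {
    val log = ArrayBuffer.empty[String]
    val registry = new HookRegistry
    registry.add(25)(() => { log += "delete temp dirs"; () })
    registry.add(50)(() => throw new RuntimeException("boom"))
    registry.add(50)(() => { log += "stop context"; () })
    registry.runAll()
    log.toList
  }
}
```

Catching the exception inside the hook runner mirrors the try/catch around stop() in the excerpt: a failure while stopping the context should not prevent later cleanup, such as removing temporary directories.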

The full source is as follows:

try {
  // 1. Initialize the configuration
  _conf = config.clone()
  _conf.validateSettings()

  if (!_conf.contains("spark.master")) {
    throw new SparkException("A master URL must be set in your configuration")
  }
  if (!_conf.contains("spark.app.name")) {
    throw new SparkException("An application name must be set in your configuration")
  }

  // log out spark.app.name in the Spark driver logs
  logInfo(s"Submitted application: $appName")

  // System property spark.yarn.app.id must be set if user code ran by AM on a YARN cluster
  if (master == "yarn" && deployMode == "cluster" && !_conf.contains("spark.yarn.app.id")) {
    throw new SparkException("Detected yarn cluster mode, but isn't running on a cluster. " +
      "Deployment to YARN is not supported directly by SparkContext. Please use spark-submit.")
  }

  if (_conf.getBoolean("spark.logConf", false)) {
    logInfo("Spark configuration:\n" + _conf.toDebugString)
  }

  // Set Spark driver host and port system properties. This explicitly sets the configuration
  // instead of relying on the default value of the config constant.
  _conf.set(DRIVER_HOST_ADDRESS, _conf.get(DRIVER_HOST_ADDRESS))
  _conf.setIfMissing("spark.driver.port", "0")

  _conf.set("spark.executor.id", SparkContext.DRIVER_IDENTIFIER)

  _jars = Utils.getUserJars(_conf)
  _files = _conf.getOption("spark.files").map(_.split(",")).map(_.filter(_.nonEmpty))
    .toSeq.flatten
  // 2. Initialize the event log directory and set the compression codec
  _eventLogDir =
    if (isEventLogEnabled) {
      val unresolvedDir = conf.get("spark.eventLog.dir", EventLoggingListener.DEFAULT_LOG_DIR)
        .stripSuffix("/")
      Some(Utils.resolveURI(unresolvedDir))
    } else {
      None
    }

  _eventLogCodec = {
    val compress = _conf.getBoolean("spark.eventLog.compress", false)
    if (compress && isEventLogEnabled) {
      Some(CompressionCodec.getCodecName(_conf)).map(CompressionCodec.getShortName)
    } else {
      None
    }
  }
  // 3. LiveListenerBus asynchronously delivers each SparkListenerEvent to the registered SparkListeners.
  _listenerBus = new LiveListenerBus(_conf)

  // Initialize the app status store and listener before SparkEnv is created so that it gets
  // all events.
  // 4. Create an in-memory key-value store for the application's status
  _statusStore = AppStatusStore.createLiveStore(conf)
  // 5. Register the AppStatusListener with the LiveListenerBus
  listenerBus.addToStatusQueue(_statusStore.listener.get)

  // Create the Spark execution environment (cache, map output tracker, etc)
  // 6. Create the driver-side environment
  // SparkEnv holds all the runtime environment objects for a running Spark instance (either
  // master or worker), including the serializer, RpcEnv, block manager, map output tracker, etc.
  // Spark code currently finds the SparkEnv through a global variable, so all threads can
  // access the same SparkEnv.
  // After a SparkContext has been created, it can be accessed through SparkEnv.get.
  _env = createSparkEnv(_conf, isLocal, listenerBus)
  SparkEnv.set(_env)

  // If running the REPL, register the repl's output dir with the file server.
  _conf.getOption("spark.repl.class.outputDir").foreach { path =>
    val replUri = _env.rpcEnv.fileServer.addDirectory("/classes", new File(path))
    _conf.set("spark.repl.class.uri", replUri)
  }
  // 7. Low-level API for monitoring and reporting the status of Spark jobs and stages
  _statusTracker = new SparkStatusTracker(this, _statusStore)

  // 8. Console progress bar
  _progressBar =
    if (_conf.get(UI_SHOW_CONSOLE_PROGRESS) && !log.isInfoEnabled) {
      Some(new ConsoleProgressBar(this))
    } else {
      None
    }

  // 9. Spark UI, implemented on Jetty
  _ui =
    if (conf.getBoolean("spark.ui.enabled", true)) {
      Some(SparkUI.create(Some(this), _statusStore, _conf, _env.securityManager, appName, "",
        startTime))
    } else {
      // For tests, do not enable the UI
      None
    }
  // Bind the UI before starting the task scheduler to communicate
  // the bound port to the cluster manager properly
  _ui.foreach(_.bind())

  // 10. Create the Hadoop configuration
  _hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(_conf)

  // 11. Add each JAR given through the constructor
  if (jars != null) {
    jars.foreach(addJar)
  }

  if (files != null) {
    files.foreach(addFile)
  }
  // 12. Determine the executor memory
  _executorMemory = _conf.getOption("spark.executor.memory")
    .orElse(Option(System.getenv("SPARK_EXECUTOR_MEMORY")))
    .orElse(Option(System.getenv("SPARK_MEM"))
    .map(warnSparkMem))
    .map(Utils.memoryStringToMb)
    .getOrElse(1024)

  // Convert java options to env vars as a work around
  // since we can't set env vars directly in sbt.
  for { (envKey, propKey) <- Seq(("SPARK_TESTING", "spark.testing"))
    value <- Option(System.getenv(envKey)).orElse(Option(System.getProperty(propKey)))} {
    executorEnvs(envKey) = value
  }
  Option(System.getenv("SPARK_PREPEND_CLASSES")).foreach { v =>
    executorEnvs("SPARK_PREPEND_CLASSES") = v
  }
  // The Mesos scheduler backend relies on this environment variable to set executor memory.
  // TODO: Set this only in the Mesos scheduler.
  executorEnvs("SPARK_EXECUTOR_MEMORY") = executorMemory + "m"
  executorEnvs ++= _conf.getExecutorEnv
  executorEnvs("SPARK_USER") = sparkUser

  // We need to register "HeartbeatReceiver" before "createTaskScheduler" because Executor will
  // retrieve "HeartbeatReceiver" in the constructor. (SPARK-6640)
  // 13. Create the HeartbeatReceiver endpoint
  _heartbeatReceiver = env.rpcEnv.setupEndpoint(
    HeartbeatReceiver.ENDPOINT_NAME, new HeartbeatReceiver(this))

  // Create and start the scheduler
  // 14. Create the task scheduler and scheduler backend
  val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
  _schedulerBackend = sched
  _taskScheduler = ts
  // 14 (continued). Create the DAGScheduler instance
  _dagScheduler = new DAGScheduler(this)
  _heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)

  // start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
  // constructor
  // 15. Start the task scheduler
  _taskScheduler.start()

  // 16. Get the application ID from the task scheduler
  _applicationId = _taskScheduler.applicationId()
  // 16 (continued). Get the application attempt ID from the task scheduler
  _applicationAttemptId = taskScheduler.applicationAttemptId()
  _conf.set("spark.app.id", _applicationId)
  if (_conf.getBoolean("spark.ui.reverseProxy", false)) {
    System.setProperty("spark.ui.proxyBase", "/proxy/" + _applicationId)
  }
  // 17. Set the application ID on the UI
  _ui.foreach(_.setAppId(_applicationId))
  // 18. Initialize the block manager
  _env.blockManager.initialize(_applicationId)

  // The metrics system for Driver need to be set spark.app.id to app ID.
  // So it should start after we get app ID from the task scheduler and set spark.app.id.
  // 19. Start the metricsSystem
  _env.metricsSystem.start()
  // Attach the driver metrics servlet handler to the web ui after the metrics system is started.
  // 19 (continued). Attach the metricsSystem's servlet handlers to the UI
  _env.metricsSystem.getServletHandlers.foreach(handler => ui.foreach(_.attachHandler(handler)))

  // 20. Initialize the event logging listener
  _eventLogger =
    if (isEventLogEnabled) {
      val logger =
        new EventLoggingListener(_applicationId, _applicationAttemptId, _eventLogDir.get,
          _conf, _hadoopConfiguration)
      logger.start()
      listenerBus.addToEventLogQueue(logger)
      Some(logger)
    } else {
      None
    }

  // Optionally scale number of executors dynamically based on workload. Exposed for testing.
  // 21. If dynamic executor allocation is enabled, create and start the ExecutorAllocationManager
  val dynamicAllocationEnabled = Utils.isDynamicAllocationEnabled(_conf)
  _executorAllocationManager =
    if (dynamicAllocationEnabled) {
      schedulerBackend match {
        case b: ExecutorAllocationClient =>
          Some(new ExecutorAllocationManager(
            schedulerBackend.asInstanceOf[ExecutorAllocationClient], listenerBus, _conf,
            _env.blockManager.master))
        case _ =>
          None
      }
    } else {
      None
    }
  _executorAllocationManager.foreach(_.start())

  // 22. Create and start the ContextCleaner
  _cleaner =
    if (_conf.getBoolean("spark.cleaner.referenceTracking", true)) {
      Some(new ContextCleaner(this))
    } else {
      None
    }
  _cleaner.foreach(_.start())
  // 23. Set up and start the listener bus
  setupAndStartListenerBus()
  // 24. With the task scheduler ready, post the environment update event
  postEnvironmentUpdate()
  // 25. Post the application start event
  postApplicationStart()

  // Post init
  // 26. Wait until the scheduler backend is ready
  _taskScheduler.postStartHook()
  // 27. Register the DAGScheduler metrics source
  _env.metricsSystem.registerSource(_dagScheduler.metricsSource)
  // 28. Register the BlockManager metrics source
  _env.metricsSystem.registerSource(new BlockManagerSource(_env.blockManager))
  // 29. Register the ExecutorAllocationManager metrics source, if one was created
  _executorAllocationManager.foreach { e =>
    _env.metricsSystem.registerSource(e.executorAllocationManagerSource)
  }

  // Make sure the context is stopped if the user forgets about it. This avoids leaving
  // unfinished event logs around after the JVM exits cleanly. It doesn't help if the JVM
  // is killed, though.
  logDebug("Adding shutdown hook") // force eager creation of logger
  // 30. Install a shutdown hook, the callback that stops the SparkContext at JVM exit
  _shutdownHookRef = ShutdownHookManager.addShutdownHook(
    ShutdownHookManager.SPARK_CONTEXT_SHUTDOWN_PRIORITY) { () =>
    logInfo("Invoking stop() from shutdown hook")
    try {
      stop()
    } catch {
      case e: Throwable =>
        logWarning("Ignoring Exception while stopping SparkContext from shutdown hook", e)
    }
  }
} catch {
  case NonFatal(e) =>
    logError("Error initializing SparkContext.", e)
    try {
      stop()
    } catch {
      case NonFatal(inner) =>
        logError("Error stopping SparkContext after init error.", inner)
    } finally {
      throw e
    }
}
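
The outer try/catch that wraps the whole initialization follows a common pattern: if any step fails, stop whatever was already started, log a secondary failure from the cleanup if there is one, and always rethrow the original error. The sketch below reproduces that pattern in plain Scala; Component and startAll are hypothetical names, and println stands in for logging.

```scala
import scala.util.control.NonFatal

object InitCleanupSketch {
  // A hypothetical component that can be told to fail on start.
  final class Component(val name: String, failOnStart: Boolean = false) {
    var running = false
    def start(): Unit = {
      if (failOnStart) throw new IllegalStateException(s"$name failed")
      running = true
    }
    def stop(): Unit = running = false
  }

  // Start components in order. On a non-fatal error, stop all of them
  // (stop is safe to call on a component that never started), then
  // rethrow the original error rather than any error from the cleanup.
  def startAll(components: Seq[Component]): Unit = {
    try {
      components.foreach(_.start())
    } catch {
      case NonFatal(e) =>
        println(s"Error during initialization: ${e.getMessage}")
        try {
          components.foreach(_.stop())
        } catch {
          case NonFatal(inner) =>
            println(s"Error during cleanup: ${inner.getMessage}")
        } finally {
          throw e
        }
    }
  }

  // Runs the failing scenario and reports (error message, whether "a" is still running).
  def demo(): (Option[String], Boolean) = {
    val a = new Component("a")
    val b = new Component("b", failOnStart = true)
    val error =
      try { startAll(Seq(a, b)); None }
      catch { case e: IllegalStateException => Some(e.getMessage) }
    (error, a.running)
  }
}
```

Using NonFatal means that errors such as OutOfMemoryError are deliberately left alone, and placing throw e in the finally block guarantees that the caller sees the initialization failure even when the cleanup itself goes wrong.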


Article link: https://www.haomeiwen.com/subject/afiulctx.html