/**
* Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
* cluster, and can be used to create RDDs, accumulators and broadcast variables on that cluster.
*
* Only one SparkContext may be active per JVM. You must `stop()` the active SparkContext before
* creating a new one. This limitation may eventually be removed; see SPARK-2243 for more details.
*
* @param config a Spark Config object describing the application configuration. Any settings in
* this config overrides the default configs as well as system properties.
*/
SparkContext is the single entry point to a Spark cluster and can be used to create RDDs, accumulators, and broadcast variables on that cluster. It is also one of the most important objects in a Spark application; it can be regarded as the core of the application's scheduling. During initialization it creates the core components needed to run the application, including the high-level scheduler (DAGScheduler), the low-level scheduler (TaskScheduler), and the scheduler's communication endpoint (SchedulerBackend), and it is also responsible for registering the application with the Master.
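Before stepping through the constructor, here is a minimal, illustrative sketch of what triggers this whole code path; the app name and master URL are placeholder values, not anything prescribed by the source above.
import org.apache.spark.{SparkConf, SparkContext}

object SparkContextBootstrap {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("sparkcontext-walkthrough") // becomes spark.app.name, validated during init
      .setMaster("local[2]")                  // becomes spark.master, validated during init
    val sc = new SparkContext(conf)           // runs the initialization steps described below
    try {
      val rdd = sc.parallelize(1 to 10)
      println(rdd.reduce(_ + _))
    } finally {
      sc.stop()                               // only one active SparkContext per JVM
    }
  }
}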
1. Initialize the configuration
_conf.set("spark.executor.id", SparkContext.DRIVER_IDENTIFIER): note that at this level Spark has no separate notion of a driver; everything is an Executor, and the driver is simply the one whose executor id is set to the driver identifier.
2. Initialize the event log directory and set the compression codec
3. The LiveListenerBus asynchronously delivers SparkListenerEvents to the registered SparkListeners.
_listenerBus = new LiveListenerBus(_conf)
4. Provide the app with an in-memory KV store
_statusStore = AppStatusStore.createLiveStore(conf)
5. Register the AppStatusListener with the LiveListenerBus
listenerBus.addToStatusQueue(_statusStore.listener.get)
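As an aside, user code can put its own listener on the same LiveListenerBus; the sketch below is illustrative only (the listener class and package are hypothetical) and uses the public SparkListener API.
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd, SparkListenerJobStart}

// Hypothetical listener that logs job lifecycle events delivered by the LiveListenerBus.
class JobLoggingListener extends SparkListener {
  override def onJobStart(jobStart: SparkListenerJobStart): Unit =
    println(s"job ${jobStart.jobId} started with ${jobStart.stageInfos.size} stages")
  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit =
    println(s"job ${jobEnd.jobId} ended: ${jobEnd.jobResult}")
}

// Either registered programmatically on an active SparkContext:
//   sc.addSparkListener(new JobLoggingListener)
// or declaratively, before the context is created:
//   conf.set("spark.extraListeners", "com.example.JobLoggingListener")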
6. Create the driver-side env (see the class SparkEnv.scala)
// Create the Spark execution environment (cache, map output tracker, etc)
_env = createSparkEnv(_conf, isLocal, listenerBus)
SparkEnv.set(_env)
// This function allows components created by SparkEnv to be mocked in unit tests:
private[spark] def createSparkEnv(
    conf: SparkConf,
    isLocal: Boolean,
    listenerBus: LiveListenerBus): SparkEnv = {
  SparkEnv.createDriverEnv(conf, isLocal, listenerBus, SparkContext.numDriverCores(master))
}
conf: the SparkConf holding Spark's environment configuration.
isLocal: whether the application runs in local mode.
listenerBus: the event listener bus.
SparkContext.numDriverCores(master): the number of cores available to the driver.
As the developer comment indicates, both the driver and the executors go through this same SparkEnv creation path.
class SparkEnv (
    val executorId: String,
    private[spark] val rpcEnv: RpcEnv,
    val serializer: Serializer,
    val closureSerializer: Serializer,
    val serializerManager: SerializerManager,
    val mapOutputTracker: MapOutputTracker, // caches MapStatus information and fetches it from the MapOutputTracker master
    val shuffleManager: ShuffleManager, // maintains the shuffle routing table
    val broadcastManager: BroadcastManager, // broadcast variables
    val blockManager: BlockManager, // block management
    val securityManager: SecurityManager, // security management
    val metricsSystem: MetricsSystem, // metrics
    val memoryManager: MemoryManager,
    val outputCommitCoordinator: OutputCommitCoordinator,
    val conf: SparkConf // configuration
  )
6.1 Create the SecurityManager
val securityManager = new SecurityManager(conf, ioEncryptionKey)
if (isDriver) {
securityManager.initializeAuth()
}
6.2 Create the Netty-based distributed messaging system: set up RPC communication and the driver port
val rpcEnv = RpcEnv.create(systemName, bindAddress, advertiseAddress, port.getOrElse(-1), conf,
securityManager, numUsableCores, !isDriver)
// Figure out which port RpcEnv actually bound to in case the original port is 0 or occupied.
if (isDriver) {
conf.set("spark.driver.port", rpcEnv.address.port.toString)
}
It is used to receive status reports from the Executors.
Finally, startServiceOnPort is called to start listening on the port.
6.3 Create the SerializerManager (the default serializer class is org.apache.spark.serializer.JavaSerializer)
Both serializer and closureSerializer are instances of org.apache.spark.serializer.JavaSerializer created via Class.forName reflection; the closureSerializer instance is used to serialize Scala closures.
val serializer = instantiateClassFromConf[Serializer](
"spark.serializer", "org.apache.spark.serializer.JavaSerializer")
logDebug(s"Using serializer: ${serializer.getClass}")
val serializerManager = new SerializerManager(serializer, conf, ioEncryptionKey)
val closureSerializer = new JavaSerializer(conf)
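For reference, a common user-side override of the data serializer chosen above is Kryo; this is only a configuration sketch, and the closure serializer stays JavaSerializer regardless.
import org.apache.spark.SparkConf

// Switch the data serializer to Kryo and pre-register classes to avoid writing full class names.
val kryoConf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .registerKryoClasses(Array(classOf[Array[Int]], classOf[Array[String]]))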
6.4 Create the BroadcastManager
val broadcastManager = new BroadcastManager(isDriver, conf, securityManager)
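The BroadcastManager created here backs the user-facing SparkContext.broadcast API; a small usage sketch follows, assuming sc is an existing SparkContext.
// Ship a read-only lookup table to the executors once instead of with every task.
val lookup = sc.broadcast(Map("a" -> 1, "b" -> 2))
val total = sc.parallelize(Seq("a", "b", "a"))
  .map(k => lookup.value.getOrElse(k, 0)) // read the broadcast value on the executors
  .reduce(_ + _)
lookup.destroy() // release the broadcast blocks when no longer needed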
6.5 Create the MapOutputTracker and register its RPC endpoint
It tracks the output status of the shuffle map tasks so that the reduce tasks can locate the addresses of the intermediate map outputs. MapOutputTrackerMaster internally maintains mapStatuses: TimeStampedHashMap[Int, Array[MapStatus]] to track the output status of each map task, where the key is the shuffleId and the array holds the MapStatus of each map task.
It is created differently depending on whether the process is the driver:
If the current process is the driver, a MapOutputTrackerMaster is created, and then a MapOutputTrackerMasterEndpoint is created and registered with the RPC environment.
If the current process is an executor, a MapOutputTrackerWorker is created, which holds a reference to the MapOutputTrackerMasterEndpoint obtained through the RPC environment.
val mapOutputTracker = if (isDriver) {
new MapOutputTrackerMaster(conf, broadcastManager, isLocal)
} else {
new MapOutputTrackerWorker(conf)
}
// Have to assign trackerEndpoint after initialization as MapOutputTrackerEndpoint
// requires the MapOutputTracker itself
mapOutputTracker.trackerEndpoint = registerOrLookupEndpoint(MapOutputTracker.ENDPOINT_NAME,
new MapOutputTrackerMasterEndpoint(
rpcEnv, mapOutputTracker.asInstanceOf[MapOutputTrackerMaster], conf))
6.6 Create the ShuffleManager
The ShuffleManager manages the shuffle of local and remote block data. The default SortShuffleManager writes map output to local disk through the IndexShuffleBlockResolver it holds (which operates on the DiskBlockManager inside the BlockManager), writes an index file keyed by shuffleId and mapId, and can also read files locally or from remote nodes using the mapStatuses maintained by MapOutputTrackerMaster.
// Let the user specify short names for shuffle managers
val shortShuffleMgrNames = Map(
"sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName,
"tungsten-sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName)
val shuffleMgrName = conf.get("spark.shuffle.manager", "sort")
val shuffleMgrClass =
shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase(Locale.ROOT), shuffleMgrName)
val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)
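Both short names above resolve to SortShuffleManager, so there is effectively one built-in implementation; the sketch below shows the corresponding user configuration, with the custom class name being purely hypothetical.
import org.apache.spark.SparkConf

val shuffleConf = new SparkConf()
  .set("spark.shuffle.manager", "sort") // the default, spelled out explicitly
// .set("spark.shuffle.manager", "com.example.MyShuffleManager") // a fully qualified custom class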
6.7 Create the MemoryManager (UnifiedMemoryManager is used by default)
val useLegacyMemoryManager = conf.getBoolean("spark.memory.useLegacyMode", false)
val memoryManager: MemoryManager =
if (useLegacyMemoryManager) {
new StaticMemoryManager(conf, numUsableCores)
} else {
UnifiedMemoryManager(conf, numUsableCores)
}
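The split that UnifiedMemoryManager manages is driven by a handful of settings; the values below are the usual defaults, shown only to make the knobs explicit.
import org.apache.spark.SparkConf

val memConf = new SparkConf()
  .set("spark.memory.fraction", "0.6")        // share of the heap for execution + storage
  .set("spark.memory.storageFraction", "0.5") // part of that share protected for storage
  .set("spark.memory.useLegacyMode", "false") // keep UnifiedMemoryManager (the default)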
6.8 Create the BlockManagerMaster and register its RPC endpoint
The BlockManagerMaster manages Block data across the whole cluster. A Block is the unit of Spark's data management; it is independent of how the data is stored, which may be on disk or in memory.
The block transfer service blockTransferService: BlockTransferService defaults to NettyBlockTransferService, which uses Netty's asynchronous, event-driven network framework to provide a server and a client for fetching sets of Blocks from remote nodes.
val blockManagerPort = if (isDriver) {
conf.get(DRIVER_BLOCK_MANAGER_PORT)
} else {
conf.get(BLOCK_MANAGER_PORT)
}
val blockTransferService =
new NettyBlockTransferService(conf, securityManager, bindAddress, advertiseAddress,
blockManagerPort, numUsableCores)
val blockManagerMaster = new BlockManagerMaster(registerOrLookupEndpoint(
BlockManagerMaster.DRIVER_ENDPOINT_NAME,
new BlockManagerMasterEndpoint(rpcEnv, isLocal, conf, listenerBus)),
conf, isDriver)
6.9 Create the BlockManager
The BlockManager is responsible for managing Blocks, but it only becomes usable after its initialize method has been called.
// NB: blockManager is not valid until initialize() is called later.
val blockManager = new BlockManager(executorId, rpcEnv, blockManagerMaster,
serializerManager, conf, memoryManager, mapOutputTracker, shuffleManager,
blockTransferService, securityManager, numUsableCores)
Among the BlockManager's member fields:
BlockManagerMaster manages the BlockManagers across the entire cluster;
serializerManager is the default serializer manager;
MemoryManager handles memory management;
MapOutputTracker records where each ShuffleMapTask wrote its output during a shuffle so that the next stage can fetch it;
BlockTransferService performs the network operations: when this BlockManager needs to read or write data on another BlockManager, it goes through the BlockTransferService. A Block is the smallest abstraction of Spark's runtime data; it may be placed in memory, on disk, or on Alluxio;
SecurityManager handles security;
numUsableCores is the number of usable cores.
Inside the BlockManager, the DiskBlockManager manages disk reads and writes, creating and maintaining the mapping between logical blocks and their physical locations on disk. A block is mapped to a file whose name is derived from its BlockId; block files are hashed across the directories listed in spark.local.dir (or in SPARK_LOCAL_DIRS if that is set).
The BlockManager also holds the block-manager-future thread pool as well as the memoryStore and diskStore.
Shuffle reads and writes are managed through the BlockManager, as the persist sketch below illustrates.
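From the user's point of view, the BlockManager is also what stores partitions when an RDD is persisted; a minimal sketch, assuming sc is an active SparkContext:
import org.apache.spark.storage.StorageLevel

val cached = sc.parallelize(1 to 1000000)
  .map(_ * 2)
  .persist(StorageLevel.MEMORY_AND_DISK) // blocks go to the memoryStore, spilling to the diskStore
cached.count()     // the first action materializes the partitions and stores them as blocks
cached.unpersist() // asks the BlockManagers to drop those blocks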
6.10 Create the MetricsSystem
The MetricsSystem is Spark's metrics (measurement) system.
val metricsSystem = if (isDriver) {
// Don't start metrics system right now for Driver.
// We need to wait for the task scheduler to give us an app ID.
// Then we can start the metrics system.
MetricsSystem.createMetricsSystem("driver", conf, securityManager)
} else {
// We need to set the executor ID before the MetricsSystem is created because sources and
// sinks specified in the metrics configuration file will want to incorporate this executor's
// ID into the metrics they report.
conf.set("spark.executor.id", executorId)
val ms = MetricsSystem.createMetricsSystem("executor", conf, securityManager)
ms.start()
ms
}
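Sinks and sources are normally declared in metrics.properties; the same keys can also be passed through SparkConf with the spark.metrics.conf. prefix. A console-sink sketch for the driver instance created above (values illustrative):
import org.apache.spark.SparkConf

val metricsConf = new SparkConf()
  .set("spark.metrics.conf.driver.sink.console.class",
    "org.apache.spark.metrics.sink.ConsoleSink")              // built-in sink that prints to stdout
  .set("spark.metrics.conf.driver.sink.console.period", "10") // report every 10 seconds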
6.11 Create the OutputCommitCoordinator
val outputCommitCoordinator = mockOutputCommitCoordinator.getOrElse {
new OutputCommitCoordinator(conf, isDriver)
}
val outputCommitCoordinatorRef = registerOrLookupEndpoint("OutputCommitCoordinator",
new OutputCommitCoordinatorEndpoint(rpcEnv, outputCommitCoordinator))
outputCommitCoordinator.coordinatorRef = Some(outputCommitCoordinatorRef)
6.12 Create the SparkEnv instance (envInstance) and the temporary directories
val envInstance = new SparkEnv(
executorId,
rpcEnv,
serializer,
closureSerializer,
serializerManager,
mapOutputTracker,
shuffleManager,
broadcastManager,
blockManager,
securityManager,
metricsSystem,
memoryManager,
outputCommitCoordinator,
conf)
7. The API that monitors the status of Spark jobs and stages at a low level and reports it
_statusTracker = new SparkStatusTracker(this, _statusStore)
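SparkStatusTracker is the public face of the _statusStore above; a polling sketch, assuming sc is an active SparkContext:
val tracker = sc.statusTracker
for (jobId <- tracker.getActiveJobIds) {
  tracker.getJobInfo(jobId).foreach { info =>
    println(s"job $jobId status=${info.status()} stages=${info.stageIds().mkString(",")}")
  }
}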
8. Console progress bar
_progressBar =
if (_conf.get(UI_SHOW_CONSOLE_PROGRESS) && !log.isInfoEnabled) {
Some(new ConsoleProgressBar(this))
} else {
None
}
9. Spark UI (this actually starts a Jetty server hosting a web application, listening on port 4040)
_ui =
if (conf.getBoolean("spark.ui.enabled", true)) {
Some(SparkUI.create(Some(this), _statusStore, _conf, _env.securityManager, appName, "",
startTime))
} else {
// For tests, do not enable the UI
None
}
// Bind the UI before starting the task scheduler to communicate
// the bound port to the cluster manager properly
_ui.foreach(_.bind())
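Both whether the UI is started and the port it binds to are configurable; a sketch of the relevant settings (the values shown are the defaults):
import org.apache.spark.SparkConf

val uiConf = new SparkConf()
  .set("spark.ui.enabled", "true") // set to false to skip creating the Jetty-based UI
  .set("spark.ui.port", "4040")    // Jetty retries subsequent ports if 4040 is taken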
10. Create the Hadoop configuration
_hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(_conf)
11.Add each JAR given through the constructor
if (jars != null) {
jars.foreach(addJar)
}
if (files != null) {
files.foreach(addFile)
}
12. Compute the executor memory
_executorMemory = _conf.getOption("spark.executor.memory")
.orElse(Option(System.getenv("SPARK_EXECUTOR_MEMORY")))
.orElse(Option(System.getenv("SPARK_MEM"))
.map(warnSparkMem))
.map(Utils.memoryStringToMb)
.getOrElse(1024)
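The fallback chain above means an explicit spark.executor.memory wins over the SPARK_EXECUTOR_MEMORY and SPARK_MEM environment variables, with 1024 MB as the last resort; setting it explicitly, for example:
import org.apache.spark.SparkConf

val execConf = new SparkConf()
  .set("spark.executor.memory", "4g") // parsed by Utils.memoryStringToMb into megabytes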
13. Create the HeartbeatReceiver endpoint
// We need to register "HeartbeatReceiver" before "createTaskScheduler" because Executor will
// retrieve "HeartbeatReceiver" in the constructor. (SPARK-6640)
_heartbeatReceiver = env.rpcEnv.setupEndpoint(
HeartbeatReceiver.ENDPOINT_NAME, new HeartbeatReceiver(this))
14. Create the task scheduler and scheduler backend, and create the DAGScheduler instance
// Create and start the scheduler
val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
_schedulerBackend = sched
_taskScheduler = ts
_dagScheduler = new DAGScheduler(this)
_heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)
The TaskScheduler schedules the Tasks of each stage for execution.
The SchedulerBackend allocates resources (Executors) for the current Application.
The DAGScheduler splits a job into multiple stages (see the sketch below for how the master URL influences which scheduler pair is created).
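The (SchedulerBackend, TaskScheduler) pair returned by createTaskScheduler depends on the master URL the user passes in; the mapping below is an illustrative sketch for common values, not an exhaustive list.
import org.apache.spark.SparkConf

// local mode      -> TaskSchedulerImpl + a local scheduler backend
// standalone mode -> TaskSchedulerImpl + StandaloneSchedulerBackend
// YARN            -> YARN-specific scheduler and backend, chosen together with deployMode
val masterConf = new SparkConf().setMaster("local[4]") // e.g. "spark://host:7077" or "yarn"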
15. Start the task scheduler
// start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
// constructor
_taskScheduler.start()
16. Get the application ID and application attempt ID from the task scheduler
_applicationId = _taskScheduler.applicationId()
_applicationAttemptId = taskScheduler.applicationAttemptId()
_conf.set("spark.app.id", _applicationId)
17. Set the application ID on the UI
_ui.foreach(_.setAppId(_applicationId))
18. Initialize the block manager
_env.blockManager.initialize(_applicationId)
19. Start the metricsSystem and hand its servlet handlers to the UI
// The metrics system for Driver need to be set spark.app.id to app ID.
// So it should start after we get app ID from the task scheduler and set spark.app.id.
_env.metricsSystem.start()
// Attach the driver metrics servlet handler to the web ui after the metrics system is started.
_env.metricsSystem.getServletHandlers.foreach(handler => ui.foreach(_.attachHandler(handler)))
20. Initialize the event logger listener
_eventLogger =
if (isEventLogEnabled) {
val logger =
new EventLoggingListener(_applicationId, _applicationAttemptId, _eventLogDir.get,
_conf, _hadoopConfiguration)
logger.start()
listenerBus.addToEventLogQueue(logger)
Some(logger)
} else {
None
}
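Event logging is off by default; with settings like the following, the EventLoggingListener above is created and the history server can later replay the application (the directory is an illustrative path).
import org.apache.spark.SparkConf

val eventLogConf = new SparkConf()
  .set("spark.eventLog.enabled", "true")
  .set("spark.eventLog.dir", "hdfs:///spark-history") // must exist and be writable
  .set("spark.eventLog.compress", "true")             // uses the codec resolved in step 2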
21. If dynamic executor allocation is enabled, instantiate the executorAllocationManager and start it
// Optionally scale number of executors dynamically based on workload. Exposed for testing.
val dynamicAllocationEnabled = Utils.isDynamicAllocationEnabled(_conf)
_executorAllocationManager =
if (dynamicAllocationEnabled) {
schedulerBackend match {
case b: ExecutorAllocationClient =>
Some(new ExecutorAllocationManager(
schedulerBackend.asInstanceOf[ExecutorAllocationClient], listenerBus, _conf,
_env.blockManager.master))
case _ =>
None
}
} else {
None
}
_executorAllocationManager.foreach(_.start())
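What Utils.isDynamicAllocationEnabled checks for can be expressed as user configuration; a sketch of a typical setup (an external shuffle service is generally required as well):
import org.apache.spark.SparkConf

val dynConf = new SparkConf()
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.dynamicAllocation.minExecutors", "1")
  .set("spark.dynamicAllocation.maxExecutors", "20")
  .set("spark.shuffle.service.enabled", "true") // lets executors be removed without losing shuffle data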
22. Initialize and start the ContextCleaner
_cleaner =
if (_conf.getBoolean("spark.cleaner.referenceTracking", true)) {
Some(new ContextCleaner(this))
} else {
None
}
_cleaner.foreach(_.start())
23. Set up and start the listener bus
setupAndStartListenerBus()
24. The task scheduler is ready; post an environment-updated event
postEnvironmentUpdate()
25. Post the application start event
postApplicationStart()
26. Wait until the task scheduler backend is ready
// Post init
_taskScheduler.postStartHook()
27. Register the dagScheduler metricsSource
_env.metricsSystem.registerSource(_dagScheduler.metricsSource)
28. Register the BlockManagerSource metric source
_env.metricsSystem.registerSource(new BlockManagerSource(_env.blockManager))
29. Register the ExecutorAllocationManager metric source
_executorAllocationManager.foreach { e =>
_env.metricsSystem.registerSource(e.executorAllocationManagerSource)
}
30. Set up the shutdown hook: the callback to run when the SparkContext shuts down
// Make sure the context is stopped if the user forgets about it. This avoids leaving
// unfinished event logs around after the JVM exits cleanly. It doesn't help if the JVM
// is killed, though.
logDebug("Adding shutdown hook") // force eager creation of logger
_shutdownHookRef = ShutdownHookManager.addShutdownHook(
  ShutdownHookManager.SPARK_CONTEXT_SHUTDOWN_PRIORITY) { () =>
  logInfo("Invoking stop() from shutdown hook")
  stop()
}
The full source is as follows:
try {
  // 1. Initialize the configuration
  _conf = config.clone()
  _conf.validateSettings()

  if (!_conf.contains("spark.master")) {
    throw new SparkException("A master URL must be set in your configuration")
  }
  if (!_conf.contains("spark.app.name")) {
    throw new SparkException("An application name must be set in your configuration")
  }

  // log out spark.app.name in the Spark driver logs
  logInfo(s"Submitted application: $appName")

  // System property spark.yarn.app.id must be set if user code ran by AM on a YARN cluster
  if (master == "yarn" && deployMode == "cluster" && !_conf.contains("spark.yarn.app.id")) {
    throw new SparkException("Detected yarn cluster mode, but isn't running on a cluster. " +
      "Deployment to YARN is not supported directly by SparkContext. Please use spark-submit.")
  }

  if (_conf.getBoolean("spark.logConf", false)) {
    logInfo("Spark configuration:\n" + _conf.toDebugString)
  }

  // Set Spark driver host and port system properties. This explicitly sets the configuration
  // instead of relying on the default value of the config constant.
  _conf.set(DRIVER_HOST_ADDRESS, _conf.get(DRIVER_HOST_ADDRESS))
  _conf.setIfMissing("spark.driver.port", "0")

  _conf.set("spark.executor.id", SparkContext.DRIVER_IDENTIFIER)

  _jars = Utils.getUserJars(_conf)
  _files = _conf.getOption("spark.files").map(_.split(",")).map(_.filter(_.nonEmpty))
    .toSeq.flatten
  // 2. Initialize the event log directory and set the compression codec
  _eventLogDir =
    if (isEventLogEnabled) {
      val unresolvedDir = conf.get("spark.eventLog.dir", EventLoggingListener.DEFAULT_LOG_DIR)
        .stripSuffix("/")
      Some(Utils.resolveURI(unresolvedDir))
    } else {
      None
    }

  _eventLogCodec = {
    val compress = _conf.getBoolean("spark.eventLog.compress", false)
    if (compress && isEventLogEnabled) {
      Some(CompressionCodec.getCodecName(_conf)).map(CompressionCodec.getShortName)
    } else {
      None
    }
  }
  // 3. LiveListenerBus asynchronously delivers SparkListenerEvents to the registered SparkListeners.
  _listenerBus = new LiveListenerBus(_conf)

  // Initialize the app status store and listener before SparkEnv is created so that it gets
  // all events.
  // 4. Provide the app with an in-memory KV store
  _statusStore = AppStatusStore.createLiveStore(conf)
  // 5. Register the AppStatusListener with the LiveListenerBus
  listenerBus.addToStatusQueue(_statusStore.listener.get)

  // Create the Spark execution environment (cache, map output tracker, etc)
  // 6. Create the driver-side env
  // It holds all of a Spark instance's runtime objects (master or worker), including the
  // serializer, RpcEnv, block manager, map output tracker and so on.
  // Spark locates the SparkEnv through a global variable, so all threads can access the same
  // SparkEnv; after the SparkContext has been created it can be accessed via SparkEnv.get.
  _env = createSparkEnv(_conf, isLocal, listenerBus)
  SparkEnv.set(_env)

  // If running the REPL, register the repl's output dir with the file server.
  _conf.getOption("spark.repl.class.outputDir").foreach { path =>
    val replUri = _env.rpcEnv.fileServer.addDirectory("/classes", new File(path))
    _conf.set("spark.repl.class.uri", replUri)
  }
  // 7. The API that monitors job and stage status at a low level and reports it
  _statusTracker = new SparkStatusTracker(this, _statusStore)

  // 8. Console progress bar
  _progressBar =
    if (_conf.get(UI_SHOW_CONSOLE_PROGRESS) && !log.isInfoEnabled) {
      Some(new ConsoleProgressBar(this))
    } else {
      None
    }

  // 9. Spark UI, implemented with Jetty
  _ui =
    if (conf.getBoolean("spark.ui.enabled", true)) {
      Some(SparkUI.create(Some(this), _statusStore, _conf, _env.securityManager, appName, "",
        startTime))
    } else {
      // For tests, do not enable the UI
      None
    }
  // Bind the UI before starting the task scheduler to communicate
  // the bound port to the cluster manager properly
  _ui.foreach(_.bind())

  // 10. Create the Hadoop configuration
  _hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(_conf)

  // 11. Add each JAR given through the constructor
  if (jars != null) {
    jars.foreach(addJar)
  }

  if (files != null) {
    files.foreach(addFile)
  }
  // 12. Compute the executor memory
  _executorMemory = _conf.getOption("spark.executor.memory")
    .orElse(Option(System.getenv("SPARK_EXECUTOR_MEMORY")))
    .orElse(Option(System.getenv("SPARK_MEM"))
      .map(warnSparkMem))
    .map(Utils.memoryStringToMb)
    .getOrElse(1024)

  // Convert java options to env vars as a work around
  // since we can't set env vars directly in sbt.
  for { (envKey, propKey) <- Seq(("SPARK_TESTING", "spark.testing"))
      value <- Option(System.getenv(envKey)).orElse(Option(System.getProperty(propKey)))} {
    executorEnvs(envKey) = value
  }
  Option(System.getenv("SPARK_PREPEND_CLASSES")).foreach { v =>
    executorEnvs("SPARK_PREPEND_CLASSES") = v
  }
  // The Mesos scheduler backend relies on this environment variable to set executor memory.
  // TODO: Set this only in the Mesos scheduler.
  executorEnvs("SPARK_EXECUTOR_MEMORY") = executorMemory + "m"
  executorEnvs ++= _conf.getExecutorEnv
  executorEnvs("SPARK_USER") = sparkUser

  // We need to register "HeartbeatReceiver" before "createTaskScheduler" because Executor will
  // retrieve "HeartbeatReceiver" in the constructor. (SPARK-6640)
  // 13. Create the HeartbeatReceiver endpoint
  _heartbeatReceiver = env.rpcEnv.setupEndpoint(
    HeartbeatReceiver.ENDPOINT_NAME, new HeartbeatReceiver(this))

  // Create and start the scheduler
  // 14. Create the task scheduler and scheduler backend
  val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
  _schedulerBackend = sched
  _taskScheduler = ts
  // 15. Create the DAGScheduler instance
  _dagScheduler = new DAGScheduler(this)
  _heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)

  // start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
  // constructor
  // 16. Start the task scheduler
  _taskScheduler.start()

  // 17. Get the application ID from the task scheduler
  _applicationId = _taskScheduler.applicationId()
  // 18. Get the application attempt ID from the task scheduler
  _applicationAttemptId = taskScheduler.applicationAttemptId()
  _conf.set("spark.app.id", _applicationId)
  if (_conf.getBoolean("spark.ui.reverseProxy", false)) {
    System.setProperty("spark.ui.proxyBase", "/proxy/" + _applicationId)
  }
  // 19. Set the application ID on the UI
  _ui.foreach(_.setAppId(_applicationId))
  // 20. Initialize the block manager
  _env.blockManager.initialize(_applicationId)

  // The metrics system for Driver need to be set spark.app.id to app ID.
  // So it should start after we get app ID from the task scheduler and set spark.app.id.
  // 21. Start the metricsSystem
  _env.metricsSystem.start()
  // Attach the driver metrics servlet handler to the web ui after the metrics system is started.
  // 22. Hand the metricsSystem's servlet handlers to the UI
  _env.metricsSystem.getServletHandlers.foreach(handler => ui.foreach(_.attachHandler(handler)))

  // 23. Initialize the event logger listener
  _eventLogger =
    if (isEventLogEnabled) {
      val logger =
        new EventLoggingListener(_applicationId, _applicationAttemptId, _eventLogDir.get,
          _conf, _hadoopConfiguration)
      logger.start()
      listenerBus.addToEventLogQueue(logger)
      Some(logger)
    } else {
      None
    }

  // Optionally scale number of executors dynamically based on workload. Exposed for testing.
  // 24. If dynamic executor allocation is enabled, instantiate the executorAllocationManager and start it
  val dynamicAllocationEnabled = Utils.isDynamicAllocationEnabled(_conf)
  _executorAllocationManager =
    if (dynamicAllocationEnabled) {
      schedulerBackend match {
        case b: ExecutorAllocationClient =>
          Some(new ExecutorAllocationManager(
            schedulerBackend.asInstanceOf[ExecutorAllocationClient], listenerBus, _conf,
            _env.blockManager.master))
        case _ =>
          None
      }
    } else {
      None
    }
  _executorAllocationManager.foreach(_.start())

  // 25. Initialize and start the ContextCleaner
  _cleaner =
    if (_conf.getBoolean("spark.cleaner.referenceTracking", true)) {
      Some(new ContextCleaner(this))
    } else {
      None
    }
  _cleaner.foreach(_.start())
  // 26. Set up and start the listener bus
  setupAndStartListenerBus()
  // 27. The task scheduler is ready; post an environment-updated event
  postEnvironmentUpdate()
  // 28. Post the application start event
  postApplicationStart()

  // Post init
  // 29. Wait until the task scheduler backend is ready
  _taskScheduler.postStartHook()
  // 30. Register the dagScheduler metricsSource
  _env.metricsSystem.registerSource(_dagScheduler.metricsSource)
  // 31. Register the BlockManagerSource metric source
  _env.metricsSystem.registerSource(new BlockManagerSource(_env.blockManager))
  // 32. Register the ExecutorAllocationManager metric source
  _executorAllocationManager.foreach { e =>
    _env.metricsSystem.registerSource(e.executorAllocationManagerSource)
  }

  // Make sure the context is stopped if the user forgets about it. This avoids leaving
  // unfinished event logs around after the JVM exits cleanly. It doesn't help if the JVM
  // is killed, though.
  logDebug("Adding shutdown hook") // force eager creation of logger
  // 33. Set up the shutdown hook: the callback to run when the SparkContext shuts down
  _shutdownHookRef = ShutdownHookManager.addShutdownHook(
    ShutdownHookManager.SPARK_CONTEXT_SHUTDOWN_PRIORITY) { () =>
    logInfo("Invoking stop() from shutdown hook")
    try {
      stop()
    } catch {
      case e: Throwable =>
        logWarning("Ignoring Exception while stopping SparkContext from shutdown hook", e)
    }
  }
} catch {
  case NonFatal(e) =>
    logError("Error initializing SparkContext.", e)
    try {
      stop()
    } catch {
      case NonFatal(inner) =>
        logError("Error stopping SparkContext after init error.", inner)
    } finally {
      throw e
    }
}