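Spark 2.0 introduced the SparkSession concept. The following code creates a session or reuses an existing one:
val spark = SparkSession.builder().getOrCreate()
This uses the builder pattern to create a SparkSession or reuse an existing one. The code of org.apache.spark.sql.SparkSession.Builder#getOrCreate is as follows: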
def getOrCreate(): SparkSession = synchronized {
  assertOnDriver() // Note: a SparkSession can only be created and accessed on the driver
  // Get the session from current thread's active session.
  // activeThreadSession is an InheritableThreadLocal instance (a subclass of ThreadLocal).
  // Its value is stored per thread, so reading it needs no lock.
  var session = activeThreadSession.get()
  // If the session is non-null and its SparkContext has NOT been stopped, reuse the existing session
  if ((session ne null) && !session.sparkContext.isStopped) {
    options.foreach { case (k, v) => session.sessionState.conf.setConfString(k, v) }
    if (options.nonEmpty) {
      logWarning("Using an existing SparkSession; some configuration may not take effect.")
    }
    return session
  }
  // Lock on the SparkSession object so the session cannot be initialized twice
  SparkSession.synchronized {
    // If the current thread does not have an active session, get it from the global session.
    // If a default session exists and its SparkContext has NOT been stopped, it can be reused as well
    session = defaultSession.get()
    if ((session ne null) && !session.sparkContext.isStopped) {
      options.foreach { case (k, v) => session.sessionState.conf.setConfString(k, v) }
      if (options.nonEmpty) {
        logWarning("Using an existing SparkSession; some configuration may not take effect.")
      }
      return session
    }

    // Create the session
    // userSuppliedContext is empty by default (no SparkContext was supplied), so this block normally runs
    val sparkContext = userSuppliedContext.getOrElse {
      val sparkConf = new SparkConf()
      options.foreach { case (k, v) => sparkConf.set(k, v) }

      // set a random app name if not given.
      if (!sparkConf.contains("spark.app.name")) {
        sparkConf.setAppName(java.util.UUID.randomUUID().toString)
      }

      SparkContext.getOrCreate(sparkConf)
      // Do not update `SparkConf` for existing `SparkContext`, as it's shared by all sessions.
    }

    // Initialize extensions if the user has defined a configurator class.
    val extensionConfOption = sparkContext.conf.get(StaticSQLConf.SPARK_SESSION_EXTENSIONS)
    if (extensionConfOption.isDefined) {
      val extensionConfClassName = extensionConfOption.get
      try {
        val extensionConfClass = Utils.classForName(extensionConfClassName)
        val extensionConf = extensionConfClass.newInstance()
          .asInstanceOf[SparkSessionExtensions => Unit]
        extensionConf(extensions)
      } catch {
        // Ignore the error if we cannot find the class or when the class has the wrong type.
        case e @ (_: ClassCastException |
                  _: ClassNotFoundException |
                  _: NoClassDefFoundError) =>
          logWarning(s"Cannot use $extensionConfClassName to configure session extensions.", e)
      }
    }
    // Initialize the SparkSession, passing it the SparkContext obtained above
    session = new SparkSession(sparkContext, None, None, extensions)
    options.foreach { case (k, v) => session.initialSessionOptions.put(k, v) }
    // Set the default session
    setDefaultSession(session)
    // Set the active session
    setActiveSession(session)

    // Register a successfully instantiated context to the singleton. This should be at the
    // end of the class definition so that the singleton is updated only if there is no
    // exception in the construction of the instance.
    // Register a SparkListener that resets the default session when the application ends
    sparkContext.addSparkListener(new SparkListener {
      override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
        defaultSession.set(null)
      }
    })
  }

  return session
}
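The lookup order in the listing (thread-local active session first, then the global default session under a lock, then creation) can be sketched without Spark. This is a minimal illustration only: `MiniSession` and `MiniSessionRegistry` are hypothetical names invented for this sketch, and the `stopped` flag stands in for `sparkContext.isStopped`.

```scala
import java.util.concurrent.atomic.AtomicReference

// Hypothetical stand-in for a session; this is not a Spark class.
final class MiniSession(val id: Int) {
  @volatile var stopped: Boolean = false
}

// Hypothetical registry that mirrors the lookup order of getOrCreate.
object MiniSessionRegistry {
  // Per-thread active session; child threads inherit the parent's value.
  private val activeThreadSession = new InheritableThreadLocal[MiniSession]
  // Process-wide default session shared by all threads.
  private val defaultSession = new AtomicReference[MiniSession]()
  private var created = 0

  def createdCount: Int = synchronized { created }

  // Forget the calling thread's active session (the default is untouched).
  def clearActive(): Unit = activeThreadSession.remove()

  def getOrCreate(): MiniSession = {
    // 1. Thread-local lookup: the value belongs to this thread, so no lock.
    val active = activeThreadSession.get()
    if ((active ne null) && !active.stopped) active
    else synchronized {
      // 2. Global lookup and creation share one lock, so two threads
      //    cannot both decide that a new session is needed.
      val default = defaultSession.get()
      if ((default ne null) && !default.stopped) default
      else {
        created += 1
        val session = new MiniSession(created)
        defaultSession.set(session)
        activeThreadSession.set(session)
        session
      }
    }
  }
}
```

Calling `MiniSessionRegistry.getOrCreate()` twice returns the same instance; a new one is built only when neither the active nor the default session is usable, for example after the existing one has been marked as stopped.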
The SparkContext.getOrCreate(sparkConf) call above is implemented in org.apache.spark.SparkContext as follows:
def getOrCreate(config: SparkConf): SparkContext = {
  // Synchronize to ensure that multiple create requests don't trigger an exception
  // from assertNoOtherContextIsRunning within setActiveContext
  // The lock is a plain Object
  SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized {
    // activeContext is an AtomicReference instance; its set and update operations are atomic
    if (activeContext.get() == null) {
      // A session has exactly one SparkContext
      setActiveContext(new SparkContext(config), allowMultipleContexts = false)
    } else {
      if (config.getAll.nonEmpty) {
        logWarning("Using an existing SparkContext; some configuration may not take effect.")
      }
    }
    activeContext.get()
  }
}
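To see why the constructor lock matters, here is a small Spark-free sketch in which several threads race to obtain the context. `MiniContext` and `MiniContextDemo` are hypothetical names made up for this illustration; only the lock-then-check structure is taken from the listing.

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}

// Hypothetical stand-in for an expensive, one-per-JVM context; not a Spark class.
final class MiniContext(val settings: Map[String, String])

object MiniContext {
  private val constructorLock = new Object
  private val activeContext = new AtomicReference[MiniContext]()
  // Counts constructor calls so the single-instance property can be observed.
  val constructed = new AtomicInteger(0)

  def getOrCreate(settings: Map[String, String]): MiniContext =
    constructorLock.synchronized {
      if (activeContext.get() == null) {
        constructed.incrementAndGet()
        activeContext.set(new MiniContext(settings))
      } else if (settings.nonEmpty) {
        // As in the listing: settings passed by later callers are ignored.
        Console.err.println("Using an existing MiniContext; some settings may not take effect.")
      }
      activeContext.get()
    }
}

object MiniContextDemo {
  // Starts `threadCount` threads that all call getOrCreate and returns
  // the set of distinct instances they received.
  def raceForContext(threadCount: Int): Set[MiniContext] = {
    val results = new ConcurrentLinkedQueue[MiniContext]()
    val threads = (1 to threadCount).map { i =>
      new Thread(new Runnable {
        def run(): Unit = {
          results.add(MiniContext.getOrCreate(Map("caller" -> i.toString)))
          ()
        }
      })
    }
    threads.foreach(_.start())
    threads.foreach(_.join())
    results.toArray(new Array[MiniContext](0)).toSet
  }
}
```

`MiniContextDemo.raceForContext(8)` should return a set with a single element, and a later `getOrCreate` with different settings returns that same instance with its original settings, which is the situation the "some configuration may not take effect" warning describes.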
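A SparkContext represents the connection to a Spark cluster and can be used to create RDDs, accumulators, and broadcast variables on that cluster. Only one SparkContext may be active per JVM; before creating a new one, you must call stop() on the active SparkContext.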
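As a small usage sketch of those three facilities, the following assumes Spark is on the classpath and runs in local mode. The object name, the application name, and the `local[2]` master are choices made for this example, not something taken from the article.

```scala
import org.apache.spark.sql.SparkSession

object SparkContextBasics {
  // Counts the multiples of `divisor` in 1..100 using an RDD,
  // a broadcast variable, and an accumulator.
  def countMultiples(divisor: Int): Long = {
    // Assumption: local mode with two threads, so no cluster is required.
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("spark-context-basics")
      .getOrCreate()
    val sc = spark.sparkContext
    try {
      // RDD built from a local collection, split into four partitions.
      val numbers = sc.parallelize(1 to 100, numSlices = 4)
      // Broadcast variable: a read-only value shipped to the executors.
      val broadcastDivisor = sc.broadcast(divisor)
      // Accumulator: tasks add to it, the driver reads the total.
      val multiples = sc.longAccumulator("multiples")

      numbers.foreach { n =>
        if (n % broadcastDivisor.value == 0) multiples.add(1L)
      }
      multiples.value.longValue()
    } finally {
      // Stop the context so that another one can be created in this JVM.
      spark.stop()
    }
  }

  def main(args: Array[String]): Unit =
    println(s"multiples of 3 in 1..100: ${countMultiples(3)}")
}
```

The `finally` block reflects the rule stated above: the active context has to be stopped before a new one can be created in the same JVM.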
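Calling the constructor first initializes the class's member variables and then enters the initialization logic, which is wrapped in a try/catch block. That block runs as part of the constructor; see my earlier article "scala class中孤立代码块揭秘" (on standalone code blocks in a Scala class body).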
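A minimal sketch of that Scala behaviour: statements written directly in a class body, including a try/catch, execute as part of the primary constructor. `MiniService` and its `events` log are hypothetical and exist only to make the execution order visible; the cleanup-then-rethrow shape follows the catch clause used by SparkContext.

```scala
import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal

// Hypothetical class; the names are illustrative and not part of Spark.
class MiniService(failDuringInit: Boolean, events: ArrayBuffer[String]) {
  // Member variables are initialized first, in declaration order.
  private var started = false
  events += "fields initialized"

  // This block is not inside any method: it belongs to the primary
  // constructor and runs every time `new MiniService(...)` is evaluated.
  try {
    events += "init started"
    started = true
    if (failDuringInit) throw new IllegalStateException("init failed")
    events += "init finished"
  } catch {
    case NonFatal(e) =>
      try {
        // Release whatever was already set up.
        stop()
      } catch {
        case NonFatal(inner) => events += s"cleanup failed: ${inner.getMessage}"
      } finally {
        // Rethrow the original error so that `new` fails for the caller.
        throw e
      }
  }

  def stop(): Unit = if (started) {
    started = false
    events += "stopped"
  }
}
```

With `failDuringInit = true` the caller sees the `IllegalStateException`, and the log shows that `stop()` ran before the exception propagated.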
try {
  // 1. Initialize the configuration
  _conf = config.clone()
  _conf.validateSettings()

  if (!_conf.contains("spark.master")) {
    throw new SparkException("A master URL must be set in your configuration")
  }
  if (!_conf.contains("spark.app.name")) {
    throw new SparkException("An application name must be set in your configuration")
  }

  // log out spark.app.name in the Spark driver logs
  logInfo(s"Submitted application: $appName")

  // System property spark.yarn.app.id must be set if user code ran by AM on a YARN cluster
  if (master == "yarn" && deployMode == "cluster" && !_conf.contains("spark.yarn.app.id")) {
    throw new SparkException("Detected yarn cluster mode, but isn't running on a cluster. " +
      "Deployment to YARN is not supported directly by SparkContext. Please use spark-submit.")
  }

  if (_conf.getBoolean("spark.logConf", false)) {
    logInfo("Spark configuration:\n" + _conf.toDebugString)
  }

  // Set Spark driver host and port system properties. This explicitly sets the configuration
  // instead of relying on the default value of the config constant.
  _conf.setIfMissing("spark.driver.port", "0")

  _conf.set("spark.executor.id", SparkContext.DRIVER_IDENTIFIER)

  _jars = Utils.getUserJars(_conf)
  _files = _conf.getOption("spark.files").map(_.split(",")).map(_.filter(_.nonEmpty))
    .toSeq.flatten
  // 2. Initialize the event log directory and the compression codec
  _eventLogDir =
    if (isEventLogEnabled) {
      val unresolvedDir = conf.get("spark.eventLog.dir", EventLoggingListener.DEFAULT_LOG_DIR)
        .stripSuffix("/")
      Some(Utils.resolveURI(unresolvedDir))
    } else {
      None
    }

  _eventLogCodec = {
    val compress = _conf.getBoolean("spark.eventLog.compress", false)
    if (compress && isEventLogEnabled) {
      Some(CompressionCodec.getCodecName(_conf)).map(CompressionCodec.getShortName)
    } else {
      None
    }
  }
  // 3. LiveListenerBus asynchronously delivers each SparkListenerEvent to the registered SparkListeners
  _listenerBus = new LiveListenerBus(_conf)

  // Initialize the app status store and listener before SparkEnv is created so that it gets
  // all events.
  // 4. Give the application an in-memory key-value store
  _statusStore = AppStatusStore.createLiveStore(conf)
  // 5. Register the AppStatusListener with the LiveListenerBus
  listenerBus.addToStatusQueue(_statusStore.listener.get)

  // Create the Spark execution environment (cache, map output tracker, etc)
  // 6. Create the driver-side SparkEnv.
  // SparkEnv holds all the runtime objects of a running Spark instance (master or worker),
  // including the serializer, RpcEnv, block manager, map output tracker, and so on.
  // Spark code currently locates the SparkEnv through a global variable, so all threads see the same SparkEnv.
  // Once the SparkContext has been created, it can be accessed through SparkEnv.get.
  _env = createSparkEnv(_conf, isLocal, listenerBus)
  SparkEnv.set(_env)

  // If running the REPL, register the repl's output dir with the file server.
  _conf.getOption("spark.repl.class.outputDir").foreach { path =>
    val replUri = _env.rpcEnv.fileServer.addDirectory("/classes", new File(path))
    _conf.set("spark.repl.class.uri", replUri)
  }
  // 7. Low-level API for monitoring and reporting the status of Spark jobs and stages
  _statusTracker = new SparkStatusTracker(this, _statusStore)

  // 8. Console progress bar
  _progressBar =
    if (_conf.get(UI_SHOW_CONSOLE_PROGRESS) && !log.isInfoEnabled) {
      Some(new ConsoleProgressBar(this))
    } else {
      None
    }

  // 9. Spark UI, implemented with Jetty
  _ui =
    if (conf.getBoolean("spark.ui.enabled", true)) {
      Some(SparkUI.create(Some(this), _statusStore, _conf, _env.securityManager, appName, "",
        startTime))
    } else {
      // For tests, do not enable the UI
      None
    }
  // Bind the UI before starting the task scheduler to communicate
  // the bound port to the cluster manager properly
  _ui.foreach(_.bind())

  // 10. Create the Hadoop configuration
  _hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(_conf)

  // 11. Add each JAR given through the constructor
  if (jars != null) {
    jars.foreach(addJar)
  }

  if (files != null) {
    files.foreach(addFile)
  }
  // 12. Compute the executor memory
  _executorMemory = _conf.getOption("spark.executor.memory")
    .orElse(Option(System.getenv("SPARK_EXECUTOR_MEMORY")))
    .orElse(Option(System.getenv("SPARK_MEM"))
    .map(warnSparkMem))
    .map(Utils.memoryStringToMb)
    .getOrElse(1024)

  // Convert java options to env vars as a work around
  // since we can't set env vars directly in sbt.
  for { (envKey, propKey) <- Seq(("SPARK_TESTING", "spark.testing"))
    value <- Option(System.getenv(envKey)).orElse(Option(System.getProperty(propKey)))} {
    executorEnvs(envKey) = value
  }
  Option(System.getenv("SPARK_PREPEND_CLASSES")).foreach { v =>
    executorEnvs("SPARK_PREPEND_CLASSES") = v
  }
  // The Mesos scheduler backend relies on this environment variable to set executor memory.
  // TODO: Set this only in the Mesos scheduler.
  executorEnvs("SPARK_EXECUTOR_MEMORY") = executorMemory + "m"
  executorEnvs ++= _conf.getExecutorEnv
  executorEnvs("SPARK_USER") = sparkUser

  // We need to register "HeartbeatReceiver" before "createTaskScheduler" because Executor will
  // retrieve "HeartbeatReceiver" in the constructor. (SPARK-6640)
  // 13. Create the HeartbeatReceiver endpoint
  _heartbeatReceiver = env.rpcEnv.setupEndpoint(
    HeartbeatReceiver.ENDPOINT_NAME, new HeartbeatReceiver(this))

  // Create and start the scheduler
  // 14. Create the task scheduler and the scheduler backend
  val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
  _schedulerBackend = sched
  _taskScheduler = ts
  // 15. Create the DAGScheduler instance
  _dagScheduler = new DAGScheduler(this)
  _heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)

  // start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
  // constructor
  // 16. Start the task scheduler
  _taskScheduler.start()

  // 17. Get the application ID from the task scheduler
  _applicationId = _taskScheduler.applicationId()
  // 18. Get the application attempt ID from the task scheduler
  _applicationAttemptId = taskScheduler.applicationAttemptId()
  _conf.set("spark.app.id", _applicationId)
  if (_conf.getBoolean("spark.ui.reverseProxy", false)) {
    System.setProperty("spark.ui.proxyBase", "/proxy/" + _applicationId)
  }
  // 19. Set the application ID on the UI
  _ui.foreach(_.setAppId(_applicationId))
  // 20. Initialize the block manager
  _env.blockManager.initialize(_applicationId)

  // The metrics system for Driver need to be set spark.app.id to app ID.
  // So it should start after we get app ID from the task scheduler and set spark.app.id.
  // 21. Start the metrics system
  _env.metricsSystem.start()
  // Attach the driver metrics servlet handler to the web ui after the metrics system is started.
  // 22. Attach the metrics system's servlet handlers to the UI
  _env.metricsSystem.getServletHandlers.foreach(handler => ui.foreach(_.attachHandler(handler)))

  // 23. Initialize the event logging listener
  _eventLogger =
    if (isEventLogEnabled) {
      val logger =
        new EventLoggingListener(_applicationId, _applicationAttemptId, _eventLogDir.get,
          _conf, _hadoopConfiguration)
      logger.start()
      listenerBus.addToEventLogQueue(logger)
      Some(logger)
    } else {
      None
    }

  // Optionally scale number of executors dynamically based on workload. Exposed for testing.
  // 24. If dynamic executor allocation is enabled, instantiate the ExecutorAllocationManager and start it
  val dynamicAllocationEnabled = Utils.isDynamicAllocationEnabled(_conf)
  _executorAllocationManager =
    if (dynamicAllocationEnabled) {
      schedulerBackend match {
        case b: ExecutorAllocationClient =>
          Some(new ExecutorAllocationManager(
            schedulerBackend.asInstanceOf[ExecutorAllocationClient], listenerBus, _conf,
            _env.blockManager.master))
        case _ =>
          None
      }
    } else {
      None
    }
  _executorAllocationManager.foreach(_.start())

  // 25. Initialize the ContextCleaner and start it
  _cleaner =
    if (_conf.getBoolean("spark.cleaner.referenceTracking", true)) {
      Some(new ContextCleaner(this))
    } else {
      None
    }
  _cleaner.foreach(_.start())
  // 26. Set up and start the listener bus
  setupAndStartListenerBus()
  // 27. The task scheduler is ready; post the environment update event
  postEnvironmentUpdate()
  // 28. Post the application start event
  postApplicationStart()

  // Post init
  // 29. Wait until the scheduler backend is ready
  _taskScheduler.postStartHook()
  // 30. Register the DAGScheduler metrics source
  _env.metricsSystem.registerSource(_dagScheduler.metricsSource)
  // 31. Register the BlockManager metrics source
  _env.metricsSystem.registerSource(new BlockManagerSource(_env.blockManager))
  // 32. Register the ExecutorAllocationManager metrics source
  _executorAllocationManager.foreach { e =>
    _env.metricsSystem.registerSource(e.executorAllocationManagerSource)
  }

  // Make sure the context is stopped if the user forgets about it. This avoids leaving
  // unfinished event logs around after the JVM exits cleanly. It doesn't help if the JVM
  // is killed, though.
  logDebug("Adding shutdown hook") // force eager creation of logger
  // 33. Install a shutdown hook: a callback that stops the SparkContext when the JVM shuts down
  _shutdownHookRef = ShutdownHookManager.addShutdownHook(
    ShutdownHookManager.SPARK_CONTEXT_SHUTDOWN_PRIORITY) { () =>
    logInfo("Invoking stop() from shutdown hook")
    try {
      stop()
    } catch {
      case e: Throwable =>
        logWarning("Ignoring Exception while stopping SparkContext from shutdown hook", e)
    }
  }
} catch {
  case NonFatal(e) =>
    logError("Error initializing SparkContext.", e)
    try {
      stop()
    } catch {
      case NonFatal(inner) =>
        logError("Error stopping SparkContext after init error.", inner)
    } finally {
      throw e
    }
}
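As the listing shows, initializing a SparkContext is quite involved and touches many Spark components, including the asynchronous event bus LiveListenerBus, SparkEnv, SparkUI, DAGScheduler, the metrics system, EventLoggingListener, TaskScheduler, ExecutorAllocationManager, ContextCleaner, and more. Treat this as an overview for now; some of these components will be analyzed in more depth later.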
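To give a feel for the first component in that list, here is a toy asynchronous event bus in plain Scala. It is not the LiveListenerBus implementation: `MiniListenerBus`, `MiniListener`, and the event types are hypothetical, and the only idea carried over is that posting puts an event on a queue while a separate thread delivers it to the registered listeners.

```scala
import java.util.concurrent.{CopyOnWriteArrayList, LinkedBlockingQueue}

// Hypothetical event types for this sketch.
sealed trait MiniEvent
final case class JobStarted(jobId: Int) extends MiniEvent
final case class JobEnded(jobId: Int) extends MiniEvent
// Internal marker that tells the dispatcher thread to exit.
case object Shutdown extends MiniEvent

trait MiniListener {
  def onEvent(event: MiniEvent): Unit
}

final class MiniListenerBus {
  private val queue = new LinkedBlockingQueue[MiniEvent]()
  private val listeners = new CopyOnWriteArrayList[MiniListener]()

  // Single dispatcher thread: takes events in FIFO order and hands each
  // one to every registered listener.
  private val dispatcher = new Thread(new Runnable {
    def run(): Unit = {
      var running = true
      while (running) {
        queue.take() match {
          case Shutdown => running = false
          case event =>
            val it = listeners.iterator()
            while (it.hasNext) it.next().onEvent(event)
        }
      }
    }
  }, "mini-listener-bus")
  dispatcher.setDaemon(true)

  def addListener(listener: MiniListener): Unit = {
    listeners.add(listener)
    ()
  }

  // Posting never runs listener code on the caller's thread.
  def post(event: MiniEvent): Unit = queue.put(event)

  // Events posted before start() stay queued until the dispatcher runs.
  def start(): Unit = dispatcher.start()

  // Drains everything posted so far, then stops the dispatcher.
  def stop(): Unit = {
    queue.put(Shutdown)
    dispatcher.join()
  }
}
```

In this sketch an event posted before `start()` is simply held in the queue and delivered once the dispatcher thread is running, so a bus can be created early in an initialization sequence and started near the end without losing events.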