After a job is submitted, the code runs statement by statement, but because of lazy evaluation none of the operators execute until an action is reached. Once an action is hit, the DAGScheduler splits the job into stages according to the DAG (directed acyclic graph), and the TaskScheduler requests resources and runs the tasks. Before any of that can happen, though, something has to set up the runtime environment and start these components: that is the SparkContext.
The source code describes it like this:
Main entry point for Spark functionality. A SparkContext represents the connection to a Spark cluster, and can be used to create RDDs, accumulators and broadcast variables on that cluster. Only one SparkContext may be active per JVM. You must `stop()` the active SparkContext before creating a new one. This limitation may eventually be removed; see SPARK-2243 for more details.
In other words, SparkContext is the main entry point for Spark functionality; it represents the connection to a Spark cluster and can be used to create RDDs, accumulators and broadcast variables on that cluster. Only one SparkContext can be active per JVM.
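To make this concrete, here is a minimal sketch of using SparkContext directly (the app name and the local[2] master are illustrative choices, not taken from this post): the transformations stay lazy until the reduce action, at which point the DAGScheduler and TaskScheduler described below kick in.
```
import org.apache.spark.{SparkConf, SparkContext}

object SparkContextDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("sparkcontext-demo").setMaster("local[2]")
    val sc = new SparkContext(conf)                 // only one active SparkContext per JVM

    val rdd = sc.parallelize(1 to 100).map(_ * 2)   // transformations: nothing runs yet
    val total = rdd.reduce(_ + _)                   // action: stages are built and tasks are scheduled
    println(s"total = $total")

    sc.stop()                                       // must stop() before creating another SparkContext
  }
}
```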
Setting up the environment (SparkEnv):
For reference, see http://www.cnblogs.com/chushiyaoyue/p/7472904.html
SparkContext creates the SparkEnv. Its main inputs are the SparkConf (Spark's configuration), the deploy mode, and the listener bus. In order, it creates the security manager, the RPC-based distributed messaging system, the map output tracker, the shuffle manager and the memory manager, and finally the block manager together with the block transfer service.
A more detailed walkthrough is available at https://yq.aliyun.com/articles/5848
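As a rough sketch of how SparkConf feeds those components (the keys and values below are common Spark settings chosen purely for illustration, not something the linked article prescribes):
```
import org.apache.spark.SparkConf

// a few standard settings that end up influencing the components SparkEnv wires together
val conf = new SparkConf()
  .setAppName("sparkenv-demo")
  .setMaster("local[*]")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") // serializer created in SparkEnv
  .set("spark.memory.fraction", "0.6")                                   // sizing for the unified memory manager
  .set("spark.shuffle.compress", "true")                                 // behaviour of the shuffle manager
// creating a SparkContext from this conf triggers SparkEnv creation on the driver
```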
Starting the components:
This is the piece of SparkContext code that creates them:
```
// createTaskScheduler creates _taskScheduler and _schedulerBackend
val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
_schedulerBackend = sched
_taskScheduler = ts
// create the DAGScheduler
_dagScheduler = new DAGScheduler(this)
_heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)
// start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
// constructor
_taskScheduler.start()
```
createTaskScheduler builds a different _taskScheduler and backend depending on the master property. When a TaskSchedulerImpl is instantiated, its initialize method is called; initialize creates the scheduling pool and the scheduling algorithm, and SchedulableBuilder.addTaskSetManager is what determines the scheduling order of TaskSetManagers. Tasks are then assigned to a concrete ExecutorBackend per TaskSetManager. (A short configuration sketch follows the code below.)
```
private def createTaskScheduler(
    sc: SparkContext,
    master: String,
    deployMode: String): (SchedulerBackend, TaskScheduler) = {
  import SparkMasterRegex._

  // When running locally, don't try to re-execute tasks on failure.
  val MAX_LOCAL_TASK_FAILURES = 1

  master match {
    case "local" =>
      val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
      val backend = new LocalSchedulerBackend(sc.getConf, scheduler, 1)
      scheduler.initialize(backend)
      (backend, scheduler)

    case LOCAL_N_REGEX(threads) =>
      def localCpuCount: Int = Runtime.getRuntime.availableProcessors()
      // local[*] estimates the number of cores on the machine; local[N] uses exactly N threads.
      val threadCount = if (threads == "*") localCpuCount else threads.toInt
      if (threadCount <= 0) {
        throw new SparkException(s"Asked to run locally with $threadCount threads")
      }
      val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
      val backend = new LocalSchedulerBackend(sc.getConf, scheduler, threadCount)
      scheduler.initialize(backend)
      (backend, scheduler)

    case LOCAL_N_FAILURES_REGEX(threads, maxFailures) =>
      def localCpuCount: Int = Runtime.getRuntime.availableProcessors()
      // local[*, M] means the number of cores on the computer with M failures
      // local[N, M] means exactly N threads with M failures
      val threadCount = if (threads == "*") localCpuCount else threads.toInt
      val scheduler = new TaskSchedulerImpl(sc, maxFailures.toInt, isLocal = true)
      val backend = new LocalSchedulerBackend(sc.getConf, scheduler, threadCount)
      scheduler.initialize(backend)
      (backend, scheduler)

    case SPARK_REGEX(sparkUrl) =>
      val scheduler = new TaskSchedulerImpl(sc)
      val masterUrls = sparkUrl.split(",").map("spark://" + _)
      val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
      scheduler.initialize(backend)
      (backend, scheduler)

    case LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave) =>
      // Check to make sure memory requested <= memoryPerSlave. Otherwise Spark will just hang.
      val memoryPerSlaveInt = memoryPerSlave.toInt
      if (sc.executorMemory > memoryPerSlaveInt) {
        throw new SparkException(
          "Asked to launch cluster with %d MB RAM / worker but requested %d MB/worker".format(
            memoryPerSlaveInt, sc.executorMemory))
      }
      val scheduler = new TaskSchedulerImpl(sc)
      val localCluster = new LocalSparkCluster(
        numSlaves.toInt, coresPerSlave.toInt, memoryPerSlaveInt, sc.conf)
      val masterUrls = localCluster.start()
      val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
      scheduler.initialize(backend)
      backend.shutdownCallback = (backend: StandaloneSchedulerBackend) => {
        localCluster.stop()
      }
      (backend, scheduler)

    case masterUrl =>
      val cm = getClusterManager(masterUrl) match {
        case Some(clusterMgr) => clusterMgr
        case None => throw new SparkException("Could not parse Master URL: '" + master + "'")
      }
      try {
        val scheduler = cm.createTaskScheduler(sc, masterUrl)
        val backend = cm.createSchedulerBackend(sc, masterUrl, scheduler)
        cm.initialize(scheduler, backend)
        (backend, scheduler)
      } catch {
        case se: SparkException => throw se
        case NonFatal(e) =>
          throw new SparkException("External scheduler cannot be instantiated", e)
      }
  }
}
```
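Tying this back to the initialize/SchedulableBuilder step mentioned above: which builder is used (FIFO or FAIR) comes from the spark.scheduler.mode setting, and jobs can be routed to named pools. A minimal sketch, assuming a local master and an illustrative pool name:
```
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("pool-demo")
  .setMaster("local[*]")                       // hits the LOCAL_N_REGEX branch above
  .set("spark.scheduler.mode", "FAIR")         // initialize() builds a FairSchedulableBuilder
val sc = new SparkContext(conf)

// jobs submitted from this thread are placed into the "production" pool; within a pool,
// TaskSetManagers are ordered by the pool's scheduling algorithm before tasks get resource offers
sc.setLocalProperty("spark.scheduler.pool", "production")
```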
Then _taskScheduler.start() invokes start() in TaskSchedulerImpl:
```
override def start() {
  // the backend is started first; here it is a CoarseGrainedSchedulerBackend
  backend.start()

  if (!isLocal && conf.getBoolean("spark.speculation", false)) {
    logInfo("Starting speculative execution thread")
    speculationScheduler.scheduleWithFixedDelay(new Runnable {
      override def run(): Unit = Utils.tryOrStopSparkContext(sc) {
        checkSpeculatableTasks()
      }
    }, SPECULATION_INTERVAL_MS, SPECULATION_INTERVAL_MS, TimeUnit.MILLISECONDS)
  }
}
```
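The speculation branch above only fires in non-local mode with spark.speculation enabled. The keys below are standard Spark settings, shown here with illustrative values:
```
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.speculation", "true")             // enables the checkSpeculatableTasks() loop above
  .set("spark.speculation.interval", "100ms")   // how often the speculation thread runs
  .set("spark.speculation.multiplier", "1.5")   // how much slower than the median before re-launching
  .set("spark.speculation.quantile", "0.75")    // fraction of tasks that must finish before checking
```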
In CoarseGrainedSchedulerBackend, Spark's RPC framework invokes the lifecycle method onStart. There, Option(self).foreach(_.send(ReviveOffers)) is scheduled so that the endpoint periodically sends the ReviveOffers message to itself. ReviveOffers is an empty object; receiving it triggers makeOffers, which "makes fake resource offers on all executors". Right after startup these offers go unused; the periodic revive exists so that, once concrete tasks are waiting to run, they get matched to executor resources.
```
override def onStart() {
  val reviveIntervalMs = conf.getTimeAsMs("spark.scheduler.revive.interval", "1s")

  reviveThread.scheduleAtFixedRate(new Runnable {
    override def run(): Unit = Utils.tryLogNonFatalError {
      Option(self).foreach(_.send(ReviveOffers))
    }
  }, 0, reviveIntervalMs, TimeUnit.MILLISECONDS)
}
```
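For context, the receiving side looks roughly like the following (a paraphrased sketch of CoarseGrainedSchedulerBackend's driver endpoint, not a verbatim copy): ReviveOffers is handled by calling makeOffers, which turns the free cores of registered executors into offers and hands them to TaskSchedulerImpl for task assignment.
```
// inside the driver endpoint (sketch)
override def receive: PartialFunction[Any, Unit] = {
  case ReviveOffers =>
    makeOffers()   // build resource offers from alive executors and let the scheduler assign tasks
  // other messages (status updates, kill requests, ...) omitted here
}
```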