Spark Source Code: SparkContext, the Connection to the Cluster

Author: 机器不能学习 | Published 2018-10-15 20:50

After a job is submitted the code runs in order, but because of lazy evaluation none of the transformations execute immediately; nothing happens until an action is reached. As we all know, once an action fires, the DAGScheduler splits the DAG into stages and the TaskScheduler requests resources and runs the tasks. Before any of that can happen, however, a connection is needed to set up the runtime environment and start these components. That connection is the SparkContext.
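To make the trigger point concrete, here is a minimal self-contained sketch (local mode; the names are illustrative, not from the Spark source): the map below is lazy, and only the count action makes the SparkContext hand the job to the DAGScheduler and TaskScheduler.

```
import org.apache.spark.{SparkConf, SparkContext}

object WordLengthSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("WordLengthSketch").setMaster("local[*]")
    val sc = new SparkContext(conf)     // sets up SparkEnv, DAGScheduler, TaskScheduler and the backend

    val words   = sc.parallelize(Seq("spark", "context", "rdd"))
    val lengths = words.map(_.length)   // lazy transformation: nothing runs yet
    println(lengths.count())            // action: stages are built and tasks are submitted

    sc.stop()
  }
}
```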

The source code describes it as follows:

    Main entry point for Spark functionality. A SparkContext represents the connection to a Spark cluster, and can be used to create RDDs, accumulators and broadcast variables on that cluster. Only one SparkContext may be active per JVM. You must `stop()` the active SparkContext before creating a new one. This limitation may eventually be removed; see SPARK-2243 for more details.

In other words, it is the main entry point for Spark functionality and the connection between the application and the Spark cluster, and it can be used to create RDDs, accumulators, and broadcast variables on that cluster. Only one SparkContext can be active per JVM; you must stop() the active one before creating another.
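In practice this means an application either reuses the active context or stops it before building a new one. A short sketch, assuming the SparkContext.getOrCreate helper:

```
import org.apache.spark.{SparkConf, SparkContext}

object SingleContextSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("SingleContextSketch").setMaster("local[*]")

    // Returns the already-active SparkContext if there is one, otherwise creates it,
    // which is the usual way to respect the one-active-context-per-JVM rule.
    val sc = SparkContext.getOrCreate(conf)

    sc.stop()                        // stop() the active context first ...
    val sc2 = new SparkContext(conf) // ... and only then may a new one be created in this JVM
    sc2.stop()
  }
}
```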

Setting up the environment (SparkEnv):

For reference, see http://www.cnblogs.com/chushiyaoyue/p/7472904.html.

SparkContext creates the SparkEnv. The main inputs it needs are the SparkConf (Spark's configuration), the deploy mode, and the listener bus. It then creates, in order, the security manager, the distributed messaging (RPC) system, the map output tracker, the shuffle manager and the memory manager, and finally the block manager together with the block transfer service.

A more detailed walkthrough is available at https://yq.aliyun.com/articles/5848.
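Once a SparkContext is up, the environment it assembled can be reached through the SparkEnv.get accessor (a DeveloperApi; most of its components are Spark internals, so this sketch only reads the plainly public fields and is meant for exploration, not production code):

```
import org.apache.spark.{SparkConf, SparkContext, SparkEnv}

object SparkEnvSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("SparkEnvSketch").setMaster("local[2]"))

    // SparkEnv.get returns the environment SparkContext just assembled
    // (security manager, RPC env, map output tracker, shuffle/memory managers,
    // block manager, ...). Most of those are internal types, so we only print
    // the clearly public bits here.
    val env = SparkEnv.get
    println(env.executorId)                  // "driver" on the driver side
    println(env.conf.get("spark.app.name"))  // the SparkConf the env was built from

    sc.stop()
  }
}
```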


Starting the scheduler classes:

Here is the snippet in SparkContext that creates each of these components:

```
// createTaskScheduler creates both _taskScheduler and _schedulerBackend
val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
_schedulerBackend = sched
_taskScheduler = ts

// create the DAGScheduler
_dagScheduler = new DAGScheduler(this)
_heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)

// start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
// constructor
_taskScheduler.start()
```

createTaskScheduler builds a different _taskScheduler and backend depending on the master URL. When a TaskSchedulerImpl is instantiated, its initialize method is called; initialize creates the root scheduling pool and selects the scheduling algorithm, and SchedulableBuilder.addTaskSetManager is used so that the SchedulableBuilder determines the order in which TaskSetManagers are scheduled. Each TaskSetManager then decides on which ExecutorBackend each of its tasks actually runs.

```
private def createTaskScheduler(
    sc: SparkContext,
    master: String,
    deployMode: String): (SchedulerBackend, TaskScheduler) = {
  import SparkMasterRegex._

  // When running locally, don't try to re-execute tasks on failure.
  val MAX_LOCAL_TASK_FAILURES = 1

  master match {
    case "local" =>
      val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
      val backend = new LocalSchedulerBackend(sc.getConf, scheduler, 1)
      scheduler.initialize(backend)
      (backend, scheduler)

    case LOCAL_N_REGEX(threads) =>
      def localCpuCount: Int = Runtime.getRuntime.availableProcessors()
      // local[*] estimates the number of cores on the machine; local[N] uses exactly N threads.
      val threadCount = if (threads == "*") localCpuCount else threads.toInt
      if (threadCount <= 0) {
        throw new SparkException(s"Asked to run locally with $threadCount threads")
      }
      val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
      val backend = new LocalSchedulerBackend(sc.getConf, scheduler, threadCount)
      scheduler.initialize(backend)
      (backend, scheduler)

    case LOCAL_N_FAILURES_REGEX(threads, maxFailures) =>
      def localCpuCount: Int = Runtime.getRuntime.availableProcessors()
      // local[*, M] means the number of cores on the computer with M failures
      // local[N, M] means exactly N threads with M failures
      val threadCount = if (threads == "*") localCpuCount else threads.toInt
      val scheduler = new TaskSchedulerImpl(sc, maxFailures.toInt, isLocal = true)
      val backend = new LocalSchedulerBackend(sc.getConf, scheduler, threadCount)
      scheduler.initialize(backend)
      (backend, scheduler)

    case SPARK_REGEX(sparkUrl) =>
      val scheduler = new TaskSchedulerImpl(sc)
      val masterUrls = sparkUrl.split(",").map("spark://" + _)
      val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
      scheduler.initialize(backend)
      (backend, scheduler)

    case LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave) =>
      // Check to make sure memory requested <= memoryPerSlave. Otherwise Spark will just hang.
      val memoryPerSlaveInt = memoryPerSlave.toInt
      if (sc.executorMemory > memoryPerSlaveInt) {
        throw new SparkException(
          "Asked to launch cluster with %d MB RAM / worker but requested %d MB/worker".format(
            memoryPerSlaveInt, sc.executorMemory))
      }
      val scheduler = new TaskSchedulerImpl(sc)
      val localCluster = new LocalSparkCluster(
        numSlaves.toInt, coresPerSlave.toInt, memoryPerSlaveInt, sc.conf)
      val masterUrls = localCluster.start()
      val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
      scheduler.initialize(backend)
      backend.shutdownCallback = (backend: StandaloneSchedulerBackend) => {
        localCluster.stop()
      }
      (backend, scheduler)

    case masterUrl =>
      val cm = getClusterManager(masterUrl) match {
        case Some(clusterMgr) => clusterMgr
        case None => throw new SparkException("Could not parse Master URL: '" + master + "'")
      }
      try {
        val scheduler = cm.createTaskScheduler(sc, masterUrl)
        val backend = cm.createSchedulerBackend(sc, masterUrl, scheduler)
        cm.initialize(scheduler, backend)
        (backend, scheduler)
      } catch {
        case se: SparkException => throw se
        case NonFatal(e) =>
          throw new SparkException("External scheduler cannot be instantiated", e)
      }
  }
}
```
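The SchedulableBuilder mentioned above is chosen by the spark.scheduler.mode setting (FIFO by default, FAIR as the alternative). A minimal sketch of switching to fair scheduling; the pool name "production" is illustrative and assumes a matching definition in fairscheduler.xml:

```
import org.apache.spark.{SparkConf, SparkContext}

object FairSchedulingSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("FairSchedulingSketch")
      .setMaster("local[4]")
      .set("spark.scheduler.mode", "FAIR")   // default is FIFO; FAIR picks the FairSchedulableBuilder

    val sc = new SparkContext(conf)
    // Jobs submitted from this thread go into the "production" pool
    sc.setLocalProperty("spark.scheduler.pool", "production")

    sc.parallelize(1 to 1000).count()        // this job is scheduled through the FAIR pool
    sc.stop()
  }
}
```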

```
// _taskScheduler.start() then invokes TaskSchedulerImpl.start()
override def start() {
  // the backend is started first (in cluster mode this is CoarseGrainedSchedulerBackend)
  backend.start()

  if (!isLocal && conf.getBoolean("spark.speculation", false)) {
    logInfo("Starting speculative execution thread")
    speculationScheduler.scheduleWithFixedDelay(new Runnable {
      override def run(): Unit = Utils.tryOrStopSparkContext(sc) {
        checkSpeculatableTasks()
      }
    }, SPECULATION_INTERVAL_MS, SPECULATION_INTERVAL_MS, TimeUnit.MILLISECONDS)
  }
}
```
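The speculation branch in start() only activates when spark.speculation is enabled. A minimal configuration sketch; the values shown are illustrative, not recommendations:

```
import org.apache.spark.SparkConf

object SpeculationConfSketch {
  // spark.speculation is off by default; enabling it turns on the
  // checkSpeculatableTasks loop shown above.
  val conf = new SparkConf()
    .set("spark.speculation", "true")
    .set("spark.speculation.interval", "100ms")  // how often stragglers are checked
    .set("spark.speculation.multiplier", "1.5")  // how much slower than the median counts as a straggler
    .set("spark.speculation.quantile", "0.75")   // fraction of tasks that must finish before checking
}
```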

In CoarseGrainedSchedulerBackend, Spark's RPC machinery invokes the lifecycle method onStart. There, Option(self).foreach(_.send(ReviveOffers)) is scheduled so that the backend periodically sends itself a ReviveOffers message. ReviveOffers is an empty object; receiving it triggers makeOffers, which "makes fake resource offers on all executors".

When the backend has just been created the offers sent are empty; the mechanism becomes useful later, when concrete tasks are waiting to be executed.

```
override def onStart() {
  val reviveIntervalMs = conf.getTimeAsMs("spark.scheduler.revive.interval", "1s")

  reviveThread.scheduleAtFixedRate(new Runnable {
    override def run(): Unit = Utils.tryLogNonFatalError {
      Option(self).foreach(_.send(ReviveOffers))
    }
  }, 0, reviveIntervalMs, TimeUnit.MILLISECONDS)
}
```
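The periodic self-message pattern can be sketched outside Spark as well. The following is a simplified, self-contained analogue (it does not use Spark's private RpcEndpoint API; ReviveLoopSketch and its receive method are illustrative names):

```
import java.util.concurrent.{Executors, TimeUnit}

object ReviveLoopSketch {
  case object ReviveOffers   // empty message, mirroring Spark's ReviveOffers object

  // In CoarseGrainedSchedulerBackend, receiving ReviveOffers triggers makeOffers(),
  // which matches pending tasks against free executor resources.
  def receive(msg: Any): Unit = msg match {
    case ReviveOffers => println("makeOffers(): offer free executor cores to the TaskScheduler")
  }

  def main(args: Array[String]): Unit = {
    val reviveThread = Executors.newSingleThreadScheduledExecutor()
    val reviveIntervalMs = 1000L   // Spark reads this from spark.scheduler.revive.interval

    reviveThread.scheduleAtFixedRate(new Runnable {
      override def run(): Unit = receive(ReviveOffers)
    }, 0, reviveIntervalMs, TimeUnit.MILLISECONDS)

    Thread.sleep(3500)             // let a few revive ticks fire, then shut down
    reviveThread.shutdown()
  }
}
```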
